From 3a6dd7ff32afce104c2173445bc6fc37b2304768 Mon Sep 17 00:00:00 2001 From: infatum Date: Wed, 8 Jan 2025 15:06:33 +0200 Subject: [PATCH 1/9] Fixed hanging infinitely websocket thread with WebSocketTimeoutException, forced to exit app on exception --- pybit/_websocket_stream.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/pybit/_websocket_stream.py b/pybit/_websocket_stream.py index ff9714d..864fa88 100644 --- a/pybit/_websocket_stream.py +++ b/pybit/_websocket_stream.py @@ -255,7 +255,14 @@ def _on_pong(self): self._send_custom_ping() def _send_custom_ping(self): - self.ws.send(self.custom_ping_message) + try: + self.ws.send(self.custom_ping_message) + except websocket.WebSocketTimeoutException as error: + import sys + # Logging error and exiting hanging, non-reposing app (Let it fall). + logger.error(f"WebSocket {self.ws_name} not responding, error: {error}") + sys.exit() + def _send_initial_ping(self): """https://github.com/bybit-exchange/pybit/issues/164""" From f64f7a0ee8c670974ff6986b41f1196f1c41ac16 Mon Sep 17 00:00:00 2001 From: infatum Date: Wed, 8 Jan 2025 16:08:18 +0200 Subject: [PATCH 2/9] Added termination on too many reconnection attempts --- pybit/_websocket_stream.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/pybit/_websocket_stream.py b/pybit/_websocket_stream.py index 864fa88..5ff4751 100644 --- a/pybit/_websocket_stream.py +++ b/pybit/_websocket_stream.py @@ -5,6 +5,7 @@ from ._http_manager import generate_signature import logging import copy +import sys from uuid import uuid4 from . import _helpers @@ -42,6 +43,7 @@ def __init__( self.domain = domain self.rsa_authentication = rsa_authentication self.demo = demo + self.terminate = False # Set API keys. self.api_key = api_key self.api_secret = api_secret @@ -173,12 +175,14 @@ def resubscribe_to_topics(): # If connection was not successful, raise error. if not infinitely_reconnect and retries <= 0: - self.exit() raise websocket.WebSocketTimeoutException( f"WebSocket {self.ws_name} ({self.endpoint}) connection " f"failed. Too many connection attempts. pybit will no " f"longer try to reconnect." ) + self.terminate = True + self.exit() + logger.info(f"WebSocket {self.ws_name} connected") @@ -258,7 +262,6 @@ def _send_custom_ping(self): try: self.ws.send(self.custom_ping_message) except websocket.WebSocketTimeoutException as error: - import sys # Logging error and exiting hanging, non-reposing app (Let it fall). logger.error(f"WebSocket {self.ws_name} not responding, error: {error}") sys.exit() @@ -296,6 +299,8 @@ def exit(self): while self.ws.sock: continue self.exited = True + if self.terminate: + sys.exit() class _V5WebSocketManager(_WebSocketManager): From 51988bfd99113099fe5e44b4e2d8a746c56c9fbd Mon Sep 17 00:00:00 2001 From: infatum Date: Wed, 8 Jan 2025 16:26:31 +0200 Subject: [PATCH 3/9] Added termination on too many reconnection attempts --- pybit/_websocket_stream.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/pybit/_websocket_stream.py b/pybit/_websocket_stream.py index 5ff4751..9b1e22c 100644 --- a/pybit/_websocket_stream.py +++ b/pybit/_websocket_stream.py @@ -175,13 +175,14 @@ def resubscribe_to_topics(): # If connection was not successful, raise error. if not infinitely_reconnect and retries <= 0: + self.terminate = True + self.exit() raise websocket.WebSocketTimeoutException( f"WebSocket {self.ws_name} ({self.endpoint}) connection " f"failed. Too many connection attempts. pybit will no " f"longer try to reconnect." ) - self.terminate = True - self.exit() + logger.info(f"WebSocket {self.ws_name} connected") From 10dda0514df680eeaf4b3248d06e711362574021 Mon Sep 17 00:00:00 2001 From: infatum Date: Wed, 8 Jan 2025 17:10:20 +0200 Subject: [PATCH 4/9] FIXED: terminate implementation to SIGTERM kill --- pybit/_websocket_stream.py | 60 +++++++++++++++++++------------------- 1 file changed, 30 insertions(+), 30 deletions(-) diff --git a/pybit/_websocket_stream.py b/pybit/_websocket_stream.py index 9b1e22c..c38fcba 100644 --- a/pybit/_websocket_stream.py +++ b/pybit/_websocket_stream.py @@ -1,3 +1,5 @@ +import os + import websocket import threading import time @@ -9,10 +11,8 @@ from uuid import uuid4 from . import _helpers - logger = logging.getLogger(__name__) - SUBDOMAIN_TESTNET = "stream-testnet" SUBDOMAIN_MAINNET = "stream" DEMO_SUBDOMAIN_TESTNET = "stream-demo-testnet" @@ -23,21 +23,21 @@ class _WebSocketManager: def __init__( - self, - callback_function, - ws_name, - testnet, - domain="", - demo=False, - rsa_authentication=False, - api_key=None, - api_secret=None, - ping_interval=20, - ping_timeout=10, - retries=10, - restart_on_error=True, - trace_logging=False, - private_auth_expire=1, + self, + callback_function, + ws_name, + testnet, + domain="", + demo=False, + rsa_authentication=False, + api_key=None, + api_secret=None, + ping_interval=20, + ping_timeout=10, + retries=10, + restart_on_error=True, + trace_logging=False, + private_auth_expire=1, ): self.testnet = testnet self.domain = domain @@ -52,7 +52,7 @@ def __init__( self.ws_name = ws_name if api_key: self.ws_name += " (Auth)" - + # Delta time for private auth expiration in seconds self.private_auth_expire = private_auth_expire @@ -144,7 +144,7 @@ def resubscribe_to_topics(): infinitely_reconnect = False while ( - infinitely_reconnect or retries > 0 + infinitely_reconnect or retries > 0 ) and not self.is_connected(): logger.info(f"WebSocket {self.ws_name} attempting connection...") self.ws = websocket.WebSocketApp( @@ -183,8 +183,6 @@ def resubscribe_to_topics(): f"longer try to reconnect." ) - - logger.info(f"WebSocket {self.ws_name} connected") # If given an api_key, authenticate. @@ -265,8 +263,8 @@ def _send_custom_ping(self): except websocket.WebSocketTimeoutException as error: # Logging error and exiting hanging, non-reposing app (Let it fall). logger.error(f"WebSocket {self.ws_name} not responding, error: {error}") - sys.exit() - + self.terminate = True + self.exit() def _send_initial_ping(self): """https://github.com/bybit-exchange/pybit/issues/164""" @@ -301,7 +299,9 @@ def exit(self): continue self.exited = True if self.terminate: - sys.exit() + import signal + p_id = os.getpid() + os.kill(p_id, signal.SIGTERM) class _V5WebSocketManager(_WebSocketManager): @@ -381,8 +381,8 @@ def _process_delta_orderbook(self, message, topic): # Make updates according to delta response. book_sides = {"b": message["data"]["b"], "a": message["data"]["a"]} - self.data[topic]["u"]=message["data"]["u"] - self.data[topic]["seq"]=message["data"]["seq"] + self.data[topic]["u"] = message["data"]["u"] + self.data[topic]["seq"] = message["data"]["seq"] for side, entries in book_sides.items(): for entry in entries: @@ -476,8 +476,8 @@ def _process_normal_message(self, message): def _handle_incoming_message(self, message): def is_auth_message(): if ( - message.get("op") == "auth" - or message.get("type") == "AUTH_RESP" + message.get("op") == "auth" + or message.get("type") == "AUTH_RESP" ): return True else: @@ -485,8 +485,8 @@ def is_auth_message(): def is_subscription_message(): if ( - message.get("op") == "subscribe" - or message.get("type") == "COMMAND_RESP" + message.get("op") == "subscribe" + or message.get("type") == "COMMAND_RESP" ): return True else: From 2be6494e852a2ad35205634103f1d8ff5d6a9fe9 Mon Sep 17 00:00:00 2001 From: infatum Date: Wed, 8 Jan 2025 17:51:49 +0200 Subject: [PATCH 5/9] Added some more error handling --- pybit/_websocket_stream.py | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/pybit/_websocket_stream.py b/pybit/_websocket_stream.py index c38fcba..e423560 100644 --- a/pybit/_websocket_stream.py +++ b/pybit/_websocket_stream.py @@ -174,14 +174,17 @@ def resubscribe_to_topics(): break # If connection was not successful, raise error. - if not infinitely_reconnect and retries <= 0: + try: + if not infinitely_reconnect and retries <= 0: + raise websocket.WebSocketTimeoutException( + f"WebSocket {self.ws_name} ({self.endpoint}) connection " + f"failed. Too many connection attempts. pybit will no " + f"longer try to reconnect." + ) + except websocket.WebSocketTimeoutException as error: + logger.error(error) self.terminate = True self.exit() - raise websocket.WebSocketTimeoutException( - f"WebSocket {self.ws_name} ({self.endpoint}) connection " - f"failed. Too many connection attempts. pybit will no " - f"longer try to reconnect." - ) logger.info(f"WebSocket {self.ws_name} connected") From 0500d5a5837f12c9e90133c13a13a137d587c39f Mon Sep 17 00:00:00 2001 From: infatum Date: Sat, 18 Jan 2025 16:17:43 +0200 Subject: [PATCH 6/9] Added WebSocketConnectionClosedException catch --- pybit/_websocket_stream.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pybit/_websocket_stream.py b/pybit/_websocket_stream.py index e423560..d2ea2dd 100644 --- a/pybit/_websocket_stream.py +++ b/pybit/_websocket_stream.py @@ -12,6 +12,7 @@ from . import _helpers logger = logging.getLogger(__name__) +from websocket._exceptions import WebSocketConnectionClosedException SUBDOMAIN_TESTNET = "stream-testnet" SUBDOMAIN_MAINNET = "stream" @@ -263,7 +264,7 @@ def _on_pong(self): def _send_custom_ping(self): try: self.ws.send(self.custom_ping_message) - except websocket.WebSocketTimeoutException as error: + except WebSocketConnectionClosedException as error: # Logging error and exiting hanging, non-reposing app (Let it fall). logger.error(f"WebSocket {self.ws_name} not responding, error: {error}") self.terminate = True From a0c1e8bef78523f3f327faff9185e8a86b961f29 Mon Sep 17 00:00:00 2001 From: infatum Date: Fri, 24 Jan 2025 14:58:17 +0200 Subject: [PATCH 7/9] Added loging on exit --- pybit/_websocket_stream.py | 1 + 1 file changed, 1 insertion(+) diff --git a/pybit/_websocket_stream.py b/pybit/_websocket_stream.py index d2ea2dd..b825c4b 100644 --- a/pybit/_websocket_stream.py +++ b/pybit/_websocket_stream.py @@ -305,6 +305,7 @@ def exit(self): if self.terminate: import signal p_id = os.getpid() + logger.error("Forcing kill after receiving critical error") os.kill(p_id, signal.SIGTERM) From 0eb8c94465a2e0b7b82aff0652fcae07dd2c26a7 Mon Sep 17 00:00:00 2001 From: infatum Date: Fri, 24 Jan 2025 15:39:32 +0200 Subject: [PATCH 8/9] Changed kill pid to 1 to force restarting the container --- pybit/_websocket_stream.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pybit/_websocket_stream.py b/pybit/_websocket_stream.py index b825c4b..538e93d 100644 --- a/pybit/_websocket_stream.py +++ b/pybit/_websocket_stream.py @@ -306,7 +306,7 @@ def exit(self): import signal p_id = os.getpid() logger.error("Forcing kill after receiving critical error") - os.kill(p_id, signal.SIGTERM) + os.kill(1) class _V5WebSocketManager(_WebSocketManager): From 946926b49f94f00aef1843a4a9200b2206510f91 Mon Sep 17 00:00:00 2001 From: infatum Date: Sat, 25 Jan 2025 12:44:49 +0200 Subject: [PATCH 9/9] Added signal.SIGKILL --- pybit/_websocket_stream.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pybit/_websocket_stream.py b/pybit/_websocket_stream.py index 538e93d..de73c4c 100644 --- a/pybit/_websocket_stream.py +++ b/pybit/_websocket_stream.py @@ -306,7 +306,7 @@ def exit(self): import signal p_id = os.getpid() logger.error("Forcing kill after receiving critical error") - os.kill(1) + os.kill(1, signal.SIGKILL) class _V5WebSocketManager(_WebSocketManager):