Skip to content

Commit a857ae5

Browse files
authored
Kubernetes watcher tweaks (PR #41)
* change timeout_seconds parameter for k8s api connection from 0 to 300 to reset the connection periodically; add lock for events like subscibe, unsubscribe and event dispatching * change timeout_seconds watcher param back to 0 to try and hold connection forever
1 parent d5a72a7 commit a857ae5

File tree

1 file changed

+15
-11
lines changed

1 file changed

+15
-11
lines changed

scrapyd_k8s/k8s_resource_watcher.py

+15-11
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ def __init__(self, namespace, config):
3535
self._stop_event = threading.Event()
3636
self.watcher_thread = threading.Thread(target=self.watch_pods, daemon=True)
3737
self.watcher_thread.start()
38+
self._lock = threading.Lock()
3839
logger.info(f"ResourceWatcher thread started for namespace '{self.namespace}'.")
3940

4041
def subscribe(self, callback: Callable):
@@ -46,9 +47,10 @@ def subscribe(self, callback: Callable):
4647
callback : Callable
4748
A function to call when an event is received.
4849
"""
49-
if callback not in self.subscribers:
50-
self.subscribers.append(callback)
51-
logger.debug(f"Subscriber {callback.__name__} added.")
50+
with self._lock:
51+
if callback not in self.subscribers:
52+
self.subscribers.append(callback)
53+
logger.debug(f"Subscriber {callback.__name__} added.")
5254

5355
def unsubscribe(self, callback: Callable):
5456
"""
@@ -59,9 +61,10 @@ def unsubscribe(self, callback: Callable):
5961
callback : Callable
6062
The subscriber function to remove.
6163
"""
62-
if callback in self.subscribers:
63-
self.subscribers.remove(callback)
64-
logger.debug(f"Subscriber {callback.__name__} removed.")
64+
with self._lock:
65+
if callback in self.subscribers:
66+
self.subscribers.remove(callback)
67+
logger.debug(f"Subscriber {callback.__name__} removed.")
6568

6669
def notify_subscribers(self, event: dict):
6770
"""
@@ -72,11 +75,12 @@ def notify_subscribers(self, event: dict):
7275
event : dict
7376
The Kubernetes event data.
7477
"""
75-
for subscriber in self.subscribers:
76-
try:
77-
subscriber(event)
78-
except Exception as e:
79-
logger.exception(f"Error notifying subscriber {subscriber.__name__}: {e}")
78+
with self._lock:
79+
for subscriber in self.subscribers:
80+
try:
81+
subscriber(event)
82+
except Exception as e:
83+
logger.exception(f"Error notifying subscriber {subscriber.__name__}: {e}")
8084

8185
def watch_pods(self):
8286
"""

0 commit comments

Comments
 (0)