Skip to content

Commit d5a72a7

Browse files
vlerkinwvengen
andauthored
K8s API connection tweaks, logging configuration (PR #39)
* remove limited reconnection attempts; cap backoff time with 15 min; configure logs in a separate file logging_config.py that is imported to the __main__.py to trigger logs configuration before other modules inheret it; add debug logs to the watcher to better monitor each step * refactor code to make config import and logging configuration separate modules; implement logging to be configured globally but keep separate logger for each module; add level for logging to be configurable in the config file --------- Co-authored-by: wvengen <[email protected]>
1 parent b65050e commit d5a72a7

File tree

8 files changed

+32
-32
lines changed

8 files changed

+32
-32
lines changed

CONFIG.md

+1
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ stick to [scrapyd's configuration](https://scrapyd.readthedocs.io/en/latest/conf
1212
* `launcher` - Python class for managing jobs on the cluster, defaults to `scrapyd_k8s.launcher.K8s`
1313
* `username` - Set this and `password` to enable basic authentication ([](https://scrapyd.readthedocs.io/en/latest/config.html#username))
1414
* `password` - Set this and `username` to enable basic authentication ([](https://scrapyd.readthedocs.io/en/latest/config.html#password))
15+
* `log_level` - Log level, defaults to `INFO`
1516

1617
The Docker and Kubernetes launchers have their own additional options.
1718

scrapyd_k8s/__main__.py

-12
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,4 @@
1-
import logging
2-
import sys
31
from .api import run
42

5-
def setup_logging():
6-
logging.basicConfig(
7-
level=logging.INFO,
8-
format='%(asctime)s %(name)s [%(levelname)s]: %(message)s',
9-
handlers=[
10-
logging.StreamHandler(sys.stdout)
11-
]
12-
)
13-
143
if __name__ == "__main__":
15-
setup_logging()
164
run()

scrapyd_k8s/api.py

+7-4
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,22 @@
11
#!/usr/bin/env python3
2-
import uuid
32
import logging
4-
3+
import uuid
54
from flask import Flask, request, Response, jsonify
65
from flask_basicauth import BasicAuth
76
from natsort import natsort_keygen, ns
87

8+
# setup logging before anything else
99
from .config import Config
10+
from .logging import setup_logging
11+
config = Config()
12+
log_level = config.scrapyd().get('log_level', 'INFO')
13+
setup_logging(log_level)
1014

1115
app = Flask(__name__)
12-
config = Config()
1316
repository = (config.repository_cls())(config)
1417
launcher = (config.launcher_cls())(config)
1518
scrapyd_config = config.scrapyd()
16-
logger = logging.getLogger(__name__)
19+
1720

1821
@app.get("/")
1922
def home():

scrapyd_k8s/joblogs/log_handler_k8s.py

-1
Original file line numberDiff line numberDiff line change
@@ -226,7 +226,6 @@ def handle_events(self, event):
226226
-------
227227
None
228228
"""
229-
self.object_storage_provider = LibcloudObjectStorage(self.config)
230229
try:
231230

232231
pod = event['object']

scrapyd_k8s/k8s_resource_watcher.py

+7-11
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
1-
import threading
21
import logging
2+
import threading
33
import time
44
from kubernetes import client, watch
55
from typing import Callable, List
@@ -29,7 +29,6 @@ def __init__(self, namespace, config):
2929
Kubernetes namespace to watch pods in.
3030
"""
3131
self.namespace = namespace
32-
self.reconnection_attempts = int(config.scrapyd().get('reconnection_attempts', 5))
3332
self.backoff_time = int(config.scrapyd().get('backoff_time', 5))
3433
self.backoff_coefficient = int(config.scrapyd().get('backoff_coefficient', 2))
3534
self.subscribers: List[Callable] = []
@@ -90,8 +89,7 @@ def watch_pods(self):
9089

9190
logger.info(f"Started watching pods in namespace '{self.namespace}'.")
9291
backoff_time = self.backoff_time
93-
reconnection_attempts = self.reconnection_attempts
94-
while not self._stop_event.is_set() and reconnection_attempts > 0:
92+
while not self._stop_event.is_set():
9593
try:
9694
kwargs = {
9795
'namespace': self.namespace,
@@ -101,9 +99,10 @@ def watch_pods(self):
10199
kwargs['resource_version'] = resource_version
102100
first_event = True
103101
for event in w.stream(v1.list_namespaced_pod, **kwargs):
102+
logging.debug("Connected to the k8s API, received an event from k8s API")
104103
if first_event:
105104
# Reset reconnection attempts and backoff time upon successful reconnection
106-
reconnection_attempts = self.reconnection_attempts
105+
logging.debug("This is the first event in the stream in the established connection, setting reconnection attempts to default")
107106
backoff_time = self.backoff_time
108107
first_event = False # Ensure this only happens once per connection
109108
pod_name = event['object'].metadata.name
@@ -114,32 +113,29 @@ def watch_pods(self):
114113
except (urllib3.exceptions.ProtocolError,
115114
urllib3.exceptions.ReadTimeoutError,
116115
urllib3.exceptions.ConnectionError) as e:
117-
reconnection_attempts -= 1
118116
logger.exception(f"Encountered network error: {e}")
119117
logger.info(f"Retrying to watch pods after {backoff_time} seconds...")
120118
time.sleep(backoff_time)
121-
backoff_time *= self.backoff_coefficient
119+
backoff_time = min(backoff_time*self.backoff_coefficient, 900)
122120
except client.ApiException as e:
123121
# Resource version is too old and cannot be accessed anymore
124122
if e.status == 410:
125123
logger.error("Received 410 Gone error, resetting resource_version and restarting watch.")
126124
resource_version = None
127125
continue
128126
else:
129-
reconnection_attempts -= 1
130127
logger.exception(f"Encountered ApiException: {e}")
131128
logger.info(f"Retrying to watch pods after {backoff_time} seconds...")
132129
time.sleep(backoff_time)
133-
backoff_time *= self.backoff_coefficient
130+
backoff_time = min(backoff_time*self.backoff_coefficient, 900)
134131
except StopIteration:
135132
logger.info("Watch stream ended, restarting watch.")
136133
continue
137134
except Exception as e:
138-
reconnection_attempts -= 1
139135
logger.exception(f"Watcher encountered exception: {e}")
140136
logger.info(f"Retrying to watch pods after {backoff_time} seconds...")
141137
time.sleep(backoff_time)
142-
backoff_time *= self.backoff_coefficient
138+
backoff_time = min(backoff_time*self.backoff_coefficient, 900)
143139

144140

145141
def stop(self):

scrapyd_k8s/launcher/docker.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1+
import logging
12
import re
23
import socket
3-
import logging
44

55
import docker
66
from ..utils import format_iso_date_string, native_stringify_dict

scrapyd_k8s/logging.py

+13
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
import logging
2+
import sys
3+
4+
def setup_logging(log_level):
5+
level_name = str(log_level).upper()
6+
numeric_level = logging.getLevelName(level_name)
7+
if not isinstance(numeric_level, int):
8+
raise ValueError(f"Invalid log_level '{log_level}'.")
9+
logging.basicConfig(
10+
level=numeric_level,
11+
format='%(asctime)s %(name)s [%(levelname)s]: %(message)s',
12+
handlers=[logging.StreamHandler(sys.stdout)]
13+
)

scrapyd_k8s/object_storage/libcloud_driver.py

+3-3
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
import os
2-
import logging
32
import re
4-
5-
logger = logging.getLogger(__name__)
3+
import logging
64

75
from libcloud.storage.types import (
86
ObjectError,
@@ -11,6 +9,8 @@
119
)
1210
from libcloud.storage.providers import get_driver
1311

12+
logger = logging.getLogger(__name__)
13+
1414
class LibcloudObjectStorage:
1515
"""
1616
A class to interact with cloud object storage using Apache Libcloud.

0 commit comments

Comments
 (0)