@@ -12,39 +12,46 @@ class KubernetesJobLogHandler:
12
12
"""
13
13
A class to handle Kubernetes job logs by watching pods, streaming logs, and uploading them to object storage.
14
14
15
- ...
15
+ This class:
16
+ - Observes Kubernetes pods for job-related events.
17
+ - Streams logs from running pods, storing them locally.
18
+ - Uploads completed job logs to object storage.
19
+ - Retrieves and concatenates log files as needed.
16
20
17
21
Attributes
18
22
----------
19
23
DEFAULT_BLOCK_SIZE : int
20
- Default size (in bytes) of blocks to read when retrieving the last N lines from a file.
24
+ Default size (in bytes) of blocks to read when retrieving lines from a file.
21
25
config : object
22
26
Configuration object containing settings for job logs and storage.
23
27
watcher_threads : dict
24
28
Dictionary to keep track of watcher threads for each pod.
25
- pod_tmp_mapping : dict
26
- Mapping of pod names to their temporary log file paths.
27
29
namespace : str
28
30
Kubernetes namespace to watch pods in.
29
31
num_lines_to_check : int
30
- Number of lines to check for matching logs when resuming log streaming .
32
+ Number of lines read from the end of the existing log file and checked for matches when log streaming resumes, to avoid writing duplicates.
31
33
object_storage_provider : LibcloudObjectStorage
32
34
Instance of the object storage provider for uploading logs.
33
35
34
36
Methods
35
37
-------
36
- start():
37
- Starts the pod watcher thread for job logs.
38
+ get_existing_log_filename(job_id):
39
+ Retrieves the path of the existing log file for a given job ID, or None if no such file exists.
40
+
38
41
get_last_n_lines(file_path, num_lines):
39
42
Efficiently retrieves the last `num_lines` lines from a file.
43
+
40
44
concatenate_and_delete_files(main_file_path, temp_file_path, block_size=6144):
41
45
Concatenates a temporary file to the main log file and deletes the temporary file.
46
+
42
47
make_log_filename_for_job(job_id):
43
- Generates a unique temporary file path for storing logs of a job.
48
+ Ensures a log file exists for a given job and returns its path.
49
+
44
50
stream_logs(job_id, pod_name):
45
- Streams logs from a Kubernetes pod and writes them to a file.
46
- watch_pods():
47
- Watches Kubernetes pods and handles events such as starting log streaming or uploading logs.
51
+ Streams logs from a Kubernetes pod corresponding to the given job name and writes them to a file.
52
+
53
+ handle_events(event):
54
+ Processes Kubernetes pod events to start log streaming or upload logs when pods complete.
48
55
"""
49
56
# The value was chosen to provide a balance between memory usage and the number of I/O operations
50
57
DEFAULT_BLOCK_SIZE = 6144
@@ -60,11 +67,32 @@ def __init__(self, config):
60
67
"""
61
68
self .config = config
62
69
self .watcher_threads = {}
63
- self .pod_tmp_mapping = {}
64
70
self .namespace = config .namespace ()
65
71
self .num_lines_to_check = int (config .joblogs ().get ('num_lines_to_check' , 0 ))
72
+ self .logs_dir = self .config .joblogs ().get ('logs_dir' ).strip ()
73
+ if not self .logs_dir :
74
+ raise ValueError ("Configuration error: 'logs_dir' is missing in joblogs configuration section." )
66
75
self .object_storage_provider = LibcloudObjectStorage (self .config )
67
76
77
+ def get_existing_log_filename (self , job_id ):
78
+ """
79
+ Retrieves the existing temporary log file path for a job without creating a new one.
80
+
81
+ Parameters
82
+ ----------
83
+ job_id : str
84
+ ID of the Kubernetes job or pod, which is also the name of the log file.
85
+
86
+ Returns
87
+ -------
88
+ str or None
89
+ Path to the existing temporary log file for the given job, or None if no such file exists.
90
+ """
91
+ log_file_path = os .path .join (self .logs_dir , f"{ job_id } .txt" )
92
+ if os .path .isfile (log_file_path ):
93
+ return log_file_path
94
+ return None
95
+
68
96
def get_last_n_lines (self , file_path , num_lines ):
69
97
"""
70
98
Efficiently retrieves the last `num_lines` lines from a file.
@@ -141,38 +169,46 @@ def concatenate_and_delete_files(self, main_file_path, temp_file_path, block_siz
141
169
except (IOError , OSError ) as e :
142
170
logger .error (f"Failed to concatenate and delete files for job: { e } " )
143
171
144
- def make_log_filename_for_job (self , job_name ):
172
+ def make_log_filename_for_job (self , job_id ):
145
173
"""
146
- Generates a unique temporary file path for storing logs of a job.
174
+ Returns the path of the log file for a job, named after the job ID; creates the logs directory and an empty file if they do not exist yet.
147
175
148
- Parameters
149
- ----------
150
- job_name : str
151
- Name of the Kubernetes job or pod .
176
+ Parameters
177
+ ----------
178
+ job_id : str
179
+ ID of the Kubernetes job.
152
180
153
- Returns
154
- -------
155
- str
156
- Path to the temporary log file for the given job.
181
+ Returns
182
+ -------
183
+ str
184
+ Path to the temporary log file for the given job.
157
185
"""
158
- if self .pod_tmp_mapping .get (job_name ) is not None :
159
- return self .pod_tmp_mapping [job_name ]
160
- temp_dir = tempfile .gettempdir ()
161
- app_temp_dir = os .path .join (temp_dir , 'job_logs' )
162
- os .makedirs (app_temp_dir , exist_ok = True )
163
- fd , path = tempfile .mkstemp (prefix = f"{ job_name } _logs_" , suffix = ".txt" , dir = app_temp_dir )
164
- os .close (fd )
165
- self .pod_tmp_mapping [job_name ] = path
166
- return path
167
-
168
- def stream_logs (self , job_name ):
186
+
187
+ if not os .path .isdir (self .logs_dir ):
188
+ os .makedirs (self .logs_dir )
189
+
190
+ log_file_path = os .path .join (self .logs_dir , f"{ job_id } .txt" )
191
+ if os .path .exists (log_file_path ):
192
+ return log_file_path
193
+
194
+ with open (log_file_path , 'w' ) as file :
195
+ pass
196
+
197
+ return log_file_path
198
+
199
+
200
+
201
+ def stream_logs (self , job_id , pod_name ):
169
202
"""
170
203
Streams logs from a Kubernetes pod and writes them to a file.
171
204
172
205
Parameters
173
206
----------
174
- job_name : str
175
- Name of the Kubernetes pod to stream logs from.
207
+ job_id : str
208
+ ID of the Kubernetes job to use as a log file name.
209
+
210
+ pod_name : str
211
+ Name of the Kubernetes pod to read logs from.
176
212
177
213
Returns
178
214
-------
@@ -181,20 +217,20 @@ def stream_logs(self, job_name):
181
217
log_lines_counter = 0
182
218
v1 = client .CoreV1Api ()
183
219
w = watch .Watch ()
184
- log_file_path = self .make_log_filename_for_job (job_name )
220
+ log_file_path = self .make_log_filename_for_job (job_id )
185
221
last_n_lines = self .get_last_n_lines (log_file_path , self .num_lines_to_check )
186
222
if len (last_n_lines ) == 0 :
187
- logger .info (f"Log file '{ log_file_path } ' is empty or not found. Starting fresh logs for job '{ job_name } '." )
223
+ logger .info (f"Log file '{ log_file_path } ' is empty or not found. Starting fresh logs for job '{ job_id } '." )
188
224
189
225
try :
190
226
with open (log_file_path , 'a' ) as log_file :
191
227
temp_dir = os .path .dirname (log_file_path )
192
228
with tempfile .NamedTemporaryFile (mode = 'w+' , delete = False , dir = temp_dir ,
193
- prefix = f"{ job_name } _logs_tmp_" , suffix = ".txt" ) as temp_logs :
229
+ prefix = f"{ job_id } _logs_tmp_" , suffix = ".txt" ) as temp_logs :
194
230
temp_file_path = temp_logs .name
195
231
for line in w .stream (
196
232
v1 .read_namespaced_pod_log ,
197
- name = job_name ,
233
+ name = pod_name ,
198
234
namespace = self .namespace ,
199
235
follow = True ,
200
236
_preload_content = False
@@ -214,9 +250,9 @@ def stream_logs(self, job_name):
214
250
self .concatenate_and_delete_files (log_file_path , temp_file_path )
215
251
else :
216
252
os .remove (temp_file_path )
217
- logger .info (f"Removed temporary file '{ temp_file_path } ' after streaming logs for job '{ job_name } '." )
253
+ logger .info (f"Removed temporary file '{ temp_file_path } ' after streaming logs for job '{ job_id } '." )
218
254
except Exception as e :
219
- logger .exception (f"Error streaming logs for job '{ job_name } ': { e } " )
255
+ logger .exception (f"Error streaming logs for job '{ job_id } ': { e } " )
220
256
221
257
def handle_events (self , event ):
222
258
"""
@@ -241,11 +277,11 @@ def handle_events(self, event):
241
277
else :
242
278
self .watcher_threads [thread_name ] = threading .Thread (
243
279
target = self .stream_logs ,
244
- args = (pod_name ,)
280
+ args = (job_id , pod_name ,)
245
281
)
246
282
self .watcher_threads [thread_name ].start ()
247
283
elif pod .status .phase in ['Succeeded' , 'Failed' ]:
248
- log_filename = self .pod_tmp_mapping . get ( pod_name )
284
+ log_filename = self .get_existing_log_filename ( job_id )
249
285
if log_filename is not None and os .path .isfile (log_filename ) and os .path .getsize (log_filename ) > 0 :
250
286
if self .object_storage_provider .object_exists (job_id ):
251
287
logger .info (f"Log file for job '{ job_id } ' already exists in storage." )
0 commit comments