@@ -17,67 +17,15 @@ limitations under the License.
17
17
package main
18
18
19
19
import (
20
- "bufio"
21
- "bytes"
22
- "encoding/json"
23
- "errors"
24
20
"fmt"
25
- "io"
26
- "math"
27
21
"os"
28
22
"time"
29
23
30
- "github.com/docker/docker/pkg/jsonlog"
31
- "github.com/fsnotify/fsnotify"
32
- "github.com/golang/glog"
33
24
"github.com/urfave/cli"
34
- "golang.org/x/net/context"
35
25
"k8s.io/api/core/v1"
36
- pb "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime"
37
- "k8s.io/kubernetes/pkg/util/tail"
26
+ "k8s.io/kubernetes/pkg/kubelet/kuberuntime/logs"
38
27
)
39
28
40
- // streamType is the type of the stream.
41
- type streamType string
42
-
43
- const (
44
- stderrType streamType = "stderr"
45
- stdoutType streamType = "stdout"
46
-
47
- // timeFormat is the time format used in the log.
48
- timeFormat = time .RFC3339Nano
49
- )
50
-
51
- var (
52
- // eol is the end-of-line sign in the log.
53
- eol = []byte {'\n' }
54
- // delimiter is the delimiter for timestamp and streamtype in log line.
55
- delimiter = []byte {' ' }
56
- )
57
-
58
- // logMessage is the internal log type.
59
- type logMessage struct {
60
- timestamp time.Time
61
- stream streamType
62
- log []byte
63
- }
64
-
65
- // reset resets the log to nil.
66
- func (l * logMessage ) reset () {
67
- l .timestamp = time.Time {}
68
- l .stream = ""
69
- l .log = nil
70
- }
71
-
72
- // logOptions is the internal type of all log options.
73
- type logOptions struct {
74
- tail int64
75
- bytes int64
76
- since time.Time
77
- follow bool
78
- timestamp bool
79
- }
80
-
81
29
var logsCommand = cli.Command {
82
30
Name : "logs" ,
83
31
Usage : "Fetch the logs of a container" ,
@@ -99,310 +47,31 @@ var logsCommand = cli.Command{
99
47
},
100
48
},
101
49
Action : func (context * cli.Context ) error {
102
- if err := getRuntimeClient (context ); err != nil {
50
+ runtimeService , err := getRuntimeService (context )
51
+ if err != nil {
103
52
return err
104
53
}
54
+
55
+ containerID := context .Args ().First ()
56
+ if containerID == "" {
57
+ return fmt .Errorf ("ID cannot be empty" )
58
+ }
105
59
tailLines := context .Int64 ("tail" )
106
60
limitBytes := context .Int64 ("limit-bytes" )
107
- logOptions := & v1.PodLogOptions {
61
+ logOptions := logs . NewLogOptions ( & v1.PodLogOptions {
108
62
Follow : context .Bool ("follow" ),
109
63
TailLines : & tailLines ,
110
64
LimitBytes : & limitBytes ,
111
- }
112
- r , err := getContainerStatus ( runtimeClient , context . Args (). First () )
65
+ }, time . Now ())
66
+ status , err := runtimeService . ContainerStatus ( containerID )
113
67
if err != nil {
114
68
return err
115
69
}
116
- logPath := r . Status .GetLogPath ()
70
+ logPath := status .GetLogPath ()
117
71
if logPath == "" {
118
- return fmt .Errorf ("Get log path of container failed " )
72
+ return fmt .Errorf ("The container has not set log path " )
119
73
}
120
- return ReadLogs (logPath , logOptions , os .Stdout , os .Stderr )
74
+ return logs . ReadLogs (logPath , status . GetId (), logOptions , runtimeService , os .Stdout , os .Stderr )
121
75
},
122
76
After : closeConnection ,
123
77
}
124
-
125
- func getContainerStatus (client pb.RuntimeServiceClient , ID string ) (* pb.ContainerStatusResponse , error ) {
126
- if ID == "" {
127
- return nil , fmt .Errorf ("ID cannot be empty" )
128
- }
129
- request := & pb.ContainerStatusRequest {
130
- ContainerId : ID ,
131
- }
132
- r , err := client .ContainerStatus (context .Background (), request )
133
- if err != nil {
134
- return nil , err
135
- }
136
- return r , nil
137
- }
138
-
139
- // newLogOptions convert the v1.PodLogOptions to internal logOptions.
140
- func newLogOptions (apiOpts * v1.PodLogOptions , now time.Time ) * logOptions {
141
- opts := & logOptions {
142
- tail : * apiOpts .TailLines ,
143
- bytes : * apiOpts .LimitBytes ,
144
- follow : apiOpts .Follow ,
145
- timestamp : apiOpts .Timestamps ,
146
- }
147
- if apiOpts .TailLines != nil {
148
- opts .tail = * apiOpts .TailLines
149
- }
150
- if apiOpts .LimitBytes != nil {
151
- opts .bytes = * apiOpts .LimitBytes
152
- }
153
- if apiOpts .SinceSeconds != nil {
154
- opts .since = now .Add (- time .Duration (* apiOpts .SinceSeconds ) * time .Second )
155
- }
156
- if apiOpts .SinceTime != nil && apiOpts .SinceTime .After (opts .since ) {
157
- opts .since = apiOpts .SinceTime .Time
158
- }
159
- return opts
160
- }
161
-
162
- // ReadLogs read the container log and redirect into stdout and stderr.
163
- func ReadLogs (path string , apiOpts * v1.PodLogOptions , stdout , stderr io.Writer ) error {
164
- f , err := os .Open (path )
165
- if err != nil {
166
- return fmt .Errorf ("failed to open log file %q: %v" , path , err )
167
- }
168
- defer f .Close ()
169
-
170
- // Convert v1.PodLogOptions into internal log options.
171
- opts := newLogOptions (apiOpts , time .Now ())
172
-
173
- // Search start point based on tail line.
174
- start , err := tail .FindTailLineStartIndex (f , opts .tail )
175
- if err != nil {
176
- return fmt .Errorf ("failed to tail %d lines of log file %q: %v" , opts .tail , path , err )
177
- }
178
- if _ , err := f .Seek (start , os .SEEK_SET ); err != nil {
179
- return fmt .Errorf ("failed to seek %d in log file %q: %v" , start , path , err )
180
- }
181
-
182
- // Start parsing the logs.
183
- r := bufio .NewReader (f )
184
- // Do not create watcher here because it is not needed if `Follow` is false.
185
- var watcher * fsnotify.Watcher
186
- var parse parseFunc
187
- writer := newLogWriter (stdout , stderr , opts )
188
- msg := & logMessage {}
189
- for {
190
- l , err := r .ReadBytes (eol [0 ])
191
- if err != nil {
192
- if err != io .EOF { // This is an real error
193
- return fmt .Errorf ("failed to read log file %q: %v" , path , err )
194
- }
195
- if ! opts .follow {
196
- // Return directly when reading to the end if not follow.
197
- if len (l ) > 0 {
198
- glog .Warningf ("Incomplete line in log file %q: %q" , path , l )
199
- }
200
- glog .V (2 ).Infof ("Finish parsing log file %q" , path )
201
- return nil
202
- }
203
- // Reset seek so that if this is an incomplete line,
204
- // it will be read again.
205
- if _ , err = f .Seek (- int64 (len (l )), os .SEEK_CUR ); err != nil {
206
- return fmt .Errorf ("failed to reset seek in log file %q: %v" , path , err )
207
- }
208
- if watcher == nil {
209
- // Intialize the watcher if it has not been initialized yet.
210
- if watcher , err = fsnotify .NewWatcher (); err != nil {
211
- return fmt .Errorf ("failed to create fsnotify watcher: %v" , err )
212
- }
213
- defer watcher .Close ()
214
- if err = watcher .Add (f .Name ()); err != nil {
215
- return fmt .Errorf ("failed to watch file %q: %v" , f .Name (), err )
216
- }
217
- }
218
- // Wait until the next log change.
219
- if err = waitLogs (watcher ); err != nil {
220
- return fmt .Errorf ("failed to wait logs for log file %q: %v" , path , err )
221
- }
222
- continue
223
- }
224
- if parse == nil {
225
- // Intialize the log parsing function.
226
- parse , err = getParseFunc (l )
227
- if err != nil {
228
- return fmt .Errorf ("failed to get parse function: %v" , err )
229
- }
230
- }
231
- // Parse the log line.
232
- msg .reset ()
233
- if err := parse (l , msg ); err != nil {
234
- glog .Errorf ("Failed with err %v when parsing log for log file %q: %q" , err , path , l )
235
- continue
236
- }
237
- // Write the log line into the stream.
238
- if err := writer .write (msg ); err != nil {
239
- if err == errMaximumWrite {
240
- glog .V (2 ).Infof ("Finish parsing log file %q, hit bytes limit %d(bytes)" , path , opts .bytes )
241
- return nil
242
- }
243
- glog .Errorf ("Failed with err %v when writing log for log file %q: %+v" , err , path , msg )
244
- return err
245
- }
246
- }
247
- }
248
-
249
- // parseFunc is a function parsing one log line to the internal log type.
250
- // Notice that the caller must make sure logMessage is not nil.
251
- type parseFunc func ([]byte , * logMessage ) error
252
-
253
- var parseFuncs = []parseFunc {
254
- parseCRILog , // CRI log format parse function
255
- parseDockerJSONLog , // Docker JSON log format parse function
256
- }
257
-
258
- // parseCRILog parses logs in CRI log format. CRI Log format example:
259
- // 2016-10-06T00:17:09.669794202Z stdout log content 1
260
- // 2016-10-06T00:17:09.669794203Z stderr log content 2
261
- func parseCRILog (log []byte , msg * logMessage ) error {
262
- var err error
263
- // Parse timestamp
264
- idx := bytes .Index (log , delimiter )
265
- if idx < 0 {
266
- return fmt .Errorf ("timestamp is not found" )
267
- }
268
- msg .timestamp , err = time .Parse (timeFormat , string (log [:idx ]))
269
- if err != nil {
270
- return fmt .Errorf ("unexpected timestamp format %q: %v" , timeFormat , err )
271
- }
272
-
273
- // Parse stream type
274
- log = log [idx + 1 :]
275
- idx = bytes .Index (log , delimiter )
276
- if idx < 0 {
277
- return fmt .Errorf ("stream type is not found" )
278
- }
279
- msg .stream = streamType (log [:idx ])
280
- if msg .stream != stdoutType && msg .stream != stderrType {
281
- return fmt .Errorf ("unexpected stream type %q" , msg .stream )
282
- }
283
-
284
- // Get log content
285
- msg .log = log [idx + 1 :]
286
-
287
- return nil
288
- }
289
-
290
- // dockerJSONLog is the JSON log buffer used in parseDockerJSONLog.
291
- var dockerJSONLog = & jsonlog.JSONLog {}
292
-
293
- // parseDockerJSONLog parses logs in Docker JSON log format. Docker JSON log format
294
- // example:
295
- // {"log":"content 1","stream":"stdout","time":"2016-10-20T18:39:20.57606443Z"}
296
- // {"log":"content 2","stream":"stderr","time":"2016-10-20T18:39:20.57606444Z"}
297
- func parseDockerJSONLog (log []byte , msg * logMessage ) error {
298
- dockerJSONLog .Reset ()
299
- l := dockerJSONLog
300
- // TODO: JSON decoding is fairly expensive, we should evaluate this.
301
- if err := json .Unmarshal (log , l ); err != nil {
302
- return fmt .Errorf ("failed with %v to unmarshal log %q" , err , l )
303
- }
304
- msg .timestamp = l .Created
305
- msg .stream = streamType (l .Stream )
306
- msg .log = []byte (l .Log )
307
- return nil
308
- }
309
-
310
- // getParseFunc returns proper parse function based on the sample log line passed in.
311
- func getParseFunc (log []byte ) (parseFunc , error ) {
312
- for _ , p := range parseFuncs {
313
- if err := p (log , & logMessage {}); err == nil {
314
- return p , nil
315
- }
316
- }
317
- return nil , fmt .Errorf ("unsupported log format: %q" , log )
318
- }
319
-
320
- // waitLogs wait for the next log write.
321
- func waitLogs (w * fsnotify.Watcher ) error {
322
- errRetry := 5
323
- for {
324
- select {
325
- case e := <- w .Events :
326
- switch e .Op {
327
- case fsnotify .Write :
328
- return nil
329
- default :
330
- glog .Errorf ("Unexpected fsnotify event: %v, retrying..." , e )
331
- }
332
- case err := <- w .Errors :
333
- glog .Errorf ("Fsnotify watch error: %v, %d error retries remaining" , err , errRetry )
334
- if errRetry == 0 {
335
- return err
336
- }
337
- errRetry --
338
- }
339
- }
340
- }
341
-
342
- // logWriter controls the writing into the stream based on the log options.
343
- type logWriter struct {
344
- stdout io.Writer
345
- stderr io.Writer
346
- opts * logOptions
347
- remain int64
348
- }
349
-
350
- // errMaximumWrite is returned when all bytes have been written.
351
- var errMaximumWrite = errors .New ("maximum write" )
352
-
353
- // errShortWrite is returned when the message is not fully written.
354
- var errShortWrite = errors .New ("short write" )
355
-
356
- func newLogWriter (stdout io.Writer , stderr io.Writer , opts * logOptions ) * logWriter {
357
- w := & logWriter {
358
- stdout : stdout ,
359
- stderr : stderr ,
360
- opts : opts ,
361
- remain : math .MaxInt64 , // initialize it as infinity
362
- }
363
- if opts .bytes >= 0 {
364
- w .remain = opts .bytes
365
- }
366
- return w
367
- }
368
-
369
- // writeLogs writes logs into stdout, stderr.
370
- func (w * logWriter ) write (msg * logMessage ) error {
371
- if msg .timestamp .Before (w .opts .since ) {
372
- // Skip the line because it's older than since
373
- return nil
374
- }
375
- line := msg .log
376
- if w .opts .timestamp {
377
- prefix := append ([]byte (msg .timestamp .Format (timeFormat )), delimiter [0 ])
378
- line = append (prefix , line ... )
379
- }
380
- // If the line is longer than the remaining bytes, cut it.
381
- if int64 (len (line )) > w .remain {
382
- line = line [:w .remain ]
383
- }
384
- // Get the proper stream to write to.
385
- var stream io.Writer
386
- switch msg .stream {
387
- case stdoutType :
388
- stream = w .stdout
389
- case stderrType :
390
- stream = w .stderr
391
- default :
392
- return fmt .Errorf ("unexpected stream type %q" , msg .stream )
393
- }
394
- n , err := stream .Write (line )
395
- w .remain -= int64 (n )
396
- if err != nil {
397
- return err
398
- }
399
- // If the line has not been fully written, return errShortWrite
400
- if n < len (line ) {
401
- return errShortWrite
402
- }
403
- // If there are no more bytes left, return errMaximumWrite
404
- if w .remain <= 0 {
405
- return errMaximumWrite
406
- }
407
- return nil
408
- }
0 commit comments