Skip to content

Commit 0b69f88

Browse files
committed
share logs code with kubelet
fix #169 Signed-off-by: yanxuean <[email protected]>
1 parent 044a560 commit 0b69f88

File tree

31 files changed

+2788
-1496
lines changed

31 files changed

+2788
-1496
lines changed

cmd/crictl/logs.go

+15-343
Original file line numberDiff line numberDiff line change
@@ -17,67 +17,17 @@ limitations under the License.
1717
package main
1818

1919
import (
20-
"bufio"
21-
"bytes"
22-
"encoding/json"
23-
"errors"
2420
"fmt"
25-
"io"
26-
"math"
2721
"os"
28-
"time"
2922

30-
"github.com/docker/docker/pkg/jsonlog"
31-
"github.com/fsnotify/fsnotify"
32-
"github.com/golang/glog"
3323
"github.com/urfave/cli"
3424
"golang.org/x/net/context"
3525
"k8s.io/api/core/v1"
3626
pb "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime"
37-
"k8s.io/kubernetes/pkg/util/tail"
38-
)
39-
40-
// streamType is the type of the stream.
41-
type streamType string
42-
43-
const (
44-
stderrType streamType = "stderr"
45-
stdoutType streamType = "stdout"
46-
47-
// timeFormat is the time format used in the log.
48-
timeFormat = time.RFC3339Nano
49-
)
50-
51-
var (
52-
// eol is the end-of-line sign in the log.
53-
eol = []byte{'\n'}
54-
// delimiter is the delimiter for timestamp and streamtype in log line.
55-
delimiter = []byte{' '}
27+
"k8s.io/kubernetes/pkg/kubelet/kuberuntime/logs"
28+
"time"
5629
)
5730

58-
// logMessage is the internal log type.
59-
type logMessage struct {
60-
timestamp time.Time
61-
stream streamType
62-
log []byte
63-
}
64-
65-
// reset resets the log to nil.
66-
func (l *logMessage) reset() {
67-
l.timestamp = time.Time{}
68-
l.stream = ""
69-
l.log = nil
70-
}
71-
72-
// logOptions is the internal type of all log options.
73-
type logOptions struct {
74-
tail int64
75-
bytes int64
76-
since time.Time
77-
follow bool
78-
timestamp bool
79-
}
80-
8131
var logsCommand = cli.Command{
8232
Name: "logs",
8333
Usage: "Fetch the logs of a container",
@@ -99,310 +49,32 @@ var logsCommand = cli.Command{
9949
},
10050
},
10151
Action: func(context *cli.Context) error {
102-
if err := getRuntimeClient(context); err != nil {
52+
runtimeService, err := getRuntimeService(context)
53+
if err != nil {
10354
return err
10455
}
56+
57+
containerID := context.Args().First()
58+
if containerID == "" {
59+
return fmt.Errorf("ID cannot be empty")
60+
}
10561
tailLines := context.Int64("tail")
10662
limitBytes := context.Int64("limit-bytes")
107-
logOptions := &v1.PodLogOptions{
63+
logOptions := logs.NewLogOptions(&v1.PodLogOptions{
10864
Follow: context.Bool("follow"),
10965
TailLines: &tailLines,
11066
LimitBytes: &limitBytes,
111-
}
112-
r, err := getContainerStatus(runtimeClient, context.Args().First())
67+
}, time.Now())
68+
status, err := runtimeService.ContainerStatus(containerID)
11369
if err != nil {
11470
return err
11571
}
116-
logPath := r.Status.GetLogPath()
72+
logPath := status.GetLogPath()
11773
if logPath == "" {
118-
return fmt.Errorf("Get log path of container failed")
74+
return fmt.Errorf("The container has not set log path")
11975
}
120-
return ReadLogs(logPath, logOptions, os.Stdout, os.Stderr)
76+
return logs.ReadLogs(logPath, status.GetId(), logOptions, runtimeService, os.Stdout, os.Stderr)
12177
},
12278
After: closeConnection,
12379
}
12480

125-
func getContainerStatus(client pb.RuntimeServiceClient, ID string) (*pb.ContainerStatusResponse, error) {
126-
if ID == "" {
127-
return nil, fmt.Errorf("ID cannot be empty")
128-
}
129-
request := &pb.ContainerStatusRequest{
130-
ContainerId: ID,
131-
}
132-
r, err := client.ContainerStatus(context.Background(), request)
133-
if err != nil {
134-
return nil, err
135-
}
136-
return r, nil
137-
}
138-
139-
// newLogOptions convert the v1.PodLogOptions to internal logOptions.
140-
func newLogOptions(apiOpts *v1.PodLogOptions, now time.Time) *logOptions {
141-
opts := &logOptions{
142-
tail: *apiOpts.TailLines,
143-
bytes: *apiOpts.LimitBytes,
144-
follow: apiOpts.Follow,
145-
timestamp: apiOpts.Timestamps,
146-
}
147-
if apiOpts.TailLines != nil {
148-
opts.tail = *apiOpts.TailLines
149-
}
150-
if apiOpts.LimitBytes != nil {
151-
opts.bytes = *apiOpts.LimitBytes
152-
}
153-
if apiOpts.SinceSeconds != nil {
154-
opts.since = now.Add(-time.Duration(*apiOpts.SinceSeconds) * time.Second)
155-
}
156-
if apiOpts.SinceTime != nil && apiOpts.SinceTime.After(opts.since) {
157-
opts.since = apiOpts.SinceTime.Time
158-
}
159-
return opts
160-
}
161-
162-
// ReadLogs read the container log and redirect into stdout and stderr.
163-
func ReadLogs(path string, apiOpts *v1.PodLogOptions, stdout, stderr io.Writer) error {
164-
f, err := os.Open(path)
165-
if err != nil {
166-
return fmt.Errorf("failed to open log file %q: %v", path, err)
167-
}
168-
defer f.Close()
169-
170-
// Convert v1.PodLogOptions into internal log options.
171-
opts := newLogOptions(apiOpts, time.Now())
172-
173-
// Search start point based on tail line.
174-
start, err := tail.FindTailLineStartIndex(f, opts.tail)
175-
if err != nil {
176-
return fmt.Errorf("failed to tail %d lines of log file %q: %v", opts.tail, path, err)
177-
}
178-
if _, err := f.Seek(start, os.SEEK_SET); err != nil {
179-
return fmt.Errorf("failed to seek %d in log file %q: %v", start, path, err)
180-
}
181-
182-
// Start parsing the logs.
183-
r := bufio.NewReader(f)
184-
// Do not create watcher here because it is not needed if `Follow` is false.
185-
var watcher *fsnotify.Watcher
186-
var parse parseFunc
187-
writer := newLogWriter(stdout, stderr, opts)
188-
msg := &logMessage{}
189-
for {
190-
l, err := r.ReadBytes(eol[0])
191-
if err != nil {
192-
if err != io.EOF { // This is an real error
193-
return fmt.Errorf("failed to read log file %q: %v", path, err)
194-
}
195-
if !opts.follow {
196-
// Return directly when reading to the end if not follow.
197-
if len(l) > 0 {
198-
glog.Warningf("Incomplete line in log file %q: %q", path, l)
199-
}
200-
glog.V(2).Infof("Finish parsing log file %q", path)
201-
return nil
202-
}
203-
// Reset seek so that if this is an incomplete line,
204-
// it will be read again.
205-
if _, err = f.Seek(-int64(len(l)), os.SEEK_CUR); err != nil {
206-
return fmt.Errorf("failed to reset seek in log file %q: %v", path, err)
207-
}
208-
if watcher == nil {
209-
// Intialize the watcher if it has not been initialized yet.
210-
if watcher, err = fsnotify.NewWatcher(); err != nil {
211-
return fmt.Errorf("failed to create fsnotify watcher: %v", err)
212-
}
213-
defer watcher.Close()
214-
if err = watcher.Add(f.Name()); err != nil {
215-
return fmt.Errorf("failed to watch file %q: %v", f.Name(), err)
216-
}
217-
}
218-
// Wait until the next log change.
219-
if err = waitLogs(watcher); err != nil {
220-
return fmt.Errorf("failed to wait logs for log file %q: %v", path, err)
221-
}
222-
continue
223-
}
224-
if parse == nil {
225-
// Intialize the log parsing function.
226-
parse, err = getParseFunc(l)
227-
if err != nil {
228-
return fmt.Errorf("failed to get parse function: %v", err)
229-
}
230-
}
231-
// Parse the log line.
232-
msg.reset()
233-
if err := parse(l, msg); err != nil {
234-
glog.Errorf("Failed with err %v when parsing log for log file %q: %q", err, path, l)
235-
continue
236-
}
237-
// Write the log line into the stream.
238-
if err := writer.write(msg); err != nil {
239-
if err == errMaximumWrite {
240-
glog.V(2).Infof("Finish parsing log file %q, hit bytes limit %d(bytes)", path, opts.bytes)
241-
return nil
242-
}
243-
glog.Errorf("Failed with err %v when writing log for log file %q: %+v", err, path, msg)
244-
return err
245-
}
246-
}
247-
}
248-
249-
// parseFunc is a function parsing one log line to the internal log type.
250-
// Notice that the caller must make sure logMessage is not nil.
251-
type parseFunc func([]byte, *logMessage) error
252-
253-
var parseFuncs = []parseFunc{
254-
parseCRILog, // CRI log format parse function
255-
parseDockerJSONLog, // Docker JSON log format parse function
256-
}
257-
258-
// parseCRILog parses logs in CRI log format. CRI Log format example:
259-
// 2016-10-06T00:17:09.669794202Z stdout log content 1
260-
// 2016-10-06T00:17:09.669794203Z stderr log content 2
261-
func parseCRILog(log []byte, msg *logMessage) error {
262-
var err error
263-
// Parse timestamp
264-
idx := bytes.Index(log, delimiter)
265-
if idx < 0 {
266-
return fmt.Errorf("timestamp is not found")
267-
}
268-
msg.timestamp, err = time.Parse(timeFormat, string(log[:idx]))
269-
if err != nil {
270-
return fmt.Errorf("unexpected timestamp format %q: %v", timeFormat, err)
271-
}
272-
273-
// Parse stream type
274-
log = log[idx+1:]
275-
idx = bytes.Index(log, delimiter)
276-
if idx < 0 {
277-
return fmt.Errorf("stream type is not found")
278-
}
279-
msg.stream = streamType(log[:idx])
280-
if msg.stream != stdoutType && msg.stream != stderrType {
281-
return fmt.Errorf("unexpected stream type %q", msg.stream)
282-
}
283-
284-
// Get log content
285-
msg.log = log[idx+1:]
286-
287-
return nil
288-
}
289-
290-
// dockerJSONLog is the JSON log buffer used in parseDockerJSONLog.
291-
var dockerJSONLog = &jsonlog.JSONLog{}
292-
293-
// parseDockerJSONLog parses logs in Docker JSON log format. Docker JSON log format
294-
// example:
295-
// {"log":"content 1","stream":"stdout","time":"2016-10-20T18:39:20.57606443Z"}
296-
// {"log":"content 2","stream":"stderr","time":"2016-10-20T18:39:20.57606444Z"}
297-
func parseDockerJSONLog(log []byte, msg *logMessage) error {
298-
dockerJSONLog.Reset()
299-
l := dockerJSONLog
300-
// TODO: JSON decoding is fairly expensive, we should evaluate this.
301-
if err := json.Unmarshal(log, l); err != nil {
302-
return fmt.Errorf("failed with %v to unmarshal log %q", err, l)
303-
}
304-
msg.timestamp = l.Created
305-
msg.stream = streamType(l.Stream)
306-
msg.log = []byte(l.Log)
307-
return nil
308-
}
309-
310-
// getParseFunc returns proper parse function based on the sample log line passed in.
311-
func getParseFunc(log []byte) (parseFunc, error) {
312-
for _, p := range parseFuncs {
313-
if err := p(log, &logMessage{}); err == nil {
314-
return p, nil
315-
}
316-
}
317-
return nil, fmt.Errorf("unsupported log format: %q", log)
318-
}
319-
320-
// waitLogs wait for the next log write.
321-
func waitLogs(w *fsnotify.Watcher) error {
322-
errRetry := 5
323-
for {
324-
select {
325-
case e := <-w.Events:
326-
switch e.Op {
327-
case fsnotify.Write:
328-
return nil
329-
default:
330-
glog.Errorf("Unexpected fsnotify event: %v, retrying...", e)
331-
}
332-
case err := <-w.Errors:
333-
glog.Errorf("Fsnotify watch error: %v, %d error retries remaining", err, errRetry)
334-
if errRetry == 0 {
335-
return err
336-
}
337-
errRetry--
338-
}
339-
}
340-
}
341-
342-
// logWriter controls the writing into the stream based on the log options.
343-
type logWriter struct {
344-
stdout io.Writer
345-
stderr io.Writer
346-
opts *logOptions
347-
remain int64
348-
}
349-
350-
// errMaximumWrite is returned when all bytes have been written.
351-
var errMaximumWrite = errors.New("maximum write")
352-
353-
// errShortWrite is returned when the message is not fully written.
354-
var errShortWrite = errors.New("short write")
355-
356-
func newLogWriter(stdout io.Writer, stderr io.Writer, opts *logOptions) *logWriter {
357-
w := &logWriter{
358-
stdout: stdout,
359-
stderr: stderr,
360-
opts: opts,
361-
remain: math.MaxInt64, // initialize it as infinity
362-
}
363-
if opts.bytes >= 0 {
364-
w.remain = opts.bytes
365-
}
366-
return w
367-
}
368-
369-
// writeLogs writes logs into stdout, stderr.
370-
func (w *logWriter) write(msg *logMessage) error {
371-
if msg.timestamp.Before(w.opts.since) {
372-
// Skip the line because it's older than since
373-
return nil
374-
}
375-
line := msg.log
376-
if w.opts.timestamp {
377-
prefix := append([]byte(msg.timestamp.Format(timeFormat)), delimiter[0])
378-
line = append(prefix, line...)
379-
}
380-
// If the line is longer than the remaining bytes, cut it.
381-
if int64(len(line)) > w.remain {
382-
line = line[:w.remain]
383-
}
384-
// Get the proper stream to write to.
385-
var stream io.Writer
386-
switch msg.stream {
387-
case stdoutType:
388-
stream = w.stdout
389-
case stderrType:
390-
stream = w.stderr
391-
default:
392-
return fmt.Errorf("unexpected stream type %q", msg.stream)
393-
}
394-
n, err := stream.Write(line)
395-
w.remain -= int64(n)
396-
if err != nil {
397-
return err
398-
}
399-
// If the line has not been fully written, return errShortWrite
400-
if n < len(line) {
401-
return errShortWrite
402-
}
403-
// If there are no more bytes left, return errMaximumWrite
404-
if w.remain <= 0 {
405-
return errMaximumWrite
406-
}
407-
return nil
408-
}

0 commit comments

Comments
 (0)