From 221039efff255f869bba5b1c706e108ad111d7b6 Mon Sep 17 00:00:00 2001 From: Andrew-Liggitt Date: Fri, 12 Jul 2024 15:03:23 +0800 Subject: [PATCH] [feat] tail JSON log --- pkg/logging/json_logger.go | 162 ++++++++++++-------------------- pkg/logging/json_logger_test.go | 157 +++++++++++++++++++++++++++++++ pkg/logging/logging.go | 3 +- 3 files changed, 217 insertions(+), 105 deletions(-) create mode 100644 pkg/logging/json_logger_test.go diff --git a/pkg/logging/json_logger.go b/pkg/logging/json_logger.go index 68ac48740b7..6e295efa038 100644 --- a/pkg/logging/json_logger.go +++ b/pkg/logging/json_logger.go @@ -17,24 +17,24 @@ package logging import ( - "bufio" + "context" "errors" "fmt" "io" "os" - "os/exec" "path/filepath" "strconv" - "strings" "time" "github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/runtime/v2/logging" "github.com/containerd/log" "github.com/containerd/nerdctl/v2/pkg/logging/jsonfile" + "github.com/containerd/nerdctl/v2/pkg/logging/tail" "github.com/containerd/nerdctl/v2/pkg/strutil" "github.com/docker/go-units" "github.com/fahedouch/go-logrotate" + "github.com/fsnotify/fsnotify" ) var JSONDriverLogOpts = []string{ @@ -142,9 +142,6 @@ func viewLogsJSONFile(lvopts LogViewOptions, stdout, stderr io.Writer, stopChann } } - if checkExecutableAvailableInPath("tail") { - return viewLogsJSONFileThroughTailExec(lvopts, logFilePath, stdout, stderr, stopChannel) - } return viewLogsJSONFileDirect(lvopts, logFilePath, stdout, stderr, stopChannel) } @@ -156,118 +153,77 @@ func viewLogsJSONFileDirect(lvopts LogViewOptions, jsonLogFilePath string, stdou if err != nil { return err } - defer fin.Close() - err = jsonfile.Decode(stdout, stderr, fin, lvopts.Timestamps, lvopts.Since, lvopts.Until, lvopts.Tail) + defer func() { fin.Close() }() + + // Search start point based on tail line. + start, err := tail.FindTailLineStartIndex(fin, lvopts.Tail) if err != nil { - return fmt.Errorf("error occurred while doing initial read of JSON logfile %q: %s", jsonLogFilePath, err) + return fmt.Errorf("failed to tail %d lines of JSON logfile %q: %v", lvopts.Tail, jsonLogFilePath, err) } - if lvopts.Follow { - // Get the current file handler's seek. - lastPos, err := fin.Seek(0, io.SeekCurrent) - if err != nil { - return fmt.Errorf("error occurred while trying to seek JSON logfile %q at position %d: %s", jsonLogFilePath, lastPos, err) - } - fin.Close() - for { - select { - case <-stopChannel: - log.L.Debugf("received stop signal while re-reading JSON logfile, returning") + if _, err := fin.Seek(start, io.SeekStart); err != nil { + return fmt.Errorf("failed to seek in log file %q: %v", jsonLogFilePath, err) + } + + limitedMode := (lvopts.Tail > 0) && (!lvopts.Follow) + limitedNum := lvopts.Tail + var stop bool + var watcher *fsnotify.Watcher + baseName := filepath.Base(jsonLogFilePath) + dir := filepath.Dir(jsonLogFilePath) + + for { + select { + case <-stopChannel: + log.L.Debugf("received stop signal while re-reading JSON logfile, returning") + return nil + default: + if stop || (limitedMode && limitedNum == 0) { + log.L.Debugf("finished parsing log JSON filefile, path: %s", jsonLogFilePath) return nil - default: - // Re-open the file and seek to the last-consumed offset. - fin, err = os.OpenFile(jsonLogFilePath, os.O_RDONLY, 0400) - if err != nil { - fin.Close() - return fmt.Errorf("error occurred while trying to re-open JSON logfile %q: %s", jsonLogFilePath, err) - } - _, err = fin.Seek(lastPos, 0) + } + + err = jsonfile.Decode(stdout, stderr, fin, lvopts.Timestamps, lvopts.Since, lvopts.Until, lvopts.Tail) + if err != nil { + return fmt.Errorf("error occurred while doing initial read of JSON logfile %q: %s", jsonLogFilePath, err) + } + + if lvopts.Follow { + // Get the current file handler's seek. + lastPos, err := fin.Seek(0, io.SeekCurrent) if err != nil { - fin.Close() return fmt.Errorf("error occurred while trying to seek JSON logfile %q at position %d: %s", jsonLogFilePath, lastPos, err) } - err = jsonfile.Decode(stdout, stderr, fin, lvopts.Timestamps, lvopts.Since, lvopts.Until, 0) - if err != nil { - fin.Close() - return fmt.Errorf("error occurred while doing follow-up decoding of JSON logfile %q at starting position %d: %s", jsonLogFilePath, lastPos, err) + if watcher == nil { + // Initialize the watcher if it has not been initialized yet. + if watcher, err = NewLogFileWatcher(dir); err != nil { + return err + } + defer watcher.Close() + // If we just created the watcher, try again to read as we might have missed + // the event. + continue } - // Record current file seek position before looping again. - lastPos, err = fin.Seek(0, io.SeekCurrent) + var recreated bool + // Wait until the next log change. + recreated, err = startTail(context.Background(), baseName, watcher) if err != nil { + return err + } + if recreated { + newF, err := openFileShareDelete(jsonLogFilePath) + if err != nil { + return fmt.Errorf("failed to open JSON logfile %q: %v", jsonLogFilePath, err) + } fin.Close() - return fmt.Errorf("error occurred while trying to seek JSON logfile %q at current position: %s", jsonLogFilePath, err) + fin = newF } - fin.Close() + continue } + stop = true // Give the OS a second to breathe before re-opening the file: - time.Sleep(time.Second) - } - } - return nil -} - -// Loads logs through the `tail` executable. -func viewLogsJSONFileThroughTailExec(lvopts LogViewOptions, jsonLogFilePath string, stdout, stderr io.Writer, stopChannel chan os.Signal) error { - var args []string - - args = append(args, "-n") - if lvopts.Tail == 0 { - args = append(args, "+0") - } else { - args = append(args, strconv.FormatUint(uint64(lvopts.Tail), 10)) - } - - if lvopts.Follow { - // using the `-F` to follow the file name instead of descriptor and retry if inaccessible - args = append(args, "-F") - } - args = append(args, jsonLogFilePath) - cmd := exec.Command("tail", args...) - - cmdStdout, err := cmd.StdoutPipe() - if err != nil { - return err - } - - cmdStderr, err := cmd.StderrPipe() - if err != nil { - return err - } - - if err := cmd.Start(); err != nil { - return err - } - - // filter the unwanted error message of the tail - go filterTailStderr(cmdStderr) - - // Setup killing goroutine: - go func() { - <-stopChannel - log.L.Debugf("killing tail logs process with PID: %d", cmd.Process.Pid) - cmd.Process.Kill() - }() - - return jsonfile.Decode(stdout, stderr, cmdStdout, lvopts.Timestamps, lvopts.Since, lvopts.Until, 0) -} - -func filterTailStderr(reader io.Reader) error { - scanner := bufio.NewScanner(reader) - for scanner.Scan() { - line := scanner.Text() - if strings.HasSuffix(line, "has appeared; following new file") || - strings.HasSuffix(line, "has become inaccessible: No such file or directory") || - strings.HasSuffix(line, "has been replaced; following new file") || - strings.HasSuffix(line, ": No such file or directory") { - continue } - fmt.Fprintln(os.Stderr, line) - } - - if err := scanner.Err(); err != nil { - return err } - return nil } diff --git a/pkg/logging/json_logger_test.go b/pkg/logging/json_logger_test.go new file mode 100644 index 00000000000..0ecb7e119b2 --- /dev/null +++ b/pkg/logging/json_logger_test.go @@ -0,0 +1,157 @@ +package logging + +import ( + "bytes" + "encoding/json" + "fmt" + "os" + "path/filepath" + "testing" + "time" +) + +func TestReadRotatedJSONLog(t *testing.T) { + tmpDir := t.TempDir() + file, err := os.CreateTemp(tmpDir, "logfile") + if err != nil { + t.Errorf("unable to create temp file, error: %s", err.Error()) + } + stdoutBuf := &bytes.Buffer{} + stderrBuf := &bytes.Buffer{} + containerStopped := make(chan os.Signal) + // Start to follow the container's log. + fileName := file.Name() + go func() { + lvOpts := LogViewOptions{ + Follow: true, + LogPath: fileName, + } + viewLogsJSONFileDirect(lvOpts, file.Name(), stdoutBuf, stderrBuf, containerStopped) + }() + + // log in stdout + expectedStdout := "line0\nline1\nline2\nline3\nline4\nline5\nline6\nline7\nline8\nline9\n" + dir := filepath.Dir(file.Name()) + baseName := filepath.Base(file.Name()) + + // Write 10 lines to log file. + // Let ReadLogs start. + time.Sleep(50 * time.Millisecond) + + type logContent struct { + Log string `json:"log"` + Stream string `json:"stream"` + Time string `json:"time"` + } + + for line := 0; line < 10; line++ { + // Write the first three lines to log file + log := logContent{} + log.Log = fmt.Sprintf("line%d\n", line) + log.Stream = "stdout" + log.Time = time.Now().Format(time.RFC3339Nano) + time.Sleep(1 * time.Millisecond) + logData, _ := json.Marshal(log) + file.Write(logData) + + if line == 5 { + file.Close() + // Pretend to rotate the log. + rotatedName := fmt.Sprintf("%s.%s", baseName, time.Now().Format("220060102-150405")) + rotatedName = filepath.Join(dir, rotatedName) + if err := os.Rename(filepath.Join(dir, baseName), rotatedName); err != nil { + t.Errorf("failed to rotate log %q to %q, error: %s", file.Name(), rotatedName, err.Error()) + return + } + + newF := filepath.Join(dir, baseName) + if file, err = os.Create(newF); err != nil { + t.Errorf("unable to create new log file, error: %s", err.Error()) + return + } + time.Sleep(20 * time.Millisecond) + } + } + + // Finished writing into the file, close it, so we can delete it later. + err = file.Close() + if err != nil { + t.Errorf("could not close file, error: %s", err.Error()) + } + + time.Sleep(2 * time.Second) + // Make the function ReadLogs end. + close(containerStopped) + + if expectedStdout != stdoutBuf.String() { + t.Errorf("expected: %s, acoutal: %s", expectedStdout, stdoutBuf.String()) + } +} + +func TestReadJSONLogs(t *testing.T) { + file, err := os.CreateTemp("", "TestFollowLogs") + if err != nil { + t.Fatalf("unable to create temp file") + } + defer os.Remove(file.Name()) + file.WriteString(`{"log":"line1\n","stream":"stdout","time":"2024-07-12T03:09:24.916296732Z"}` + "\n") + file.WriteString(`{"log":"line2\n","stream":"stdout","time":"2024-07-12T03:09:24.916296732Z"}` + "\n") + file.WriteString(`{"log":"line3\n","stream":"stdout","time":"2024-07-12T03:09:24.916296732Z"}` + "\n") + + stopChan := make(chan os.Signal) + testCases := []struct { + name string + logViewOptions LogViewOptions + expected string + }{ + { + name: "default log options should output all lines", + logViewOptions: LogViewOptions{ + LogPath: file.Name(), + Tail: 0, + }, + expected: "line1\nline2\nline3\n", + }, + { + name: "using Tail 2 should output last 2 lines", + logViewOptions: LogViewOptions{ + LogPath: file.Name(), + Tail: 2, + }, + expected: "line2\nline3\n", + }, + { + name: "using Tail 4 should output all lines when the log has less than 4 lines", + logViewOptions: LogViewOptions{ + LogPath: file.Name(), + Tail: 4, + }, + expected: "line1\nline2\nline3\n", + }, + { + name: "using Tail 0 should output all", + logViewOptions: LogViewOptions{ + LogPath: file.Name(), + Tail: 0, + }, + expected: "line1\nline2\nline3\n", + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + stdoutBuf := bytes.NewBuffer(nil) + stderrBuf := bytes.NewBuffer(nil) + err = viewLogsJSONFileDirect(tc.logViewOptions, file.Name(), stdoutBuf, stderrBuf, stopChan) + + if err != nil { + t.Fatalf(err.Error()) + } + if stderrBuf.Len() > 0 { + t.Fatalf("Stderr: %v", stderrBuf.String()) + } + if actual := stdoutBuf.String(); tc.expected != actual { + t.Fatalf("Actual output does not match expected.\nActual: %v\nExpected: %v\n", actual, tc.expected) + } + }) + } +} diff --git a/pkg/logging/logging.go b/pkg/logging/logging.go index c1760637706..b639afef9ea 100644 --- a/pkg/logging/logging.go +++ b/pkg/logging/logging.go @@ -227,8 +227,7 @@ func NewLogFileWatcher(dir string) (*fsnotify.Watcher, error) { if err := watcher.Add(dir); err != nil { watcher.Close() return nil, fmt.Errorf("failed to watch directory %q: %w", dir, err) - } else { - return watcher, nil } + return watcher, nil } }