Skip to content

Commit

Permalink
[feat] tail JSON log
Browse files Browse the repository at this point in the history
  • Loading branch information
Andrew-Liggitt authored and Andrew-Liggitt committed Jul 12, 2024
1 parent 9c6e5f5 commit 221039e
Show file tree
Hide file tree
Showing 3 changed files with 217 additions and 105 deletions.
162 changes: 59 additions & 103 deletions pkg/logging/json_logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,24 @@
package logging

import (
"bufio"
"context"
"errors"
"fmt"
"io"
"os"
"os/exec"
"path/filepath"
"strconv"
"strings"
"time"

"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/runtime/v2/logging"
"github.com/containerd/log"
"github.com/containerd/nerdctl/v2/pkg/logging/jsonfile"
"github.com/containerd/nerdctl/v2/pkg/logging/tail"
"github.com/containerd/nerdctl/v2/pkg/strutil"
"github.com/docker/go-units"
"github.com/fahedouch/go-logrotate"
"github.com/fsnotify/fsnotify"
)

var JSONDriverLogOpts = []string{
Expand Down Expand Up @@ -142,9 +142,6 @@ func viewLogsJSONFile(lvopts LogViewOptions, stdout, stderr io.Writer, stopChann
}
}

if checkExecutableAvailableInPath("tail") {
return viewLogsJSONFileThroughTailExec(lvopts, logFilePath, stdout, stderr, stopChannel)
}
return viewLogsJSONFileDirect(lvopts, logFilePath, stdout, stderr, stopChannel)
}

Expand All @@ -156,118 +153,77 @@ func viewLogsJSONFileDirect(lvopts LogViewOptions, jsonLogFilePath string, stdou
if err != nil {
return err
}
defer fin.Close()
err = jsonfile.Decode(stdout, stderr, fin, lvopts.Timestamps, lvopts.Since, lvopts.Until, lvopts.Tail)
defer func() { fin.Close() }()

// Search start point based on tail line.
start, err := tail.FindTailLineStartIndex(fin, lvopts.Tail)
if err != nil {
return fmt.Errorf("error occurred while doing initial read of JSON logfile %q: %s", jsonLogFilePath, err)
return fmt.Errorf("failed to tail %d lines of JSON logfile %q: %v", lvopts.Tail, jsonLogFilePath, err)
}

if lvopts.Follow {
// Get the current file handler's seek.
lastPos, err := fin.Seek(0, io.SeekCurrent)
if err != nil {
return fmt.Errorf("error occurred while trying to seek JSON logfile %q at position %d: %s", jsonLogFilePath, lastPos, err)
}
fin.Close()
for {
select {
case <-stopChannel:
log.L.Debugf("received stop signal while re-reading JSON logfile, returning")
if _, err := fin.Seek(start, io.SeekStart); err != nil {
return fmt.Errorf("failed to seek in log file %q: %v", jsonLogFilePath, err)
}

limitedMode := (lvopts.Tail > 0) && (!lvopts.Follow)
limitedNum := lvopts.Tail
var stop bool
var watcher *fsnotify.Watcher
baseName := filepath.Base(jsonLogFilePath)
dir := filepath.Dir(jsonLogFilePath)

for {
select {
case <-stopChannel:
log.L.Debugf("received stop signal while re-reading JSON logfile, returning")
return nil
default:
if stop || (limitedMode && limitedNum == 0) {
log.L.Debugf("finished parsing log JSON filefile, path: %s", jsonLogFilePath)
return nil
default:
// Re-open the file and seek to the last-consumed offset.
fin, err = os.OpenFile(jsonLogFilePath, os.O_RDONLY, 0400)
if err != nil {
fin.Close()
return fmt.Errorf("error occurred while trying to re-open JSON logfile %q: %s", jsonLogFilePath, err)
}
_, err = fin.Seek(lastPos, 0)
}

err = jsonfile.Decode(stdout, stderr, fin, lvopts.Timestamps, lvopts.Since, lvopts.Until, lvopts.Tail)
if err != nil {
return fmt.Errorf("error occurred while doing initial read of JSON logfile %q: %s", jsonLogFilePath, err)
}

if lvopts.Follow {
// Get the current file handler's seek.
lastPos, err := fin.Seek(0, io.SeekCurrent)
if err != nil {
fin.Close()
return fmt.Errorf("error occurred while trying to seek JSON logfile %q at position %d: %s", jsonLogFilePath, lastPos, err)
}

err = jsonfile.Decode(stdout, stderr, fin, lvopts.Timestamps, lvopts.Since, lvopts.Until, 0)
if err != nil {
fin.Close()
return fmt.Errorf("error occurred while doing follow-up decoding of JSON logfile %q at starting position %d: %s", jsonLogFilePath, lastPos, err)
if watcher == nil {
// Initialize the watcher if it has not been initialized yet.
if watcher, err = NewLogFileWatcher(dir); err != nil {
return err
}
defer watcher.Close()
// If we just created the watcher, try again to read as we might have missed
// the event.
continue
}

// Record current file seek position before looping again.
lastPos, err = fin.Seek(0, io.SeekCurrent)
var recreated bool
// Wait until the next log change.
recreated, err = startTail(context.Background(), baseName, watcher)
if err != nil {
return err
}
if recreated {
newF, err := openFileShareDelete(jsonLogFilePath)
if err != nil {
return fmt.Errorf("failed to open JSON logfile %q: %v", jsonLogFilePath, err)
}
fin.Close()
return fmt.Errorf("error occurred while trying to seek JSON logfile %q at current position: %s", jsonLogFilePath, err)
fin = newF
}
fin.Close()
continue
}
stop = true
// Give the OS a second to breathe before re-opening the file:
time.Sleep(time.Second)
}
}
return nil
}

// Loads logs through the `tail` executable.
func viewLogsJSONFileThroughTailExec(lvopts LogViewOptions, jsonLogFilePath string, stdout, stderr io.Writer, stopChannel chan os.Signal) error {
var args []string

args = append(args, "-n")
if lvopts.Tail == 0 {
args = append(args, "+0")
} else {
args = append(args, strconv.FormatUint(uint64(lvopts.Tail), 10))
}

if lvopts.Follow {
// using the `-F` to follow the file name instead of descriptor and retry if inaccessible
args = append(args, "-F")
}
args = append(args, jsonLogFilePath)
cmd := exec.Command("tail", args...)

cmdStdout, err := cmd.StdoutPipe()
if err != nil {
return err
}

cmdStderr, err := cmd.StderrPipe()
if err != nil {
return err
}

if err := cmd.Start(); err != nil {
return err
}

// filter the unwanted error message of the tail
go filterTailStderr(cmdStderr)

// Setup killing goroutine:
go func() {
<-stopChannel
log.L.Debugf("killing tail logs process with PID: %d", cmd.Process.Pid)
cmd.Process.Kill()
}()

return jsonfile.Decode(stdout, stderr, cmdStdout, lvopts.Timestamps, lvopts.Since, lvopts.Until, 0)
}

func filterTailStderr(reader io.Reader) error {
scanner := bufio.NewScanner(reader)
for scanner.Scan() {
line := scanner.Text()
if strings.HasSuffix(line, "has appeared; following new file") ||
strings.HasSuffix(line, "has become inaccessible: No such file or directory") ||
strings.HasSuffix(line, "has been replaced; following new file") ||
strings.HasSuffix(line, ": No such file or directory") {
continue
}
fmt.Fprintln(os.Stderr, line)
}

if err := scanner.Err(); err != nil {
return err
}
return nil
}
157 changes: 157 additions & 0 deletions pkg/logging/json_logger_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
package logging

import (
"bytes"
"encoding/json"
"fmt"
"os"
"path/filepath"
"testing"
"time"
)

func TestReadRotatedJSONLog(t *testing.T) {
tmpDir := t.TempDir()
file, err := os.CreateTemp(tmpDir, "logfile")
if err != nil {
t.Errorf("unable to create temp file, error: %s", err.Error())
}
stdoutBuf := &bytes.Buffer{}
stderrBuf := &bytes.Buffer{}
containerStopped := make(chan os.Signal)
// Start to follow the container's log.
fileName := file.Name()
go func() {
lvOpts := LogViewOptions{
Follow: true,
LogPath: fileName,
}
viewLogsJSONFileDirect(lvOpts, file.Name(), stdoutBuf, stderrBuf, containerStopped)
}()

// log in stdout
expectedStdout := "line0\nline1\nline2\nline3\nline4\nline5\nline6\nline7\nline8\nline9\n"
dir := filepath.Dir(file.Name())
baseName := filepath.Base(file.Name())

// Write 10 lines to log file.
// Let ReadLogs start.
time.Sleep(50 * time.Millisecond)

type logContent struct {
Log string `json:"log"`
Stream string `json:"stream"`
Time string `json:"time"`
}

for line := 0; line < 10; line++ {
// Write the first three lines to log file
log := logContent{}
log.Log = fmt.Sprintf("line%d\n", line)
log.Stream = "stdout"
log.Time = time.Now().Format(time.RFC3339Nano)
time.Sleep(1 * time.Millisecond)
logData, _ := json.Marshal(log)
file.Write(logData)

if line == 5 {
file.Close()
// Pretend to rotate the log.
rotatedName := fmt.Sprintf("%s.%s", baseName, time.Now().Format("220060102-150405"))
rotatedName = filepath.Join(dir, rotatedName)
if err := os.Rename(filepath.Join(dir, baseName), rotatedName); err != nil {
t.Errorf("failed to rotate log %q to %q, error: %s", file.Name(), rotatedName, err.Error())
return
}

newF := filepath.Join(dir, baseName)
if file, err = os.Create(newF); err != nil {
t.Errorf("unable to create new log file, error: %s", err.Error())
return
}
time.Sleep(20 * time.Millisecond)
}
}

// Finished writing into the file, close it, so we can delete it later.
err = file.Close()
if err != nil {
t.Errorf("could not close file, error: %s", err.Error())
}

time.Sleep(2 * time.Second)
// Make the function ReadLogs end.
close(containerStopped)

if expectedStdout != stdoutBuf.String() {
t.Errorf("expected: %s, acoutal: %s", expectedStdout, stdoutBuf.String())
}
}

func TestReadJSONLogs(t *testing.T) {
file, err := os.CreateTemp("", "TestFollowLogs")
if err != nil {
t.Fatalf("unable to create temp file")
}
defer os.Remove(file.Name())
file.WriteString(`{"log":"line1\n","stream":"stdout","time":"2024-07-12T03:09:24.916296732Z"}` + "\n")
file.WriteString(`{"log":"line2\n","stream":"stdout","time":"2024-07-12T03:09:24.916296732Z"}` + "\n")
file.WriteString(`{"log":"line3\n","stream":"stdout","time":"2024-07-12T03:09:24.916296732Z"}` + "\n")

stopChan := make(chan os.Signal)
testCases := []struct {
name string
logViewOptions LogViewOptions
expected string
}{
{
name: "default log options should output all lines",
logViewOptions: LogViewOptions{
LogPath: file.Name(),
Tail: 0,
},
expected: "line1\nline2\nline3\n",
},
{
name: "using Tail 2 should output last 2 lines",
logViewOptions: LogViewOptions{
LogPath: file.Name(),
Tail: 2,
},
expected: "line2\nline3\n",
},
{
name: "using Tail 4 should output all lines when the log has less than 4 lines",
logViewOptions: LogViewOptions{
LogPath: file.Name(),
Tail: 4,
},
expected: "line1\nline2\nline3\n",
},
{
name: "using Tail 0 should output all",
logViewOptions: LogViewOptions{
LogPath: file.Name(),
Tail: 0,
},
expected: "line1\nline2\nline3\n",
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
stdoutBuf := bytes.NewBuffer(nil)
stderrBuf := bytes.NewBuffer(nil)
err = viewLogsJSONFileDirect(tc.logViewOptions, file.Name(), stdoutBuf, stderrBuf, stopChan)

if err != nil {
t.Fatalf(err.Error())
}
if stderrBuf.Len() > 0 {
t.Fatalf("Stderr: %v", stderrBuf.String())
}
if actual := stdoutBuf.String(); tc.expected != actual {
t.Fatalf("Actual output does not match expected.\nActual: %v\nExpected: %v\n", actual, tc.expected)
}
})
}
}
3 changes: 1 addition & 2 deletions pkg/logging/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,8 +227,7 @@ func NewLogFileWatcher(dir string) (*fsnotify.Watcher, error) {
if err := watcher.Add(dir); err != nil {
watcher.Close()
return nil, fmt.Errorf("failed to watch directory %q: %w", dir, err)
} else {
return watcher, nil
}
return watcher, nil
}
}

0 comments on commit 221039e

Please sign in to comment.