Skip to content

Commit

Permalink
feat: tail JSON log
Browse files Browse the repository at this point in the history
Signed-off-by: Andrew-Liggitt <[email protected]>
  • Loading branch information
Andrew-Liggitt authored and Andrew-Liggitt committed Jul 12, 2024
1 parent a35780d commit c0b2fed
Show file tree
Hide file tree
Showing 4 changed files with 236 additions and 105 deletions.
162 changes: 59 additions & 103 deletions pkg/logging/json_logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,24 @@
package logging

import (
"bufio"
"context"
"errors"
"fmt"
"io"
"os"
"os/exec"
"path/filepath"
"strconv"
"strings"
"time"

"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/runtime/v2/logging"
"github.com/containerd/log"
"github.com/containerd/nerdctl/v2/pkg/logging/jsonfile"
"github.com/containerd/nerdctl/v2/pkg/logging/tail"
"github.com/containerd/nerdctl/v2/pkg/strutil"
"github.com/docker/go-units"
"github.com/fahedouch/go-logrotate"
"github.com/fsnotify/fsnotify"
)

var JSONDriverLogOpts = []string{
Expand Down Expand Up @@ -142,9 +142,6 @@ func viewLogsJSONFile(lvopts LogViewOptions, stdout, stderr io.Writer, stopChann
}
}

if checkExecutableAvailableInPath("tail") {
return viewLogsJSONFileThroughTailExec(lvopts, logFilePath, stdout, stderr, stopChannel)
}
return viewLogsJSONFileDirect(lvopts, logFilePath, stdout, stderr, stopChannel)
}

Expand All @@ -156,118 +153,77 @@ func viewLogsJSONFileDirect(lvopts LogViewOptions, jsonLogFilePath string, stdou
if err != nil {
return err
}
defer fin.Close()
err = jsonfile.Decode(stdout, stderr, fin, lvopts.Timestamps, lvopts.Since, lvopts.Until, lvopts.Tail)
defer func() { fin.Close() }()

// Search start point based on tail line.
start, err := tail.FindTailLineStartIndex(fin, lvopts.Tail)
if err != nil {
return fmt.Errorf("error occurred while doing initial read of JSON logfile %q: %s", jsonLogFilePath, err)
return fmt.Errorf("failed to tail %d lines of JSON logfile %q: %v", lvopts.Tail, jsonLogFilePath, err)
}

if lvopts.Follow {
// Get the current file handler's seek.
lastPos, err := fin.Seek(0, io.SeekCurrent)
if err != nil {
return fmt.Errorf("error occurred while trying to seek JSON logfile %q at position %d: %s", jsonLogFilePath, lastPos, err)
}
fin.Close()
for {
select {
case <-stopChannel:
log.L.Debugf("received stop signal while re-reading JSON logfile, returning")
if _, err := fin.Seek(start, io.SeekStart); err != nil {
return fmt.Errorf("failed to seek in log file %q: %v", jsonLogFilePath, err)
}

limitedMode := (lvopts.Tail > 0) && (!lvopts.Follow)
limitedNum := lvopts.Tail
var stop bool
var watcher *fsnotify.Watcher
baseName := filepath.Base(jsonLogFilePath)
dir := filepath.Dir(jsonLogFilePath)

for {
select {
case <-stopChannel:
log.L.Debugf("received stop signal while re-reading JSON logfile, returning")
return nil
default:
if stop || (limitedMode && limitedNum == 0) {
log.L.Debugf("finished parsing log JSON filefile, path: %s", jsonLogFilePath)
return nil
default:
// Re-open the file and seek to the last-consumed offset.
fin, err = os.OpenFile(jsonLogFilePath, os.O_RDONLY, 0400)
if err != nil {
fin.Close()
return fmt.Errorf("error occurred while trying to re-open JSON logfile %q: %s", jsonLogFilePath, err)
}
_, err = fin.Seek(lastPos, 0)
}

err = jsonfile.Decode(stdout, stderr, fin, lvopts.Timestamps, lvopts.Since, lvopts.Until, lvopts.Tail)
if err != nil {
return fmt.Errorf("error occurred while doing initial read of JSON logfile %q: %s", jsonLogFilePath, err)
}

if lvopts.Follow {
// Get the current file handler's seek.
lastPos, err := fin.Seek(0, io.SeekCurrent)
if err != nil {
fin.Close()
return fmt.Errorf("error occurred while trying to seek JSON logfile %q at position %d: %s", jsonLogFilePath, lastPos, err)
}

err = jsonfile.Decode(stdout, stderr, fin, lvopts.Timestamps, lvopts.Since, lvopts.Until, 0)
if err != nil {
fin.Close()
return fmt.Errorf("error occurred while doing follow-up decoding of JSON logfile %q at starting position %d: %s", jsonLogFilePath, lastPos, err)
if watcher == nil {
// Initialize the watcher if it has not been initialized yet.
if watcher, err = NewLogFileWatcher(dir); err != nil {
return err
}
defer watcher.Close()
// If we just created the watcher, try again to read as we might have missed
// the event.
continue
}

// Record current file seek position before looping again.
lastPos, err = fin.Seek(0, io.SeekCurrent)
var recreated bool
// Wait until the next log change.
recreated, err = startTail(context.Background(), baseName, watcher)
if err != nil {
return err
}
if recreated {
newF, err := openFileShareDelete(jsonLogFilePath)
if err != nil {
return fmt.Errorf("failed to open JSON logfile %q: %v", jsonLogFilePath, err)
}
fin.Close()
return fmt.Errorf("error occurred while trying to seek JSON logfile %q at current position: %s", jsonLogFilePath, err)
fin = newF
}
fin.Close()
continue
}
stop = true
// Give the OS a second to breathe before re-opening the file:
time.Sleep(time.Second)
}
}
return nil
}

// Loads logs through the `tail` executable.
func viewLogsJSONFileThroughTailExec(lvopts LogViewOptions, jsonLogFilePath string, stdout, stderr io.Writer, stopChannel chan os.Signal) error {
var args []string

args = append(args, "-n")
if lvopts.Tail == 0 {
args = append(args, "+0")
} else {
args = append(args, strconv.FormatUint(uint64(lvopts.Tail), 10))
}

if lvopts.Follow {
// using the `-F` to follow the file name instead of descriptor and retry if inaccessible
args = append(args, "-F")
}
args = append(args, jsonLogFilePath)
cmd := exec.Command("tail", args...)

cmdStdout, err := cmd.StdoutPipe()
if err != nil {
return err
}

cmdStderr, err := cmd.StderrPipe()
if err != nil {
return err
}

if err := cmd.Start(); err != nil {
return err
}

// filter the unwanted error message of the tail
go filterTailStderr(cmdStderr)

// Setup killing goroutine:
go func() {
<-stopChannel
log.L.Debugf("killing tail logs process with PID: %d", cmd.Process.Pid)
cmd.Process.Kill()
}()

return jsonfile.Decode(stdout, stderr, cmdStdout, lvopts.Timestamps, lvopts.Since, lvopts.Until, 0)
}

func filterTailStderr(reader io.Reader) error {
scanner := bufio.NewScanner(reader)
for scanner.Scan() {
line := scanner.Text()
if strings.HasSuffix(line, "has appeared; following new file") ||
strings.HasSuffix(line, "has become inaccessible: No such file or directory") ||
strings.HasSuffix(line, "has been replaced; following new file") ||
strings.HasSuffix(line, ": No such file or directory") {
continue
}
fmt.Fprintln(os.Stderr, line)
}

if err := scanner.Err(); err != nil {
return err
}
return nil
}
173 changes: 173 additions & 0 deletions pkg/logging/json_logger_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package logging

import (
"bytes"
"encoding/json"
"fmt"
"os"
"path/filepath"
"testing"
"time"
)

func TestReadRotatedJSONLog(t *testing.T) {
tmpDir := t.TempDir()
file, err := os.CreateTemp(tmpDir, "logfile")
if err != nil {
t.Errorf("unable to create temp file, error: %s", err.Error())
}
stdoutBuf := &bytes.Buffer{}
stderrBuf := &bytes.Buffer{}
containerStopped := make(chan os.Signal)
// Start to follow the container's log.
fileName := file.Name()
go func() {
lvOpts := LogViewOptions{
Follow: true,
LogPath: fileName,
}
viewLogsJSONFileDirect(lvOpts, file.Name(), stdoutBuf, stderrBuf, containerStopped)
}()

// log in stdout
expectedStdout := "line0\nline1\nline2\nline3\nline4\nline5\nline6\nline7\nline8\nline9\n"
dir := filepath.Dir(file.Name())
baseName := filepath.Base(file.Name())

// Write 10 lines to log file.
// Let ReadLogs start.
time.Sleep(50 * time.Millisecond)

type logContent struct {
Log string `json:"log"`
Stream string `json:"stream"`
Time string `json:"time"`
}

for line := 0; line < 10; line++ {
// Write the first three lines to log file
log := logContent{}
log.Log = fmt.Sprintf("line%d\n", line)
log.Stream = "stdout"
log.Time = time.Now().Format(time.RFC3339Nano)
time.Sleep(1 * time.Millisecond)
logData, _ := json.Marshal(log)
file.Write(logData)

if line == 5 {
file.Close()
// Pretend to rotate the log.
rotatedName := fmt.Sprintf("%s.%s", baseName, time.Now().Format("20060102-150405"))
rotatedName = filepath.Join(dir, rotatedName)
if err := os.Rename(filepath.Join(dir, baseName), rotatedName); err != nil {
t.Errorf("failed to rotate log %q to %q, error: %s", file.Name(), rotatedName, err.Error())
return
}

newF := filepath.Join(dir, baseName)
if file, err = os.Create(newF); err != nil {
t.Errorf("unable to create new log file, error: %s", err.Error())
return
}
time.Sleep(20 * time.Millisecond)
}
}

// Finished writing into the file, close it, so we can delete it later.
err = file.Close()
if err != nil {
t.Errorf("could not close file, error: %s", err.Error())
}

time.Sleep(2 * time.Second)
// Make the function ReadLogs end.
close(containerStopped)

if expectedStdout != stdoutBuf.String() {
t.Errorf("expected: %s, acoutal: %s", expectedStdout, stdoutBuf.String())
}
}

func TestReadJSONLogs(t *testing.T) {
file, err := os.CreateTemp("", "TestFollowLogs")
if err != nil {
t.Fatalf("unable to create temp file")
}
defer os.Remove(file.Name())
file.WriteString(`{"log":"line1\n","stream":"stdout","time":"2024-07-12T03:09:24.916296732Z"}` + "\n")
file.WriteString(`{"log":"line2\n","stream":"stdout","time":"2024-07-12T03:09:24.916296732Z"}` + "\n")
file.WriteString(`{"log":"line3\n","stream":"stdout","time":"2024-07-12T03:09:24.916296732Z"}` + "\n")

stopChan := make(chan os.Signal)
testCases := []struct {
name string
logViewOptions LogViewOptions
expected string
}{
{
name: "default log options should output all lines",
logViewOptions: LogViewOptions{
LogPath: file.Name(),
Tail: 0,
},
expected: "line1\nline2\nline3\n",
},
{
name: "using Tail 2 should output last 2 lines",
logViewOptions: LogViewOptions{
LogPath: file.Name(),
Tail: 2,
},
expected: "line2\nline3\n",
},
{
name: "using Tail 4 should output all lines when the log has less than 4 lines",
logViewOptions: LogViewOptions{
LogPath: file.Name(),
Tail: 4,
},
expected: "line1\nline2\nline3\n",
},
{
name: "using Tail 0 should output all",
logViewOptions: LogViewOptions{
LogPath: file.Name(),
Tail: 0,
},
expected: "line1\nline2\nline3\n",
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
stdoutBuf := bytes.NewBuffer(nil)
stderrBuf := bytes.NewBuffer(nil)
err = viewLogsJSONFileDirect(tc.logViewOptions, file.Name(), stdoutBuf, stderrBuf, stopChan)

if err != nil {
t.Fatalf(err.Error())
}
if stderrBuf.Len() > 0 {
t.Fatalf("Stderr: %v", stderrBuf.String())
}
if actual := stdoutBuf.String(); tc.expected != actual {
t.Fatalf("Actual output does not match expected.\nActual: %v\nExpected: %v\n", actual, tc.expected)
}
})
}
}
Loading

0 comments on commit c0b2fed

Please sign in to comment.