Skip to content

Commit

Permalink
fix: output log after log rotation
Browse files Browse the repository at this point in the history
Signed-off-by: Andrew-Liggitt <[email protected]>
  • Loading branch information
Andrew-Liggitt authored and Andrew-Liggitt committed Jul 7, 2024
1 parent 3acf57b commit c1c5879
Show file tree
Hide file tree
Showing 6 changed files with 255 additions and 4 deletions.
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ require (
github.com/fahedouch/go-logrotate v0.2.1
github.com/fatih/color v1.17.0
github.com/fluent/fluent-logger-golang v1.9.0
github.com/fsnotify/fsnotify v1.7.0
github.com/ipfs/go-cid v0.4.1
github.com/mattn/go-isatty v0.0.20
github.com/mitchellh/mapstructure v1.5.0
Expand Down Expand Up @@ -66,8 +67,6 @@ require (
gotest.tools/v3 v3.5.1
)

require github.com/go-viper/mapstructure/v2 v2.0.0 // indirect

require (
github.com/AdaLogics/go-fuzz-headers v0.0.0-20230811130428-ced1acdcaa24 // indirect
github.com/AdamKorcz/go-118-fuzz-build v0.0.0-20230306123547-8075edf89bb0 // indirect
Expand All @@ -87,6 +86,7 @@ require (
github.com/go-jose/go-jose/v3 v3.0.3 // indirect
github.com/go-logr/logr v1.4.1 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-viper/mapstructure/v2 v2.0.0 // indirect
github.com/godbus/dbus/v5 v5.1.0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,8 @@ github.com/fluent/fluent-logger-golang v1.9.0 h1:zUdY44CHX2oIUc7VTNZc+4m+ORuO/ml
github.com/fluent/fluent-logger-golang v1.9.0/go.mod h1:2/HCT/jTy78yGyeNGQLGQsjF3zzzAuy6Xlk6FCMV5eU=
github.com/frankban/quicktest v1.14.5 h1:dfYrrRyLtiqT9GyKXgdh+k4inNeTvmGbuSgZ3lx3GhA=
github.com/frankban/quicktest v1.14.5/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0=
github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA=
github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM=
github.com/go-jose/go-jose/v3 v3.0.3 h1:fFKWeig/irsp7XD2zBxvnmA/XaRWp5V3CBsZXJF7G7k=
github.com/go-jose/go-jose/v3 v3.0.3/go.mod h1:5b+7YgP7ZICgJDBdfjZaIt+H/9L9T/YQrVfLAMboGkQ=
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
Expand Down
83 changes: 81 additions & 2 deletions pkg/logging/cri_logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ package logging
import (
"bufio"
"bytes"
"context"
"errors"
"fmt"
"io"
Expand All @@ -35,6 +36,7 @@ import (

"github.com/containerd/log"
"github.com/containerd/nerdctl/v2/pkg/logging/tail"
"github.com/fsnotify/fsnotify"
)

// LogStreamType is the type of the stream in CRI container log.
Expand All @@ -45,6 +47,11 @@ const (
Stdout LogStreamType = "stdout"
// Stderr is the stream type for stderr.
Stderr LogStreamType = "stderr"

// logForceCheckPeriod is the period to check for a new read
logForceCheckPeriod = 1 * time.Second
// RFC3339NanoLenient is the variable width RFC3339 time format for lenient parsing of strings into timestamps.
RFC3339NanoLenient = "2006-01-02T15:04:05.999999999Z07:00"
)

// LogTag is the tag of a log line in CRI container log.
Expand Down Expand Up @@ -89,7 +96,9 @@ func ReadLogs(opts *LogViewOptions, stdout, stderr io.Writer, stopChannel chan o
if err != nil {
return fmt.Errorf("failed to open log file %q: %v", logPath, err)
}
defer f.Close()
defer func() {
f.Close()
}()

// Search start point based on tail line.
start, err := tail.FindTailLineStartIndex(f, opts.Tail)
Expand All @@ -101,15 +110,20 @@ func ReadLogs(opts *LogViewOptions, stdout, stderr io.Writer, stopChannel chan o
return fmt.Errorf("failed to seek in log file %q: %v", logPath, err)
}

var watcher *fsnotify.Watcher

limitedMode := (opts.Tail > 0) && (!opts.Follow)
limitedNum := opts.Tail
// Start parsing the logs.
r := bufio.NewReader(f)

var stop bool
isNewLine := true
continueNext := true
writer := newLogWriter(stdout, stderr, opts)
msg := &logMessage{}
baseName := filepath.Base(logPath)
dir := filepath.Dir(logPath)
for {
select {
case <-stopChannel:
Expand All @@ -126,13 +140,48 @@ func ReadLogs(opts *LogViewOptions, stdout, stderr io.Writer, stopChannel chan o
return fmt.Errorf("failed to read log file %q: %v", logPath, err)
}
if opts.Follow {

if !continueNext {
return nil
}
// Reset seek so that if this is an incomplete line,
// it will be read again.
if _, err := f.Seek(-int64(len(l)), io.SeekCurrent); err != nil {
return fmt.Errorf("failed to reset seek in log file %q: %v", logPath, err)
}

if watcher == nil {
// Initialize the watcher if it has not been initialized yet.
if watcher, err = fsnotify.NewWatcher(); err != nil {
return fmt.Errorf("failed to create fsnotify watcher: %v", err)
}
defer watcher.Close()
if err := watcher.Add(dir); err != nil {
return fmt.Errorf("failed to watch directory %q: %w", dir, err)
}
// If we just created the watcher, try again to read as we might have missed
// the event.
continue
}

var recreated bool
// Wait until the next log change.
continueNext, recreated, err = waitLogs(context.Background(), opts.ContainerID, baseName, watcher)
if err != nil {
return err
}
if recreated {
newF, err := openFileShareDelete(logPath)
if err != nil {
if os.IsNotExist(err) {
continue
}
return fmt.Errorf("failed to open log file %q: %v", logPath, err)
}
f.Close()
f = newF
r = bufio.NewReader(f)
}

// If the container exited consume data until the next EOF
continue
}
Expand Down Expand Up @@ -249,6 +298,36 @@ func (w *logWriter) write(msg *logMessage, addPrefix bool) error {
return nil
}

// waitLogs wait for the next log write. It returns two booleans and an error. The first boolean
// indicates whether a new log is found; the second boolean if the log file was recreated;
// the error is error happens during waiting new logs.
func waitLogs(ctx context.Context, id string, logName string, w *fsnotify.Watcher) (bool, bool, error) {
errRetry := 5
for {
select {
case <-ctx.Done():
return false, false, fmt.Errorf("context cancelled")
case e := <-w.Events:
switch e.Op {
case fsnotify.Write, fsnotify.Rename, fsnotify.Remove, fsnotify.Chmod:
return true, false, nil
case fsnotify.Create:
return true, filepath.Base(e.Name) == logName, nil
default:
log.L.Debugf("Received unexpected fsnotify event: %v, retrying", e)
}
case err := <-w.Errors:
log.L.Debugf("Received fsnotify watch error, retrying unless no more retries left, retries: %d, error: %s", errRetry, err)
if errRetry == 0 {
return false, false, err
}
errRetry--
case <-time.After(logForceCheckPeriod):
return true, false, nil
}
}
}

// logMessage is the CRI internal log type.
type logMessage struct {
timestamp time.Time
Expand Down
82 changes: 82 additions & 0 deletions pkg/logging/cri_logger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"fmt"
"io"
"os"
"path/filepath"
"reflect"
"testing"
"time"
Expand Down Expand Up @@ -230,3 +231,84 @@ func TestReadLogsLimitsWithTimestamps(t *testing.T) {
t.Errorf("should have two lines, lineCount= %d", lineCount)
}
}

func TestReadRotatedLog(t *testing.T) {
tmpDir := t.TempDir()
file, err := os.CreateTemp(tmpDir, "logfile")
if err != nil {
t.Errorf("unable to create temp file, error: %s", err.Error())
}
stdoutBuf := &bytes.Buffer{}
stderrBuf := &bytes.Buffer{}
containerStoped := make(chan os.Signal)
// Start to follow the container's log.
fileName := file.Name()
go func() {
lvOpts := &LogViewOptions{
Follow: true,
LogPath: fileName,
}
_ = ReadLogs(lvOpts, stdoutBuf, stderrBuf, containerStoped)
}()

// log in stdout
expectedStdout := "line0line2line4line6line8"
// log in stderr
expectedStderr := "line1line3line5line7line9"

dir := filepath.Dir(file.Name())
baseName := filepath.Base(file.Name())

// Write 10 lines to log file.
// Let ReadLogs start.
time.Sleep(50 * time.Millisecond)

for line := 0; line < 10; line++ {
// Write the first three lines to log file
now := time.Now().Format(time.RFC3339Nano)
if line%2 == 0 {
file.WriteString(fmt.Sprintf(
"%s stdout P line%d\n", now, line))
} else {
file.WriteString(fmt.Sprintf(
"%s stderr P line%d\n", now, line))
}
time.Sleep(1 * time.Millisecond)

if line == 5 {
file.Close()
// Pretend to rotate the log.
rotatedName := fmt.Sprintf("%s.%s", baseName, time.Now().Format("220060102-150405"))
rotatedName = filepath.Join(dir, rotatedName)
if err := os.Rename(filepath.Join(dir, baseName), rotatedName); err != nil {
t.Errorf("failed to rotate log %q to %q, error: %s", file.Name(), rotatedName, err.Error())
return
}

newF := filepath.Join(dir, baseName)
if file, err = os.Create(newF); err != nil {
t.Errorf("unable to create new log file, error: %s", err.Error())
return
}
time.Sleep(20 * time.Millisecond)
}
}

// Finished writing into the file, close it, so we can delete it later.
err = file.Close()
if err != nil {
t.Errorf("could not close file, error: %s", err.Error())
}

time.Sleep(20 * time.Millisecond)
// Make the function ReadLogs end.
close(containerStoped)

if expectedStdout != stdoutBuf.String() {
t.Errorf("expected: %s, acoutal: %s", expectedStdout, stdoutBuf.String())
}

if expectedStderr != stderrBuf.String() {
t.Errorf("expected: %s, acoutal: %s", expectedStdout, stdoutBuf.String())
}
}
34 changes: 34 additions & 0 deletions pkg/logging/logs_other.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
//go:build !windows
// +build !windows

/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

/*
Forked from https://github.com/kubernetes/kubernetes/blob/cc60b26dee4768e3c5aa0515bbf4ba1824ad38dc/staging/src/k8s.io/cri-client/pkg/logs/logs_other.go
Copyright The Kubernetes Authors.
Licensed under the Apache License, Version 2.0
*/
package logging

import (
"os"
)

func openFileShareDelete(path string) (*os.File, error) {
// Noop. Only relevant for Windows.
return os.Open(path)
}
54 changes: 54 additions & 0 deletions pkg/logging/logs_windows.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
//go:build windows
// +build windows

/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

/*
Forked from https://github.com/kubernetes/kubernetes/blob/cc60b26dee4768e3c5aa0515bbf4ba1824ad38dc/staging/src/k8s.io/cri-client/pkg/logs/logs_windows.go
Copyright The Kubernetes Authors.
Licensed under the Apache License, Version 2.0
*/
package logging

import (
"os"
"syscall"
)

// Based on Windows implementation of Windows' syscall.Open
// https://cs.opensource.google/go/go/+/refs/tags/go1.22.2:src/syscall/syscall_windows.go;l=342
// In addition to syscall.Open, this function also adds the syscall.FILE_SHARE_DELETE flag to sharemode,
// which will allow us to read from the file without blocking the file from being deleted or renamed.
// This is essential for Log Rotation which is done by renaming the open file. Without this, the file rename would fail.
func openFileShareDelete(path string) (*os.File, error) {
pathp, err := syscall.UTF16PtrFromString(path)
if err != nil {
return nil, err
}

var access uint32 = syscall.GENERIC_READ
var sharemode uint32 = syscall.FILE_SHARE_READ | syscall.FILE_SHARE_WRITE | syscall.FILE_SHARE_DELETE
var createmode uint32 = syscall.OPEN_EXISTING
var attrs uint32 = syscall.FILE_ATTRIBUTE_NORMAL

handle, err := syscall.CreateFile(pathp, access, sharemode, nil, createmode, attrs, 0)
if err != nil {
return nil, err
}

return os.NewFile(uintptr(handle), path), nil
}

0 comments on commit c1c5879

Please sign in to comment.