diff --git a/go.mod b/go.mod index 1f05721c975..006e9befd83 100644 --- a/go.mod +++ b/go.mod @@ -36,6 +36,7 @@ require ( github.com/fahedouch/go-logrotate v0.2.1 github.com/fatih/color v1.17.0 github.com/fluent/fluent-logger-golang v1.9.0 + github.com/fsnotify/fsnotify v1.7.0 github.com/ipfs/go-cid v0.4.1 github.com/mattn/go-isatty v0.0.20 github.com/mitchellh/mapstructure v1.5.0 @@ -66,8 +67,6 @@ require ( gotest.tools/v3 v3.5.1 ) -require github.com/go-viper/mapstructure/v2 v2.0.0 // indirect - require ( github.com/AdaLogics/go-fuzz-headers v0.0.0-20230811130428-ced1acdcaa24 // indirect github.com/AdamKorcz/go-118-fuzz-build v0.0.0-20230306123547-8075edf89bb0 // indirect @@ -87,6 +86,7 @@ require ( github.com/go-jose/go-jose/v3 v3.0.3 // indirect github.com/go-logr/logr v1.4.1 // indirect github.com/go-logr/stdr v1.2.2 // indirect + github.com/go-viper/mapstructure/v2 v2.0.0 // indirect github.com/godbus/dbus/v5 v5.1.0 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect diff --git a/go.sum b/go.sum index 20dbd0f22d9..9e8cbf85adf 100644 --- a/go.sum +++ b/go.sum @@ -114,6 +114,8 @@ github.com/fluent/fluent-logger-golang v1.9.0 h1:zUdY44CHX2oIUc7VTNZc+4m+ORuO/ml github.com/fluent/fluent-logger-golang v1.9.0/go.mod h1:2/HCT/jTy78yGyeNGQLGQsjF3zzzAuy6Xlk6FCMV5eU= github.com/frankban/quicktest v1.14.5 h1:dfYrrRyLtiqT9GyKXgdh+k4inNeTvmGbuSgZ3lx3GhA= github.com/frankban/quicktest v1.14.5/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0= +github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA= +github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM= github.com/go-jose/go-jose/v3 v3.0.3 h1:fFKWeig/irsp7XD2zBxvnmA/XaRWp5V3CBsZXJF7G7k= github.com/go-jose/go-jose/v3 v3.0.3/go.mod h1:5b+7YgP7ZICgJDBdfjZaIt+H/9L9T/YQrVfLAMboGkQ= github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= diff --git a/pkg/logging/cri_logger.go b/pkg/logging/cri_logger.go index c3c3b35efc3..d373b2fcf0b 100644 --- a/pkg/logging/cri_logger.go +++ b/pkg/logging/cri_logger.go @@ -25,6 +25,7 @@ package logging import ( "bufio" "bytes" + "context" "errors" "fmt" "io" @@ -35,6 +36,7 @@ import ( "github.com/containerd/log" "github.com/containerd/nerdctl/v2/pkg/logging/tail" + "github.com/fsnotify/fsnotify" ) // LogStreamType is the type of the stream in CRI container log. @@ -45,6 +47,11 @@ const ( Stdout LogStreamType = "stdout" // Stderr is the stream type for stderr. Stderr LogStreamType = "stderr" + + // logForceCheckPeriod is the period to check for a new read + logForceCheckPeriod = 1 * time.Second + // RFC3339NanoLenient is the variable width RFC3339 time format for lenient parsing of strings into timestamps. + RFC3339NanoLenient = "2006-01-02T15:04:05.999999999Z07:00" ) // LogTag is the tag of a log line in CRI container log. @@ -89,7 +96,9 @@ func ReadLogs(opts *LogViewOptions, stdout, stderr io.Writer, stopChannel chan o if err != nil { return fmt.Errorf("failed to open log file %q: %v", logPath, err) } - defer f.Close() + defer func() { + f.Close() + }() // Search start point based on tail line. start, err := tail.FindTailLineStartIndex(f, opts.Tail) @@ -101,6 +110,8 @@ func ReadLogs(opts *LogViewOptions, stdout, stderr io.Writer, stopChannel chan o return fmt.Errorf("failed to seek in log file %q: %v", logPath, err) } + var watcher *fsnotify.Watcher + limitedMode := (opts.Tail > 0) && (!opts.Follow) limitedNum := opts.Tail // Start parsing the logs. @@ -110,6 +121,9 @@ func ReadLogs(opts *LogViewOptions, stdout, stderr io.Writer, stopChannel chan o isNewLine := true writer := newLogWriter(stdout, stderr, opts) msg := &logMessage{} + baseName := filepath.Base(logPath) + dir := filepath.Dir(logPath) + for { select { case <-stopChannel: @@ -126,13 +140,42 @@ func ReadLogs(opts *LogViewOptions, stdout, stderr io.Writer, stopChannel chan o return fmt.Errorf("failed to read log file %q: %v", logPath, err) } if opts.Follow { - // Reset seek so that if this is an incomplete line, // it will be read again. if _, err := f.Seek(-int64(len(l)), io.SeekCurrent); err != nil { return fmt.Errorf("failed to reset seek in log file %q: %v", logPath, err) } + if watcher == nil { + // Initialize the watcher if it has not been initialized yet. + if watcher, err = fsnotify.NewWatcher(); err != nil { + return fmt.Errorf("failed to create fsnotify watcher: %v", err) + } + defer watcher.Close() + if err := watcher.Add(dir); err != nil { + return fmt.Errorf("failed to watch directory %q: %w", dir, err) + } + // If we just created the watcher, try again to read as we might have missed + // the event. + continue + } + + var recreated bool + // Wait until the next log change. + recreated, err = startTail(context.Background(), baseName, watcher) + if err != nil { + return err + } + if recreated { + newF, err := openFileShareDelete(logPath) + if err != nil { + return fmt.Errorf("failed to open log file %q: %v", logPath, err) + } + f.Close() + f = newF + r = bufio.NewReader(f) + } + // If the container exited consume data until the next EOF continue } @@ -249,6 +292,36 @@ func (w *logWriter) write(msg *logMessage, addPrefix bool) error { return nil } +// startTail wait for the next log write. +// the boolean value indicates if the log file was recreated; +// the error is error happens during waiting new logs. +func startTail(ctx context.Context, logName string, w *fsnotify.Watcher) (bool, error) { + errRetry := 5 + for { + select { + case <-ctx.Done(): + return false, fmt.Errorf("context cancelled") + case e := <-w.Events: + switch e.Op { + case fsnotify.Write: + return false, nil + case fsnotify.Create: + return filepath.Base(e.Name) == logName, nil + default: + log.L.Debugf("Received unexpected fsnotify event: %v, retrying", e) + } + case err := <-w.Errors: + log.L.Debugf("Received fsnotify watch error, retrying unless no more retries left, retries: %d, error: %s", errRetry, err) + if errRetry == 0 { + return false, err + } + errRetry-- + case <-time.After(logForceCheckPeriod): + return false, nil + } + } +} + // logMessage is the CRI internal log type. type logMessage struct { timestamp time.Time diff --git a/pkg/logging/cri_logger_test.go b/pkg/logging/cri_logger_test.go index 3788a3c706b..98f4ea7a09d 100644 --- a/pkg/logging/cri_logger_test.go +++ b/pkg/logging/cri_logger_test.go @@ -28,6 +28,7 @@ import ( "fmt" "io" "os" + "path/filepath" "reflect" "testing" "time" @@ -230,3 +231,85 @@ func TestReadLogsLimitsWithTimestamps(t *testing.T) { t.Errorf("should have two lines, lineCount= %d", lineCount) } } + +func TestReadRotatedLog(t *testing.T) { + tmpDir := t.TempDir() + file, err := os.CreateTemp(tmpDir, "logfile") + if err != nil { + t.Errorf("unable to create temp file, error: %s", err.Error()) + } + stdoutBuf := &bytes.Buffer{} + stderrBuf := &bytes.Buffer{} + containerStoped := make(chan os.Signal) + // Start to follow the container's log. + fileName := file.Name() + go func() { + lvOpts := &LogViewOptions{ + Follow: true, + LogPath: fileName, + } + _ = ReadLogs(lvOpts, stdoutBuf, stderrBuf, containerStoped) + }() + + // log in stdout + expectedStdout := "line0line2line4line6line8" + // log in stderr + expectedStderr := "line1line3line5line7line9" + + dir := filepath.Dir(file.Name()) + baseName := filepath.Base(file.Name()) + + // Write 10 lines to log file. + // Let ReadLogs start. + time.Sleep(50 * time.Millisecond) + + for line := 0; line < 10; line++ { + // Write the first three lines to log file + now := time.Now().Format(time.RFC3339Nano) + if line%2 == 0 { + file.WriteString(fmt.Sprintf( + "%s stdout P line%d\n", now, line)) + } else { + file.WriteString(fmt.Sprintf( + "%s stderr P line%d\n", now, line)) + } + + time.Sleep(1 * time.Millisecond) + + if line == 5 { + file.Close() + // Pretend to rotate the log. + rotatedName := fmt.Sprintf("%s.%s", baseName, time.Now().Format("220060102-150405")) + rotatedName = filepath.Join(dir, rotatedName) + if err := os.Rename(filepath.Join(dir, baseName), rotatedName); err != nil { + t.Errorf("failed to rotate log %q to %q, error: %s", file.Name(), rotatedName, err.Error()) + return + } + + newF := filepath.Join(dir, baseName) + if file, err = os.Create(newF); err != nil { + t.Errorf("unable to create new log file, error: %s", err.Error()) + return + } + time.Sleep(20 * time.Millisecond) + } + } + + // Finished writing into the file, close it, so we can delete it later. + err = file.Close() + if err != nil { + t.Errorf("could not close file, error: %s", err.Error()) + } + + time.Sleep(20 * time.Millisecond) + // Make the function ReadLogs end. + close(containerStoped) + + if expectedStdout != stdoutBuf.String() { + t.Errorf("expected: %s, acoutal: %s", expectedStdout, stdoutBuf.String()) + } + + if expectedStderr != stderrBuf.String() { + t.Errorf("expected: %s, acoutal: %s", expectedStderr, stderrBuf.String()) + } +} diff --git a/pkg/logging/logs_other.go b/pkg/logging/logs_other.go new file mode 100644 index 00000000000..6270efbe0fe --- /dev/null +++ b/pkg/logging/logs_other.go @@ -0,0 +1,34 @@ +//go:build !windows +// +build !windows + +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +/* +Forked from https://github.com/kubernetes/kubernetes/blob/cc60b26dee4768e3c5aa0515bbf4ba1824ad38dc/staging/src/k8s.io/cri-client/pkg/logs/logs_other.go +Copyright The Kubernetes Authors. +Licensed under the Apache License, Version 2.0 +*/ +package logging + +import ( + "os" +) + +func openFileShareDelete(path string) (*os.File, error) { + // Noop. Only relevant for Windows. + return os.Open(path) +} diff --git a/pkg/logging/logs_windows.go b/pkg/logging/logs_windows.go new file mode 100644 index 00000000000..262ac0c2d42 --- /dev/null +++ b/pkg/logging/logs_windows.go @@ -0,0 +1,54 @@ +//go:build windows +// +build windows + +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +/* +Forked from https://github.com/kubernetes/kubernetes/blob/cc60b26dee4768e3c5aa0515bbf4ba1824ad38dc/staging/src/k8s.io/cri-client/pkg/logs/logs_windows.go +Copyright The Kubernetes Authors. +Licensed under the Apache License, Version 2.0 +*/ +package logging + +import ( + "os" + "syscall" +) + +// Based on Windows implementation of Windows' syscall.Open +// https://cs.opensource.google/go/go/+/refs/tags/go1.22.2:src/syscall/syscall_windows.go;l=342 +// In addition to syscall.Open, this function also adds the syscall.FILE_SHARE_DELETE flag to sharemode, +// which will allow us to read from the file without blocking the file from being deleted or renamed. +// This is essential for Log Rotation which is done by renaming the open file. Without this, the file rename would fail. +func openFileShareDelete(path string) (*os.File, error) { + pathp, err := syscall.UTF16PtrFromString(path) + if err != nil { + return nil, err + } + + var access uint32 = syscall.GENERIC_READ + var sharemode uint32 = syscall.FILE_SHARE_READ | syscall.FILE_SHARE_WRITE | syscall.FILE_SHARE_DELETE + var createmode uint32 = syscall.OPEN_EXISTING + var attrs uint32 = syscall.FILE_ATTRIBUTE_NORMAL + + handle, err := syscall.CreateFile(pathp, access, sharemode, nil, createmode, attrs, 0) + if err != nil { + return nil, err + } + + return os.NewFile(uintptr(handle), path), nil +}