From 2666824033e3caca7d08285f9b998f347e6f10cf Mon Sep 17 00:00:00 2001 From: Marcel Herm Date: Fri, 17 Dec 2021 10:46:39 +0100 Subject: [PATCH 01/10] Add new follow method to follow entries from next to tail --- sdjournal/read.go | 33 +++++++++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/sdjournal/read.go b/sdjournal/read.go index 51a060fb..23b0be5b 100644 --- a/sdjournal/read.go +++ b/sdjournal/read.go @@ -256,6 +256,39 @@ process: } } +// FollowTail synchronously follows the JournalReader, writing each new journal entry to writer. The +// follow will continue until a single time.Time is received on the until channel. +func (r *JournalReader) FollowTail(entries chan *JournalEntry) error { + defer close(entries) + + // skip first entry which has already been read + if _, err := r.journal.Next(); err != nil { + return err + } + + for { + status := r.journal.Wait(200 * time.Millisecond) + if status != SD_JOURNAL_APPEND && status != SD_JOURNAL_INVALIDATE { + continue + } + + for { + if c, err := r.journal.Next(); err != nil { + return err + } else if c == 0 { + // EOF, should mean we're at the tail + break + } + + if entry, err := r.journal.GetEntry(); err != nil { + return err + } else { + entries <- entry + } + } + } +} + // simpleMessageFormatter is the default formatter. // It returns a string representing the current journal entry in a simple format which // includes the entry timestamp and MESSAGE field. From ebd2c33d044d2263806fcea01ef78e3029b40008 Mon Sep 17 00:00:00 2001 From: Marcel Herm Date: Fri, 17 Dec 2021 12:42:55 +0100 Subject: [PATCH 02/10] Update method description --- sdjournal/read.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdjournal/read.go b/sdjournal/read.go index 23b0be5b..2c21ade5 100644 --- a/sdjournal/read.go +++ b/sdjournal/read.go @@ -256,8 +256,8 @@ process: } } -// FollowTail synchronously follows the JournalReader, writing each new journal entry to writer. The -// follow will continue until a single time.Time is received on the until channel. +// FollowTail synchronously follows the JournalReader, writing each new journal entry to entries. +// It will start from the next unread entry. func (r *JournalReader) FollowTail(entries chan *JournalEntry) error { defer close(entries) From 55adede4e2f3d8b27bdb8d701f2b1bde7ce332db Mon Sep 17 00:00:00 2001 From: Marcel Herm Date: Fri, 17 Dec 2021 12:52:22 +0100 Subject: [PATCH 03/10] Add SkipN method to allow follow-decoupled entry skipping --- sdjournal/read.go | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/sdjournal/read.go b/sdjournal/read.go index 2c21ade5..2012d46c 100644 --- a/sdjournal/read.go +++ b/sdjournal/read.go @@ -256,16 +256,25 @@ process: } } +// SkipN skips the next n entries and returns the number of skipped entries and an eventual error. +func (r *JournalReader) SkipN(n int) (int, error) { + var i int + for i := 1; i <= n; i++ { + c, err := r.journal.Next() + if err != nil { + return i, err + } else if c == 0 { + return i, nil + } + } + return i, nil +} + // FollowTail synchronously follows the JournalReader, writing each new journal entry to entries. // It will start from the next unread entry. func (r *JournalReader) FollowTail(entries chan *JournalEntry) error { defer close(entries) - // skip first entry which has already been read - if _, err := r.journal.Next(); err != nil { - return err - } - for { status := r.journal.Wait(200 * time.Millisecond) if status != SD_JOURNAL_APPEND && status != SD_JOURNAL_INVALIDATE { From 8ffac0e2c18a046602e392c61e65abd906add7c8 Mon Sep 17 00:00:00 2001 From: Marcel Herm Date: Thu, 31 Mar 2022 16:33:16 +0200 Subject: [PATCH 04/10] Improve error handling, refactor loop, add context to FollowTail --- sdjournal/read.go | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/sdjournal/read.go b/sdjournal/read.go index 2012d46c..fc7e4a3f 100644 --- a/sdjournal/read.go +++ b/sdjournal/read.go @@ -16,10 +16,12 @@ package sdjournal import ( + "context" "errors" "fmt" "io" "log" + "strconv" "strings" "sync" "time" @@ -258,24 +260,35 @@ process: // SkipN skips the next n entries and returns the number of skipped entries and an eventual error. func (r *JournalReader) SkipN(n int) (int, error) { + if n < 0 { + return -1, errors.New("can not skip by negative number " + strconv.Itoa(n)) + } var i int - for i := 1; i <= n; i++ { + for i < n { c, err := r.journal.Next() if err != nil { return i, err } else if c == 0 { return i, nil } + i += 1 } return i, nil } // FollowTail synchronously follows the JournalReader, writing each new journal entry to entries. // It will start from the next unread entry. -func (r *JournalReader) FollowTail(entries chan *JournalEntry) error { +func (r *JournalReader) FollowTail(entries chan *JournalEntry, ctx context.Context) error { defer close(entries) for { + select { + case <-ctx.Done(): + fmt.Println("Context done, exit FollowTail") + return nil + default: + } + status := r.journal.Wait(200 * time.Millisecond) if status != SD_JOURNAL_APPEND && status != SD_JOURNAL_INVALIDATE { continue From 56bddaedeeff3038f461b94529680f3cfae7e4bc Mon Sep 17 00:00:00 2001 From: Marcel Herm Date: Fri, 13 May 2022 15:11:22 +0200 Subject: [PATCH 05/10] Add basic test case for FollowTail --- sdjournal/journal_test.go | 58 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 58 insertions(+) diff --git a/sdjournal/journal_test.go b/sdjournal/journal_test.go index 925dd6c4..d559f91f 100755 --- a/sdjournal/journal_test.go +++ b/sdjournal/journal_test.go @@ -17,6 +17,7 @@ package sdjournal import ( "bytes" + "context" "errors" "fmt" "io" @@ -84,6 +85,63 @@ func TestJournalFollow(t *testing.T) { } } +func TestJournalFollowTail(t *testing.T) { + r, err := NewJournalReader(JournalReaderConfig{ + Since: time.Duration(-15) * time.Second, + Matches: []Match{ + { + Field: SD_JOURNAL_FIELD_SYSTEMD_UNIT, + Value: "NetworkManager.service", + }, + }, + }) + + if err != nil { + t.Fatalf("Error opening journal: %s", err) + } + + if r == nil { + t.Fatal("Got a nil reader") + } + + defer r.Close() + + // start writing some test entries + done := make(chan struct{}, 1) + errCh := make(chan error, 1) + defer close(done) + go func() { + for { + select { + case <-done: + return + default: + if perr := journal.Print(journal.PriInfo, "test message %s", time.Now()); err != nil { + errCh <- perr + return + } + + time.Sleep(time.Second) + } + } + }() + + // and follow the reader synchronously + entries := make(chan *JournalEntry) + timeout := time.Duration(5) * time.Second + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + if err = r.FollowTail(entries, ctx); err != nil { + t.Fatalf("Error during follow: %s", err) + } + + select { + case err := <-errCh: + t.Fatalf("Error writing to journal: %s", err) + default: + } +} + func TestJournalWait(t *testing.T) { id := time.Now().String() j, err := NewJournal() From 263c6a142951c8074218092828570cf35b5f9f0d Mon Sep 17 00:00:00 2001 From: "Dutkowski, Bernd" Date: Thu, 25 Aug 2022 15:39:41 +0200 Subject: [PATCH 06/10] FollowTail: change return of "error" to channel of "error" --- sdjournal/read.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sdjournal/read.go b/sdjournal/read.go index fc7e4a3f..85614ecd 100644 --- a/sdjournal/read.go +++ b/sdjournal/read.go @@ -278,14 +278,14 @@ func (r *JournalReader) SkipN(n int) (int, error) { // FollowTail synchronously follows the JournalReader, writing each new journal entry to entries. // It will start from the next unread entry. -func (r *JournalReader) FollowTail(entries chan *JournalEntry, ctx context.Context) error { +func (r *JournalReader) FollowTail(entries chan<- *JournalEntry, errors chan<- error, ctx context.Context) { defer close(entries) for { select { case <-ctx.Done(): fmt.Println("Context done, exit FollowTail") - return nil + return default: } @@ -296,14 +296,14 @@ func (r *JournalReader) FollowTail(entries chan *JournalEntry, ctx context.Conte for { if c, err := r.journal.Next(); err != nil { - return err + errors <- err } else if c == 0 { // EOF, should mean we're at the tail break } if entry, err := r.journal.GetEntry(); err != nil { - return err + errors <- err } else { entries <- entry } From 4424ec324ff959b6b96afec6cc502318b20a999e Mon Sep 17 00:00:00 2001 From: "Dutkowski, Bernd" Date: Thu, 25 Aug 2022 15:45:22 +0200 Subject: [PATCH 07/10] FollowTail: also close errors chan --- sdjournal/read.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdjournal/read.go b/sdjournal/read.go index 85614ecd..100208c4 100644 --- a/sdjournal/read.go +++ b/sdjournal/read.go @@ -280,7 +280,7 @@ func (r *JournalReader) SkipN(n int) (int, error) { // It will start from the next unread entry. func (r *JournalReader) FollowTail(entries chan<- *JournalEntry, errors chan<- error, ctx context.Context) { defer close(entries) - + defer close(errors) for { select { case <-ctx.Done(): From 33e4a56e4c1a8136aec279ad835ce7a585deca00 Mon Sep 17 00:00:00 2001 From: "Dutkowski, Bernd" Date: Thu, 25 Aug 2022 15:57:43 +0200 Subject: [PATCH 08/10] FollowTail: adjust test --- sdjournal/journal_test.go | 27 ++++++++++++++++----------- sdjournal/read.go | 24 ++++++++++++------------ 2 files changed, 28 insertions(+), 23 deletions(-) diff --git a/sdjournal/journal_test.go b/sdjournal/journal_test.go index d559f91f..693f4d6f 100755 --- a/sdjournal/journal_test.go +++ b/sdjournal/journal_test.go @@ -24,6 +24,7 @@ import ( "io/ioutil" "math/rand" "os" + "strconv" "strings" "testing" "time" @@ -86,12 +87,17 @@ func TestJournalFollow(t *testing.T) { } func TestJournalFollowTail(t *testing.T) { + documentation := "https://github.com/coreos/go-systemd/" r, err := NewJournalReader(JournalReaderConfig{ Since: time.Duration(-15) * time.Second, Matches: []Match{ { - Field: SD_JOURNAL_FIELD_SYSTEMD_UNIT, - Value: "NetworkManager.service", + Field: SD_JOURNAL_FIELD_PRIORITY, + Value: strconv.Itoa(int(journal.PriInfo)), + }, + { + Field: "DOCUMENTATION", + Value: documentation, }, }, }) @@ -109,35 +115,34 @@ func TestJournalFollowTail(t *testing.T) { // start writing some test entries done := make(chan struct{}, 1) errCh := make(chan error, 1) - defer close(done) go func() { for { select { case <-done: return default: - if perr := journal.Print(journal.PriInfo, "test message %s", time.Now()); err != nil { + vars := make(map[string]string) + vars["DOCUMENTATION"] = documentation + if perr := journal.Send(fmt.Sprintf("test message %s", time.Now()), journal.PriInfo, vars); perr != nil { errCh <- perr return } - time.Sleep(time.Second) } } }() - // and follow the reader synchronously entries := make(chan *JournalEntry) - timeout := time.Duration(5) * time.Second - ctx, cancel := context.WithTimeout(context.Background(), timeout) + ctx, cancel := context.WithTimeout(context.Background(), time.Duration(5)*time.Second) defer cancel() - if err = r.FollowTail(entries, ctx); err != nil { - t.Fatalf("Error during follow: %s", err) - } + go r.FollowTail(entries, errCh, ctx) select { case err := <-errCh: t.Fatalf("Error writing to journal: %s", err) + case entry := <-entries: + t.Log("received: " + entry.Cursor) + return default: } } diff --git a/sdjournal/read.go b/sdjournal/read.go index 100208c4..879bbbdd 100644 --- a/sdjournal/read.go +++ b/sdjournal/read.go @@ -294,19 +294,19 @@ func (r *JournalReader) FollowTail(entries chan<- *JournalEntry, errors chan<- e continue } - for { - if c, err := r.journal.Next(); err != nil { - errors <- err - } else if c == 0 { - // EOF, should mean we're at the tail - break - } + if c, err := r.journal.Next(); err != nil { + errors <- err + continue + } else if c == 0 { + // EOF, should mean we're at the tail + break + } - if entry, err := r.journal.GetEntry(); err != nil { - errors <- err - } else { - entries <- entry - } + if entry, err := r.journal.GetEntry(); err != nil { + errors <- err + continue + } else { + entries <- entry } } } From 6da4b9bca569d82116286827e7403dc16a9be2de Mon Sep 17 00:00:00 2001 From: "Dutkowski, Bernd" Date: Tue, 30 Aug 2022 16:15:04 +0200 Subject: [PATCH 09/10] FollowTail: change flow to avoid unnecessary sleep --- sdjournal/read.go | 29 ++++++++++++++--------------- 1 file changed, 14 insertions(+), 15 deletions(-) diff --git a/sdjournal/read.go b/sdjournal/read.go index 879bbbdd..b07af9a8 100644 --- a/sdjournal/read.go +++ b/sdjournal/read.go @@ -289,24 +289,23 @@ func (r *JournalReader) FollowTail(entries chan<- *JournalEntry, errors chan<- e default: } - status := r.journal.Wait(200 * time.Millisecond) - if status != SD_JOURNAL_APPEND && status != SD_JOURNAL_INVALIDATE { - continue - } - - if c, err := r.journal.Next(); err != nil { - errors <- err - continue - } else if c == 0 { - // EOF, should mean we're at the tail - break + for { + if c, err := r.journal.Next(); err != nil { + errors <- err + break + } else if c == 0 { + // EOF, should mean we're at the tail + break + } else if entry, err := r.journal.GetEntry(); err != nil { + errors <- err + } else { + entries <- entry + } } - if entry, err := r.journal.GetEntry(); err != nil { - errors <- err + status := r.journal.Wait(200 * time.Millisecond) + if status != SD_JOURNAL_APPEND && status != SD_JOURNAL_INVALIDATE { continue - } else { - entries <- entry } } } From c0bf411b01f1a80384c92de2fad0eea7060b79f4 Mon Sep 17 00:00:00 2001 From: "Dutkowski, Bernd" Date: Wed, 31 Aug 2022 10:31:56 +0200 Subject: [PATCH 10/10] FollowTail: move select on context to inner loop to prevent missing a cancellation of the context on high load --- sdjournal/read.go | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/sdjournal/read.go b/sdjournal/read.go index b07af9a8..65f388ee 100644 --- a/sdjournal/read.go +++ b/sdjournal/read.go @@ -282,14 +282,13 @@ func (r *JournalReader) FollowTail(entries chan<- *JournalEntry, errors chan<- e defer close(entries) defer close(errors) for { - select { - case <-ctx.Done(): - fmt.Println("Context done, exit FollowTail") - return - default: - } - for { + select { + case <-ctx.Done(): + fmt.Println("Context done, exit FollowTail") + return + default: + } if c, err := r.journal.Next(); err != nil { errors <- err break