diff --git a/sdjournal/journal_test.go b/sdjournal/journal_test.go index 925dd6c4..693f4d6f 100755 --- a/sdjournal/journal_test.go +++ b/sdjournal/journal_test.go @@ -17,12 +17,14 @@ package sdjournal import ( "bytes" + "context" "errors" "fmt" "io" "io/ioutil" "math/rand" "os" + "strconv" "strings" "testing" "time" @@ -84,6 +86,67 @@ func TestJournalFollow(t *testing.T) { } } +func TestJournalFollowTail(t *testing.T) { + documentation := "https://github.com/coreos/go-systemd/" + r, err := NewJournalReader(JournalReaderConfig{ + Since: time.Duration(-15) * time.Second, + Matches: []Match{ + { + Field: SD_JOURNAL_FIELD_PRIORITY, + Value: strconv.Itoa(int(journal.PriInfo)), + }, + { + Field: "DOCUMENTATION", + Value: documentation, + }, + }, + }) + + if err != nil { + t.Fatalf("Error opening journal: %s", err) + } + + if r == nil { + t.Fatal("Got a nil reader") + } + + defer r.Close() + + // start writing some test entries + done := make(chan struct{}, 1) + errCh := make(chan error, 1) + go func() { + for { + select { + case <-done: + return + default: + vars := make(map[string]string) + vars["DOCUMENTATION"] = documentation + if perr := journal.Send(fmt.Sprintf("test message %s", time.Now()), journal.PriInfo, vars); perr != nil { + errCh <- perr + return + } + time.Sleep(time.Second) + } + } + }() + + entries := make(chan *JournalEntry) + ctx, cancel := context.WithTimeout(context.Background(), time.Duration(5)*time.Second) + defer cancel() + go r.FollowTail(entries, errCh, ctx) + + select { + case err := <-errCh: + t.Fatalf("Error writing to journal: %s", err) + case entry := <-entries: + t.Log("received: " + entry.Cursor) + return + default: + } +} + func TestJournalWait(t *testing.T) { id := time.Now().String() j, err := NewJournal() diff --git a/sdjournal/read.go b/sdjournal/read.go index 51a060fb..65f388ee 100644 --- a/sdjournal/read.go +++ b/sdjournal/read.go @@ -16,10 +16,12 @@ package sdjournal import ( + "context" "errors" "fmt" "io" "log" + "strconv" "strings" "sync" "time" @@ -256,6 +258,57 @@ process: } } +// SkipN skips the next n entries and returns the number of skipped entries and an eventual error. +func (r *JournalReader) SkipN(n int) (int, error) { + if n < 0 { + return -1, errors.New("can not skip by negative number " + strconv.Itoa(n)) + } + var i int + for i < n { + c, err := r.journal.Next() + if err != nil { + return i, err + } else if c == 0 { + return i, nil + } + i += 1 + } + return i, nil +} + +// FollowTail synchronously follows the JournalReader, writing each new journal entry to entries. +// It will start from the next unread entry. +func (r *JournalReader) FollowTail(entries chan<- *JournalEntry, errors chan<- error, ctx context.Context) { + defer close(entries) + defer close(errors) + for { + for { + select { + case <-ctx.Done(): + fmt.Println("Context done, exit FollowTail") + return + default: + } + if c, err := r.journal.Next(); err != nil { + errors <- err + break + } else if c == 0 { + // EOF, should mean we're at the tail + break + } else if entry, err := r.journal.GetEntry(); err != nil { + errors <- err + } else { + entries <- entry + } + } + + status := r.journal.Wait(200 * time.Millisecond) + if status != SD_JOURNAL_APPEND && status != SD_JOURNAL_INVALIDATE { + continue + } + } +} + // simpleMessageFormatter is the default formatter. // It returns a string representing the current journal entry in a simple format which // includes the entry timestamp and MESSAGE field.