Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add new follow method (second take) #407

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
63 changes: 63 additions & 0 deletions sdjournal/journal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@ package sdjournal

import (
"bytes"
"context"
"errors"
"fmt"
"io"
"io/ioutil"
"math/rand"
"os"
"strconv"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -84,6 +86,67 @@ func TestJournalFollow(t *testing.T) {
}
}

func TestJournalFollowTail(t *testing.T) {
documentation := "https://github.com/coreos/go-systemd/"
r, err := NewJournalReader(JournalReaderConfig{
Since: time.Duration(-15) * time.Second,
Matches: []Match{
{
Field: SD_JOURNAL_FIELD_PRIORITY,
Value: strconv.Itoa(int(journal.PriInfo)),
},
{
Field: "DOCUMENTATION",
Value: documentation,
},
},
})

if err != nil {
t.Fatalf("Error opening journal: %s", err)
}

if r == nil {
t.Fatal("Got a nil reader")
}

defer r.Close()

// start writing some test entries
done := make(chan struct{}, 1)
errCh := make(chan error, 1)
go func() {
for {
select {
case <-done:
return
default:
vars := make(map[string]string)
vars["DOCUMENTATION"] = documentation
if perr := journal.Send(fmt.Sprintf("test message %s", time.Now()), journal.PriInfo, vars); perr != nil {
errCh <- perr
return
}
time.Sleep(time.Second)
}
}
}()

entries := make(chan *JournalEntry)
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(5)*time.Second)
defer cancel()
go r.FollowTail(entries, errCh, ctx)

select {
case err := <-errCh:
t.Fatalf("Error writing to journal: %s", err)
case entry := <-entries:
t.Log("received: " + entry.Cursor)
return
default:
}
}

func TestJournalWait(t *testing.T) {
id := time.Now().String()
j, err := NewJournal()
Expand Down
53 changes: 53 additions & 0 deletions sdjournal/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@
package sdjournal

import (
"context"
"errors"
"fmt"
"io"
"log"
"strconv"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -256,6 +258,57 @@ process:
}
}

// SkipN skips the next n entries and returns the number of skipped entries and an eventual error.
func (r *JournalReader) SkipN(n int) (int, error) {
if n < 0 {
return -1, errors.New("can not skip by negative number " + strconv.Itoa(n))
}
var i int
for i < n {
c, err := r.journal.Next()
if err != nil {
return i, err
} else if c == 0 {
return i, nil
}
i += 1
}
return i, nil
}

// FollowTail synchronously follows the JournalReader, writing each new journal entry to entries.
// It will start from the next unread entry.
func (r *JournalReader) FollowTail(entries chan<- *JournalEntry, errors chan<- error, ctx context.Context) {
defer close(entries)
defer close(errors)
for {
for {
select {
case <-ctx.Done():
fmt.Println("Context done, exit FollowTail")
return
default:
}
if c, err := r.journal.Next(); err != nil {
errors <- err
break
} else if c == 0 {
// EOF, should mean we're at the tail
break
} else if entry, err := r.journal.GetEntry(); err != nil {
errors <- err
} else {
entries <- entry
}
}

status := r.journal.Wait(200 * time.Millisecond)
if status != SD_JOURNAL_APPEND && status != SD_JOURNAL_INVALIDATE {
continue
}
}
}

// simpleMessageFormatter is the default formatter.
// It returns a string representing the current journal entry in a simple format which
// includes the entry timestamp and MESSAGE field.
Expand Down