Skip to content

Commit

Permalink
Pump logfiles dry during playback
Browse files Browse the repository at this point in the history
  • Loading branch information
ericvolp12 committed Oct 12, 2023
1 parent 8b086dc commit 3d3930b
Showing 1 changed file with 48 additions and 17 deletions.
65 changes: 48 additions & 17 deletions events/diskpersist.go
Original file line number Diff line number Diff line change
Expand Up @@ -620,16 +620,43 @@ func (dp *DiskPersistence) Playback(ctx context.Context, since int64, cb func(*X
return err
}

for _, lf := range logs {
if err := dp.readEventsFrom(ctx, since, filepath.Join(dp.primaryDir, lf.Path), cb); err != nil {
for i := 0; i < 10; i++ {
lastSeq, err := dp.PlaybackLogfiles(ctx, since, cb, logs)
if err != nil {
return err
}
since = 0

// If we got a lastSeq, there may be more log files to read
if lastSeq != nil {
if err := dp.meta.Debug().Order("seq_start asc").Find(&logs, "seq_start >= ?", *lastSeq).Error; err != nil {
return err
}
} else {
break
}
}

return nil
}

func (dp *DiskPersistence) PlaybackLogfiles(ctx context.Context, since int64, cb func(*XRPCStreamEvent) error, logFiles []LogFileRef) (*int64, error) {
for i, lf := range logFiles {
lastSeq, err := dp.readEventsFrom(ctx, since, filepath.Join(dp.primaryDir, lf.Path), cb)
if err != nil {
return nil, err
}
since = 0
if i == len(logFiles)-1 &&
lastSeq != nil &&
(*lastSeq-lf.SeqStart) == dp.eventsPerFile {
// There may be more log files to read since the last one was full
return lastSeq, nil
}
}

return nil, nil
}

func postDoNotEmit(flags uint32) bool {
if flags&(EvtFlagRebased|EvtFlagTakedown) != 0 {
return true
Expand All @@ -638,45 +665,49 @@ func postDoNotEmit(flags uint32) bool {
return false
}

func (dp *DiskPersistence) readEventsFrom(ctx context.Context, since int64, fn string, cb func(*XRPCStreamEvent) error) error {
func (dp *DiskPersistence) readEventsFrom(ctx context.Context, since int64, fn string, cb func(*XRPCStreamEvent) error) (*int64, error) {
fi, err := os.OpenFile(fn, os.O_RDONLY, 0)
if err != nil {
return err
return nil, err
}

if since != 0 {
lastSeq, err := scanForLastSeq(fi, since)
if err != nil {
return err
return nil, err
}
if since > lastSeq {
log.Errorw("playback cursor is greater than last seq of file checked",
"since", since,
"lastSeq", lastSeq,
"filename", fn,
)
return nil
return nil, nil
}
}

bufr := bufio.NewReader(fi)

lastSeq := int64(0)

scratch := make([]byte, headerSize)
for {
h, err := readHeader(bufr, scratch)
if err != nil {
if errors.Is(err, io.EOF) {
return nil
return &lastSeq, nil
}

return err
return nil, err
}

lastSeq = h.Seq

if postDoNotEmit(h.Flags) {
// event taken down, skip
_, err := io.CopyN(io.Discard, bufr, h.Len64()) // would be really nice if the buffered reader had a 'skip' method that does a seek under the hood
if err != nil {
return fmt.Errorf("failed while skipping event (seq: %d, fn: %q): %w", h.Seq, fn, err)
return nil, fmt.Errorf("failed while skipping event (seq: %d, fn: %q): %w", h.Seq, fn, err)
}
continue
}
Expand All @@ -685,33 +716,33 @@ func (dp *DiskPersistence) readEventsFrom(ctx context.Context, since int64, fn s
case evtKindCommit:
var evt atproto.SyncSubscribeRepos_Commit
if err := evt.UnmarshalCBOR(io.LimitReader(bufr, h.Len64())); err != nil {
return err
return nil, err
}
evt.Seq = h.Seq
if err := cb(&XRPCStreamEvent{RepoCommit: &evt}); err != nil {
return err
return nil, err
}
case evtKindHandle:
var evt atproto.SyncSubscribeRepos_Handle
if err := evt.UnmarshalCBOR(io.LimitReader(bufr, h.Len64())); err != nil {
return err
return nil, err
}
evt.Seq = h.Seq
if err := cb(&XRPCStreamEvent{RepoHandle: &evt}); err != nil {
return err
return nil, err
}
case evtKindTombstone:
var evt atproto.SyncSubscribeRepos_Tombstone
if err := evt.UnmarshalCBOR(io.LimitReader(bufr, h.Len64())); err != nil {
return err
return nil, err
}
evt.Seq = h.Seq
if err := cb(&XRPCStreamEvent{RepoTombstone: &evt}); err != nil {
return err
return nil, err
}
default:
log.Warnw("unrecognized event kind coming from log file", "seq", h.Seq, "kind", h.Kind)
return fmt.Errorf("halting on unrecognized event kind")
return nil, fmt.Errorf("halting on unrecognized event kind")
}
}
}
Expand Down

0 comments on commit 3d3930b

Please sign in to comment.