From 3d3930b24406c120958332ec1277f25139d1ed90 Mon Sep 17 00:00:00 2001
From: Jaz Volpert
Date: Thu, 12 Oct 2023 22:40:55 +0000
Subject: [PATCH] Pump logfiles dry during playback

---
 events/diskpersist.go | 65 ++++++++++++++++++++++++++++++++-----------
 1 file changed, 48 insertions(+), 17 deletions(-)

diff --git a/events/diskpersist.go b/events/diskpersist.go
index 07c146762..fa5f45b77 100644
--- a/events/diskpersist.go
+++ b/events/diskpersist.go
@@ -620,16 +620,43 @@ func (dp *DiskPersistence) Playback(ctx context.Context, since int64, cb func(*X
 		return err
 	}
 
-	for _, lf := range logs {
-		if err := dp.readEventsFrom(ctx, since, filepath.Join(dp.primaryDir, lf.Path), cb); err != nil {
+	for i := 0; i < 10; i++ {
+		lastSeq, err := dp.PlaybackLogfiles(ctx, since, cb, logs)
+		if err != nil {
 			return err
 		}
-		since = 0
+
+		// If we got a lastSeq, there may be more log files to read
+		if lastSeq != nil {
+			if err := dp.meta.Debug().Order("seq_start asc").Find(&logs, "seq_start >= ?", *lastSeq).Error; err != nil {
+				return err
+			}
+		} else {
+			break
+		}
 	}
 
 	return nil
 }
 
+func (dp *DiskPersistence) PlaybackLogfiles(ctx context.Context, since int64, cb func(*XRPCStreamEvent) error, logFiles []LogFileRef) (*int64, error) {
+	for i, lf := range logFiles {
+		lastSeq, err := dp.readEventsFrom(ctx, since, filepath.Join(dp.primaryDir, lf.Path), cb)
+		if err != nil {
+			return nil, err
+		}
+		since = 0
+		if i == len(logFiles)-1 &&
+			lastSeq != nil &&
+			(*lastSeq-lf.SeqStart) == dp.eventsPerFile {
+			// There may be more log files to read since the last one was full
+			return lastSeq, nil
+		}
+	}
+
+	return nil, nil
+}
+
 func postDoNotEmit(flags uint32) bool {
 	if flags&(EvtFlagRebased|EvtFlagTakedown) != 0 {
 		return true
@@ -638,16 +665,16 @@ func postDoNotEmit(flags uint32) bool {
 	return false
 }
 
-func (dp *DiskPersistence) readEventsFrom(ctx context.Context, since int64, fn string, cb func(*XRPCStreamEvent) error) error {
+func (dp *DiskPersistence) readEventsFrom(ctx context.Context, since int64, fn string, cb func(*XRPCStreamEvent) error) (*int64, error) {
 	fi, err := os.OpenFile(fn, os.O_RDONLY, 0)
 	if err != nil {
-		return err
+		return nil, err
 	}
 
 	if since != 0 {
 		lastSeq, err := scanForLastSeq(fi, since)
 		if err != nil {
-			return err
+			return nil, err
 		}
 		if since > lastSeq {
 			log.Errorw("playback cursor is greater than last seq of file checked",
@@ -655,28 +682,32 @@ func (dp *DiskPersistence) readEventsFrom(ctx context.Context, since int64, fn s
 				"lastSeq", lastSeq,
 				"filename", fn,
 			)
-			return nil
+			return nil, nil
 		}
 	}
 
 	bufr := bufio.NewReader(fi)
 
+	lastSeq := int64(0)
+
 	scratch := make([]byte, headerSize)
 	for {
 		h, err := readHeader(bufr, scratch)
 		if err != nil {
 			if errors.Is(err, io.EOF) {
-				return nil
+				return &lastSeq, nil
 			}
 
-			return err
+			return nil, err
 		}
 
+		lastSeq = h.Seq
+
 		if postDoNotEmit(h.Flags) {
 			// event taken down, skip
 			_, err := io.CopyN(io.Discard, bufr, h.Len64()) // would be really nice if the buffered reader had a 'skip' method that does a seek under the hood
 			if err != nil {
-				return fmt.Errorf("failed while skipping event (seq: %d, fn: %q): %w", h.Seq, fn, err)
+				return nil, fmt.Errorf("failed while skipping event (seq: %d, fn: %q): %w", h.Seq, fn, err)
 			}
 			continue
 		}
@@ -685,33 +716,33 @@ func (dp *DiskPersistence) readEventsFrom(ctx context.Context, since int64, fn s
 		case evtKindCommit:
 			var evt atproto.SyncSubscribeRepos_Commit
 			if err := evt.UnmarshalCBOR(io.LimitReader(bufr, h.Len64())); err != nil {
-				return err
+				return nil, err
 			}
 			evt.Seq = h.Seq
 			if err := cb(&XRPCStreamEvent{RepoCommit: &evt}); err != nil {
-				return err
+				return nil, err
 			}
 		case evtKindHandle:
 			var evt atproto.SyncSubscribeRepos_Handle
 			if err := evt.UnmarshalCBOR(io.LimitReader(bufr, h.Len64())); err != nil {
-				return err
+				return nil, err
 			}
 			evt.Seq = h.Seq
 			if err := cb(&XRPCStreamEvent{RepoHandle: &evt}); err != nil {
-				return err
+				return nil, err
 			}
 		case evtKindTombstone:
 			var evt atproto.SyncSubscribeRepos_Tombstone
 			if err := evt.UnmarshalCBOR(io.LimitReader(bufr, h.Len64())); err != nil {
-				return err
+				return nil, err
 			}
 			evt.Seq = h.Seq
 			if err := cb(&XRPCStreamEvent{RepoTombstone: &evt}); err != nil {
-				return err
+				return nil, err
 			}
 		default:
 			log.Warnw("unrecognized event kind coming from log file", "seq", h.Seq, "kind", h.Kind)
-			return fmt.Errorf("halting on unrecognized event kind")
+			return nil, fmt.Errorf("halting on unrecognized event kind")
 		}
 	}
 }