Skip to content

Commit

Permalink
relay --time-seq for firehouse loosely bound to now
Browse files Browse the repository at this point in the history
  • Loading branch information
brianolson committed Nov 22, 2024
1 parent b13c675 commit 0684381
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 3 deletions.
7 changes: 7 additions & 0 deletions cmd/bigsky/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,12 @@ func run(args []string) error {
EnvVars: []string{"RELAY_EVENT_PLAYBACK_TTL"},
Value: 72 * time.Hour,
},
&cli.BoolFlag{
Name: "time-seq",
EnvVars: []string{"RELAY_TIME_SEQUENCE"},
Value: false,
Usage: "make outbound firehose sequence number approximately unix microseconds",
},
&cli.IntFlag{
Name: "num-compaction-workers",
EnvVars: []string{"RELAY_NUM_COMPACTION_WORKERS"},
Expand Down Expand Up @@ -382,6 +388,7 @@ func runBigsky(cctx *cli.Context) error {

pOpts := events.DefaultDiskPersistOptions()
pOpts.Retention = cctx.Duration("event-playback-ttl")
pOpts.TimeSequence = cctx.Bool("time-seq")
dp, err := events.NewDiskPersistence(dpd, "", db, pOpts)
if err != nil {
return fmt.Errorf("setting up disk persister: %w", err)
Expand Down
19 changes: 16 additions & 3 deletions events/diskpersist.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ type DiskPersistence struct {

logfi *os.File

curSeq int64
curSeq int64
timeSequence bool

uidCache *arc.ARCCache[models.Uid, string] // TODO: unused
didCache *arc.ARCCache[string, models.Uid]
Expand Down Expand Up @@ -76,6 +77,8 @@ type DiskPersistOptions struct {
EventsPerFile int64
WriteBufferSize int
Retention time.Duration

TimeSequence bool
}

func DefaultDiskPersistOptions() *DiskPersistOptions {
Expand Down Expand Up @@ -131,6 +134,7 @@ func NewDiskPersistence(primaryDir, archiveDir string, db *gorm.DB, opts *DiskPe
outbuf: new(bytes.Buffer),
writeBufferSize: opts.WriteBufferSize,
shutdown: make(chan struct{}),
timeSequence: opts.TimeSequence,
}

if err := dp.resumeLog(); err != nil {
Expand Down Expand Up @@ -173,7 +177,7 @@ func (dp *DiskPersistence) resumeLog() error {
return fmt.Errorf("failed to scan log file for last seqno: %w", err)
}

dp.curSeq = seq
dp.curSeq = seq + 1
dp.logfi = fi

return nil
Expand Down Expand Up @@ -443,7 +447,16 @@ func (dp *DiskPersistence) doPersist(ctx context.Context, j persistJob) error {
b := j.Bytes
e := j.Evt
seq := dp.curSeq
dp.curSeq++
if dp.timeSequence {
nextSeq := time.Now().UnixMicro()
if nextSeq <= seq {
// be monotonic
nextSeq = seq + 1
}
dp.curSeq = nextSeq
} else {
dp.curSeq++
}

// Set sequence number in event header
binary.LittleEndian.PutUint64(b[20:], uint64(seq))
Expand Down

0 comments on commit 0684381

Please sign in to comment.