Skip to content

Commit

Permalink
Added possibility to be notified on stream completion
Browse files Browse the repository at this point in the history
It's now possible to define on your handler the method `HandleBlockRangeCompletion(ctx context.Context, cursor *sink.Cursor) error` which will be called back when the `sink.Sinker` instance fully completed the request block range (infinite streaming or terminate because of an error does not trigger it).
  • Loading branch information
maoueh committed Sep 1, 2023
1 parent c4df9d0 commit b6825b8
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 0 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## v0.3.2

* It's now possible to define on your handler the method `HandleBlockRangeCompletion(ctx context.Context, cursor *sink.Cursor) error` which will be called back when the `sink.Sinker` instance fully completed the request block range (infinite streaming or terminate because of an error does not trigger it).

## v0.3.1

### Substreams Progress Messages
Expand Down
9 changes: 9 additions & 0 deletions sinker.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,15 @@ func (s *Sinker) Run(ctx context.Context, cursor *Cursor, handler SinkerHandler)
lastCursor, err := s.run(ctx, cursor, handler)
if err == nil {
s.logger.Info("substreams ended correctly, reached your stop block", zap.Stringer("last_block_seen", lastCursor.Block()))

if v, ok := handler.(SinkerCompletionHandler); ok {
s.logger.Info("substreams handler has completion callback defined, calling it")

if err := v.HandleBlockRangeCompletion(ctx, lastCursor); err != nil {
s.Shutdown(fmt.Errorf("sinker completion handler error: %w", err))
return
}
}
}

// If the context is canceled and we are here, it we have stop running without any other error, so Shutdown without error,
Expand Down
16 changes: 16 additions & 0 deletions types.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,22 @@ type SinkerHandler interface {
HandleBlockUndoSignal(ctx context.Context, undoSignal *pbsubstreamsrpc.BlockUndoSignal, cursor *Cursor) error
}

// SinkerCompletionHandler defines an extra interface that can be implemented on top of `SinkerHandler` where the
// callback will be invoked when the sinker is done processing the requested range. This is useful to implement
// a checkpointing mechanism where when the range has correctly fully processed, you can do something meaningful.
type SinkerCompletionHandler interface {
// HandleBlockRangeCompletion is called when the sinker is done processing the requested range, only when
// the stream has correctly reached its end block. If the sinker is configured to stream live, this callback
// will never be called.
//
// If the sinker terminates with an error, this callback will not be called.
//
// The handler receives the following arguments:
// - `ctx` is the context runtime, your handler should be minimal, so normally you shouldn't use this.
// - `cursor` is the cursor at the given block, this cursor should be saved regularly as a checkpoint in case the process is interrupted.
HandleBlockRangeCompletion(ctx context.Context, cursor *Cursor) error
}

type Cursor struct {
*bstream.Cursor
}
Expand Down

0 comments on commit b6825b8

Please sign in to comment.