From b6825b877ed76e1cb2429e88d47716b711eec199 Mon Sep 17 00:00:00 2001 From: Matthieu Vachon Date: Thu, 31 Aug 2023 21:36:13 -0400 Subject: [PATCH] Added possibility to be notified on stream completion It's now possible to define on your handler the method `HandleBlockRangeCompletion(ctx context.Context, cursor *sink.Cursor) error` which will be called back when the `sink.Sinker` instance fully completed the request block range (infinite streaming or terminate because of an error does not trigger it). --- CHANGELOG.md | 4 ++++ sinker.go | 9 +++++++++ types.go | 16 ++++++++++++++++ 3 files changed, 29 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1642b81..352433f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,10 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## v0.3.2 + +* It's now possible to define on your handler the method `HandleBlockRangeCompletion(ctx context.Context, cursor *sink.Cursor) error` which will be called back when the `sink.Sinker` instance fully completed the request block range (infinite streaming or terminate because of an error does not trigger it). + ## v0.3.1 ### Substreams Progress Messages diff --git a/sinker.go b/sinker.go index 3928b58..e772af5 100644 --- a/sinker.go +++ b/sinker.go @@ -199,6 +199,15 @@ func (s *Sinker) Run(ctx context.Context, cursor *Cursor, handler SinkerHandler) lastCursor, err := s.run(ctx, cursor, handler) if err == nil { s.logger.Info("substreams ended correctly, reached your stop block", zap.Stringer("last_block_seen", lastCursor.Block())) + + if v, ok := handler.(SinkerCompletionHandler); ok { + s.logger.Info("substreams handler has completion callback defined, calling it") + + if err := v.HandleBlockRangeCompletion(ctx, lastCursor); err != nil { + s.Shutdown(fmt.Errorf("sinker completion handler error: %w", err)) + return + } + } } // If the context is canceled and we are here, it we have stop running without any other error, so Shutdown without error, diff --git a/types.go b/types.go index aaed9e6..e16454a 100644 --- a/types.go +++ b/types.go @@ -63,6 +63,22 @@ type SinkerHandler interface { HandleBlockUndoSignal(ctx context.Context, undoSignal *pbsubstreamsrpc.BlockUndoSignal, cursor *Cursor) error } +// SinkerCompletionHandler defines an extra interface that can be implemented on top of `SinkerHandler` where the +// callback will be invoked when the sinker is done processing the requested range. This is useful to implement +// a checkpointing mechanism where when the range has correctly fully processed, you can do something meaningful. +type SinkerCompletionHandler interface { + // HandleBlockRangeCompletion is called when the sinker is done processing the requested range, only when + // the stream has correctly reached its end block. If the sinker is configured to stream live, this callback + // will never be called. + // + // If the sinker terminates with an error, this callback will not be called. + // + // The handler receives the following arguments: + // - `ctx` is the context runtime, your handler should be minimal, so normally you shouldn't use this. + // - `cursor` is the cursor at the given block, this cursor should be saved regularly as a checkpoint in case the process is interrupted. + HandleBlockRangeCompletion(ctx context.Context, cursor *Cursor) error +} + type Cursor struct { *bstream.Cursor }