From 3ded2bf28d17ab23c49753adfa3e150c0e44298f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Frederik=20Sch=C3=B6ll?= Date: Thu, 31 Oct 2024 17:07:09 +0100 Subject: [PATCH 1/6] added new flags to configure flush intervals --- CHANGELOG.md | 2 + cmd/substreams-sink-sql/common_flags.go | 6 ++- cmd/substreams-sink-sql/create_user.go | 2 +- cmd/substreams-sink-sql/generate_csv.go | 2 +- cmd/substreams-sink-sql/run.go | 6 ++- cmd/substreams-sink-sql/setup.go | 2 +- cmd/substreams-sink-sql/tools.go | 2 +- db/db.go | 54 ++++++++++++++++--------- db/testing.go | 2 +- sinker/sinker.go | 22 +++++----- 10 files changed, 62 insertions(+), 38 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 67e6914..8fd7587 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## Unreleased * Added support for the Clickhouse `Date` type. +* Added more flags to configure flushing intervals. Available flags are `batch-block-flush-interval`, `batch-row-flush-interval` and `live-block-flush-interval`. +* Deprecated the existing `flush-interval` flag in favor of `batch-block-flush-interval`. ## v4.3.0 diff --git a/cmd/substreams-sink-sql/common_flags.go b/cmd/substreams-sink-sql/common_flags.go index 5c63407..a8f6892 100644 --- a/cmd/substreams-sink-sql/common_flags.go +++ b/cmd/substreams-sink-sql/common_flags.go @@ -57,13 +57,15 @@ func extractSinkConfig(pkg *pbsubstreams.Package) (*pbsql.Service, error) { func newDBLoader( cmd *cobra.Command, psqlDSN string, - flushInterval time.Duration, + batchBlockFlushInterval int, + batchRowFlushInterval int, + liveBlockFlushInterval int, handleReorgs bool, ) (*db.Loader, error) { moduleMismatchMode, err := db.ParseOnModuleHashMismatch(sflags.MustGetString(cmd, onModuleHashMistmatchFlag)) cli.NoError(err, "invalid mistmatch mode") - dbLoader, err := db.NewLoader(psqlDSN, flushInterval, moduleMismatchMode, &handleReorgs, zlog, tracer) + dbLoader, err := db.NewLoader(psqlDSN, batchBlockFlushInterval, batchRowFlushInterval, liveBlockFlushInterval, moduleMismatchMode, &handleReorgs, zlog, tracer) if err != nil { return nil, fmt.Errorf("new psql loader: %w", err) } diff --git a/cmd/substreams-sink-sql/create_user.go b/cmd/substreams-sink-sql/create_user.go index f8a91a4..5eb1a55 100644 --- a/cmd/substreams-sink-sql/create_user.go +++ b/cmd/substreams-sink-sql/create_user.go @@ -45,7 +45,7 @@ func createUserE(cmd *cobra.Command, args []string) error { } if err := retry(ctx, func(ctx context.Context) error { - dbLoader, err := db.NewLoader(dsn, 0, db.OnModuleHashMismatchError, nil, zlog, tracer) + dbLoader, err := db.NewLoader(dsn, 0, 0, 0, db.OnModuleHashMismatchError, nil, zlog, tracer) if err != nil { return fmt.Errorf("new psql loader: %w", err) } diff --git a/cmd/substreams-sink-sql/generate_csv.go b/cmd/substreams-sink-sql/generate_csv.go index 8f02136..b5d1056 100644 --- a/cmd/substreams-sink-sql/generate_csv.go +++ b/cmd/substreams-sink-sql/generate_csv.go @@ -107,7 +107,7 @@ func generateCsvE(cmd *cobra.Command, args []string) error { return fmt.Errorf("new base sinker: %w", err) } - dbLoader, err := newDBLoader(cmd, dsn, 0, false) // flush interval not used in CSV mode + dbLoader, err := newDBLoader(cmd, dsn, 0, 0, 0, false) // flush interval not used in CSV mode if err != nil { return fmt.Errorf("new db loader: %w", err) } diff --git a/cmd/substreams-sink-sql/run.go b/cmd/substreams-sink-sql/run.go index ea2a4de..25abe3c 100644 --- a/cmd/substreams-sink-sql/run.go +++ b/cmd/substreams-sink-sql/run.go @@ -28,7 +28,9 @@ var sinkRunCmd = Command(sinkRunE, AddCommonSinkerFlags(flags) flags.Int("undo-buffer-size", 0, "If non-zero, handling of reorgs in the database is disabled. Instead, a buffer is introduced to only process a blocks once it has been confirmed by that many blocks, introducing a latency but slightly reducing the load on the database when close to head.") - flags.Int("flush-interval", 1000, "When in catch up mode, flush every N blocks") + flags.Int("batch-block-flush-interval", 1_000, "When in catch up mode, flush every N blocks or after batch-row-flush-interval, whichever comes first. Set to 0 to disable and only use batch-row-flush-interval.") + flags.Int("batch-row-flush-interval", 100_000, "When in catch up mode, flush every N rows or after batch-block-flush-interval, whichever comes first. Set to 0 to disable and only use batch-block-flush-interval.") + flags.Int("live-block-flush-interval", 1, "When processing in live mode, flush every N blocks.") flags.StringP("endpoint", "e", "", "Specify the substreams endpoint, ex: `mainnet.eth.streamingfast.io:443`") }), Example("substreams-sink-sql run 'postgres://localhost:5432/posgres?sslmode=disable' uniswap-v3@v0.2.10"), @@ -85,7 +87,7 @@ func sinkRunE(cmd *cobra.Command, args []string) error { return fmt.Errorf("new base sinker: %w", err) } - dbLoader, err := newDBLoader(cmd, dsn, sflags.MustGetDuration(cmd, "flush-interval"), handleReorgs) + dbLoader, err := newDBLoader(cmd, dsn, sflags.MustGetInt(cmd, "batch-block-flush-interval"), sflags.MustGetInt(cmd, "batch-row-flush-interval"), sflags.MustGetInt(cmd, "live-block-flush-interval"), handleReorgs) if err != nil { return fmt.Errorf("new db loader: %w", err) } diff --git a/cmd/substreams-sink-sql/setup.go b/cmd/substreams-sink-sql/setup.go index b11dcc1..f354542 100644 --- a/cmd/substreams-sink-sql/setup.go +++ b/cmd/substreams-sink-sql/setup.go @@ -46,7 +46,7 @@ func sinkSetupE(cmd *cobra.Command, args []string) error { return fmt.Errorf("extract sink config: %w", err) } - dbLoader, err := db.NewLoader(dsn, 0, db.OnModuleHashMismatchError, nil, zlog, tracer) + dbLoader, err := db.NewLoader(dsn, 0, 0, 0, db.OnModuleHashMismatchError, nil, zlog, tracer) if err != nil { return fmt.Errorf("new psql loader: %w", err) } diff --git a/cmd/substreams-sink-sql/tools.go b/cmd/substreams-sink-sql/tools.go index 398228c..c67a900 100644 --- a/cmd/substreams-sink-sql/tools.go +++ b/cmd/substreams-sink-sql/tools.go @@ -145,7 +145,7 @@ func toolsDeleteCursorE(cmd *cobra.Command, args []string) error { func toolsCreateLoader() *db.Loader { dsn := viper.GetString("tools-global-dsn") - loader, err := db.NewLoader(dsn, 0, db.OnModuleHashMismatchIgnore, nil, zlog, tracer) + loader, err := db.NewLoader(dsn, 0, 0, 0, db.OnModuleHashMismatchIgnore, nil, zlog, tracer) cli.NoError(err, "Unable to instantiate database manager from DSN %q", dsn) if err := loader.LoadTables(); err != nil { diff --git a/db/db.go b/db/db.go index d03eb65..9367dd2 100644 --- a/db/db.go +++ b/db/db.go @@ -4,8 +4,6 @@ import ( "context" "database/sql" "fmt" - "time" - "github.com/jimsmart/schema" "github.com/streamingfast/logging" orderedmap "github.com/wk8/go-ordered-map/v2" @@ -39,9 +37,11 @@ type Loader struct { tables map[string]*TableInfo cursorTable *TableInfo - handleReorgs bool - flushInterval time.Duration - moduleMismatchMode OnModuleHashMismatch + handleReorgs bool + batchBlockFlushInterval int + batchRowFlushInterval int + liveBlockFlushInterval int + moduleMismatchMode OnModuleHashMismatch logger *zap.Logger tracer logging.Tracer @@ -51,7 +51,9 @@ type Loader struct { func NewLoader( psqlDsn string, - flushInterval time.Duration, + batchBlockFlushInterval int, + batchRowFlushInterval int, + liveBlockFlushInterval int, moduleMismatchMode OnModuleHashMismatch, handleReorgs *bool, logger *zap.Logger, @@ -68,15 +70,17 @@ func NewLoader( } l := &Loader{ - DB: db, - database: dsn.database, - schema: dsn.schema, - entries: NewOrderedMap[string, *OrderedMap[string, *Operation]](), - tables: map[string]*TableInfo{}, - flushInterval: flushInterval, - moduleMismatchMode: moduleMismatchMode, - logger: logger, - tracer: tracer, + DB: db, + database: dsn.database, + schema: dsn.schema, + entries: NewOrderedMap[string, *OrderedMap[string, *Operation]](), + tables: map[string]*TableInfo{}, + batchBlockFlushInterval: batchBlockFlushInterval, + batchRowFlushInterval: batchRowFlushInterval, + liveBlockFlushInterval: liveBlockFlushInterval, + moduleMismatchMode: moduleMismatchMode, + logger: logger, + tracer: tracer, } _, err = l.tryDialect() if err != nil { @@ -95,7 +99,9 @@ func NewLoader( } logger.Info("created new DB loader", - zap.Duration("flush_interval", flushInterval), + zap.Int("batch_block_flush_interval", batchBlockFlushInterval), + zap.Int("batch_row_flush_interval", batchRowFlushInterval), + zap.Int("live_block_flush_interval", liveBlockFlushInterval), zap.String("driver", dsn.driver), zap.String("database", dsn.database), zap.String("schema", dsn.schema), @@ -129,8 +135,20 @@ func (l *Loader) BeginTx(ctx context.Context, opts *sql.TxOptions) (Tx, error) { return l.DB.BeginTx(ctx, opts) } -func (l *Loader) FlushInterval() time.Duration { - return l.flushInterval +func (l *Loader) BatchBlockFlushInterval() int { + return l.batchBlockFlushInterval +} + +func (l *Loader) LiveBlockFlushInterval() int { + return l.liveBlockFlushInterval +} + +func (l *Loader) FlushNeeded() bool { + totalRows := 0 + for pair := l.entries.Oldest(); pair != nil; pair = pair.Next() { + totalRows += pair.Value.Len() + } + return totalRows > l.batchRowFlushInterval } func (l *Loader) LoadTables() error { diff --git a/db/testing.go b/db/testing.go index 252207a..523d0d4 100644 --- a/db/testing.go +++ b/db/testing.go @@ -15,7 +15,7 @@ func NewTestLoader( tables map[string]*TableInfo, ) (*Loader, *TestTx) { - loader, err := NewLoader("psql://x:5432/x", 0, OnModuleHashMismatchIgnore, nil, zlog, tracer) + loader, err := NewLoader("psql://x:5432/x", 0, 0, 0, OnModuleHashMismatchIgnore, nil, zlog, tracer) if err != nil { panic(err) } diff --git a/sinker/sinker.go b/sinker/sinker.go index 1ce13fc..4d17462 100644 --- a/sinker/sinker.go +++ b/sinker/sinker.go @@ -17,11 +17,6 @@ import ( "google.golang.org/protobuf/proto" ) -const ( - HISTORICAL_BLOCK_FLUSH_EACH = 1000 - LIVE_BLOCK_FLUSH_EACH = 1 -) - type SQLSinker struct { *shutter.Shutter *sink.Sinker @@ -121,8 +116,13 @@ func (s *SQLSinker) HandleBlockScopedData(ctx context.Context, data *pbsubstream } } - if data.Clock.Number%s.batchBlockModulo(isLive) == 0 { - s.logger.Debug("flushing to database", zap.Stringer("block", cursor.Block()), zap.Bool("is_live", *isLive)) + if (s.batchBlockModulo(isLive) > 0 && data.Clock.Number%s.batchBlockModulo(isLive) == 0) || s.loader.FlushNeeded() { + s.logger.Debug("flushing to database", + zap.Stringer("block", cursor.Block()), + zap.Bool("is_live", *isLive), + zap.Bool("block_flush_interval_reached", s.batchBlockModulo(isLive) > 0 && data.Clock.Number%s.batchBlockModulo(isLive) == 0), + zap.Bool("row_flush_interval_reached", s.loader.FlushNeeded()), + ) flushStart := time.Now() rowFlushedCount, err := s.loader.Flush(ctx, s.OutputModuleHash(), cursor, data.FinalBlockHeight) @@ -219,12 +219,12 @@ func (s *SQLSinker) batchBlockModulo(isLive *bool) uint64 { } if *isLive { - return LIVE_BLOCK_FLUSH_EACH + return uint64(s.loader.LiveBlockFlushInterval()) } - if s.loader.FlushInterval() > 0 { - return uint64(s.loader.FlushInterval()) + if s.loader.BatchBlockFlushInterval() > 0 { + return uint64(s.loader.BatchBlockFlushInterval()) } - return HISTORICAL_BLOCK_FLUSH_EACH + return 0 } From 9e1bea2a28da1b5c696ba494021236824c7fb695 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Frederik=20Sch=C3=B6ll?= Date: Thu, 19 Dec 2024 13:37:19 +0100 Subject: [PATCH 2/6] update flag description Co-authored-by: Matthieu Vachon --- cmd/substreams-sink-sql/run.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/substreams-sink-sql/run.go b/cmd/substreams-sink-sql/run.go index 25abe3c..e6c2f40 100644 --- a/cmd/substreams-sink-sql/run.go +++ b/cmd/substreams-sink-sql/run.go @@ -28,7 +28,7 @@ var sinkRunCmd = Command(sinkRunE, AddCommonSinkerFlags(flags) flags.Int("undo-buffer-size", 0, "If non-zero, handling of reorgs in the database is disabled. Instead, a buffer is introduced to only process a blocks once it has been confirmed by that many blocks, introducing a latency but slightly reducing the load on the database when close to head.") - flags.Int("batch-block-flush-interval", 1_000, "When in catch up mode, flush every N blocks or after batch-row-flush-interval, whichever comes first. Set to 0 to disable and only use batch-row-flush-interval.") + flags.Int("batch-block-flush-interval", 1_000, "When in catch up mode, flush every N blocks or after batch-row-flush-interval, whichever comes first. Set to 0 to disable and only use batch-row-flush-interval. Ineffective if the sink is now in the live portion of the chain where only 'live-block-flush-interval' applies.") flags.Int("batch-row-flush-interval", 100_000, "When in catch up mode, flush every N rows or after batch-block-flush-interval, whichever comes first. Set to 0 to disable and only use batch-block-flush-interval.") flags.Int("live-block-flush-interval", 1, "When processing in live mode, flush every N blocks.") flags.StringP("endpoint", "e", "", "Specify the substreams endpoint, ex: `mainnet.eth.streamingfast.io:443`") From a6aa926ff87512ac5d81c114e3944b0f9e376ed9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Frederik=20Sch=C3=B6ll?= Date: Thu, 19 Dec 2024 13:37:27 +0100 Subject: [PATCH 3/6] update flag description Co-authored-by: Matthieu Vachon --- cmd/substreams-sink-sql/run.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/substreams-sink-sql/run.go b/cmd/substreams-sink-sql/run.go index e6c2f40..5d5b51b 100644 --- a/cmd/substreams-sink-sql/run.go +++ b/cmd/substreams-sink-sql/run.go @@ -29,7 +29,7 @@ var sinkRunCmd = Command(sinkRunE, flags.Int("undo-buffer-size", 0, "If non-zero, handling of reorgs in the database is disabled. Instead, a buffer is introduced to only process a blocks once it has been confirmed by that many blocks, introducing a latency but slightly reducing the load on the database when close to head.") flags.Int("batch-block-flush-interval", 1_000, "When in catch up mode, flush every N blocks or after batch-row-flush-interval, whichever comes first. Set to 0 to disable and only use batch-row-flush-interval. Ineffective if the sink is now in the live portion of the chain where only 'live-block-flush-interval' applies.") - flags.Int("batch-row-flush-interval", 100_000, "When in catch up mode, flush every N rows or after batch-block-flush-interval, whichever comes first. Set to 0 to disable and only use batch-block-flush-interval.") + flags.Int("batch-row-flush-interval", 100_000, "When in catch up mode, flush every N rows or after batch-block-flush-interval, whichever comes first. Set to 0 to disable and only use batch-block-flush-interval. Ineffective if the sink is now in the live portion of the chain where only 'live-block-flush-interval' applies.") flags.Int("live-block-flush-interval", 1, "When processing in live mode, flush every N blocks.") flags.StringP("endpoint", "e", "", "Specify the substreams endpoint, ex: `mainnet.eth.streamingfast.io:443`") }), From f90a23fa11c59bac5b01d4cc817afebe80a83c0c Mon Sep 17 00:00:00 2001 From: fschoell Date: Thu, 19 Dec 2024 13:45:27 +0100 Subject: [PATCH 4/6] re-add --flush-interval as deprecated flag --- cmd/substreams-sink-sql/run.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/cmd/substreams-sink-sql/run.go b/cmd/substreams-sink-sql/run.go index 5d5b51b..c9b5ee6 100644 --- a/cmd/substreams-sink-sql/run.go +++ b/cmd/substreams-sink-sql/run.go @@ -31,6 +31,7 @@ var sinkRunCmd = Command(sinkRunE, flags.Int("batch-block-flush-interval", 1_000, "When in catch up mode, flush every N blocks or after batch-row-flush-interval, whichever comes first. Set to 0 to disable and only use batch-row-flush-interval. Ineffective if the sink is now in the live portion of the chain where only 'live-block-flush-interval' applies.") flags.Int("batch-row-flush-interval", 100_000, "When in catch up mode, flush every N rows or after batch-block-flush-interval, whichever comes first. Set to 0 to disable and only use batch-block-flush-interval. Ineffective if the sink is now in the live portion of the chain where only 'live-block-flush-interval' applies.") flags.Int("live-block-flush-interval", 1, "When processing in live mode, flush every N blocks.") + flags.Int("flush-interval", 0, "(deprecated) please use --batch-block-flush-interval instead") flags.StringP("endpoint", "e", "", "Specify the substreams endpoint, ex: `mainnet.eth.streamingfast.io:443`") }), Example("substreams-sink-sql run 'postgres://localhost:5432/posgres?sslmode=disable' uniswap-v3@v0.2.10"), @@ -87,7 +88,12 @@ func sinkRunE(cmd *cobra.Command, args []string) error { return fmt.Errorf("new base sinker: %w", err) } - dbLoader, err := newDBLoader(cmd, dsn, sflags.MustGetInt(cmd, "batch-block-flush-interval"), sflags.MustGetInt(cmd, "batch-row-flush-interval"), sflags.MustGetInt(cmd, "live-block-flush-interval"), handleReorgs) + batchBlockFlushInterval := sflags.MustGetInt(cmd, "batch-block-flush-interval") + if sflags.MustGetInt(cmd, "flush-interval") != 0 { + batchBlockFlushInterval = sflags.MustGetInt(cmd, "flush-interval") + } + + dbLoader, err := newDBLoader(cmd, dsn, batchBlockFlushInterval, sflags.MustGetInt(cmd, "batch-row-flush-interval"), sflags.MustGetInt(cmd, "live-block-flush-interval"), handleReorgs) if err != nil { return fmt.Errorf("new db loader: %w", err) } From 660f6b73f66794ee9de798ba640ef13738c926c9 Mon Sep 17 00:00:00 2001 From: fschoell Date: Thu, 19 Dec 2024 13:47:07 +0100 Subject: [PATCH 5/6] add BLOCK_FLUSH_INTERVAL_DISABLED constant --- sinker/sinker.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sinker/sinker.go b/sinker/sinker.go index 4d17462..89b0ca9 100644 --- a/sinker/sinker.go +++ b/sinker/sinker.go @@ -17,6 +17,8 @@ import ( "google.golang.org/protobuf/proto" ) +const BLOCK_FLUSH_INTERVAL_DISABLED = 0 + type SQLSinker struct { *shutter.Shutter *sink.Sinker @@ -226,5 +228,5 @@ func (s *SQLSinker) batchBlockModulo(isLive *bool) uint64 { return uint64(s.loader.BatchBlockFlushInterval()) } - return 0 + return BLOCK_FLUSH_INTERVAL_DISABLED } From 3459d75eb5cb23bdd52aa0cfb99cd9cfeb4aad35 Mon Sep 17 00:00:00 2001 From: fschoell Date: Thu, 19 Dec 2024 13:50:15 +0100 Subject: [PATCH 6/6] add a comment to add running counts --- db/db.go | 1 + 1 file changed, 1 insertion(+) diff --git a/db/db.go b/db/db.go index 9367dd2..0dd6628 100644 --- a/db/db.go +++ b/db/db.go @@ -145,6 +145,7 @@ func (l *Loader) LiveBlockFlushInterval() int { func (l *Loader) FlushNeeded() bool { totalRows := 0 + // todo keep a running count when inserting/deleting rows directly for pair := l.entries.Oldest(); pair != nil; pair = pair.Next() { totalRows += pair.Value.Len() }