added new flags to configure flush intervals #69

Open · wants to merge 6 commits into develop
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -8,6 +8,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## Unreleased

* Added support for the Clickhouse `Date` type.
* Added more flags to configure flushing intervals. Available flags are `batch-block-flush-interval`, `batch-row-flush-interval` and `live-block-flush-interval`.
* Deprecated the existing `flush-interval` flag in favor of `batch-block-flush-interval`.

## v4.3.0

6 changes: 4 additions & 2 deletions cmd/substreams-sink-sql/common_flags.go
@@ -57,13 +57,15 @@ func extractSinkConfig(pkg *pbsubstreams.Package) (*pbsql.Service, error) {
func newDBLoader(
cmd *cobra.Command,
psqlDSN string,
flushInterval time.Duration,
batchBlockFlushInterval int,
batchRowFlushInterval int,
liveBlockFlushInterval int,
handleReorgs bool,
) (*db.Loader, error) {
moduleMismatchMode, err := db.ParseOnModuleHashMismatch(sflags.MustGetString(cmd, onModuleHashMistmatchFlag))
cli.NoError(err, "invalid mistmatch mode")

dbLoader, err := db.NewLoader(psqlDSN, flushInterval, moduleMismatchMode, &handleReorgs, zlog, tracer)
dbLoader, err := db.NewLoader(psqlDSN, batchBlockFlushInterval, batchRowFlushInterval, liveBlockFlushInterval, moduleMismatchMode, &handleReorgs, zlog, tracer)
if err != nil {
return nil, fmt.Errorf("new psql loader: %w", err)
}
2 changes: 1 addition & 1 deletion cmd/substreams-sink-sql/create_user.go
@@ -45,7 +45,7 @@ func createUserE(cmd *cobra.Command, args []string) error {
}

if err := retry(ctx, func(ctx context.Context) error {
dbLoader, err := db.NewLoader(dsn, 0, db.OnModuleHashMismatchError, nil, zlog, tracer)
dbLoader, err := db.NewLoader(dsn, 0, 0, 0, db.OnModuleHashMismatchError, nil, zlog, tracer)
if err != nil {
return fmt.Errorf("new psql loader: %w", err)
}
2 changes: 1 addition & 1 deletion cmd/substreams-sink-sql/generate_csv.go
@@ -107,7 +107,7 @@ func generateCsvE(cmd *cobra.Command, args []string) error {
return fmt.Errorf("new base sinker: %w", err)
}

dbLoader, err := newDBLoader(cmd, dsn, 0, false) // flush interval not used in CSV mode
dbLoader, err := newDBLoader(cmd, dsn, 0, 0, 0, false) // flush interval not used in CSV mode
if err != nil {
return fmt.Errorf("new db loader: %w", err)
}
12 changes: 10 additions & 2 deletions cmd/substreams-sink-sql/run.go
@@ -28,7 +28,10 @@ var sinkRunCmd = Command(sinkRunE,
AddCommonSinkerFlags(flags)

flags.Int("undo-buffer-size", 0, "If non-zero, handling of reorgs in the database is disabled. Instead, a buffer is introduced to only process a blocks once it has been confirmed by that many blocks, introducing a latency but slightly reducing the load on the database when close to head.")
flags.Int("flush-interval", 1000, "When in catch up mode, flush every N blocks")
flags.Int("batch-block-flush-interval", 1_000, "When in catch up mode, flush every N blocks or after batch-row-flush-interval, whichever comes first. Set to 0 to disable and only use batch-row-flush-interval. Ineffective if the sink is now in the live portion of the chain where only 'live-block-flush-interval' applies.")
flags.Int("batch-row-flush-interval", 100_000, "When in catch up mode, flush every N rows or after batch-block-flush-interval, whichever comes first. Set to 0 to disable and only use batch-block-flush-interval. Ineffective if the sink is now in the live portion of the chain where only 'live-block-flush-interval' applies.")
flags.Int("live-block-flush-interval", 1, "When processing in live mode, flush every N blocks.")
flags.Int("flush-interval", 0, "(deprecated) please use --batch-block-flush-interval instead")
flags.StringP("endpoint", "e", "", "Specify the substreams endpoint, ex: `mainnet.eth.streamingfast.io:443`")
}),
Example("substreams-sink-sql run 'postgres://localhost:5432/posgres?sslmode=disable' [email protected]"),
@@ -85,7 +88,12 @@ func sinkRunE(cmd *cobra.Command, args []string) error {
return fmt.Errorf("new base sinker: %w", err)
}

dbLoader, err := newDBLoader(cmd, dsn, sflags.MustGetDuration(cmd, "flush-interval"), handleReorgs)
batchBlockFlushInterval := sflags.MustGetInt(cmd, "batch-block-flush-interval")
if sflags.MustGetInt(cmd, "flush-interval") != 0 {
batchBlockFlushInterval = sflags.MustGetInt(cmd, "flush-interval")
}

dbLoader, err := newDBLoader(cmd, dsn, batchBlockFlushInterval, sflags.MustGetInt(cmd, "batch-row-flush-interval"), sflags.MustGetInt(cmd, "live-block-flush-interval"), handleReorgs)
if err != nil {
return fmt.Errorf("new db loader: %w", err)
}
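For clarity, here is a minimal, self-contained sketch (not part of the diff) of the fallback applied in sinkRunE above: a non-zero value of the deprecated `flush-interval` flag takes precedence over `batch-block-flush-interval`. The function and variable names are illustrative only.

```go
package main

import "fmt"

// resolveBatchBlockFlushInterval mirrors the fallback in sinkRunE: if the
// deprecated --flush-interval flag is set to a non-zero value, it overrides
// the new --batch-block-flush-interval flag.
func resolveBatchBlockFlushInterval(batchBlockFlushInterval, deprecatedFlushInterval int) int {
	if deprecatedFlushInterval != 0 {
		return deprecatedFlushInterval
	}
	return batchBlockFlushInterval
}

func main() {
	fmt.Println(resolveBatchBlockFlushInterval(1_000, 0))   // 1000: new flag value is used
	fmt.Println(resolveBatchBlockFlushInterval(1_000, 500)) // 500: deprecated flag wins
}
```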
2 changes: 1 addition & 1 deletion cmd/substreams-sink-sql/setup.go
@@ -46,7 +46,7 @@ func sinkSetupE(cmd *cobra.Command, args []string) error {
return fmt.Errorf("extract sink config: %w", err)
}

dbLoader, err := db.NewLoader(dsn, 0, db.OnModuleHashMismatchError, nil, zlog, tracer)
dbLoader, err := db.NewLoader(dsn, 0, 0, 0, db.OnModuleHashMismatchError, nil, zlog, tracer)
if err != nil {
return fmt.Errorf("new psql loader: %w", err)
}
2 changes: 1 addition & 1 deletion cmd/substreams-sink-sql/tools.go
@@ -145,7 +145,7 @@ func toolsDeleteCursorE(cmd *cobra.Command, args []string) error {

func toolsCreateLoader() *db.Loader {
dsn := viper.GetString("tools-global-dsn")
loader, err := db.NewLoader(dsn, 0, db.OnModuleHashMismatchIgnore, nil, zlog, tracer)
loader, err := db.NewLoader(dsn, 0, 0, 0, db.OnModuleHashMismatchIgnore, nil, zlog, tracer)
cli.NoError(err, "Unable to instantiate database manager from DSN %q", dsn)

if err := loader.LoadTables(); err != nil {
55 changes: 37 additions & 18 deletions db/db.go
@@ -4,8 +4,6 @@ import (
"context"
"database/sql"
"fmt"
"time"

"github.com/jimsmart/schema"
"github.com/streamingfast/logging"
orderedmap "github.com/wk8/go-ordered-map/v2"
@@ -39,9 +37,11 @@ type Loader struct {
tables map[string]*TableInfo
cursorTable *TableInfo

handleReorgs bool
flushInterval time.Duration
moduleMismatchMode OnModuleHashMismatch
handleReorgs bool
batchBlockFlushInterval int
batchRowFlushInterval int
liveBlockFlushInterval int
moduleMismatchMode OnModuleHashMismatch

logger *zap.Logger
tracer logging.Tracer
@@ -51,7 +51,9 @@

func NewLoader(
psqlDsn string,
flushInterval time.Duration,
batchBlockFlushInterval int,
batchRowFlushInterval int,
liveBlockFlushInterval int,
moduleMismatchMode OnModuleHashMismatch,
handleReorgs *bool,
logger *zap.Logger,
@@ -68,15 +70,17 @@ func NewLoader(
}

l := &Loader{
DB: db,
database: dsn.database,
schema: dsn.schema,
entries: NewOrderedMap[string, *OrderedMap[string, *Operation]](),
tables: map[string]*TableInfo{},
flushInterval: flushInterval,
moduleMismatchMode: moduleMismatchMode,
logger: logger,
tracer: tracer,
DB: db,
database: dsn.database,
schema: dsn.schema,
entries: NewOrderedMap[string, *OrderedMap[string, *Operation]](),
tables: map[string]*TableInfo{},
batchBlockFlushInterval: batchBlockFlushInterval,
batchRowFlushInterval: batchRowFlushInterval,
liveBlockFlushInterval: liveBlockFlushInterval,
moduleMismatchMode: moduleMismatchMode,
logger: logger,
tracer: tracer,
}
_, err = l.tryDialect()
if err != nil {
@@ -95,7 +99,9 @@
}

logger.Info("created new DB loader",
zap.Duration("flush_interval", flushInterval),
zap.Int("batch_block_flush_interval", batchBlockFlushInterval),
zap.Int("batch_row_flush_interval", batchRowFlushInterval),
zap.Int("live_block_flush_interval", liveBlockFlushInterval),
zap.String("driver", dsn.driver),
zap.String("database", dsn.database),
zap.String("schema", dsn.schema),
@@ -129,8 +135,21 @@ func (l *Loader) BeginTx(ctx context.Context, opts *sql.TxOptions) (Tx, error) {
return l.DB.BeginTx(ctx, opts)
}

func (l *Loader) FlushInterval() time.Duration {
return l.flushInterval
func (l *Loader) BatchBlockFlushInterval() int {
return l.batchBlockFlushInterval
}

func (l *Loader) LiveBlockFlushInterval() int {
return l.liveBlockFlushInterval
}

func (l *Loader) FlushNeeded() bool {
if l.batchRowFlushInterval <= 0 {
// Row-based flushing is disabled, only the block-based intervals apply.
return false
}
totalRows := 0
// todo keep a running count when inserting/deleting rows directly
for pair := l.entries.Oldest(); pair != nil; pair = pair.Next() {
totalRows += pair.Value.Len()
}
return totalRows > l.batchRowFlushInterval
}

func (l *Loader) LoadTables() error {
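As a standalone illustration of the new row-based threshold, the sketch below approximates what FlushNeeded computes, using a plain map of pending row counts per table instead of the Loader's ordered maps; names and values are illustrative, not the actual Loader fields.

```go
package main

import "fmt"

// flushNeeded approximates Loader.FlushNeeded: it sums pending rows across
// tables and reports whether the configured batch-row-flush-interval has been
// exceeded. A non-positive interval disables row-based flushing.
func flushNeeded(pendingRowsPerTable map[string]int, batchRowFlushInterval int) bool {
	if batchRowFlushInterval <= 0 {
		return false
	}
	totalRows := 0
	for _, n := range pendingRowsPerTable {
		totalRows += n
	}
	return totalRows > batchRowFlushInterval
}

func main() {
	pending := map[string]int{"transfers": 60_000, "approvals": 50_000}
	fmt.Println(flushNeeded(pending, 100_000)) // true: 110000 pending rows exceed the interval
	fmt.Println(flushNeeded(pending, 0))       // false: row-based flushing disabled
}
```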
2 changes: 1 addition & 1 deletion db/testing.go
@@ -15,7 +15,7 @@ func NewTestLoader(
tables map[string]*TableInfo,
) (*Loader, *TestTx) {

loader, err := NewLoader("psql://x:5432/x", 0, OnModuleHashMismatchIgnore, nil, zlog, tracer)
loader, err := NewLoader("psql://x:5432/x", 0, 0, 0, OnModuleHashMismatchIgnore, nil, zlog, tracer)
if err != nil {
panic(err)
}
22 changes: 12 additions & 10 deletions sinker/sinker.go
@@ -17,10 +17,7 @@ import (
"google.golang.org/protobuf/proto"
)

const (
HISTORICAL_BLOCK_FLUSH_EACH = 1000
LIVE_BLOCK_FLUSH_EACH = 1
)
const BLOCK_FLUSH_INTERVAL_DISABLED = 0

type SQLSinker struct {
*shutter.Shutter
@@ -121,8 +118,13 @@ func (s *SQLSinker) HandleBlockScopedData(ctx context.Context, data *pbsubstream
}
}

if data.Clock.Number%s.batchBlockModulo(isLive) == 0 {
s.logger.Debug("flushing to database", zap.Stringer("block", cursor.Block()), zap.Bool("is_live", *isLive))
if (s.batchBlockModulo(isLive) > 0 && data.Clock.Number%s.batchBlockModulo(isLive) == 0) || s.loader.FlushNeeded() {
s.logger.Debug("flushing to database",
zap.Stringer("block", cursor.Block()),
zap.Bool("is_live", *isLive),
zap.Bool("block_flush_interval_reached", s.batchBlockModulo(isLive) > 0 && data.Clock.Number%s.batchBlockModulo(isLive) == 0),
zap.Bool("row_flush_interval_reached", s.loader.FlushNeeded()),
)

flushStart := time.Now()
rowFlushedCount, err := s.loader.Flush(ctx, s.OutputModuleHash(), cursor, data.FinalBlockHeight)
@@ -219,12 +221,12 @@ func (s *SQLSinker) batchBlockModulo(isLive *bool) uint64 {
}

if *isLive {
return LIVE_BLOCK_FLUSH_EACH
return uint64(s.loader.LiveBlockFlushInterval())
}

if s.loader.FlushInterval() > 0 {
return uint64(s.loader.FlushInterval())
if s.loader.BatchBlockFlushInterval() > 0 {
return uint64(s.loader.BatchBlockFlushInterval())
}

return HISTORICAL_BLOCK_FLUSH_EACH
return BLOCK_FLUSH_INTERVAL_DISABLED
}
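Putting the two batch intervals together, the sketch below restates the per-block flush decision the sinker now makes: in live mode it flushes every `live-block-flush-interval` blocks, while in catch-up mode it flushes when either the block interval or the row threshold is reached, whichever comes first. This is a simplified, hedged restatement of HandleBlockScopedData and batchBlockModulo above, not the actual implementation; all names are illustrative.

```go
package main

import "fmt"

// shouldFlush restates the sinker's per-block decision in one place: live mode
// uses only the live block interval; catch-up mode flushes on the block
// interval or the row threshold, whichever is hit first. A zero block interval
// disables block-based flushing in catch-up mode.
func shouldFlush(blockNum uint64, isLive bool, pendingRows, batchBlockInterval, batchRowInterval, liveBlockInterval int) bool {
	if isLive {
		return liveBlockInterval > 0 && blockNum%uint64(liveBlockInterval) == 0
	}
	blockReached := batchBlockInterval > 0 && blockNum%uint64(batchBlockInterval) == 0
	rowsReached := batchRowInterval > 0 && pendingRows > batchRowInterval
	return blockReached || rowsReached
}

func main() {
	fmt.Println(shouldFlush(2_000, false, 10, 1_000, 100_000, 1))      // true: block interval reached
	fmt.Println(shouldFlush(1_234, false, 150_000, 1_000, 100_000, 1)) // true: row threshold exceeded
	fmt.Println(shouldFlush(1_234, true, 150_000, 1_000, 100_000, 1))  // true: live mode flushes every block
}
```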