From c34ab3174bb7a538719f60f8f2f7d97bf5adedd9 Mon Sep 17 00:00:00 2001 From: iuwqyir Date: Mon, 21 Oct 2024 21:44:51 +0300 Subject: [PATCH] clickhouse and poller settings --- cmd/root.go | 12 ++++++++++-- configs/config.go | 26 ++++++++++++++------------ internal/orchestrator/poller.go | 8 ++++---- internal/storage/clickhouse.go | 19 +++++++++++++++---- 4 files changed, 43 insertions(+), 22 deletions(-) diff --git a/cmd/root.go b/cmd/root.go index 8381c2f..f6fcf24 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -55,6 +55,7 @@ func init() { rootCmd.PersistentFlags().Int("poller-from-block", 0, "From which block to start polling") rootCmd.PersistentFlags().Bool("poller-force-from-block", false, "Force the poller to start from the block specified in `poller-from-block`") rootCmd.PersistentFlags().Int("poller-until-block", 0, "Until which block to poll") + rootCmd.PersistentFlags().Int("poller-parallel-pollers", 5, "Maximum number of parallel pollers") rootCmd.PersistentFlags().Bool("committer-enabled", true, "Toggle committer") rootCmd.PersistentFlags().Int("committer-blocks-per-commit", 10, "How many blocks to commit each interval") rootCmd.PersistentFlags().Int("committer-interval", 1000, "How often to commit blocks in milliseconds") @@ -76,6 +77,7 @@ func init() { rootCmd.PersistentFlags().String("storage-orchestrator-clickhouse-host", "", "Clickhouse host for orchestrator storage") rootCmd.PersistentFlags().String("storage-orchestrator-clickhouse-username", "", "Clickhouse username for orchestrator storage") rootCmd.PersistentFlags().String("storage-orchestrator-clickhouse-password", "", "Clickhouse password for orchestrator storage") + rootCmd.PersistentFlags().Bool("storage-orchestrator-clickhouse-asyncInsert", false, "Clickhouse async insert for orchestrator storage") rootCmd.PersistentFlags().Int("storage-orchestrator-memory-maxItems", 0, "Max items for orchestrator memory storage") rootCmd.PersistentFlags().Int("storage-orchestrator-redis-poolSize", 0, "Redis pool size for orchestrator storage") rootCmd.PersistentFlags().String("storage-orchestrator-redis-addr", "", "Redis address for orchestrator storage") @@ -85,8 +87,10 @@ func init() { rootCmd.PersistentFlags().String("storage-main-clickhouse-host", "", "Clickhouse host for main storage") rootCmd.PersistentFlags().String("storage-main-clickhouse-username", "", "Clickhouse username for main storage") rootCmd.PersistentFlags().String("storage-main-clickhouse-password", "", "Clickhouse password for main storage") + rootCmd.PersistentFlags().Bool("storage-main-clickhouse-asyncInsert", false, "Clickhouse async insert for main storage") rootCmd.PersistentFlags().String("storage-staging-clickhouse-username", "", "Clickhouse username for staging storage") rootCmd.PersistentFlags().String("storage-staging-clickhouse-password", "", "Clickhouse password for staging storage") + rootCmd.PersistentFlags().Bool("storage-staging-clickhouse-asyncInsert", false, "Clickhouse async insert for staging storage") rootCmd.PersistentFlags().String("api-host", "localhost:3000", "API host") viper.BindPFlag("rpc.url", rootCmd.PersistentFlags().Lookup("rpc-url")) viper.BindPFlag("rpc.blocks.blocksPerRequest", rootCmd.PersistentFlags().Lookup("rpc-blocks-blocksPerRequest")) @@ -107,6 +111,7 @@ func init() { viper.BindPFlag("poller.fromBlock", rootCmd.PersistentFlags().Lookup("poller-from-block")) viper.BindPFlag("poller.forceFromBlock", rootCmd.PersistentFlags().Lookup("poller-force-from-block")) viper.BindPFlag("poller.untilBlock", rootCmd.PersistentFlags().Lookup("poller-until-block")) + viper.BindPFlag("poller.parallelPollers", rootCmd.PersistentFlags().Lookup("poller-parallel-pollers")) viper.BindPFlag("committer.enabled", rootCmd.PersistentFlags().Lookup("committer-enabled")) viper.BindPFlag("committer.blocksPerCommit", rootCmd.PersistentFlags().Lookup("committer-blocks-per-commit")) viper.BindPFlag("committer.interval", rootCmd.PersistentFlags().Lookup("committer-interval")) @@ -122,18 +127,21 @@ func init() { viper.BindPFlag("storage.staging.clickhouse.database", rootCmd.PersistentFlags().Lookup("storage-staging-clickhouse-database")) viper.BindPFlag("storage.staging.clickhouse.host", rootCmd.PersistentFlags().Lookup("storage-staging-clickhouse-host")) viper.BindPFlag("storage.staging.clickhouse.port", rootCmd.PersistentFlags().Lookup("storage-staging-clickhouse-port")) + viper.BindPFlag("storage.staging.clickhouse.username", rootCmd.PersistentFlags().Lookup("storage-staging-clickhouse-username")) + viper.BindPFlag("storage.staging.clickhouse.password", rootCmd.PersistentFlags().Lookup("storage-staging-clickhouse-password")) + viper.BindPFlag("storage.staging.clickhouse.asyncInsert", rootCmd.PersistentFlags().Lookup("storage-staging-clickhouse-asyncInsert")) viper.BindPFlag("storage.main.clickhouse.database", rootCmd.PersistentFlags().Lookup("storage-main-clickhouse-database")) viper.BindPFlag("storage.main.clickhouse.host", rootCmd.PersistentFlags().Lookup("storage-main-clickhouse-host")) viper.BindPFlag("storage.main.clickhouse.port", rootCmd.PersistentFlags().Lookup("storage-main-clickhouse-port")) viper.BindPFlag("storage.main.clickhouse.username", rootCmd.PersistentFlags().Lookup("storage-main-clickhouse-username")) viper.BindPFlag("storage.main.clickhouse.password", rootCmd.PersistentFlags().Lookup("storage-main-clickhouse-password")) - viper.BindPFlag("storage.staging.clickhouse.username", rootCmd.PersistentFlags().Lookup("storage-main-clickhouse-username")) - viper.BindPFlag("storage.staging.clickhouse.password", rootCmd.PersistentFlags().Lookup("storage-main-clickhouse-password")) + viper.BindPFlag("storage.main.clickhouse.asyncInsert", rootCmd.PersistentFlags().Lookup("storage-main-clickhouse-asyncInsert")) viper.BindPFlag("storage.orchestrator.clickhouse.database", rootCmd.PersistentFlags().Lookup("storage-orchestrator-clickhouse-database")) viper.BindPFlag("storage.orchestrator.clickhouse.host", rootCmd.PersistentFlags().Lookup("storage-orchestrator-clickhouse-host")) viper.BindPFlag("storage.orchestrator.clickhouse.port", rootCmd.PersistentFlags().Lookup("storage-orchestrator-clickhouse-port")) viper.BindPFlag("storage.orchestrator.clickhouse.username", rootCmd.PersistentFlags().Lookup("storage-orchestrator-clickhouse-username")) viper.BindPFlag("storage.orchestrator.clickhouse.password", rootCmd.PersistentFlags().Lookup("storage-orchestrator-clickhouse-password")) + viper.BindPFlag("storage.orchestrator.clickhouse.asyncInsert", rootCmd.PersistentFlags().Lookup("storage-orchestrator-clickhouse-asyncInsert")) viper.BindPFlag("storage.orchestrator.memory.maxItems", rootCmd.PersistentFlags().Lookup("storage-orchestrator-memory-maxItems")) viper.BindPFlag("storage.orchestrator.redis.poolSize", rootCmd.PersistentFlags().Lookup("storage-orchestrator-redis-poolSize")) viper.BindPFlag("storage.orchestrator.redis.addr", rootCmd.PersistentFlags().Lookup("storage-orchestrator-redis-addr")) diff --git a/configs/config.go b/configs/config.go index a45c80e..775b634 100644 --- a/configs/config.go +++ b/configs/config.go @@ -14,12 +14,13 @@ type LogConfig struct { } type PollerConfig struct { - Enabled bool `mapstructure:"enabled"` - Interval int `mapstructure:"interval"` - BlocksPerPoll int `mapstructure:"blocksPerPoll"` - FromBlock int `mapstructure:"fromBlock"` - ForceFromBlock bool `mapstructure:"forceFromBlock"` - UntilBlock int `mapstructure:"untilBlock"` + Enabled bool `mapstructure:"enabled"` + Interval int `mapstructure:"interval"` + BlocksPerPoll int `mapstructure:"blocksPerPoll"` + FromBlock int `mapstructure:"fromBlock"` + ForceFromBlock bool `mapstructure:"forceFromBlock"` + UntilBlock int `mapstructure:"untilBlock"` + ParallelPollers int `mapstructure:"parallelPollers"` } type CommitterConfig struct { @@ -63,12 +64,13 @@ type StorageConnectionConfig struct { } type ClickhouseConfig struct { - Host string `mapstructure:"host"` - Port int `mapstructure:"port"` - Username string `mapstructure:"username"` - Password string `mapstructure:"password"` - Database string `mapstructure:"database"` - DisableTLS bool `mapstructure:"disableTLS"` + Host string `mapstructure:"host"` + Port int `mapstructure:"port"` + Username string `mapstructure:"username"` + Password string `mapstructure:"password"` + Database string `mapstructure:"database"` + DisableTLS bool `mapstructure:"disableTLS"` + AsyncInsert bool `mapstructure:"asyncInsert"` } type MemoryConfig struct { diff --git a/internal/orchestrator/poller.go b/internal/orchestrator/poller.go index fef0861..c53a117 100644 --- a/internal/orchestrator/poller.go +++ b/internal/orchestrator/poller.go @@ -25,6 +25,7 @@ type Poller struct { storage storage.IStorage lastPolledBlock *big.Int pollUntilBlock *big.Int + parallelPollers int } type BlockNumberWithError struct { @@ -62,6 +63,7 @@ func NewPoller(rpc rpc.IRPCClient, storage storage.IStorage) *Poller { storage: storage, lastPolledBlock: lastPolledBlock, pollUntilBlock: untilBlock, + parallelPollers: config.Cfg.Poller.ParallelPollers, } } @@ -69,12 +71,10 @@ func (p *Poller) Start() { interval := time.Duration(p.triggerIntervalMs) * time.Millisecond ticker := time.NewTicker(interval) - // TODO: make this configurable? - const numWorkers = 5 - tasks := make(chan struct{}, numWorkers) + tasks := make(chan struct{}, p.parallelPollers) var blockRangeMutex sync.Mutex - for i := 0; i < numWorkers; i++ { + for i := 0; i < p.parallelPollers; i++ { go func() { for range tasks { blockRangeMutex.Lock() diff --git a/internal/storage/clickhouse.go b/internal/storage/clickhouse.go index 6f5f187..b42e9d2 100644 --- a/internal/storage/clickhouse.go +++ b/internal/storage/clickhouse.go @@ -55,6 +55,15 @@ func connectDB(cfg *config.ClickhouseConfig) (clickhouse.Conn, error) { Username: cfg.Username, Password: cfg.Password, }, + Settings: func() clickhouse.Settings { + if cfg.AsyncInsert { + return clickhouse.Settings{ + "async_insert": "1", + "wait_for_async_insert": "1", + } + } + return clickhouse.Settings{} + }(), }) if err != nil { return nil, err @@ -299,6 +308,8 @@ func (c *ClickHouseConnector) GetLogs(qf QueryFilter) (QueryResult[common.Log], func executeQuery[T any](c *ClickHouseConnector, table, columns string, qf QueryFilter, scanFunc func(driver.Rows) (T, error)) (QueryResult[T], error) { query := c.buildQuery(table, columns, qf) + // Print the query for debugging + zLog.Debug().Str("query", query).Msg("Executing query") rows, err := c.conn.Query(context.Background(), query) if err != nil { return QueryResult[T]{}, err @@ -919,7 +930,7 @@ func (c *ClickHouseConnector) InsertBlockData(data *[]common.BlockData) error { defer wg.Done() if err := c.insertBlocks(&blocks); err != nil { saveErrMutex.Lock() - saveErr = fmt.Errorf("error deleting blocks: %v", err) + saveErr = fmt.Errorf("error inserting blocks: %v", err) saveErrMutex.Unlock() } }() @@ -931,7 +942,7 @@ func (c *ClickHouseConnector) InsertBlockData(data *[]common.BlockData) error { defer wg.Done() if err := c.insertLogs(&logs); err != nil { saveErrMutex.Lock() - saveErr = fmt.Errorf("error deleting logs: %v", err) + saveErr = fmt.Errorf("error inserting logs: %v", err) saveErrMutex.Unlock() } }() @@ -943,7 +954,7 @@ func (c *ClickHouseConnector) InsertBlockData(data *[]common.BlockData) error { defer wg.Done() if err := c.insertTransactions(&transactions); err != nil { saveErrMutex.Lock() - saveErr = fmt.Errorf("error deleting transactions: %v", err) + saveErr = fmt.Errorf("error inserting transactions: %v", err) saveErrMutex.Unlock() } }() @@ -955,7 +966,7 @@ func (c *ClickHouseConnector) InsertBlockData(data *[]common.BlockData) error { defer wg.Done() if err := c.insertTraces(&traces); err != nil { saveErrMutex.Lock() - saveErr = fmt.Errorf("error deleting traces: %v", err) + saveErr = fmt.Errorf("error inserting traces: %v", err) saveErrMutex.Unlock() } }()