Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

clickhouse and poller settings #110

Merged
merged 1 commit into from
Oct 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 16 additions & 2 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ func init() {
rootCmd.PersistentFlags().Int("poller-from-block", 0, "From which block to start polling")
rootCmd.PersistentFlags().Bool("poller-force-from-block", false, "Force the poller to start from the block specified in `poller-from-block`")
rootCmd.PersistentFlags().Int("poller-until-block", 0, "Until which block to poll")
rootCmd.PersistentFlags().Int("poller-parallel-pollers", 5, "Maximum number of parallel pollers")
rootCmd.PersistentFlags().Bool("committer-enabled", true, "Toggle committer")
rootCmd.PersistentFlags().Int("committer-blocks-per-commit", 10, "How many blocks to commit each interval")
rootCmd.PersistentFlags().Int("committer-interval", 1000, "How often to commit blocks in milliseconds")
Expand All @@ -76,6 +77,8 @@ func init() {
rootCmd.PersistentFlags().String("storage-orchestrator-clickhouse-host", "", "Clickhouse host for orchestrator storage")
rootCmd.PersistentFlags().String("storage-orchestrator-clickhouse-username", "", "Clickhouse username for orchestrator storage")
rootCmd.PersistentFlags().String("storage-orchestrator-clickhouse-password", "", "Clickhouse password for orchestrator storage")
rootCmd.PersistentFlags().Bool("storage-orchestrator-clickhouse-asyncInsert", false, "Clickhouse async insert for orchestrator storage")
rootCmd.PersistentFlags().Int("storage-orchestrator-clickhouse-maxRowsPerInsert", 100000, "Clickhouse max rows per insert for orchestrator storage")
rootCmd.PersistentFlags().Int("storage-orchestrator-memory-maxItems", 0, "Max items for orchestrator memory storage")
rootCmd.PersistentFlags().Int("storage-orchestrator-redis-poolSize", 0, "Redis pool size for orchestrator storage")
rootCmd.PersistentFlags().String("storage-orchestrator-redis-addr", "", "Redis address for orchestrator storage")
Expand All @@ -85,8 +88,12 @@ func init() {
rootCmd.PersistentFlags().String("storage-main-clickhouse-host", "", "Clickhouse host for main storage")
rootCmd.PersistentFlags().String("storage-main-clickhouse-username", "", "Clickhouse username for main storage")
rootCmd.PersistentFlags().String("storage-main-clickhouse-password", "", "Clickhouse password for main storage")
rootCmd.PersistentFlags().Bool("storage-main-clickhouse-asyncInsert", false, "Clickhouse async insert for main storage")
rootCmd.PersistentFlags().Int("storage-main-clickhouse-maxRowsPerInsert", 100000, "Clickhouse max rows per insert for main storage")
rootCmd.PersistentFlags().String("storage-staging-clickhouse-username", "", "Clickhouse username for staging storage")
rootCmd.PersistentFlags().String("storage-staging-clickhouse-password", "", "Clickhouse password for staging storage")
rootCmd.PersistentFlags().Bool("storage-staging-clickhouse-asyncInsert", false, "Clickhouse async insert for staging storage")
rootCmd.PersistentFlags().Int("storage-staging-clickhouse-maxRowsPerInsert", 100000, "Clickhouse max rows per insert for staging storage")
rootCmd.PersistentFlags().String("api-host", "localhost:3000", "API host")
viper.BindPFlag("rpc.url", rootCmd.PersistentFlags().Lookup("rpc-url"))
viper.BindPFlag("rpc.blocks.blocksPerRequest", rootCmd.PersistentFlags().Lookup("rpc-blocks-blocksPerRequest"))
Expand All @@ -107,6 +114,7 @@ func init() {
viper.BindPFlag("poller.fromBlock", rootCmd.PersistentFlags().Lookup("poller-from-block"))
viper.BindPFlag("poller.forceFromBlock", rootCmd.PersistentFlags().Lookup("poller-force-from-block"))
viper.BindPFlag("poller.untilBlock", rootCmd.PersistentFlags().Lookup("poller-until-block"))
viper.BindPFlag("poller.parallelPollers", rootCmd.PersistentFlags().Lookup("poller-parallel-pollers"))
viper.BindPFlag("committer.enabled", rootCmd.PersistentFlags().Lookup("committer-enabled"))
viper.BindPFlag("committer.blocksPerCommit", rootCmd.PersistentFlags().Lookup("committer-blocks-per-commit"))
viper.BindPFlag("committer.interval", rootCmd.PersistentFlags().Lookup("committer-interval"))
Expand All @@ -122,18 +130,24 @@ func init() {
viper.BindPFlag("storage.staging.clickhouse.database", rootCmd.PersistentFlags().Lookup("storage-staging-clickhouse-database"))
viper.BindPFlag("storage.staging.clickhouse.host", rootCmd.PersistentFlags().Lookup("storage-staging-clickhouse-host"))
viper.BindPFlag("storage.staging.clickhouse.port", rootCmd.PersistentFlags().Lookup("storage-staging-clickhouse-port"))
viper.BindPFlag("storage.staging.clickhouse.username", rootCmd.PersistentFlags().Lookup("storage-staging-clickhouse-username"))
viper.BindPFlag("storage.staging.clickhouse.password", rootCmd.PersistentFlags().Lookup("storage-staging-clickhouse-password"))
viper.BindPFlag("storage.staging.clickhouse.asyncInsert", rootCmd.PersistentFlags().Lookup("storage-staging-clickhouse-asyncInsert"))
viper.BindPFlag("storage.staging.clickhouse.maxRowsPerInsert", rootCmd.PersistentFlags().Lookup("storage-staging-clickhouse-maxRowsPerInsert"))
viper.BindPFlag("storage.main.clickhouse.database", rootCmd.PersistentFlags().Lookup("storage-main-clickhouse-database"))
viper.BindPFlag("storage.main.clickhouse.host", rootCmd.PersistentFlags().Lookup("storage-main-clickhouse-host"))
viper.BindPFlag("storage.main.clickhouse.port", rootCmd.PersistentFlags().Lookup("storage-main-clickhouse-port"))
viper.BindPFlag("storage.main.clickhouse.username", rootCmd.PersistentFlags().Lookup("storage-main-clickhouse-username"))
viper.BindPFlag("storage.main.clickhouse.password", rootCmd.PersistentFlags().Lookup("storage-main-clickhouse-password"))
viper.BindPFlag("storage.staging.clickhouse.username", rootCmd.PersistentFlags().Lookup("storage-main-clickhouse-username"))
viper.BindPFlag("storage.staging.clickhouse.password", rootCmd.PersistentFlags().Lookup("storage-main-clickhouse-password"))
viper.BindPFlag("storage.main.clickhouse.asyncInsert", rootCmd.PersistentFlags().Lookup("storage-main-clickhouse-asyncInsert"))
viper.BindPFlag("storage.main.clickhouse.maxRowsPerInsert", rootCmd.PersistentFlags().Lookup("storage-main-clickhouse-maxRowsPerInsert"))
viper.BindPFlag("storage.orchestrator.clickhouse.database", rootCmd.PersistentFlags().Lookup("storage-orchestrator-clickhouse-database"))
viper.BindPFlag("storage.orchestrator.clickhouse.host", rootCmd.PersistentFlags().Lookup("storage-orchestrator-clickhouse-host"))
viper.BindPFlag("storage.orchestrator.clickhouse.port", rootCmd.PersistentFlags().Lookup("storage-orchestrator-clickhouse-port"))
viper.BindPFlag("storage.orchestrator.clickhouse.username", rootCmd.PersistentFlags().Lookup("storage-orchestrator-clickhouse-username"))
viper.BindPFlag("storage.orchestrator.clickhouse.password", rootCmd.PersistentFlags().Lookup("storage-orchestrator-clickhouse-password"))
viper.BindPFlag("storage.orchestrator.clickhouse.asyncInsert", rootCmd.PersistentFlags().Lookup("storage-orchestrator-clickhouse-asyncInsert"))
viper.BindPFlag("storage.orchestrator.clickhouse.maxRowsPerInsert", rootCmd.PersistentFlags().Lookup("storage-orchestrator-clickhouse-maxRowsPerInsert"))
viper.BindPFlag("storage.orchestrator.memory.maxItems", rootCmd.PersistentFlags().Lookup("storage-orchestrator-memory-maxItems"))
viper.BindPFlag("storage.orchestrator.redis.poolSize", rootCmd.PersistentFlags().Lookup("storage-orchestrator-redis-poolSize"))
viper.BindPFlag("storage.orchestrator.redis.addr", rootCmd.PersistentFlags().Lookup("storage-orchestrator-redis-addr"))
Expand Down
27 changes: 15 additions & 12 deletions configs/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,13 @@ type LogConfig struct {
}

type PollerConfig struct {
Enabled bool `mapstructure:"enabled"`
Interval int `mapstructure:"interval"`
BlocksPerPoll int `mapstructure:"blocksPerPoll"`
FromBlock int `mapstructure:"fromBlock"`
ForceFromBlock bool `mapstructure:"forceFromBlock"`
UntilBlock int `mapstructure:"untilBlock"`
Enabled bool `mapstructure:"enabled"`
Interval int `mapstructure:"interval"`
BlocksPerPoll int `mapstructure:"blocksPerPoll"`
FromBlock int `mapstructure:"fromBlock"`
ForceFromBlock bool `mapstructure:"forceFromBlock"`
UntilBlock int `mapstructure:"untilBlock"`
ParallelPollers int `mapstructure:"parallelPollers"`
}

type CommitterConfig struct {
Expand Down Expand Up @@ -63,12 +64,14 @@ type StorageConnectionConfig struct {
}

type ClickhouseConfig struct {
Host string `mapstructure:"host"`
Port int `mapstructure:"port"`
Username string `mapstructure:"username"`
Password string `mapstructure:"password"`
Database string `mapstructure:"database"`
DisableTLS bool `mapstructure:"disableTLS"`
Host string `mapstructure:"host"`
Port int `mapstructure:"port"`
Username string `mapstructure:"username"`
Password string `mapstructure:"password"`
Database string `mapstructure:"database"`
DisableTLS bool `mapstructure:"disableTLS"`
AsyncInsert bool `mapstructure:"asyncInsert"`
MaxRowsPerInsert int `mapstructure:"maxRowsPerInsert"`
}

type MemoryConfig struct {
Expand Down
10 changes: 5 additions & 5 deletions internal/orchestrator/poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type Poller struct {
storage storage.IStorage
lastPolledBlock *big.Int
pollUntilBlock *big.Int
parallelPollers int
}

type BlockNumberWithError struct {
Expand Down Expand Up @@ -62,19 +63,18 @@ func NewPoller(rpc rpc.IRPCClient, storage storage.IStorage) *Poller {
storage: storage,
lastPolledBlock: lastPolledBlock,
pollUntilBlock: untilBlock,
parallelPollers: config.Cfg.Poller.ParallelPollers,
}
}

func (p *Poller) Start() {
interval := time.Duration(p.triggerIntervalMs) * time.Millisecond
ticker := time.NewTicker(interval)

// TODO: make this configurable?
const numWorkers = 5
tasks := make(chan struct{}, numWorkers)
tasks := make(chan struct{}, p.parallelPollers)
var blockRangeMutex sync.Mutex

for i := 0; i < numWorkers; i++ {
for i := 0; i < p.parallelPollers; i++ {
go func() {
for range tasks {
blockRangeMutex.Lock()
Expand Down Expand Up @@ -155,7 +155,7 @@ func (p *Poller) getEndBlockForRange(startBlock *big.Int, latestBlock *big.Int)
endBlock = latestBlock
}
if p.reachedPollLimit(endBlock) {
log.Debug().Msgf("End block %s is greater than poll until block %s, setting to poll until block", endBlock, p.pollUntilBlock)
log.Debug().Msgf("End block %s is greater than or equal to poll until block %s, setting range end to poll until block", endBlock, p.pollUntilBlock)
endBlock = p.pollUntilBlock
}
return endBlock
Expand Down
Loading
Loading