Skip to content

Commit

Permalink
Merge pull request #103 from thirdweb-dev/10-16-add_lower_range_for_f…
Browse files Browse the repository at this point in the history
…inding_staged_block_number

Improve poller block range handling and staging data retrieval
  • Loading branch information
iuwqyir authored Oct 15, 2024
2 parents 241a8a7 + ce2266d commit 15948f0
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 29 deletions.
18 changes: 9 additions & 9 deletions internal/orchestrator/poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,17 +43,17 @@ func NewPoller(rpc rpc.IRPCClient, storage storage.IStorage) *Poller {
}
untilBlock := big.NewInt(int64(config.Cfg.Poller.UntilBlock))
pollFromBlock := big.NewInt(int64(config.Cfg.Poller.FromBlock))
lastPolledBlock, err := storage.StagingStorage.GetLastStagedBlockNumber(rpc.GetChainID(), untilBlock)
if err != nil || lastPolledBlock == nil || lastPolledBlock.Sign() <= 0 {
lastPolledBlock = new(big.Int).Sub(pollFromBlock, big.NewInt(1)) // needs to include the first block
log.Warn().Err(err).Msgf("No last polled block found, setting to %s", lastPolledBlock.String())
lastPolledBlock := new(big.Int).Sub(pollFromBlock, big.NewInt(1)) // needs to include the first block
if config.Cfg.Poller.ForceFromBlock {
log.Debug().Msgf("ForceFromBlock is enabled, setting last polled block to %s", lastPolledBlock.String())
} else {
// In the case where the start block in staging introduces a gap with main storage,
// This hack allows us to re-poll from the start block without having to delete the staging data
if config.Cfg.Poller.ForceFromBlock {
lastPolledBlock = new(big.Int).Sub(pollFromBlock, big.NewInt(1)) // needs to include the first block
highestBlockFromStaging, err := storage.StagingStorage.GetLastStagedBlockNumber(rpc.GetChainID(), pollFromBlock, untilBlock)
if err != nil || highestBlockFromStaging == nil || highestBlockFromStaging.Sign() <= 0 {
log.Warn().Err(err).Msgf("No last polled block found, setting to %s", lastPolledBlock.String())
} else {
lastPolledBlock = highestBlockFromStaging
log.Debug().Msgf("Last polled block found in staging: %s", lastPolledBlock.String())
}
log.Info().Msgf("Last polled block found: %s", lastPolledBlock.String())
}
return &Poller{
rpc: rpc,
Expand Down
5 changes: 4 additions & 1 deletion internal/storage/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -516,11 +516,14 @@ func (c *ClickHouseConnector) GetMaxBlockNumber(chainId *big.Int) (maxBlockNumbe
return maxBlockNumber, nil
}

func (c *ClickHouseConnector) GetLastStagedBlockNumber(chainId *big.Int, rangeEnd *big.Int) (maxBlockNumber *big.Int, err error) {
func (c *ClickHouseConnector) GetLastStagedBlockNumber(chainId *big.Int, rangeStart *big.Int, rangeEnd *big.Int) (maxBlockNumber *big.Int, err error) {
query := fmt.Sprintf("SELECT block_number FROM %s.block_data WHERE is_deleted = 0", c.cfg.Database)
if chainId.Sign() > 0 {
query += fmt.Sprintf(" AND chain_id = %s", chainId.String())
}
if rangeStart.Sign() > 0 {
query += fmt.Sprintf(" AND block_number >= %s", rangeStart.String())
}
if rangeEnd.Sign() > 0 {
query += fmt.Sprintf(" AND block_number <= %s", rangeEnd.String())
}
Expand Down
2 changes: 1 addition & 1 deletion internal/storage/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ type IStagingStorage interface {
InsertStagingData(data []common.BlockData) error
GetStagingData(qf QueryFilter) (data *[]common.BlockData, err error)
DeleteStagingData(data *[]common.BlockData) error
GetLastStagedBlockNumber(chainId *big.Int, rangeEnd *big.Int) (maxBlockNumber *big.Int, err error)
GetLastStagedBlockNumber(chainId *big.Int, rangeStart *big.Int, rangeEnd *big.Int) (maxBlockNumber *big.Int, err error)
}

type IMainStorage interface {
Expand Down
8 changes: 4 additions & 4 deletions internal/storage/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,14 +194,14 @@ func (m *MemoryConnector) GetMaxBlockNumber(chainId *big.Int) (*big.Int, error)
return maxBlockNumber, nil
}

func IsInRange(num *big.Int, rangeEnd *big.Int) bool {
func IsInRange(num *big.Int, rangeStart *big.Int, rangeEnd *big.Int) bool {
if rangeEnd.Sign() == 0 {
return true
}
return num.Cmp(rangeEnd) <= 0
return num.Cmp(rangeStart) >= 0 && num.Cmp(rangeEnd) <= 0
}

func (m *MemoryConnector) GetLastStagedBlockNumber(chainId *big.Int, rangeEnd *big.Int) (*big.Int, error) {
func (m *MemoryConnector) GetLastStagedBlockNumber(chainId *big.Int, rangeStart *big.Int, rangeEnd *big.Int) (*big.Int, error) {
maxBlockNumber := new(big.Int)
for _, key := range m.cache.Keys() {
if strings.HasPrefix(key, fmt.Sprintf("blockData:%s:", chainId.String())) {
Expand All @@ -210,7 +210,7 @@ func (m *MemoryConnector) GetLastStagedBlockNumber(chainId *big.Int, rangeEnd *b
if !ok {
return nil, fmt.Errorf("failed to parse block number: %s", blockNumberStr)
}
if blockNumber.Cmp(maxBlockNumber) > 0 && IsInRange(blockNumber, rangeEnd) {
if blockNumber.Cmp(maxBlockNumber) > 0 && IsInRange(blockNumber, rangeStart, rangeEnd) {
maxBlockNumber = blockNumber
}
}
Expand Down
29 changes: 15 additions & 14 deletions test/mocks/MockIStagingStorage.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 15948f0

Please sign in to comment.