diff --git a/internal/orchestrator/poller.go b/internal/orchestrator/poller.go index 827ca55..fef0861 100644 --- a/internal/orchestrator/poller.go +++ b/internal/orchestrator/poller.go @@ -43,17 +43,17 @@ func NewPoller(rpc rpc.IRPCClient, storage storage.IStorage) *Poller { } untilBlock := big.NewInt(int64(config.Cfg.Poller.UntilBlock)) pollFromBlock := big.NewInt(int64(config.Cfg.Poller.FromBlock)) - lastPolledBlock, err := storage.StagingStorage.GetLastStagedBlockNumber(rpc.GetChainID(), untilBlock) - if err != nil || lastPolledBlock == nil || lastPolledBlock.Sign() <= 0 { - lastPolledBlock = new(big.Int).Sub(pollFromBlock, big.NewInt(1)) // needs to include the first block - log.Warn().Err(err).Msgf("No last polled block found, setting to %s", lastPolledBlock.String()) + lastPolledBlock := new(big.Int).Sub(pollFromBlock, big.NewInt(1)) // needs to include the first block + if config.Cfg.Poller.ForceFromBlock { + log.Debug().Msgf("ForceFromBlock is enabled, setting last polled block to %s", lastPolledBlock.String()) } else { - // In the case where the start block in staging introduces a gap with main storage, - // This hack allows us to re-poll from the start block without having to delete the staging data - if config.Cfg.Poller.ForceFromBlock { - lastPolledBlock = new(big.Int).Sub(pollFromBlock, big.NewInt(1)) // needs to include the first block + highestBlockFromStaging, err := storage.StagingStorage.GetLastStagedBlockNumber(rpc.GetChainID(), pollFromBlock, untilBlock) + if err != nil || highestBlockFromStaging == nil || highestBlockFromStaging.Sign() <= 0 { + log.Warn().Err(err).Msgf("No last polled block found, setting to %s", lastPolledBlock.String()) + } else { + lastPolledBlock = highestBlockFromStaging + log.Debug().Msgf("Last polled block found in staging: %s", lastPolledBlock.String()) } - log.Info().Msgf("Last polled block found: %s", lastPolledBlock.String()) } return &Poller{ rpc: rpc, diff --git a/internal/storage/clickhouse.go b/internal/storage/clickhouse.go index e672da4..6f5f187 100644 --- a/internal/storage/clickhouse.go +++ b/internal/storage/clickhouse.go @@ -516,11 +516,14 @@ func (c *ClickHouseConnector) GetMaxBlockNumber(chainId *big.Int) (maxBlockNumbe return maxBlockNumber, nil } -func (c *ClickHouseConnector) GetLastStagedBlockNumber(chainId *big.Int, rangeEnd *big.Int) (maxBlockNumber *big.Int, err error) { +func (c *ClickHouseConnector) GetLastStagedBlockNumber(chainId *big.Int, rangeStart *big.Int, rangeEnd *big.Int) (maxBlockNumber *big.Int, err error) { query := fmt.Sprintf("SELECT block_number FROM %s.block_data WHERE is_deleted = 0", c.cfg.Database) if chainId.Sign() > 0 { query += fmt.Sprintf(" AND chain_id = %s", chainId.String()) } + if rangeStart.Sign() > 0 { + query += fmt.Sprintf(" AND block_number >= %s", rangeStart.String()) + } if rangeEnd.Sign() > 0 { query += fmt.Sprintf(" AND block_number <= %s", rangeEnd.String()) } diff --git a/internal/storage/connector.go b/internal/storage/connector.go index 5afeaf8..baf9fb2 100644 --- a/internal/storage/connector.go +++ b/internal/storage/connector.go @@ -46,7 +46,7 @@ type IStagingStorage interface { InsertStagingData(data []common.BlockData) error GetStagingData(qf QueryFilter) (data *[]common.BlockData, err error) DeleteStagingData(data *[]common.BlockData) error - GetLastStagedBlockNumber(chainId *big.Int, rangeEnd *big.Int) (maxBlockNumber *big.Int, err error) + GetLastStagedBlockNumber(chainId *big.Int, rangeStart *big.Int, rangeEnd *big.Int) (maxBlockNumber *big.Int, err error) } type IMainStorage interface { diff --git a/internal/storage/memory.go b/internal/storage/memory.go index 19286a8..99568ab 100644 --- a/internal/storage/memory.go +++ b/internal/storage/memory.go @@ -194,14 +194,14 @@ func (m *MemoryConnector) GetMaxBlockNumber(chainId *big.Int) (*big.Int, error) return maxBlockNumber, nil } -func IsInRange(num *big.Int, rangeEnd *big.Int) bool { +func IsInRange(num *big.Int, rangeStart *big.Int, rangeEnd *big.Int) bool { if rangeEnd.Sign() == 0 { return true } - return num.Cmp(rangeEnd) <= 0 + return num.Cmp(rangeStart) >= 0 && num.Cmp(rangeEnd) <= 0 } -func (m *MemoryConnector) GetLastStagedBlockNumber(chainId *big.Int, rangeEnd *big.Int) (*big.Int, error) { +func (m *MemoryConnector) GetLastStagedBlockNumber(chainId *big.Int, rangeStart *big.Int, rangeEnd *big.Int) (*big.Int, error) { maxBlockNumber := new(big.Int) for _, key := range m.cache.Keys() { if strings.HasPrefix(key, fmt.Sprintf("blockData:%s:", chainId.String())) { @@ -210,7 +210,7 @@ func (m *MemoryConnector) GetLastStagedBlockNumber(chainId *big.Int, rangeEnd *b if !ok { return nil, fmt.Errorf("failed to parse block number: %s", blockNumberStr) } - if blockNumber.Cmp(maxBlockNumber) > 0 && IsInRange(blockNumber, rangeEnd) { + if blockNumber.Cmp(maxBlockNumber) > 0 && IsInRange(blockNumber, rangeStart, rangeEnd) { maxBlockNumber = blockNumber } } diff --git a/test/mocks/MockIStagingStorage.go b/test/mocks/MockIStagingStorage.go index 56ca2cc..969967a 100644 --- a/test/mocks/MockIStagingStorage.go +++ b/test/mocks/MockIStagingStorage.go @@ -72,9 +72,9 @@ func (_c *MockIStagingStorage_DeleteStagingData_Call) RunAndReturn(run func(*[]c return _c } -// GetLastStagedBlockNumber provides a mock function with given fields: chainId, rangeEnd -func (_m *MockIStagingStorage) GetLastStagedBlockNumber(chainId *big.Int, rangeEnd *big.Int) (*big.Int, error) { - ret := _m.Called(chainId, rangeEnd) +// GetLastStagedBlockNumber provides a mock function with given fields: chainId, rangeStart, rangeEnd +func (_m *MockIStagingStorage) GetLastStagedBlockNumber(chainId *big.Int, rangeStart *big.Int, rangeEnd *big.Int) (*big.Int, error) { + ret := _m.Called(chainId, rangeStart, rangeEnd) if len(ret) == 0 { panic("no return value specified for GetLastStagedBlockNumber") @@ -82,19 +82,19 @@ func (_m *MockIStagingStorage) GetLastStagedBlockNumber(chainId *big.Int, rangeE var r0 *big.Int var r1 error - if rf, ok := ret.Get(0).(func(*big.Int, *big.Int) (*big.Int, error)); ok { - return rf(chainId, rangeEnd) + if rf, ok := ret.Get(0).(func(*big.Int, *big.Int, *big.Int) (*big.Int, error)); ok { + return rf(chainId, rangeStart, rangeEnd) } - if rf, ok := ret.Get(0).(func(*big.Int, *big.Int) *big.Int); ok { - r0 = rf(chainId, rangeEnd) + if rf, ok := ret.Get(0).(func(*big.Int, *big.Int, *big.Int) *big.Int); ok { + r0 = rf(chainId, rangeStart, rangeEnd) } else { if ret.Get(0) != nil { r0 = ret.Get(0).(*big.Int) } } - if rf, ok := ret.Get(1).(func(*big.Int, *big.Int) error); ok { - r1 = rf(chainId, rangeEnd) + if rf, ok := ret.Get(1).(func(*big.Int, *big.Int, *big.Int) error); ok { + r1 = rf(chainId, rangeStart, rangeEnd) } else { r1 = ret.Error(1) } @@ -109,14 +109,15 @@ type MockIStagingStorage_GetLastStagedBlockNumber_Call struct { // GetLastStagedBlockNumber is a helper method to define mock.On call // - chainId *big.Int +// - rangeStart *big.Int // - rangeEnd *big.Int -func (_e *MockIStagingStorage_Expecter) GetLastStagedBlockNumber(chainId interface{}, rangeEnd interface{}) *MockIStagingStorage_GetLastStagedBlockNumber_Call { - return &MockIStagingStorage_GetLastStagedBlockNumber_Call{Call: _e.mock.On("GetLastStagedBlockNumber", chainId, rangeEnd)} +func (_e *MockIStagingStorage_Expecter) GetLastStagedBlockNumber(chainId interface{}, rangeStart interface{}, rangeEnd interface{}) *MockIStagingStorage_GetLastStagedBlockNumber_Call { + return &MockIStagingStorage_GetLastStagedBlockNumber_Call{Call: _e.mock.On("GetLastStagedBlockNumber", chainId, rangeStart, rangeEnd)} } -func (_c *MockIStagingStorage_GetLastStagedBlockNumber_Call) Run(run func(chainId *big.Int, rangeEnd *big.Int)) *MockIStagingStorage_GetLastStagedBlockNumber_Call { +func (_c *MockIStagingStorage_GetLastStagedBlockNumber_Call) Run(run func(chainId *big.Int, rangeStart *big.Int, rangeEnd *big.Int)) *MockIStagingStorage_GetLastStagedBlockNumber_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(*big.Int), args[1].(*big.Int)) + run(args[0].(*big.Int), args[1].(*big.Int), args[2].(*big.Int)) }) return _c } @@ -126,7 +127,7 @@ func (_c *MockIStagingStorage_GetLastStagedBlockNumber_Call) Return(maxBlockNumb return _c } -func (_c *MockIStagingStorage_GetLastStagedBlockNumber_Call) RunAndReturn(run func(*big.Int, *big.Int) (*big.Int, error)) *MockIStagingStorage_GetLastStagedBlockNumber_Call { +func (_c *MockIStagingStorage_GetLastStagedBlockNumber_Call) RunAndReturn(run func(*big.Int, *big.Int, *big.Int) (*big.Int, error)) *MockIStagingStorage_GetLastStagedBlockNumber_Call { _c.Call.Return(run) return _c }