diff --git a/internal/orchestrator/committer.go b/internal/orchestrator/committer.go index 25efee4..99294fd 100644 --- a/internal/orchestrator/committer.go +++ b/internal/orchestrator/committer.go @@ -109,6 +109,7 @@ func (c *Committer) getSequentialBlockDataToCommit() (*[]common.BlockData, error } if blocksData == nil || len(*blocksData) == 0 { log.Warn().Msgf("Committer didn't find the following range in staging: %v - %v", blocksToCommit[0].Int64(), blocksToCommit[len(blocksToCommit)-1].Int64()) + c.handleMissingStagingData(blocksToCommit) return nil, nil } @@ -189,3 +190,25 @@ func (c *Committer) handleGap(expectedStartBlockNumber *big.Int, actualFirstBloc poller.Poll(missingBlockNumbers) return fmt.Errorf("first block number (%s) in commit batch does not match expected (%s)", actualFirstBlock.Number.String(), expectedStartBlockNumber.String()) } + +func (c *Committer) handleMissingStagingData(blocksToCommit []*big.Int) { + // Checks if there are any blocks in staging after the current range end + lastStagedBlockNumber, err := c.storage.StagingStorage.GetLastStagedBlockNumber(c.rpc.GetChainID(), blocksToCommit[len(blocksToCommit)-1], big.NewInt(0)) + if err != nil { + log.Error().Err(err).Msg("Error checking staged data for missing range") + return + } + if lastStagedBlockNumber == nil || lastStagedBlockNumber.Sign() <= 0 { + log.Debug().Msgf("Committer is caught up with staging. No need to poll for missing blocks.") + return + } + log.Debug().Msgf("Detected missing blocks in staging data starting from %s.", blocksToCommit[0].String()) + + poller := NewBoundlessPoller(c.rpc, c.storage) + blocksToPoll := blocksToCommit + if len(blocksToCommit) > int(poller.blocksPerPoll) { + blocksToPoll = blocksToCommit[:int(poller.blocksPerPoll)] + } + poller.Poll(blocksToPoll) + log.Debug().Msgf("Polled %d blocks due to committer detecting them as missing. Range: %s - %s", len(blocksToPoll), blocksToPoll[0].String(), blocksToPoll[len(blocksToPoll)-1].String()) +} diff --git a/internal/orchestrator/committer_test.go b/internal/orchestrator/committer_test.go index bb7053f..0da4e56 100644 --- a/internal/orchestrator/committer_test.go +++ b/internal/orchestrator/committer_test.go @@ -50,9 +50,6 @@ func TestGetBlockNumbersToCommit(t *testing.T) { assert.Equal(t, committer.blocksPerCommit, len(blockNumbers)) assert.Equal(t, big.NewInt(101), blockNumbers[0]) assert.Equal(t, big.NewInt(100+int64(committer.blocksPerCommit)), blockNumbers[len(blockNumbers)-1]) - - mockRPC.AssertExpectations(t) - mockMainStorage.AssertExpectations(t) } func TestGetSequentialBlockDataToCommit(t *testing.T) { @@ -87,10 +84,6 @@ func TestGetSequentialBlockDataToCommit(t *testing.T) { assert.NoError(t, err) assert.NotNil(t, result) assert.Equal(t, 3, len(*result)) - - mockRPC.AssertExpectations(t) - mockMainStorage.AssertExpectations(t) - mockStagingStorage.AssertExpectations(t) } func TestGetSequentialBlockDataToCommitWithDuplicateBlocks(t *testing.T) { @@ -130,10 +123,6 @@ func TestGetSequentialBlockDataToCommitWithDuplicateBlocks(t *testing.T) { assert.Equal(t, big.NewInt(101), (*result)[0].Block.Number) assert.Equal(t, big.NewInt(102), (*result)[1].Block.Number) assert.Equal(t, big.NewInt(103), (*result)[2].Block.Number) - - mockRPC.AssertExpectations(t) - mockMainStorage.AssertExpectations(t) - mockStagingStorage.AssertExpectations(t) } func TestCommit(t *testing.T) { @@ -157,9 +146,6 @@ func TestCommit(t *testing.T) { err := committer.commit(&blockData) assert.NoError(t, err) - - mockMainStorage.AssertExpectations(t) - mockStagingStorage.AssertExpectations(t) } func TestHandleGap(t *testing.T) { @@ -206,7 +192,6 @@ func TestStartCommitter(t *testing.T) { } committer := NewCommitter(mockRPC, mockStorage) - committer.storage = mockStorage committer.triggerIntervalMs = 100 // Set a short interval for testing chainID := big.NewInt(1) @@ -226,9 +211,93 @@ func TestStartCommitter(t *testing.T) { // Wait for a short time to allow the committer to run time.Sleep(200 * time.Millisecond) +} + +func TestHandleMissingStagingData(t *testing.T) { + defer func() { config.Cfg = config.Config{} }() + config.Cfg.Committer.BlocksPerCommit = 5 + + mockRPC := mocks.NewMockIRPCClient(t) + mockMainStorage := mocks.NewMockIMainStorage(t) + mockStagingStorage := mocks.NewMockIStagingStorage(t) + + mockStorage := storage.IStorage{ + MainStorage: mockMainStorage, + StagingStorage: mockStagingStorage, + } + + committer := NewCommitter(mockRPC, mockStorage) + + chainID := big.NewInt(1) + mockRPC.EXPECT().GetChainID().Return(chainID) + mockRPC.EXPECT().GetBlocksPerRequest().Return(rpc.BlocksPerRequestConfig{ + Blocks: 100, + }) + mockRPC.EXPECT().GetFullBlocks([]*big.Int{big.NewInt(0), big.NewInt(1), big.NewInt(2), big.NewInt(3), big.NewInt(4)}).Return([]rpc.GetFullBlockResult{ + {BlockNumber: big.NewInt(0), Data: common.BlockData{Block: common.Block{Number: big.NewInt(0)}}}, + {BlockNumber: big.NewInt(1), Data: common.BlockData{Block: common.Block{Number: big.NewInt(1)}}}, + {BlockNumber: big.NewInt(2), Data: common.BlockData{Block: common.Block{Number: big.NewInt(2)}}}, + {BlockNumber: big.NewInt(3), Data: common.BlockData{Block: common.Block{Number: big.NewInt(3)}}}, + {BlockNumber: big.NewInt(4), Data: common.BlockData{Block: common.Block{Number: big.NewInt(4)}}}, + }) + mockStagingStorage.EXPECT().InsertStagingData(mock.Anything).Return(nil) + + mockMainStorage.EXPECT().GetMaxBlockNumber(chainID).Return(big.NewInt(0), nil) + expectedEndBlock := big.NewInt(4) + mockStagingStorage.EXPECT().GetLastStagedBlockNumber(chainID, expectedEndBlock, big.NewInt(0)).Return(big.NewInt(20), nil) + + blockData := []common.BlockData{} + mockStagingStorage.EXPECT().GetStagingData(storage.QueryFilter{ + ChainId: chainID, + BlockNumbers: []*big.Int{big.NewInt(0), big.NewInt(1), big.NewInt(2), big.NewInt(3), big.NewInt(4)}, + }).Return(&blockData, nil) + + result, err := committer.getSequentialBlockDataToCommit() - // Assert that the expected methods were called - mockRPC.AssertExpectations(t) - mockMainStorage.AssertExpectations(t) - mockStagingStorage.AssertExpectations(t) + assert.NoError(t, err) + assert.Nil(t, result) +} + +func TestHandleMissingStagingDataIsPolledWithCorrectBatchSize(t *testing.T) { + defer func() { config.Cfg = config.Config{} }() + config.Cfg.Committer.BlocksPerCommit = 5 + config.Cfg.Poller.BlocksPerPoll = 3 + + mockRPC := mocks.NewMockIRPCClient(t) + mockMainStorage := mocks.NewMockIMainStorage(t) + mockStagingStorage := mocks.NewMockIStagingStorage(t) + + mockStorage := storage.IStorage{ + MainStorage: mockMainStorage, + StagingStorage: mockStagingStorage, + } + + committer := NewCommitter(mockRPC, mockStorage) + + chainID := big.NewInt(1) + mockRPC.EXPECT().GetChainID().Return(chainID) + mockRPC.EXPECT().GetBlocksPerRequest().Return(rpc.BlocksPerRequestConfig{ + Blocks: 3, + }) + mockRPC.EXPECT().GetFullBlocks([]*big.Int{big.NewInt(0), big.NewInt(1), big.NewInt(2)}).Return([]rpc.GetFullBlockResult{ + {BlockNumber: big.NewInt(0), Data: common.BlockData{Block: common.Block{Number: big.NewInt(0)}}}, + {BlockNumber: big.NewInt(1), Data: common.BlockData{Block: common.Block{Number: big.NewInt(1)}}}, + {BlockNumber: big.NewInt(2), Data: common.BlockData{Block: common.Block{Number: big.NewInt(2)}}}, + }) + mockStagingStorage.EXPECT().InsertStagingData(mock.Anything).Return(nil) + + mockMainStorage.EXPECT().GetMaxBlockNumber(chainID).Return(big.NewInt(0), nil) + expectedEndBlock := big.NewInt(4) + mockStagingStorage.EXPECT().GetLastStagedBlockNumber(chainID, expectedEndBlock, big.NewInt(0)).Return(big.NewInt(20), nil) + + blockData := []common.BlockData{} + mockStagingStorage.EXPECT().GetStagingData(storage.QueryFilter{ + ChainId: chainID, + BlockNumbers: []*big.Int{big.NewInt(0), big.NewInt(1), big.NewInt(2), big.NewInt(3), big.NewInt(4)}, + }).Return(&blockData, nil) + + result, err := committer.getSequentialBlockDataToCommit() + + assert.NoError(t, err) + assert.Nil(t, result) } diff --git a/internal/storage/clickhouse.go b/internal/storage/clickhouse.go index 23a5ff4..354261c 100644 --- a/internal/storage/clickhouse.go +++ b/internal/storage/clickhouse.go @@ -563,7 +563,6 @@ func (c *ClickHouseConnector) GetMaxBlockNumber(chainId *big.Int) (maxBlockNumbe } return nil, err } - zLog.Debug().Msgf("Max block number in main storage is: %s", maxBlockNumber.String()) return maxBlockNumber, nil }