diff --git a/services/horizon/internal/actions/transaction_test.go b/services/horizon/internal/actions/transaction_test.go index e029edef3a..b76cf1b0bf 100644 --- a/services/horizon/internal/actions/transaction_test.go +++ b/services/horizon/internal/actions/transaction_test.go @@ -149,6 +149,7 @@ func checkOuterHashResponse( } func TestFeeBumpTransactionPage(t *testing.T) { + tt := test.Start(t) defer tt.Finish() test.ResetHorizonDB(t, tt.HorizonDB) diff --git a/services/horizon/internal/db2/history/fee_bump_scenario.go b/services/horizon/internal/db2/history/fee_bump_scenario.go index 5d155ac5e8..be7a21ef14 100644 --- a/services/horizon/internal/db2/history/fee_bump_scenario.go +++ b/services/horizon/internal/db2/history/fee_bump_scenario.go @@ -4,6 +4,7 @@ import ( "context" "encoding/hex" "encoding/json" + "fmt" "testing" "time" @@ -269,6 +270,8 @@ func FeeBumpScenario(tt *test.T, q *Q, successful bool) FeeBumpFixture { details, err := json.Marshal(map[string]string{ "bump_to": "98", }) + + fmt.Print(string(details)) tt.Assert.NoError(err) tt.Assert.NoError(opBuilder.Add( @@ -296,9 +299,10 @@ func FeeBumpScenario(tt *test.T, q *Q, successful bool) FeeBumpFixture { EffectSequenceBumped, details, ) + tt.Assert.NoError(err) - tt.Assert.NoError(accountLoader.Exec(ctx, q)) - tt.Assert.NoError(effectBuilder.Exec(ctx, q)) + tt.Assert.NoError(accountLoader.Exec(ctx, q.SessionInterface)) + tt.Assert.NoError(effectBuilder.Exec(ctx, q.SessionInterface)) tt.Assert.NoError(q.Commit()) diff --git a/services/horizon/internal/ingest/group_processors_test.go b/services/horizon/internal/ingest/group_processors_test.go index 80d43a0d01..73d4f56f3f 100644 --- a/services/horizon/internal/ingest/group_processors_test.go +++ b/services/horizon/internal/ingest/group_processors_test.go @@ -43,7 +43,7 @@ func (m *mockHorizonTransactionProcessor) ProcessTransaction(lcm xdr.LedgerClose } func (m *mockHorizonTransactionProcessor) Flush(ctx context.Context, session db.SessionInterface) error { - args := m.Called(ctx) + args := m.Called(ctx, session) return args.Error(0) } diff --git a/services/horizon/internal/ingest/processor_runner.go b/services/horizon/internal/ingest/processor_runner.go index 28efe53a6e..a95f88cf6f 100644 --- a/services/horizon/internal/ingest/processor_runner.go +++ b/services/horizon/internal/ingest/processor_runner.go @@ -135,7 +135,6 @@ func (s *ProcessorRunner) buildTransactionProcessor( ledgerTransactionStats *processors.StatsLedgerTransactionProcessor, tradeProcessor *processors.TradeProcessor, ledger xdr.LedgerCloseMeta, - txBuilder history.TransactionBatchInsertBuilder, ) *groupTransactionProcessors { accountLoader := history.NewAccountLoader() assetLoader := history.NewAssetLoader() @@ -157,7 +156,7 @@ func (s *ProcessorRunner) buildTransactionProcessor( tradeProcessor, processors.NewParticipantsProcessor(accountLoader, s.historyQ.NewTransactionParticipantsBatchInsertBuilder(), s.historyQ.NewOperationParticipantBatchInsertBuilder()), - processors.NewTransactionProcessor(txBuilder), + processors.NewTransactionProcessor(s.historyQ.NewTransactionBatchInsertBuilder()), processors.NewClaimableBalancesTransactionProcessor(history.NewClaimableBalanceLoader(), s.historyQ.NewTransactionClaimableBalanceBatchInsertBuilder(), s.historyQ.NewOperationClaimableBalanceBatchInsertBuilder()), processors.NewLiquidityPoolsTransactionProcessor(lpLoader, @@ -175,11 +174,11 @@ func (s *ProcessorRunner) buildTransactionFilterer() *groupTransactionFilterers return newGroupTransactionFilterers(f) } -func (s *ProcessorRunner) buildFilteredOutProcessor(txBuilder history.TransactionBatchInsertBuilder) *groupTransactionProcessors { +func (s *ProcessorRunner) buildFilteredOutProcessor() *groupTransactionProcessors { // when in online mode, the submission result processor must always run (regardless of filtering) var p []horizonTransactionProcessor if s.config.EnableIngestionFiltering { - txSubProc := processors.NewTransactionFilteredTmpProcessor(txBuilder) + txSubProc := processors.NewTransactionFilteredTmpProcessor(s.historyQ.NewTransactionFilteredTmpBatchInsertBuilder()) p = append(p, txSubProc) } @@ -333,11 +332,10 @@ func (s *ProcessorRunner) RunTransactionProcessorsOnLedger(ledger xdr.LedgerClos return } - txBuilder := s.historyQ.NewTransactionBatchInsertBuilder() groupTransactionFilterers := s.buildTransactionFilterer() - groupFilteredOutProcessors := s.buildFilteredOutProcessor(txBuilder) + groupFilteredOutProcessors := s.buildFilteredOutProcessor() groupTransactionProcessors := s.buildTransactionProcessor( - &ledgerTransactionStats, &tradeProcessor, ledger, txBuilder) + &ledgerTransactionStats, &tradeProcessor, ledger) err = processors.StreamLedgerTransactions(s.ctx, groupTransactionFilterers, groupFilteredOutProcessors, @@ -351,6 +349,11 @@ func (s *ProcessorRunner) RunTransactionProcessorsOnLedger(ledger xdr.LedgerClos } if s.config.EnableIngestionFiltering { + err = groupFilteredOutProcessors.Flush(s.ctx, s.session) + if err != nil { + err = errors.Wrap(err, "Error flushing temp filtered tx from processor") + return + } if time.Since(s.lastTransactionsTmpGC) > transactionsFilteredTmpGCPeriod { s.historyQ.DeleteTransactionsFilteredTmpOlderThan(s.ctx, uint64(transactionsFilteredTmpGCPeriod.Seconds())) } diff --git a/services/horizon/internal/ingest/processor_runner_test.go b/services/horizon/internal/ingest/processor_runner_test.go index 3470cb9fec..326ad37583 100644 --- a/services/horizon/internal/ingest/processor_runner_test.go +++ b/services/horizon/internal/ingest/processor_runner_test.go @@ -238,6 +238,8 @@ func TestProcessorRunnerBuildTransactionProcessor(t *testing.T) { q := &mockDBQ{} defer mock.AssertExpectationsForObjects(t, q) + q.MockQTransactions.On("NewTransactionBatchInsertBuilder"). + Return(&history.MockTransactionsBatchInsertBuilder{}) q.On("NewTradeBatchInsertBuilder").Return(&history.MockTradeBatchInsertBuilder{}) q.MockQLedgers.On("NewLedgerBatchInsertBuilder"). Return(&history.MockLedgersBatchInsertBuilder{}) @@ -264,7 +266,6 @@ func TestProcessorRunnerBuildTransactionProcessor(t *testing.T) { historyQ: q, } - txBuilder := &history.MockTransactionsBatchInsertBuilder{} stats := &processors.StatsLedgerTransactionProcessor{} trades := &processors.TradeProcessor{} ledger := xdr.LedgerCloseMeta{ @@ -272,7 +273,7 @@ func TestProcessorRunnerBuildTransactionProcessor(t *testing.T) { LedgerHeader: xdr.LedgerHeaderHistoryEntry{}, }, } - processor := runner.buildTransactionProcessor(stats, trades, ledger, txBuilder) + processor := runner.buildTransactionProcessor(stats, trades, ledger) assert.IsType(t, &groupTransactionProcessors{}, processor) assert.IsType(t, &statsLedgerTransactionProcessor{}, processor.processors[0]) @@ -309,71 +310,16 @@ func TestProcessorRunnerWithFilterEnabled(t *testing.T) { } // Batches - mockAccountSignersBatchInsertBuilder := &history.MockAccountSignersBatchInsertBuilder{} - defer mock.AssertExpectationsForObjects(t, mockAccountSignersBatchInsertBuilder) - q.MockQSigners.On("NewAccountSignersBatchInsertBuilder", maxBatchSize). - Return(mockAccountSignersBatchInsertBuilder).Once() - - mockOperationsBatchInsertBuilder := &history.MockOperationsBatchInsertBuilder{} - defer mock.AssertExpectationsForObjects(t, mockOperationsBatchInsertBuilder) - mockOperationsBatchInsertBuilder.On("Exec", ctx, mockSession).Return(nil).Once() - q.MockQOperations.On("NewOperationBatchInsertBuilder"). - Return(mockOperationsBatchInsertBuilder).Twice() - - mockTransactionsBatchInsertBuilder := &history.MockTransactionsBatchInsertBuilder{} - defer mock.AssertExpectationsForObjects(t, mockTransactionsBatchInsertBuilder) - mockTransactionsBatchInsertBuilder.On("Exec", ctx, mockSession).Return(nil) - + mockTransactionsFilteredTmpBatchInsertBuilder := &history.MockTransactionsBatchInsertBuilder{} + defer mock.AssertExpectationsForObjects(t, mockTransactionsFilteredTmpBatchInsertBuilder) + mockTransactionsFilteredTmpBatchInsertBuilder.On("Exec", ctx, mockSession).Return(nil).Once() q.MockQTransactions.On("NewTransactionFilteredTmpBatchInsertBuilder"). - Return(mockTransactionsBatchInsertBuilder) - - q.MockQTransactions.On("NewTransactionBatchInsertBuilder"). - Return(mockTransactionsBatchInsertBuilder) - - q.MockQClaimableBalances.On("NewClaimableBalanceClaimantBatchInsertBuilder", maxBatchSize). - Return(&history.MockClaimableBalanceClaimantBatchInsertBuilder{}).Once() + Return(mockTransactionsFilteredTmpBatchInsertBuilder) q.On("DeleteTransactionsFilteredTmpOlderThan", ctx, mock.AnythingOfType("uint64")). Return(int64(0), nil) - q.On("NewTradeBatchInsertBuilder").Return(&history.MockTradeBatchInsertBuilder{}) - q.MockQLedgers.On("NewLedgerBatchInsertBuilder"). - Return(&history.MockLedgersBatchInsertBuilder{}) - - mockEffectsBatchInsertBuilder := &history.MockEffectBatchInsertBuilder{} - mockEffectsBatchInsertBuilder.On("Exec", ctx, mockSession).Return(nil).Once() - q.MockQEffects.On("NewEffectBatchInsertBuilder"). - Return(mockEffectsBatchInsertBuilder) - - mockTransactionsParticipantsBatchInsertBuilder := &history.MockTransactionParticipantsBatchInsertBuilder{} - mockTransactionsParticipantsBatchInsertBuilder.On("Exec", ctx, mockSession).Return(nil) - q.On("NewTransactionParticipantsBatchInsertBuilder"). - Return(mockTransactionsParticipantsBatchInsertBuilder) - - mockOperationParticipantBatchInsertBuilder := &history.MockOperationParticipantBatchInsertBuilder{} - mockOperationParticipantBatchInsertBuilder.On("Exec", ctx, mockSession).Return(nil) - q.On("NewOperationParticipantBatchInsertBuilder"). - Return(mockOperationParticipantBatchInsertBuilder) - - mockTransactionClaimableBalanceBatchInsertBuilder := &history.MockTransactionClaimableBalanceBatchInsertBuilder{} - mockTransactionClaimableBalanceBatchInsertBuilder.On("Exec", ctx, mockSession).Return(nil) - q.MockQHistoryClaimableBalances.On("NewTransactionClaimableBalanceBatchInsertBuilder"). - Return(mockTransactionClaimableBalanceBatchInsertBuilder) - - mockOperationClaimableBalanceBatchInsertBuilder := &history.MockOperationClaimableBalanceBatchInsertBuilder{} - mockOperationClaimableBalanceBatchInsertBuilder.On("Exec", ctx, mockSession).Return(nil) - q.MockQHistoryClaimableBalances.On("NewOperationClaimableBalanceBatchInsertBuilder"). - Return(mockOperationClaimableBalanceBatchInsertBuilder) - - mockTransactionLiquidityPoolBatchInsertBuilder := &history.MockTransactionLiquidityPoolBatchInsertBuilder{} - mockTransactionLiquidityPoolBatchInsertBuilder.On("Exec", ctx, mockSession).Return(nil) - q.MockQHistoryLiquidityPools.On("NewTransactionLiquidityPoolBatchInsertBuilder"). - Return(mockTransactionLiquidityPoolBatchInsertBuilder) - - mockOperationLiquidityPoolBatchInsertBuilder := &history.MockOperationLiquidityPoolBatchInsertBuilder{} - mockOperationLiquidityPoolBatchInsertBuilder.On("Exec", ctx, mockSession).Return(nil) - q.MockQHistoryLiquidityPools.On("NewOperationLiquidityPoolBatchInsertBuilder"). - Return(mockOperationLiquidityPoolBatchInsertBuilder) + defer mock.AssertExpectationsForObjects(t, mockBatchBuilders(q, mockSession, ctx, maxBatchSize)...) mockBatchInsertBuilder := &history.MockLedgersBatchInsertBuilder{} q.MockQLedgers.On("NewLedgerBatchInsertBuilder").Return(mockBatchInsertBuilder) @@ -421,25 +367,7 @@ func TestProcessorRunnerRunAllProcessorsOnLedger(t *testing.T) { } // Batches - mockAccountSignersBatchInsertBuilder := &history.MockAccountSignersBatchInsertBuilder{} - defer mock.AssertExpectationsForObjects(t, mockAccountSignersBatchInsertBuilder) - q.MockQSigners.On("NewAccountSignersBatchInsertBuilder", maxBatchSize). - Return(mockAccountSignersBatchInsertBuilder).Once() - - mockOperationsBatchInsertBuilder := &history.MockOperationsBatchInsertBuilder{} - defer mock.AssertExpectationsForObjects(t, mockOperationsBatchInsertBuilder) - mockOperationsBatchInsertBuilder.On("Exec", ctx, mockSession).Return(nil).Once() - q.MockQOperations.On("NewOperationBatchInsertBuilder"). - Return(mockOperationsBatchInsertBuilder).Twice() - - mockTransactionsBatchInsertBuilder := &history.MockTransactionsBatchInsertBuilder{} - defer mock.AssertExpectationsForObjects(t, mockTransactionsBatchInsertBuilder) - mockTransactionsBatchInsertBuilder.On("Exec", ctx, mockSession).Return(nil).Once() - q.MockQTransactions.On("NewTransactionBatchInsertBuilder"). - Return(mockTransactionsBatchInsertBuilder).Twice() - - q.MockQClaimableBalances.On("NewClaimableBalanceClaimantBatchInsertBuilder", maxBatchSize). - Return(&history.MockClaimableBalanceClaimantBatchInsertBuilder{}).Once() + defer mock.AssertExpectationsForObjects(t, mockBatchBuilders(q, mockSession, ctx, maxBatchSize)...) mockBatchInsertBuilder := &history.MockLedgersBatchInsertBuilder{} q.MockQLedgers.On("NewLedgerBatchInsertBuilder").Return(mockBatchInsertBuilder) @@ -486,21 +414,21 @@ func TestProcessorRunnerRunAllProcessorsOnLedgerProtocolVersionNotSupported(t *t } // Batches + mockTransactionsBatchInsertBuilder := &history.MockTransactionsBatchInsertBuilder{} + q.MockQTransactions.On("NewTransactionBatchInsertBuilder", maxBatchSize). + Return(mockTransactionsBatchInsertBuilder).Twice() mockAccountSignersBatchInsertBuilder := &history.MockAccountSignersBatchInsertBuilder{} - defer mock.AssertExpectationsForObjects(t, mockAccountSignersBatchInsertBuilder) q.MockQSigners.On("NewAccountSignersBatchInsertBuilder", maxBatchSize). Return(mockAccountSignersBatchInsertBuilder).Once() mockOperationsBatchInsertBuilder := &history.MockOperationsBatchInsertBuilder{} - defer mock.AssertExpectationsForObjects(t, mockOperationsBatchInsertBuilder) - q.MockQOperations.On("NewOperationBatchInsertBuilder", maxBatchSize). + q.MockQOperations.On("NewOperationBatchInsertBuilder"). Return(mockOperationsBatchInsertBuilder).Twice() - mockTransactionsBatchInsertBuilder := &history.MockTransactionsBatchInsertBuilder{} - defer mock.AssertExpectationsForObjects(t, mockTransactionsBatchInsertBuilder) - q.MockQTransactions.On("NewTransactionBatchInsertBuilder", maxBatchSize). - Return(mockTransactionsBatchInsertBuilder).Twice() + defer mock.AssertExpectationsForObjects(t, mockTransactionsBatchInsertBuilder, + mockAccountSignersBatchInsertBuilder, + mockOperationsBatchInsertBuilder) runner := ProcessorRunner{ ctx: ctx, @@ -517,3 +445,63 @@ func TestProcessorRunnerRunAllProcessorsOnLedgerProtocolVersionNotSupported(t *t ), ) } + +func mockBatchBuilders(q *mockDBQ, mockSession *db.MockSession, ctx context.Context, maxBatchSize int) []interface{} { + mockTransactionsBatchInsertBuilder := &history.MockTransactionsBatchInsertBuilder{} + mockTransactionsBatchInsertBuilder.On("Exec", ctx, mockSession).Return(nil).Once() + q.MockQTransactions.On("NewTransactionBatchInsertBuilder"). + Return(mockTransactionsBatchInsertBuilder) + + mockAccountSignersBatchInsertBuilder := &history.MockAccountSignersBatchInsertBuilder{} + q.MockQSigners.On("NewAccountSignersBatchInsertBuilder", maxBatchSize). + Return(mockAccountSignersBatchInsertBuilder).Once() + + mockOperationsBatchInsertBuilder := &history.MockOperationsBatchInsertBuilder{} + mockOperationsBatchInsertBuilder.On("Exec", ctx, mockSession).Return(nil).Once() + q.MockQOperations.On("NewOperationBatchInsertBuilder"). + Return(mockOperationsBatchInsertBuilder).Twice() + + mockEffectBatchInsertBuilder := &history.MockEffectBatchInsertBuilder{} + mockEffectBatchInsertBuilder.On("Exec", ctx, mockSession).Return(nil).Once() + q.MockQEffects.On("NewEffectBatchInsertBuilder"). + Return(mockEffectBatchInsertBuilder) + + mockTransactionsParticipantsBatchInsertBuilder := &history.MockTransactionParticipantsBatchInsertBuilder{} + mockTransactionsParticipantsBatchInsertBuilder.On("Exec", ctx, mockSession).Return(nil) + q.On("NewTransactionParticipantsBatchInsertBuilder"). + Return(mockTransactionsParticipantsBatchInsertBuilder) + + mockOperationParticipantBatchInsertBuilder := &history.MockOperationParticipantBatchInsertBuilder{} + mockOperationParticipantBatchInsertBuilder.On("Exec", ctx, mockSession).Return(nil) + q.On("NewOperationParticipantBatchInsertBuilder"). + Return(mockOperationParticipantBatchInsertBuilder) + + mockTransactionClaimableBalanceBatchInsertBuilder := &history.MockTransactionClaimableBalanceBatchInsertBuilder{} + mockTransactionClaimableBalanceBatchInsertBuilder.On("Exec", ctx, mockSession).Return(nil) + q.MockQHistoryClaimableBalances.On("NewTransactionClaimableBalanceBatchInsertBuilder"). + Return(mockTransactionClaimableBalanceBatchInsertBuilder) + + mockOperationClaimableBalanceBatchInsertBuilder := &history.MockOperationClaimableBalanceBatchInsertBuilder{} + mockOperationClaimableBalanceBatchInsertBuilder.On("Exec", ctx, mockSession).Return(nil) + q.MockQHistoryClaimableBalances.On("NewOperationClaimableBalanceBatchInsertBuilder"). + Return(mockOperationClaimableBalanceBatchInsertBuilder) + + mockTransactionLiquidityPoolBatchInsertBuilder := &history.MockTransactionLiquidityPoolBatchInsertBuilder{} + mockTransactionLiquidityPoolBatchInsertBuilder.On("Exec", ctx, mockSession).Return(nil) + q.MockQHistoryLiquidityPools.On("NewTransactionLiquidityPoolBatchInsertBuilder"). + Return(mockTransactionLiquidityPoolBatchInsertBuilder) + + mockOperationLiquidityPoolBatchInsertBuilder := &history.MockOperationLiquidityPoolBatchInsertBuilder{} + mockOperationLiquidityPoolBatchInsertBuilder.On("Exec", ctx, mockSession).Return(nil) + q.MockQHistoryLiquidityPools.On("NewOperationLiquidityPoolBatchInsertBuilder"). + Return(mockOperationLiquidityPoolBatchInsertBuilder) + + q.MockQClaimableBalances.On("NewClaimableBalanceClaimantBatchInsertBuilder", maxBatchSize). + Return(&history.MockClaimableBalanceClaimantBatchInsertBuilder{}).Once() + + q.On("NewTradeBatchInsertBuilder").Return(&history.MockTradeBatchInsertBuilder{}) + + return []interface{}{mockAccountSignersBatchInsertBuilder, + mockOperationsBatchInsertBuilder, + mockTransactionsBatchInsertBuilder} +} diff --git a/support/db/batch_insert_builder_test.go b/support/db/batch_insert_builder_test.go index e283e8bf57..e0d28e145d 100644 --- a/support/db/batch_insert_builder_test.go +++ b/support/db/batch_insert_builder_test.go @@ -13,6 +13,7 @@ import ( type hungerRow struct { Name string `db:"name"` HungerLevel string `db:"hunger_level"` + JsonValue []byte `db:"json_value"` } type invalidHungerRow struct { diff --git a/support/db/fast_batch_insert_builder_test.go b/support/db/fast_batch_insert_builder_test.go index c31f502735..2d55c8446a 100644 --- a/support/db/fast_batch_insert_builder_test.go +++ b/support/db/fast_batch_insert_builder_test.go @@ -21,6 +21,7 @@ func TestFastBatchInsertBuilder(t *testing.T) { insertBuilder.Row(map[string]interface{}{ "name": "bubba", "hunger_level": "1", + "json_value": []byte(`{\"bump_to\":\"98\"}`), }), ) @@ -28,13 +29,14 @@ func TestFastBatchInsertBuilder(t *testing.T) { insertBuilder.Row(map[string]interface{}{ "name": "bubba", }), - "invalid number of columns (expected=2, actual=1)", + "invalid number of columns (expected=3, actual=1)", ) assert.EqualError(t, insertBuilder.Row(map[string]interface{}{ - "name": "bubba", - "city": "London", + "name": "bubba", + "city": "London", + "json_value": []byte(`{\"bump_to\":\"98\"}`), }), "column \"hunger_level\" does not exist", ) @@ -43,6 +45,7 @@ func TestFastBatchInsertBuilder(t *testing.T) { insertBuilder.RowStruct(hungerRow{ Name: "bubba2", HungerLevel: "9", + JsonValue: []byte(`{\"bump_to\":\"98\"}`), }), ) diff --git a/support/db/internal_test.go b/support/db/internal_test.go index 8ce0370a92..8884bb62c6 100644 --- a/support/db/internal_test.go +++ b/support/db/internal_test.go @@ -7,6 +7,7 @@ const testSchema = ` CREATE TABLE IF NOT EXISTS people ( name character varying NOT NULL, hunger_level integer NOT NULL, + json_value jsonb, PRIMARY KEY (name) ); DELETE FROM people;