Skip to content

Commit

Permalink
stellar#4909: working test results with byte arrays in fast batch bui…
Browse files Browse the repository at this point in the history
…lder copy statements
  • Loading branch information
sreuland committed Oct 17, 2023
1 parent da5f7de commit c1d5ce7
Show file tree
Hide file tree
Showing 8 changed files with 102 additions and 101 deletions.
1 change: 1 addition & 0 deletions services/horizon/internal/actions/transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ func checkOuterHashResponse(
}

func TestFeeBumpTransactionPage(t *testing.T) {

tt := test.Start(t)
defer tt.Finish()
test.ResetHorizonDB(t, tt.HorizonDB)
Expand Down
8 changes: 6 additions & 2 deletions services/horizon/internal/db2/history/fee_bump_scenario.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/hex"
"encoding/json"
"fmt"
"testing"
"time"

Expand Down Expand Up @@ -269,6 +270,8 @@ func FeeBumpScenario(tt *test.T, q *Q, successful bool) FeeBumpFixture {
details, err := json.Marshal(map[string]string{
"bump_to": "98",
})

fmt.Print(string(details))
tt.Assert.NoError(err)

tt.Assert.NoError(opBuilder.Add(
Expand Down Expand Up @@ -296,9 +299,10 @@ func FeeBumpScenario(tt *test.T, q *Q, successful bool) FeeBumpFixture {
EffectSequenceBumped,
details,
)

tt.Assert.NoError(err)
tt.Assert.NoError(accountLoader.Exec(ctx, q))
tt.Assert.NoError(effectBuilder.Exec(ctx, q))
tt.Assert.NoError(accountLoader.Exec(ctx, q.SessionInterface))
tt.Assert.NoError(effectBuilder.Exec(ctx, q.SessionInterface))

tt.Assert.NoError(q.Commit())

Expand Down
2 changes: 1 addition & 1 deletion services/horizon/internal/ingest/group_processors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func (m *mockHorizonTransactionProcessor) ProcessTransaction(lcm xdr.LedgerClose
}

func (m *mockHorizonTransactionProcessor) Flush(ctx context.Context, session db.SessionInterface) error {
args := m.Called(ctx)
args := m.Called(ctx, session)
return args.Error(0)
}

Expand Down
17 changes: 10 additions & 7 deletions services/horizon/internal/ingest/processor_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,6 @@ func (s *ProcessorRunner) buildTransactionProcessor(
ledgerTransactionStats *processors.StatsLedgerTransactionProcessor,
tradeProcessor *processors.TradeProcessor,
ledger xdr.LedgerCloseMeta,
txBuilder history.TransactionBatchInsertBuilder,
) *groupTransactionProcessors {
accountLoader := history.NewAccountLoader()
assetLoader := history.NewAssetLoader()
Expand All @@ -157,7 +156,7 @@ func (s *ProcessorRunner) buildTransactionProcessor(
tradeProcessor,
processors.NewParticipantsProcessor(accountLoader,
s.historyQ.NewTransactionParticipantsBatchInsertBuilder(), s.historyQ.NewOperationParticipantBatchInsertBuilder()),
processors.NewTransactionProcessor(txBuilder),
processors.NewTransactionProcessor(s.historyQ.NewTransactionBatchInsertBuilder()),
processors.NewClaimableBalancesTransactionProcessor(history.NewClaimableBalanceLoader(),
s.historyQ.NewTransactionClaimableBalanceBatchInsertBuilder(), s.historyQ.NewOperationClaimableBalanceBatchInsertBuilder()),
processors.NewLiquidityPoolsTransactionProcessor(lpLoader,
Expand All @@ -175,11 +174,11 @@ func (s *ProcessorRunner) buildTransactionFilterer() *groupTransactionFilterers
return newGroupTransactionFilterers(f)
}

func (s *ProcessorRunner) buildFilteredOutProcessor(txBuilder history.TransactionBatchInsertBuilder) *groupTransactionProcessors {
func (s *ProcessorRunner) buildFilteredOutProcessor() *groupTransactionProcessors {
// when in online mode, the submission result processor must always run (regardless of filtering)
var p []horizonTransactionProcessor
if s.config.EnableIngestionFiltering {
txSubProc := processors.NewTransactionFilteredTmpProcessor(txBuilder)
txSubProc := processors.NewTransactionFilteredTmpProcessor(s.historyQ.NewTransactionFilteredTmpBatchInsertBuilder())
p = append(p, txSubProc)
}

Expand Down Expand Up @@ -333,11 +332,10 @@ func (s *ProcessorRunner) RunTransactionProcessorsOnLedger(ledger xdr.LedgerClos
return
}

txBuilder := s.historyQ.NewTransactionBatchInsertBuilder()
groupTransactionFilterers := s.buildTransactionFilterer()
groupFilteredOutProcessors := s.buildFilteredOutProcessor(txBuilder)
groupFilteredOutProcessors := s.buildFilteredOutProcessor()
groupTransactionProcessors := s.buildTransactionProcessor(
&ledgerTransactionStats, &tradeProcessor, ledger, txBuilder)
&ledgerTransactionStats, &tradeProcessor, ledger)
err = processors.StreamLedgerTransactions(s.ctx,
groupTransactionFilterers,
groupFilteredOutProcessors,
Expand All @@ -351,6 +349,11 @@ func (s *ProcessorRunner) RunTransactionProcessorsOnLedger(ledger xdr.LedgerClos
}

if s.config.EnableIngestionFiltering {
err = groupFilteredOutProcessors.Flush(s.ctx, s.session)
if err != nil {
err = errors.Wrap(err, "Error flushing temp filtered tx from processor")
return
}
if time.Since(s.lastTransactionsTmpGC) > transactionsFilteredTmpGCPeriod {
s.historyQ.DeleteTransactionsFilteredTmpOlderThan(s.ctx, uint64(transactionsFilteredTmpGCPeriod.Seconds()))
}
Expand Down
164 changes: 76 additions & 88 deletions services/horizon/internal/ingest/processor_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,8 @@ func TestProcessorRunnerBuildTransactionProcessor(t *testing.T) {
q := &mockDBQ{}
defer mock.AssertExpectationsForObjects(t, q)

q.MockQTransactions.On("NewTransactionBatchInsertBuilder").
Return(&history.MockTransactionsBatchInsertBuilder{})
q.On("NewTradeBatchInsertBuilder").Return(&history.MockTradeBatchInsertBuilder{})
q.MockQLedgers.On("NewLedgerBatchInsertBuilder").
Return(&history.MockLedgersBatchInsertBuilder{})
Expand All @@ -264,15 +266,14 @@ func TestProcessorRunnerBuildTransactionProcessor(t *testing.T) {
historyQ: q,
}

txBuilder := &history.MockTransactionsBatchInsertBuilder{}
stats := &processors.StatsLedgerTransactionProcessor{}
trades := &processors.TradeProcessor{}
ledger := xdr.LedgerCloseMeta{
V0: &xdr.LedgerCloseMetaV0{
LedgerHeader: xdr.LedgerHeaderHistoryEntry{},
},
}
processor := runner.buildTransactionProcessor(stats, trades, ledger, txBuilder)
processor := runner.buildTransactionProcessor(stats, trades, ledger)
assert.IsType(t, &groupTransactionProcessors{}, processor)

assert.IsType(t, &statsLedgerTransactionProcessor{}, processor.processors[0])
Expand Down Expand Up @@ -309,71 +310,16 @@ func TestProcessorRunnerWithFilterEnabled(t *testing.T) {
}

// Batches
mockAccountSignersBatchInsertBuilder := &history.MockAccountSignersBatchInsertBuilder{}
defer mock.AssertExpectationsForObjects(t, mockAccountSignersBatchInsertBuilder)
q.MockQSigners.On("NewAccountSignersBatchInsertBuilder", maxBatchSize).
Return(mockAccountSignersBatchInsertBuilder).Once()

mockOperationsBatchInsertBuilder := &history.MockOperationsBatchInsertBuilder{}
defer mock.AssertExpectationsForObjects(t, mockOperationsBatchInsertBuilder)
mockOperationsBatchInsertBuilder.On("Exec", ctx, mockSession).Return(nil).Once()
q.MockQOperations.On("NewOperationBatchInsertBuilder").
Return(mockOperationsBatchInsertBuilder).Twice()

mockTransactionsBatchInsertBuilder := &history.MockTransactionsBatchInsertBuilder{}
defer mock.AssertExpectationsForObjects(t, mockTransactionsBatchInsertBuilder)
mockTransactionsBatchInsertBuilder.On("Exec", ctx, mockSession).Return(nil)

mockTransactionsFilteredTmpBatchInsertBuilder := &history.MockTransactionsBatchInsertBuilder{}
defer mock.AssertExpectationsForObjects(t, mockTransactionsFilteredTmpBatchInsertBuilder)
mockTransactionsFilteredTmpBatchInsertBuilder.On("Exec", ctx, mockSession).Return(nil).Once()
q.MockQTransactions.On("NewTransactionFilteredTmpBatchInsertBuilder").
Return(mockTransactionsBatchInsertBuilder)

q.MockQTransactions.On("NewTransactionBatchInsertBuilder").
Return(mockTransactionsBatchInsertBuilder)

q.MockQClaimableBalances.On("NewClaimableBalanceClaimantBatchInsertBuilder", maxBatchSize).
Return(&history.MockClaimableBalanceClaimantBatchInsertBuilder{}).Once()
Return(mockTransactionsFilteredTmpBatchInsertBuilder)

q.On("DeleteTransactionsFilteredTmpOlderThan", ctx, mock.AnythingOfType("uint64")).
Return(int64(0), nil)

q.On("NewTradeBatchInsertBuilder").Return(&history.MockTradeBatchInsertBuilder{})
q.MockQLedgers.On("NewLedgerBatchInsertBuilder").
Return(&history.MockLedgersBatchInsertBuilder{})

mockEffectsBatchInsertBuilder := &history.MockEffectBatchInsertBuilder{}
mockEffectsBatchInsertBuilder.On("Exec", ctx, mockSession).Return(nil).Once()
q.MockQEffects.On("NewEffectBatchInsertBuilder").
Return(mockEffectsBatchInsertBuilder)

mockTransactionsParticipantsBatchInsertBuilder := &history.MockTransactionParticipantsBatchInsertBuilder{}
mockTransactionsParticipantsBatchInsertBuilder.On("Exec", ctx, mockSession).Return(nil)
q.On("NewTransactionParticipantsBatchInsertBuilder").
Return(mockTransactionsParticipantsBatchInsertBuilder)

mockOperationParticipantBatchInsertBuilder := &history.MockOperationParticipantBatchInsertBuilder{}
mockOperationParticipantBatchInsertBuilder.On("Exec", ctx, mockSession).Return(nil)
q.On("NewOperationParticipantBatchInsertBuilder").
Return(mockOperationParticipantBatchInsertBuilder)

mockTransactionClaimableBalanceBatchInsertBuilder := &history.MockTransactionClaimableBalanceBatchInsertBuilder{}
mockTransactionClaimableBalanceBatchInsertBuilder.On("Exec", ctx, mockSession).Return(nil)
q.MockQHistoryClaimableBalances.On("NewTransactionClaimableBalanceBatchInsertBuilder").
Return(mockTransactionClaimableBalanceBatchInsertBuilder)

mockOperationClaimableBalanceBatchInsertBuilder := &history.MockOperationClaimableBalanceBatchInsertBuilder{}
mockOperationClaimableBalanceBatchInsertBuilder.On("Exec", ctx, mockSession).Return(nil)
q.MockQHistoryClaimableBalances.On("NewOperationClaimableBalanceBatchInsertBuilder").
Return(mockOperationClaimableBalanceBatchInsertBuilder)

mockTransactionLiquidityPoolBatchInsertBuilder := &history.MockTransactionLiquidityPoolBatchInsertBuilder{}
mockTransactionLiquidityPoolBatchInsertBuilder.On("Exec", ctx, mockSession).Return(nil)
q.MockQHistoryLiquidityPools.On("NewTransactionLiquidityPoolBatchInsertBuilder").
Return(mockTransactionLiquidityPoolBatchInsertBuilder)

mockOperationLiquidityPoolBatchInsertBuilder := &history.MockOperationLiquidityPoolBatchInsertBuilder{}
mockOperationLiquidityPoolBatchInsertBuilder.On("Exec", ctx, mockSession).Return(nil)
q.MockQHistoryLiquidityPools.On("NewOperationLiquidityPoolBatchInsertBuilder").
Return(mockOperationLiquidityPoolBatchInsertBuilder)
defer mock.AssertExpectationsForObjects(t, mockBatchBuilders(q, mockSession, ctx, maxBatchSize)...)

mockBatchInsertBuilder := &history.MockLedgersBatchInsertBuilder{}
q.MockQLedgers.On("NewLedgerBatchInsertBuilder").Return(mockBatchInsertBuilder)
Expand Down Expand Up @@ -421,25 +367,7 @@ func TestProcessorRunnerRunAllProcessorsOnLedger(t *testing.T) {
}

// Batches
mockAccountSignersBatchInsertBuilder := &history.MockAccountSignersBatchInsertBuilder{}
defer mock.AssertExpectationsForObjects(t, mockAccountSignersBatchInsertBuilder)
q.MockQSigners.On("NewAccountSignersBatchInsertBuilder", maxBatchSize).
Return(mockAccountSignersBatchInsertBuilder).Once()

mockOperationsBatchInsertBuilder := &history.MockOperationsBatchInsertBuilder{}
defer mock.AssertExpectationsForObjects(t, mockOperationsBatchInsertBuilder)
mockOperationsBatchInsertBuilder.On("Exec", ctx, mockSession).Return(nil).Once()
q.MockQOperations.On("NewOperationBatchInsertBuilder").
Return(mockOperationsBatchInsertBuilder).Twice()

mockTransactionsBatchInsertBuilder := &history.MockTransactionsBatchInsertBuilder{}
defer mock.AssertExpectationsForObjects(t, mockTransactionsBatchInsertBuilder)
mockTransactionsBatchInsertBuilder.On("Exec", ctx, mockSession).Return(nil).Once()
q.MockQTransactions.On("NewTransactionBatchInsertBuilder").
Return(mockTransactionsBatchInsertBuilder).Twice()

q.MockQClaimableBalances.On("NewClaimableBalanceClaimantBatchInsertBuilder", maxBatchSize).
Return(&history.MockClaimableBalanceClaimantBatchInsertBuilder{}).Once()
defer mock.AssertExpectationsForObjects(t, mockBatchBuilders(q, mockSession, ctx, maxBatchSize)...)

mockBatchInsertBuilder := &history.MockLedgersBatchInsertBuilder{}
q.MockQLedgers.On("NewLedgerBatchInsertBuilder").Return(mockBatchInsertBuilder)
Expand Down Expand Up @@ -486,21 +414,21 @@ func TestProcessorRunnerRunAllProcessorsOnLedgerProtocolVersionNotSupported(t *t
}

// Batches
mockTransactionsBatchInsertBuilder := &history.MockTransactionsBatchInsertBuilder{}
q.MockQTransactions.On("NewTransactionBatchInsertBuilder", maxBatchSize).
Return(mockTransactionsBatchInsertBuilder).Twice()

mockAccountSignersBatchInsertBuilder := &history.MockAccountSignersBatchInsertBuilder{}
defer mock.AssertExpectationsForObjects(t, mockAccountSignersBatchInsertBuilder)
q.MockQSigners.On("NewAccountSignersBatchInsertBuilder", maxBatchSize).
Return(mockAccountSignersBatchInsertBuilder).Once()

mockOperationsBatchInsertBuilder := &history.MockOperationsBatchInsertBuilder{}
defer mock.AssertExpectationsForObjects(t, mockOperationsBatchInsertBuilder)
q.MockQOperations.On("NewOperationBatchInsertBuilder", maxBatchSize).
q.MockQOperations.On("NewOperationBatchInsertBuilder").
Return(mockOperationsBatchInsertBuilder).Twice()

mockTransactionsBatchInsertBuilder := &history.MockTransactionsBatchInsertBuilder{}
defer mock.AssertExpectationsForObjects(t, mockTransactionsBatchInsertBuilder)
q.MockQTransactions.On("NewTransactionBatchInsertBuilder", maxBatchSize).
Return(mockTransactionsBatchInsertBuilder).Twice()
defer mock.AssertExpectationsForObjects(t, mockTransactionsBatchInsertBuilder,
mockAccountSignersBatchInsertBuilder,
mockOperationsBatchInsertBuilder)

runner := ProcessorRunner{
ctx: ctx,
Expand All @@ -517,3 +445,63 @@ func TestProcessorRunnerRunAllProcessorsOnLedgerProtocolVersionNotSupported(t *t
),
)
}

func mockBatchBuilders(q *mockDBQ, mockSession *db.MockSession, ctx context.Context, maxBatchSize int) []interface{} {
mockTransactionsBatchInsertBuilder := &history.MockTransactionsBatchInsertBuilder{}
mockTransactionsBatchInsertBuilder.On("Exec", ctx, mockSession).Return(nil).Once()
q.MockQTransactions.On("NewTransactionBatchInsertBuilder").
Return(mockTransactionsBatchInsertBuilder)

mockAccountSignersBatchInsertBuilder := &history.MockAccountSignersBatchInsertBuilder{}
q.MockQSigners.On("NewAccountSignersBatchInsertBuilder", maxBatchSize).
Return(mockAccountSignersBatchInsertBuilder).Once()

mockOperationsBatchInsertBuilder := &history.MockOperationsBatchInsertBuilder{}
mockOperationsBatchInsertBuilder.On("Exec", ctx, mockSession).Return(nil).Once()
q.MockQOperations.On("NewOperationBatchInsertBuilder").
Return(mockOperationsBatchInsertBuilder).Twice()

mockEffectBatchInsertBuilder := &history.MockEffectBatchInsertBuilder{}
mockEffectBatchInsertBuilder.On("Exec", ctx, mockSession).Return(nil).Once()
q.MockQEffects.On("NewEffectBatchInsertBuilder").
Return(mockEffectBatchInsertBuilder)

mockTransactionsParticipantsBatchInsertBuilder := &history.MockTransactionParticipantsBatchInsertBuilder{}
mockTransactionsParticipantsBatchInsertBuilder.On("Exec", ctx, mockSession).Return(nil)
q.On("NewTransactionParticipantsBatchInsertBuilder").
Return(mockTransactionsParticipantsBatchInsertBuilder)

mockOperationParticipantBatchInsertBuilder := &history.MockOperationParticipantBatchInsertBuilder{}
mockOperationParticipantBatchInsertBuilder.On("Exec", ctx, mockSession).Return(nil)
q.On("NewOperationParticipantBatchInsertBuilder").
Return(mockOperationParticipantBatchInsertBuilder)

mockTransactionClaimableBalanceBatchInsertBuilder := &history.MockTransactionClaimableBalanceBatchInsertBuilder{}
mockTransactionClaimableBalanceBatchInsertBuilder.On("Exec", ctx, mockSession).Return(nil)
q.MockQHistoryClaimableBalances.On("NewTransactionClaimableBalanceBatchInsertBuilder").
Return(mockTransactionClaimableBalanceBatchInsertBuilder)

mockOperationClaimableBalanceBatchInsertBuilder := &history.MockOperationClaimableBalanceBatchInsertBuilder{}
mockOperationClaimableBalanceBatchInsertBuilder.On("Exec", ctx, mockSession).Return(nil)
q.MockQHistoryClaimableBalances.On("NewOperationClaimableBalanceBatchInsertBuilder").
Return(mockOperationClaimableBalanceBatchInsertBuilder)

mockTransactionLiquidityPoolBatchInsertBuilder := &history.MockTransactionLiquidityPoolBatchInsertBuilder{}
mockTransactionLiquidityPoolBatchInsertBuilder.On("Exec", ctx, mockSession).Return(nil)
q.MockQHistoryLiquidityPools.On("NewTransactionLiquidityPoolBatchInsertBuilder").
Return(mockTransactionLiquidityPoolBatchInsertBuilder)

mockOperationLiquidityPoolBatchInsertBuilder := &history.MockOperationLiquidityPoolBatchInsertBuilder{}
mockOperationLiquidityPoolBatchInsertBuilder.On("Exec", ctx, mockSession).Return(nil)
q.MockQHistoryLiquidityPools.On("NewOperationLiquidityPoolBatchInsertBuilder").
Return(mockOperationLiquidityPoolBatchInsertBuilder)

q.MockQClaimableBalances.On("NewClaimableBalanceClaimantBatchInsertBuilder", maxBatchSize).
Return(&history.MockClaimableBalanceClaimantBatchInsertBuilder{}).Once()

q.On("NewTradeBatchInsertBuilder").Return(&history.MockTradeBatchInsertBuilder{})

return []interface{}{mockAccountSignersBatchInsertBuilder,
mockOperationsBatchInsertBuilder,
mockTransactionsBatchInsertBuilder}
}
1 change: 1 addition & 0 deletions support/db/batch_insert_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
type hungerRow struct {
Name string `db:"name"`
HungerLevel string `db:"hunger_level"`
JsonValue []byte `db:"json_value"`
}

type invalidHungerRow struct {
Expand Down
9 changes: 6 additions & 3 deletions support/db/fast_batch_insert_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,22 @@ func TestFastBatchInsertBuilder(t *testing.T) {
insertBuilder.Row(map[string]interface{}{
"name": "bubba",
"hunger_level": "1",
"json_value": []byte(`{\"bump_to\":\"98\"}`),
}),
)

assert.EqualError(t,
insertBuilder.Row(map[string]interface{}{
"name": "bubba",
}),
"invalid number of columns (expected=2, actual=1)",
"invalid number of columns (expected=3, actual=1)",
)

assert.EqualError(t,
insertBuilder.Row(map[string]interface{}{
"name": "bubba",
"city": "London",
"name": "bubba",
"city": "London",
"json_value": []byte(`{\"bump_to\":\"98\"}`),
}),
"column \"hunger_level\" does not exist",
)
Expand All @@ -43,6 +45,7 @@ func TestFastBatchInsertBuilder(t *testing.T) {
insertBuilder.RowStruct(hungerRow{
Name: "bubba2",
HungerLevel: "9",
JsonValue: []byte(`{\"bump_to\":\"98\"}`),
}),
)

Expand Down
1 change: 1 addition & 0 deletions support/db/internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ const testSchema = `
CREATE TABLE IF NOT EXISTS people (
name character varying NOT NULL,
hunger_level integer NOT NULL,
json_value jsonb,
PRIMARY KEY (name)
);
DELETE FROM people;
Expand Down

0 comments on commit c1d5ce7

Please sign in to comment.