diff --git a/.github/workflows/horizon.yml b/.github/workflows/horizon.yml index f315e27791..c64a5b6716 100644 --- a/.github/workflows/horizon.yml +++ b/.github/workflows/horizon.yml @@ -126,6 +126,10 @@ jobs: STELLAR_CORE_VERSION: 21.3.1-2007.4ede19620.focal CAPTIVE_CORE_STORAGE_PATH: /tmp steps: + - name: Free Disk Space (Ubuntu) + uses: jlumbroso/free-disk-space@main + with: + tool-cache: true - uses: actions/checkout@v3 with: # For pull requests, build and test the PR head not a merge of the PR with the destination. @@ -134,8 +138,8 @@ - name: Build and test the Verify Range Docker image run: | docker build --build-arg="GO_VERSION=$(sed -En 's/^toolchain[[:space:]]+go([[:digit:].]+)$/\1/p' go.mod)" -f services/horizon/docker/verify-range/Dockerfile -t stellar/horizon-verify-range services/horizon/docker/verify-range/ - # Any range should do for basic testing, this range was chosen pretty early in history so that it only takes a few mins to run - docker run -e BRANCH=$(git rev-parse HEAD) -e FROM=10000063 -e TO=10000127 stellar/horizon-verify-range + # Use a small default range: the two most recent checkpoints before the latest archived checkpoint. + docker run -e TESTNET=true -e BRANCH=$(git rev-parse HEAD) -e FROM=0 -e TO=0 stellar/horizon-verify-range # Push image - if: github.ref == 'refs/heads/master' diff --git a/ingest/CHANGELOG.md b/ingest/CHANGELOG.md index ed168de74e..7671573d73 100644 --- a/ingest/CHANGELOG.md +++ b/ingest/CHANGELOG.md @@ -1,5 +1,13 @@ # Changelog +## Pending + +### Fixed +* The Captive Core backend now performs an 'online' stellar-core `run` for bounded modes of tx-meta retrieval. Refer to [runFrom.go](./ledgerbackend/run_from.go). This enables core to build, validate, and emit trusted ledger hashes in the tx-meta stream, replayed from the latest ledger of the network, for a bounded ledger range. Bounded mode no longer uses the 'offline' approach of running core `catchup`, which obtained tx-meta from history archives alone and did not guarantee verification of the ledger hashes against the live network. ([#4538](https://github.com/stellar/go/pull/4538)). See the usage sketch below. + * Note - because `run` is invoked with the LCL set to `from`, execution can now take longer: core must perform an online replay from the network's latest ledger back to `from`, and the extra runtime grows in proportion to the age of the `from` ledger. + + + All notable changes to this project will be documented in this file. This project adheres to [Semantic Versioning](http://semver.org/). 
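For illustration, a minimal sketch of driving the new bounded-mode path through the APIs touched in this PR, assuming a local stellar-core binary and a reachable pubnet history archive; the binary path and archive URL are placeholders:

```go
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/stellar/go/ingest/ledgerbackend"
	"github.com/stellar/go/network"
)

func main() {
	ctx := context.Background()

	// Placeholder archive URL; substitute your own deployment's value.
	archiveURLs := []string{"https://history.stellar.org/prd/core-live/core_live_001"}

	toml, err := ledgerbackend.NewCaptiveCoreToml(ledgerbackend.CaptiveCoreTomlParams{
		NetworkPassphrase:  network.PublicNetworkPassphrase,
		HistoryArchiveURLs: archiveURLs,
	})
	if err != nil {
		log.Fatal(err)
	}

	// NewCaptive now returns the LedgerBackend interface rather than
	// *CaptiveStellarCore (see the captive_core_backend.go change below).
	backend, err := ledgerbackend.NewCaptive(ledgerbackend.CaptiveCoreConfig{
		BinaryPath:         "/usr/bin/stellar-core", // placeholder path
		NetworkPassphrase:  network.PublicNetworkPassphrase,
		HistoryArchiveURLs: archiveURLs,
		Toml:               toml,
		UseDB:              true, // on-disk mode, the supported production path
	})
	if err != nil {
		log.Fatal(err)
	}
	defer backend.Close()

	// A bounded range now starts an 'online' core run in passive mode, so the
	// emitted tx-meta carries ledger hashes verified against the live network.
	if err := backend.PrepareRange(ctx, ledgerbackend.BoundedRange(100, 200)); err != nil {
		log.Fatal(err)
	}
	for seq := uint32(100); seq <= 200; seq++ {
		meta, err := backend.GetLedger(ctx, seq)
		if err != nil {
			log.Fatal(err)
		}
		fmt.Println("got ledger", meta.LedgerSequence())
	}
}
```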
### Stellar Core Protocol 21 Configuration Update: diff --git a/ingest/ledger_change_reader_test.go b/ingest/ledger_change_reader_test.go index 0b077007e5..8826ffa289 100644 --- a/ingest/ledger_change_reader_test.go +++ b/ingest/ledger_change_reader_test.go @@ -23,7 +23,7 @@ const ( func TestNewLedgerChangeReaderFails(t *testing.T) { ctx := context.Background() - mock := &ledgerbackend.MockDatabaseBackend{} + mock := &ledgerbackend.MockLedgerBackend{} seq := uint32(123) mock.On("GetLedger", ctx, seq).Return( xdr.LedgerCloseMeta{}, @@ -39,7 +39,7 @@ func TestNewLedgerChangeReaderFails(t *testing.T) { func TestNewLedgerChangeReaderSucceeds(t *testing.T) { ctx := context.Background() - mock := &ledgerbackend.MockDatabaseBackend{} + mock := &ledgerbackend.MockLedgerBackend{} seq := uint32(123) header := xdr.LedgerHeaderHistoryEntry{ @@ -146,7 +146,7 @@ func assertChangesEqual( func TestLedgerChangeReaderOrder(t *testing.T) { ctx := context.Background() - mock := &ledgerbackend.MockDatabaseBackend{} + mock := &ledgerbackend.MockLedgerBackend{} seq := uint32(123) src := xdr.MustAddress("GBXGQJWVLWOYHFLVTKWV5FGHA3LNYY2JQKM7OAJAUEQFU6LPCSEFVXON") @@ -353,7 +353,7 @@ func TestLedgerChangeReaderOrder(t *testing.T) { func TestLedgerChangeLedgerCloseMetaV2(t *testing.T) { ctx := context.Background() - mock := &ledgerbackend.MockDatabaseBackend{} + mock := &ledgerbackend.MockLedgerBackend{} seq := uint32(123) src := xdr.MustAddress("GBXGQJWVLWOYHFLVTKWV5FGHA3LNYY2JQKM7OAJAUEQFU6LPCSEFVXON") @@ -600,7 +600,7 @@ func TestLedgerChangeLedgerCloseMetaV2(t *testing.T) { func TestLedgerChangeLedgerCloseMetaV2Empty(t *testing.T) { ctx := context.Background() - mock := &ledgerbackend.MockDatabaseBackend{} + mock := &ledgerbackend.MockLedgerBackend{} seq := uint32(123) baseFee := xdr.Int64(100) diff --git a/ingest/ledgerbackend/captive_core_backend.go b/ingest/ledgerbackend/captive_core_backend.go index c8f28974f5..ef7fa8c51a 100644 --- a/ingest/ledgerbackend/captive_core_backend.go +++ b/ingest/ledgerbackend/captive_core_backend.go @@ -28,16 +28,6 @@ var _ LedgerBackend = (*CaptiveStellarCore)(nil) // ErrCannotStartFromGenesis is returned when attempting to prepare a range from ledger 1 var ErrCannotStartFromGenesis = errors.New("CaptiveCore is unable to start from ledger 1, start from ledger 2") -func (c *CaptiveStellarCore) roundDownToFirstReplayAfterCheckpointStart(ledger uint32) uint32 { - r := c.checkpointManager.GetCheckpointRange(ledger) - if r.Low <= 1 { - // Stellar-Core doesn't stream ledger 1 - return 2 - } - // All other checkpoints start at the next multiple of 64 - return r.Low -} - // CaptiveStellarCore is a ledger backend that starts internal Stellar-Core // subprocess responsible for streaming ledger data. It provides better decoupling // than DatabaseBackend but requires some extra init time. @@ -163,7 +153,7 @@ type CaptiveCoreConfig struct { } // NewCaptive returns a new CaptiveStellarCore instance. -func NewCaptive(config CaptiveCoreConfig) (*CaptiveStellarCore, error) { +func NewCaptive(config CaptiveCoreConfig) (LedgerBackend, error) { // Here we set defaults in the config. 
Because config is not a pointer this code should // not mutate the original CaptiveCoreConfig instance which was passed into NewCaptive() @@ -327,56 +317,18 @@ func (c *CaptiveStellarCore) getLatestCheckpointSequence() (uint32, error) { return has.CurrentLedger, nil } -func (c *CaptiveStellarCore) openOfflineReplaySubprocess(from, to uint32) error { - latestCheckpointSequence, err := c.getLatestCheckpointSequence() - if err != nil { - return errors.Wrap(err, "error getting latest checkpoint sequence") - } - - if from > latestCheckpointSequence { - return errors.Errorf( - "from sequence: %d is greater than max available in history archives: %d", - from, - latestCheckpointSequence, - ) - } - - if to > latestCheckpointSequence { - return errors.Errorf( - "to sequence: %d is greater than max available in history archives: %d", - to, - latestCheckpointSequence, - ) - } - - stellarCoreRunner := c.stellarCoreRunnerFactory() - if err = stellarCoreRunner.catchup(from, to); err != nil { - return errors.Wrap(err, "error running stellar-core") - } - c.stellarCoreRunner = stellarCoreRunner - - // The next ledger should be the first ledger of the checkpoint containing - // the requested ledger - ran := BoundedRange(from, to) - c.ledgerSequenceLock.Lock() - defer c.ledgerSequenceLock.Unlock() - - c.prepared = &ran - c.nextLedger = c.roundDownToFirstReplayAfterCheckpointStart(from) - c.lastLedger = &to - c.previousLedgerHash = nil - - return nil -} - -func (c *CaptiveStellarCore) openOnlineReplaySubprocess(ctx context.Context, from uint32) error { - runFrom, ledgerHash, err := c.runFromParams(ctx, from) +func (c *CaptiveStellarCore) openOnlineReplaySubprocess(ctx context.Context, ledgerRange Range) error { + runFrom, ledgerHash, err := c.runFromParams(ctx, ledgerRange.from) if err != nil { return errors.Wrap(err, "error calculating ledger and hash for stellar-core run") } stellarCoreRunner := c.stellarCoreRunnerFactory() - if err = stellarCoreRunner.runFrom(runFrom, ledgerHash); err != nil { + runnerMode := stellarCoreRunnerModeActive + if ledgerRange.bounded { + runnerMode = stellarCoreRunnerModePassive + } + if err = stellarCoreRunner.runFrom(runFrom, ledgerHash, runnerMode); err != nil { return errors.Wrap(err, "error running stellar-core") } c.stellarCoreRunner = stellarCoreRunner @@ -388,9 +340,15 @@ func (c *CaptiveStellarCore) openOnlineReplaySubprocess(ctx context.Context, fro defer c.ledgerSequenceLock.Unlock() c.nextLedger = 0 - ran := UnboundedRange(from) + ran := ledgerRange + var last *uint32 + if ledgerRange.bounded { + boundedTo := ledgerRange.to + last = &boundedTo + } + + c.lastLedger = last c.prepared = &ran - c.lastLedger = nil c.previousLedgerHash = nil return nil @@ -497,12 +455,8 @@ func (c *CaptiveStellarCore) startPreparingRange(ctx context.Context, ledgerRang } } - var err error - if ledgerRange.bounded { - err = c.openOfflineReplaySubprocess(ledgerRange.from, ledgerRange.to) - } else { - err = c.openOnlineReplaySubprocess(ctx, ledgerRange.from) - } + err := c.openOnlineReplaySubprocess(ctx, ledgerRange) + if err != nil { return false, errors.Wrap(err, "opening subprocess") } @@ -513,13 +467,9 @@ func (c *CaptiveStellarCore) startPreparingRange(ctx context.Context, ledgerRang // PrepareRange prepares the given range (including from and to) to be loaded. // Captive stellar-core backend needs to initialize Stellar-Core state to be // able to stream ledgers. -// Stellar-Core mode depends on the provided ledgerRange: -// - For BoundedRange it will start Stellar-Core in catchup mode. 
-// - For UnboundedRange it will first catchup to starting ledger and then run -// it normally (including connecting to the Stellar network). // -// Please note that using a BoundedRange, currently, requires a full-trust on -// history archive. This issue is being fixed in Stellar-Core. +// ctx - the caller's context +// ledgerRange - the ledger range, bounded or unbounded, to prepare func (c *CaptiveStellarCore) PrepareRange(ctx context.Context, ledgerRange Range) error { if alreadyPrepared, err := c.startPreparingRange(ctx, ledgerRange); err != nil { return errors.Wrap(err, "error starting prepare range") diff --git a/ingest/ledgerbackend/captive_core_backend_test.go b/ingest/ledgerbackend/captive_core_backend_test.go index f8161aec25..d754e61e7e 100644 --- a/ingest/ledgerbackend/captive_core_backend_test.go +++ b/ingest/ledgerbackend/captive_core_backend_test.go @@ -32,13 +32,8 @@ func (m *stellarCoreRunnerMock) context() context.Context { return a.Get(0).(context.Context) } -func (m *stellarCoreRunnerMock) catchup(from, to uint32) error { - a := m.Called(from, to) - return a.Error(0) -} - -func (m *stellarCoreRunnerMock) runFrom(from uint32, hash string) error { - a := m.Called(from, hash) +func (m *stellarCoreRunnerMock) runFrom(from uint32, hash string, runnerMode stellarCoreRunnerMode) error { + a := m.Called(from, hash, runnerMode) return a.Error(0) } @@ -151,7 +146,7 @@ func TestCaptiveNew(t *testing.T) { networkPassphrase := network.PublicNetworkPassphrase historyURLs := []string{server.URL} - captiveStellarCore, err := NewCaptive( + captiveLedgerBackend, err := NewCaptive( CaptiveCoreConfig{ BinaryPath: executablePath, NetworkPassphrase: networkPassphrase, @@ -161,6 +156,7 @@ CoreProtocolVersionFn: func(string) (uint, error) { return 21, nil }, }, ) + captiveStellarCore := captiveLedgerBackend.(*CaptiveStellarCore) assert.NoError(t, err) assert.Equal(t, uint32(0), captiveStellarCore.nextLedger) @@ -212,10 +208,12 @@ func TestCaptivePrepareRange(t *testing.T) { ctx := context.Background() mockRunner := &stellarCoreRunnerMock{} - mockRunner.On("catchup", uint32(100), uint32(200)).Return(nil).Once() + mockRunner.On("runFrom", uint32(99), "", stellarCoreRunnerModePassive).Return(nil).Once() mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan), true) mockRunner.On("context").Return(ctx) + bounded := BoundedRange(100, 200) + mockArchive := &historyarchive.MockArchive{} mockArchive. On("GetRootHAS"). Return(historyarchive.HistoryArchiveState{ CurrentLedger: uint32(200), }, nil) @@ -233,9 +231,10 @@ cancel: context.CancelFunc(func() { cancelCalled = true }), + useDB: true, } - err := captiveBackend.PrepareRange(ctx, BoundedRange(100, 200)) + err := captiveBackend.PrepareRange(ctx, bounded) assert.NoError(t, err) mockRunner.On("close").Return(nil).Once() err = captiveBackend.Close() @@ -250,12 +249,13 @@ func TestCaptivePrepareRangeCrash(t *testing.T) { close(metaChan) ctx := context.Background() mockRunner := &stellarCoreRunnerMock{} - mockRunner.On("catchup", uint32(100), uint32(200)).Return(nil).Once() + mockRunner.On("runFrom", uint32(99), "", stellarCoreRunnerModePassive).Return(nil).Once() mockRunner.On("getProcessExitError").Return(errors.New("exit code -1"), true) mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan), true) mockRunner.On("close").Return(nil).Once() mockRunner.On("context").Return(ctx) + bounded := BoundedRange(100, 200) mockArchive := &historyarchive.MockArchive{} mockArchive. On("GetRootHAS"). 
@@ -269,9 +269,10 @@ func TestCaptivePrepareRangeCrash(t *testing.T) { return mockRunner }, checkpointManager: historyarchive.NewCheckpointManager(64), + useDB: true, } - err := captiveBackend.PrepareRange(ctx, BoundedRange(100, 200)) + err := captiveBackend.PrepareRange(ctx, bounded) assert.EqualError(t, err, "Error fast-forwarding to 100: stellar core exited unexpectedly: exit code -1") mockRunner.AssertExpectations(t) mockArchive.AssertExpectations(t) @@ -291,10 +292,12 @@ func TestCaptivePrepareRangeTerminated(t *testing.T) { close(metaChan) ctx := context.Background() mockRunner := &stellarCoreRunnerMock{} - mockRunner.On("catchup", uint32(100), uint32(200)).Return(nil).Once() + mockRunner.On("runFrom", uint32(99), "", stellarCoreRunnerModePassive).Return(nil).Once() mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan), true) mockRunner.On("context").Return(ctx) + bounded := BoundedRange(100, 200) + mockArchive := &historyarchive.MockArchive{} mockArchive. On("GetRootHAS"). @@ -308,9 +311,10 @@ func TestCaptivePrepareRangeTerminated(t *testing.T) { return mockRunner }, checkpointManager: historyarchive.NewCheckpointManager(64), + useDB: true, } - err := captiveBackend.PrepareRange(ctx, BoundedRange(100, 200)) + err := captiveBackend.PrepareRange(ctx, bounded) assert.NoError(t, err) mockRunner.AssertExpectations(t) mockArchive.AssertExpectations(t) @@ -327,7 +331,7 @@ func TestCaptivePrepareRangeCloseNotFullyTerminated(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) mockRunner := &stellarCoreRunnerMock{} - mockRunner.On("catchup", uint32(100), uint32(200)).Return(nil).Twice() + mockRunner.On("runFrom", uint32(99), "", stellarCoreRunnerModePassive).Return(nil).Twice() mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan), true) mockRunner.On("context").Return(ctx) mockRunner.On("close").Return(nil) @@ -339,21 +343,24 @@ func TestCaptivePrepareRangeCloseNotFullyTerminated(t *testing.T) { CurrentLedger: uint32(200), }, nil) + bounded := BoundedRange(100, 200) + captiveBackend := CaptiveStellarCore{ archive: mockArchive, stellarCoreRunnerFactory: func() stellarCoreRunnerInterface { return mockRunner }, checkpointManager: historyarchive.NewCheckpointManager(64), + useDB: true, } - err := captiveBackend.PrepareRange(ctx, BoundedRange(100, 200)) + err := captiveBackend.PrepareRange(ctx, bounded) assert.NoError(t, err) // Simulates a long (but graceful) shutdown... 
cancel() - err = captiveBackend.PrepareRange(ctx, BoundedRange(100, 200)) + err = captiveBackend.PrepareRange(ctx, bounded) assert.NoError(t, err) mockRunner.AssertExpectations(t) @@ -393,7 +400,7 @@ func TestCaptivePrepareRange_ErrGettingRootHAS(t *testing.T) { } err := captiveBackend.PrepareRange(ctx, BoundedRange(100, 200)) - assert.EqualError(t, err, "error starting prepare range: opening subprocess: error getting latest checkpoint sequence: error getting root HAS: transient error") + assert.EqualError(t, err, "error starting prepare range: opening subprocess: error calculating ledger and hash for stellar-core run: error getting latest checkpoint sequence: error getting root HAS: transient error") err = captiveBackend.PrepareRange(ctx, UnboundedRange(100)) assert.EqualError(t, err, "error starting prepare range: opening subprocess: error calculating ledger and hash for stellar-core run: error getting latest checkpoint sequence: error getting root HAS: transient error") @@ -420,8 +427,8 @@ func TestCaptivePrepareRange_FromIsAheadOfRootHAS(t *testing.T) { checkpointManager: historyarchive.NewCheckpointManager(64), } - err := captiveBackend.PrepareRange(ctx, BoundedRange(100, 200)) - assert.EqualError(t, err, "error starting prepare range: opening subprocess: from sequence: 100 is greater than max available in history archives: 64") + err := captiveBackend.PrepareRange(ctx, BoundedRange(193, 200)) + assert.EqualError(t, err, "error starting prepare range: opening subprocess: error calculating ledger and hash for stellar-core run: trying to start online mode too far (latest checkpoint=64), only two checkpoints in the future allowed") err = captiveBackend.PrepareRange(ctx, UnboundedRange(193)) assert.EqualError(t, err, "error starting prepare range: opening subprocess: error calculating ledger and hash for stellar-core run: trying to start online mode too far (latest checkpoint=64), only two checkpoints in the future allowed") @@ -439,7 +446,7 @@ func TestCaptivePrepareRange_FromIsAheadOfRootHAS(t *testing.T) { } } - mockRunner.On("runFrom", uint32(63), "0000000000000000000000000000000000000000000000000000000000000000").Return(nil).Once() + mockRunner.On("runFrom", uint32(63), "0000000000000000000000000000000000000000000000000000000000000000", stellarCoreRunnerModeActive).Return(nil).Once() mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan), true) mockRunner.On("context").Return(ctx) @@ -469,18 +476,12 @@ func TestCaptivePrepareRangeWithDB_FromIsAheadOfRootHAS(t *testing.T) { checkpointManager: historyarchive.NewCheckpointManager(64), } - err := captiveBackend.PrepareRange(ctx, BoundedRange(100, 200)) - assert.EqualError(t, err, "error starting prepare range: opening subprocess: from sequence: 100 is greater than max available in history archives: 64") - - err = captiveBackend.PrepareRange(ctx, UnboundedRange(193)) - assert.EqualError(t, err, "error starting prepare range: opening subprocess: error calculating ledger and hash for stellar-core run: trying to start online mode too far (latest checkpoint=64), only two checkpoints in the future allowed") - metaChan := make(chan metaResult, 100) meta := buildLedgerCloseMeta(testLedgerHeader{sequence: 100}) metaChan <- metaResult{ LedgerCloseMeta: &meta, } - mockRunner.On("runFrom", uint32(99), "").Return(nil).Once() + mockRunner.On("runFrom", uint32(99), "", stellarCoreRunnerModeActive).Return(nil).Once() mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan), true) mockRunner.On("context").Return(ctx) @@ -490,33 +491,9 
@@ func TestCaptivePrepareRangeWithDB_FromIsAheadOfRootHAS(t *testing.T) { mockRunner.AssertExpectations(t) } -func TestCaptivePrepareRange_ToIsAheadOfRootHAS(t *testing.T) { - mockRunner := &stellarCoreRunnerMock{} - mockArchive := &historyarchive.MockArchive{} - mockArchive. - On("GetRootHAS"). - Return(historyarchive.HistoryArchiveState{ - CurrentLedger: uint32(192), - }, nil) - - captiveBackend := CaptiveStellarCore{ - archive: mockArchive, - stellarCoreRunnerFactory: func() stellarCoreRunnerInterface { - return mockRunner - }, - checkpointManager: historyarchive.NewCheckpointManager(64), - } - - err := captiveBackend.PrepareRange(context.Background(), BoundedRange(100, 200)) - assert.EqualError(t, err, "error starting prepare range: opening subprocess: to sequence: 200 is greater than max available in history archives: 192") - - mockArchive.AssertExpectations(t) - mockRunner.AssertExpectations(t) -} - func TestCaptivePrepareRange_ErrCatchup(t *testing.T) { mockRunner := &stellarCoreRunnerMock{} - mockRunner.On("catchup", uint32(100), uint32(192)).Return(errors.New("transient error")).Once() + mockRunner.On("runFrom", uint32(99), "", stellarCoreRunnerModePassive).Return(errors.New("transient error")).Once() mockArchive := &historyarchive.MockArchive{} mockArchive. @@ -525,6 +502,8 @@ func TestCaptivePrepareRange_ErrCatchup(t *testing.T) { CurrentLedger: uint32(192), }, nil) + bounded := BoundedRange(100, 192) + ctx := context.Background() cancelCalled := false captiveBackend := CaptiveStellarCore{ @@ -535,9 +514,11 @@ func TestCaptivePrepareRange_ErrCatchup(t *testing.T) { cancel: context.CancelFunc(func() { cancelCalled = true }), + useDB: true, + checkpointManager: historyarchive.NewCheckpointManager(64), } - err := captiveBackend.PrepareRange(ctx, BoundedRange(100, 192)) + err := captiveBackend.PrepareRange(ctx, bounded) assert.EqualError(t, err, "error starting prepare range: opening subprocess: error running stellar-core: transient error") // make sure we can Close without errors @@ -550,7 +531,7 @@ func TestCaptivePrepareRange_ErrCatchup(t *testing.T) { func TestCaptivePrepareRangeUnboundedRange_ErrRunFrom(t *testing.T) { mockRunner := &stellarCoreRunnerMock{} - mockRunner.On("runFrom", uint32(126), "0000000000000000000000000000000000000000000000000000000000000000").Return(errors.New("transient error")).Once() + mockRunner.On("runFrom", uint32(126), "0000000000000000000000000000000000000000000000000000000000000000", stellarCoreRunnerModeActive).Return(errors.New("transient error")).Once() mockArchive := &historyarchive.MockArchive{} mockArchive. 
@@ -601,7 +582,7 @@ func TestCaptivePrepareRangeUnboundedRange_ReuseSession(t *testing.T) { ctx := context.Background() mockRunner := &stellarCoreRunnerMock{} - mockRunner.On("runFrom", uint32(64), "0000000000000000000000000000000000000000000000000000000000000000").Return(nil).Once() + mockRunner.On("runFrom", uint32(64), "0000000000000000000000000000000000000000000000000000000000000000", stellarCoreRunnerModeActive).Return(nil).Once() mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan), true) mockRunner.On("context").Return(ctx) mockRunner.On("getProcessExitError").Return(nil, false) @@ -650,7 +631,7 @@ func TestGetLatestLedgerSequence(t *testing.T) { ctx := context.Background() mockRunner := &stellarCoreRunnerMock{} - mockRunner.On("runFrom", uint32(63), "0000000000000000000000000000000000000000000000000000000000000000").Return(nil).Once() + mockRunner.On("runFrom", uint32(63), "0000000000000000000000000000000000000000000000000000000000000000", stellarCoreRunnerModeActive).Return(nil).Once() mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan), true) mockRunner.On("context").Return(ctx) @@ -699,7 +680,7 @@ func TestGetLatestLedgerSequenceRaceCondition(t *testing.T) { mockRunner := &stellarCoreRunnerMock{} mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan), true) mockRunner.On("context").Return(ctx) - mockRunner.On("runFrom", mock.Anything, mock.Anything).Return(nil) + mockRunner.On("runFrom", mock.Anything, mock.Anything, stellarCoreRunnerModeActive).Return(nil) mockArchive := &historyarchive.MockArchive{} mockArchive. @@ -763,7 +744,7 @@ func TestCaptiveGetLedger(t *testing.T) { ctx := context.Background() ctx, cancel := context.WithCancel(ctx) mockRunner := &stellarCoreRunnerMock{} - mockRunner.On("catchup", uint32(65), uint32(66)).Return(nil) + mockRunner.On("runFrom", uint32(64), "", stellarCoreRunnerModePassive).Return(nil) mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan), true) mockRunner.On("context").Return(ctx) mockRunner.On("getProcessExitError").Return(nil, false) @@ -775,19 +756,21 @@ func TestCaptiveGetLedger(t *testing.T) { CurrentLedger: uint32(200), }, nil) + ledgerRange := BoundedRange(65, 66) + captiveBackend := CaptiveStellarCore{ archive: mockArchive, stellarCoreRunnerFactory: func() stellarCoreRunnerInterface { return mockRunner }, checkpointManager: historyarchive.NewCheckpointManager(64), + useDB: true, } // requires PrepareRange _, err := captiveBackend.GetLedger(ctx, 64) tt.EqualError(err, "session is not prepared, call PrepareRange first") - ledgerRange := BoundedRange(65, 66) tt.False(captiveBackend.isPrepared(ledgerRange), "core is not prepared until explicitly prepared") tt.False(captiveBackend.closed) err = captiveBackend.PrepareRange(ctx, ledgerRange) @@ -854,7 +837,7 @@ func TestCaptiveGetLedgerCacheLatestLedger(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() mockRunner := &stellarCoreRunnerMock{} - mockRunner.On("runFrom", uint32(65), "0101010100000000000000000000000000000000000000000000000000000000").Return(nil).Once() + mockRunner.On("runFrom", uint32(65), "0101010100000000000000000000000000000000000000000000000000000000", stellarCoreRunnerModeActive).Return(nil).Once() mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan), true) mockRunner.On("context").Return(ctx) @@ -916,7 +899,7 @@ func TestCaptiveGetLedger_NextLedgerIsDifferentToLedgerFromBuffer(t *testing.T) ctx := context.Background() mockRunner := &stellarCoreRunnerMock{} - 
mockRunner.On("catchup", uint32(65), uint32(66)).Return(nil) + mockRunner.On("runFrom", uint32(64), "", stellarCoreRunnerModePassive).Return(nil) mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan), true) mockRunner.On("context").Return(ctx) mockRunner.On("close").Return(nil) @@ -928,15 +911,18 @@ func TestCaptiveGetLedger_NextLedgerIsDifferentToLedgerFromBuffer(t *testing.T) CurrentLedger: uint32(200), }, nil) + ledgerRange := BoundedRange(65, 66) + captiveBackend := CaptiveStellarCore{ archive: mockArchive, stellarCoreRunnerFactory: func() stellarCoreRunnerInterface { return mockRunner }, checkpointManager: historyarchive.NewCheckpointManager(64), + useDB: true, } - err := captiveBackend.PrepareRange(ctx, BoundedRange(65, 66)) + err := captiveBackend.PrepareRange(ctx, ledgerRange) assert.NoError(t, err) _, err = captiveBackend.GetLedger(ctx, 66) @@ -962,7 +948,7 @@ func TestCaptiveGetLedger_NextLedger0RangeFromIsSmallerThanLedgerFromBuffer(t *t ctx := context.Background() mockRunner := &stellarCoreRunnerMock{} - mockRunner.On("runFrom", uint32(64), mock.Anything).Return(nil) + mockRunner.On("runFrom", uint32(64), mock.Anything, stellarCoreRunnerModeActive).Return(nil) mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan), true) mockRunner.On("context").Return(ctx) mockRunner.On("close").Return(nil) @@ -1011,7 +997,7 @@ func TestCaptiveStellarCore_PrepareRangeAfterClose(t *testing.T) { captiveCoreToml, err := NewCaptiveCoreToml(CaptiveCoreTomlParams{}) assert.NoError(t, err) - captiveStellarCore, err := NewCaptive( + captiveLedgerBackend, err := NewCaptive( CaptiveCoreConfig{ BinaryPath: executablePath, NetworkPassphrase: networkPassphrase, @@ -1019,18 +1005,17 @@ func TestCaptiveStellarCore_PrepareRangeAfterClose(t *testing.T) { Toml: captiveCoreToml, StoragePath: storagePath, CoreProtocolVersionFn: func(string) (uint, error) { return 21, nil }, + UseDB: true, }, ) assert.NoError(t, err) + captiveStellarCore := captiveLedgerBackend.(*CaptiveStellarCore) assert.NoError(t, captiveStellarCore.Close()) - assert.EqualError( + assert.ErrorContains( t, - captiveStellarCore.PrepareRange(ctx, BoundedRange(65, 66)), - "error starting prepare range: opening subprocess: error getting latest checkpoint sequence: "+ - "error getting root HAS: Get \"http://localhost/.well-known/stellar-history.json\": context canceled", - ) + captiveStellarCore.PrepareRange(ctx, BoundedRange(65, 66)), "context canceled") // even if the request to fetch the latest checkpoint succeeds, we should fail at creating the subprocess mockArchive := &historyarchive.MockArchive{} @@ -1040,11 +1025,10 @@ func TestCaptiveStellarCore_PrepareRangeAfterClose(t *testing.T) { CurrentLedger: uint32(200), }, nil) captiveStellarCore.archive = mockArchive - assert.EqualError( + + assert.ErrorContains( t, - captiveStellarCore.PrepareRange(ctx, BoundedRange(65, 66)), - "error starting prepare range: opening subprocess: error running stellar-core: context canceled", - ) + captiveStellarCore.PrepareRange(ctx, BoundedRange(65, 66)), "context canceled") mockArchive.AssertExpectations(t) } @@ -1064,7 +1048,7 @@ func TestCaptiveGetLedger_ErrReadingMetaResult(t *testing.T) { ctx := context.Background() mockRunner := &stellarCoreRunnerMock{} - mockRunner.On("catchup", uint32(65), uint32(66)).Return(nil) + mockRunner.On("runFrom", uint32(64), "", stellarCoreRunnerModePassive).Return(nil) mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan), true) ctx, cancel := context.WithCancel(ctx) 
mockRunner.On("context").Return(ctx) @@ -1081,15 +1065,18 @@ func TestCaptiveGetLedger_ErrReadingMetaResult(t *testing.T) { CurrentLedger: uint32(200), }, nil) + ledgerRange := BoundedRange(65, 66) + captiveBackend := CaptiveStellarCore{ archive: mockArchive, stellarCoreRunnerFactory: func() stellarCoreRunnerInterface { return mockRunner }, checkpointManager: historyarchive.NewCheckpointManager(64), + useDB: true, } - err := captiveBackend.PrepareRange(ctx, BoundedRange(65, 66)) + err := captiveBackend.PrepareRange(ctx, ledgerRange) assert.NoError(t, err) meta, err := captiveBackend.GetLedger(ctx, 65) @@ -1122,7 +1109,7 @@ func TestCaptiveGetLedger_ErrClosingAfterLastLedger(t *testing.T) { ctx := context.Background() mockRunner := &stellarCoreRunnerMock{} - mockRunner.On("catchup", uint32(65), uint32(66)).Return(nil) + mockRunner.On("runFrom", uint32(64), "", stellarCoreRunnerModePassive).Return(nil) mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan), true) mockRunner.On("context").Return(ctx) mockRunner.On("close").Return(fmt.Errorf("transient error")).Once() @@ -1134,15 +1121,18 @@ func TestCaptiveGetLedger_ErrClosingAfterLastLedger(t *testing.T) { CurrentLedger: uint32(200), }, nil) + ledgerRange := BoundedRange(65, 66) + captiveBackend := CaptiveStellarCore{ archive: mockArchive, stellarCoreRunnerFactory: func() stellarCoreRunnerInterface { return mockRunner }, checkpointManager: historyarchive.NewCheckpointManager(64), + useDB: true, } - err := captiveBackend.PrepareRange(ctx, BoundedRange(65, 66)) + err := captiveBackend.PrepareRange(ctx, ledgerRange) assert.NoError(t, err) _, err = captiveBackend.GetLedger(ctx, 66) @@ -1164,7 +1154,7 @@ func TestCaptiveAfterClose(t *testing.T) { mockRunner := &stellarCoreRunnerMock{} ctx, cancel := context.WithCancel(context.Background()) - mockRunner.On("catchup", uint32(65), uint32(66)).Return(nil) + mockRunner.On("runFrom", uint32(64), "", stellarCoreRunnerModePassive).Return(nil) mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan), true) mockRunner.On("context").Return(ctx) mockRunner.On("close").Return(nil).Once() @@ -1176,6 +1166,8 @@ func TestCaptiveAfterClose(t *testing.T) { CurrentLedger: uint32(200), }, nil) + boundedRange := BoundedRange(65, 66) + captiveBackend := CaptiveStellarCore{ archive: mockArchive, stellarCoreRunnerFactory: func() stellarCoreRunnerInterface { @@ -1183,9 +1175,9 @@ func TestCaptiveAfterClose(t *testing.T) { }, checkpointManager: historyarchive.NewCheckpointManager(64), cancel: cancel, + useDB: true, } - boundedRange := BoundedRange(65, 66) err := captiveBackend.PrepareRange(ctx, boundedRange) assert.NoError(t, err) @@ -1219,7 +1211,7 @@ func TestGetLedgerBoundsCheck(t *testing.T) { ctx := context.Background() mockRunner := &stellarCoreRunnerMock{} - mockRunner.On("catchup", uint32(128), uint32(130)).Return(nil).Once() + mockRunner.On("runFrom", uint32(127), "", stellarCoreRunnerModePassive).Return(nil).Once() mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan), true) mockRunner.On("context").Return(ctx) @@ -1230,15 +1222,18 @@ func TestGetLedgerBoundsCheck(t *testing.T) { CurrentLedger: uint32(200), }, nil) + boundedRange := BoundedRange(128, 130) + captiveBackend := CaptiveStellarCore{ archive: mockArchive, stellarCoreRunnerFactory: func() stellarCoreRunnerInterface { return mockRunner }, checkpointManager: historyarchive.NewCheckpointManager(64), + useDB: true, } - err := captiveBackend.PrepareRange(ctx, BoundedRange(128, 130)) + err := captiveBackend.PrepareRange(ctx, 
boundedRange) assert.NoError(t, err) meta, err := captiveBackend.GetLedger(ctx, 128) @@ -1343,7 +1338,7 @@ func TestCaptiveGetLedgerTerminatedUnexpectedly(t *testing.T) { ctx := testCase.ctx mockRunner := &stellarCoreRunnerMock{} - mockRunner.On("catchup", uint32(64), uint32(100)).Return(nil).Once() + mockRunner.On("runFrom", uint32(63), "", stellarCoreRunnerModePassive).Return(nil).Once() mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan), true) mockRunner.On("context").Return(ctx) mockRunner.On("getProcessExitError").Return(testCase.processExitedError, testCase.processExited) @@ -1356,15 +1351,18 @@ func TestCaptiveGetLedgerTerminatedUnexpectedly(t *testing.T) { CurrentLedger: uint32(200), }, nil) + boundedRange := BoundedRange(64, 100) + captiveBackend := CaptiveStellarCore{ archive: mockArchive, stellarCoreRunnerFactory: func() stellarCoreRunnerInterface { return mockRunner }, checkpointManager: historyarchive.NewCheckpointManager(64), + useDB: true, } - err := captiveBackend.PrepareRange(ctx, BoundedRange(64, 100)) + err := captiveBackend.PrepareRange(ctx, boundedRange) assert.NoError(t, err) meta, err := captiveBackend.GetLedger(ctx, 64) @@ -1627,7 +1625,7 @@ func TestCaptivePreviousLedgerCheck(t *testing.T) { ctx := context.Background() mockRunner := &stellarCoreRunnerMock{} - mockRunner.On("runFrom", uint32(254), "0101010100000000000000000000000000000000000000000000000000000000").Return(nil).Once() + mockRunner.On("runFrom", uint32(254), "0101010100000000000000000000000000000000000000000000000000000000", stellarCoreRunnerModeActive).Return(nil).Once() mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan), true) mockRunner.On("context").Return(ctx) mockRunner.On("close").Return(nil).Once() diff --git a/ingest/ledgerbackend/catchup.go b/ingest/ledgerbackend/catchup.go deleted file mode 100644 index 2cd12df0f3..0000000000 --- a/ingest/ledgerbackend/catchup.go +++ /dev/null @@ -1,76 +0,0 @@ -package ledgerbackend - -import ( - "context" - "fmt" - - "github.com/stellar/go/support/log" -) - -type catchupStream struct { - dir workingDir - from uint32 - to uint32 - coreCmdFactory coreCmdFactory - log *log.Entry - useDB bool -} - -func newCatchupStream(r *stellarCoreRunner, from, to uint32) catchupStream { - // We want to use ephemeral directories in running the catchup command - // (used for the reingestion use case) because it's possible to run parallel - // reingestion where multiple captive cores are active on the same machine. - // Having ephemeral directories will ensure that each ingestion worker will - // have a separate working directory - dir := newWorkingDir(r, true) - return catchupStream{ - dir: dir, - from: from, - to: to, - coreCmdFactory: newCoreCmdFactory(r, dir), - log: r.log, - useDB: r.useDB, - } -} - -func (s catchupStream) getWorkingDir() workingDir { - return s.dir -} - -func (s catchupStream) start(ctx context.Context) (cmdI, pipe, error) { - var err error - var cmd cmdI - var captiveCorePipe pipe - - rangeArg := fmt.Sprintf("%d/%d", s.to, s.to-s.from+1) - params := []string{"catchup", rangeArg, "--metadata-output-stream", s.coreCmdFactory.getPipeName()} - - // horizon operator has specified to use external storage for captive core ledger state - // instruct captive core invocation to not use memory, and in that case - // cc will look at DATABASE property in cfg toml for the external storage source to use. 
- // when using external storage of ledgers, use new-db to first set the state of - // remote db storage to genesis to purge any prior state and reset. - if s.useDB { - cmd, err = s.coreCmdFactory.newCmd(ctx, stellarCoreRunnerModeOffline, true, "new-db") - if err != nil { - return nil, pipe{}, fmt.Errorf("error creating command: %w", err) - } - if err = cmd.Run(); err != nil { - return nil, pipe{}, fmt.Errorf("error initializing core db: %w", err) - } - } else { - params = append(params, "--in-memory") - } - - cmd, err = s.coreCmdFactory.newCmd(ctx, stellarCoreRunnerModeOffline, true, params...) - if err != nil { - return nil, pipe{}, fmt.Errorf("error creating command: %w", err) - } - - captiveCorePipe, err = s.coreCmdFactory.startCaptiveCore(cmd) - if err != nil { - return nil, pipe{}, fmt.Errorf("error starting `stellar-core run` subprocess: %w", err) - } - - return cmd, captiveCorePipe, nil -} diff --git a/ingest/ledgerbackend/cmd.go b/ingest/ledgerbackend/cmd.go index 8af729f0a6..265b15ed77 100644 --- a/ingest/ledgerbackend/cmd.go +++ b/ingest/ledgerbackend/cmd.go @@ -134,7 +134,10 @@ func newCoreCmdFactory(r *stellarCoreRunner, dir workingDir) coreCmdFactory { } } -func (c coreCmdFactory) newCmd(ctx context.Context, mode stellarCoreRunnerMode, redirectOutputToLogs bool, params ...string) (cmdI, error) { +func (c coreCmdFactory) newCmd(ctx context.Context, + mode stellarCoreRunnerMode, + redirectOutputToLogs bool, + params ...string) (cmdI, error) { if err := c.dir.createIfNotExists(); err != nil { return nil, err } diff --git a/ingest/ledgerbackend/dir.go b/ingest/ledgerbackend/dir.go index d26835936c..50e2047426 100644 --- a/ingest/ledgerbackend/dir.go +++ b/ingest/ledgerbackend/dir.go @@ -18,20 +18,24 @@ type workingDir struct { systemCaller systemCaller } -func newWorkingDir(r *stellarCoreRunner, ephemeral bool) workingDir { +func newWorkingDir(r *stellarCoreRunner, ephemeral bool) (workingDir, error) { var path string if ephemeral { path = filepath.Join(r.storagePath, "captive-core-"+createRandomHexString(8)) } else { path = filepath.Join(r.storagePath, "captive-core") } + tomlClone, err := r.toml.clone() + if err != nil { + return workingDir{}, err + } return workingDir{ ephemeral: ephemeral, path: path, log: r.log, - toml: r.toml, + toml: tomlClone, systemCaller: r.systemCaller, - } + }, nil } func (w workingDir) createIfNotExists() error { @@ -72,9 +76,9 @@ func (w workingDir) remove() error { } func generateConfig(captiveCoreToml *CaptiveCoreToml, mode stellarCoreRunnerMode) ([]byte, error) { - if mode == stellarCoreRunnerModeOffline { + if mode == stellarCoreRunnerModePassive { var err error - captiveCoreToml, err = captiveCoreToml.CatchupToml() + captiveCoreToml, err = captiveCoreToml.PassiveToml() if err != nil { return nil, fmt.Errorf("could not generate catch up config: %w", err) } diff --git a/ingest/ledgerbackend/mock_database_backend.go b/ingest/ledgerbackend/mock_database_backend.go index c5f85ecef7..591309eb9b 100644 --- a/ingest/ledgerbackend/mock_database_backend.go +++ b/ingest/ledgerbackend/mock_database_backend.go @@ -8,33 +8,33 @@ import ( "github.com/stellar/go/xdr" ) -var _ LedgerBackend = (*MockDatabaseBackend)(nil) +var _ LedgerBackend = (*MockLedgerBackend)(nil) -type MockDatabaseBackend struct { +type MockLedgerBackend struct { mock.Mock } -func (m *MockDatabaseBackend) GetLatestLedgerSequence(ctx context.Context) (uint32, error) { +func (m *MockLedgerBackend) GetLatestLedgerSequence(ctx context.Context) (uint32, error) { args := m.Called(ctx) return 
args.Get(0).(uint32), args.Error(1) } -func (m *MockDatabaseBackend) PrepareRange(ctx context.Context, ledgerRange Range) error { +func (m *MockLedgerBackend) PrepareRange(ctx context.Context, ledgerRange Range) error { args := m.Called(ctx, ledgerRange) return args.Error(0) } -func (m *MockDatabaseBackend) IsPrepared(ctx context.Context, ledgerRange Range) (bool, error) { +func (m *MockLedgerBackend) IsPrepared(ctx context.Context, ledgerRange Range) (bool, error) { args := m.Called(ctx, ledgerRange) return args.Bool(0), args.Error(1) } -func (m *MockDatabaseBackend) GetLedger(ctx context.Context, sequence uint32) (xdr.LedgerCloseMeta, error) { +func (m *MockLedgerBackend) GetLedger(ctx context.Context, sequence uint32) (xdr.LedgerCloseMeta, error) { args := m.Called(ctx, sequence) return args.Get(0).(xdr.LedgerCloseMeta), args.Error(1) } -func (m *MockDatabaseBackend) Close() error { +func (m *MockLedgerBackend) Close() error { args := m.Called() return args.Error(0) } diff --git a/ingest/ledgerbackend/run_from.go b/ingest/ledgerbackend/run_from.go index 2d02322519..3c5961118c 100644 --- a/ingest/ledgerbackend/run_from.go +++ b/ingest/ledgerbackend/run_from.go @@ -13,26 +13,37 @@ import ( type runFromStream struct { dir workingDir from uint32 - hash string + fromHash string + runnerMode stellarCoreRunnerMode coreCmdFactory coreCmdFactory log *log.Entry useDB bool } -func newRunFromStream(r *stellarCoreRunner, from uint32, hash string) runFromStream { - // We only use ephemeral directories on windows because there is +// All ledger tx-meta emitted on the pipe by this stream carries trusted hashes, since it is built +// from core's online replay. +// r - the core runner +// from - the ledger sequence after which streaming of subsequent ledgers starts +// fromHash - the hash of the `from` ledger +// runnerMode - stellarCoreRunnerModePassive or stellarCoreRunnerModeActive +func newRunFromStream(r *stellarCoreRunner, from uint32, fromHash string, runnerMode stellarCoreRunnerMode) (runFromStream, error) { + // Use ephemeral directories on windows because there is // no way to terminate captive core gracefully on windows. // Having an ephemeral directory ensures that it is wiped out - // whenever we terminate captive core + // whenever captive core is terminated. 
+ dir, err := newWorkingDir(r, runnerMode == stellarCoreRunnerModePassive || runtime.GOOS == "windows") + if err != nil { + return runFromStream{}, err + } return runFromStream{ dir: dir, from: from, - hash: hash, + fromHash: fromHash, + runnerMode: runnerMode, coreCmdFactory: newCoreCmdFactory(r, dir), log: r.log, useDB: r.useDB, - } + }, nil } func (s runFromStream) getWorkingDir() workingDir { @@ -40,7 +51,7 @@ } func (s runFromStream) offlineInfo(ctx context.Context) (stellarcore.InfoResponse, error) { - cmd, err := s.coreCmdFactory.newCmd(ctx, stellarCoreRunnerModeOnline, false, "offline-info") + cmd, err := s.coreCmdFactory.newCmd(ctx, stellarCoreRunnerModePassive, false, "offline-info") if err != nil { return stellarcore.InfoResponse{}, fmt.Errorf("error creating offline-info cmd: %w", err) } @@ -83,7 +94,7 @@ func (s runFromStream) start(ctx context.Context) (cmd cmdI, captiveCorePipe pip return nil, pipe{}, fmt.Errorf("error removing existing storage-dir contents: %w", err) } - cmd, err = s.coreCmdFactory.newCmd(ctx, stellarCoreRunnerModeOnline, true, "new-db") + cmd, err = s.coreCmdFactory.newCmd(ctx, stellarCoreRunnerModePassive, true, "new-db") if err != nil { return nil, pipe{}, fmt.Errorf("error creating command: %w", err) } @@ -92,38 +103,45 @@ func (s runFromStream) start(ctx context.Context) (cmd cmdI, captiveCorePipe pip return nil, pipe{}, fmt.Errorf("error initializing core db: %w", err) } - // Do a quick catch-up to set the LCL in core to be our expected starting - // point. + // This catchup is only run to set LCL on core's local storage to be our expected starting point. + // No ledgers are emitted or collected from the pipe during this execution. if s.from > 2 { - cmd, err = s.coreCmdFactory.newCmd(ctx, stellarCoreRunnerModeOnline, true, "catchup", fmt.Sprintf("%d/0", s.from-1)) - } else { - cmd, err = s.coreCmdFactory.newCmd(ctx, stellarCoreRunnerModeOnline, true, "catchup", "2/0") - } - if err != nil { - return nil, pipe{}, fmt.Errorf("error creating command: %w", err) - } + cmd, err = s.coreCmdFactory.newCmd(ctx, stellarCoreRunnerModePassive, true, + "catchup", "--force-untrusted-catchup", fmt.Sprintf("%d/0", s.from-1)) - if err = cmd.Run(); err != nil { - return nil, pipe{}, fmt.Errorf("error runing stellar-core catchup: %w", err) + if err != nil { + return nil, pipe{}, fmt.Errorf("error creating catchup command to set LCL: %w", err) + } + + if err = cmd.Run(); err != nil { + return nil, pipe{}, fmt.Errorf("error running stellar-core catchup to set LCL: %w", err) + } + } else { + // If from is < 3, the caller wants ledger 2. To get that from core 'run', + // we skip the catchup that would set the LCL, leaving the fresh db state at LCL=1, + // and instead set CATCHUP_COMPLETE=true, which triggers core to emit ledger 2 first. + s.dir.toml.CatchupComplete = true } } - cmd, err = s.coreCmdFactory.newCmd( - ctx, - stellarCoreRunnerModeOnline, + // This will emit ledgers on the pipe, starting with sequence LCL+1. + cmd, err = s.coreCmdFactory.newCmd(ctx, + s.runnerMode, true, "run", "--metadata-output-stream", s.coreCmdFactory.getPipeName(), ) } else { + // TODO: remove; this code path is effectively obsolete since the production path + // only allows on-disk (useDB) mode, so only tests reach it. 
cmd, err = s.coreCmdFactory.newCmd( ctx, - stellarCoreRunnerModeOnline, + stellarCoreRunnerModeActive, true, "run", "--in-memory", "--start-at-ledger", fmt.Sprintf("%d", s.from), - "--start-at-hash", s.hash, + "--start-at-hash", s.fromHash, "--metadata-output-stream", s.coreCmdFactory.getPipeName(), ) } diff --git a/ingest/ledgerbackend/stellar_core_runner.go b/ingest/ledgerbackend/stellar_core_runner.go index 5245051dce..c89272d1f3 100644 --- a/ingest/ledgerbackend/stellar_core_runner.go +++ b/ingest/ledgerbackend/stellar_core_runner.go @@ -11,8 +11,7 @@ import ( ) type stellarCoreRunnerInterface interface { - catchup(from, to uint32) error - runFrom(from uint32, hash string) error + runFrom(from uint32, hash string, runnerMode stellarCoreRunnerMode) error getMetaPipe() (<-chan metaResult, bool) context() context.Context getProcessExitError() (error, bool) @@ -23,8 +22,8 @@ type stellarCoreRunnerMode int const ( _ stellarCoreRunnerMode = iota // unset - stellarCoreRunnerModeOnline - stellarCoreRunnerModeOffline + stellarCoreRunnerModeActive + stellarCoreRunnerModePassive ) // stellarCoreRunner uses a named pipe ( https://en.wikipedia.org/wiki/Named_pipe ) to stream ledgers directly @@ -103,13 +102,12 @@ func (r *stellarCoreRunner) context() context.Context { } // runFrom executes the run command with a starting ledger on the captive core subprocess -func (r *stellarCoreRunner) runFrom(from uint32, hash string) error { - return r.startMetaStream(newRunFromStream(r, from, hash)) -} - -// catchup executes the catchup command on the captive core subprocess -func (r *stellarCoreRunner) catchup(from, to uint32) error { - return r.startMetaStream(newCatchupStream(r, from, to)) +func (r *stellarCoreRunner) runFrom(from uint32, hash string, runnerMode stellarCoreRunnerMode) error { + runFromStream, err := newRunFromStream(r, from, hash, runnerMode) + if err != nil { + return err + } + return r.startMetaStream(runFromStream) } type metaStream interface { diff --git a/ingest/ledgerbackend/stellar_core_runner_test.go b/ingest/ledgerbackend/stellar_core_runner_test.go index f53cd88328..12634f7e10 100644 --- a/ingest/ledgerbackend/stellar_core_runner_test.go +++ b/ingest/ledgerbackend/stellar_core_runner_test.go @@ -15,46 +15,6 @@ import ( "github.com/stellar/go/support/log" ) -func TestCloseOffline(t *testing.T) { - captiveCoreToml, err := NewCaptiveCoreToml(CaptiveCoreTomlParams{}) - assert.NoError(t, err) - - runner := newStellarCoreRunner(CaptiveCoreConfig{ - BinaryPath: "/usr/bin/stellar-core", - HistoryArchiveURLs: []string{"http://localhost"}, - Log: log.New(), - Context: context.Background(), - Toml: captiveCoreToml, - StoragePath: "/tmp/captive-core", - }) - - cmdMock := simpleCommandMock() - cmdMock.On("Wait").Return(nil) - - // Replace system calls with a mock - scMock := &mockSystemCaller{} - defer scMock.AssertExpectations(t) - scMock.On("stat", mock.Anything).Return(isDirImpl(true), nil) - scMock.On("writeFile", mock.Anything, mock.Anything, mock.Anything).Return(nil) - scMock.On("command", - runner.ctx, - "/usr/bin/stellar-core", - "--conf", - mock.Anything, - "--console", - "catchup", - "200/101", - "--metadata-output-stream", - "fd:3", - "--in-memory", - ).Return(cmdMock) - scMock.On("removeAll", mock.Anything).Return(nil).Once() - runner.systemCaller = scMock - - assert.NoError(t, runner.catchup(100, 200)) - assert.NoError(t, runner.close()) -} - func TestCloseOnline(t *testing.T) { captiveCoreToml, err := NewCaptiveCoreToml(CaptiveCoreTomlParams{}) assert.NoError(t, err) @@ -95,7 
+55,7 @@ func TestCloseOnline(t *testing.T) { ).Return(cmdMock) runner.systemCaller = scMock - assert.NoError(t, runner.runFrom(100, "hash")) + assert.NoError(t, runner.runFrom(100, "hash", stellarCoreRunnerModeActive)) assert.NoError(t, runner.close()) } @@ -140,7 +100,7 @@ func TestCloseOnlineWithError(t *testing.T) { scMock.On("removeAll", mock.Anything).Return(nil).Once() runner.systemCaller = scMock - assert.NoError(t, runner.runFrom(100, "hash")) + assert.NoError(t, runner.runFrom(100, "hash", stellarCoreRunnerModeActive)) // Wait with calling close until r.processExitError is set to Wait() error for { @@ -183,16 +143,19 @@ func TestCloseConcurrency(t *testing.T) { "--conf", mock.Anything, "--console", - "catchup", - "200/101", + "run", + "--in-memory", + "--start-at-ledger", + "100", + "--start-at-hash", + "", "--metadata-output-stream", "fd:3", - "--in-memory", ).Return(cmdMock) scMock.On("removeAll", mock.Anything).Return(nil).Once() runner.systemCaller = scMock - assert.NoError(t, runner.catchup(100, 200)) + assert.NoError(t, runner.runFrom(100, "", stellarCoreRunnerModeActive)) var wg sync.WaitGroup for i := 0; i < 10; i++ { @@ -261,7 +224,7 @@ func TestRunFromUseDBLedgersMatch(t *testing.T) { // removeAll not called runner.systemCaller = scMock - assert.NoError(t, runner.runFrom(100, "hash")) + assert.NoError(t, runner.runFrom(100, "hash", stellarCoreRunnerModeActive)) assert.NoError(t, runner.close()) } @@ -323,7 +286,7 @@ func TestRunFromUseDBLedgersBehind(t *testing.T) { ).Return(cmdMock) runner.systemCaller = scMock - assert.NoError(t, runner.runFrom(100, "hash")) + assert.NoError(t, runner.runFrom(100, "hash", stellarCoreRunnerModeActive)) assert.NoError(t, runner.close()) } @@ -389,6 +352,7 @@ func TestRunFromUseDBLedgersInFront(t *testing.T) { mock.Anything, "--console", "catchup", + "--force-untrusted-catchup", "99/0", ).Return(catchupCmdMock) scMock.On("command", @@ -403,6 +367,6 @@ func TestRunFromUseDBLedgersInFront(t *testing.T) { ).Return(cmdMock) runner.systemCaller = scMock - assert.NoError(t, runner.runFrom(100, "hash")) + assert.NoError(t, runner.runFrom(100, "hash", stellarCoreRunnerModeActive)) assert.NoError(t, runner.close()) } diff --git a/ingest/ledgerbackend/testdata/expected-offline-core.cfg b/ingest/ledgerbackend/testdata/expected-offline-core.cfg index ec37e504fc..a268e61cc9 100644 --- a/ingest/ledgerbackend/testdata/expected-offline-core.cfg +++ b/ingest/ledgerbackend/testdata/expected-offline-core.cfg @@ -7,7 +7,6 @@ HTTP_PORT = 0 LOG_FILE_PATH = "" NETWORK_PASSPHRASE = "Public Global Stellar Network ; September 2015" PEER_PORT = 12345 -RUN_STANDALONE = true UNSAFE_QUORUM = true [HISTORY.h0] diff --git a/ingest/ledgerbackend/testdata/expected-offline-enforce-diag-events-and-metav1.cfg b/ingest/ledgerbackend/testdata/expected-offline-enforce-diag-events-and-metav1.cfg index fa25d69b98..2ce4a664e9 100644 --- a/ingest/ledgerbackend/testdata/expected-offline-enforce-diag-events-and-metav1.cfg +++ b/ingest/ledgerbackend/testdata/expected-offline-enforce-diag-events-and-metav1.cfg @@ -7,7 +7,6 @@ FAILURE_SAFETY = 0 HTTP_PORT = 0 LOG_FILE_PATH = "" NETWORK_PASSPHRASE = "Public Global Stellar Network ; September 2015" -RUN_STANDALONE = true UNSAFE_QUORUM = true [HISTORY.h0] diff --git a/ingest/ledgerbackend/testdata/expected-offline-enforce-disabled-diagnostic-events.cfg b/ingest/ledgerbackend/testdata/expected-offline-enforce-disabled-diagnostic-events.cfg index 4a971a19d2..1c059f7491 100644 --- 
a/ingest/ledgerbackend/testdata/expected-offline-enforce-disabled-diagnostic-events.cfg +++ b/ingest/ledgerbackend/testdata/expected-offline-enforce-disabled-diagnostic-events.cfg @@ -4,7 +4,6 @@ FAILURE_SAFETY = 0 HTTP_PORT = 0 LOG_FILE_PATH = "" NETWORK_PASSPHRASE = "Public Global Stellar Network ; September 2015" -RUN_STANDALONE = true UNSAFE_QUORUM = true [HISTORY.h0] diff --git a/ingest/ledgerbackend/testdata/expected-offline-with-appendix-core.cfg b/ingest/ledgerbackend/testdata/expected-offline-with-appendix-core.cfg index 30159c150a..273db38687 100644 --- a/ingest/ledgerbackend/testdata/expected-offline-with-appendix-core.cfg +++ b/ingest/ledgerbackend/testdata/expected-offline-with-appendix-core.cfg @@ -5,7 +5,6 @@ HTTP_PORT = 0 LOG_FILE_PATH = "" NETWORK_PASSPHRASE = "Public Global Stellar Network ; September 2015" PEER_PORT = 12345 -RUN_STANDALONE = true UNSAFE_QUORUM = true [[HOME_DOMAINS]] diff --git a/ingest/ledgerbackend/testdata/expected-offline-with-extra-fields.cfg b/ingest/ledgerbackend/testdata/expected-offline-with-extra-fields.cfg index eab3b67bf8..dc291c02d6 100644 --- a/ingest/ledgerbackend/testdata/expected-offline-with-extra-fields.cfg +++ b/ingest/ledgerbackend/testdata/expected-offline-with-extra-fields.cfg @@ -6,7 +6,6 @@ LOG_FILE_PATH = "" NETWORK_PASSPHRASE = "Public Global Stellar Network ; September 2015" NODE_NAMES = ["GA22N4YGO7IJDRF2SISA5KHULGYYKDXBQGYIWUVNMSNHF5G2DNBKP3M5 eliza", "GCDENOCHA6TQL6DFC4FS54HIH7RP7XR7VZCQZFANMGLT2WXJ7D7KGV2P hal9000", "GDV46EIEF57TDL4W27UFDAUVPDDCKJNVBYB3WIV2WYUYUG753FCFU6EJ victor"] PEER_PORT = 12345 -RUN_STANDALONE = true UNSAFE_QUORUM = true [HISTORY] diff --git a/ingest/ledgerbackend/testdata/expected-offline-with-no-peer-port.cfg b/ingest/ledgerbackend/testdata/expected-offline-with-no-peer-port.cfg index 9c5fd26769..5f431cb3b4 100644 --- a/ingest/ledgerbackend/testdata/expected-offline-with-no-peer-port.cfg +++ b/ingest/ledgerbackend/testdata/expected-offline-with-no-peer-port.cfg @@ -4,7 +4,6 @@ FAILURE_SAFETY = 0 HTTP_PORT = 0 LOG_FILE_PATH = "/var/stellar-core/test.log" NETWORK_PASSPHRASE = "Public Global Stellar Network ; September 2015" -RUN_STANDALONE = true UNSAFE_QUORUM = true [HISTORY.h0] diff --git a/ingest/ledgerbackend/toml.go b/ingest/ledgerbackend/toml.go index b7f41b03a7..40328761de 100644 --- a/ingest/ledgerbackend/toml.go +++ b/ingest/ledgerbackend/toml.go @@ -87,6 +87,7 @@ type captiveCoreTomlValues struct { // we cannot omitempty because 0 is a valid configuration for FAILURE_SAFETY // and the default is -1 FailureSafety int `toml:"FAILURE_SAFETY"` + CatchupComplete bool `toml:"CATCHUP_COMPLETE,omitempty"` UnsafeQuorum bool `toml:"UNSAFE_QUORUM,omitempty"` RunStandalone bool `toml:"RUN_STANDALONE,omitempty"` ArtificiallyAccelerateTimeForTesting bool `toml:"ARTIFICIALLY_ACCELERATE_TIME_FOR_TESTING,omitempty"` @@ -415,32 +416,32 @@ func (c *CaptiveCoreToml) clone() (*CaptiveCoreToml, error) { return &cloned, nil } -// CatchupToml returns a new CaptiveCoreToml instance based off the existing +// PassiveToml returns a new CaptiveCoreToml instance based off the existing // instance with some modifications which are suitable for running -// the catchup command on captive core. -func (c *CaptiveCoreToml) CatchupToml() (*CaptiveCoreToml, error) { - offline, err := c.clone() +// the run command on captive core, with defaults that ensure startup +// and disable optional services such as the http port. 
+func (c *CaptiveCoreToml) PassiveToml() (*CaptiveCoreToml, error) { + passiveToml, err := c.clone() if err != nil { return nil, errors.Wrap(err, "could not clone toml") } - offline.RunStandalone = true - offline.UnsafeQuorum = true - offline.PublicHTTPPort = false - offline.HTTPPort = 0 - offline.FailureSafety = 0 + passiveToml.UnsafeQuorum = true + passiveToml.PublicHTTPPort = false + passiveToml.HTTPPort = 0 + passiveToml.FailureSafety = 0 if !c.QuorumSetIsConfigured() { // Add a fictional quorum -- necessary to convince core to start up; // but not used at all for our purposes. Pubkey here is just random. - offline.QuorumSetEntries = map[string]QuorumSet{ + passiveToml.QuorumSetEntries = map[string]QuorumSet{ "QUORUM_SET": { ThresholdPercent: 100, Validators: []string{"GCZBOIAY4HLKAJVNJORXZOZRAY2BJDBZHKPBHZCRAIUR5IHC2UHBGCQR"}, }, } } - return offline, nil + return passiveToml, nil } func (c *CaptiveCoreToml) setDefaults(params CaptiveCoreTomlParams) { diff --git a/ingest/ledgerbackend/toml_test.go b/ingest/ledgerbackend/toml_test.go index 42412b85f1..73ef4aea0d 100644 --- a/ingest/ledgerbackend/toml_test.go +++ b/ingest/ledgerbackend/toml_test.go @@ -247,7 +247,7 @@ func TestGenerateConfig(t *testing.T) { }{ { name: "offline config with no appendix", - mode: stellarCoreRunnerModeOffline, + mode: stellarCoreRunnerModePassive, appendPath: "", expectedPath: filepath.Join("testdata", "expected-offline-core.cfg"), httpPort: newUint(6789), @@ -257,7 +257,7 @@ func TestGenerateConfig(t *testing.T) { }, { name: "offline config with no peer port", - mode: stellarCoreRunnerModeOffline, + mode: stellarCoreRunnerModePassive, appendPath: "", expectedPath: filepath.Join("testdata", "expected-offline-with-no-peer-port.cfg"), httpPort: newUint(6789), @@ -266,7 +266,7 @@ func TestGenerateConfig(t *testing.T) { }, { name: "online config with appendix", - mode: stellarCoreRunnerModeOnline, + mode: stellarCoreRunnerModeActive, appendPath: filepath.Join("testdata", "sample-appendix.cfg"), expectedPath: filepath.Join("testdata", "expected-online-core.cfg"), httpPort: newUint(6789), @@ -275,7 +275,7 @@ func TestGenerateConfig(t *testing.T) { }, { name: "online config with unsupported field in appendix", - mode: stellarCoreRunnerModeOnline, + mode: stellarCoreRunnerModeActive, appendPath: filepath.Join("testdata", "invalid-captive-core-field.cfg"), expectedPath: filepath.Join("testdata", "expected-online-core.cfg"), httpPort: newUint(6789), @@ -284,7 +284,7 @@ func TestGenerateConfig(t *testing.T) { }, { name: "online config with no peer port", - mode: stellarCoreRunnerModeOnline, + mode: stellarCoreRunnerModeActive, appendPath: filepath.Join("testdata", "sample-appendix.cfg"), expectedPath: filepath.Join("testdata", "expected-online-with-no-peer-port.cfg"), httpPort: newUint(6789), @@ -293,7 +293,7 @@ func TestGenerateConfig(t *testing.T) { }, { name: "online config with no http port", - mode: stellarCoreRunnerModeOnline, + mode: stellarCoreRunnerModeActive, appendPath: filepath.Join("testdata", "sample-appendix.cfg"), expectedPath: filepath.Join("testdata", "expected-online-with-no-http-port.cfg"), httpPort: nil, @@ -302,7 +302,7 @@ func TestGenerateConfig(t *testing.T) { }, { name: "offline config with appendix", - mode: stellarCoreRunnerModeOffline, + mode: stellarCoreRunnerModePassive, appendPath: filepath.Join("testdata", "sample-appendix.cfg"), expectedPath: filepath.Join("testdata", "expected-offline-with-appendix-core.cfg"), httpPort: newUint(6789), @@ -311,7 +311,7 @@ func TestGenerateConfig(t 
*testing.T) { }, { name: "offline config with extra fields in appendix", - mode: stellarCoreRunnerModeOffline, + mode: stellarCoreRunnerModePassive, appendPath: filepath.Join("testdata", "appendix-with-fields.cfg"), expectedPath: filepath.Join("testdata", "expected-offline-with-extra-fields.cfg"), httpPort: newUint(6789), @@ -320,7 +320,7 @@ func TestGenerateConfig(t *testing.T) { }, { name: "offline config with enforce diagnostic events and metav1", - mode: stellarCoreRunnerModeOffline, + mode: stellarCoreRunnerModePassive, expectedPath: filepath.Join("testdata", "expected-offline-enforce-diag-events-and-metav1.cfg"), logPath: nil, enforceSorobanDiagnosticEvents: true, @@ -328,7 +328,7 @@ func TestGenerateConfig(t *testing.T) { }, { name: "offline config disabling enforced diagnostic events and metav1", - mode: stellarCoreRunnerModeOffline, + mode: stellarCoreRunnerModePassive, expectedPath: filepath.Join("testdata", "expected-offline-enforce-disabled-diagnostic-events.cfg"), appendPath: filepath.Join("testdata", "appendix-disable-diagnostic-events-and-metav1.cfg"), logPath: nil, @@ -337,7 +337,7 @@ func TestGenerateConfig(t *testing.T) { }, { name: "online config with enforce diagnostic events and meta v1", - mode: stellarCoreRunnerModeOnline, + mode: stellarCoreRunnerModeActive, appendPath: filepath.Join("testdata", "sample-appendix.cfg"), expectedPath: filepath.Join("testdata", "expected-online-with-no-http-port-diag-events-metav1.cfg"), httpPort: nil, @@ -348,14 +348,14 @@ func TestGenerateConfig(t *testing.T) { }, { name: "offline config with minimum persistent entry in appendix", - mode: stellarCoreRunnerModeOnline, + mode: stellarCoreRunnerModeActive, appendPath: filepath.Join("testdata", "appendix-with-minimum-persistent-entry.cfg"), expectedPath: filepath.Join("testdata", "expected-online-with-appendix-minimum-persistent-entry.cfg"), logPath: nil, }, { name: "default BucketlistDB config", - mode: stellarCoreRunnerModeOnline, + mode: stellarCoreRunnerModeActive, appendPath: filepath.Join("testdata", "sample-appendix.cfg"), expectedPath: filepath.Join("testdata", "expected-default-bucketlistdb-core.cfg"), useDB: true, @@ -363,7 +363,7 @@ func TestGenerateConfig(t *testing.T) { }, { name: "BucketlistDB config in appendix", - mode: stellarCoreRunnerModeOnline, + mode: stellarCoreRunnerModeActive, appendPath: filepath.Join("testdata", "sample-appendix-bucketlistdb.cfg"), expectedPath: filepath.Join("testdata", "expected-bucketlistdb-core.cfg"), useDB: true, @@ -416,7 +416,7 @@ func TestGenerateCoreConfigInMemory(t *testing.T) { captiveCoreToml, err = NewCaptiveCoreTomlFromFile(appendPath, params) assert.NoError(t, err) - configBytes, err := generateConfig(captiveCoreToml, stellarCoreRunnerModeOnline) + configBytes, err := generateConfig(captiveCoreToml, stellarCoreRunnerModeActive) assert.NoError(t, err) expectedByte, err := ioutil.ReadFile(expectedPath) @@ -467,7 +467,7 @@ func TestExternalStorageConfigUsesDatabaseToml(t *testing.T) { assert.NoError(t, err) captiveCoreToml.Database = "sqlite3:///etc/defaults/stellar.db" - configBytes, err := generateConfig(captiveCoreToml, stellarCoreRunnerModeOffline) + configBytes, err := generateConfig(captiveCoreToml, stellarCoreRunnerModePassive) assert.NoError(t, err) toml := CaptiveCoreToml{} @@ -495,7 +495,7 @@ func TestDBConfigDefaultsToSqlite(t *testing.T) { captiveCoreToml, err = NewCaptiveCoreToml(params) assert.NoError(t, err) - configBytes, err := generateConfig(captiveCoreToml, stellarCoreRunnerModeOffline) + configBytes, err := 
generateConfig(captiveCoreToml, stellarCoreRunnerModePassive) assert.NoError(t, err) toml := CaptiveCoreToml{} @@ -526,7 +526,7 @@ func TestNonDBConfigDoesNotUpdateDatabase(t *testing.T) { captiveCoreToml, err = NewCaptiveCoreToml(params) assert.NoError(t, err) - configBytes, err := generateConfig(captiveCoreToml, stellarCoreRunnerModeOffline) + configBytes, err := generateConfig(captiveCoreToml, stellarCoreRunnerModePassive) assert.NoError(t, err) toml := CaptiveCoreToml{} diff --git a/services/galexie/internal/exportmanager_test.go b/services/galexie/internal/exportmanager_test.go index 08829a8f8e..0ad573a0ee 100644 --- a/services/galexie/internal/exportmanager_test.go +++ b/services/galexie/internal/exportmanager_test.go @@ -46,12 +46,12 @@ func TestExporterSuite(t *testing.T) { type ExportManagerSuite struct { suite.Suite ctx context.Context - mockBackend ledgerbackend.MockDatabaseBackend + mockBackend ledgerbackend.MockLedgerBackend } func (s *ExportManagerSuite) SetupTest() { s.ctx = context.Background() - s.mockBackend = ledgerbackend.MockDatabaseBackend{} + s.mockBackend = ledgerbackend.MockLedgerBackend{} } func (s *ExportManagerSuite) TearDownTest() { diff --git a/services/horizon/CHANGELOG.md b/services/horizon/CHANGELOG.md index cd5d8af57b..7aa080315a 100644 --- a/services/horizon/CHANGELOG.md +++ b/services/horizon/CHANGELOG.md @@ -5,14 +5,21 @@ file. This project adheres to [Semantic Versioning](http://semver.org/). ## Pending -### Added +### Fixed + +- Horizon: trusted ledger hashes were not passed to captive core when it was configured to run catchup "on disk" ([4538](https://github.com/stellar/go/pull/4538)) + * The Captive Core backend now performs an 'online' stellar-core `run` for bounded tx-meta retrieval, used by the `db reingest range` and `ingest verify-range` commands. This lets core build, validate, and emit trusted ledger hashes in the tx-meta stream for the requested ledger range. These bounded-range commands no longer use the 'offline' mode of running core `catchup`, which obtains tx-meta from history archives alone and does not guarantee that ledger hashes are verified against the live network. ([#4538](https://github.com/stellar/go/pull/4538)). + * Note: because `run` sets the LCL to `from`, `reingest` and `verify-range` may take longer to execute, since core must replay online from the network's latest ledger back to `from`; the extra runtime grows in proportion to the age of the `from` ledger. + + +## 2.32.0 - Reingest from pre-computed tx meta on remote cloud storage. ([4911](https://github.com/stellar/go/issues/4911)), ([5374](https://github.com/stellar/go/pull/5374)) - Configure horizon reingestion to obtain ledger tx meta in pre-computed files from a Google Cloud Storage(GCS) location. - Using this option will no longer require a captive core binary be present and it no longer runs a captive core sub-process, instead obtaining the tx meta from the GCS backend. - Horizon supports this new feature with two new parameters `ledgerbackend` and `datastore-config` on the `reingest` command. Refer to [Reingestion README](./internal/ingest/README.md#reingestion). - - +- Add metrics for reaping of history lookup tables ([5385](https://github.com/stellar/go/pull/5385)). +- Add `--reap-lookup-tables` (defaults to true); setting it to false disables reaping of history lookup tables. ([5366](https://github.com/stellar/go/pull/5366)).
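To make the new bounded-range behavior concrete, here is a minimal sketch (not part of this diff) of driving the captive backend over a bounded range, assuming the `ledgerbackend` API as it appears elsewhere in this diff (`NewCaptive`, `PrepareRange`, `BoundedRange`, `GetLedger`); the config values are placeholders, and a real setup also supplies a `CaptiveCoreToml`:

```go
package main

import (
	"context"
	"log"

	"github.com/stellar/go/ingest/ledgerbackend"
)

func main() {
	ctx := context.Background()

	// Placeholder values; a real setup also populates the Toml field
	// (e.g. via NewCaptiveCoreTomlFromFile / NewCaptiveCoreToml).
	backend, err := ledgerbackend.NewCaptive(ledgerbackend.CaptiveCoreConfig{
		BinaryPath:         "/usr/bin/stellar-core",
		NetworkPassphrase:  "Test SDF Network ; September 2015",
		HistoryArchiveURLs: []string{"http://history.stellar.org/prd/core-testnet/core_testnet_001"},
	})
	if err != nil {
		log.Fatal(err)
	}
	defer backend.Close()

	// After this fix, preparing a bounded range drives an 'online' core `run`
	// with LCL set to `from`, instead of an offline `catchup`, so the emitted
	// tx-meta carries ledger hashes validated against the live network.
	from, to := uint32(10000063), uint32(10000127)
	if err := backend.PrepareRange(ctx, ledgerbackend.BoundedRange(from, to)); err != nil {
		log.Fatal(err)
	}

	for seq := from; seq <= to; seq++ {
		meta, err := backend.GetLedger(ctx, seq)
		if err != nil {
			log.Fatal(err)
		}
		_ = meta // hand trusted LedgerCloseMeta to the ingestion pipeline
	}
}
```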
## 2.31.0 diff --git a/services/horizon/cmd/core-test.cfg b/services/horizon/cmd/core-test.cfg new file mode 100644 index 0000000000..94aedc4a6f --- /dev/null +++ b/services/horizon/cmd/core-test.cfg @@ -0,0 +1 @@ +# placeholder file for test reference. \ No newline at end of file diff --git a/services/horizon/cmd/db.go b/services/horizon/cmd/db.go index 92a732e002..b5ad89e809 100644 --- a/services/horizon/cmd/db.go +++ b/services/horizon/cmd/db.go @@ -481,7 +481,7 @@ func DefineDBCommands(rootCmd *cobra.Command, horizonConfig *horizon.Config, hor var err error var storageBackendConfig ingest.StorageBackendConfig - options := horizon.ApplyOptions{RequireCaptiveCoreFullConfig: false} + options := horizon.ApplyOptions{RequireCaptiveCoreFullConfig: true} if ledgerBackendType == ingest.BufferedStorageBackend { if storageBackendConfig, err = loadStorageBackendConfig(storageBackendConfigPath); err != nil { return err @@ -543,7 +543,7 @@ func DefineDBCommands(rootCmd *cobra.Command, horizonConfig *horizon.Config, hor var err error var storageBackendConfig ingest.StorageBackendConfig - options := horizon.ApplyOptions{RequireCaptiveCoreFullConfig: false} + options := horizon.ApplyOptions{RequireCaptiveCoreFullConfig: true} if ledgerBackendType == ingest.BufferedStorageBackend { if storageBackendConfig, err = loadStorageBackendConfig(storageBackendConfigPath); err != nil { return err diff --git a/services/horizon/cmd/db_test.go b/services/horizon/cmd/db_test.go index 6a00576bd3..dcf66a71fc 100644 --- a/services/horizon/cmd/db_test.go +++ b/services/horizon/cmd/db_test.go @@ -121,6 +121,7 @@ func (s *DBCommandsTestSuite) TestDbReingestAndFillGapsCmds() { "1", "100", "--network-passphrase", "passphrase", "--history-archive-urls", "[]", + "--captive-core-config-path", "core-test.cfg", }, expectError: false, }, diff --git a/services/horizon/cmd/ingest.go b/services/horizon/cmd/ingest.go index f6b94a8f52..02c08341e5 100644 --- a/services/horizon/cmd/ingest.go +++ b/services/horizon/cmd/ingest.go @@ -95,7 +95,7 @@ var ingestVerifyRangeCmd = &cobra.Command{ co.SetValue() } - if err := horizon.ApplyFlags(globalConfig, globalFlags, horizon.ApplyOptions{RequireCaptiveCoreFullConfig: false}); err != nil { return err } + if err := horizon.ApplyFlags(globalConfig, globalFlags, horizon.ApplyOptions{RequireCaptiveCoreFullConfig: true}); err != nil { return err } diff --git a/services/horizon/docker/captive-core-reingest-range-integration-tests.cfg b/services/horizon/docker/captive-core-reingest-range-integration-tests.cfg deleted file mode 100644 index 44820f5933..0000000000 --- a/services/horizon/docker/captive-core-reingest-range-integration-tests.cfg +++ /dev/null @@ -1,12 +0,0 @@ -ARTIFICIALLY_ACCELERATE_TIME_FOR_TESTING=true -TESTING_SOROBAN_HIGH_LIMIT_OVERRIDE=true -TESTING_MINIMUM_PERSISTENT_ENTRY_LIFETIME=10 -ENABLE_SOROBAN_DIAGNOSTIC_EVENTS=true - -[[VALIDATORS]] -NAME="local_core" -HOME_DOMAIN="core.local" -# From "SACJC372QBSSKJYTV5A7LWT4NXWHTQO6GHG4QDAVC2XDPX6CNNXFZ4JK" -PUBLIC_KEY="GD5KD2KEZJIGTC63IGW6UMUSMVUVG5IHG64HUTFWCHVZH2N2IBOQN7PS" -ADDRESS="localhost" -QUALITY="MEDIUM" diff --git a/services/horizon/docker/verify-range/Dockerfile b/services/horizon/docker/verify-range/Dockerfile index 6323870f38..eaa5d9274d 100644 --- a/services/horizon/docker/verify-range/Dockerfile +++ b/services/horizon/docker/verify-range/Dockerfile @@ -10,8 +10,8 @@ ADD dependencies / RUN ["chmod", "+x", "/dependencies"] RUN /dependencies -ADD stellar-core.cfg / ADD captive-core-pubnet.cfg / +ADD captive-core-testnet.cfg / RUN mkdir
-p /cc RUN mkdir -p /data diff --git a/services/horizon/docker/verify-range/captive-core-pubnet.cfg b/services/horizon/docker/verify-range/captive-core-pubnet.cfg index 5a702711fe..171a50358f 100644 --- a/services/horizon/docker/verify-range/captive-core-pubnet.cfg +++ b/services/horizon/docker/verify-range/captive-core-pubnet.cfg @@ -1,5 +1,5 @@ PEER_PORT=11725 - +NETWORK_PASSPHRASE="Public Global Stellar Network ; September 2015" FAILURE_SAFETY=1 [[HOME_DOMAINS]] diff --git a/services/horizon/docker/verify-range/captive-core-testnet.cfg b/services/horizon/docker/verify-range/captive-core-testnet.cfg new file mode 100644 index 0000000000..018f413dea --- /dev/null +++ b/services/horizon/docker/verify-range/captive-core-testnet.cfg @@ -0,0 +1,29 @@ +PEER_PORT=11725 +NETWORK_PASSPHRASE="Test SDF Network ; September 2015" +UNSAFE_QUORUM=true +FAILURE_SAFETY=1 + +[[HOME_DOMAINS]] +HOME_DOMAIN="testnet.stellar.org" +QUALITY="HIGH" + +[[VALIDATORS]] +NAME="sdf_testnet_1" +HOME_DOMAIN="testnet.stellar.org" +PUBLIC_KEY="GDKXE2OZMJIPOSLNA6N6F2BVCI3O777I2OOC4BV7VOYUEHYX7RTRYA7Y" +ADDRESS="core-testnet1.stellar.org" +HISTORY="curl -sf http://history.stellar.org/prd/core-testnet/core_testnet_001/{0} -o {1}" + +[[VALIDATORS]] +NAME="sdf_testnet_2" +HOME_DOMAIN="testnet.stellar.org" +PUBLIC_KEY="GCUCJTIYXSOXKBSNFGNFWW5MUQ54HKRPGJUTQFJ5RQXZXNOLNXYDHRAP" +ADDRESS="core-testnet2.stellar.org" +HISTORY="curl -sf http://history.stellar.org/prd/core-testnet/core_testnet_002/{0} -o {1}" + +[[VALIDATORS]] +NAME="sdf_testnet_3" +HOME_DOMAIN="testnet.stellar.org" +PUBLIC_KEY="GC2V2EFSXN6SQTWVYA5EPJPBWWIMSD2XQNKUOHGEKB535AQE2I6IXV2Z" +ADDRESS="core-testnet3.stellar.org" +HISTORY="curl -sf http://history.stellar.org/prd/core-testnet/core_testnet_003/{0} -o {1}" \ No newline at end of file diff --git a/services/horizon/docker/verify-range/dependencies b/services/horizon/docker/verify-range/dependencies index 3eacede44b..dd157fe8fd 100644 --- a/services/horizon/docker/verify-range/dependencies +++ b/services/horizon/docker/verify-range/dependencies @@ -2,7 +2,7 @@ set -e apt-get update -apt-get install -y curl git libpq-dev libsqlite3-dev libsasl2-dev postgresql-client postgresql postgresql-contrib sudo vim zlib1g-dev wget gnupg2 lsb-release +apt-get install -y curl git libpq-dev libsqlite3-dev libsasl2-dev postgresql-client postgresql postgresql-contrib sudo vim zlib1g-dev wget jq gnupg2 lsb-release apt-get clean wget -qO - https://apt.stellar.org/SDF.asc | apt-key add - diff --git a/services/horizon/docker/verify-range/start b/services/horizon/docker/verify-range/start index 0e7c69403d..a81342e27b 100644 --- a/services/horizon/docker/verify-range/start +++ b/services/horizon/docker/verify-range/start @@ -1,6 +1,20 @@ #! 
/usr/bin/env bash set -e +if [ -z "${TESTNET}" ]; then + export CAPTIVE_CORE_CONFIG_APPEND_PATH="/captive-core-pubnet.cfg" + export HISTORY_ARCHIVE_URLS="https://s3-eu-west-1.amazonaws.com/history.stellar.org/prd/core-live/core_live_001" + export NETWORK_PASSPHRASE="Public Global Stellar Network ; September 2015" + +else + export HISTORY_ARCHIVE_URLS="http://history.stellar.org/prd/core-testnet/core_testnet_001" + export CAPTIVE_CORE_CONFIG_APPEND_PATH="/captive-core-testnet.cfg" + export NETWORK_PASSPHRASE="Test SDF Network ; September 2015" +fi + +export DATABASE_URL="postgres://postgres:postgres@localhost:5432/horizon?sslmode=disable" +export STELLAR_CORE_BINARY_PATH="/usr/bin/stellar-core" + # configure postgres if [ -z "${PGDATA}" ]; then export PGDATA="/data" @@ -13,6 +27,18 @@ sudo -u postgres --preserve-env=PGDATA /usr/lib/postgresql/14/bin/pg_ctl start sudo -u postgres createdb horizon sudo -u postgres psql -c "ALTER USER postgres PASSWORD 'postgres';" +if [ $FROM -eq 0 ] && [ $TO -eq 0 ]; then + echo "obtaining current ledger from archive url $HISTORY_ARCHIVE_URLS" + CURRENTHAS=$(curl -s "$HISTORY_ARCHIVE_URLS/.well-known/stellar-history.json" | jq -sr 'if (. | length) == 0 then null else .[0] end | select( . != null ) | if .currentLedger < 64 then "0;63" else "\(.currentLedger-64);\(.currentLedger)" end' 2> /dev/null) + if [ -z "$CURRENTHAS" ]; then + echo "unable to get current sequence from $HISTORY_ARCHIVE_URLS" + exit 1 + fi + IFS=';' read -a RECENTSMALLRANGE <<< "$CURRENTHAS" + FROM=${RECENTSMALLRANGE[0]} + TO=${RECENTSMALLRANGE[1]} +fi + # Calculate params for AWS Batch if [ ! -z "$AWS_BATCH_JOB_ARRAY_INDEX" ]; then # The batch should have three env variables: @@ -72,13 +98,6 @@ dump_horizon_db() { psql "postgres://postgres:postgres@localhost:5432/horizon?sslmode=disable" -t -A -F"," --variable="FETCH_COUNT=100" -c "select history_transaction_id, address from history_transaction_participants left join history_accounts on history_accounts.id = history_transaction_participants.history_account_id order by history_transaction_id, address" > "${1}_transaction_participants" } -# pubnet horizon config -export NETWORK_PASSPHRASE="Public Global Stellar Network ; September 2015" -export HISTORY_ARCHIVE_URLS="https://s3-eu-west-1.amazonaws.com/history.stellar.org/prd/core-live/core_live_001" -export DATABASE_URL="postgres://postgres:postgres@localhost:5432/horizon?sslmode=disable" -export CAPTIVE_CORE_CONFIG_APPEND_PATH="/captive-core-pubnet.cfg" -export STELLAR_CORE_BINARY_PATH="/usr/bin/stellar-core" - cd stellar-go git pull origin if [ ! 
-z "$BRANCH" ]; then @@ -133,7 +152,7 @@ function compare() { fi } -BASE_BRANCH=${BASE_BRANCH:-horizon-v2.0.0} +BASE_BRANCH=${BASE_BRANCH:-horizon-v2.32.0} rm -rf /data/compare mkdir /data/compare diff --git a/services/horizon/docker/verify-range/stellar-core.cfg b/services/horizon/docker/verify-range/stellar-core.cfg deleted file mode 100644 index 6139f9f528..0000000000 --- a/services/horizon/docker/verify-range/stellar-core.cfg +++ /dev/null @@ -1,56 +0,0 @@ -AUTOMATIC_MAINTENANCE_PERIOD=0 -PUBLIC_HTTP_PORT=true -LOG_FILE_PATH="" -NETWORK_PASSPHRASE="Public Global Stellar Network ; September 2015" -CATCHUP_RECENT=100 - -AUTOMATIC_MAINTENANCE_COUNT=0 - -NODE_NAMES=[ -"GAOO3LWBC4XF6VWRP5ESJ6IBHAISVJMSBTALHOQM2EZG7Q477UWA6L7U eno", -"GAXP5DW4CVCW2BJNPFGTWCEGZTJKTNWFQQBE5SCWNJIJ54BOHR3WQC3W moni", -"GBFZFQRGOPQC5OEAWO76NOY6LBRLUNH4I5QYPUYAK53QSQWVTQ2D4FT5 dzham", -"GDXWQCSKVYAJSUGR2HBYVFVR7NA7YWYSYK3XYKKFO553OQGOHAUP2PX2 jianing", -"GCJCSMSPIWKKPR7WEPIQG63PDF7JGGEENRC33OKVBSPUDIRL6ZZ5M7OO tempo.eu.com", -"GCCW4H2DKAC7YYW62H3ZBDRRE5KXRLYLI4T5QOSO6EAMUOE37ICSKKRJ sparrow_tw", -"GD5DJQDDBKGAYNEAXU562HYGOOSYAEOO6AS53PZXBOZGCP5M2OPGMZV3 fuxi.lab", -"GBGGNBZVYNMVLCWNQRO7ASU6XX2MRPITAGLASRWOWLB4ZIIPHMGNMC4I huang.lab", -"GDPJ4DPPFEIP2YTSQNOKT7NMLPKU2FFVOEIJMG36RCMBWBUR4GTXLL57 nezha.lab", -"GCDLFPQ76D6YUSCUECLKI3AFEVXFWVRY2RZH2YQNYII35FDECWUGV24T SnT.Lux", -"GBAR4OY6T6M4P344IF5II5DNWHVUJU7OLQPSMG2FWVJAFF642BX5E3GB telindus", -# non validating -"GCGB2S2KGYARPVIA37HYZXVRM2YZUEXA6S33ZU5BUDC6THSB62LZSTYH sdf_watcher1", -"GCM6QMP3DLRPTAZW2UZPCPX2LF3SXWXKPMP3GKFZBDSF3QZGV2G5QSTK sdf_watcher2", -"GABMKJM6I25XI4K7U6XWMULOUQIQ27BCTMLS6BYYSOWKTBUXVRJSXHYQ sdf_watcher3", -# seem down -"GB6REF5GOGGSEHZ3L2YK6K4T4KX3YDMWHDCPMV7MZJDLHBDNZXEPRBGM donovan", -"GBGR22MRCIVW2UZHFXMY5UIBJGPYABPQXQ5GGMNCSUM2KHE3N6CNH6G5 nelisky1", -"GA2DE5AQF32LU5OZ5OKAFGPA2DLW4H6JHPGYJUVTNS3W7N2YZCTQFFV6 nelisky2", -"GDJ73EX25GGUVMUBCK6DPSTJLYP3IC7I3H2URLXJQ5YP56BW756OUHIG w00kie", -"GAM7A32QZF5PJASRSGVFPAB36WWTHCBHO5CHG3WUFTUQPT7NZX3ONJU4 ptarasov" -] - -KNOWN_PEERS=[ -"core-live-a.stellar.org:11625", -"core-live-b.stellar.org:11625", -"core-live-c.stellar.org:11625", -"confucius.strllar.org", -"stellar1.bitventure.co", -"stellar.256kw.com"] - -UNSAFE_QUORUM=true - -[QUORUM_SET] -VALIDATORS=[ -"$sdf_watcher1","$sdf_watcher2","$sdf_watcher3" -] - -# Stellar.org history store -[HISTORY.sdf1] -get="curl -sf http://s3-eu-west-1.amazonaws.com/history.stellar.org/prd/core-live/core_live_001/{0} -o {1}" - -[HISTORY.sdf2] -get="curl -sf http://s3-eu-west-1.amazonaws.com/history.stellar.org/prd/core-live/core_live_002/{0} -o {1}" - -[HISTORY.sdf3] -get="curl -sf http://s3-eu-west-1.amazonaws.com/history.stellar.org/prd/core-live/core_live_003/{0} -o {1}" diff --git a/services/horizon/internal/flags.go b/services/horizon/internal/flags.go index 38fca67576..12d21b452d 100644 --- a/services/horizon/internal/flags.go +++ b/services/horizon/internal/flags.go @@ -898,7 +898,7 @@ func setCaptiveCoreConfiguration(config *Config, options ApplyOptions) error { } } else if !options.RequireCaptiveCoreFullConfig { // Creates a minimal captive-core config (without quorum information), just enough to run captive core. - // This is used by certain database commands, such as `reingest and fill-gaps, to reingest historical data. + // This is used by certain commands that don't depend on captive core 'run' which does live network peer connection. 
config.CaptiveCoreToml, err = ledgerbackend.NewCaptiveCoreToml(config.CaptiveCoreTomlParams) if err != nil { return errors.Wrap(err, "invalid captive core toml file") diff --git a/services/horizon/internal/ingest/build_state_test.go b/services/horizon/internal/ingest/build_state_test.go index d1409182d9..473da03776 100644 --- a/services/horizon/internal/ingest/build_state_test.go +++ b/services/horizon/internal/ingest/build_state_test.go @@ -22,7 +22,7 @@ type BuildStateTestSuite struct { ctx context.Context historyQ *mockDBQ historyAdapter *mockHistoryArchiveAdapter - ledgerBackend *ledgerbackend.MockDatabaseBackend + ledgerBackend *ledgerbackend.MockLedgerBackend system *system runner *mockProcessorsRunner stellarCoreClient *mockStellarCoreClient @@ -35,7 +35,7 @@ func (s *BuildStateTestSuite) SetupTest() { s.historyQ = &mockDBQ{} s.runner = &mockProcessorsRunner{} s.historyAdapter = &mockHistoryArchiveAdapter{} - s.ledgerBackend = &ledgerbackend.MockDatabaseBackend{} + s.ledgerBackend = &ledgerbackend.MockLedgerBackend{} s.stellarCoreClient = &mockStellarCoreClient{} s.checkpointLedger = uint32(63) s.lastLedger = 0 @@ -87,7 +87,7 @@ func (s *BuildStateTestSuite) mockCommonHistoryQ() { func (s *BuildStateTestSuite) TestCheckPointLedgerIsZero() { // Recreate mock in this single test to remove assertions. *s.historyQ = mockDBQ{} - *s.ledgerBackend = ledgerbackend.MockDatabaseBackend{} + *s.ledgerBackend = ledgerbackend.MockLedgerBackend{} next, err := buildState{checkpointLedger: 0}.run(s.system) s.Assert().Error(err) @@ -98,7 +98,7 @@ func (s *BuildStateTestSuite) TestCheckPointLedgerIsZero() { func (s *BuildStateTestSuite) TestRangeNotPreparedFailPrepare() { // Recreate mock in this single test to remove assertions. *s.historyQ = mockDBQ{} - *s.ledgerBackend = ledgerbackend.MockDatabaseBackend{} + *s.ledgerBackend = ledgerbackend.MockLedgerBackend{} s.ledgerBackend.On("IsPrepared", s.ctx, ledgerbackend.UnboundedRange(63)).Return(false, nil).Once() s.ledgerBackend.On("PrepareRange", s.ctx, ledgerbackend.UnboundedRange(63)).Return(errors.New("my error")).Once() @@ -113,7 +113,7 @@ func (s *BuildStateTestSuite) TestRangeNotPreparedFailPrepare() { func (s *BuildStateTestSuite) TestRangeNotPreparedSuccessPrepareGetLedgerFail() { // Recreate mock in this single test to remove assertions. *s.historyQ = mockDBQ{} - *s.ledgerBackend = ledgerbackend.MockDatabaseBackend{} + *s.ledgerBackend = ledgerbackend.MockLedgerBackend{} s.ledgerBackend.On("IsPrepared", s.ctx, ledgerbackend.UnboundedRange(63)).Return(false, nil).Once() s.ledgerBackend.On("PrepareRange", s.ctx, ledgerbackend.UnboundedRange(63)).Return(nil).Once() @@ -218,7 +218,7 @@ func (s *BuildStateTestSuite) TestRunHistoryArchiveIngestionReturnsError() { func (s *BuildStateTestSuite) TestRunHistoryArchiveIngestionGenesisReturnsError() { // Recreate mock in this single test to remove assertions. 
- *s.ledgerBackend = ledgerbackend.MockDatabaseBackend{} + *s.ledgerBackend = ledgerbackend.MockLedgerBackend{} s.historyQ.On("GetLastLedgerIngest", s.ctx).Return(uint32(0), nil).Once() s.historyQ.On("GetIngestVersion", s.ctx).Return(CurrentVersion, nil).Once() diff --git a/services/horizon/internal/ingest/db_integration_test.go b/services/horizon/internal/ingest/db_integration_test.go index 0ac6e9d796..d8fd19c763 100644 --- a/services/horizon/internal/ingest/db_integration_test.go +++ b/services/horizon/internal/ingest/db_integration_test.go @@ -82,7 +82,7 @@ type DBTestSuite struct { sampleFile string sequence uint32 checkpointHash xdr.Hash - ledgerBackend *ledgerbackend.MockDatabaseBackend + ledgerBackend *ledgerbackend.MockLedgerBackend historyAdapter *mockHistoryArchiveAdapter system *system tt *test.T @@ -98,7 +98,7 @@ func (s *DBTestSuite) SetupTest() { // and commit the new file to the git repo. s.sampleFile = filepath.Join("testdata", "sample-changes.xdr") s.checkpointHash = xdr.Hash{1, 2, 3} - s.ledgerBackend = &ledgerbackend.MockDatabaseBackend{} + s.ledgerBackend = &ledgerbackend.MockLedgerBackend{} s.historyAdapter = &mockHistoryArchiveAdapter{} var err error diff --git a/services/horizon/internal/ingest/ingest_history_range_state_test.go b/services/horizon/internal/ingest/ingest_history_range_state_test.go index ce63151fb4..379a59b329 100644 --- a/services/horizon/internal/ingest/ingest_history_range_state_test.go +++ b/services/horizon/internal/ingest/ingest_history_range_state_test.go @@ -24,7 +24,7 @@ type IngestHistoryRangeStateTestSuite struct { ctx context.Context historyQ *mockDBQ historyAdapter *mockHistoryArchiveAdapter - ledgerBackend *ledgerbackend.MockDatabaseBackend + ledgerBackend *ledgerbackend.MockLedgerBackend runner *mockProcessorsRunner system *system } @@ -32,7 +32,7 @@ type IngestHistoryRangeStateTestSuite struct { func (s *IngestHistoryRangeStateTestSuite) SetupTest() { s.ctx = context.Background() s.historyQ = &mockDBQ{} - s.ledgerBackend = &ledgerbackend.MockDatabaseBackend{} + s.ledgerBackend = &ledgerbackend.MockLedgerBackend{} s.historyAdapter = &mockHistoryArchiveAdapter{} s.runner = &mockProcessorsRunner{} s.system = &system{ @@ -57,7 +57,7 @@ func (s *IngestHistoryRangeStateTestSuite) TearDownTest() { } func (s *IngestHistoryRangeStateTestSuite) TestHistoryRangeInvalidRange() { - *s.ledgerBackend = ledgerbackend.MockDatabaseBackend{} + *s.ledgerBackend = ledgerbackend.MockLedgerBackend{} next, err := historyRangeState{fromLedger: 0, toLedger: 0}.run(s.system) s.Assert().Error(err) @@ -81,7 +81,7 @@ func (s *IngestHistoryRangeStateTestSuite) TestHistoryRangeInvalidRange() { } func (s *IngestHistoryRangeStateTestSuite) TestHistoryRangeInvalidMaxFlush() { - *s.ledgerBackend = ledgerbackend.MockDatabaseBackend{} + *s.ledgerBackend = ledgerbackend.MockLedgerBackend{} s.system.maxLedgerPerFlush = 0 next, err := historyRangeState{fromLedger: 100, toLedger: 200}.run(s.system) @@ -91,7 +91,7 @@ func (s *IngestHistoryRangeStateTestSuite) TestHistoryRangeInvalidMaxFlush() { } func (s *IngestHistoryRangeStateTestSuite) TestHistoryRangeFailPrepare() { - *s.ledgerBackend = ledgerbackend.MockDatabaseBackend{} + *s.ledgerBackend = ledgerbackend.MockLedgerBackend{} s.ledgerBackend.On("IsPrepared", s.ctx, ledgerbackend.UnboundedRange(100)).Return(false, nil).Once() s.ledgerBackend.On("PrepareRange", s.ctx, ledgerbackend.UnboundedRange(100)).Return(errors.New("my error")).Once() diff --git a/services/horizon/internal/ingest/resume_state_test.go 
b/services/horizon/internal/ingest/resume_state_test.go index feb5e13bb0..0afc0cc527 100644 --- a/services/horizon/internal/ingest/resume_state_test.go +++ b/services/horizon/internal/ingest/resume_state_test.go @@ -22,7 +22,7 @@ func TestResumeTestTestSuite(t *testing.T) { type ResumeTestTestSuite struct { suite.Suite ctx context.Context - ledgerBackend *ledgerbackend.MockDatabaseBackend + ledgerBackend *ledgerbackend.MockLedgerBackend historyQ *mockDBQ historyAdapter *mockHistoryArchiveAdapter runner *mockProcessorsRunner @@ -32,7 +32,7 @@ type ResumeTestTestSuite struct { func (s *ResumeTestTestSuite) SetupTest() { s.ctx = context.Background() - s.ledgerBackend = &ledgerbackend.MockDatabaseBackend{} + s.ledgerBackend = &ledgerbackend.MockLedgerBackend{} s.historyQ = &mockDBQ{} s.historyAdapter = &mockHistoryArchiveAdapter{} s.runner = &mockProcessorsRunner{} @@ -78,7 +78,7 @@ func (s *ResumeTestTestSuite) TearDownTest() { func (s *ResumeTestTestSuite) TestInvalidParam() { // Recreate mock in this single test to remove Rollback assertion. *s.historyQ = mockDBQ{} - *s.ledgerBackend = ledgerbackend.MockDatabaseBackend{} + *s.ledgerBackend = ledgerbackend.MockLedgerBackend{} next, err := resumeState{latestSuccessfullyProcessedLedger: 0}.run(s.system) s.Assert().Error(err) @@ -92,7 +92,7 @@ func (s *ResumeTestTestSuite) TestInvalidParam() { func (s *ResumeTestTestSuite) TestRangeNotPreparedFailPrepare() { // Recreate mock in this single test to remove Rollback assertion. *s.historyQ = mockDBQ{} - *s.ledgerBackend = ledgerbackend.MockDatabaseBackend{} + *s.ledgerBackend = ledgerbackend.MockLedgerBackend{} s.ledgerBackend.On("IsPrepared", s.ctx, ledgerbackend.UnboundedRange(101)).Return(false, nil).Once() s.ledgerBackend.On("PrepareRange", s.ctx, ledgerbackend.UnboundedRange(101)).Return(errors.New("my error")).Once() @@ -109,7 +109,7 @@ func (s *ResumeTestTestSuite) TestRangeNotPreparedFailPrepare() { func (s *ResumeTestTestSuite) TestRangeNotPreparedSuccessPrepareGetLedgerFail() { // Recreate mock in this single test to remove Rollback assertion. 
*s.historyQ = mockDBQ{} - *s.ledgerBackend = ledgerbackend.MockDatabaseBackend{} + *s.ledgerBackend = ledgerbackend.MockLedgerBackend{} s.ledgerBackend.On("IsPrepared", s.ctx, ledgerbackend.UnboundedRange(101)).Return(false, nil).Once() s.ledgerBackend.On("PrepareRange", s.ctx, ledgerbackend.UnboundedRange(101)).Return(nil).Once() @@ -286,7 +286,7 @@ func (s *ResumeTestTestSuite) mockSuccessfulIngestion() { s.historyQ.On("GetExpStateInvalid", s.ctx).Return(false, nil).Once() } func (s *ResumeTestTestSuite) TestBumpIngestLedger() { - *s.ledgerBackend = ledgerbackend.MockDatabaseBackend{} + *s.ledgerBackend = ledgerbackend.MockLedgerBackend{} s.ledgerBackend.On("IsPrepared", s.ctx, ledgerbackend.UnboundedRange(100)).Return(false, nil).Once() s.ledgerBackend.On("PrepareRange", s.ctx, ledgerbackend.UnboundedRange(100)).Return(nil).Once() diff --git a/services/horizon/internal/ingest/verify_range_state_test.go b/services/horizon/internal/ingest/verify_range_state_test.go index a1df30d854..45484bb0cb 100644 --- a/services/horizon/internal/ingest/verify_range_state_test.go +++ b/services/horizon/internal/ingest/verify_range_state_test.go @@ -32,7 +32,7 @@ func TestVerifyRangeStateTestSuite(t *testing.T) { type VerifyRangeStateTestSuite struct { suite.Suite ctx context.Context - ledgerBackend *ledgerbackend.MockDatabaseBackend + ledgerBackend *ledgerbackend.MockLedgerBackend historyQ *mockDBQ historyAdapter *mockHistoryArchiveAdapter runner *mockProcessorsRunner @@ -41,7 +41,7 @@ type VerifyRangeStateTestSuite struct { func (s *VerifyRangeStateTestSuite) SetupTest() { s.ctx = context.Background() - s.ledgerBackend = &ledgerbackend.MockDatabaseBackend{} + s.ledgerBackend = &ledgerbackend.MockLedgerBackend{} s.historyQ = &mockDBQ{} s.historyAdapter = &mockHistoryArchiveAdapter{} s.runner = &mockProcessorsRunner{} diff --git a/services/horizon/internal/integration/db_test.go b/services/horizon/internal/integration/db_test.go index 5a2b03e48b..f77c776363 100644 --- a/services/horizon/internal/integration/db_test.go +++ b/services/horizon/internal/integration/db_test.go @@ -537,11 +537,8 @@ func TestReingestDB(t *testing.T) { // subprocesses to conflict. 
itest.StopHorizon() - horizonConfig.CaptiveCoreConfigPath = filepath.Join( - filepath.Dir(horizonConfig.CaptiveCoreConfigPath), - "captive-core-reingest-range-integration-tests.cfg", - ) - + var disableCaptiveHTTPPort = uint(0) + horizonConfig.CaptiveCoreTomlParams.HTTPPort = &disableCaptiveHTTPPort var rootCmd = horizoncmd.NewRootCmd() rootCmd.SetArgs(command(t, horizonConfig, "db", "reingest", @@ -740,7 +737,11 @@ func TestReingestDBWithFilterRules(t *testing.T) { // repopulate the db with reingestion which should catchup using core reapply filter rules // correctly on reingestion ranged rootCmd = horizoncmd.NewRootCmd() - rootCmd.SetArgs(command(t, itest.GetHorizonIngestConfig(), "db", + + horizonConfig := itest.GetHorizonIngestConfig() + var disableCaptiveHTTPPort = uint(0) + horizonConfig.CaptiveCoreTomlParams.HTTPPort = &disableCaptiveHTTPPort + rootCmd.SetArgs(command(t, horizonConfig, "db", "reingest", "range", "1", @@ -787,6 +788,12 @@ func TestReingestDBWithFilterRules(t *testing.T) { } func command(t *testing.T, horizonConfig horizon.Config, args ...string) []string { + + coreHttpPort := "0" + if horizonConfig.CaptiveCoreTomlParams.HTTPPort != nil { + coreHttpPort = strconv.FormatUint(uint64(*horizonConfig.CaptiveCoreTomlParams.HTTPPort), 10) + } + return append([]string{ "--stellar-core-url", horizonConfig.StellarCoreURL, @@ -805,6 +812,8 @@ func command(t *testing.T, horizonConfig horizon.Config, args ...string) []strin // due to ARTIFICIALLY_ACCELERATE_TIME_FOR_TESTING "--checkpoint-frequency", "8", + "--captive-core-http-port", + coreHttpPort, // Create the storage directory outside of the source repo, // otherwise it will break Golang test caching. "--captive-core-storage-path=" + t.TempDir(), @@ -919,17 +928,14 @@ func TestFillGaps(t *testing.T) { // subprocesses to conflict. itest.StopHorizon() + var disableCaptiveHTTPPort = uint(0) + horizonConfig.CaptiveCoreTomlParams.HTTPPort = &disableCaptiveHTTPPort var oldestLedger, latestLedger int64 tt.NoError(historyQ.ElderLedger(context.Background(), &oldestLedger)) tt.NoError(historyQ.LatestLedger(context.Background(), &latestLedger)) _, err = historyQ.DeleteRangeAll(context.Background(), oldestLedger, latestLedger) tt.NoError(err) - horizonConfig.CaptiveCoreConfigPath = filepath.Join( - filepath.Dir(horizonConfig.CaptiveCoreConfigPath), - "captive-core-reingest-range-integration-tests.cfg", - ) - rootCmd := horizoncmd.NewRootCmd() rootCmd.SetArgs(command(t, horizonConfig, "db", "fill-gaps", "--parallel-workers=1")) tt.NoError(rootCmd.Execute()) diff --git a/services/horizon/internal/integration/txsub_test.go b/services/horizon/internal/integration/txsub_test.go index e253522893..731795377e 100644 --- a/services/horizon/internal/integration/txsub_test.go +++ b/services/horizon/internal/integration/txsub_test.go @@ -6,7 +6,6 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "github.com/stellar/go/clients/horizonclient" "github.com/stellar/go/services/horizon/internal/test/integration" "github.com/stellar/go/txnbuild" ) @@ -55,39 +54,38 @@ func TestTxSub(t *testing.T) { } func TestTxSubLimitsBodySize(t *testing.T) { - if integration.GetCoreMaxSupportedProtocol() < 20 { - t.Skip("This test run does not support less than Protocol 20") - } - + // the base64 tx blob posted for a tx with just 'op1' is 289 bytes; with both ops, 365. + // set up the test with a size threshold such that it passes with just 'op1' + // and fails once both ops push the payload past the threshold.
itest := integration.NewTest(t, integration.Config{ - EnableSorobanRPC: true, HorizonEnvironment: map[string]string{ - "MAX_HTTP_REQUEST_SIZE": "1800", + "MAX_HTTP_REQUEST_SIZE": "300", + "LOG_LEVEL": "Debug", }, }) - // establish which account will be contract owner, and load it's current seq - sourceAccount, err := itest.Client().AccountDetail(horizonclient.AccountRequest{ - AccountID: itest.Master().Address(), - }) - require.NoError(t, err) + master := itest.Master() + op1 := txnbuild.Payment{ + Destination: master.Address(), + Amount: "10", + Asset: txnbuild.NativeAsset{}, + } + + op2 := txnbuild.Payment{ + Destination: master.Address(), + Amount: "10", + Asset: txnbuild.NativeAsset{}, + } + + _, err := itest.SubmitOperations(itest.MasterAccount(), master, &op1, &op2) - installContractOp := assembleInstallContractCodeOp(t, itest.Master().Address(), "soroban_sac_test.wasm") - preFlightOp, minFee := itest.PreflightHostFunctions(&sourceAccount, *installContractOp) - _, err = itest.SubmitOperationsWithFee(&sourceAccount, itest.Master(), minFee+txnbuild.MinBaseFee, &preFlightOp) assert.EqualError( t, err, "horizon error: \"Transaction Malformed\" - check horizon.Error.Problem for more information", ) - sourceAccount, err = itest.Client().AccountDetail(horizonclient.AccountRequest{ - AccountID: itest.Master().Address(), - }) - require.NoError(t, err) - - installContractOp = assembleInstallContractCodeOp(t, itest.Master().Address(), "soroban_add_u64.wasm") - preFlightOp, minFee = itest.PreflightHostFunctions(&sourceAccount, *installContractOp) - tx, err := itest.SubmitOperationsWithFee(&sourceAccount, itest.Master(), minFee+txnbuild.MinBaseFee, &preFlightOp) + // assert that the single op payload is under the limit and still works. + tx, err := itest.SubmitOperations(itest.MasterAccount(), master, &op1) require.NoError(t, err) require.True(t, tx.Successful) } diff --git a/services/horizon/internal/test/integration/integration.go b/services/horizon/internal/test/integration/integration.go index 107cb33759..65c45a3068 100644 --- a/services/horizon/internal/test/integration/integration.go +++ b/services/horizon/internal/test/integration/integration.go @@ -553,6 +553,7 @@ const coreStartupPingInterval = time.Second // Wait for core to be up and manually close the first ledger func (i *Test) waitForCore() { + integrationYaml := filepath.Join(i.composePath, "docker-compose.integration-tests.yml") i.t.Log("Waiting for core to be up...") startTime := time.Now() for time.Since(startTime) < maxWaitForCoreStartup { @@ -566,6 +567,13 @@ func (i *Test) waitForCore() { if durationSince := time.Since(infoTime); durationSince < coreStartupPingInterval { time.Sleep(coreStartupPingInterval - durationSince) } + cmdline := []string{"-f", integrationYaml, "logs", "core"} + cmdline = append([]string{"compose"}, cmdline...) + cmd := exec.Command("docker", cmdline...) + out, _ := cmd.Output() + if len(out) > 0 { + fmt.Printf("core container logs:\n%s\n", string(out)) + } continue } break