services/horizon/ingest: historyRange and reingestHistoryRange states send batches of ledgers to tx processors #5117

Merged · 12 commits · Nov 28, 2023
6 changes: 6 additions & 0 deletions services/horizon/cmd/db.go
@@ -399,6 +399,11 @@ func runDBReingestRange(ledgerRanges []history.LedgerRange, reingestForce bool,
        return errors.New("--force is incompatible with --parallel-workers > 1")
    }

    maxLedgersPerFlush := ingest.MaxLedgersPerFlush
    if parallelJobSize < maxLedgersPerFlush {
        maxLedgersPerFlush = parallelJobSize
    }

    ingestConfig := ingest.Config{
        NetworkPassphrase:  config.NetworkPassphrase,
        HistoryArchiveURLs: config.HistoryArchiveURLs,
@@ -415,6 +420,7 @@ func runDBReingestRange(ledgerRanges []history.LedgerRange, reingestForce bool,
        StellarCoreURL:           config.StellarCoreURL,
        RoundingSlippageFilter:   config.RoundingSlippageFilter,
        EnableIngestionFiltering: config.EnableIngestionFiltering,
        MaxLedgerPerFlush:        maxLedgersPerFlush,
    }

    if ingestConfig.HistorySession, err = db.Open("postgres", config.DatabaseURL); err != nil {
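A note on the addition above: each parallel reingest worker only ever receives parallelJobSize ledgers per job, so a flush threshold larger than the job size could never be reached; the clamp keeps the effective batch size meaningful. A minimal sketch of the same decision in isolation (the function name is ours, not the PR's):

// effectiveFlushSize is a sketch of the clamp above: the batch size fed
// into MaxLedgerPerFlush is the smaller of the global cap and the
// per-worker job size. E.g. a cap of 100 with a job size of 10 yields
// batches of 10.
func effectiveFlushSize(maxLedgersPerFlush, parallelJobSize uint32) uint32 {
    if parallelJobSize < maxLedgersPerFlush {
        return parallelJobSize
    }
    return maxLedgersPerFlush
}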
283 changes: 0 additions & 283 deletions services/horizon/internal/ingest/fsm.go
@@ -12,7 +12,6 @@ import (
"github.com/stellar/go/ingest/ledgerbackend"
"github.com/stellar/go/support/errors"
logpkg "github.com/stellar/go/support/log"
"github.com/stellar/go/toid"
"github.com/stellar/go/xdr"
)

@@ -585,288 +584,6 @@ func (r resumeState) addProcessorDurationsMetricFromMap(s *system, m map[string]
    }
}

type historyRangeState struct {
    fromLedger uint32
    toLedger   uint32
}

func (h historyRangeState) String() string {
    return fmt.Sprintf(
        "historyRange(fromLedger=%d, toLedger=%d)",
        h.fromLedger,
        h.toLedger,
    )
}

func (historyRangeState) GetState() State {
    return HistoryRange
}

// historyRangeState is used when catching up history data
func (h historyRangeState) run(s *system) (transition, error) {
    if h.fromLedger == 0 || h.toLedger == 0 ||
        h.fromLedger > h.toLedger {
        return start(), errors.Errorf("invalid range: [%d, %d]", h.fromLedger, h.toLedger)
    }

    err := s.maybePrepareRange(s.ctx, h.fromLedger)
    if err != nil {
        return start(), err
    }

    if err = s.historyQ.Begin(s.ctx); err != nil {
        return start(), errors.Wrap(err, "Error starting a transaction")
    }
    defer s.historyQ.Rollback()

    // acquire distributed lock so no one else can perform ingestion operations.
    if _, err = s.historyQ.GetLastLedgerIngest(s.ctx); err != nil {
        return start(), errors.Wrap(err, getLastIngestedErrMsg)
    }

    lastHistoryLedger, err := s.historyQ.GetLatestHistoryLedger(s.ctx)
    if err != nil {
        return start(), errors.Wrap(err, "could not get latest history ledger")
    }

    // We should be ingesting the ledger which occurs after
    // lastHistoryLedger. Otherwise, some other horizon node has
    // already completed the ingest history range operation and
    // we should go back to the init state
    if lastHistoryLedger != h.fromLedger-1 {
        return start(), nil
    }

    for cur := h.fromLedger; cur <= h.toLedger; cur++ {
        var ledgerCloseMeta xdr.LedgerCloseMeta

        log.WithField("sequence", cur).Info("Waiting for ledger to be available in the backend...")
        startTime := time.Now()

        ledgerCloseMeta, err = s.ledgerBackend.GetLedger(s.ctx, cur)
        if err != nil {
            // Commit finished work in case of ledger backend error.
            commitErr := s.historyQ.Commit()
            if commitErr != nil {
                log.WithError(commitErr).Error("Error committing partial range results")
            } else {
                log.Info("Committed partial range results")
            }
            return start(), errors.Wrap(err, "error getting ledger")
        }

        log.WithFields(logpkg.F{
            "sequence": cur,
            "duration": time.Since(startTime).Seconds(),
        }).Info("Ledger returned from the backend")

        if err = runTransactionProcessorsOnLedger(s, ledgerCloseMeta); err != nil {
            return start(), err
        }
    }

    if err = s.historyQ.Commit(); err != nil {
        return start(), errors.Wrap(err, commitErrMsg)
    }

    return start(), nil
}

func runTransactionProcessorsOnLedger(s *system, ledger xdr.LedgerCloseMeta) error {
    log.WithFields(logpkg.F{
        "sequence": ledger.LedgerSequence(),
        "state":    false,
        "ledger":   true,
        "commit":   false,
    }).Info("Processing ledger")
    startTime := time.Now()

    ledgerTransactionStats, _, tradeStats, err := s.runner.RunTransactionProcessorsOnLedger(ledger)
    if err != nil {
        return errors.Wrap(err, fmt.Sprintf("error processing ledger sequence=%d", ledger.LedgerSequence()))
    }

    log.
        WithFields(ledgerTransactionStats.Map()).
        WithFields(tradeStats.Map()).
        WithFields(logpkg.F{
            "sequence": ledger.LedgerSequence(),
            "duration": time.Since(startTime).Seconds(),
            "state":    false,
            "ledger":   true,
            "commit":   false,
        }).
        Info("Processed ledger")
    return nil
}
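The helper above runs the transaction processors on a single ledger. The change this PR's title describes is to hand the processors batches of ledgers instead, so per-call costs (processor setup, bulk inserts) are paid once per flush rather than once per ledger. A sketch of the batched shape, where RunTransactionProcessorsOnLedgers is our assumed slice-accepting counterpart on the runner, not code shown in this diff:

// runTransactionProcessorsOnLedgers is a sketch of the batched variant:
// one call into the processor runner covers the whole slice.
func runTransactionProcessorsOnLedgers(s *system, ledgers []xdr.LedgerCloseMeta) error {
    if len(ledgers) == 0 {
        return nil
    }
    log.WithFields(logpkg.F{
        "from": ledgers[0].LedgerSequence(),
        "to":   ledgers[len(ledgers)-1].LedgerSequence(),
    }).Info("Processing ledger batch")
    startTime := time.Now()

    // Hypothetical batched counterpart of RunTransactionProcessorsOnLedger.
    if err := s.runner.RunTransactionProcessorsOnLedgers(ledgers); err != nil {
        return errors.Wrap(err, "error processing ledger batch")
    }

    log.WithFields(logpkg.F{
        "from":     ledgers[0].LedgerSequence(),
        "to":       ledgers[len(ledgers)-1].LedgerSequence(),
        "duration": time.Since(startTime).Seconds(),
    }).Info("Processed ledger batch")
    return nil
}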

type reingestHistoryRangeState struct {
    fromLedger uint32
    toLedger   uint32
    force      bool
}

func (h reingestHistoryRangeState) String() string {
    return fmt.Sprintf(
        "reingestHistoryRange(fromLedger=%d, toLedger=%d, force=%t)",
        h.fromLedger,
        h.toLedger,
        h.force,
    )
}

func (reingestHistoryRangeState) GetState() State {
    return ReingestHistoryRange
}

func (h reingestHistoryRangeState) ingestRange(s *system, fromLedger, toLedger uint32) error {
    if s.historyQ.GetTx() == nil {
        return errors.New("expected transaction to be present")
    }

    // Clear history data before ingesting - used in `reingest range` command.
    start, end, err := toid.LedgerRangeInclusive(
        int32(fromLedger),
        int32(toLedger),
    )
    if err != nil {
        return errors.Wrap(err, "Invalid range")
    }

    err = s.historyQ.DeleteRangeAll(s.ctx, start, end)
    if err != nil {
        return errors.Wrap(err, "error in DeleteRangeAll")
    }

    for cur := fromLedger; cur <= toLedger; cur++ {
        var ledgerCloseMeta xdr.LedgerCloseMeta
        ledgerCloseMeta, err = s.ledgerBackend.GetLedger(s.ctx, cur)
        if err != nil {
            return errors.Wrap(err, "error getting ledger")
        }

        if err = runTransactionProcessorsOnLedger(s, ledgerCloseMeta); err != nil {
            return err
        }
    }

    return nil
}
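For reference, the toid package used above encodes a ledger sequence in the high 32 bits of a 64-bit TOID, so LedgerRangeInclusive yields cheap numeric bounds for deleting every history row in the range. A small standalone illustration:

package main

import (
    "fmt"

    "github.com/stellar/go/toid"
)

func main() {
    // Bounds covering all transactions and operations in ledgers 100..200.
    start, end, err := toid.LedgerRangeInclusive(100, 200)
    if err != nil {
        panic(err)
    }
    // Because the ledger sequence occupies the high 32 bits of a TOID,
    // these two int64s bound the whole range for the history tables.
    fmt.Println(start, end)
}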

func (h reingestHistoryRangeState) prepareRange(s *system) (transition, error) {
    log.WithFields(logpkg.F{
        "from": h.fromLedger,
        "to":   h.toLedger,
    }).Info("Preparing ledger backend to retrieve range")
    startTime := time.Now()

    err := s.ledgerBackend.PrepareRange(s.ctx, ledgerbackend.BoundedRange(h.fromLedger, h.toLedger))
    if err != nil {
        return stop(), errors.Wrap(err, "error preparing range")
    }

    log.WithFields(logpkg.F{
        "from":     h.fromLedger,
        "to":       h.toLedger,
        "duration": time.Since(startTime).Seconds(),
    }).Info("Range ready")

    return transition{}, nil
}
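PrepareRange with a bounded range tells the backend exactly which ledgers to make available, which is what reingestion wants; live ingestion instead uses the unbounded form. Both constructors exist in ingest/ledgerbackend; a tiny grounded example:

// BoundedRange suits catch-up/reingest work with a known end;
// UnboundedRange suits ingestion that follows the live network.
bounded := ledgerbackend.BoundedRange(100, 200)
unbounded := ledgerbackend.UnboundedRange(100)
_, _ = bounded, unbounded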

// reingestHistoryRangeState is used as a command to reingest historical data
func (h reingestHistoryRangeState) run(s *system) (transition, error) {
    if h.fromLedger == 0 || h.toLedger == 0 ||
        h.fromLedger > h.toLedger {
        return stop(), errors.Errorf("invalid range: [%d, %d]", h.fromLedger, h.toLedger)
    }

    if h.fromLedger == 1 {
        log.Warn("Ledger 1 is pregenerated and not available, starting from ledger 2.")
        h.fromLedger = 2
    }

    var startTime time.Time

    if h.force {
        if t, err := h.prepareRange(s); err != nil {
            return t, err
        }
        startTime = time.Now()

        if err := s.historyQ.Begin(s.ctx); err != nil {
            return stop(), errors.Wrap(err, "Error starting a transaction")
        }
        defer s.historyQ.Rollback()

        // acquire distributed lock so no one else can perform ingestion operations.
        if _, err := s.historyQ.GetLastLedgerIngest(s.ctx); err != nil {
            return stop(), errors.Wrap(err, getLastIngestedErrMsg)
        }

        if err := h.ingestRange(s, h.fromLedger, h.toLedger); err != nil {
            return stop(), err
        }

        if err := s.historyQ.Commit(); err != nil {
            return stop(), errors.Wrap(err, commitErrMsg)
        }
    } else {
        lastIngestedLedger, err := s.historyQ.GetLastLedgerIngestNonBlocking(s.ctx)
        if err != nil {
            return stop(), errors.Wrap(err, getLastIngestedErrMsg)
        }

        if lastIngestedLedger > 0 && h.toLedger >= lastIngestedLedger {
            return stop(), ErrReingestRangeConflict{lastIngestedLedger}
        }

        // Only prepare the range after checking the bounds to enable an early error return
        var t transition
        if t, err = h.prepareRange(s); err != nil {
            return t, err
        }
        startTime = time.Now()

        for cur := h.fromLedger; cur <= h.toLedger; cur++ {
            err = func(ledger uint32) error {
                if e := s.historyQ.Begin(s.ctx); e != nil {
                    return errors.Wrap(e, "Error starting a transaction")
                }
                defer s.historyQ.Rollback()

                // ingest each ledger in a separate transaction to prevent deadlocks
                // when acquiring ShareLocks from multiple parallel reingest range processes
                if e := h.ingestRange(s, ledger, ledger); e != nil {
                    return e
                }

                if e := s.historyQ.Commit(); e != nil {
                    return errors.Wrap(e, commitErrMsg)
                }

                return nil
            }(cur)
            if err != nil {
                return stop(), err
            }
        }
    }

    err := s.historyQ.RebuildTradeAggregationBuckets(s.ctx, h.fromLedger, h.toLedger, s.config.RoundingSlippageFilter)
    if err != nil {
        return stop(), errors.Wrap(err, "Error rebuilding trade aggregations")
    }

    log.WithFields(logpkg.F{
        "from":     h.fromLedger,
        "to":       h.toLedger,
        "duration": time.Since(startTime).Seconds(),
    }).Info("Reingestion done")

    return stop(), nil
}
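Worth calling out from the non-force path above: each ledger is ingested in its own DB transaction so that parallel reingest workers taking ShareLocks cannot deadlock one another, at the cost of one commit per ledger. Distilled into a reusable helper (a sketch; history.IngestionQ is assumed to be the transaction-capable interface behind s.historyQ):

// withTx runs fn inside one DB transaction, rolling back unless fn and
// the commit both succeed. Keeping the scope to a single ledger (or one
// small batch) bounds lock hold times across parallel workers.
func withTx(ctx context.Context, q history.IngestionQ, fn func() error) error {
    if err := q.Begin(ctx); err != nil {
        return errors.Wrap(err, "Error starting a transaction")
    }
    defer q.Rollback()

    if err := fn(); err != nil {
        return err
    }
    return errors.Wrap(q.Commit(), commitErrMsg)
}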

type waitForCheckpointState struct{}

func (waitForCheckpointState) String() string {