Ingest batches of ledgers in-memory before flushing to DB #5099

Closed
2 of 4 tasks
tamirms opened this issue Nov 1, 2023 · 1 comment · Fixed by #5117
Assignees
Labels
horizon performance issues aimed at improving performance

Comments

@tamirms
Contributor

tamirms commented Nov 1, 2023

In #4909 we updated all the transaction processors to use the FastBatchInsertBuilder, which inserts rows into the history tables with the Postgres COPY command. We also refactored the transaction processor interface so that a single processor can accumulate records across multiple ledgers:

type horizonTransactionProcessor interface {
	ProcessTransaction(xdr.LedgerCloseMeta, ingest.LedgerTransaction) error
	Flush(ctx context.Context, session db.SessionInterface) error
}
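
To illustrate the accumulate-then-flush contract of this interface, here is a minimal sketch. It is not Horizon code: `ledger`, `tx`, `row`, `sink`, `batchProcessor` and `ingestBatch` are hypothetical stand-ins for `xdr.LedgerCloseMeta`, `ingest.LedgerTransaction`, the history rows and the db session, chosen so the example runs on its own.

```go
package main

import "fmt"

// ledger and tx are simplified stand-ins (assumptions) for
// xdr.LedgerCloseMeta and ingest.LedgerTransaction.
type ledger struct{ Sequence uint32 }
type tx struct{ Hash string }

// row is a hypothetical history row buffered in memory.
type row struct {
	LedgerSeq uint32
	TxHash    string
}

// sink is a hypothetical stand-in for the database session.
type sink struct {
	Rows    []row
	Flushes int
}

// ledgerTxs pairs a ledger with the transactions it contains.
type ledgerTxs struct {
	Ledger ledger
	Txs    []tx
}

// batchProcessor mirrors the shape of horizonTransactionProcessor:
// ProcessTransaction only buffers, Flush writes everything at once.
type batchProcessor struct{ pending []row }

func (p *batchProcessor) ProcessTransaction(l ledger, t tx) error {
	p.pending = append(p.pending, row{LedgerSeq: l.Sequence, TxHash: t.Hash})
	return nil
}

func (p *batchProcessor) Flush(s *sink) error {
	s.Rows = append(s.Rows, p.pending...)
	s.Flushes++
	p.pending = p.pending[:0] // reset the buffer for the next batch
	return nil
}

// ingestBatch applies every ledger in the batch to the processor and
// flushes a single time at the end.
func ingestBatch(p *batchProcessor, s *sink, batch []ledgerTxs) error {
	for _, lt := range batch {
		for _, t := range lt.Txs {
			if err := p.ProcessTransaction(lt.Ledger, t); err != nil {
				return err
			}
		}
	}
	return p.Flush(s)
}

func main() {
	s := &sink{}
	batch := []ledgerTxs{
		{Ledger: ledger{Sequence: 1}, Txs: []tx{{Hash: "a"}, {Hash: "b"}}},
		{Ledger: ledger{Sequence: 2}, Txs: []tx{{Hash: "c"}}},
	}
	if err := ingestBatch(&batchProcessor{}, s, batch); err != nil {
		panic(err)
	}
	fmt.Println(len(s.Rows), s.Flushes) // rows written, number of flushes
}
```

In this sketch, two ledgers with three transactions in total produce three buffered rows and one flush, which is the property the batching work relies on.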

Now we can build on this work to improve the performance of reingestion. #4909 improved the performance of ingesting a single ledger, but we can extract further gains by ingesting multiple ledgers within a single processor lifetime.

In the spike branch the dataflow for ingesting batches of ledgers within a single processor lifetime is:

	accountLoader := history.NewAccountLoader()
	cbLoader := history.NewClaimableBalanceLoader()
	lpLoader := history.NewLiquidityPoolLoader()
	assetLoader := history.NewAssetLoader()
	processors := buildTransactionProcessors(
		s.historyQ,
		accountLoader,
		cbLoader,
		lpLoader,
		assetLoader,
	)

	// apply all the ledgers in the batch on the processors
	for _, ledger := range ledgers {
		if err = s.runner.ApplyProcessorsOnLedger(processors, ledger); err != nil {
			return err
		}
	}

	// use the loaders to look up all the accounts, assets, claimable balances,
	// and liquidity pools registered by the processors
	err = func() error {
		if err := s.historyQ.Begin(); err != nil {
			return errors.Wrap(err, "Error starting a transaction")
		}
		defer s.historyQ.Rollback()

		if err := accountLoader.Exec(s.ctx, s.historyQ); err != nil {
			return err
		}
		if err := cbLoader.Exec(s.ctx, s.historyQ); err != nil {
			return err
		}
		if err := lpLoader.Exec(s.ctx, s.historyQ); err != nil {
			return err
		}
		if err := assetLoader.Exec(s.ctx, s.historyQ); err != nil {
			return err
		}
		if err := s.historyQ.Commit(); err != nil {
			return errors.Wrap(err, commitErrMsg)
		}
		return nil
	}()
	if err != nil {
		return err
	}

	// flush the rows to the db; the processors can now obtain the integer ids
	// from the loaders
	if err := s.historyQ.Begin(); err != nil {
		return errors.Wrap(err, "Error starting a transaction")
	}
	defer s.historyQ.Rollback()
	if err := processors.Commit(s.ctx, s.historyQ); err != nil {
		return err
	}
	if err := s.historyQ.Commit(); err != nil {
		return errors.Wrap(err, commitErrMsg)
	}
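
The dataflow above depends on the loaders deferring id lookups: processors register keys while processing ledgers, a single Exec resolves them all, and the ids are read only at flush time. The following is a simplified sketch of that idea, not the history package's actual API. `accountLoader`, `futureID`, `Register`, `Value` and the `lookup` callback are hypothetical names, and the callback stands in for the database query.

```go
package main

import (
	"fmt"
	"sort"
)

// accountLoader is a hypothetical, simplified loader: it collects addresses
// during processing and resolves all of them in a single Exec call.
type accountLoader struct {
	ids      map[string]int64 // address -> id, filled in by Exec
	resolved bool
}

// futureID is a handle to an id that becomes readable after Exec.
type futureID struct {
	address string
	loader  *accountLoader
}

func newAccountLoader() *accountLoader {
	return &accountLoader{ids: map[string]int64{}}
}

// Register records an address and returns a handle; no lookup happens yet.
func (l *accountLoader) Register(address string) futureID {
	if _, ok := l.ids[address]; !ok {
		l.ids[address] = 0
	}
	return futureID{address: address, loader: l}
}

// Exec resolves every registered address with one call to lookup, which
// stands in for a batched database query.
func (l *accountLoader) Exec(lookup func([]string) (map[string]int64, error)) error {
	addresses := make([]string, 0, len(l.ids))
	for a := range l.ids {
		addresses = append(addresses, a)
	}
	sort.Strings(addresses) // deterministic order for the batched lookup
	found, err := lookup(addresses)
	if err != nil {
		return err
	}
	for _, a := range addresses {
		id, ok := found[a]
		if !ok {
			return fmt.Errorf("no id returned for %s", a)
		}
		l.ids[a] = id
	}
	l.resolved = true
	return nil
}

// Value returns the resolved id, or an error if Exec has not run yet.
func (f futureID) Value() (int64, error) {
	if !f.loader.resolved {
		return 0, fmt.Errorf("id for %s requested before Exec", f.address)
	}
	return f.loader.ids[f.address], nil
}

// fakeLookup assigns ids by position; it replaces the real query here.
func fakeLookup(addresses []string) (map[string]int64, error) {
	out := make(map[string]int64, len(addresses))
	for i, a := range addresses {
		out[a] = int64(i + 1)
	}
	return out, nil
}

func main() {
	loader := newAccountLoader()
	alice := loader.Register("GALICE") // registered while processing ledgers
	bob := loader.Register("GBOB")
	if err := loader.Exec(fakeLookup); err != nil {
		panic(err)
	}
	a, _ := alice.Value() // read at flush time
	b, _ := bob.Value()
	fmt.Println(a, b)
}
```

Because ids are only read after Exec, the number of lookups in this sketch stays at one per loader per batch, regardless of how many ledgers the batch contains.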

We will need to implement this new dataflow on the following states in the ingestion state machine:

Note the resume state only ingests a single ledger so it is already covered by #4909 .

Also, the following bugs will need to be addressed when implementing this issue:

@tamirms tamirms added horizon performance issues aimed at improving performance labels Nov 1, 2023
@mollykarcher mollykarcher moved this from Backlog to Next Sprint Proposal in Platform Scrum Nov 1, 2023
@sreuland sreuland self-assigned this Nov 2, 2023
@sreuland sreuland moved this from Next Sprint Proposal to In Progress in Platform Scrum Nov 2, 2023
@sreuland
Contributor

sreuland commented Nov 9, 2023

Reduced scope slightly: removed verifyRangeState from the list of states that need ledger-range enablement. That state also invokes change processors, which we want to keep scoped to a single ledger; the ledger-range scope should be limited to the tx processors only.

sreuland added a commit to sreuland/go that referenced this issue Nov 28, 2023
…st max flush size if lower than default of 100
sreuland added a commit that referenced this issue Nov 28, 2023
… send batches of ledgers to tx processors (#5117)

closes #5099: Ingest batches of ledgers in-memory before flushing to DB
@github-project-automation github-project-automation bot moved this from Needs Review to Done in Platform Scrum Nov 28, 2023
Projects
Status: Done
2 participants