Skip to content

Commit

Permalink
services/horizon/internal/db2/history: Optimize query for reaping loo…
Browse files Browse the repository at this point in the history
…kup tables (#5393)
  • Loading branch information
tamirms authored Jul 29, 2024
1 parent 31fc8f4 commit ecd28b6
Show file tree
Hide file tree
Showing 6 changed files with 259 additions and 193 deletions.
45 changes: 45 additions & 0 deletions services/horizon/internal/db2/history/key_value.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,12 @@ package history
import (
"context"
"database/sql"
"fmt"
"strconv"
"strings"

sq "github.com/Masterminds/squirrel"

"github.com/stellar/go/support/errors"
)

Expand All @@ -18,6 +21,7 @@ const (
stateInvalid = "exp_state_invalid"
offerCompactionSequence = "offer_compaction_sequence"
liquidityPoolCompactionSequence = "liquidity_pool_compaction_sequence"
lookupTableReapOffsetSuffix = "_reap_offset"
)

// GetLastLedgerIngestNonBlocking works like GetLastLedgerIngest but
Expand Down Expand Up @@ -203,6 +207,47 @@ func (q *Q) getValueFromStore(ctx context.Context, key string, forUpdate bool) (
return value, nil
}

type KeyValuePair struct {
Key string `db:"key"`
Value string `db:"value"`
}

func (q *Q) getLookupTableReapOffsets(ctx context.Context) (map[string]int64, error) {
keys := make([]string, 0, len(historyLookupTables))
for table := range historyLookupTables {
keys = append(keys, table+lookupTableReapOffsetSuffix)
}
offsets := map[string]int64{}
var pairs []KeyValuePair
query := sq.Select("key", "value").
From("key_value_store").
Where(map[string]interface{}{
"key": keys,
})
err := q.Select(ctx, &pairs, query)
if err != nil {
return nil, err
}
for _, pair := range pairs {
table := strings.TrimSuffix(pair.Key, lookupTableReapOffsetSuffix)
if _, ok := historyLookupTables[table]; !ok {
return nil, fmt.Errorf("invalid key: %s", pair.Key)
}

var offset int64
offset, err = strconv.ParseInt(pair.Value, 10, 64)
if err != nil {
return nil, fmt.Errorf("invalid offset: %s", pair.Value)
}
offsets[table] = offset
}
return offsets, err
}

func (q *Q) updateLookupTableReapOffset(ctx context.Context, table string, offset int64) error {
return q.updateValueInStore(ctx, table+lookupTableReapOffsetSuffix, strconv.FormatInt(offset, 10))
}

// updateValueInStore updates a value for a given key in KV store
func (q *Q) updateValueInStore(ctx context.Context, key, value string) error {
query := sq.Insert("key_value_store").
Expand Down
225 changes: 108 additions & 117 deletions services/horizon/internal/db2/history/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ type IngestionQ interface {
NewTradeBatchInsertBuilder() TradeBatchInsertBuilder
RebuildTradeAggregationTimes(ctx context.Context, from, to strtime.Millis, roundingSlippageFilter int) error
RebuildTradeAggregationBuckets(ctx context.Context, fromLedger, toLedger uint32, roundingSlippageFilter int) error
ReapLookupTables(ctx context.Context, offsets map[string]int64) (map[string]LookupTableReapResult, error)
ReapLookupTables(ctx context.Context, batchSize int) (map[string]LookupTableReapResult, error)
CreateAssets(ctx context.Context, assets []xdr.Asset, batchSize int) (map[string]Asset, error)
QTransactions
QTrustLines
Expand Down Expand Up @@ -981,88 +981,27 @@ type LookupTableReapResult struct {
// which aren't used (orphaned), i.e. history entries for them were reaped.
// This method must be executed inside ingestion transaction. Otherwise it may
// create invalid state in lookup and history tables.
func (q Q) ReapLookupTables(ctx context.Context, offsets map[string]int64) (
func (q *Q) ReapLookupTables(ctx context.Context, batchSize int) (
map[string]LookupTableReapResult,
error,
) {
if q.GetTx() == nil {
return nil, errors.New("cannot be called outside of an ingestion transaction")
}

const batchSize = 1000
offsets, err := q.getLookupTableReapOffsets(ctx)
if err != nil {
return nil, fmt.Errorf("could not obtain offsets: %w", err)
}

results := map[string]LookupTableReapResult{}
for table, historyTables := range map[string][]tableObjectFieldPair{
"history_accounts": {
{
name: "history_effects",
objectField: "history_account_id",
},
{
name: "history_operation_participants",
objectField: "history_account_id",
},
{
name: "history_trades",
objectField: "base_account_id",
},
{
name: "history_trades",
objectField: "counter_account_id",
},
{
name: "history_transaction_participants",
objectField: "history_account_id",
},
},
"history_assets": {
{
name: "history_trades",
objectField: "base_asset_id",
},
{
name: "history_trades",
objectField: "counter_asset_id",
},
{
name: "history_trades_60000",
objectField: "base_asset_id",
},
{
name: "history_trades_60000",
objectField: "counter_asset_id",
},
},
"history_claimable_balances": {
{
name: "history_operation_claimable_balances",
objectField: "history_claimable_balance_id",
},
{
name: "history_transaction_claimable_balances",
objectField: "history_claimable_balance_id",
},
},
"history_liquidity_pools": {
{
name: "history_operation_liquidity_pools",
objectField: "history_liquidity_pool_id",
},
{
name: "history_transaction_liquidity_pools",
objectField: "history_liquidity_pool_id",
},
},
} {
for table, historyTables := range historyLookupTables {
startTime := time.Now()
query, err := constructReapLookupTablesQuery(table, historyTables, batchSize, offsets[table])
if err != nil {
return nil, errors.Wrap(err, "error constructing a query")
}
query := constructReapLookupTablesQuery(table, historyTables, batchSize, offsets[table])

// Find new offset before removing the rows
var newOffset int64
err = q.GetRaw(ctx, &newOffset, fmt.Sprintf("SELECT id FROM %s where id >= %d limit 1 offset %d", table, offsets[table], batchSize))
err := q.GetRaw(ctx, &newOffset, fmt.Sprintf("SELECT id FROM %s where id >= %d limit 1 offset %d", table, offsets[table], batchSize))
if err != nil {
if q.NoRows(err) {
newOffset = 0
Expand All @@ -1079,6 +1018,10 @@ func (q Q) ReapLookupTables(ctx context.Context, offsets map[string]int64) (
return nil, errors.Wrapf(err, "error running query: %s", query)
}

if err = q.updateLookupTableReapOffset(ctx, table, newOffset); err != nil {
return nil, fmt.Errorf("error updating offset: %w", err)
}

rows, err := res.RowsAffected()
if err != nil {
return nil, errors.Wrapf(err, "error running RowsAffected after query: %s", query)
Expand All @@ -1093,22 +1036,86 @@ func (q Q) ReapLookupTables(ctx context.Context, offsets map[string]int64) (
return results, nil
}

var historyLookupTables = map[string][]tableObjectFieldPair{
"history_accounts": {
{
name: "history_transaction_participants",
objectField: "history_account_id",
},

{
name: "history_effects",
objectField: "history_account_id",
},
{
name: "history_operation_participants",
objectField: "history_account_id",
},
{
name: "history_trades",
objectField: "base_account_id",
},
{
name: "history_trades",
objectField: "counter_account_id",
},
},
"history_assets": {
{
name: "history_trades",
objectField: "base_asset_id",
},
{
name: "history_trades",
objectField: "counter_asset_id",
},
{
name: "history_trades_60000",
objectField: "base_asset_id",
},
{
name: "history_trades_60000",
objectField: "counter_asset_id",
},
},
"history_claimable_balances": {
{
name: "history_transaction_claimable_balances",
objectField: "history_claimable_balance_id",
},
{
name: "history_operation_claimable_balances",
objectField: "history_claimable_balance_id",
},
},
"history_liquidity_pools": {
{
name: "history_transaction_liquidity_pools",
objectField: "history_liquidity_pool_id",
},
{
name: "history_operation_liquidity_pools",
objectField: "history_liquidity_pool_id",
},
},
}

// constructReapLookupTablesQuery creates a query like (using history_claimable_balances
// as an example):
//
// delete from history_claimable_balances where id in
// delete from history_claimable_balances where id in (
//
// (select id from
// (select id,
// (select 1 from history_operation_claimable_balances
// where history_claimable_balance_id = hcb.id limit 1) as c1,
// (select 1 from history_transaction_claimable_balances
// where history_claimable_balance_id = hcb.id limit 1) as c2,
// 1 as cx,
// from history_claimable_balances hcb where id > 1000 order by id limit 100)
// as sub where c1 IS NULL and c2 IS NULL and 1=1);
// WITH ha_batch AS (
// SELECT id
// FROM history_claimable_balances
// WHERE id >= 1000
// ORDER BY id limit 1000
// ) SELECT e1.id as id FROM ha_batch e1
// WHERE NOT EXISTS (SELECT 1 FROM history_transaction_claimable_balances WHERE history_transaction_claimable_balances.history_claimable_balance_id = id limit 1)
// AND NOT EXISTS (SELECT 1 FROM history_operation_claimable_balances WHERE history_operation_claimable_balances.history_claimable_balance_id = id limit 1)
// )
//
// In short it checks the 100 rows omitting 1000 row of history_claimable_balances
// In short it checks the 1000 rows omitting 1000 row of history_claimable_balances
// and counts occurrences of each row in corresponding history tables.
// If there are no history rows for a given id, the row in
// history_claimable_balances is removed.
Expand All @@ -1118,45 +1125,29 @@ func (q Q) ReapLookupTables(ctx context.Context, offsets map[string]int64) (
// possible that rows will be skipped from deletion. But offset is reset
// when it reaches the table size so eventually all orphaned rows are
// deleted.
func constructReapLookupTablesQuery(table string, historyTables []tableObjectFieldPair, batchSize, offset int64) (string, error) {
var sb strings.Builder
var err error
_, err = fmt.Fprintf(&sb, "delete from %s where id IN (select id from (select id, ", table)
if err != nil {
return "", err
}

for i, historyTable := range historyTables {
_, err = fmt.Fprintf(
&sb,
`(select 1 from %s where %s = hcb.id limit 1) as c%d, `,
historyTable.name,
historyTable.objectField,
i,
func constructReapLookupTablesQuery(table string, historyTables []tableObjectFieldPair, batchSize int, offset int64) string {
var conditions []string

for _, historyTable := range historyTables {
conditions = append(
conditions,
fmt.Sprintf(
"NOT EXISTS ( SELECT 1 as row FROM %s WHERE %s.%s = id LIMIT 1)",
historyTable.name,
historyTable.name, historyTable.objectField,
),
)
if err != nil {
return "", err
}
}

_, err = fmt.Fprintf(&sb, "1 as cx from %s hcb where id >= %d order by id limit %d) as sub where ", table, offset, batchSize)
if err != nil {
return "", err
}

for i := range historyTables {
_, err = fmt.Fprintf(&sb, "c%d IS NULL and ", i)
if err != nil {
return "", err
}
}

_, err = sb.WriteString("1=1);")
if err != nil {
return "", err
}

return sb.String(), nil
return fmt.Sprintf(
"DELETE FROM %s WHERE id IN ("+
"WITH ha_batch AS (SELECT id FROM %s WHERE id >= %d ORDER BY id limit %d) "+
"SELECT e1.id as id FROM ha_batch e1 WHERE ",
table,
table,
offset,
batchSize,
) + strings.Join(conditions, " AND ") + ")"
}

// DeleteRangeAll deletes a range of rows from all history tables between
Expand Down
Loading

0 comments on commit ecd28b6

Please sign in to comment.