Skip to content

Commit

Permalink
Insert and query rows from history lookup tables with one query
Browse files Browse the repository at this point in the history
  • Loading branch information
tamirms committed Aug 7, 2024
1 parent 9b925b1 commit 1aa2c4a
Show file tree
Hide file tree
Showing 9 changed files with 57 additions and 197 deletions.
78 changes: 25 additions & 53 deletions services/horizon/internal/db2/history/account_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"github.com/stellar/go/support/collections/set"
"github.com/stellar/go/support/db"
"github.com/stellar/go/support/errors"
"github.com/stellar/go/support/ordered"
)

// FutureAccountID represents a future history account.
Expand All @@ -24,8 +23,6 @@ type FutureAccountID struct {
loader *AccountLoader
}

const loaderLookupBatchSize = 50000

// Value implements the database/sql/driver Valuer interface.
func (a FutureAccountID) Value() (driver.Value, error) {
return a.loader.GetNow(a.address)
Expand Down Expand Up @@ -85,28 +82,10 @@ func (a *AccountLoader) GetNow(address string) (int64, error) {
}
}

func (a *AccountLoader) lookupKeys(ctx context.Context, q *Q, addresses []string) error {
for i := 0; i < len(addresses); i += loaderLookupBatchSize {
end := ordered.Min(len(addresses), i+loaderLookupBatchSize)

var accounts []Account
if err := q.AccountsByAddresses(ctx, &accounts, addresses[i:end]); err != nil {
return errors.Wrap(err, "could not select accounts")
}

for _, account := range accounts {
a.ids[account.Address] = account.ID
}
}
return nil
}

// LoaderStats describes the result of executing a history lookup id loader
type LoaderStats struct {
// Total is the number of elements registered to the loader
Total int
// Inserted is the number of elements inserted into the lookup table
Inserted int
}

// Exec will look up all the history account ids for the addresses registered in the loader.
Expand All @@ -122,28 +101,11 @@ func (a *AccountLoader) Exec(ctx context.Context, session db.SessionInterface) e
for address := range a.set {
addresses = append(addresses, address)
}

if err := a.lookupKeys(ctx, q, addresses); err != nil {
return err
}
a.stats.Total += len(addresses)

insert := 0
for _, address := range addresses {
if _, ok := a.ids[address]; ok {
continue
}
addresses[insert] = address
insert++
}
if insert == 0 {
return nil
}
addresses = addresses[:insert]
// sort entries before inserting rows to prevent deadlocks on acquiring a ShareLock
// https://github.com/stellar/go/issues/2370
sort.Strings(addresses)

var accounts []Account
err := bulkInsert(
ctx,
q,
Expand All @@ -156,13 +118,16 @@ func (a *AccountLoader) Exec(ctx context.Context, session db.SessionInterface) e
objects: addresses,
},
},
&accounts,
)
if err != nil {
return err
}
a.stats.Inserted += insert

return a.lookupKeys(ctx, q, addresses)
for _, account := range accounts {
a.ids[account.Address] = account.ID
}
a.stats.Total += len(accounts)
return nil
}

// Stats returns the number of addresses registered in the loader and the number of addresses
Expand All @@ -181,7 +146,7 @@ type bulkInsertField struct {
objects []string
}

func bulkInsert(ctx context.Context, q *Q, table string, conflictFields []string, fields []bulkInsertField) error {
func bulkInsert(ctx context.Context, q *Q, table string, conflictFields []string, fields []bulkInsertField, response interface{}) error {
unnestPart := make([]string, 0, len(fields))
insertFieldsPart := make([]string, 0, len(fields))
pqArrays := make([]interface{}, 0, len(fields))
Expand All @@ -202,19 +167,26 @@ func bulkInsert(ctx context.Context, q *Q, table string, conflictFields []string
}

sql := `
WITH r AS
(SELECT ` + strings.Join(unnestPart, ",") + `)
INSERT INTO ` + table + `
(` + strings.Join(insertFieldsPart, ",") + `)
SELECT * from r
ON CONFLICT (` + strings.Join(conflictFields, ",") + `) DO NOTHING`

_, err := q.ExecRaw(
context.WithValue(ctx, &db.QueryTypeContextKey, db.UpsertQueryType),
WITH rows AS
(SELECT ` + strings.Join(unnestPart, ",") + `),
inserted_rows AS (
INSERT INTO ` + table + `
(` + strings.Join(insertFieldsPart, ",") + `)
SELECT * FROM rows
ON CONFLICT (` + strings.Join(conflictFields, ",") + `) DO NOTHING
RETURNING *
)
SELECT * FROM inserted_rows
UNION ALL
SELECT * FROM ` + table + ` WHERE (` + strings.Join(conflictFields, ",") + `) IN
(SELECT * FROM rows)`

return q.SelectRaw(

Check failure on line 184 in services/horizon/internal/db2/history/account_loader.go

View workflow job for this annotation

GitHub Actions / golangci

q.SelectRaw undefined (type *Q has no field or method SelectRaw) (typecheck)
ctx,
response,
sql,
pqArrays...,
)
return err
}

// AccountLoaderStub is a stub wrapper around AccountLoader which allows
Expand Down
3 changes: 1 addition & 2 deletions services/horizon/internal/db2/history/account_loader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@ func TestAccountLoader(t *testing.T) {
err := loader.Exec(context.Background(), session)
assert.NoError(t, err)
assert.Equal(t, LoaderStats{
Total: 100,
Inserted: 100,
Total: 100,
}, loader.Stats())
assert.Panics(t, func() {
loader.GetFuture(keypair.MustRandom().Address())
Expand Down
67 changes: 14 additions & 53 deletions services/horizon/internal/db2/history/asset_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ import (

"github.com/stellar/go/support/collections/set"
"github.com/stellar/go/support/db"
"github.com/stellar/go/support/errors"
"github.com/stellar/go/support/ordered"
"github.com/stellar/go/xdr"
)

Expand Down Expand Up @@ -101,37 +99,6 @@ func (a *AssetLoader) GetNow(asset AssetKey) (int64, error) {
}
}

func (a *AssetLoader) lookupKeys(ctx context.Context, q *Q, keys []AssetKey) error {
var rows []Asset
for i := 0; i < len(keys); i += loaderLookupBatchSize {
end := ordered.Min(len(keys), i+loaderLookupBatchSize)
subset := keys[i:end]
args := make([]interface{}, 0, 3*len(subset))
placeHolders := make([]string, 0, len(subset))
for _, key := range subset {
args = append(args, key.Code, key.Type, key.Issuer)
placeHolders = append(placeHolders, "(?, ?, ?)")
}
rawSQL := fmt.Sprintf(
"SELECT * FROM history_assets WHERE (asset_code, asset_type, asset_issuer) in (%s)",
strings.Join(placeHolders, ", "),
)
err := q.SelectRaw(ctx, &rows, rawSQL, args...)
if err != nil {
return errors.Wrap(err, "could not select assets")
}

for _, row := range rows {
a.ids[AssetKey{
Type: row.Type,
Code: row.Code,
Issuer: row.Issuer,
}] = row.ID
}
}
return nil
}

// Exec will look up all the history asset ids for the assets registered in the loader.
// If there are no history asset ids for a given set of assets, Exec will insert rows
// into the history_assets table.
Expand All @@ -146,35 +113,21 @@ func (a *AssetLoader) Exec(ctx context.Context, session db.SessionInterface) err
keys = append(keys, key)
}

if err := a.lookupKeys(ctx, q, keys); err != nil {
return err
}
a.stats.Total += len(keys)

assetTypes := make([]string, 0, len(a.set)-len(a.ids))
assetCodes := make([]string, 0, len(a.set)-len(a.ids))
assetIssuers := make([]string, 0, len(a.set)-len(a.ids))
assetTypes := make([]string, 0, len(keys))
assetCodes := make([]string, 0, len(keys))
assetIssuers := make([]string, 0, len(keys))
// sort entries before inserting rows to prevent deadlocks on acquiring a ShareLock
// https://github.com/stellar/go/issues/2370
sort.Slice(keys, func(i, j int) bool {
return keys[i].String() < keys[j].String()
})
insert := 0
for _, key := range keys {
if _, ok := a.ids[key]; ok {
continue
}
assetTypes = append(assetTypes, key.Type)
assetCodes = append(assetCodes, key.Code)
assetIssuers = append(assetIssuers, key.Issuer)
keys[insert] = key
insert++
}
if insert == 0 {
return nil
}
keys = keys[:insert]

var rows []Asset
err := bulkInsert(
ctx,
q,
Expand All @@ -197,13 +150,21 @@ func (a *AssetLoader) Exec(ctx context.Context, session db.SessionInterface) err
objects: assetTypes,
},
},
&rows,
)
if err != nil {
return err
}
a.stats.Inserted += insert
for _, row := range rows {
a.ids[AssetKey{
Type: row.Type,
Code: row.Code,
Issuer: row.Issuer,
}] = row.ID
}
a.stats.Total += len(keys)

return a.lookupKeys(ctx, q, keys)
return nil
}

// Stats returns the number of assets registered in the loader and the number of assets
Expand Down
3 changes: 1 addition & 2 deletions services/horizon/internal/db2/history/asset_loader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,7 @@ func TestAssetLoader(t *testing.T) {
err := loader.Exec(context.Background(), session)
assert.NoError(t, err)
assert.Equal(t, LoaderStats{
Total: 100,
Inserted: 100,
Total: 100,
}, loader.Stats())
assert.Panics(t, func() {
loader.GetFuture(AssetKey{Type: "invalid"})
Expand Down
46 changes: 7 additions & 39 deletions services/horizon/internal/db2/history/claimable_balance_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ import (

"github.com/stellar/go/support/collections/set"
"github.com/stellar/go/support/db"
"github.com/stellar/go/support/errors"
"github.com/stellar/go/support/ordered"
)

// FutureClaimableBalanceID represents a future history claimable balance.
Expand Down Expand Up @@ -78,22 +76,6 @@ func (a *ClaimableBalanceLoader) getNow(id string) (int64, error) {
}
}

func (a *ClaimableBalanceLoader) lookupKeys(ctx context.Context, q *Q, ids []string) error {
for i := 0; i < len(ids); i += loaderLookupBatchSize {
end := ordered.Min(len(ids), i+loaderLookupBatchSize)

cbs, err := q.ClaimableBalancesByIDs(ctx, ids[i:end])
if err != nil {
return errors.Wrap(err, "could not select claimable balances")
}

for _, cb := range cbs {
a.ids[cb.BalanceID] = cb.InternalID
}
}
return nil
}

// Exec will look up all the internal history ids for the claimable balances registered in the loader.
// If there are no internal ids for a given set of claimable balances, Exec will insert rows
// into the history_claimable_balances table.
Expand All @@ -108,27 +90,10 @@ func (a *ClaimableBalanceLoader) Exec(ctx context.Context, session db.SessionInt
ids = append(ids, id)
}

if err := a.lookupKeys(ctx, q, ids); err != nil {
return err
}
a.stats.Total += len(ids)

insert := 0
for _, id := range ids {
if _, ok := a.ids[id]; ok {
continue
}
ids[insert] = id
insert++
}
if insert == 0 {
return nil
}
ids = ids[:insert]
// sort entries before inserting rows to prevent deadlocks on acquiring a ShareLock
// https://github.com/stellar/go/issues/2370
sort.Strings(ids)

var rows []HistoryClaimableBalance
err := bulkInsert(
ctx,
q,
Expand All @@ -141,13 +106,16 @@ func (a *ClaimableBalanceLoader) Exec(ctx context.Context, session db.SessionInt
objects: ids,
},
},
&rows,
)
if err != nil {
return err
}
a.stats.Inserted += insert

return a.lookupKeys(ctx, q, ids)
for _, row := range rows {
a.ids[row.BalanceID] = row.InternalID
}
a.stats.Total += len(ids)
return nil
}

// Stats returns the number of claimable balances registered in the loader and the number of claimable balances
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,7 @@ func TestClaimableBalanceLoader(t *testing.T) {
err := loader.Exec(context.Background(), session)
assert.NoError(t, err)
assert.Equal(t, LoaderStats{
Total: 100,
Inserted: 100,
Total: 100,
}, loader.Stats())
assert.Panics(t, func() {
loader.GetFuture("not-present")
Expand Down
Loading

0 comments on commit 1aa2c4a

Please sign in to comment.