Skip to content

Commit

Permalink
stellar#4909: removed panic from Value() on loaders
Browse files Browse the repository at this point in the history
  • Loading branch information
sreuland committed Oct 20, 2023
1 parent 1efb7a3 commit 7c3842b
Show file tree
Hide file tree
Showing 13 changed files with 123 additions and 81 deletions.
18 changes: 13 additions & 5 deletions services/horizon/internal/db2/history/account_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ const loaderLookupBatchSize = 50000

// Value implements the database/sql/driver Valuer interface.
func (a FutureAccountID) Value() (driver.Value, error) {
return a.loader.GetNow(a.address), nil
return a.loader.GetNow(a.address)
}

// AccountLoader will map account addresses to their history
Expand Down Expand Up @@ -71,11 +71,15 @@ func (a *AccountLoader) GetFuture(address string) FutureAccountID {
// GetNow should only be called on values which were registered by
// GetFuture() calls. Also, Exec() must be called before any GetNow
// call can succeed.
func (a *AccountLoader) GetNow(address string) int64 {
if id, ok := a.ids[address]; !ok {
panic(fmt.Errorf("address %v not present", address))
func (a *AccountLoader) GetNow(address string) (int64, error) {
if !a.sealed {
return 0, fmt.Errorf(`invalid account loader state,
Exec was not called yet to properly seal and resolve %v id`, address)
}
if internalID, ok := a.ids[address]; !ok {
return 0, fmt.Errorf(`account loader address %q was not found`, address)
} else {
return id
return internalID, nil
}
}

Expand Down Expand Up @@ -207,3 +211,7 @@ func NewAccountLoaderStub() AccountLoaderStub {
func (a AccountLoaderStub) Insert(address string, id int64) {
a.Loader.ids[address] = id
}

func (a AccountLoaderStub) Sealed() {
a.Loader.sealed = true
}
24 changes: 10 additions & 14 deletions services/horizon/internal/db2/history/account_loader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,11 @@ func TestAccountLoader(t *testing.T) {
}

loader := NewAccountLoader()
var futures []FutureAccountID
for _, address := range addresses {
future := loader.GetFuture(address)
futures = append(futures, future)
assert.Panics(t, func() {
loader.GetNow(address)
})
assert.Panics(t, func() {
future.Value()
})
_, err := future.Value()
assert.Error(t, err)
assert.Contains(t, err.Error(), `invalid account loader state,`)
duplicateFuture := loader.GetFuture(address)
assert.Equal(t, future, duplicateFuture)
}
Expand All @@ -42,15 +37,16 @@ func TestAccountLoader(t *testing.T) {
})

q := &Q{session}
for i, address := range addresses {
future := futures[i]
id := loader.GetNow(address)
val, err := future.Value()
for _, address := range addresses {
internalId, err := loader.GetNow(address)
assert.NoError(t, err)
assert.Equal(t, id, val)
var account Account
assert.NoError(t, q.AccountByAddress(context.Background(), &account, address))
assert.Equal(t, account.ID, id)
assert.Equal(t, account.ID, internalId)
assert.Equal(t, account.Address, address)
}

_, err := loader.GetNow("not present")
assert.Error(t, err)
assert.Contains(t, err.Error(), `was not found`)
}
18 changes: 13 additions & 5 deletions services/horizon/internal/db2/history/asset_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type FutureAssetID struct {

// Value implements the database/sql/driver Valuer interface.
func (a FutureAssetID) Value() (driver.Value, error) {
return a.loader.GetNow(a.asset), nil
return a.loader.GetNow(a.asset)
}

// AssetLoader will map assets to their history
Expand Down Expand Up @@ -81,11 +81,15 @@ func (a *AssetLoader) GetFuture(asset AssetKey) FutureAssetID {
// GetNow should only be called on values which were registered by
// GetFuture() calls. Also, Exec() must be called before any GetNow
// call can succeed.
func (a *AssetLoader) GetNow(asset AssetKey) int64 {
if id, ok := a.ids[asset]; !ok {
panic(fmt.Errorf("asset %v not present", asset))
func (a *AssetLoader) GetNow(asset AssetKey) (int64, error) {
if !a.sealed {
return 0, fmt.Errorf(`invalid asset loader state,
Exec was not called yet to properly seal and resolve %v id`, asset)
}
if internalID, ok := a.ids[asset]; !ok {
return 0, fmt.Errorf(`asset loader id %v was not found`, asset)
} else {
return id
return internalID, nil
}
}

Expand Down Expand Up @@ -213,3 +217,7 @@ func NewAssetLoaderStub() AssetLoaderStub {
func (a AssetLoaderStub) Insert(asset AssetKey, id int64) {
a.Loader.ids[asset] = id
}

func (a AssetLoaderStub) Sealed() {
a.Loader.sealed = true
}
22 changes: 9 additions & 13 deletions services/horizon/internal/db2/history/asset_loader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,11 @@ func TestAssetLoader(t *testing.T) {
}

loader := NewAssetLoader()
var futures []FutureAssetID
for _, key := range keys {
future := loader.GetFuture(key)
futures = append(futures, future)
assert.Panics(t, func() {
loader.GetNow(key)
})
assert.Panics(t, func() {
future.Value()
})
_, err := future.Value()
assert.Error(t, err)
assert.Contains(t, err.Error(), `invalid asset loader state,`)
duplicateFuture := loader.GetFuture(key)
assert.Equal(t, future, duplicateFuture)
}
Expand All @@ -56,12 +51,9 @@ func TestAssetLoader(t *testing.T) {
})

q := &Q{session}
for i, key := range keys {
future := futures[i]
internalID := loader.GetNow(key)
val, err := future.Value()
for _, key := range keys {
internalID, err := loader.GetNow(key)
assert.NoError(t, err)
assert.Equal(t, internalID, val)
var assetXDR xdr.Asset
if key.Type == "native" {
assetXDR = xdr.MustNewNativeAsset()
Expand All @@ -72,4 +64,8 @@ func TestAssetLoader(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, assetID, internalID)
}

_, err := loader.GetNow(AssetKey{})
assert.Error(t, err)
assert.Contains(t, err.Error(), `was not found`)
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ type FutureClaimableBalanceID struct {

// Value implements the database/sql/driver Valuer interface.
func (a FutureClaimableBalanceID) Value() (driver.Value, error) {
return a.loader.getNow(a.id), nil
return a.loader.getNow(a.id)
}

// ClaimableBalanceLoader will map claimable balance ids to their internal
Expand Down Expand Up @@ -64,11 +64,15 @@ func (a *ClaimableBalanceLoader) GetFuture(id string) FutureClaimableBalanceID {
// getNow should only be called on values which were registered by
// GetFuture() calls. Also, Exec() must be called before any getNow
// call can succeed.
func (a *ClaimableBalanceLoader) getNow(id string) int64 {
func (a *ClaimableBalanceLoader) getNow(id string) (int64, error) {
if !a.sealed {
return 0, fmt.Errorf(`invalid claimable balance loader state,
Exec was not called yet to properly seal and resolve %v id`, id)
}
if internalID, ok := a.ids[id]; !ok {
panic(fmt.Errorf("id %v not present", id))
return 0, fmt.Errorf(`claimable balance loader id %q was not found`, id)
} else {
return internalID
return internalID, nil
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,9 @@ func TestClaimableBalanceLoader(t *testing.T) {
for _, id := range ids {
future := loader.GetFuture(id)
futures = append(futures, future)
assert.Panics(t, func() {
loader.getNow(id)
})
assert.Panics(t, func() {
future.Value()
})
_, err := future.Value()
assert.Error(t, err)
assert.Contains(t, err.Error(), `invalid claimable balance loader state,`)
duplicateFuture := loader.GetFuture(id)
assert.Equal(t, future, duplicateFuture)
}
Expand All @@ -50,13 +47,16 @@ func TestClaimableBalanceLoader(t *testing.T) {
q := &Q{session}
for i, id := range ids {
future := futures[i]
internalID := loader.getNow(id)
val, err := future.Value()
internalID, err := future.Value()
assert.NoError(t, err)
assert.Equal(t, internalID, val)
cb, err := q.ClaimableBalanceByID(context.Background(), id)
assert.NoError(t, err)
assert.Equal(t, cb.BalanceID, id)
assert.Equal(t, cb.InternalID, internalID)
}

futureCb := &FutureClaimableBalanceID{id: "not-present", loader: loader}
_, err := futureCb.Value()
assert.Error(t, err)
assert.Contains(t, err.Error(), `was not found`)
}
18 changes: 13 additions & 5 deletions services/horizon/internal/db2/history/liquidity_pool_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ type FutureLiquidityPoolID struct {

// Value implements the database/sql/driver Valuer interface.
func (a FutureLiquidityPoolID) Value() (driver.Value, error) {
return a.loader.GetNow(a.id), nil
return a.loader.GetNow(a.id)
}

// LiquidityPoolLoader will map liquidity pools to their internal
Expand Down Expand Up @@ -64,11 +64,15 @@ func (a *LiquidityPoolLoader) GetFuture(id string) FutureLiquidityPoolID {
// GetNow should only be called on values which were registered by
// GetFuture() calls. Also, Exec() must be called before any GetNow
// call can succeed.
func (a *LiquidityPoolLoader) GetNow(id string) int64 {
if id, ok := a.ids[id]; !ok {
panic(fmt.Errorf("id %v not present", id))
func (a *LiquidityPoolLoader) GetNow(id string) (int64, error) {
if !a.sealed {
return 0, fmt.Errorf(`invalid liquidity pool loader state,
Exec was not called yet to properly seal and resolve %v id`, id)
}
if internalID, ok := a.ids[id]; !ok {
return 0, fmt.Errorf(`liquidity pool loader id %q was not found`, id)
} else {
return id
return internalID, nil
}
}

Expand Down Expand Up @@ -158,3 +162,7 @@ func NewLiquidityPoolLoaderStub() LiquidityPoolLoaderStub {
func (a LiquidityPoolLoaderStub) Insert(lp string, id int64) {
a.Loader.ids[lp] = id
}

func (a LiquidityPoolLoaderStub) Sealed() {
a.Loader.sealed = true
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,11 @@ func TestLiquidityPoolLoader(t *testing.T) {
}

loader := NewLiquidityPoolLoader()
var futures []FutureLiquidityPoolID
for _, id := range ids {
future := loader.GetFuture(id)
futures = append(futures, future)
assert.Panics(t, func() {
loader.GetNow(id)
})
assert.Panics(t, func() {
future.Value()
})
_, err := future.Value()
assert.Error(t, err)
assert.Contains(t, err.Error(), `invalid liquidity pool loader state,`)
duplicateFuture := loader.GetFuture(id)
assert.Equal(t, future, duplicateFuture)
}
Expand All @@ -45,15 +40,16 @@ func TestLiquidityPoolLoader(t *testing.T) {
})

q := &Q{session}
for i, id := range ids {
future := futures[i]
internalID := loader.GetNow(id)
val, err := future.Value()
for _, id := range ids {
internalID, err := loader.GetNow(id)
assert.NoError(t, err)
assert.Equal(t, internalID, val)
lp, err := q.LiquidityPoolByID(context.Background(), id)
assert.NoError(t, err)
assert.Equal(t, lp.PoolID, id)
assert.Equal(t, lp.InternalID, internalID)
}

_, err := loader.GetNow("not present")
assert.Error(t, err)
assert.Contains(t, err.Error(), `was not found`)
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ func TestAddOperationParticipants(t *testing.T) {

op := ops[0]
tt.Assert.Equal(int64(240518172673), op.OperationID)
tt.Assert.Equal(accountLoader.GetNow(address), op.AccountID)
val, err := accountLoader.GetNow(address)
tt.Assert.NoError(err)
tt.Assert.Equal(val, op.AccountID)
}
}
4 changes: 3 additions & 1 deletion services/horizon/internal/db2/history/participants_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,9 @@ func TestTransactionParticipantsBatch(t *testing.T) {
{TransactionID: 2},
}
for i := range expected {
expected[i].AccountID = accountLoader.GetNow(addresses[i])
val, err := accountLoader.GetNow(addresses[i])
tt.Assert.NoError(err)
expected[i].AccountID = val
}
tt.Assert.ElementsMatch(expected, participants)
}
10 changes: 4 additions & 6 deletions services/horizon/internal/ingest/processor_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,9 @@ func (s *ProcessorRunner) buildTransactionProcessor(
accountLoader := history.NewAccountLoader()
assetLoader := history.NewAssetLoader()
lpLoader := history.NewLiquidityPoolLoader()
cbLoader := history.NewClaimableBalanceLoader()

lazyLoaders := []horizonLazyLoader{accountLoader, assetLoader, lpLoader}
lazyLoaders := []horizonLazyLoader{accountLoader, assetLoader, lpLoader, cbLoader}

statsLedgerTransactionProcessor := &statsLedgerTransactionProcessor{
StatsLedgerTransactionProcessor: ledgerTransactionStats,
Expand All @@ -158,7 +159,7 @@ func (s *ProcessorRunner) buildTransactionProcessor(
processors.NewParticipantsProcessor(accountLoader,
s.historyQ.NewTransactionParticipantsBatchInsertBuilder(), s.historyQ.NewOperationParticipantBatchInsertBuilder()),
processors.NewTransactionProcessor(s.historyQ.NewTransactionBatchInsertBuilder()),
processors.NewClaimableBalancesTransactionProcessor(history.NewClaimableBalanceLoader(),
processors.NewClaimableBalancesTransactionProcessor(cbLoader,
s.historyQ.NewTransactionClaimableBalanceBatchInsertBuilder(), s.historyQ.NewOperationClaimableBalanceBatchInsertBuilder()),
processors.NewLiquidityPoolsTransactionProcessor(lpLoader,
s.historyQ.NewTransactionLiquidityPoolBatchInsertBuilder(), s.historyQ.NewOperationLiquidityPoolBatchInsertBuilder())}
Expand Down Expand Up @@ -328,7 +329,7 @@ func (s *ProcessorRunner) RunTransactionProcessorsOnLedger(ledger xdr.LedgerClos
}

// ensure capture of the ledger to history regardless of whether it has transactions.
ledgersProcessor := processors.NewLedgerProcessor(s.historyQ.NewLedgerBatchInsertBuilder(), int(ledger.LedgerHeaderHistoryEntry().Header.LedgerVersion))
ledgersProcessor := processors.NewLedgerProcessor(s.historyQ.NewLedgerBatchInsertBuilder(), CurrentVersion)
ledgersProcessor.ProcessLedger(ledger)

transactionReader, err = ingest.NewLedgerTransactionReaderFromLedgerCloseMeta(s.config.NetworkPassphrase, ledger)
Expand Down Expand Up @@ -406,9 +407,6 @@ func (s *ProcessorRunner) RunAllProcessorsOnLedger(ledger xdr.LedgerCloseMeta) (

stats.transactionStats, stats.transactionDurations, stats.tradeStats, err =
s.RunTransactionProcessorsOnLedger(ledger)
if err != nil {
return
}

return
}
Loading

0 comments on commit 7c3842b

Please sign in to comment.