Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Merged by Bors] - feat: prune ineffectual mempool txs #6443

Closed
wants to merge 17 commits into from
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ See [RELEASE](./RELEASE.md) for workflow instructions.

* [#6422](https://github.com/spacemeshos/go-spacemesh/pull/6422) Further improved performance of the proposal building
process to avoid late proposals.
* [#6443](https://github.com/spacemeshos/go-spacemesh/pull/6443) Improve eviction of ineffectual transactions in the database
which will now show up as ineffectual when querying them from the API.

## v1.7.6

Expand Down
4 changes: 4 additions & 0 deletions api/grpcserver/grpcserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,10 @@ func (t *ConStateAPIMock) GetStateRoot() (types.Hash32, error) {
return stateRoot, nil
}

func (t *ConStateAPIMock) HasEvicted(id types.TransactionID) (bool, error) {
panic("not implemented")
}

func (t *ConStateAPIMock) GetMeshTransaction(id types.TransactionID) (*types.MeshTransaction, error) {
tx, ok := t.returnTx[id]
if ok {
Expand Down
1 change: 1 addition & 0 deletions api/grpcserver/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type conservativeState interface {
GetMeshTransactions([]types.TransactionID) ([]*types.MeshTransaction, map[types.TransactionID]struct{})
GetTransactionsByAddress(types.LayerID, types.LayerID, types.Address) ([]*types.MeshTransaction, error)
Validation(raw types.RawTx) system.ValidationRequest
HasEvicted(tid types.TransactionID) (bool, error)
}

// syncer is the API to get sync status.
Expand Down
39 changes: 39 additions & 0 deletions api/grpcserver/mocks.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 9 additions & 1 deletion api/grpcserver/transaction_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,15 @@
case types.APPLIED:
state = pb.TransactionState_TRANSACTION_STATE_PROCESSED
default:
state = pb.TransactionState_TRANSACTION_STATE_UNSPECIFIED
evicted, err := s.conState.HasEvicted(txID)
if err != nil {
return nil, state
}
if evicted {
state = pb.TransactionState_TRANSACTION_STATE_INEFFECTUAL
} else {
state = pb.TransactionState_TRANSACTION_STATE_UNSPECIFIED
}

Check warning on line 154 in api/grpcserver/transaction_service.go

View check run for this annotation

Codecov / codecov/patch

api/grpcserver/transaction_service.go#L146-L154

Added lines #L146 - L154 were not covered by tests
}
return &tx.Transaction, state
}
Expand Down
8 changes: 4 additions & 4 deletions genvm/vm.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@

for _, reward := range rewardsResult {
if err := rewards.Add(tx, &reward); err != nil {
return nil, nil, fmt.Errorf("%w: %w", core.ErrInternal, err)
return nil, nil, fmt.Errorf("add reward %w: %w", core.ErrInternal, err)

Check warning on line 234 in genvm/vm.go

View check run for this annotation

Codecov / codecov/patch

genvm/vm.go#L234

Added line #L234 was not covered by tests
}
}

Expand All @@ -247,17 +247,17 @@
return true
})
if err != nil {
return nil, nil, fmt.Errorf("%w: %w", core.ErrInternal, err)
return nil, nil, fmt.Errorf("iterate changed %w: %w", core.ErrInternal, err)

Check warning on line 250 in genvm/vm.go

View check run for this annotation

Codecov / codecov/patch

genvm/vm.go#L250

Added line #L250 was not covered by tests
}
writesPerBlock.Observe(float64(total))

var hashSum types.Hash32
hasher.Sum(hashSum[:0])
if err := layers.UpdateStateHash(tx, layer, hashSum); err != nil {
return nil, nil, err
return nil, nil, fmt.Errorf("update state hash: %w", err)

Check warning on line 257 in genvm/vm.go

View check run for this annotation

Codecov / codecov/patch

genvm/vm.go#L257

Added line #L257 was not covered by tests
}
if err := tx.Commit(); err != nil {
return nil, nil, fmt.Errorf("%w: %w", core.ErrInternal, err)
return nil, nil, fmt.Errorf("commit %w: %w", core.ErrInternal, err)

Check warning on line 260 in genvm/vm.go

View check run for this annotation

Codecov / codecov/patch

genvm/vm.go#L260

Added line #L260 was not covered by tests
}
ss.IterateChanged(func(account *core.Account) bool {
if err := events.ReportAccountUpdate(account.Address); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ require (
github.com/santhosh-tekuri/jsonschema/v5 v5.3.1
github.com/seehuhn/mt19937 v1.0.0
github.com/slok/go-http-metrics v0.13.0
github.com/spacemeshos/api/release/go v1.55.0
github.com/spacemeshos/api/release/go v1.56.0
github.com/spacemeshos/economics v0.1.4
github.com/spacemeshos/fixed v0.1.2
github.com/spacemeshos/go-scale v1.2.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -629,8 +629,8 @@ github.com/sourcegraph/annotate v0.0.0-20160123013949-f4cad6c6324d/go.mod h1:Udh
github.com/sourcegraph/conc v0.3.0 h1:OQTbbt6P72L20UqAkXXuLOj79LfEanQ+YQFNpLA9ySo=
github.com/sourcegraph/conc v0.3.0/go.mod h1:Sdozi7LEKbFPqYX2/J+iBAM6HpqSLTASQIKqDmF7Mt0=
github.com/sourcegraph/syntaxhighlight v0.0.0-20170531221838-bd320f5d308e/go.mod h1:HuIsMU8RRBOtsCgI77wP899iHVBQpCmg4ErYMZB+2IA=
github.com/spacemeshos/api/release/go v1.55.0 h1:IQ8PmQ1d7CwUiM1r3NH8uZ+JkEyNjSltiAuqEY6dn6o=
github.com/spacemeshos/api/release/go v1.55.0/go.mod h1:qM6GTS2QtUvxPNIJf+2ObH63bGXYrJnapgOd6l6pbpQ=
github.com/spacemeshos/api/release/go v1.56.0 h1:llBVijoO4I3mhHk0OtGJdTT/11I7ajo0CZp3x8h1EjA=
github.com/spacemeshos/api/release/go v1.56.0/go.mod h1:6o17nhNyXpbVeijAQqkZfL8Pe/IkMGAWMLSLZni0DOU=
github.com/spacemeshos/economics v0.1.4 h1:twlawrcQhYNqPgyDv08+24EL/OgUKz3d7q+PvJIAND0=
github.com/spacemeshos/economics v0.1.4/go.mod h1:6HKWKiKdxjVQcGa2z/wA0LR4M/DzKib856bP16yqNmQ=
github.com/spacemeshos/fixed v0.1.2 h1:pENQ8pXFAqin3f15ZLoOVVeSgcmcFJ0IFdFm4+9u4SM=
Expand Down
5 changes: 5 additions & 0 deletions sql/statesql/schema/migrations/0026_pruned_txs.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
CREATE TABLE evicted_mempool (
id CHAR(32) NOT NULL,
time INT NOT NULL,
PRIMARY KEY (id)
);
7 changes: 6 additions & 1 deletion sql/statesql/schema/schema.sql
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
PRAGMA user_version = 25;
PRAGMA user_version = 26;
CREATE TABLE accounts
(
address CHAR(24),
Expand Down Expand Up @@ -69,6 +69,11 @@ CREATE TABLE certificates
valid bool NOT NULL,
PRIMARY KEY (layer, block)
);
CREATE TABLE evicted_mempool (
id CHAR(32) NOT NULL,
time INT NOT NULL,
PRIMARY KEY (id)
);
CREATE TABLE identities
(
pubkey CHAR(32) PRIMARY KEY,
Expand Down
63 changes: 63 additions & 0 deletions sql/transactions/transactions.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,17 @@
return rows > 0, nil
}

func HasEvicted(db sql.Executor, id types.TransactionID) (bool, error) {
rows, err := db.Exec("select 1 from evicted_mempool where id = ?1",
func(stmt *sql.Statement) {
stmt.BindBytes(1, id.Bytes())
}, nil)
if err != nil {
return false, fmt.Errorf("has evicted %s: %w", id, err)
}

Check warning on line 243 in sql/transactions/transactions.go

View check run for this annotation

Codecov / codecov/patch

sql/transactions/transactions.go#L242-L243

Added lines #L242 - L243 were not covered by tests
return rows > 0, nil
}

// GetByAddress finds all transactions for an address.
func GetByAddress(db sql.Executor, from, to types.LayerID, address types.Address) ([]*types.MeshTransaction, error) {
var txs []*types.MeshTransaction
Expand Down Expand Up @@ -295,6 +306,58 @@
}, "get acct pending from nonce")
}

// GetAcctPendingToNonce get all pending transactions with nonce before `to` for the given address.
func GetAcctPendingToNonce(db sql.Executor, address types.Address, to uint64) ([]types.TransactionID, error) {
ids := make([]types.TransactionID, 0)
_, err := db.Exec(`select id from transactions
where principal = ?1 and nonce < ?2 and result is null
order by nonce asc, timestamp asc;`,
func(stmt *sql.Statement) {
stmt.BindBytes(1, address.Bytes())
stmt.BindBytes(2, util.Uint64ToBytesBigEndian(to))
}, func(stmt *sql.Statement) bool {
id := types.TransactionID{}
stmt.ColumnBytes(0, id[:])
ids = append(ids, id)
return true
})
if err != nil {
return nil, fmt.Errorf("get acct pending to nonce %s: %w", address, err)
}

Check warning on line 326 in sql/transactions/transactions.go

View check run for this annotation

Codecov / codecov/patch

sql/transactions/transactions.go#L325-L326

Added lines #L325 - L326 were not covered by tests
return ids, nil
}

func SetEvicted(db sql.Executor, id types.TransactionID) error {
if _, err := db.Exec("insert into evicted_mempool (id, time) values (?1, ?2) on conflict do nothing;",
func(stmt *sql.Statement) {
stmt.BindBytes(1, id.Bytes())
stmt.BindInt64(2, time.Now().UnixNano())
}, nil); err != nil {
return fmt.Errorf("set evicted %s: %w", id, err)
}

Check warning on line 337 in sql/transactions/transactions.go

View check run for this annotation

Codecov / codecov/patch

sql/transactions/transactions.go#L336-L337

Added lines #L336 - L337 were not covered by tests
return nil
}

func Delete(db sql.Executor, id types.TransactionID) error {
if _, err := db.Exec("delete from transactions where id = ?1;",
func(stmt *sql.Statement) {
stmt.BindBytes(1, id.Bytes())
}, nil); err != nil {
return fmt.Errorf("delete %s: %w", id, err)
}

Check warning on line 347 in sql/transactions/transactions.go

View check run for this annotation

Codecov / codecov/patch

sql/transactions/transactions.go#L346-L347

Added lines #L346 - L347 were not covered by tests
return nil
}

func PruneEvicted(db sql.Executor, before time.Time) error {
if _, err := db.Exec("delete from evicted_mempool where time < ?1;",
func(stmt *sql.Statement) {
stmt.BindInt64(1, before.UnixNano())
}, nil); err != nil {
return fmt.Errorf("prune evicted %w", err)
}

Check warning on line 357 in sql/transactions/transactions.go

View check run for this annotation

Codecov / codecov/patch

sql/transactions/transactions.go#L356-L357

Added lines #L356 - L357 were not covered by tests
return nil
}

// query MUST ensure that this order of fields tx, header, layer, block, timestamp, id.
func queryPending(
db sql.Executor,
Expand Down
70 changes: 70 additions & 0 deletions sql/transactions/transactions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -562,3 +562,73 @@ func TestTransactionInBlock(t *testing.T) {
_, _, err = transactions.TransactionInBlock(db, tid, lids[2])
require.ErrorIs(t, err, sql.ErrNotFound)
}

func TestTransactionEvictMempool(t *testing.T) {
principals := []types.Address{
{1},
{2},
{3},
}
txs := []types.Transaction{
{
RawTx: types.RawTx{ID: types.TransactionID{1}},
TxHeader: &types.TxHeader{Principal: principals[0], Nonce: 0},
},
{
RawTx: types.RawTx{ID: types.TransactionID{2}},
TxHeader: &types.TxHeader{Principal: principals[0], Nonce: 1},
},
{
RawTx: types.RawTx{ID: types.TransactionID{3}},
TxHeader: &types.TxHeader{Principal: principals[1], Nonce: 0},
},
}
db := statesql.InMemoryTest(t)
for _, tx := range txs {
require.NoError(t, transactions.Add(db, &tx, time.Time{}))
}
err := transactions.SetEvicted(db, types.TransactionID{1})
require.NoError(t, err)

err = transactions.Delete(db, types.TransactionID{1})
require.NoError(t, err)

pending, err := transactions.GetAcctPendingFromNonce(db, principals[0], 1)
require.NoError(t, err)
require.Len(t, pending, 1)
require.Equal(t, pending[0].ID, txs[1].ID)

pending, err = transactions.GetAcctPendingFromNonce(db, principals[1], 0)
require.NoError(t, err)
require.Len(t, pending, 1)
require.Equal(t, pending[0].ID, txs[2].ID)

has, err := transactions.Has(db, txs[0].ID)
require.False(t, has)
require.NoError(t, err)

has, err = transactions.HasEvicted(db, txs[0].ID)
require.True(t, has)
require.NoError(t, err)
}

func TestPruneEvicted(t *testing.T) {
txId := types.TransactionID{1}
db := statesql.InMemoryTest(t)
db.Exec(`insert into evicted_mempool (id, time) values (?1,?2);`,
func(stmt *sql.Statement) {
stmt.BindBytes(1, txId.Bytes())
stmt.BindInt64(2, time.Now().Add(-13*time.Hour).UnixNano())
}, nil)

has, err := transactions.HasEvicted(db, txId)
require.True(t, has)
require.NoError(t, err)

err = transactions.PruneEvicted(db, time.Now().Add(-12*time.Hour))
require.NoError(t, err)

has, err = transactions.HasEvicted(db, txId)
require.False(t, has)
require.NoError(t, err)
}
28 changes: 28 additions & 0 deletions txs/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,9 +396,32 @@
ac.txsByNonce = list.New()
ac.startNonce = nextNonce
ac.startBalance = newBalance

err := ac.evictPendingNonce(db)
if err != nil {
return fmt.Errorf("evict pending: %w", err)
}

Check warning on line 403 in txs/cache.go

View check run for this annotation

Codecov / codecov/patch

txs/cache.go#L402-L403

Added lines #L402 - L403 were not covered by tests
return ac.addPendingFromNonce(logger, db, ac.startNonce, applied)
}

func (ac *accountCache) evictPendingNonce(db sql.StateDatabase) error {
return db.WithTxImmediate(context.Background(), func(tx sql.Transaction) error {
txIds, err := transactions.GetAcctPendingToNonce(tx, ac.addr, ac.startNonce)
if err != nil {
return fmt.Errorf("get pending to nonce: %w", err)
}

Check warning on line 412 in txs/cache.go

View check run for this annotation

Codecov / codecov/patch

txs/cache.go#L411-L412

Added lines #L411 - L412 were not covered by tests
for _, tid := range txIds {
if err := transactions.SetEvicted(tx, tid); err != nil {
return fmt.Errorf("set evicted for %s: %w", tid, err)
}

Check warning on line 416 in txs/cache.go

View check run for this annotation

Codecov / codecov/patch

txs/cache.go#L415-L416

Added lines #L415 - L416 were not covered by tests
if err := transactions.Delete(tx, tid); err != nil {
return fmt.Errorf("delete tx %s: %w", tid, err)
}

Check warning on line 419 in txs/cache.go

View check run for this annotation

Codecov / codecov/patch

txs/cache.go#L418-L419

Added lines #L418 - L419 were not covered by tests
}
return nil
})
}

func (ac *accountCache) shouldEvict() bool {
return ac.txsByNonce.Len() == 0 && !ac.moreInDB
}
Expand Down Expand Up @@ -776,6 +799,11 @@
}
acctResetDuration.Observe(float64(time.Since(t2)))
}

err := transactions.PruneEvicted(db, time.Now().Add(-12*time.Hour))
if err != nil {
logger.Warn("failed to prune evicted", zap.Error(err))
}

Check warning on line 806 in txs/cache.go

View check run for this annotation

Codecov / codecov/patch

txs/cache.go#L805-L806

Added lines #L805 - L806 were not covered by tests
return nil
}

Expand Down
8 changes: 3 additions & 5 deletions txs/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,10 +334,10 @@ func TestCache_Account_HappyFlow(t *testing.T) {
checkProjection(t, tc.Cache, ta.principal, newNextNonce, newBalance+income)
// mempool is unchanged
checkMempool(t, tc.Cache, expectedMempool)

// pruning has removed old and ineffective txs
for _, mtx := range append(oldNonces, sameNonces...) {
got, err := transactions.Get(tc.db, mtx.ID)
require.NoError(t, err)
require.Equal(t, types.MEMPOOL, got.State)
checkTXNotInDB(t, tc.db, mtx.ID)
}

// revert to one layer before lid
Expand All @@ -357,8 +357,6 @@ func TestCache_Account_HappyFlow(t *testing.T) {
}
checkProjection(t, tc.Cache, ta.principal, newNextNonce, newBalance)
checkTXStateFromDB(t, tc.db, mtxs, types.MEMPOOL)
checkTXStateFromDB(t, tc.db, oldNonces, types.MEMPOOL)
checkTXStateFromDB(t, tc.db, sameNonces, types.MEMPOOL)
}

func TestCache_Account_TXInMultipleLayers(t *testing.T) {
Expand Down
4 changes: 4 additions & 0 deletions txs/conservative_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,3 +283,7 @@
})))
return result
}

func (cs *ConservativeState) HasEvicted(tid types.TransactionID) (bool, error) {
return transactions.HasEvicted(cs.db, tid)

Check warning on line 288 in txs/conservative_state.go

View check run for this annotation

Codecov / codecov/patch

txs/conservative_state.go#L287-L288

Added lines #L287 - L288 were not covered by tests
}
Loading