Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Merged by Bors] - feat: prune ineffectual mempool txs #6443

Closed
wants to merge 17 commits into from
4 changes: 4 additions & 0 deletions api/grpcserver/grpcserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,10 @@ func (t *ConStateAPIMock) GetStateRoot() (types.Hash32, error) {
return stateRoot, nil
}

func (t *ConStateAPIMock) HasEvicted(id types.TransactionID) (bool, error) {
panic("not implemented")
}

func (t *ConStateAPIMock) GetMeshTransaction(id types.TransactionID) (*types.MeshTransaction, error) {
tx, ok := t.returnTx[id]
if ok {
Expand Down
1 change: 1 addition & 0 deletions api/grpcserver/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type conservativeState interface {
GetMeshTransactions([]types.TransactionID) ([]*types.MeshTransaction, map[types.TransactionID]struct{})
GetTransactionsByAddress(types.LayerID, types.LayerID, types.Address) ([]*types.MeshTransaction, error)
Validation(raw types.RawTx) system.ValidationRequest
HasEvicted(tid types.TransactionID) (bool, error)
}

// syncer is the API to get sync status.
Expand Down
39 changes: 39 additions & 0 deletions api/grpcserver/mocks.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 9 additions & 1 deletion api/grpcserver/transaction_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,15 @@
case types.APPLIED:
state = pb.TransactionState_TRANSACTION_STATE_PROCESSED
default:
state = pb.TransactionState_TRANSACTION_STATE_UNSPECIFIED
evicted, err := s.conState.HasEvicted(txID)
if err != nil {
return nil, state
}
if evicted {
state = pb.TransactionState_TRANSACTION_STATE_INEFFECTUAL
} else {
state = pb.TransactionState_TRANSACTION_STATE_UNSPECIFIED
}

Check warning on line 154 in api/grpcserver/transaction_service.go

View check run for this annotation

Codecov / codecov/patch

api/grpcserver/transaction_service.go#L146-L154

Added lines #L146 - L154 were not covered by tests
}
return &tx.Transaction, state
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ require (
github.com/santhosh-tekuri/jsonschema/v5 v5.3.1
github.com/seehuhn/mt19937 v1.0.0
github.com/slok/go-http-metrics v0.13.0
github.com/spacemeshos/api/release/go v1.55.0
github.com/spacemeshos/api/release/go v1.55.1-0.20241111172452-53c7ab7d2740
github.com/spacemeshos/economics v0.1.4
github.com/spacemeshos/fixed v0.1.2
github.com/spacemeshos/go-scale v1.2.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -629,8 +629,8 @@ github.com/sourcegraph/annotate v0.0.0-20160123013949-f4cad6c6324d/go.mod h1:Udh
github.com/sourcegraph/conc v0.3.0 h1:OQTbbt6P72L20UqAkXXuLOj79LfEanQ+YQFNpLA9ySo=
github.com/sourcegraph/conc v0.3.0/go.mod h1:Sdozi7LEKbFPqYX2/J+iBAM6HpqSLTASQIKqDmF7Mt0=
github.com/sourcegraph/syntaxhighlight v0.0.0-20170531221838-bd320f5d308e/go.mod h1:HuIsMU8RRBOtsCgI77wP899iHVBQpCmg4ErYMZB+2IA=
github.com/spacemeshos/api/release/go v1.55.0 h1:IQ8PmQ1d7CwUiM1r3NH8uZ+JkEyNjSltiAuqEY6dn6o=
github.com/spacemeshos/api/release/go v1.55.0/go.mod h1:qM6GTS2QtUvxPNIJf+2ObH63bGXYrJnapgOd6l6pbpQ=
github.com/spacemeshos/api/release/go v1.55.1-0.20241111172452-53c7ab7d2740 h1:yVJ3wfBGLNKC1FgdtJ9/IKh2cfpLCrNET89MZ9yxrqc=
github.com/spacemeshos/api/release/go v1.55.1-0.20241111172452-53c7ab7d2740/go.mod h1:6o17nhNyXpbVeijAQqkZfL8Pe/IkMGAWMLSLZni0DOU=
github.com/spacemeshos/economics v0.1.4 h1:twlawrcQhYNqPgyDv08+24EL/OgUKz3d7q+PvJIAND0=
github.com/spacemeshos/economics v0.1.4/go.mod h1:6HKWKiKdxjVQcGa2z/wA0LR4M/DzKib856bP16yqNmQ=
github.com/spacemeshos/fixed v0.1.2 h1:pENQ8pXFAqin3f15ZLoOVVeSgcmcFJ0IFdFm4+9u4SM=
Expand Down
5 changes: 5 additions & 0 deletions sql/statesql/schema/migrations/0026_pruned_txs.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
CREATE TABLE evicted_mempool (
id CHAR(32) NOT NULL,
time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (id)
);
7 changes: 6 additions & 1 deletion sql/statesql/schema/schema.sql
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
PRAGMA user_version = 25;
PRAGMA user_version = 26;
CREATE TABLE accounts
(
address CHAR(24),
Expand Down Expand Up @@ -69,6 +69,11 @@ CREATE TABLE certificates
valid bool NOT NULL,
PRIMARY KEY (layer, block)
);
CREATE TABLE evicted_mempool (
id CHAR(32) NOT NULL,
time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (id)
);
CREATE TABLE identities
(
pubkey CHAR(32) PRIMARY KEY,
Expand Down
57 changes: 57 additions & 0 deletions sql/transactions/transactions.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,17 @@
return rows > 0, nil
}

func HasEvicted(db sql.Executor, id types.TransactionID) (bool, error) {
rows, err := db.Exec("select 1 from evicted_mempool where id = ?1",
func(stmt *sql.Statement) {
stmt.BindBytes(1, id.Bytes())
}, nil)
if err != nil {
return false, fmt.Errorf("has evicted %s: %w", id, err)
}

Check warning on line 243 in sql/transactions/transactions.go

View check run for this annotation

Codecov / codecov/patch

sql/transactions/transactions.go#L242-L243

Added lines #L242 - L243 were not covered by tests
return rows > 0, nil
}

// GetByAddress finds all transactions for an address.
func GetByAddress(db sql.Executor, from, to types.LayerID, address types.Address) ([]*types.MeshTransaction, error) {
var txs []*types.MeshTransaction
Expand Down Expand Up @@ -295,6 +306,52 @@
}, "get acct pending from nonce")
}

// EvictPendingNonce will evict the pending transactions
// from a given principal address into the evited_mempool table.
func EvictPendingNonce(db sql.Executor, address types.Address, to uint64) error {
txs, err := queryPending(db, `select tx, header, layer, block, timestamp, id from transactions
where principal = ?1 and nonce < ?2 and result is null`,
func(stmt *sql.Statement) {
stmt.BindBytes(1, address.Bytes())
stmt.BindBytes(2, util.Uint64ToBytesBigEndian(to))
}, "get acct pending to nonce")
if err != nil {
return fmt.Errorf("query pending: %w", err)
}

Check warning on line 320 in sql/transactions/transactions.go

View check run for this annotation

Codecov / codecov/patch

sql/transactions/transactions.go#L319-L320

Added lines #L319 - L320 were not covered by tests
insert := func(txId []byte) error {
if _, err := db.Exec("insert into evicted_mempool (id) values (?1) on conflict do nothing;",
func(stmt *sql.Statement) {
stmt.BindBytes(1, txId)
}, nil); err != nil {
return fmt.Errorf("insert: %w", err)
}

Check warning on line 327 in sql/transactions/transactions.go

View check run for this annotation

Codecov / codecov/patch

sql/transactions/transactions.go#L326-L327

Added lines #L326 - L327 were not covered by tests

if _, err := db.Exec("delete from transactions where id = ?1;",
func(stmt *sql.Statement) {
stmt.BindBytes(1, txId)
}, nil); err != nil {
return fmt.Errorf("delete: %w", err)
}

Check warning on line 334 in sql/transactions/transactions.go

View check run for this annotation

Codecov / codecov/patch

sql/transactions/transactions.go#L333-L334

Added lines #L333 - L334 were not covered by tests

return nil
}

for _, tx := range txs {
if err := insert(tx.ID.Bytes()); err != nil {
return fmt.Errorf("insert evicted: %w", err)
}

Check warning on line 342 in sql/transactions/transactions.go

View check run for this annotation

Codecov / codecov/patch

sql/transactions/transactions.go#L341-L342

Added lines #L341 - L342 were not covered by tests
}
return nil
}

const PRUNE_PERIOD = "-12 hours"

func PruneEvicted(db sql.Executor) error {
sql := fmt.Sprintf("delete from evicted_mempool where time < DATETIME(CURRENT_TIMESTAMP,'%s');", PRUNE_PERIOD)
_, err := db.Exec(sql, nil, nil)
return err
}

// query MUST ensure that this order of fields tx, header, layer, block, timestamp, id.
func queryPending(
db sql.Executor,
Expand Down
67 changes: 67 additions & 0 deletions sql/transactions/transactions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -562,3 +562,70 @@ func TestTransactionInBlock(t *testing.T) {
_, _, err = transactions.TransactionInBlock(db, tid, lids[2])
require.ErrorIs(t, err, sql.ErrNotFound)
}

func TestTransactionEvictMempool(t *testing.T) {
principals := []types.Address{
{1},
{2},
{3},
}
txs := []types.Transaction{
{
RawTx: types.RawTx{ID: types.TransactionID{1}},
TxHeader: &types.TxHeader{Principal: principals[0], Nonce: 0},
},
{
RawTx: types.RawTx{ID: types.TransactionID{2}},
TxHeader: &types.TxHeader{Principal: principals[0], Nonce: 1},
},
{
RawTx: types.RawTx{ID: types.TransactionID{3}},
TxHeader: &types.TxHeader{Principal: principals[1], Nonce: 0},
},
}
db := statesql.InMemoryTest(t)
for _, tx := range txs {
require.NoError(t, transactions.Add(db, &tx, time.Time{}))
}
err := transactions.EvictPendingNonce(db, principals[0], 1)
require.NoError(t, err)

pending, err := transactions.GetAcctPendingFromNonce(db, principals[0], 1)
require.NoError(t, err)
require.Len(t, pending, 1)
require.Equal(t, pending[0].ID, txs[1].ID)

pending, err = transactions.GetAcctPendingFromNonce(db, principals[1], 0)
require.NoError(t, err)
require.Len(t, pending, 1)
require.Equal(t, pending[0].ID, txs[2].ID)

has, err := transactions.Has(db, txs[0].ID)
require.False(t, has)
require.NoError(t, err)

has, err = transactions.HasEvicted(db, txs[0].ID)
require.True(t, has)
require.NoError(t, err)
}

func TestPruneEvicted(t *testing.T) {
txId := types.TransactionID{1}
db := statesql.InMemoryTest(t)
db.Exec(`insert into evicted_mempool (id,time)
values (?1,DATETIME(CURRENT_TIMESTAMP,'-13 hours'));`,
func(stmt *sql.Statement) {
stmt.BindBytes(1, txId.Bytes())
}, nil)

has, err := transactions.HasEvicted(db, txId)
require.True(t, has)
require.NoError(t, err)

err = transactions.PruneEvicted(db)
require.NoError(t, err)

has, err = transactions.HasEvicted(db, txId)
require.False(t, has)
require.NoError(t, err)
}
10 changes: 10 additions & 0 deletions txs/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,11 @@
ac.txsByNonce = list.New()
ac.startNonce = nextNonce
ac.startBalance = newBalance

err := transactions.EvictPendingNonce(db, ac.addr, ac.startNonce)
if err != nil {
return fmt.Errorf("prune pending: %w", err)
}

Check warning on line 403 in txs/cache.go

View check run for this annotation

Codecov / codecov/patch

txs/cache.go#L402-L403

Added lines #L402 - L403 were not covered by tests
return ac.addPendingFromNonce(logger, db, ac.startNonce, applied)
}

Expand Down Expand Up @@ -776,6 +781,11 @@
}
acctResetDuration.Observe(float64(time.Since(t2)))
}

err := transactions.PruneEvicted(db)
if err != nil {
logger.Warn("failed to prune evicted", zap.Error(err))
}

Check warning on line 788 in txs/cache.go

View check run for this annotation

Codecov / codecov/patch

txs/cache.go#L787-L788

Added lines #L787 - L788 were not covered by tests
return nil
}

Expand Down
8 changes: 3 additions & 5 deletions txs/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,10 +334,10 @@ func TestCache_Account_HappyFlow(t *testing.T) {
checkProjection(t, tc.Cache, ta.principal, newNextNonce, newBalance+income)
// mempool is unchanged
checkMempool(t, tc.Cache, expectedMempool)

// pruning has removed old and ineffective txs
for _, mtx := range append(oldNonces, sameNonces...) {
got, err := transactions.Get(tc.db, mtx.ID)
require.NoError(t, err)
require.Equal(t, types.MEMPOOL, got.State)
checkTXNotInDB(t, tc.db, mtx.ID)
}

// revert to one layer before lid
Expand All @@ -357,8 +357,6 @@ func TestCache_Account_HappyFlow(t *testing.T) {
}
checkProjection(t, tc.Cache, ta.principal, newNextNonce, newBalance)
checkTXStateFromDB(t, tc.db, mtxs, types.MEMPOOL)
checkTXStateFromDB(t, tc.db, oldNonces, types.MEMPOOL)
checkTXStateFromDB(t, tc.db, sameNonces, types.MEMPOOL)
}

func TestCache_Account_TXInMultipleLayers(t *testing.T) {
Expand Down
4 changes: 4 additions & 0 deletions txs/conservative_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,3 +283,7 @@
})))
return result
}

func (cs *ConservativeState) HasEvicted(tid types.TransactionID) (bool, error) {
return transactions.HasEvicted(cs.db, tid)

Check warning on line 288 in txs/conservative_state.go

View check run for this annotation

Codecov / codecov/patch

txs/conservative_state.go#L287-L288

Added lines #L287 - L288 were not covered by tests
}
Loading