Skip to content

Commit

Permalink
Try #6443:
Browse files Browse the repository at this point in the history
  • Loading branch information
spacemesh-bors[bot] authored Nov 11, 2024
2 parents f711c61 + b7b67fa commit d40da73
Show file tree
Hide file tree
Showing 13 changed files with 208 additions and 10 deletions.
4 changes: 4 additions & 0 deletions api/grpcserver/grpcserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,10 @@ func (t *ConStateAPIMock) GetStateRoot() (types.Hash32, error) {
return stateRoot, nil
}

func (t *ConStateAPIMock) HasEvicted(id types.TransactionID) (bool, error) {
panic("not implemented")
}

func (t *ConStateAPIMock) GetMeshTransaction(id types.TransactionID) (*types.MeshTransaction, error) {
tx, ok := t.returnTx[id]
if ok {
Expand Down
1 change: 1 addition & 0 deletions api/grpcserver/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type conservativeState interface {
GetMeshTransactions([]types.TransactionID) ([]*types.MeshTransaction, map[types.TransactionID]struct{})
GetTransactionsByAddress(types.LayerID, types.LayerID, types.Address) ([]*types.MeshTransaction, error)
Validation(raw types.RawTx) system.ValidationRequest
HasEvicted(tid types.TransactionID) (bool, error)
}

// syncer is the API to get sync status.
Expand Down
39 changes: 39 additions & 0 deletions api/grpcserver/mocks.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 9 additions & 1 deletion api/grpcserver/transaction_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,15 @@ func (s *TransactionService) getTransactionAndStatus(
case types.APPLIED:
state = pb.TransactionState_TRANSACTION_STATE_PROCESSED
default:
state = pb.TransactionState_TRANSACTION_STATE_UNSPECIFIED
evicted, err := s.conState.HasEvicted(txID)
if err != nil {
return nil, state
}
if evicted {
state = pb.TransactionState_TRANSACTION_STATE_INEFFECTUAL
} else {
state = pb.TransactionState_TRANSACTION_STATE_UNSPECIFIED
}
}
return &tx.Transaction, state
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ require (
github.com/santhosh-tekuri/jsonschema/v5 v5.3.1
github.com/seehuhn/mt19937 v1.0.0
github.com/slok/go-http-metrics v0.13.0
github.com/spacemeshos/api/release/go v1.55.0
github.com/spacemeshos/api/release/go v1.55.1-0.20241111172452-53c7ab7d2740
github.com/spacemeshos/economics v0.1.4
github.com/spacemeshos/fixed v0.1.2
github.com/spacemeshos/go-scale v1.2.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -629,8 +629,8 @@ github.com/sourcegraph/annotate v0.0.0-20160123013949-f4cad6c6324d/go.mod h1:Udh
github.com/sourcegraph/conc v0.3.0 h1:OQTbbt6P72L20UqAkXXuLOj79LfEanQ+YQFNpLA9ySo=
github.com/sourcegraph/conc v0.3.0/go.mod h1:Sdozi7LEKbFPqYX2/J+iBAM6HpqSLTASQIKqDmF7Mt0=
github.com/sourcegraph/syntaxhighlight v0.0.0-20170531221838-bd320f5d308e/go.mod h1:HuIsMU8RRBOtsCgI77wP899iHVBQpCmg4ErYMZB+2IA=
github.com/spacemeshos/api/release/go v1.55.0 h1:IQ8PmQ1d7CwUiM1r3NH8uZ+JkEyNjSltiAuqEY6dn6o=
github.com/spacemeshos/api/release/go v1.55.0/go.mod h1:qM6GTS2QtUvxPNIJf+2ObH63bGXYrJnapgOd6l6pbpQ=
github.com/spacemeshos/api/release/go v1.55.1-0.20241111172452-53c7ab7d2740 h1:yVJ3wfBGLNKC1FgdtJ9/IKh2cfpLCrNET89MZ9yxrqc=
github.com/spacemeshos/api/release/go v1.55.1-0.20241111172452-53c7ab7d2740/go.mod h1:6o17nhNyXpbVeijAQqkZfL8Pe/IkMGAWMLSLZni0DOU=
github.com/spacemeshos/economics v0.1.4 h1:twlawrcQhYNqPgyDv08+24EL/OgUKz3d7q+PvJIAND0=
github.com/spacemeshos/economics v0.1.4/go.mod h1:6HKWKiKdxjVQcGa2z/wA0LR4M/DzKib856bP16yqNmQ=
github.com/spacemeshos/fixed v0.1.2 h1:pENQ8pXFAqin3f15ZLoOVVeSgcmcFJ0IFdFm4+9u4SM=
Expand Down
5 changes: 5 additions & 0 deletions sql/statesql/schema/migrations/0026_pruned_txs.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
CREATE TABLE evicted_mempool (
id CHAR(32) NOT NULL,
time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (id)
);
7 changes: 6 additions & 1 deletion sql/statesql/schema/schema.sql
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
PRAGMA user_version = 25;
PRAGMA user_version = 26;
CREATE TABLE accounts
(
address CHAR(24),
Expand Down Expand Up @@ -69,6 +69,11 @@ CREATE TABLE certificates
valid bool NOT NULL,
PRIMARY KEY (layer, block)
);
CREATE TABLE evicted_mempool (
id CHAR(32) NOT NULL,
time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (id)
);
CREATE TABLE identities
(
pubkey CHAR(32) PRIMARY KEY,
Expand Down
57 changes: 57 additions & 0 deletions sql/transactions/transactions.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,17 @@ func Has(db sql.Executor, id types.TransactionID) (bool, error) {
return rows > 0, nil
}

func HasEvicted(db sql.Executor, id types.TransactionID) (bool, error) {
rows, err := db.Exec("select 1 from evicted_mempool where id = ?1",
func(stmt *sql.Statement) {
stmt.BindBytes(1, id.Bytes())
}, nil)
if err != nil {
return false, fmt.Errorf("has evicted %s: %w", id, err)
}
return rows > 0, nil
}

// GetByAddress finds all transactions for an address.
func GetByAddress(db sql.Executor, from, to types.LayerID, address types.Address) ([]*types.MeshTransaction, error) {
var txs []*types.MeshTransaction
Expand Down Expand Up @@ -295,6 +306,52 @@ func GetAcctPendingFromNonce(db sql.Executor, address types.Address, from uint64
}, "get acct pending from nonce")
}

// EvictPendingNonce will evict the pending transactions
// from a given principal address into the evited_mempool table.
func EvictPendingNonce(db sql.Executor, address types.Address, to uint64) error {
txs, err := queryPending(db, `select tx, header, layer, block, timestamp, id from transactions
where principal = ?1 and nonce < ?2 and result is null`,
func(stmt *sql.Statement) {
stmt.BindBytes(1, address.Bytes())
stmt.BindBytes(2, util.Uint64ToBytesBigEndian(to))
}, "get acct pending to nonce")
if err != nil {
return fmt.Errorf("query pending: %w", err)
}
insert := func(txId []byte) error {
if _, err := db.Exec("insert into evicted_mempool (id) values (?1) on conflict do nothing;",
func(stmt *sql.Statement) {
stmt.BindBytes(1, txId)
}, nil); err != nil {
return fmt.Errorf("insert: %w", err)
}

if _, err := db.Exec("delete from transactions where id = ?1;",
func(stmt *sql.Statement) {
stmt.BindBytes(1, txId)
}, nil); err != nil {
return fmt.Errorf("delete: %w", err)
}

return nil
}

for _, tx := range txs {
if err := insert(tx.ID.Bytes()); err != nil {
return fmt.Errorf("insert evicted: %w", err)
}
}
return nil
}

const PRUNE_PERIOD = "-12 hours"

func PruneEvicted(db sql.Executor) error {
sql := fmt.Sprintf("delete from evicted_mempool where time < DATETIME(CURRENT_TIMESTAMP,'%s');", PRUNE_PERIOD)
_, err := db.Exec(sql, nil, nil)
return err
}

// query MUST ensure that this order of fields tx, header, layer, block, timestamp, id.
func queryPending(
db sql.Executor,
Expand Down
67 changes: 67 additions & 0 deletions sql/transactions/transactions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -562,3 +562,70 @@ func TestTransactionInBlock(t *testing.T) {
_, _, err = transactions.TransactionInBlock(db, tid, lids[2])
require.ErrorIs(t, err, sql.ErrNotFound)
}

func TestTransactionEvictMempool(t *testing.T) {
principals := []types.Address{
{1},
{2},
{3},
}
txs := []types.Transaction{
{
RawTx: types.RawTx{ID: types.TransactionID{1}},
TxHeader: &types.TxHeader{Principal: principals[0], Nonce: 0},
},
{
RawTx: types.RawTx{ID: types.TransactionID{2}},
TxHeader: &types.TxHeader{Principal: principals[0], Nonce: 1},
},
{
RawTx: types.RawTx{ID: types.TransactionID{3}},
TxHeader: &types.TxHeader{Principal: principals[1], Nonce: 0},
},
}
db := statesql.InMemoryTest(t)
for _, tx := range txs {
require.NoError(t, transactions.Add(db, &tx, time.Time{}))
}
err := transactions.EvictPendingNonce(db, principals[0], 1)
require.NoError(t, err)

pending, err := transactions.GetAcctPendingFromNonce(db, principals[0], 1)
require.NoError(t, err)
require.Len(t, pending, 1)
require.Equal(t, pending[0].ID, txs[1].ID)

pending, err = transactions.GetAcctPendingFromNonce(db, principals[1], 0)
require.NoError(t, err)
require.Len(t, pending, 1)
require.Equal(t, pending[0].ID, txs[2].ID)

has, err := transactions.Has(db, txs[0].ID)
require.False(t, has)
require.NoError(t, err)

has, err = transactions.HasEvicted(db, txs[0].ID)
require.True(t, has)
require.NoError(t, err)
}

func TestPruneEvicted(t *testing.T) {
txId := types.TransactionID{1}
db := statesql.InMemoryTest(t)
db.Exec(`insert into evicted_mempool (id,time)
values (?1,DATETIME(CURRENT_TIMESTAMP,'-13 hours'));`,
func(stmt *sql.Statement) {
stmt.BindBytes(1, txId.Bytes())
}, nil)

has, err := transactions.HasEvicted(db, txId)
require.True(t, has)
require.NoError(t, err)

err = transactions.PruneEvicted(db)
require.NoError(t, err)

has, err = transactions.HasEvicted(db, txId)
require.False(t, has)
require.NoError(t, err)
}
10 changes: 10 additions & 0 deletions txs/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,11 @@ func (ac *accountCache) resetAfterApply(
ac.txsByNonce = list.New()
ac.startNonce = nextNonce
ac.startBalance = newBalance

err := transactions.EvictPendingNonce(db, ac.addr, ac.startNonce)
if err != nil {
return fmt.Errorf("prune pending: %w", err)
}
return ac.addPendingFromNonce(logger, db, ac.startNonce, applied)
}

Expand Down Expand Up @@ -776,6 +781,11 @@ func (c *Cache) ApplyLayer(
}
acctResetDuration.Observe(float64(time.Since(t2)))
}

err := transactions.PruneEvicted(db)
if err != nil {
logger.Warn("failed to prune evicted", zap.Error(err))
}
return nil
}

Expand Down
8 changes: 3 additions & 5 deletions txs/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,10 +334,10 @@ func TestCache_Account_HappyFlow(t *testing.T) {
checkProjection(t, tc.Cache, ta.principal, newNextNonce, newBalance+income)
// mempool is unchanged
checkMempool(t, tc.Cache, expectedMempool)

// pruning has removed old and ineffective txs
for _, mtx := range append(oldNonces, sameNonces...) {
got, err := transactions.Get(tc.db, mtx.ID)
require.NoError(t, err)
require.Equal(t, types.MEMPOOL, got.State)
checkTXNotInDB(t, tc.db, mtx.ID)
}

// revert to one layer before lid
Expand All @@ -357,8 +357,6 @@ func TestCache_Account_HappyFlow(t *testing.T) {
}
checkProjection(t, tc.Cache, ta.principal, newNextNonce, newBalance)
checkTXStateFromDB(t, tc.db, mtxs, types.MEMPOOL)
checkTXStateFromDB(t, tc.db, oldNonces, types.MEMPOOL)
checkTXStateFromDB(t, tc.db, sameNonces, types.MEMPOOL)
}

func TestCache_Account_TXInMultipleLayers(t *testing.T) {
Expand Down
4 changes: 4 additions & 0 deletions txs/conservative_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,3 +283,7 @@ func ShuffleWithNonceOrder(
})))
return result
}

func (cs *ConservativeState) HasEvicted(tid types.TransactionID) (bool, error) {
return transactions.HasEvicted(cs.db, tid)
}

0 comments on commit d40da73

Please sign in to comment.