Skip to content

Commit

Permalink
[BLOCK-2245] prevent duplicated transaction (#47)
Browse files Browse the repository at this point in the history
  • Loading branch information
Duncan-Ultra authored May 29, 2024
1 parent 48dd273 commit 4da9cc9
Show file tree
Hide file tree
Showing 3 changed files with 152 additions and 6 deletions.
21 changes: 19 additions & 2 deletions dgraphql/resolvers/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ type matchOrError struct {
err error
}

func processMatchOrError(ctx context.Context, m *matchOrError, rows [][]*pbcodec.TransactionEvent, rowMap map[string]int, abiCodecClient pbabicodec.DecoderClient) (*SearchTransactionForwardResponse, error) {
func processMatchOrError(ctx context.Context, m *matchOrError, rows [][]*pbcodec.TransactionEvent, rowMap map[string]int, abiCodecClient pbabicodec.DecoderClient, processedTrxCache *TrxCache) (*SearchTransactionForwardResponse, error) {
zl := logging.Logger(ctx, zlog)
if m.err != nil {
return &SearchTransactionForwardResponse{
Expand Down Expand Up @@ -385,6 +385,20 @@ func processMatchOrError(ctx context.Context, m *matchOrError, rows [][]*pbcodec
out.blockID = eosMatch.Block.BlockID
out.blockHeader = eosMatch.Block.BlockHeader
out.trxTrace = eosMatch.Block.Trace

//ultra-duncan --- BLOCK-2245 prevent duplication when query
if out.trxTrace != nil {
//Skip if transaction ID if already been streammed
if processedTrxCache.Exists(out.trxTrace.GetId()) {
zl.Error("skipping duplicated transaction", zap.String("trx_trace_id", out.trxTrace.GetId()))
return &SearchTransactionForwardResponse{
err: dgraphql.Errorf(ctx, "Duplication Transaction error"),
}, nil
}
//Saved proccessed transaction
processedTrxCache.Put(out.trxTrace.GetId())
}

return out, nil
}

Expand Down Expand Up @@ -469,6 +483,9 @@ func (r *Root) streamSearchTracesBoth(forward bool, ctx context.Context, args St
return nil, dgraphql.Errorf(ctx, "internal server error: connection to live search failed")
}

//ultra-duncan --- BLOCK-2245 prevent duplication when query --- Initialize how many cache should be keep track
processedTrxCache := NewTrxCache(100000)

//////////////////////////////////////////////////////////////////////
// Billable event on GraphQL Subscriptions
// WARNING : Here we only track inbound subscription init
Expand Down Expand Up @@ -535,7 +552,7 @@ func (r *Root) streamSearchTracesBoth(forward bool, ctx context.Context, args St
var out []interface{}
for _, v := range batch {
m := v.(*matchOrError)
resp, err := processMatchOrError(ctx, m, rows, rowToIndex, r.abiCodecClient)
resp, err := processMatchOrError(ctx, m, rows, rowToIndex, r.abiCodecClient, processedTrxCache)
if err != nil {
return out, err
}
Expand Down
85 changes: 81 additions & 4 deletions dgraphql/resolvers/resolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,10 @@ func newSearchMatchArchive(trxID string) *pbsearch.SearchMatch {
func newSearchMatchLive(trxID string, idx int) *pbsearch.SearchMatch {
cs, err := ptypes.MarshalAny(&pbsearcheos.Match{
Block: &pbsearcheos.BlockTrxPayload{
Trace: &pbcodec.TransactionTrace{Index: uint64(idx)},
Trace: &pbcodec.TransactionTrace{
Id: trxID,
Index: uint64(idx),
},
},
})
if err != nil {
Expand All @@ -82,6 +85,19 @@ func newDgraphqlResponse(trxID string, idx int) *SearchTransactionForwardRespons
},
}
}

func newLiveDgraphqlResponse(trxID string, idx int) *SearchTransactionForwardResponse {
return &SearchTransactionForwardResponse{
SearchTransactionBackwardResponse: SearchTransactionBackwardResponse{
trxIDPrefix: trxID,
trxTrace: &pbcodec.TransactionTrace{
Index: uint64(idx),
Id: trxID,
},
},
}
}

func TestSubscriptionSearchForward(t *testing.T) {
ctx := dtracing.NewFixedTraceIDInContext(context.Background(), "00000000000000000000000000000000")

Expand Down Expand Up @@ -142,9 +158,9 @@ func TestSubscriptionSearchForward(t *testing.T) {
newDgraphqlResponse("trx001", 6),
newDgraphqlResponse("trx002", 7),
newDgraphqlResponse("trx022", 11),
newDgraphqlResponse("trx003", 8),
newDgraphqlResponse("trx004", 9),
newDgraphqlResponse("trx005", 10),
newLiveDgraphqlResponse("trx003", 8),
newLiveDgraphqlResponse("trx004", 9),
newLiveDgraphqlResponse("trx005", 10),
},

expectError: nil,
Expand Down Expand Up @@ -174,3 +190,64 @@ func TestSubscriptionSearchForward(t *testing.T) {
})
}
}

func TestSubscriptionSearchForwardDuplication(t *testing.T) {
ctx := dtracing.NewFixedTraceIDInContext(context.Background(), "00000000000000000000000000000000")

tests := []struct {
name string
fromRouter []interface{}
fromDB map[string][]*pbcodec.TransactionEvent
expect []*SearchTransactionForwardResponse
expectError error
}{
{
name: "duplication",
fromRouter: []interface{}{
newSearchMatchLive("trx003", 8),
newSearchMatchLive("trx004", 9),
newSearchMatchLive("trx005", 10),
newSearchMatchLive("trx004", 9),
newSearchMatchLive("trx005", 10),
},
fromDB: nil,
expect: []*SearchTransactionForwardResponse{
newLiveDgraphqlResponse("trx003", 8),
newLiveDgraphqlResponse("trx004", 9),
newLiveDgraphqlResponse("trx005", 10),
{
err: dgraphql.Errorf(ctx, "Duplication Transaction error"),
},
{
err: dgraphql.Errorf(ctx, "Duplication Transaction error"),
},
},

expectError: nil,
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {

root := &Root{
searchClient: pbsearch.NewTestRouterClient(test.fromRouter),
}

res, err := root.streamSearchTracesBoth(true, ctx, StreamSearchArgs{})

if test.expectError != nil {
t.Log(err)
require.Error(t, err)
} else {
require.NoError(t, err)
var expect []*SearchTransactionForwardResponse
for el := range res {
expect = append(expect, el)
}

assert.Equal(t, test.expect, expect)
}
})
}
}
52 changes: 52 additions & 0 deletions dgraphql/resolvers/trxcache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
//Implement Least Recently Used cache to check for duplication transaction ID

package resolvers

import (
"container/list"
)

type TrxCache struct {
capacity int
data map[string]*list.Element
order list.List
}

func NewTrxCache(capacity int) *TrxCache {
return &TrxCache{
capacity: capacity,
data: make(map[string]*list.Element),
order: *list.New(),
}
}

func (c *TrxCache) Put(key string) {
if ele, exists := c.data[key]; exists {
// If key already exists, move it to the front
c.order.MoveToFront(ele)
return
}

// If at capacity, remove the oldest entry
if c.order.Len() == c.capacity {
c.evictOldest()
}

// Add new entry to the front of the list and to the map
ele := c.order.PushFront(key)
c.data[key] = ele
}

func (c *TrxCache) Exists(key string) bool {
_, exists := c.data[key]
return exists
}

func (c *TrxCache) evictOldest() {
// Remove the oldest entry (tail of the list)
oldest := c.order.Back()
if oldest != nil {
c.order.Remove(oldest)
delete(c.data, oldest.Value.(string))
}
}

0 comments on commit 4da9cc9

Please sign in to comment.