Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BLOCK-2245] prevent duplicated transaction #47

Merged
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 19 additions & 2 deletions dgraphql/resolvers/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ type matchOrError struct {
err error
}

func processMatchOrError(ctx context.Context, m *matchOrError, rows [][]*pbcodec.TransactionEvent, rowMap map[string]int, abiCodecClient pbabicodec.DecoderClient) (*SearchTransactionForwardResponse, error) {
func processMatchOrError(ctx context.Context, m *matchOrError, rows [][]*pbcodec.TransactionEvent, rowMap map[string]int, abiCodecClient pbabicodec.DecoderClient, processedTrxCache *TrxCache) (*SearchTransactionForwardResponse, error) {
zl := logging.Logger(ctx, zlog)
if m.err != nil {
return &SearchTransactionForwardResponse{
Expand Down Expand Up @@ -385,6 +385,20 @@ func processMatchOrError(ctx context.Context, m *matchOrError, rows [][]*pbcodec
out.blockID = eosMatch.Block.BlockID
out.blockHeader = eosMatch.Block.BlockHeader
out.trxTrace = eosMatch.Block.Trace

//ultra-duncan --- BLOCK-2245 prevent duplication when query
if out.trxTrace != nil {
//Skip if transaction ID if already been streammed
if processedTrxCache.Exists(out.trxTrace.GetId()) {
zl.Error("skipping duplicated transaction", zap.String("trx_trace_id", out.trxTrace.GetId()))
return &SearchTransactionForwardResponse{
err: dgraphql.Errorf(ctx, "Duplication Transaction error"),
}, nil
}
//Saved proccessed transaction
processedTrxCache.Put(out.trxTrace.GetId())
}

return out, nil
}

Expand Down Expand Up @@ -469,6 +483,9 @@ func (r *Root) streamSearchTracesBoth(forward bool, ctx context.Context, args St
return nil, dgraphql.Errorf(ctx, "internal server error: connection to live search failed")
}

//ultra-duncan --- BLOCK-2245 prevent duplication when query --- Initialize how many cache should be keep track
processedTrxCache := NewTrxCache(100000)

//////////////////////////////////////////////////////////////////////
// Billable event on GraphQL Subscriptions
// WARNING : Here we only track inbound subscription init
Expand Down Expand Up @@ -535,7 +552,7 @@ func (r *Root) streamSearchTracesBoth(forward bool, ctx context.Context, args St
var out []interface{}
for _, v := range batch {
m := v.(*matchOrError)
resp, err := processMatchOrError(ctx, m, rows, rowToIndex, r.abiCodecClient)
resp, err := processMatchOrError(ctx, m, rows, rowToIndex, r.abiCodecClient, processedTrxCache)
if err != nil {
return out, err
}
Expand Down
85 changes: 81 additions & 4 deletions dgraphql/resolvers/resolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,10 @@ func newSearchMatchArchive(trxID string) *pbsearch.SearchMatch {
func newSearchMatchLive(trxID string, idx int) *pbsearch.SearchMatch {
cs, err := ptypes.MarshalAny(&pbsearcheos.Match{
Block: &pbsearcheos.BlockTrxPayload{
Trace: &pbcodec.TransactionTrace{Index: uint64(idx)},
Trace: &pbcodec.TransactionTrace{
Id: trxID,
Index: uint64(idx),
},
},
})
if err != nil {
Expand All @@ -82,6 +85,19 @@ func newDgraphqlResponse(trxID string, idx int) *SearchTransactionForwardRespons
},
}
}

func newLiveDgraphqlResponse(trxID string, idx int) *SearchTransactionForwardResponse {
return &SearchTransactionForwardResponse{
SearchTransactionBackwardResponse: SearchTransactionBackwardResponse{
trxIDPrefix: trxID,
trxTrace: &pbcodec.TransactionTrace{
Index: uint64(idx),
Id: trxID,
},
},
}
}

func TestSubscriptionSearchForward(t *testing.T) {
ctx := dtracing.NewFixedTraceIDInContext(context.Background(), "00000000000000000000000000000000")

Expand Down Expand Up @@ -142,9 +158,9 @@ func TestSubscriptionSearchForward(t *testing.T) {
newDgraphqlResponse("trx001", 6),
newDgraphqlResponse("trx002", 7),
newDgraphqlResponse("trx022", 11),
newDgraphqlResponse("trx003", 8),
newDgraphqlResponse("trx004", 9),
newDgraphqlResponse("trx005", 10),
newLiveDgraphqlResponse("trx003", 8),
newLiveDgraphqlResponse("trx004", 9),
newLiveDgraphqlResponse("trx005", 10),
},

expectError: nil,
Expand Down Expand Up @@ -174,3 +190,64 @@ func TestSubscriptionSearchForward(t *testing.T) {
})
}
}

func TestSubscriptionSearchForwardDuplication(t *testing.T) {
ctx := dtracing.NewFixedTraceIDInContext(context.Background(), "00000000000000000000000000000000")

tests := []struct {
name string
fromRouter []interface{}
fromDB map[string][]*pbcodec.TransactionEvent
expect []*SearchTransactionForwardResponse
expectError error
}{
{
name: "duplication",
fromRouter: []interface{}{
newSearchMatchLive("trx003", 8),
newSearchMatchLive("trx004", 9),
newSearchMatchLive("trx005", 10),
newSearchMatchLive("trx004", 9),
newSearchMatchLive("trx005", 10),
},
fromDB: nil,
expect: []*SearchTransactionForwardResponse{
newLiveDgraphqlResponse("trx003", 8),
newLiveDgraphqlResponse("trx004", 9),
newLiveDgraphqlResponse("trx005", 10),
{
err: dgraphql.Errorf(ctx, "Duplication Transaction error"),
},
{
err: dgraphql.Errorf(ctx, "Duplication Transaction error"),
},
},

expectError: nil,
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {

root := &Root{
searchClient: pbsearch.NewTestRouterClient(test.fromRouter),
}

res, err := root.streamSearchTracesBoth(true, ctx, StreamSearchArgs{})

if test.expectError != nil {
t.Log(err)
require.Error(t, err)
} else {
require.NoError(t, err)
var expect []*SearchTransactionForwardResponse
for el := range res {
expect = append(expect, el)
}

assert.Equal(t, test.expect, expect)
}
})
}
}
72 changes: 72 additions & 0 deletions dgraphql/resolvers/trxcache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
//Implement Least Recently Used cache to check for duplication transaction ID

package resolvers

import (
"container/list"
)

type TrxCache struct {
capacity int
data map[string]interface{}
Duncan-Ultra marked this conversation as resolved.
Show resolved Hide resolved
order *list.List
Duncan-Ultra marked this conversation as resolved.
Show resolved Hide resolved
}

func NewTrxCache(capacity int) *TrxCache {
return &TrxCache{
capacity: capacity,
data: make(map[string]interface{}),
order: list.New(),
}
}

func (c *TrxCache) Put(key string) {
if _, exists := c.data[key]; exists {
// If key already exists, update value and move it to the front
c.data[key] = true
c.moveToFront(key)
return
}

// If at capacity, remove the oldest entry
if c.order.Len() == c.capacity {
c.evictOldest()
}

// Add new entry to the map and the front of the list
c.data[key] = true
c.order.PushFront(key)
}

func (c *TrxCache) Get(key string) (interface{}, bool) {
if value, exists := c.data[key]; exists {
// Move accessed item to the front
c.moveToFront(key)
return value, true
}
return nil, false
}

func (c *TrxCache) Exists(key string) bool {
_, exists := c.data[key]
return exists
}

func (c *TrxCache) evictOldest() {
// Remove the oldest entry (tail of the list)
oldest := c.order.Back()
if oldest != nil {
c.order.Remove(oldest)
delete(c.data, oldest.Value.(string))
}
}

func (c *TrxCache) moveToFront(key string) {
// Move accessed or updated key to the front of the list
for e := c.order.Front(); e != nil; e = e.Next() {
if e.Value.(string) == key {
c.order.MoveToFront(e)
break
}
}
}
Loading