diff --git a/dgraphql/resolvers/resolver.go b/dgraphql/resolvers/resolver.go index 1ab82047..7457d521 100644 --- a/dgraphql/resolvers/resolver.go +++ b/dgraphql/resolvers/resolver.go @@ -355,7 +355,7 @@ type matchOrError struct { err error } -func processMatchOrError(ctx context.Context, m *matchOrError, rows [][]*pbcodec.TransactionEvent, rowMap map[string]int, abiCodecClient pbabicodec.DecoderClient) (*SearchTransactionForwardResponse, error) { +func processMatchOrError(ctx context.Context, m *matchOrError, rows [][]*pbcodec.TransactionEvent, rowMap map[string]int, abiCodecClient pbabicodec.DecoderClient, processedTrxCache *TrxCache) (*SearchTransactionForwardResponse, error) { zl := logging.Logger(ctx, zlog) if m.err != nil { return &SearchTransactionForwardResponse{ @@ -385,6 +385,20 @@ func processMatchOrError(ctx context.Context, m *matchOrError, rows [][]*pbcodec out.blockID = eosMatch.Block.BlockID out.blockHeader = eosMatch.Block.BlockHeader out.trxTrace = eosMatch.Block.Trace + + //ultra-duncan --- BLOCK-2245 prevent duplication when query + if out.trxTrace != nil { + //Skip if transaction ID if already been streammed + if processedTrxCache.Exists(out.trxTrace.GetId()) { + zl.Error("skipping duplicated transaction", zap.String("trx_trace_id", out.trxTrace.GetId())) + return &SearchTransactionForwardResponse{ + err: dgraphql.Errorf(ctx, "Duplication Transaction error"), + }, nil + } + //Saved proccessed transaction + processedTrxCache.Put(out.trxTrace.GetId()) + } + return out, nil } @@ -469,6 +483,9 @@ func (r *Root) streamSearchTracesBoth(forward bool, ctx context.Context, args St return nil, dgraphql.Errorf(ctx, "internal server error: connection to live search failed") } + //ultra-duncan --- BLOCK-2245 prevent duplication when query --- Initialize how many cache should be keep track + processedTrxCache := NewTrxCache(100000) + ////////////////////////////////////////////////////////////////////// // Billable event on GraphQL Subscriptions // WARNING : Here we only track inbound subscription init @@ -535,7 +552,7 @@ func (r *Root) streamSearchTracesBoth(forward bool, ctx context.Context, args St var out []interface{} for _, v := range batch { m := v.(*matchOrError) - resp, err := processMatchOrError(ctx, m, rows, rowToIndex, r.abiCodecClient) + resp, err := processMatchOrError(ctx, m, rows, rowToIndex, r.abiCodecClient, processedTrxCache) if err != nil { return out, err } diff --git a/dgraphql/resolvers/resolver_test.go b/dgraphql/resolvers/resolver_test.go index 6be76674..f1321645 100644 --- a/dgraphql/resolvers/resolver_test.go +++ b/dgraphql/resolvers/resolver_test.go @@ -59,7 +59,10 @@ func newSearchMatchArchive(trxID string) *pbsearch.SearchMatch { func newSearchMatchLive(trxID string, idx int) *pbsearch.SearchMatch { cs, err := ptypes.MarshalAny(&pbsearcheos.Match{ Block: &pbsearcheos.BlockTrxPayload{ - Trace: &pbcodec.TransactionTrace{Index: uint64(idx)}, + Trace: &pbcodec.TransactionTrace{ + Id: trxID, + Index: uint64(idx), + }, }, }) if err != nil { @@ -82,6 +85,19 @@ func newDgraphqlResponse(trxID string, idx int) *SearchTransactionForwardRespons }, } } + +func newLiveDgraphqlResponse(trxID string, idx int) *SearchTransactionForwardResponse { + return &SearchTransactionForwardResponse{ + SearchTransactionBackwardResponse: SearchTransactionBackwardResponse{ + trxIDPrefix: trxID, + trxTrace: &pbcodec.TransactionTrace{ + Index: uint64(idx), + Id: trxID, + }, + }, + } +} + func TestSubscriptionSearchForward(t *testing.T) { ctx := dtracing.NewFixedTraceIDInContext(context.Background(), "00000000000000000000000000000000") @@ -142,9 +158,9 @@ func TestSubscriptionSearchForward(t *testing.T) { newDgraphqlResponse("trx001", 6), newDgraphqlResponse("trx002", 7), newDgraphqlResponse("trx022", 11), - newDgraphqlResponse("trx003", 8), - newDgraphqlResponse("trx004", 9), - newDgraphqlResponse("trx005", 10), + newLiveDgraphqlResponse("trx003", 8), + newLiveDgraphqlResponse("trx004", 9), + newLiveDgraphqlResponse("trx005", 10), }, expectError: nil, @@ -174,3 +190,64 @@ func TestSubscriptionSearchForward(t *testing.T) { }) } } + +func TestSubscriptionSearchForwardDuplication(t *testing.T) { + ctx := dtracing.NewFixedTraceIDInContext(context.Background(), "00000000000000000000000000000000") + + tests := []struct { + name string + fromRouter []interface{} + fromDB map[string][]*pbcodec.TransactionEvent + expect []*SearchTransactionForwardResponse + expectError error + }{ + { + name: "duplication", + fromRouter: []interface{}{ + newSearchMatchLive("trx003", 8), + newSearchMatchLive("trx004", 9), + newSearchMatchLive("trx005", 10), + newSearchMatchLive("trx004", 9), + newSearchMatchLive("trx005", 10), + }, + fromDB: nil, + expect: []*SearchTransactionForwardResponse{ + newLiveDgraphqlResponse("trx003", 8), + newLiveDgraphqlResponse("trx004", 9), + newLiveDgraphqlResponse("trx005", 10), + { + err: dgraphql.Errorf(ctx, "Duplication Transaction error"), + }, + { + err: dgraphql.Errorf(ctx, "Duplication Transaction error"), + }, + }, + + expectError: nil, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + + root := &Root{ + searchClient: pbsearch.NewTestRouterClient(test.fromRouter), + } + + res, err := root.streamSearchTracesBoth(true, ctx, StreamSearchArgs{}) + + if test.expectError != nil { + t.Log(err) + require.Error(t, err) + } else { + require.NoError(t, err) + var expect []*SearchTransactionForwardResponse + for el := range res { + expect = append(expect, el) + } + + assert.Equal(t, test.expect, expect) + } + }) + } +} diff --git a/dgraphql/resolvers/trxcache.go b/dgraphql/resolvers/trxcache.go new file mode 100644 index 00000000..94a06a31 --- /dev/null +++ b/dgraphql/resolvers/trxcache.go @@ -0,0 +1,52 @@ +//Implement Least Recently Used cache to check for duplication transaction ID + +package resolvers + +import ( + "container/list" +) + +type TrxCache struct { + capacity int + data map[string]*list.Element + order list.List +} + +func NewTrxCache(capacity int) *TrxCache { + return &TrxCache{ + capacity: capacity, + data: make(map[string]*list.Element), + order: *list.New(), + } +} + +func (c *TrxCache) Put(key string) { + if ele, exists := c.data[key]; exists { + // If key already exists, move it to the front + c.order.MoveToFront(ele) + return + } + + // If at capacity, remove the oldest entry + if c.order.Len() == c.capacity { + c.evictOldest() + } + + // Add new entry to the front of the list and to the map + ele := c.order.PushFront(key) + c.data[key] = ele +} + +func (c *TrxCache) Exists(key string) bool { + _, exists := c.data[key] + return exists +} + +func (c *TrxCache) evictOldest() { + // Remove the oldest entry (tail of the list) + oldest := c.order.Back() + if oldest != nil { + c.order.Remove(oldest) + delete(c.data, oldest.Value.(string)) + } +}