Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BLOCK-2245] prevent duplicated transaction #47

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 19 additions & 2 deletions dgraphql/resolvers/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ type matchOrError struct {
err error
}

func processMatchOrError(ctx context.Context, m *matchOrError, rows [][]*pbcodec.TransactionEvent, rowMap map[string]int, abiCodecClient pbabicodec.DecoderClient) (*SearchTransactionForwardResponse, error) {
func processMatchOrError(ctx context.Context, m *matchOrError, rows [][]*pbcodec.TransactionEvent, rowMap map[string]int, abiCodecClient pbabicodec.DecoderClient, processedTrxCache *TrxCache) (*SearchTransactionForwardResponse, error) {
zl := logging.Logger(ctx, zlog)
if m.err != nil {
return &SearchTransactionForwardResponse{
Expand Down Expand Up @@ -385,6 +385,20 @@ func processMatchOrError(ctx context.Context, m *matchOrError, rows [][]*pbcodec
out.blockID = eosMatch.Block.BlockID
out.blockHeader = eosMatch.Block.BlockHeader
out.trxTrace = eosMatch.Block.Trace

//ultra-duncan --- BLOCK-2245 prevent duplication when query
if out.trxTrace != nil {
//Skip if transaction ID if already been streammed
if processedTrxCache.Exists(out.trxTrace.GetId()) {
zl.Error("skipping duplicated transaction", zap.String("trx_trace_id", out.trxTrace.GetId()))
return &SearchTransactionForwardResponse{
err: dgraphql.Errorf(ctx, "Duplication Transaction error"),
}, nil
}
//Saved proccessed transaction
processedTrxCache.Put(out.trxTrace.GetId())
}

return out, nil
}

Expand Down Expand Up @@ -469,6 +483,9 @@ func (r *Root) streamSearchTracesBoth(forward bool, ctx context.Context, args St
return nil, dgraphql.Errorf(ctx, "internal server error: connection to live search failed")
}

//ultra-duncan --- BLOCK-2245 prevent duplication when query --- Initialize how many cache should be keep track
processedTrxCache := NewTrxCache(100000)

//////////////////////////////////////////////////////////////////////
// Billable event on GraphQL Subscriptions
// WARNING : Here we only track inbound subscription init
Expand Down Expand Up @@ -535,7 +552,7 @@ func (r *Root) streamSearchTracesBoth(forward bool, ctx context.Context, args St
var out []interface{}
for _, v := range batch {
m := v.(*matchOrError)
resp, err := processMatchOrError(ctx, m, rows, rowToIndex, r.abiCodecClient)
resp, err := processMatchOrError(ctx, m, rows, rowToIndex, r.abiCodecClient, processedTrxCache)
if err != nil {
return out, err
}
Expand Down
85 changes: 81 additions & 4 deletions dgraphql/resolvers/resolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,10 @@ func newSearchMatchArchive(trxID string) *pbsearch.SearchMatch {
func newSearchMatchLive(trxID string, idx int) *pbsearch.SearchMatch {
cs, err := ptypes.MarshalAny(&pbsearcheos.Match{
Block: &pbsearcheos.BlockTrxPayload{
Trace: &pbcodec.TransactionTrace{Index: uint64(idx)},
Trace: &pbcodec.TransactionTrace{
Id: trxID,
Index: uint64(idx),
},
},
})
if err != nil {
Expand All @@ -82,6 +85,19 @@ func newDgraphqlResponse(trxID string, idx int) *SearchTransactionForwardRespons
},
}
}

func newLiveDgraphqlResponse(trxID string, idx int) *SearchTransactionForwardResponse {
return &SearchTransactionForwardResponse{
SearchTransactionBackwardResponse: SearchTransactionBackwardResponse{
trxIDPrefix: trxID,
trxTrace: &pbcodec.TransactionTrace{
Index: uint64(idx),
Id: trxID,
},
},
}
}

func TestSubscriptionSearchForward(t *testing.T) {
ctx := dtracing.NewFixedTraceIDInContext(context.Background(), "00000000000000000000000000000000")

Expand Down Expand Up @@ -142,9 +158,9 @@ func TestSubscriptionSearchForward(t *testing.T) {
newDgraphqlResponse("trx001", 6),
newDgraphqlResponse("trx002", 7),
newDgraphqlResponse("trx022", 11),
newDgraphqlResponse("trx003", 8),
newDgraphqlResponse("trx004", 9),
newDgraphqlResponse("trx005", 10),
newLiveDgraphqlResponse("trx003", 8),
newLiveDgraphqlResponse("trx004", 9),
newLiveDgraphqlResponse("trx005", 10),
},

expectError: nil,
Expand Down Expand Up @@ -174,3 +190,64 @@ func TestSubscriptionSearchForward(t *testing.T) {
})
}
}

func TestSubscriptionSearchForwardDuplication(t *testing.T) {
ctx := dtracing.NewFixedTraceIDInContext(context.Background(), "00000000000000000000000000000000")

tests := []struct {
name string
fromRouter []interface{}
fromDB map[string][]*pbcodec.TransactionEvent
expect []*SearchTransactionForwardResponse
expectError error
}{
{
name: "duplication",
fromRouter: []interface{}{
newSearchMatchLive("trx003", 8),
newSearchMatchLive("trx004", 9),
newSearchMatchLive("trx005", 10),
newSearchMatchLive("trx004", 9),
newSearchMatchLive("trx005", 10),
},
fromDB: nil,
expect: []*SearchTransactionForwardResponse{
newLiveDgraphqlResponse("trx003", 8),
newLiveDgraphqlResponse("trx004", 9),
newLiveDgraphqlResponse("trx005", 10),
{
err: dgraphql.Errorf(ctx, "Duplication Transaction error"),
},
{
err: dgraphql.Errorf(ctx, "Duplication Transaction error"),
},
},

expectError: nil,
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {

root := &Root{
searchClient: pbsearch.NewTestRouterClient(test.fromRouter),
}

res, err := root.streamSearchTracesBoth(true, ctx, StreamSearchArgs{})

if test.expectError != nil {
t.Log(err)
require.Error(t, err)
} else {
require.NoError(t, err)
var expect []*SearchTransactionForwardResponse
for el := range res {
expect = append(expect, el)
}

assert.Equal(t, test.expect, expect)
}
})
}
}
52 changes: 52 additions & 0 deletions dgraphql/resolvers/trxcache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
//Implement Least Recently Used cache to check for duplication transaction ID

package resolvers

import (
"container/list"
)

type TrxCache struct {
capacity int
data map[string]*list.Element
order list.List
}

func NewTrxCache(capacity int) *TrxCache {
return &TrxCache{
capacity: capacity,
data: make(map[string]*list.Element),
order: *list.New(),
}
}

func (c *TrxCache) Put(key string) {
if ele, exists := c.data[key]; exists {
// If key already exists, move it to the front
c.order.MoveToFront(ele)
return
}

// If at capacity, remove the oldest entry
if c.order.Len() == c.capacity {
c.evictOldest()
}

// Add new entry to the front of the list and to the map
ele := c.order.PushFront(key)
c.data[key] = ele
}

func (c *TrxCache) Exists(key string) bool {
_, exists := c.data[key]
return exists
}

func (c *TrxCache) evictOldest() {
// Remove the oldest entry (tail of the list)
oldest := c.order.Back()
if oldest != nil {
c.order.Remove(oldest)
delete(c.data, oldest.Value.(string))
}
}
Loading