Skip to content

Commit

Permalink
restruct parsing flow (#60)
Browse files Browse the repository at this point in the history
* restruct parsing flow
  • Loading branch information
ngocthanh1389 authored Aug 7, 2024
1 parent 94158b2 commit 509dd73
Show file tree
Hide file tree
Showing 18 changed files with 154 additions and 88 deletions.
20 changes: 7 additions & 13 deletions cmd/tradelogs/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,14 @@ import (
"github.com/KyberNetwork/tradelogs/pkg/dune"
"github.com/KyberNetwork/tradelogs/pkg/parser"
"github.com/KyberNetwork/tradelogs/pkg/parser/bebop"
"github.com/KyberNetwork/tradelogs/pkg/parser/oneinch"
"github.com/KyberNetwork/tradelogs/pkg/parser/oneinchv6"
"github.com/KyberNetwork/tradelogs/pkg/parser/uniswapx"
"github.com/KyberNetwork/tradelogs/pkg/parser/zxrfqv3"
"github.com/KyberNetwork/tradelogs/pkg/pricefiller"
"github.com/KyberNetwork/tradelogs/pkg/rpcnode"
"github.com/KyberNetwork/tradelogs/pkg/tracecall"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/ethclient"

libapp "github.com/KyberNetwork/tradelogs/internal/app"
"github.com/KyberNetwork/tradelogs/internal/bigquery"
Expand All @@ -26,15 +28,11 @@ import (
tradelogs "github.com/KyberNetwork/tradelogs/internal/server/tradelogs"
"github.com/KyberNetwork/tradelogs/internal/worker"
"github.com/KyberNetwork/tradelogs/pkg/evmlistenerclient"
"github.com/KyberNetwork/tradelogs/pkg/parser/hashflow"
hashflowv3 "github.com/KyberNetwork/tradelogs/pkg/parser/hashflow_v3"
"github.com/KyberNetwork/tradelogs/pkg/parser/kyberswap"
kyberswaprfq "github.com/KyberNetwork/tradelogs/pkg/parser/kyberswap_rfq"
"github.com/KyberNetwork/tradelogs/pkg/parser/native"
"github.com/KyberNetwork/tradelogs/pkg/parser/paraswap"
"github.com/KyberNetwork/tradelogs/pkg/parser/tokenlon"
"github.com/KyberNetwork/tradelogs/pkg/parser/zxotc"
"github.com/KyberNetwork/tradelogs/pkg/parser/zxrfq"
"github.com/KyberNetwork/tradelogs/pkg/storage"
"github.com/go-redis/redis/v8"
"github.com/jmoiron/sqlx"
Expand Down Expand Up @@ -93,25 +91,21 @@ func run(c *cli.Context) error {
ResponseHeaderTimeout: time.Second * 30,
},
}
rpcClient, err := rpcnode.NewClient(httpClient, c.String(libapp.RPCUrlFlagName))
ethClient, err := ethclient.Dial(c.String(libapp.RPCUrlFlagName))
if err != nil {
panic(err)
}
traceCalls := tracecall.NewCache(rpcClient)
traceCalls := tracecall.NewCache(rpcnode.NewClient(ethClient))

parsers := []parser.Parser{kyberswap.MustNewParser(),
zxotc.MustNewParser(),
zxrfq.MustNewParser(),
tokenlon.MustNewParser(),
paraswap.MustNewParser(),
hashflow.MustNewParser(),
native.MustNewParser(),
kyberswaprfq.MustNewParser(),
hashflowv3.MustNewParser(),
oneinch.MustNewParser(traceCalls),
oneinchv6.MustNewParser(traceCalls),
uniswapx.MustNewParser(traceCalls),
bebop.MustNewParser(traceCalls),
zxrfqv3.MustNewParserWithDeployer(traceCalls, ethClient, common.HexToAddress(parser.Deployer0xV3)),
}

binanceClient := binance.NewClient(c.String(pricefiller.BinanceAPIKeyFlag.Name), c.String(pricefiller.BinanceSecretKeyFlag.Name))
Expand All @@ -122,7 +116,7 @@ func run(c *cli.Context) error {
}

tradeLogChan := make(chan storage.TradeLog, 1000)
w, err := worker.New(l, s, listener, priceFiller, tradeLogChan, parsers...)
w, err := worker.New(l, s, listener, priceFiller, tradeLogChan, parsers)
if err != nil {
l.Errorw("Error while init worker")
return err
Expand Down
46 changes: 26 additions & 20 deletions internal/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,32 +11,27 @@ import (
"github.com/KyberNetwork/tradelogs/pkg/parser"
"github.com/KyberNetwork/tradelogs/pkg/pricefiller"
"github.com/KyberNetwork/tradelogs/pkg/storage"
ethTypes "github.com/ethereum/go-ethereum/core/types"
"go.uber.org/zap"
)

type Worker struct {
listener *evmlistenerclient.Client
l *zap.SugaredLogger
s *storage.Storage
p map[string]parser.Parser
p []parser.Parser
priceFiller *pricefiller.PriceFiller
tradeLogChan chan storage.TradeLog
}

func New(l *zap.SugaredLogger, s *storage.Storage, listener *evmlistenerclient.Client,
priceFiller *pricefiller.PriceFiller, tradeLogChan chan storage.TradeLog,
parsers ...parser.Parser) (*Worker, error) {
p := make(map[string]parser.Parser)
for _, ps := range parsers {
for _, topic := range ps.Topics() {
p[topic] = ps
}
}
parsers []parser.Parser) (*Worker, error) {
return &Worker{
listener: listener,
l: l,
s: s,
p: p,
p: parsers,
priceFiller: priceFiller,
tradeLogChan: tradeLogChan,
}, nil
Expand Down Expand Up @@ -87,14 +82,12 @@ func (w *Worker) processMessages(m []evmlistenerclient.Message) error {
}
}
for _, log := range block.Logs {
if len(log.Topics) == 0 {
continue
}
ps := w.p[log.Topics[0]]
ethLog := convert.ToETHLog(log)
ps := w.findMatchingParser(ethLog)
if ps == nil {
continue
}
order, err := ps.Parse(convert.ToETHLog(log), block.Timestamp)
order, err := ps.Parse(ethLog, block.Timestamp)
if err != nil {
w.l.Errorw("error when parse log", "log", log, "order", order, "err", err)
if err := w.s.InsertErrorLog(storage.EVMLog{
Expand Down Expand Up @@ -143,11 +136,7 @@ func (w *Worker) retryParseLog() error {
if len(topics) == 0 {
continue
}
ps := w.p[topics[0]]
if ps == nil {
continue
}
order, err := ps.Parse(convert.ToETHLog(types.Log{
ethLog := convert.ToETHLog(types.Log{
Address: l.Address,
Topics: topics,
Data: l.Data,
Expand All @@ -156,7 +145,12 @@ func (w *Worker) retryParseLog() error {
TxIndex: l.TxIndex,
BlockHash: l.BlockHash,
Index: l.Index,
}), l.Time)
})
ps := w.findMatchingParser(ethLog)
if ps == nil {
continue
}
order, err := ps.Parse(ethLog, l.Time)
if err != nil {
w.l.Errorw("error when retry log", "log", l, "err", err)
continue
Expand All @@ -178,3 +172,15 @@ func (w *Worker) retryParseLog() error {
}
return nil
}

func (w *Worker) findMatchingParser(log ethTypes.Log) parser.Parser {
var ps parser.Parser
for _, p := range w.p {
if !p.LogFromExchange(log) {
continue
}
ps = p
break
}
return ps
}
9 changes: 4 additions & 5 deletions pkg/parser/bebop/bebop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"encoding/json"
"math/big"
"net/http"
"testing"
"time"

Expand All @@ -21,11 +20,11 @@ const rpcURL = ""

func TestFetchEvent(t *testing.T) {
t.Skip("Need to add the rpc url that enables the trace call JSON-RPC")
rpcClient, err := rpcnode.NewClient(http.DefaultClient, rpcURL)
ethClient, err := ethclient.Dial(rpcURL)
if err != nil {
panic(err)
}
traceCalls := tracecall.NewCache(rpcClient)
traceCalls := tracecall.NewCache(rpcnode.NewClient(ethClient))
p := MustNewParser(traceCalls)
require.Equal(t, p.abi.Events[TradeEvent].ID, common.HexToHash("0xadd7095becdaa725f0f33243630938c861b0bba83dfd217d4055701aa768ec2e"))
client, err := ethclient.Dial(rpcURL)
Expand Down Expand Up @@ -53,11 +52,11 @@ func TestParseEvent(t *testing.T) {
events := []types.Log{}
err := json.Unmarshal([]byte(eventRaw), &events)
require.NoError(t, err)
rpcClient, err := rpcnode.NewClient(http.DefaultClient, rpcURL)
ethClient, err := ethclient.Dial(rpcURL)
if err != nil {
panic(err)
}
traceCalls := tracecall.NewCache(rpcClient)
traceCalls := tracecall.NewCache(rpcnode.NewClient(ethClient))
p := MustNewParser(traceCalls)
for _, event := range events {
log, err := p.Parse(event, uint64(time.Now().Unix()))
Expand Down
11 changes: 9 additions & 2 deletions pkg/parser/bebop/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@ import (
"encoding/json"
"errors"
"fmt"
"github.com/KyberNetwork/tradelogs/pkg/types"
tradingTypes "github.com/KyberNetwork/tradinglib/pkg/types"
"math/big"
"strings"

"github.com/KyberNetwork/tradelogs/pkg/types"
tradingTypes "github.com/KyberNetwork/tradinglib/pkg/types"

"github.com/KyberNetwork/tradelogs/pkg/decoder"
"github.com/KyberNetwork/tradelogs/pkg/parser"
"github.com/KyberNetwork/tradelogs/pkg/storage"
Expand Down Expand Up @@ -281,3 +282,9 @@ func (p *Parser) ParseWithCallFrame(callFrame *tradingTypes.CallFrame, log ether
}
return p.searchTradeLog(order, types.ConvertCallFrame(callFrame))
}

func (p *Parser) LogFromExchange(log ethereumTypes.Log) bool {
return strings.EqualFold(log.Address.String(), parser.AddrBebop) &&
len(log.Topics) > 0 &&
strings.EqualFold(log.Topics[0].String(), p.eventHash)
}
8 changes: 8 additions & 0 deletions pkg/parser/hashflow_v3/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package hashflowv3
import (
"encoding/json"
"errors"
"strings"

"github.com/KyberNetwork/tradelogs/pkg/decoder"
tradingTypes "github.com/KyberNetwork/tradinglib/pkg/types"
"github.com/ethereum/go-ethereum/common/hexutil"
Expand Down Expand Up @@ -136,3 +138,9 @@ func (p *Parser) getRFQOrderParams(callFrame *tradingTypes.CallFrame) (*OrderRFQ
}
return nil, nil
}

func (p *Parser) LogFromExchange(log ethereumTypes.Log) bool {
return strings.EqualFold(log.Address.String(), parser.AddrHashflowV3) &&
len(log.Topics) > 0 &&
strings.EqualFold(log.Topics[0].String(), p.eventHash)
}
8 changes: 8 additions & 0 deletions pkg/parser/kyberswap/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package kyberswap

import (
"errors"
"strings"

tradingTypes "github.com/KyberNetwork/tradinglib/pkg/types"

ethereumTypes "github.com/ethereum/go-ethereum/core/types"
Expand Down Expand Up @@ -85,3 +87,9 @@ func (p *Parser) UseTraceCall() bool {
func (p *Parser) ParseWithCallFrame(_ *tradingTypes.CallFrame, log ethereumTypes.Log, blockTime uint64) (storage.TradeLog, error) {
return p.Parse(log, blockTime)
}

func (p *Parser) LogFromExchange(log ethereumTypes.Log) bool {
return strings.EqualFold(log.Address.String(), parser.AddrKyberswap) &&
len(log.Topics) > 0 &&
strings.EqualFold(log.Topics[0].String(), p.eventHash)
}
8 changes: 8 additions & 0 deletions pkg/parser/kyberswap_rfq/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"encoding/json"
"errors"
"fmt"
"strings"

"github.com/KyberNetwork/tradelogs/pkg/decoder"
tradingTypes "github.com/KyberNetwork/tradinglib/pkg/types"
"github.com/ethereum/go-ethereum/common/hexutil"
Expand Down Expand Up @@ -138,3 +140,9 @@ func (p *Parser) getRFQOrderParams(callFrame *tradingTypes.CallFrame) (*OrderRFQ
}
return nil, nil
}

func (p *Parser) LogFromExchange(log ethereumTypes.Log) bool {
return strings.EqualFold(log.Address.String(), parser.AddrKyberswapRFQ) &&
len(log.Topics) > 0 &&
strings.EqualFold(log.Topics[0].String(), p.eventHash)
}
22 changes: 11 additions & 11 deletions pkg/parser/oneinch/oneinch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@ package oneinch
import (
"context"
"encoding/json"
"github.com/KyberNetwork/tradelogs/pkg/storage"
tradingTypes "github.com/KyberNetwork/tradinglib/pkg/types"
"math/big"
"net/http"
"strings"
"testing"
"time"

"github.com/KyberNetwork/tradelogs/pkg/storage"
tradingTypes "github.com/KyberNetwork/tradinglib/pkg/types"

"github.com/KyberNetwork/tradelogs/pkg/rpcnode"
"github.com/KyberNetwork/tradelogs/pkg/tracecall"
"github.com/ethereum/go-ethereum"
Expand All @@ -24,11 +24,11 @@ const rpcURL = ""

func TestFetchEvent(t *testing.T) {
t.Skip("Need to add the rpc url that enables the trace call JSON-RPC")
rpcClient, err := rpcnode.NewClient(http.DefaultClient, rpcURL)
ethClient, err := ethclient.Dial(rpcURL)
if err != nil {
panic(err)
}
traceCalls := tracecall.NewCache(rpcClient)
traceCalls := tracecall.NewCache(rpcnode.NewClient(ethClient))
p := MustNewParser(traceCalls)
require.Equal(t, p.abi.Events[FilledEvent].ID, common.HexToHash("0xc3b639f02b125bfa160e50739b8c44eb2d1b6908e2b6d5925c6d770f2ca78127"))
client, err := ethclient.Dial(rpcURL)
Expand Down Expand Up @@ -64,11 +64,11 @@ func TestParseEvent(t *testing.T) {
event := types.Log{}
err := json.Unmarshal([]byte(eventRaw), &event)
require.NoError(t, err)
rpcClient, err := rpcnode.NewClient(http.DefaultClient, rpcURL)
ethClient, err := ethclient.Dial(rpcURL)
if err != nil {
panic(err)
}
traceCalls := tracecall.NewCache(rpcClient)
traceCalls := tracecall.NewCache(rpcnode.NewClient(ethClient))
p := MustNewParser(traceCalls)
log, err := p.Parse(event, uint64(time.Now().Unix()))
require.NoError(t, err)
Expand All @@ -78,11 +78,11 @@ func TestParseEvent(t *testing.T) {

func TestParseOneinchTradeLog(t *testing.T) {
t.Skip("Need to add the rpc url that enables the trace call JSON-RPC")
rpcClient, err := rpcnode.NewClient(http.DefaultClient, rpcURL)
ethClient, err := ethclient.Dial(rpcURL)
if err != nil {
panic(err)
}
traceCalls := tracecall.NewCache(rpcClient)
traceCalls := tracecall.NewCache(rpcnode.NewClient(ethClient))
p := MustNewParser(traceCalls)
client, err := ethclient.Dial(rpcURL)
require.NoError(t, err)
Expand Down Expand Up @@ -129,11 +129,11 @@ func TestParseWithCallFrame(t *testing.T) {
MakerTraits: "",
Expiry: 1719508091,
}
rpcClient, err := rpcnode.NewClient(http.DefaultClient, rpcURL)
ethClient, err := ethclient.Dial(rpcURL)
if err != nil {
panic(err)
}
traceCalls := tracecall.NewCache(rpcClient)
traceCalls := tracecall.NewCache(rpcnode.NewClient(ethClient))
p := MustNewParser(traceCalls)
client, err := ethclient.Dial(rpcURL)
require.NoError(t, err)
Expand Down
9 changes: 4 additions & 5 deletions pkg/parser/oneinchv6/oneinchv6_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"encoding/json"
"math/big"
"net/http"
"testing"
"time"

Expand All @@ -21,11 +20,11 @@ const rpcURL = ""

func TestFetchEvent(t *testing.T) {
t.Skip("Need to add the rpc url that enables the trace call JSON-RPC")
rpcClient, err := rpcnode.NewClient(http.DefaultClient, rpcURL)
ethClient, err := ethclient.Dial(rpcURL)
if err != nil {
panic(err)
}
traceCalls := tracecall.NewCache(rpcClient)
traceCalls := tracecall.NewCache(rpcnode.NewClient(ethClient))
p := MustNewParser(traceCalls)
require.Equal(t, p.abi.Events[FilledEvent].ID, common.HexToHash("0xfec331350fce78ba658e082a71da20ac9f8d798a99b3c79681c8440cbfe77e07"))
client, err := ethclient.Dial(rpcURL)
Expand Down Expand Up @@ -53,11 +52,11 @@ func TestParseEvent(t *testing.T) {
events := []types.Log{}
err := json.Unmarshal([]byte(eventRaw), &events)
require.NoError(t, err)
rpcClient, err := rpcnode.NewClient(http.DefaultClient, rpcURL)
ethClient, err := ethclient.Dial(rpcURL)
if err != nil {
panic(err)
}
traceCalls := tracecall.NewCache(rpcClient)
traceCalls := tracecall.NewCache(rpcnode.NewClient(ethClient))
p := MustNewParser(traceCalls)
for _, event := range events {
log, err := p.Parse(event, uint64(time.Now().Unix()))
Expand Down
Loading

0 comments on commit 509dd73

Please sign in to comment.