From 509dd73803478ebc39da6fc81779166608e2f5ee Mon Sep 17 00:00:00 2001 From: minhthanh Date: Wed, 7 Aug 2024 14:16:55 +0700 Subject: [PATCH] restruct parsing flow (#60) * restruct parsing flow --- cmd/tradelogs/main.go | 20 ++++------- internal/worker/worker.go | 46 +++++++++++++++----------- pkg/parser/bebop/bebop_test.go | 9 +++-- pkg/parser/bebop/parser.go | 11 ++++-- pkg/parser/hashflow_v3/parser.go | 8 +++++ pkg/parser/kyberswap/parser.go | 8 +++++ pkg/parser/kyberswap_rfq/parser.go | 8 +++++ pkg/parser/oneinch/oneinch_test.go | 22 ++++++------ pkg/parser/oneinchv6/oneinchv6_test.go | 9 +++-- pkg/parser/oneinchv6/parser.go | 11 ++++-- pkg/parser/paraswap/parser.go | 8 +++++ pkg/parser/parser.go | 13 +++++++- pkg/parser/uniswapx/parser.go | 10 +++++- pkg/parser/uniswapx/uniswap_test.go | 9 +++-- pkg/parser/zxotc/parser.go | 8 +++++ pkg/parser/zxrfqv3/parser.go | 13 +++++--- pkg/parser/zxrfqv3/parser_test.go | 12 +++---- pkg/rpcnode/client.go | 17 +++------- 18 files changed, 154 insertions(+), 88 deletions(-) diff --git a/cmd/tradelogs/main.go b/cmd/tradelogs/main.go index 79bc2e9..b589ec9 100644 --- a/cmd/tradelogs/main.go +++ b/cmd/tradelogs/main.go @@ -12,12 +12,14 @@ import ( "github.com/KyberNetwork/tradelogs/pkg/dune" "github.com/KyberNetwork/tradelogs/pkg/parser" "github.com/KyberNetwork/tradelogs/pkg/parser/bebop" - "github.com/KyberNetwork/tradelogs/pkg/parser/oneinch" "github.com/KyberNetwork/tradelogs/pkg/parser/oneinchv6" "github.com/KyberNetwork/tradelogs/pkg/parser/uniswapx" + "github.com/KyberNetwork/tradelogs/pkg/parser/zxrfqv3" "github.com/KyberNetwork/tradelogs/pkg/pricefiller" "github.com/KyberNetwork/tradelogs/pkg/rpcnode" "github.com/KyberNetwork/tradelogs/pkg/tracecall" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/ethclient" libapp "github.com/KyberNetwork/tradelogs/internal/app" "github.com/KyberNetwork/tradelogs/internal/bigquery" @@ -26,15 +28,11 @@ import ( tradelogs "github.com/KyberNetwork/tradelogs/internal/server/tradelogs" "github.com/KyberNetwork/tradelogs/internal/worker" "github.com/KyberNetwork/tradelogs/pkg/evmlistenerclient" - "github.com/KyberNetwork/tradelogs/pkg/parser/hashflow" hashflowv3 "github.com/KyberNetwork/tradelogs/pkg/parser/hashflow_v3" "github.com/KyberNetwork/tradelogs/pkg/parser/kyberswap" kyberswaprfq "github.com/KyberNetwork/tradelogs/pkg/parser/kyberswap_rfq" - "github.com/KyberNetwork/tradelogs/pkg/parser/native" "github.com/KyberNetwork/tradelogs/pkg/parser/paraswap" - "github.com/KyberNetwork/tradelogs/pkg/parser/tokenlon" "github.com/KyberNetwork/tradelogs/pkg/parser/zxotc" - "github.com/KyberNetwork/tradelogs/pkg/parser/zxrfq" "github.com/KyberNetwork/tradelogs/pkg/storage" "github.com/go-redis/redis/v8" "github.com/jmoiron/sqlx" @@ -93,25 +91,21 @@ func run(c *cli.Context) error { ResponseHeaderTimeout: time.Second * 30, }, } - rpcClient, err := rpcnode.NewClient(httpClient, c.String(libapp.RPCUrlFlagName)) + ethClient, err := ethclient.Dial(c.String(libapp.RPCUrlFlagName)) if err != nil { panic(err) } - traceCalls := tracecall.NewCache(rpcClient) + traceCalls := tracecall.NewCache(rpcnode.NewClient(ethClient)) parsers := []parser.Parser{kyberswap.MustNewParser(), zxotc.MustNewParser(), - zxrfq.MustNewParser(), - tokenlon.MustNewParser(), paraswap.MustNewParser(), - hashflow.MustNewParser(), - native.MustNewParser(), kyberswaprfq.MustNewParser(), hashflowv3.MustNewParser(), - oneinch.MustNewParser(traceCalls), oneinchv6.MustNewParser(traceCalls), uniswapx.MustNewParser(traceCalls), bebop.MustNewParser(traceCalls), + zxrfqv3.MustNewParserWithDeployer(traceCalls, ethClient, common.HexToAddress(parser.Deployer0xV3)), } binanceClient := binance.NewClient(c.String(pricefiller.BinanceAPIKeyFlag.Name), c.String(pricefiller.BinanceSecretKeyFlag.Name)) @@ -122,7 +116,7 @@ func run(c *cli.Context) error { } tradeLogChan := make(chan storage.TradeLog, 1000) - w, err := worker.New(l, s, listener, priceFiller, tradeLogChan, parsers...) + w, err := worker.New(l, s, listener, priceFiller, tradeLogChan, parsers) if err != nil { l.Errorw("Error while init worker") return err diff --git a/internal/worker/worker.go b/internal/worker/worker.go index 6fb153f..d63e776 100644 --- a/internal/worker/worker.go +++ b/internal/worker/worker.go @@ -11,6 +11,7 @@ import ( "github.com/KyberNetwork/tradelogs/pkg/parser" "github.com/KyberNetwork/tradelogs/pkg/pricefiller" "github.com/KyberNetwork/tradelogs/pkg/storage" + ethTypes "github.com/ethereum/go-ethereum/core/types" "go.uber.org/zap" ) @@ -18,25 +19,19 @@ type Worker struct { listener *evmlistenerclient.Client l *zap.SugaredLogger s *storage.Storage - p map[string]parser.Parser + p []parser.Parser priceFiller *pricefiller.PriceFiller tradeLogChan chan storage.TradeLog } func New(l *zap.SugaredLogger, s *storage.Storage, listener *evmlistenerclient.Client, priceFiller *pricefiller.PriceFiller, tradeLogChan chan storage.TradeLog, - parsers ...parser.Parser) (*Worker, error) { - p := make(map[string]parser.Parser) - for _, ps := range parsers { - for _, topic := range ps.Topics() { - p[topic] = ps - } - } + parsers []parser.Parser) (*Worker, error) { return &Worker{ listener: listener, l: l, s: s, - p: p, + p: parsers, priceFiller: priceFiller, tradeLogChan: tradeLogChan, }, nil @@ -87,14 +82,12 @@ func (w *Worker) processMessages(m []evmlistenerclient.Message) error { } } for _, log := range block.Logs { - if len(log.Topics) == 0 { - continue - } - ps := w.p[log.Topics[0]] + ethLog := convert.ToETHLog(log) + ps := w.findMatchingParser(ethLog) if ps == nil { continue } - order, err := ps.Parse(convert.ToETHLog(log), block.Timestamp) + order, err := ps.Parse(ethLog, block.Timestamp) if err != nil { w.l.Errorw("error when parse log", "log", log, "order", order, "err", err) if err := w.s.InsertErrorLog(storage.EVMLog{ @@ -143,11 +136,7 @@ func (w *Worker) retryParseLog() error { if len(topics) == 0 { continue } - ps := w.p[topics[0]] - if ps == nil { - continue - } - order, err := ps.Parse(convert.ToETHLog(types.Log{ + ethLog := convert.ToETHLog(types.Log{ Address: l.Address, Topics: topics, Data: l.Data, @@ -156,7 +145,12 @@ func (w *Worker) retryParseLog() error { TxIndex: l.TxIndex, BlockHash: l.BlockHash, Index: l.Index, - }), l.Time) + }) + ps := w.findMatchingParser(ethLog) + if ps == nil { + continue + } + order, err := ps.Parse(ethLog, l.Time) if err != nil { w.l.Errorw("error when retry log", "log", l, "err", err) continue @@ -178,3 +172,15 @@ func (w *Worker) retryParseLog() error { } return nil } + +func (w *Worker) findMatchingParser(log ethTypes.Log) parser.Parser { + var ps parser.Parser + for _, p := range w.p { + if !p.LogFromExchange(log) { + continue + } + ps = p + break + } + return ps +} diff --git a/pkg/parser/bebop/bebop_test.go b/pkg/parser/bebop/bebop_test.go index e71eff8..dfd2157 100644 --- a/pkg/parser/bebop/bebop_test.go +++ b/pkg/parser/bebop/bebop_test.go @@ -4,7 +4,6 @@ import ( "context" "encoding/json" "math/big" - "net/http" "testing" "time" @@ -21,11 +20,11 @@ const rpcURL = "" func TestFetchEvent(t *testing.T) { t.Skip("Need to add the rpc url that enables the trace call JSON-RPC") - rpcClient, err := rpcnode.NewClient(http.DefaultClient, rpcURL) + ethClient, err := ethclient.Dial(rpcURL) if err != nil { panic(err) } - traceCalls := tracecall.NewCache(rpcClient) + traceCalls := tracecall.NewCache(rpcnode.NewClient(ethClient)) p := MustNewParser(traceCalls) require.Equal(t, p.abi.Events[TradeEvent].ID, common.HexToHash("0xadd7095becdaa725f0f33243630938c861b0bba83dfd217d4055701aa768ec2e")) client, err := ethclient.Dial(rpcURL) @@ -53,11 +52,11 @@ func TestParseEvent(t *testing.T) { events := []types.Log{} err := json.Unmarshal([]byte(eventRaw), &events) require.NoError(t, err) - rpcClient, err := rpcnode.NewClient(http.DefaultClient, rpcURL) + ethClient, err := ethclient.Dial(rpcURL) if err != nil { panic(err) } - traceCalls := tracecall.NewCache(rpcClient) + traceCalls := tracecall.NewCache(rpcnode.NewClient(ethClient)) p := MustNewParser(traceCalls) for _, event := range events { log, err := p.Parse(event, uint64(time.Now().Unix())) diff --git a/pkg/parser/bebop/parser.go b/pkg/parser/bebop/parser.go index f275dae..a4772d9 100644 --- a/pkg/parser/bebop/parser.go +++ b/pkg/parser/bebop/parser.go @@ -4,11 +4,12 @@ import ( "encoding/json" "errors" "fmt" - "github.com/KyberNetwork/tradelogs/pkg/types" - tradingTypes "github.com/KyberNetwork/tradinglib/pkg/types" "math/big" "strings" + "github.com/KyberNetwork/tradelogs/pkg/types" + tradingTypes "github.com/KyberNetwork/tradinglib/pkg/types" + "github.com/KyberNetwork/tradelogs/pkg/decoder" "github.com/KyberNetwork/tradelogs/pkg/parser" "github.com/KyberNetwork/tradelogs/pkg/storage" @@ -281,3 +282,9 @@ func (p *Parser) ParseWithCallFrame(callFrame *tradingTypes.CallFrame, log ether } return p.searchTradeLog(order, types.ConvertCallFrame(callFrame)) } + +func (p *Parser) LogFromExchange(log ethereumTypes.Log) bool { + return strings.EqualFold(log.Address.String(), parser.AddrBebop) && + len(log.Topics) > 0 && + strings.EqualFold(log.Topics[0].String(), p.eventHash) +} diff --git a/pkg/parser/hashflow_v3/parser.go b/pkg/parser/hashflow_v3/parser.go index 545fcf6..521030d 100644 --- a/pkg/parser/hashflow_v3/parser.go +++ b/pkg/parser/hashflow_v3/parser.go @@ -3,6 +3,8 @@ package hashflowv3 import ( "encoding/json" "errors" + "strings" + "github.com/KyberNetwork/tradelogs/pkg/decoder" tradingTypes "github.com/KyberNetwork/tradinglib/pkg/types" "github.com/ethereum/go-ethereum/common/hexutil" @@ -136,3 +138,9 @@ func (p *Parser) getRFQOrderParams(callFrame *tradingTypes.CallFrame) (*OrderRFQ } return nil, nil } + +func (p *Parser) LogFromExchange(log ethereumTypes.Log) bool { + return strings.EqualFold(log.Address.String(), parser.AddrHashflowV3) && + len(log.Topics) > 0 && + strings.EqualFold(log.Topics[0].String(), p.eventHash) +} diff --git a/pkg/parser/kyberswap/parser.go b/pkg/parser/kyberswap/parser.go index c465cea..53b51ea 100644 --- a/pkg/parser/kyberswap/parser.go +++ b/pkg/parser/kyberswap/parser.go @@ -2,6 +2,8 @@ package kyberswap import ( "errors" + "strings" + tradingTypes "github.com/KyberNetwork/tradinglib/pkg/types" ethereumTypes "github.com/ethereum/go-ethereum/core/types" @@ -85,3 +87,9 @@ func (p *Parser) UseTraceCall() bool { func (p *Parser) ParseWithCallFrame(_ *tradingTypes.CallFrame, log ethereumTypes.Log, blockTime uint64) (storage.TradeLog, error) { return p.Parse(log, blockTime) } + +func (p *Parser) LogFromExchange(log ethereumTypes.Log) bool { + return strings.EqualFold(log.Address.String(), parser.AddrKyberswap) && + len(log.Topics) > 0 && + strings.EqualFold(log.Topics[0].String(), p.eventHash) +} diff --git a/pkg/parser/kyberswap_rfq/parser.go b/pkg/parser/kyberswap_rfq/parser.go index 677df07..59148fa 100644 --- a/pkg/parser/kyberswap_rfq/parser.go +++ b/pkg/parser/kyberswap_rfq/parser.go @@ -4,6 +4,8 @@ import ( "encoding/json" "errors" "fmt" + "strings" + "github.com/KyberNetwork/tradelogs/pkg/decoder" tradingTypes "github.com/KyberNetwork/tradinglib/pkg/types" "github.com/ethereum/go-ethereum/common/hexutil" @@ -138,3 +140,9 @@ func (p *Parser) getRFQOrderParams(callFrame *tradingTypes.CallFrame) (*OrderRFQ } return nil, nil } + +func (p *Parser) LogFromExchange(log ethereumTypes.Log) bool { + return strings.EqualFold(log.Address.String(), parser.AddrKyberswapRFQ) && + len(log.Topics) > 0 && + strings.EqualFold(log.Topics[0].String(), p.eventHash) +} diff --git a/pkg/parser/oneinch/oneinch_test.go b/pkg/parser/oneinch/oneinch_test.go index 328661e..3dfa124 100644 --- a/pkg/parser/oneinch/oneinch_test.go +++ b/pkg/parser/oneinch/oneinch_test.go @@ -3,14 +3,14 @@ package oneinch import ( "context" "encoding/json" - "github.com/KyberNetwork/tradelogs/pkg/storage" - tradingTypes "github.com/KyberNetwork/tradinglib/pkg/types" "math/big" - "net/http" "strings" "testing" "time" + "github.com/KyberNetwork/tradelogs/pkg/storage" + tradingTypes "github.com/KyberNetwork/tradinglib/pkg/types" + "github.com/KyberNetwork/tradelogs/pkg/rpcnode" "github.com/KyberNetwork/tradelogs/pkg/tracecall" "github.com/ethereum/go-ethereum" @@ -24,11 +24,11 @@ const rpcURL = "" func TestFetchEvent(t *testing.T) { t.Skip("Need to add the rpc url that enables the trace call JSON-RPC") - rpcClient, err := rpcnode.NewClient(http.DefaultClient, rpcURL) + ethClient, err := ethclient.Dial(rpcURL) if err != nil { panic(err) } - traceCalls := tracecall.NewCache(rpcClient) + traceCalls := tracecall.NewCache(rpcnode.NewClient(ethClient)) p := MustNewParser(traceCalls) require.Equal(t, p.abi.Events[FilledEvent].ID, common.HexToHash("0xc3b639f02b125bfa160e50739b8c44eb2d1b6908e2b6d5925c6d770f2ca78127")) client, err := ethclient.Dial(rpcURL) @@ -64,11 +64,11 @@ func TestParseEvent(t *testing.T) { event := types.Log{} err := json.Unmarshal([]byte(eventRaw), &event) require.NoError(t, err) - rpcClient, err := rpcnode.NewClient(http.DefaultClient, rpcURL) + ethClient, err := ethclient.Dial(rpcURL) if err != nil { panic(err) } - traceCalls := tracecall.NewCache(rpcClient) + traceCalls := tracecall.NewCache(rpcnode.NewClient(ethClient)) p := MustNewParser(traceCalls) log, err := p.Parse(event, uint64(time.Now().Unix())) require.NoError(t, err) @@ -78,11 +78,11 @@ func TestParseEvent(t *testing.T) { func TestParseOneinchTradeLog(t *testing.T) { t.Skip("Need to add the rpc url that enables the trace call JSON-RPC") - rpcClient, err := rpcnode.NewClient(http.DefaultClient, rpcURL) + ethClient, err := ethclient.Dial(rpcURL) if err != nil { panic(err) } - traceCalls := tracecall.NewCache(rpcClient) + traceCalls := tracecall.NewCache(rpcnode.NewClient(ethClient)) p := MustNewParser(traceCalls) client, err := ethclient.Dial(rpcURL) require.NoError(t, err) @@ -129,11 +129,11 @@ func TestParseWithCallFrame(t *testing.T) { MakerTraits: "", Expiry: 1719508091, } - rpcClient, err := rpcnode.NewClient(http.DefaultClient, rpcURL) + ethClient, err := ethclient.Dial(rpcURL) if err != nil { panic(err) } - traceCalls := tracecall.NewCache(rpcClient) + traceCalls := tracecall.NewCache(rpcnode.NewClient(ethClient)) p := MustNewParser(traceCalls) client, err := ethclient.Dial(rpcURL) require.NoError(t, err) diff --git a/pkg/parser/oneinchv6/oneinchv6_test.go b/pkg/parser/oneinchv6/oneinchv6_test.go index 79891ff..a8d3278 100644 --- a/pkg/parser/oneinchv6/oneinchv6_test.go +++ b/pkg/parser/oneinchv6/oneinchv6_test.go @@ -4,7 +4,6 @@ import ( "context" "encoding/json" "math/big" - "net/http" "testing" "time" @@ -21,11 +20,11 @@ const rpcURL = "" func TestFetchEvent(t *testing.T) { t.Skip("Need to add the rpc url that enables the trace call JSON-RPC") - rpcClient, err := rpcnode.NewClient(http.DefaultClient, rpcURL) + ethClient, err := ethclient.Dial(rpcURL) if err != nil { panic(err) } - traceCalls := tracecall.NewCache(rpcClient) + traceCalls := tracecall.NewCache(rpcnode.NewClient(ethClient)) p := MustNewParser(traceCalls) require.Equal(t, p.abi.Events[FilledEvent].ID, common.HexToHash("0xfec331350fce78ba658e082a71da20ac9f8d798a99b3c79681c8440cbfe77e07")) client, err := ethclient.Dial(rpcURL) @@ -53,11 +52,11 @@ func TestParseEvent(t *testing.T) { events := []types.Log{} err := json.Unmarshal([]byte(eventRaw), &events) require.NoError(t, err) - rpcClient, err := rpcnode.NewClient(http.DefaultClient, rpcURL) + ethClient, err := ethclient.Dial(rpcURL) if err != nil { panic(err) } - traceCalls := tracecall.NewCache(rpcClient) + traceCalls := tracecall.NewCache(rpcnode.NewClient(ethClient)) p := MustNewParser(traceCalls) for _, event := range events { log, err := p.Parse(event, uint64(time.Now().Unix())) diff --git a/pkg/parser/oneinchv6/parser.go b/pkg/parser/oneinchv6/parser.go index c89b096..77cae7d 100644 --- a/pkg/parser/oneinchv6/parser.go +++ b/pkg/parser/oneinchv6/parser.go @@ -4,11 +4,12 @@ import ( "encoding/json" "errors" "fmt" - "github.com/KyberNetwork/tradelogs/pkg/types" - tradingTypes "github.com/KyberNetwork/tradinglib/pkg/types" "math/big" "strings" + "github.com/KyberNetwork/tradelogs/pkg/types" + tradingTypes "github.com/KyberNetwork/tradinglib/pkg/types" + "github.com/KyberNetwork/tradelogs/pkg/abitypes" "github.com/KyberNetwork/tradelogs/pkg/decoder" "github.com/KyberNetwork/tradelogs/pkg/parser" @@ -261,3 +262,9 @@ func (p *Parser) ParseWithCallFrame(callFrame *tradingTypes.CallFrame, log ether count := 0 return p.recursiveDetectOneInchRFQTrades(order, types.ConvertCallFrame(callFrame), &count) } + +func (p *Parser) LogFromExchange(log ethereumTypes.Log) bool { + return strings.EqualFold(log.Address.String(), parser.Addr1InchV6) && + len(log.Topics) > 0 && + strings.EqualFold(log.Topics[0].String(), p.eventHash) +} diff --git a/pkg/parser/paraswap/parser.go b/pkg/parser/paraswap/parser.go index f953636..51ae152 100644 --- a/pkg/parser/paraswap/parser.go +++ b/pkg/parser/paraswap/parser.go @@ -3,6 +3,8 @@ package paraswap import ( "encoding/json" "errors" + "strings" + "github.com/KyberNetwork/tradelogs/pkg/decoder" tradingTypes "github.com/KyberNetwork/tradinglib/pkg/types" "github.com/ethereum/go-ethereum/common/hexutil" @@ -136,3 +138,9 @@ func (p *Parser) getRFQOrderParams(callFrame *tradingTypes.CallFrame) (*OrderRFQ } return nil, nil } + +func (p *Parser) LogFromExchange(log ethereumTypes.Log) bool { + return strings.EqualFold(log.Address.String(), parser.AddrParaswap) && + len(log.Topics) > 0 && + strings.EqualFold(log.Topics[0].String(), p.eventHash) +} diff --git a/pkg/parser/parser.go b/pkg/parser/parser.go index f0df93a..ebbf788 100644 --- a/pkg/parser/parser.go +++ b/pkg/parser/parser.go @@ -8,7 +8,7 @@ import ( const ( ExZeroX = "zerox" - EXZeroXV3 = "zerox_v3" + EXZeroXV3 = "zeroxV3" ExParaswap = "paraswap" ExTokenlon = "tokenlon" ExZeroXRFQ = "zerox_rfq" @@ -22,6 +22,16 @@ const ( ExUniswapX = "uniswapx" ExNative = "native" ExBebop = "bebop" + + Addr1InchV6 = "0x111111125421cA6dc452d289314280a0f8842A65" + AddrBebop = "0xbbbbbBB520d69a9775E85b458C58c648259FAD5F" + AddrHashflowV3 = "0x24b9d98FABF4DA1F69eE10775F240AE3dA6856fd" + AddrKyberswap = "0x6131B5fae19EA4f9D964eAc0408E4408b66337b5" + AddrKyberswapRFQ = "0x7A819Fa46734a49D0112796f9377E024c350FB26" + AddrParaswap = "0xe92b586627ccA7a83dC919cc7127196d70f55a06" + AddrUniswapX = "0x00000011F84B9aa48e5f8aA8B9897600006289Be" + Addr0x = "0xDef1C0ded9bec7F1a1670819833240f027b25EfF" + Deployer0xV3 = "0x00000000000004533Fe15556B1E086BB1A72cEae" ) type Parser interface { @@ -30,4 +40,5 @@ type Parser interface { Exchange() string UseTraceCall() bool ParseWithCallFrame(callFrame *tradingTypes.CallFrame, log ethereumTypes.Log, blockTime uint64) (storage.TradeLog, error) + LogFromExchange(log ethereumTypes.Log) bool } diff --git a/pkg/parser/uniswapx/parser.go b/pkg/parser/uniswapx/parser.go index 69fbc59..dba3109 100644 --- a/pkg/parser/uniswapx/parser.go +++ b/pkg/parser/uniswapx/parser.go @@ -4,9 +4,11 @@ import ( "encoding/json" "errors" "fmt" + "math/big" + "strings" + "github.com/KyberNetwork/tradelogs/pkg/types" tradingTypes "github.com/KyberNetwork/tradinglib/pkg/types" - "math/big" "github.com/KyberNetwork/tradelogs/pkg/decoder" "github.com/KyberNetwork/tradelogs/pkg/parser" @@ -331,3 +333,9 @@ func (p *Parser) ParseWithCallFrame(callFrame *tradingTypes.CallFrame, log ether } return order, nil } + +func (p *Parser) LogFromExchange(log ethereumTypes.Log) bool { + return strings.EqualFold(log.Address.String(), parser.AddrUniswapX) && + len(log.Topics) > 0 && + strings.EqualFold(log.Topics[0].String(), p.eventHash) +} diff --git a/pkg/parser/uniswapx/uniswap_test.go b/pkg/parser/uniswapx/uniswap_test.go index aefa74d..dfe9024 100644 --- a/pkg/parser/uniswapx/uniswap_test.go +++ b/pkg/parser/uniswapx/uniswap_test.go @@ -5,7 +5,6 @@ import ( "encoding/json" "fmt" "math/big" - "net/http" "testing" "github.com/KyberNetwork/tradelogs/pkg/rpcnode" @@ -21,11 +20,11 @@ const rpcURL = "" func TestFetchEvent(t *testing.T) { t.Skip("Need to add the rpc url that enables the trace call JSON-RPC") - rpcClient, err := rpcnode.NewClient(http.DefaultClient, rpcURL) + ethClient, err := ethclient.Dial(rpcURL) if err != nil { panic(err) } - traceCalls := tracecall.NewCache(rpcClient) + traceCalls := tracecall.NewCache(rpcnode.NewClient(ethClient)) p := MustNewParser(traceCalls) require.Equal(t, p.abi.Events[FilledEvent].ID, common.HexToHash("0x78ad7ec0e9f89e74012afa58738b6b661c024cb0fd185ee2f616c0a28924bd66")) client, err := ethclient.Dial("https://ethereum.kyberengineering.io") @@ -53,11 +52,11 @@ func TestParseEvent(t *testing.T) { event := types.Log{} err := json.Unmarshal([]byte(eventRaw), &event) require.NoError(t, err) - rpcClient, err := rpcnode.NewClient(http.DefaultClient, rpcURL) + ethClient, err := ethclient.Dial(rpcURL) if err != nil { panic(err) } - traceCalls := tracecall.NewCache(rpcClient) + traceCalls := tracecall.NewCache(rpcnode.NewClient(ethClient)) p := MustNewParser(traceCalls) log, err := p.Parse(event, 1713889895) require.NoError(t, err) diff --git a/pkg/parser/zxotc/parser.go b/pkg/parser/zxotc/parser.go index 65ce8c0..28c7980 100644 --- a/pkg/parser/zxotc/parser.go +++ b/pkg/parser/zxotc/parser.go @@ -3,6 +3,8 @@ package zxotc import ( "encoding/json" "errors" + "strings" + "github.com/KyberNetwork/tradelogs/pkg/decoder" tradingTypes "github.com/KyberNetwork/tradinglib/pkg/types" "github.com/ethereum/go-ethereum/common/hexutil" @@ -136,3 +138,9 @@ func (p *Parser) getRFQOrderParams(callFrame *tradingTypes.CallFrame) (*OrderRFQ } return nil, nil } + +func (p *Parser) LogFromExchange(log ethereumTypes.Log) bool { + return strings.EqualFold(log.Address.String(), parser.Addr0x) && + len(log.Topics) > 0 && + strings.EqualFold(log.Topics[0].String(), p.eventHash) +} diff --git a/pkg/parser/zxrfqv3/parser.go b/pkg/parser/zxrfqv3/parser.go index d7f6c77..e4b6d39 100644 --- a/pkg/parser/zxrfqv3/parser.go +++ b/pkg/parser/zxrfqv3/parser.go @@ -3,6 +3,10 @@ package zxrfqv3 import ( "context" "fmt" + "log" + "math/big" + "time" + "github.com/KyberNetwork/tradelogs/pkg/decoder" "github.com/KyberNetwork/tradelogs/pkg/parser" "github.com/KyberNetwork/tradelogs/pkg/parser/zxrfqv3/deployer" @@ -17,9 +21,6 @@ import ( ethereumTypes "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/ethclient" "go.uber.org/zap" - "log" - "math/big" - "time" ) type Parser struct { @@ -108,7 +109,6 @@ func (p *Parser) getAndUpdateContractAddress(deployer *deployer.Deployer, contra } if !p.contractABIs.containAddress(contractAddress) && !isZeroAddress(contractAddress) { p.l.Infow("add contract abi", "contractAddress", contractAddress) - fmt.Println("add contract abi", "contractAddress", contractAddress, contractType) p.contractABIs.addContractABI(contractAddress, abi) } return nil @@ -352,3 +352,8 @@ func calculateTakerTokenAmount(makerTokenAmount *big.Int, maxTakerTokenAmount *b tmp := makerTokenAmountCal.Mul(makerTokenAmountCal, maxTakerTokenAmountCal) return tmp.Div(tmp, permittedTokenAmountCal) } + +func (p *Parser) LogFromExchange(log ethereumTypes.Log) bool { + return p.contractABIs.containAddress(log.Address) && + len(log.Topics) == 0 +} diff --git a/pkg/parser/zxrfqv3/parser_test.go b/pkg/parser/zxrfqv3/parser_test.go index ba8bf50..a99d4af 100644 --- a/pkg/parser/zxrfqv3/parser_test.go +++ b/pkg/parser/zxrfqv3/parser_test.go @@ -3,6 +3,9 @@ package zxrfqv3 import ( "context" "encoding/json" + "os" + "testing" + "github.com/KyberNetwork/tradelogs/pkg/rpcnode" "github.com/KyberNetwork/tradelogs/pkg/storage" "github.com/KyberNetwork/tradelogs/pkg/tracecall" @@ -11,9 +14,6 @@ import ( "github.com/ethereum/go-ethereum/ethclient" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "net/http" - "os" - "testing" ) const rpcURL = "" @@ -21,11 +21,11 @@ const rpcURL = "" func newParserTest(t *testing.T, contractABI ContractABI, needRpc bool) *Parser { var cache *tracecall.Cache if needRpc { - rpcClient, err := rpcnode.NewClient(http.DefaultClient, rpcURL) + ethClient, err := ethclient.Dial(rpcURL) if err != nil { - t.Fatal(err) + panic(err) } - cache = tracecall.NewCache(rpcClient) + cache = tracecall.NewCache(rpcnode.NewClient(ethClient)) } return MustNewParser(cache, contractABI) } diff --git a/pkg/rpcnode/client.go b/pkg/rpcnode/client.go index fc4f9a4..0a46c52 100644 --- a/pkg/rpcnode/client.go +++ b/pkg/rpcnode/client.go @@ -2,28 +2,19 @@ package rpcnode import ( "context" - "net/http" "github.com/KyberNetwork/tradelogs/pkg/types" "github.com/ethereum/go-ethereum/ethclient" ) type Client struct { - httpClient *http.Client - rpcUrl string - ethClient *ethclient.Client + ethClient *ethclient.Client } -func NewClient(httpClient *http.Client, rpcUrl string) (*Client, error) { - ethClient, err := ethclient.Dial(rpcUrl) - if err != nil { - return nil, err - } +func NewClient(ethClient *ethclient.Client) *Client { return &Client{ - httpClient: httpClient, - rpcUrl: rpcUrl, - ethClient: ethClient, - }, nil + ethClient: ethClient, + } } func (c *Client) FetchTraceCall(ctx context.Context, txHash string) (types.CallFrame, error) {