diff --git a/cmd/tradelogs/main.go b/cmd/tradelogs/main.go index fbca05e..2dd226d 100644 --- a/cmd/tradelogs/main.go +++ b/cmd/tradelogs/main.go @@ -8,13 +8,13 @@ import ( "os" "time" + "github.com/KyberNetwork/tradelogs/pkg/dune" "github.com/KyberNetwork/tradelogs/pkg/parser" "github.com/KyberNetwork/tradelogs/pkg/parser/oneinch" "github.com/KyberNetwork/tradelogs/pkg/rpcnode" "github.com/KyberNetwork/tradelogs/pkg/tracecall" libapp "github.com/KyberNetwork/tradelogs/internal/app" - "github.com/KyberNetwork/tradelogs/internal/bigquery" "github.com/KyberNetwork/tradelogs/internal/dbutil" backfill "github.com/KyberNetwork/tradelogs/internal/server/backfill" tradelogs "github.com/KyberNetwork/tradelogs/internal/server/tradelogs" @@ -92,8 +92,7 @@ func run(c *cli.Context) error { } traceCalls := tracecall.NewCache(rpcClient) - w, err := worker.New(l, s, listener, - kyberswap.MustNewParser(), + parsers := []parser.Parser{kyberswap.MustNewParser(), zxotc.MustNewParser(), zxrfq.MustNewParser(), tokenlon.MustNewParser(), @@ -103,35 +102,22 @@ func run(c *cli.Context) error { kyberswaprfq.MustNewParser(), hashflowv3.MustNewParser(), oneinch.MustNewParser(traceCalls), - ) + } + + w, err := worker.New(l, s, listener, parsers...) if err != nil { l.Errorw("Error while init worker") return err } - parserMap := map[string]parser.Parser{ - "kyberswap": kyberswap.MustNewParser(), - "zxotc": zxotc.MustNewParser(), - "zxrfq": zxrfq.MustNewParser(), - "tokenlon": tokenlon.MustNewParser(), - "paraswap": paraswap.MustNewParser(), - "hashflow": hashflow.MustNewParser(), - "native": native.MustNewParser(), - "kyberswaprfq": kyberswaprfq.MustNewParser(), - "hashflowv3": hashflowv3.MustNewParser(), - "1inch": oneinch.MustNewParser(traceCalls), - } - backfillWorker, err := bigquery.NewWorker(libapp.BigqueryProjectIDFFromCli(c), s, parserMap) - if err != nil { - l.Errorw("Error while init backfillWorker") - } else { - httpBackfill := backfill.New(c.String(libapp.HTTPBackfillServerFlag.Name), backfillWorker) - go func() { - if err := httpBackfill.Run(); err != nil { - panic(err) - } - }() - } + httpBackfill := backfill.New(c.String(libapp.HTTPBackfillServerFlag.Name), + backfill.NewDuneWoker(dune.NewClient(c.String(libapp.DuneURLFlag.Name), c.String(libapp.DuneKeyFlag.Name), httpClient), s), + parsers) + go func() { + if err := httpBackfill.Run(); err != nil { + panic(err) + } + }() httpTradelogs := tradelogs.New(l, s, c.String(libapp.HTTPServerFlag.Name)) go func() { diff --git a/go.mod b/go.mod index 698b5cb..d112ebd 100644 --- a/go.mod +++ b/go.mod @@ -16,6 +16,7 @@ require ( github.com/joho/godotenv v1.4.0 github.com/lib/pq v1.10.9 github.com/pkg/errors v0.9.1 + github.com/rs/xid v1.5.0 github.com/shopspring/decimal v1.3.1 github.com/stretchr/testify v1.8.4 github.com/urfave/cli v1.22.14 diff --git a/go.sum b/go.sum index 3c5d19b..07b8549 100644 --- a/go.sum +++ b/go.sum @@ -449,6 +449,8 @@ github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZV github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= github.com/rs/cors v1.7.0 h1:+88SsELBHx5r+hZ8TCkggzSstaWNbDvThkVK8H6f9ik= github.com/rs/cors v1.7.0/go.mod h1:gFx+x8UowdsKA9AchylcLynDq+nNFfI8FkUZdN/jGCU= +github.com/rs/xid v1.5.0 h1:mKX4bl4iPYJtEIxp6CYiUuLQ/8DYMoz0PUdtGgMFRVc= +github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g= github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk= diff --git a/internal/app/server.go b/internal/app/server.go index d41669d..1e73da5 100644 --- a/internal/app/server.go +++ b/internal/app/server.go @@ -18,9 +18,22 @@ var HTTPBackfillServerFlag = cli.StringFlag{ Value: "localhost:8081", } +var DuneURLFlag = cli.StringFlag{ + Name: "dune-url", + EnvVar: "DUNE_URL", + Value: "https://api.dune.com/api", +} + +var DuneKeyFlag = cli.StringFlag{ + Name: "dune-key", + EnvVar: "DUNE_KEY", +} + func HTTPServerFlags() []cli.Flag { return []cli.Flag{ HTTPServerFlag, HTTPBackfillServerFlag, + DuneURLFlag, + DuneKeyFlag, } } diff --git a/internal/server/backfill/dune_worker.go b/internal/server/backfill/dune_worker.go new file mode 100644 index 0000000..a128edf --- /dev/null +++ b/internal/server/backfill/dune_worker.go @@ -0,0 +1,130 @@ +package server + +import ( + "time" + + "github.com/KyberNetwork/tradelogs/pkg/dune" + "github.com/KyberNetwork/tradelogs/pkg/parser" + "github.com/KyberNetwork/tradelogs/pkg/storage" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/ethereum/go-ethereum/core/types" + "go.uber.org/zap" +) + +const ( + limit = 100 + retry = 5 +) + +type DuneWorker struct { + client *dune.Client + s *storage.Storage +} + +func NewDuneWoker(c *dune.Client, s *storage.Storage) *DuneWorker { + return &DuneWorker{ + client: c, + s: s, + } +} + +func (d *DuneWorker) backfill(l *zap.SugaredLogger, + queryID int64, from, to uint64, ps parser.Parser, topic string) error { + l.Infow("start backfill data", "query", queryID, "topic", topic, "from", from, "to", to) + queryRes, err := d.client.ExecuteQuery(queryID, topic, from, to) + if err != nil { + return err + } + + l.Infow("executing query", "data", queryRes) + errCount := 0 + for { + time.Sleep(5 * time.Second) + state, err := d.client.ExecuteState(queryRes.ExecutionID) + if err != nil { + l.Errorw("error when get state", "error", err) + errCount++ + if errCount > retry { + return err + } + continue + } + l.Infow("execute query", "state", state) + if state.IsExecutionFinished { + break + } + } + l.Infow("executed query") + + var ( + progress uint64 = 0 + data []storage.TradeLog + ) + errCount = 0 + + for { + time.Sleep(5 * time.Second) + logs, rowCount, err := d.client.GetLastestExecuteResult(queryID, limit, progress) + if err != nil { + l.Errorw("error when collect data", "error", err) + errCount++ + if errCount > retry { + return err + } + continue + } + + l.Infow("collect data", "progress", progress, "len", len(logs), "total", rowCount) + for _, l := range logs { + ts, err := time.Parse("2006-01-02 15:04:05.999 UTC", l.BlockTime) + if err != nil { + return err + } + parsedLog, err := ps.Parse(DuneLogToETHLog(l), uint64(ts.Unix())) + if err != nil { + return err + } + data = append(data, parsedLog) + } + l.Infow("parsed data", "len", len(data)) + if err = d.s.Insert(data); err != nil { + return err + } + progress += limit + 1 + if progress >= rowCount { + break + } + } + return nil +} + +func DuneLogToETHLog(log dune.DuneLog) types.Log { + data, err := hexutil.Decode(log.Data) + if err != nil { + return types.Log{} + } + topics := []common.Hash{} + if log.Topic0 != "" { + topics = append(topics, common.HexToHash(log.Topic0)) + } + if log.Topic1 != "" { + topics = append(topics, common.HexToHash(log.Topic1)) + } + if log.Topic2 != "" { + topics = append(topics, common.HexToHash(log.Topic2)) + } + if log.Topic3 != "" { + topics = append(topics, common.HexToHash(log.Topic3)) + } + return types.Log{ + Address: common.HexToAddress(log.ContractAddress), + Topics: topics, + Data: data, + BlockNumber: log.BlockNumber, + TxHash: common.HexToHash(log.TxHash), + TxIndex: log.TxIndex, + BlockHash: common.HexToHash(log.BlockHash), + Index: log.Index, + } +} diff --git a/internal/server/backfill/server.go b/internal/server/backfill/server.go index a1080e3..db9ac23 100644 --- a/internal/server/backfill/server.go +++ b/internal/server/backfill/server.go @@ -4,11 +4,12 @@ import ( "fmt" "net/http" "strings" - "time" - "github.com/KyberNetwork/tradelogs/internal/bigquery" + "github.com/KyberNetwork/tradelogs/pkg/parser" + "github.com/KyberNetwork/tradelogs/pkg/storage" "github.com/gin-contrib/pprof" "github.com/gin-gonic/gin" + "github.com/rs/xid" "go.uber.org/zap" ) @@ -16,20 +17,22 @@ import ( type Server struct { l *zap.SugaredLogger r *gin.Engine - bq *bigquery.Worker + dune *DuneWorker bindAddr string + parsers []parser.Parser } // New returns a new server. -func New(bindAddr string, bq *bigquery.Worker) *Server { +func New(bindAddr string, dune *DuneWorker, parsers []parser.Parser) *Server { engine := gin.New() engine.Use(gin.Recovery()) server := &Server{ l: zap.S(), r: engine, - bq: bq, + dune: dune, bindAddr: bindAddr, + parsers: parsers, } gin.SetMode(gin.ReleaseMode) @@ -50,7 +53,6 @@ func (s *Server) Run() error { func (s *Server) register() { pprof.Register(s.r, "/debug") s.r.POST("/backfill", s.backfill) - s.r.POST("/backfill-1inch", s.backfillOneinch) } func responseErr(c *gin.Context, err error) { @@ -60,60 +62,59 @@ func responseErr(c *gin.Context, err error) { }) } +func responseOK(c *gin.Context) { + c.JSON(http.StatusOK, gin.H{ + "success": true, + }) +} + func (s *Server) backfill(c *gin.Context) { - var ( - query struct { - FromTime int64 `form:"from_time" json:"from_time,omitempty"` // milliseconds - ToTime int64 `form:"to_time" json:"to_time,omitempty"` // milliseconds - Exchanges string `form:"exchanges" json:"exchanges,omitempty"` - } - err = c.ShouldBind(&query) - ) - if err != nil { + var params storage.BackfillQuery + if err := c.BindJSON(¶ms); err != nil { responseErr(c, err) return } - var exchanges []string - if query.Exchanges != "" { - exchanges = strings.Split(query.Exchanges, ",") - } - s.l.Infow("Request backfill", "query", query) - if query.FromTime == 0 || query.ToTime == 0 { - err = s.bq.BackFillAllData(exchanges) - } else { - err = s.bq.BackFillPartialData(query.FromTime/1000, query.ToTime/1000, exchanges) - } - if err != nil { - responseErr(c, err) + if params.FromBlock > params.ToBlock { + responseErr(c, fmt.Errorf("from block is greater than to block")) return } - c.JSON(http.StatusOK, gin.H{ - "success": true, - "time": time.Now().UnixMilli(), - }) -} - -func (s *Server) backfillOneinch(c *gin.Context) { - var ( - query BackFillOneInchRequest - ) - if err := c.ShouldBind(&query); err != nil { - responseErr(c, err) + if params.EventHash == "" && params.Exchange == "" { + responseErr(c, fmt.Errorf("empty event hash or exchange")) return } - tradeLogs := query.ToTradeLogs() + l := s.l.With("reqID", xid.New().String()) + l.Infow("receive backfill params", "params", params) + if params.EventHash != "" { + for _, p := range s.parsers { + for _, t := range p.Topics() { + if strings.EqualFold(params.EventHash, t) { + if err := s.dune.backfill(l, params.QueryID, params.FromBlock, params.ToBlock, p, t); err != nil { + l.Errorw("error when backfill", "error", err) + responseErr(c, err) + return + } + responseOK(c) + return + } + } + } + } - err := s.bq.BackfillOneInchRFQ(tradeLogs) - if err != nil { - responseErr(c, err) - return + for _, p := range s.parsers { + if strings.EqualFold(params.Exchange, p.Exchange()) { + for _, t := range p.Topics() { + if err := s.dune.backfill(l, params.QueryID, params.FromBlock, params.ToBlock, p, t); err != nil { + l.Errorw("error when backfill", "error", err) + responseErr(c, err) + return + } + } + responseOK(c) + return + } } - c.JSON(http.StatusOK, gin.H{ - "message": "Backfill 1inch rfq orders successfully", - "success": true, - "time": time.Now().UnixMilli(), - }) + } diff --git a/internal/server/tradelogs/server.go b/internal/server/tradelogs/server.go index 3caff9c..1b89a2c 100644 --- a/internal/server/tradelogs/server.go +++ b/internal/server/tradelogs/server.go @@ -56,12 +56,15 @@ func (s *Server) register() { s.r.GET("/tradelogs", s.getTradeLogs) } + func responseErr(c *gin.Context, status int, err error) { c.JSON(http.StatusBadRequest, gin.H{ "success": false, "error": err.Error(), + "status": status, }) } + func (s *Server) getTradeLogs(c *gin.Context) { var ( query storage.TradeLogsQuery diff --git a/pkg/dune/client.go b/pkg/dune/client.go new file mode 100644 index 0000000..9ff7c9d --- /dev/null +++ b/pkg/dune/client.go @@ -0,0 +1,167 @@ +package dune + +import ( + "bytes" + "encoding/json" + "fmt" + "net/http" + "time" +) + +type Client struct { + baseURL string + c *http.Client + apiKey string +} + +func NewClient(url, key string, httpClient *http.Client) *Client { + return &Client{ + baseURL: url, + c: httpClient, + apiKey: key, + } +} + +type ExecuteQuery struct { + QueryParams struct { + EventHash string `json:"event_hash"` + BlockFrom uint64 `json:"block_from"` + BlockTo uint64 `json:"block_to"` + } `json:"query_parameters"` +} + +type ExecuteResponse struct { + ExecutionID string `json:"execution_id"` + State string `json:"state"` + Error string `json:"error"` +} + +type StateResponse struct { + ExecutionID string `json:"execution_id"` + QueryID int `json:"query_id"` + IsExecutionFinished bool `json:"is_execution_finished"` + State string `json:"state"` + SubmittedAt time.Time `json:"submitted_at"` + ExpiresAt time.Time `json:"expires_at"` + ExecutionStartedAt time.Time `json:"execution_started_at"` + ExecutionEndedAt time.Time `json:"execution_ended_at"` + Error string `json:"error"` +} + +func (c *Client) ExecuteQuery(queryID int64, evenHash string, blockFrom, blockTo uint64) (*ExecuteResponse, error) { + params := ExecuteQuery{} + params.QueryParams.EventHash = evenHash + params.QueryParams.BlockFrom = blockFrom + params.QueryParams.BlockTo = blockTo + data, err := json.Marshal(params) + if err != nil { + return nil, err + } + req, err := http.NewRequest("POST", fmt.Sprintf("%s/v1/query/%d/execute", c.baseURL, queryID), bytes.NewBuffer(data)) + if err != nil { + return nil, err + } + req.Header.Set("x-dune-api-key", c.apiKey) + res, err := c.c.Do(req) + if err != nil { + return nil, err + } + defer res.Body.Close() + + var executeRes ExecuteResponse + if err = json.NewDecoder(res.Body).Decode(&executeRes); err != nil { + return nil, err + } + if executeRes.Error != "" { + return nil, fmt.Errorf(executeRes.Error) + } + return &executeRes, nil +} + +func (c *Client) ExecuteState(exeID string) (*StateResponse, error) { + req, err := http.NewRequest("GET", fmt.Sprintf("%s/v1/execution/%s/status", c.baseURL, exeID), nil) + if err != nil { + return nil, err + } + req.Header.Set("x-dune-api-key", c.apiKey) + res, err := c.c.Do(req) + if err != nil { + return nil, err + } + + defer res.Body.Close() + var state StateResponse + if err = json.NewDecoder(res.Body).Decode(&state); err != nil { + return nil, err + } + if state.Error != "" { + return nil, fmt.Errorf(state.Error) + } + return &state, nil +} + +type DuneLog struct { + BlockTime string `json:"block_time"` + BlockNumber uint64 `json:"block_number"` + BlockHash string `json:"block_hash"` + ContractAddress string `json:"contract_address"` + Topic0 string `json:"topic0"` + Topic1 string `json:"topic1"` + Topic2 string `json:"topic2"` + Topic3 string `json:"topic3"` + Data string `json:"data"` + TxHash string `json:"tx_hash"` + Index uint `json:"index"` + TxIndex uint `json:"tx_index"` + BlockDate string `json:"block_date"` + TxFrom string `json:"tx_from"` + TxTo string `json:"tx_to"` +} + +func (c *Client) GetLastestExecuteResult(queryID int64, limit, offset uint64) ([]DuneLog, uint64, error) { + req, err := http.NewRequest("GET", fmt.Sprintf("%s/v1/query/%d/results?allow_partial_results=true&limit=%d&offset=%d", c.baseURL, queryID, limit, offset), nil) + if err != nil { + return nil, 0, err + } + req.Header.Set("x-dune-api-key", c.apiKey) + res, err := c.c.Do(req) + if err != nil { + return nil, 0, err + } + defer res.Body.Close() + + if res.StatusCode != http.StatusOK { + return nil, 0, fmt.Errorf("unexpected status code: %d", res.StatusCode) + } + + var data map[string]interface{} + if err := json.NewDecoder(res.Body).Decode(&data); err != nil { + return nil, 0, fmt.Errorf("failed to decode JSON response: %v", err) + } + result, ok := data["result"].(map[string]interface{}) + if !ok { + return nil, 0, fmt.Errorf("error when parse result") + } + metadata, ok := result["metadata"].(map[string]interface{}) + if !ok { + return nil, 0, fmt.Errorf("error when parse metadata") + } + rowCount, ok := metadata["total_row_count"].(float64) + if !ok { + return nil, 0, fmt.Errorf("error when parse row count") + } + rows, ok := result["rows"].([]interface{}) + if !ok { + return nil, 0, fmt.Errorf("error when parse rows") + } + jsonData, err := json.Marshal(rows) + if err != nil { + return nil, 0, err + } + ret := []DuneLog{} + if err = json.Unmarshal(jsonData, &ret); err != nil { + return nil, 0, err + } + + return ret, uint64(rowCount), nil +} diff --git a/pkg/dune/client_test.go b/pkg/dune/client_test.go new file mode 100644 index 0000000..4712320 --- /dev/null +++ b/pkg/dune/client_test.go @@ -0,0 +1,34 @@ +package dune + +import ( + "net/http" + "testing" + + "github.com/stretchr/testify/require" +) + +const key = "" + +func TestDuneExecute(t *testing.T) { + t.Skip() + c := NewClient("https://api.dune.com/api", key, http.DefaultClient) + res, err := c.ExecuteQuery(3551570, "0x34f57786fb01682fb4eec88d340387ef01a168fe345ea5b76f709d4e560c10eb", 18366958, 19467958) + require.NoError(t, err) + t.Log(res.ExecutionID) +} + +func TestDuneState(t *testing.T) { + t.Skip() + c := NewClient("https://api.dune.com/api", key, http.DefaultClient) + res, err := c.ExecuteState("01HSB444B5E7WJZDGYS72GFJGY") + require.NoError(t, err) + t.Log(res) +} + +func TestDuneGetLastestResults(t *testing.T) { + t.Skip() + c := NewClient("https://api.dune.com/api", key, http.DefaultClient) + res, rowcount, err := c.GetLastestExecuteResult(3551570, 10, 1) + require.NoError(t, err) + t.Log(res, rowcount) +} diff --git a/pkg/parser/hashflow/parser.go b/pkg/parser/hashflow/parser.go index ac75278..37b1e12 100644 --- a/pkg/parser/hashflow/parser.go +++ b/pkg/parser/hashflow/parser.go @@ -3,6 +3,7 @@ package hashflow import ( "errors" + "github.com/KyberNetwork/tradelogs/pkg/parser" "github.com/KyberNetwork/tradelogs/pkg/storage" "github.com/ethereum/go-ethereum/accounts/abi" "github.com/ethereum/go-ethereum/common" @@ -72,3 +73,7 @@ func (p *Parser) Parse(log types.Log, blockTime uint64) (storage.TradeLog, error } return res, nil } + +func (p *Parser) Exchange() string { + return parser.ExHashflow +} diff --git a/pkg/parser/hashflow_v3/parser.go b/pkg/parser/hashflow_v3/parser.go index c4aa662..fb3f7a0 100644 --- a/pkg/parser/hashflow_v3/parser.go +++ b/pkg/parser/hashflow_v3/parser.go @@ -3,6 +3,7 @@ package hashflowv3 import ( "errors" + "github.com/KyberNetwork/tradelogs/pkg/parser" "github.com/KyberNetwork/tradelogs/pkg/storage" "github.com/ethereum/go-ethereum/accounts/abi" "github.com/ethereum/go-ethereum/common" @@ -72,3 +73,7 @@ func (p *Parser) Parse(log types.Log, blockTime uint64) (storage.TradeLog, error } return res, nil } + +func (p *Parser) Exchange() string { + return parser.ExHashflowV3 +} diff --git a/pkg/parser/kyberswap/parser.go b/pkg/parser/kyberswap/parser.go index 31342c6..2b6fb0e 100644 --- a/pkg/parser/kyberswap/parser.go +++ b/pkg/parser/kyberswap/parser.go @@ -5,6 +5,7 @@ import ( "github.com/ethereum/go-ethereum/core/types" + "github.com/KyberNetwork/tradelogs/pkg/parser" "github.com/KyberNetwork/tradelogs/pkg/storage" "github.com/ethereum/go-ethereum/accounts/abi" "github.com/ethereum/go-ethereum/common" @@ -71,3 +72,7 @@ func (p *Parser) Parse(log types.Log, blockTime uint64) (storage.TradeLog, error } return res, nil } + +func (p *Parser) Exchange() string { + return parser.ExKs +} diff --git a/pkg/parser/kyberswap_rfq/parser.go b/pkg/parser/kyberswap_rfq/parser.go index f774d75..fef3ded 100644 --- a/pkg/parser/kyberswap_rfq/parser.go +++ b/pkg/parser/kyberswap_rfq/parser.go @@ -4,6 +4,7 @@ import ( "errors" "fmt" + "github.com/KyberNetwork/tradelogs/pkg/parser" "github.com/KyberNetwork/tradelogs/pkg/storage" "github.com/ethereum/go-ethereum/accounts/abi" "github.com/ethereum/go-ethereum/common" @@ -73,3 +74,8 @@ func (p *Parser) Parse(log types.Log, blockTime uint64) (storage.TradeLog, error } return res, nil } + +func (p *Parser) Exchange() string { + return parser.ExKsRFQ +} + diff --git a/pkg/parser/native/parser.go b/pkg/parser/native/parser.go index 9c816b2..1b7a35c 100644 --- a/pkg/parser/native/parser.go +++ b/pkg/parser/native/parser.go @@ -3,6 +3,7 @@ package native import ( "errors" + "github.com/KyberNetwork/tradelogs/pkg/parser" "github.com/KyberNetwork/tradelogs/pkg/storage" "github.com/ethereum/go-ethereum/accounts/abi" "github.com/ethereum/go-ethereum/common" @@ -72,3 +73,7 @@ func (p *Parser) Parse(log types.Log, blockTime uint64) (storage.TradeLog, error } return res, nil } + +func (p *Parser) Exchange() string { + return parser.ExNative +} diff --git a/pkg/parser/oneinch/parser.go b/pkg/parser/oneinch/parser.go index c1aaa6a..a94cb65 100644 --- a/pkg/parser/oneinch/parser.go +++ b/pkg/parser/oneinch/parser.go @@ -9,6 +9,7 @@ import ( "github.com/KyberNetwork/tradelogs/internal/types" "github.com/KyberNetwork/tradelogs/pkg/abitypes" "github.com/KyberNetwork/tradelogs/pkg/decoder" + "github.com/KyberNetwork/tradelogs/pkg/parser" "github.com/KyberNetwork/tradelogs/pkg/storage" "github.com/KyberNetwork/tradelogs/pkg/tracecall" "github.com/ethereum/go-ethereum/accounts/abi" @@ -213,3 +214,7 @@ func (p *Parser) decodeOutput(output string) (string, string, string, error) { orderHash := hexutil.Encode(orderHashParams[:]) return filledMakingAmountFromOutput.String(), filledTakingAmountFromOutput.String(), orderHash, nil } + +func (p *Parser) Exchange() string { + return parser.Ex1Inch +} diff --git a/pkg/parser/paraswap/parser.go b/pkg/parser/paraswap/parser.go index d8ae1b8..ffef791 100644 --- a/pkg/parser/paraswap/parser.go +++ b/pkg/parser/paraswap/parser.go @@ -3,6 +3,7 @@ package paraswap import ( "errors" + "github.com/KyberNetwork/tradelogs/pkg/parser" "github.com/KyberNetwork/tradelogs/pkg/storage" "github.com/ethereum/go-ethereum/accounts/abi" "github.com/ethereum/go-ethereum/common" @@ -72,3 +73,7 @@ func (p *Parser) Parse(log types.Log, blockTime uint64) (storage.TradeLog, error } return res, nil } + +func (p *Parser) Exchange() string { + return parser.ExParaswap +} diff --git a/pkg/parser/parser.go b/pkg/parser/parser.go index 335f782..00d6c88 100644 --- a/pkg/parser/parser.go +++ b/pkg/parser/parser.go @@ -5,7 +5,23 @@ import ( "github.com/ethereum/go-ethereum/core/types" ) +const ( + ExZeroX = "zerox" + ExParaswap = "paraswap" + ExTokenlon = "tokenlon" + ExZeroXRFQ = "zerox_rfq" + ExKs = "kyberswap" + ExHashflow = "hashflow" + ExKsRFQ = "kyberswapRFQ" + Ex1Inch = "1inch" + ExHashflowV3 = "hashflowV3" + ExParaswapTaker = "paraswap_taker" + ExUniswapX = "uniswapx" + ExNative = "native" +) + type Parser interface { Parse(log types.Log, blockTime uint64) (storage.TradeLog, error) Topics() []string + Exchange() string } diff --git a/pkg/parser/tokenlon/parser.go b/pkg/parser/tokenlon/parser.go index d95d695..187c1d9 100644 --- a/pkg/parser/tokenlon/parser.go +++ b/pkg/parser/tokenlon/parser.go @@ -3,6 +3,7 @@ package tokenlon import ( "errors" + "github.com/KyberNetwork/tradelogs/pkg/parser" "github.com/KyberNetwork/tradelogs/pkg/storage" "github.com/ethereum/go-ethereum/accounts/abi" "github.com/ethereum/go-ethereum/common" @@ -72,3 +73,7 @@ func (p *Parser) Parse(log types.Log, blockTime uint64) (storage.TradeLog, error } return res, nil } + +func (p *Parser) Exchange() string { + return parser.ExTokenlon +} diff --git a/pkg/parser/zxotc/parser.go b/pkg/parser/zxotc/parser.go index 7971d8b..08695de 100644 --- a/pkg/parser/zxotc/parser.go +++ b/pkg/parser/zxotc/parser.go @@ -3,6 +3,7 @@ package zxotc import ( "errors" + "github.com/KyberNetwork/tradelogs/pkg/parser" "github.com/KyberNetwork/tradelogs/pkg/storage" "github.com/ethereum/go-ethereum/accounts/abi" "github.com/ethereum/go-ethereum/common" @@ -72,3 +73,7 @@ func (p *Parser) Parse(log types.Log, blockTime uint64) (storage.TradeLog, error } return res, nil } + +func (p *Parser) Exchange() string { + return parser.ExZeroX +} diff --git a/pkg/parser/zxrfq/parser.go b/pkg/parser/zxrfq/parser.go index 9a0f609..48a8601 100644 --- a/pkg/parser/zxrfq/parser.go +++ b/pkg/parser/zxrfq/parser.go @@ -3,6 +3,7 @@ package zxrfq import ( "errors" + "github.com/KyberNetwork/tradelogs/pkg/parser" "github.com/KyberNetwork/tradelogs/pkg/storage" "github.com/ethereum/go-ethereum/accounts/abi" "github.com/ethereum/go-ethereum/common" @@ -72,3 +73,7 @@ func (p *Parser) Parse(log types.Log, blockTime uint64) (storage.TradeLog, error } return res, nil } + +func (p *Parser) Exchange() string { + return parser.ExZeroXRFQ +} diff --git a/pkg/storage/storage.go b/pkg/storage/storage.go index 221178a..40598d3 100644 --- a/pkg/storage/storage.go +++ b/pkg/storage/storage.go @@ -27,7 +27,6 @@ func (s *Storage) Insert(orders []TradeLog) error { if len(orders) == 0 { return nil } - s.l.Debugw("Request insert", "orders", orders) b := squirrel.StatementBuilder.PlaceholderFormat(squirrel.Dollar).Insert(tradeLogsTable).Columns( tradelogsColumns()..., ) diff --git a/pkg/storage/types.go b/pkg/storage/types.go index dc6c1f7..1b26cfb 100644 --- a/pkg/storage/types.go +++ b/pkg/storage/types.go @@ -47,3 +47,11 @@ func (o *TradeLog) Serialize() []interface{} { o.EventHash, } } + +type BackfillQuery struct { + QueryID int64 `json:"query_id" binding:"required"` + FromBlock uint64 `json:"from_block" binding:"required"` + ToBlock uint64 `json:"to_block" binding:"required"` + EventHash string `json:"event_hash"` + Exchange string `json:"exchange"` +}