diff --git a/README.md b/README.md
index 3593fd9..2d720cd 100644
--- a/README.md
+++ b/README.md
@@ -1,6 +1,185 @@
-## This is service to indexing tradelogs from event
+# Tradelogs
 
-### Re-generate mock file
+This is a service for indexing tradelogs from events.
+
+# Tradelogs V2
+
+## Backfill Server API Documentation
+
+This server serves the endpoints to manage backfill tasks: list, create, cancel, and restart tasks.
+### 1. **Backfill Task Creation**
+
+- **URL**: `/backfill`
+- **Method**: `POST`
+- **Description**: Creates a new backfill task.
+- **Request Body**:
+  - `from_block` (uint64, required): The starting block number.
+  - `to_block` (uint64, required): The ending block number.
+  - `exchange` (string, required): The exchange name.
+- **Response**:
+  - **200 OK**: On success.
+    ```json
+    {
+      "success": true,
+      "id": "",
+      "message": ""
+    }
+    ```
+  - **400 Bad Request**: If there is a validation error (e.g., missing fields, invalid exchange).
+    ```json
+    {
+      "success": false,
+      "error": ""
+    }
+    ```
+  - **500 Internal Server Error**: If there is an error during task creation.
+    ```json
+    {
+      "success": false,
+      "error": ""
+    }
+    ```
+
+### 2. **Get All Backfill Tasks**
+
+- **URL**: `/backfill`
+- **Method**: `GET`
+- **Description**: Retrieves all backfill tasks.
+- **Response**:
+  - **200 OK**: On success. The task with id -1 is the service's default backfill flow.
+    ```json
+    {
+      "success": true,
+      "tasks": [
+        {
+          "id": -1,
+          "exchange": "",
+          "from_block": 20926953,
+          "to_block": 20962657,
+          "processed_block": 20962657,
+          "created_at": "0001-01-01T00:00:00Z",
+          "updated_at": "0001-01-01T00:00:00Z",
+          "status": "processing"
+        },
+        {
+          "id": 1,
+          "exchange": "zerox",
+          "from_block": 20962657,
+          "to_block": 20962658,
+          "processed_block": 20962657,
+          "created_at": "2024-10-14T09:07:01.059135Z",
+          "updated_at": "2024-10-14T17:18:32.814065Z",
+          "status": "done"
+        }
+      ]
+    }
+    ```
+  - **500 Internal Server Error**: If there is an error retrieving the tasks.
+    ```json
+    {
+      "success": false,
+      "error": ""
+    }
+    ```
+
+### 3. **Get Backfill Task By ID**
+
+- **URL**: `/backfill/:id`
+- **Method**: `GET`
+- **Description**: Retrieves a specific backfill task by its ID.
+- **URL Parameters**:
+  - `id` (int, required): The task ID.
+- **Response**:
+  - **200 OK**: On success.
+    ```json
+    {
+      "success": true,
+      "task": {
+        "id": 1,
+        "exchange": "zerox",
+        "from_block": 20962657,
+        "to_block": 20962658,
+        "processed_block": 20962657,
+        "created_at": "2024-10-14T09:07:01.059135Z",
+        "updated_at": "2024-10-14T17:18:32.814065Z",
+        "status": "done"
+      }
+    }
+    ```
+  - **400 Bad Request**: If the task ID is invalid.
+    ```json
+    {
+      "success": false,
+      "error": "invalid task id: "
+    }
+    ```
+  - **500 Internal Server Error**: If there is an error retrieving the task.
+    ```json
+    {
+      "success": false,
+      "error": ""
+    }
+    ```
+
+### 4. **Cancel Backfill Task**
+
+- **URL**: `/backfill/cancel/:id`
+- **Method**: `GET`
+- **Description**: Cancels a specific backfill task by its ID.
+- **URL Parameters**:
+  - `id` (int, required): The task ID.
+- **Response**:
+  - **200 OK**: On success.
+    ```json
+    {
+      "success": true
+    }
+    ```
+  - **400 Bad Request**: If the task ID is invalid.
+    ```json
+    {
+      "success": false,
+      "error": "invalid task id: "
+    }
+    ```
+  - **500 Internal Server Error**: If there is an error canceling the task.
+    ```json
+    {
+      "success": false,
+      "error": ""
+    }
+    ```
+
+### 5. **Restart Backfill Task**
+
+- **URL**: `/backfill/restart/:id`
+- **Method**: `GET`
+- **Description**: Restarts a specific backfill task by its ID.
+- **URL Parameters**:
+  - `id` (int, required): The task ID.
+- **Response**:
+  - **200 OK**: On success.
+    ```json
+    {
+      "success": true
+    }
+    ```
+  - **400 Bad Request**: If the task ID is invalid.
+    ```json
+    {
+      "success": false,
+      "error": "invalid task id: "
+    }
+    ```
+  - **500 Internal Server Error**: If there is an error restarting the task.
+    ```json
+    {
+      "success": false,
+      "error": ""
+    }
+    ```
+
+## Re-generate mock file
 
 First, you need to install `mockery`
 
diff --git a/v2/cmd/backfill/main.go b/v2/cmd/backfill/main.go
index 14e19e1..5afe9a6 100644
--- a/v2/cmd/backfill/main.go
+++ b/v2/cmd/backfill/main.go
@@ -6,6 +6,7 @@ import (
 	"os"
 
 	"github.com/KyberNetwork/tradelogs/v2/internal/server"
+	"github.com/KyberNetwork/tradelogs/v2/internal/service"
 	"github.com/KyberNetwork/tradelogs/v2/internal/worker"
 	libapp "github.com/KyberNetwork/tradelogs/v2/pkg/app"
 	"github.com/KyberNetwork/tradelogs/v2/pkg/handler"
@@ -53,6 +54,7 @@ func run(c *cli.Context) error {
 	l.Infow("Starting backfill service")
 
 	db, err := initDB(c)
 	if err != nil {
 		return fmt.Errorf("cannot init DB: %w", err)
 	}
+	l.Infow("init db successfully")
@@ -123,7 +125,12 @@ func run(c *cli.Context) error {
 		}
 	}()
 
-	s := server.NewBackfill(l, c.String(libapp.HTTPBackfillServerFlag.Name), w)
+	srv, err := service.NewBackfillService(backfillStorage, l, w)
+	if err != nil {
+		return fmt.Errorf("cannot create backfill service: %w", err)
+	}
+
+	s := server.NewBackfill(l, c.String(libapp.HTTPBackfillServerFlag.Name), srv)
 
 	return s.Run()
 }
diff --git a/v2/cmd/migrations/00003_add_backfill_task.up.sql b/v2/cmd/migrations/00003_add_backfill_task.up.sql
new file mode 100644
index 0000000..6aa570a
--- /dev/null
+++ b/v2/cmd/migrations/00003_add_backfill_task.up.sql
@@ -0,0 +1,14 @@
+CREATE TYPE backfill_status AS ENUM ('processing', 'failed', 'done', 'canceled');
+
+create table backfill_task
+(
+    id              serial primary key,
+    exchange        text not null,
+    from_block      bigint not null,
+    to_block        bigint not null,
+    processed_block bigint default 0 not null,
+    created_at      timestamptz default now() not null,
+    updated_at      timestamptz default now() not null,
+    status          backfill_status default 'processing'::backfill_status not null
+);
+
diff --git a/v2/cmd/parse_log/main.go b/v2/cmd/parse_log/main.go
index db8c969..56730d5 100644
--- a/v2/cmd/parse_log/main.go
+++ b/v2/cmd/parse_log/main.go
@@ -54,6 +54,7 @@ func run(c *cli.Context) error {
 	l.Infow("Starting log parser service")
 
 	db, err := initDB(c)
 	if err != nil {
 		return fmt.Errorf("cannot init DB: %w", err)
 	}
+	l.Infow("init db successfully")
@@ -151,7 +152,7 @@ func initDB(c *cli.Context) (*sqlx.DB, error) {
 	return db, nil
 }
 
-func getMostRecentBlock(l *zap.SugaredLogger, s state.Storage, rpcClient *rpcnode.Client) (uint64, error) {
+func getMostRecentBlock(l *zap.SugaredLogger, s state.Storage, rpcClient rpcnode.IClient) (uint64, error) {
 	var blockNumber uint64
 	block, err := s.GetState(state.ProcessedBlockKey)
 	if err == nil {
diff --git a/v2/internal/server/backfill.go b/v2/internal/server/backfill.go
index 26aa368..022f4e6 100644
--- a/v2/internal/server/backfill.go
+++ b/v2/internal/server/backfill.go
@@ -3,8 +3,9 @@ package server
 import (
 	"fmt"
 	"net/http"
+	"strconv"
 
-	"github.com/KyberNetwork/tradelogs/v2/internal/worker"
+	"github.com/KyberNetwork/tradelogs/v2/internal/service"
 	"github.com/gin-contrib/pprof"
 	"github.com/gin-gonic/gin"
"github.com/rs/xid" @@ -16,7 +17,7 @@ type BackfillServer struct { l *zap.SugaredLogger r *gin.Engine bindAddr string - worker *worker.BackFiller + service *service.Backfill } type Query struct { @@ -25,7 +26,7 @@ type Query struct { Exchange string `json:"exchange" binding:"required"` } -func NewBackfill(l *zap.SugaredLogger, bindAddr string, w *worker.BackFiller) *BackfillServer { +func NewBackfill(l *zap.SugaredLogger, bindAddr string, s *service.Backfill) *BackfillServer { engine := gin.New() engine.Use(gin.Recovery()) @@ -33,7 +34,7 @@ func NewBackfill(l *zap.SugaredLogger, bindAddr string, w *worker.BackFiller) *B l: l, r: engine, bindAddr: bindAddr, - worker: w, + service: s, } gin.SetMode(gin.ReleaseMode) @@ -53,6 +54,10 @@ func (s *BackfillServer) Run() error { func (s *BackfillServer) register() { pprof.Register(s.r, "/debug") s.r.POST("/backfill", s.backfill) + s.r.GET("/backfill", s.getAllTask) + s.r.GET("/backfill/:id", s.getTask) + s.r.GET("/backfill/cancel/:id", s.cancelTask) + s.r.GET("/backfill/restart/:id", s.restartTask) } func responseErr(c *gin.Context, err error) { @@ -62,9 +67,10 @@ func responseErr(c *gin.Context, err error) { }) } -func responseOK(c *gin.Context) { - c.JSON(http.StatusOK, gin.H{ - "success": true, +func internalServerError(c *gin.Context, err error) { + c.JSON(http.StatusInternalServerError, gin.H{ + "success": false, + "error": err.Error(), }) } @@ -83,12 +89,80 @@ func (s *BackfillServer) backfill(c *gin.Context) { l := s.l.With("reqID", xid.New().String()) l.Infow("receive backfill params", "params", params) - err := s.worker.BackfillByExchange(params.FromBlock, params.ToBlock, params.Exchange) + id, message, err := s.service.NewBackfillTask(params.FromBlock, params.ToBlock, params.Exchange) if err != nil { l.Errorw("error when backfill", "error", err) - responseErr(c, err) + internalServerError(c, err) + return + } + + c.JSON(http.StatusOK, gin.H{ + "success": true, + "id": id, + "message": message, + }) +} + +func (s *BackfillServer) getAllTask(c *gin.Context) { + tasks, err := s.service.ListTask() + if err != nil { + internalServerError(c, err) + return + } + c.JSON(http.StatusOK, gin.H{ + "success": true, + "tasks": tasks, + }) +} + +func (s *BackfillServer) getTask(c *gin.Context) { + id := c.Param("id") + taskID, err := strconv.ParseInt(id, 10, 32) + if err != nil || len(id) == 0 { + responseErr(c, fmt.Errorf("invalid task id: %s", id)) + return + } + task, err := s.service.GetTask(int(taskID)) + if err != nil { + internalServerError(c, err) return } + c.JSON(http.StatusOK, gin.H{ + "success": true, + "task": task, + }) +} + +func (s *BackfillServer) cancelTask(c *gin.Context) { + id := c.Param("id") + taskID, err := strconv.ParseInt(id, 10, 32) + if err != nil { + responseErr(c, fmt.Errorf("invalid task id: %w", err)) + return + } + err = s.service.CancelBackfillTask(int(taskID)) + if err != nil { + internalServerError(c, err) + return + } + c.JSON(http.StatusOK, gin.H{ + "success": true, + }) +} - responseOK(c) +func (s *BackfillServer) restartTask(c *gin.Context) { + id := c.Param("id") + taskID, err := strconv.ParseInt(id, 10, 32) + if err != nil { + responseErr(c, fmt.Errorf("invalid task id: %w", err)) + return + } + err = s.service.RestartBackfillTask(int(taskID)) + if err != nil { + internalServerError(c, err) + return + } + c.JSON(http.StatusOK, gin.H{ + "success": true, + }) } diff --git a/v2/internal/service/backfill.go b/v2/internal/service/backfill.go new file mode 100644 index 0000000..d9a7a6e --- /dev/null +++ 
b/v2/internal/service/backfill.go
@@ -0,0 +1,162 @@
+package service
+
+import (
+	"fmt"
+
+	"github.com/KyberNetwork/tradelogs/v2/internal/worker"
+	"github.com/KyberNetwork/tradelogs/v2/pkg/storage/backfill"
+	"go.uber.org/zap"
+)
+
+type Backfill struct {
+	storage backfill.IStorage
+	l       *zap.SugaredLogger
+	worker  *worker.BackFiller
+}
+
+const MaxBackfillTaskNumber = 10
+
+func NewBackfillService(storage backfill.IStorage, l *zap.SugaredLogger, w *worker.BackFiller) (*Backfill, error) {
+	srv := &Backfill{
+		storage: storage,
+		l:       l,
+		worker:  w,
+	}
+	err := srv.rerunAllTasks()
+	if err != nil {
+		return nil, fmt.Errorf("fail to rerun all tasks: %w", err)
+	}
+	return srv, nil
+}
+
+func (s *Backfill) rerunAllTasks() error {
+	tasks, err := s.storage.GetTask()
+	if err != nil {
+		return err
+	}
+	for _, task := range tasks {
+		if task.Status == backfill.StatusTypeProcessing || task.Status == backfill.StatusTypeFailed {
+			go s.worker.BackfillByExchange(task)
+		}
+	}
+	return nil
+}
+
+func (s *Backfill) NewBackfillTask(from, to uint64, exchange string) (int, string, error) {
+	var message string
+
+	if !s.worker.IsValidExchange(exchange) {
+		return 0, "", fmt.Errorf("invalid exchange %s", exchange)
+	}
+
+	// limit max 10 tasks running at the same time
+	count, err := s.storage.GetRunningTaskNumber()
+	if err != nil {
+		return 0, "", fmt.Errorf("fail to get running task number: %w", err)
+	}
+	if count >= MaxBackfillTaskNumber {
+		return 0, "", fmt.Errorf("number of running task exceed: %d", MaxBackfillTaskNumber)
+	}
+
+	first, last, _, err := s.worker.GetBlockRanges()
+	if err != nil {
+		return 0, "", fmt.Errorf("cannot get ongoing backfill block ranges: %w", err)
+	}
+
+	// the new block range is already covered by the default backfill flow
+	if from >= first && to <= last {
+		return 0, "", fmt.Errorf("new range covered by ongoing backfill process")
+	}
+
+	// backfilling blocks older than the deployed block is not allowed, to simplify the logic
+	if from < first {
+		return 0, "", fmt.Errorf("from block cannot be smaller than the deployed block")
+	}
+
+	// the new block range is partially covered, move its start to avoid duplication
+	if from < last {
+		message = fmt.Sprintf("new range partially covered by ongoing backfill process, changed old from_block %d to %d", from, last)
+		from = last
+	}
+
+	task := backfill.Task{
+		FromBlock: from,
+		ToBlock:   to,
+		Exchange:  exchange,
+	}
+	id, err := s.storage.CreateTask(task)
+	if err != nil {
+		return 0, "", fmt.Errorf("cannot create backfill task: %w", err)
+	}
+
+	task.ID = id
+	go s.worker.BackfillByExchange(task)
+
+	return id, message, nil
+}
+
+func (s *Backfill) CancelBackfillTask(id int) error {
+	task, err := s.storage.GetTaskByID(id)
+	if err != nil {
+		return fmt.Errorf("cannot get backfill task with id %d: %w", id, err)
+	}
+	if task.Status != backfill.StatusTypeProcessing {
+		return fmt.Errorf("task with id %d is not processing, current status: %s", id, task.Status)
+	}
+	err = s.storage.UpdateTask(task.ID, nil, backfill.StatusTypeCanceled)
+	if err != nil {
+		return fmt.Errorf("cannot cancel backfill task with id %d: %w", id, err)
+	}
+	return nil
+}
+
+func (s *Backfill) RestartBackfillTask(id int) error {
+	task, err := s.storage.GetTaskByID(id)
+	if err != nil {
+		return fmt.Errorf("cannot get backfill task with id %d: %w", id, err)
+	}
+
+	// limit max 10 tasks running at the same time
+	count, err := s.storage.GetRunningTaskNumber()
+	if err != nil {
+		return fmt.Errorf("fail to get running task number: %w", err)
+	}
+	if count >= MaxBackfillTaskNumber
{ + return fmt.Errorf("number of running task exceed: %d", MaxBackfillTaskNumber) + } + + if task.Status == backfill.StatusTypeProcessing || task.Status == backfill.StatusTypeDone { + return fmt.Errorf("cannot restart task with id %d, current status: %s", id, task.Status) + } + + go s.worker.BackfillByExchange(task) + return nil +} + +func (s *Backfill) ListTask() ([]backfill.Task, error) { + tasks, err := s.storage.GetTask() + if err != nil { + return nil, err + } + first, last, _, err := s.worker.GetBlockRanges() + if err != nil { + return tasks, nil + } + status := backfill.StatusTypeProcessing + if first >= last { + status = backfill.StatusTypeDone + } + task := backfill.Task{ + ID: -1, + FromBlock: first, + ToBlock: last, + ProcessedBlock: last, + Status: status, + } + tasks = append([]backfill.Task{task}, tasks...) + return tasks, nil +} + +func (s *Backfill) GetTask(id int) (backfill.Task, error) { + return s.storage.GetTaskByID(id) +} diff --git a/v2/internal/service/backfill_test.go b/v2/internal/service/backfill_test.go new file mode 100644 index 0000000..ff38051 --- /dev/null +++ b/v2/internal/service/backfill_test.go @@ -0,0 +1,173 @@ +package service + +import ( + "testing" + + "github.com/KyberNetwork/tradelogs/v2/internal/worker" + "github.com/KyberNetwork/tradelogs/v2/mocks" + "github.com/KyberNetwork/tradelogs/v2/pkg/parser" + "github.com/KyberNetwork/tradelogs/v2/pkg/storage/backfill" + "github.com/ethereum/go-ethereum/core/types" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "go.uber.org/zap" +) + +func TestBackfill_NewBackfill(t *testing.T) { + backfillStorage := &mocks.MockBackfillStorage{} + stateStorage := &mocks.MockState{} + rpcClient := &mocks.MockRPCClient{} + + backfillStorage.On("GetTask").Return([]backfill.Task{}, nil) + + w := worker.NewBackFiller(nil, backfillStorage, stateStorage, zap.S(), rpcClient, nil) + _, err := NewBackfillService(backfillStorage, zap.S(), w) + assert.NoError(t, err) + + backfillStorage.AssertCalled(t, "GetTask") +} + +func TestBackfill_NewBackfillTask(t *testing.T) { + backfillStorage := &mocks.MockBackfillStorage{} + stateStorage := &mocks.MockState{} + rpcClient := &mocks.MockRPCClient{} + mockParser := &mocks.MockParser{} + + backfillStorage.On("GetTask").Return([]backfill.Task{}, nil). + On("CreateTask", mock.Anything).Return(0, nil). + On("Get").Return( + []backfill.State{ + { + Exchange: "test", + DeployBlock: 20962650, + BackFilledBlock: 20962657, + }, + }, nil). + On("GetRunningTaskNumber").Return(1, nil). + On("UpdateTask", mock.Anything, mock.Anything, mock.Anything).Return(nil) + + rpcClient.On("FetchLogs", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]types.Log{}, nil) + + mockParser.On("Exchange").Return("test"). + On("Address").Return(""). 
+ On("Topics").Return([]string{}) + + w := worker.NewBackFiller(nil, backfillStorage, stateStorage, zap.S(), rpcClient, []parser.Parser{mockParser}) + srv, err := NewBackfillService(backfillStorage, zap.S(), w) + assert.NoError(t, err) + + testcases := []struct { + name string + from uint64 + to uint64 + exchange string + expectedFrom uint64 + expectedTo uint64 + hasError bool + }{ + { + name: "fully covered", + from: 20962650, + to: 20962657, + hasError: true, + }, + { + name: "invalid from", + from: 20962645, + to: 20962657, + hasError: true, + }, + { + name: "partially covered", + from: 20962655, + to: 20962659, + exchange: "test", + expectedFrom: 20962657, + expectedTo: 20962659, + hasError: false, + }, + { + name: "normal case", + from: 20962660, + to: 20962665, + exchange: "test", + expectedFrom: 20962660, + expectedTo: 20962665, + hasError: false, + }, + } + + for _, tc := range testcases { + t.Run(tc.name, func(t *testing.T) { + _, _, err = srv.NewBackfillTask(tc.from, tc.to, tc.exchange) + if tc.hasError { + assert.Error(t, err) + } else { + assert.NoError(t, err) + task := backfill.Task{ + Exchange: tc.exchange, + FromBlock: tc.expectedFrom, + ToBlock: tc.expectedTo, + } + backfillStorage.AssertCalled(t, "CreateTask", task) + } + }) + } +} + +func TestBackfill_ListTask(t *testing.T) { + backfillStorage := &mocks.MockBackfillStorage{} + stateStorage := &mocks.MockState{} + rpcClient := &mocks.MockRPCClient{} + + backfillStorage.On("GetTask").Return( + []backfill.Task{ + { + ID: 1, + Exchange: "test", + FromBlock: 20962660, + ToBlock: 20962667, + ProcessedBlock: 20962665, + Status: backfill.StatusTypeDone, + }, + }, nil). + On("CreateTask", mock.Anything).Return(0, nil). + On("Get").Return( + []backfill.State{ + { + Exchange: "test", + DeployBlock: 20962650, + BackFilledBlock: 20962657, + }, + }, nil) + + rpcClient.On("FetchLogs", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]types.Log{}, nil) + + w := worker.NewBackFiller(nil, backfillStorage, stateStorage, zap.S(), rpcClient, nil) + srv, err := NewBackfillService(backfillStorage, zap.S(), w) + assert.NoError(t, err) + + tasks, err := srv.ListTask() + assert.NoError(t, err) + assert.Len(t, tasks, 2) + + expectedTasks := []backfill.Task{ + { + ID: -1, + Exchange: "", + FromBlock: 20962650, + ToBlock: 20962657, + ProcessedBlock: 20962657, + Status: backfill.StatusTypeProcessing, + }, + { + ID: 1, + Exchange: "test", + FromBlock: 20962660, + ToBlock: 20962667, + ProcessedBlock: 20962665, + Status: backfill.StatusTypeDone, + }, + } + assert.Equal(t, expectedTasks, tasks) +} diff --git a/v2/internal/worker/backfiller.go b/v2/internal/worker/backfiller.go index 65c965a..a0576f3 100644 --- a/v2/internal/worker/backfiller.go +++ b/v2/internal/worker/backfiller.go @@ -21,12 +21,12 @@ type BackFiller struct { backfillStorage backfill.IStorage stateStorage state.Storage l *zap.SugaredLogger - rpc *rpcnode.Client + rpc rpcnode.IClient parsers []parser.Parser } func NewBackFiller(handler *handler.TradeLogHandler, backfillStorage backfill.IStorage, stateStorage state.Storage, - l *zap.SugaredLogger, rpc *rpcnode.Client, parsers []parser.Parser) *BackFiller { + l *zap.SugaredLogger, rpc rpcnode.IClient, parsers []parser.Parser) *BackFiller { return &BackFiller{ handler: handler, backfillStorage: backfillStorage, @@ -39,7 +39,7 @@ func NewBackFiller(handler *handler.TradeLogHandler, backfillStorage backfill.IS func (w *BackFiller) Run() error { for { - first, last, err := w.getBlockRanges() + first, last, 
exclusions, err := w.GetBlockRanges() if err != nil { return fmt.Errorf("cannot get block ranges: %w", err) } @@ -47,7 +47,7 @@ func (w *BackFiller) Run() error { return nil } w.l.Infow("backfill blocks", "first", first, "last", last) - err = w.processBlock(last - 1) + err = w.processBlock(last-1, exclusions) if err != nil { w.l.Errorw("error when backfill block", "block", last-1, "err", err) return fmt.Errorf("cannot process block: %w", err) @@ -61,37 +61,56 @@ func (w *BackFiller) Run() error { } } -// getBlockRanges return separated and non-overlapping ranges that cover all backfill ranges -func (w *BackFiller) getBlockRanges() (uint64, uint64, error) { +func (w *BackFiller) IsValidExchange(exchange string) bool { + for _, p := range w.parsers { + if p.Exchange() == exchange { + return true + } + } + return false +} + +// GetBlockRanges return separated and non-overlapping ranges that cover all backfill ranges +func (w *BackFiller) GetBlockRanges() (uint64, uint64, sets.Set[string], error) { states, err := w.backfillStorage.Get() if err != nil { - return 0, 0, err + return 0, 0, nil, err } if len(states) == 0 { - return 0, 0, nil + return 0, 0, sets.New[string](), nil } + // the result sorted by deploy block ascending, so when all exchanges are completely backfilled, + // the first will be the oldest deploy block among exchanges first, last := states[0].DeployBlock, states[0].BackFilledBlock + exclusions := sets.New[string]() // get the oldest deploy block - for _, state := range states { - first = min(first, state.DeployBlock) + for _, s := range states { + if s.DeployBlock >= s.BackFilledBlock && s.BackFilledBlock != 0 { + exclusions.Insert(s.Exchange) + continue + } + first = min(first, s.DeployBlock) } // get the newest filled block - for _, state := range states { + for _, s := range states { + if exclusions.Has(s.Exchange) { + continue + } // fill new exchange - if state.BackFilledBlock <= 0 { + if s.BackFilledBlock <= 0 { blockNumber, err := w.getRecentBlock() if err != nil { - return 0, 0, err + return 0, 0, nil, err } last = blockNumber break } - last = max(last, state.BackFilledBlock) + last = max(last, s.BackFilledBlock) } - return first, last, nil + return first, last, exclusions, nil } // getRecentBlock get the newest processed block to backfill for new deployed exchange @@ -101,7 +120,7 @@ func (w *BackFiller) getRecentBlock() (uint64, error) { if err == nil { blockNumber, err = strconv.ParseUint(block, 10, 64) if err == nil { - return blockNumber, nil + return blockNumber + 1, nil } } w.l.Errorw("cannot get from db", "err", err) @@ -112,13 +131,13 @@ func (w *BackFiller) getRecentBlock() (uint64, error) { return blockNumber, nil } -func (w *BackFiller) processBlock(blockNumber uint64) error { +func (w *BackFiller) processBlock(blockNumber uint64, exclusions sets.Set[string]) error { block, err := w.rpc.BlockByNumber(context.Background(), blockNumber) if err != nil { return fmt.Errorf("cannot get block %d: %w", blockNumber, err) } - err = w.handler.ProcessBlock(block.Hash().String(), blockNumber, block.Time()) + err = w.handler.ProcessBlockWithExclusion(block.Hash().String(), blockNumber, block.Time(), exclusions) if err != nil { return fmt.Errorf("cannot process block %d: %w", blockNumber, err) } @@ -127,45 +146,92 @@ func (w *BackFiller) processBlock(blockNumber uint64) error { return nil } -func (w *BackFiller) BackfillByExchange(from, to uint64, exchange string) error { - blocks, err := w.getBlockByExchange(from, to, exchange) +func (w *BackFiller) 
BackfillByExchange(task backfill.Task) { + from, to := task.FromBlock, task.ToBlock + if task.ProcessedBlock > 0 { + to = task.ProcessedBlock - 1 + } + if from > to { + return + } + + // update the status before starting to backfill + err := w.backfillStorage.UpdateTask(task.ID, nil, backfill.StatusTypeProcessing) if err != nil { - return fmt.Errorf("cannot get block %d: %w", from, err) + w.l.Errorw("cannot update task status", "task", task.ID, "err", err) + return + } + + blocks, exclusions, err := w.getBlockByExchange(from, to, task.Exchange) + if err != nil { + w.l.Errorw("cannot get block", "task", task, "err", err) + err = w.backfillStorage.UpdateTask(task.ID, nil, backfill.StatusTypeFailed) + if err != nil { + w.l.Errorw("cannot update task status", "task", task, "err", err) + } + return } - w.l.Infow("start to backfill blocks", "blocks", blocks) + w.l.Infow("start to backfill blocks", "task_id", task.ID, "blocks", blocks) // backfill from the newest blocks, if error occurs we can continue backfill from error block for _, b := range blocks { - err = w.processBlock(b) + // check the task status, stop if canceled + task, err = w.backfillStorage.GetTaskByID(task.ID) + if err != nil { + w.l.Errorw("cannot get backfill task", "task_id", task.ID, "err", err) + return + } + if task.Status == backfill.StatusTypeCanceled { + w.l.Infow("cannot backfill because task is canceled", "task_id", task.ID) + return + } + + err = w.processBlock(b, exclusions) + if err != nil { + w.l.Errorw("cannot backfill block", "task_id", task.ID, "block", b, "err", err) + err = w.backfillStorage.UpdateTask(task.ID, nil, backfill.StatusTypeFailed) + if err != nil { + w.l.Errorw("cannot update task status", "task_id", task.ID, "err", err) + } + return + } + + // update the processed block + err = w.backfillStorage.UpdateTask(task.ID, &b, "") if err != nil { - w.l.Errorw("cannot backfill block", "block", b, "err", err) - return fmt.Errorf("error when backfill to block %d: %w", b, err) + w.l.Errorw("cannot update task in db", "id", task.ID, "err", err) + return } } - return nil + err = w.backfillStorage.UpdateTask(task.ID, &task.FromBlock, backfill.StatusTypeDone) + if err != nil { + w.l.Errorw("cannot update task status", "task_id", task.ID, "err", err) + } } // getBlockByExchange get the blocks having logs of specific exchange, the block number sorted descending -func (w *BackFiller) getBlockByExchange(from, to uint64, exchange string) ([]uint64, error) { +func (w *BackFiller) getBlockByExchange(from, to uint64, exchange string) ([]uint64, sets.Set[string], error) { var ( - address string - topics []string + address string + topics []string + exclusions = sets.New[string]() ) // get exchange address and topics to filter logs for _, p := range w.parsers { if strings.EqualFold(p.Exchange(), exchange) { address = p.Address() topics = p.Topics() - break + continue } + exclusions.Insert(p.Exchange()) } // get logs logs, err := w.rpc.FetchLogs(context.Background(), from, to, address, topics) if err != nil { - return nil, err + return nil, nil, err } // get blocks need to backfill @@ -180,5 +246,5 @@ func (w *BackFiller) getBlockByExchange(from, to uint64, exchange string) ([]uin return blocks[i] > blocks[j] }) - return blocks, nil + return blocks, exclusions, nil } diff --git a/v2/mocks/BackfillStorage.go b/v2/mocks/BackfillStorage.go new file mode 100644 index 0000000..f9ac054 --- /dev/null +++ b/v2/mocks/BackfillStorage.go @@ -0,0 +1,225 @@ +// Code generated by mockery v2.46.2. DO NOT EDIT. 
+ +package mocks + +import ( + backfill "github.com/KyberNetwork/tradelogs/v2/pkg/storage/backfill" + mock "github.com/stretchr/testify/mock" +) + +// MockBackfillStorage is an autogenerated mock type for the IStorage type +type MockBackfillStorage struct { + mock.Mock +} + +// CreateTask provides a mock function with given fields: task +func (_m *MockBackfillStorage) CreateTask(task backfill.Task) (int, error) { + ret := _m.Called(task) + + if len(ret) == 0 { + panic("no return value specified for CreateTask") + } + + var r0 int + var r1 error + if rf, ok := ret.Get(0).(func(backfill.Task) (int, error)); ok { + return rf(task) + } + if rf, ok := ret.Get(0).(func(backfill.Task) int); ok { + r0 = rf(task) + } else { + r0 = ret.Get(0).(int) + } + + if rf, ok := ret.Get(1).(func(backfill.Task) error); ok { + r1 = rf(task) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// DeleteTask provides a mock function with given fields: taskID +func (_m *MockBackfillStorage) DeleteTask(taskID int) error { + ret := _m.Called(taskID) + + if len(ret) == 0 { + panic("no return value specified for DeleteTask") + } + + var r0 error + if rf, ok := ret.Get(0).(func(int) error); ok { + r0 = rf(taskID) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Get provides a mock function with given fields: +func (_m *MockBackfillStorage) Get() ([]backfill.State, error) { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for Get") + } + + var r0 []backfill.State + var r1 error + if rf, ok := ret.Get(0).(func() ([]backfill.State, error)); ok { + return rf() + } + if rf, ok := ret.Get(0).(func() []backfill.State); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]backfill.State) + } + } + + if rf, ok := ret.Get(1).(func() error); ok { + r1 = rf() + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// GetRunningTaskNumber provides a mock function with given fields: +func (_m *MockBackfillStorage) GetRunningTaskNumber() (int, error) { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for GetRunningTaskNumber") + } + + var r0 int + var r1 error + if rf, ok := ret.Get(0).(func() (int, error)); ok { + return rf() + } + if rf, ok := ret.Get(0).(func() int); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(int) + } + + if rf, ok := ret.Get(1).(func() error); ok { + r1 = rf() + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// GetTask provides a mock function with given fields: +func (_m *MockBackfillStorage) GetTask() ([]backfill.Task, error) { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for GetTask") + } + + var r0 []backfill.Task + var r1 error + if rf, ok := ret.Get(0).(func() ([]backfill.Task, error)); ok { + return rf() + } + if rf, ok := ret.Get(0).(func() []backfill.Task); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]backfill.Task) + } + } + + if rf, ok := ret.Get(1).(func() error); ok { + r1 = rf() + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// GetTaskByID provides a mock function with given fields: taskID +func (_m *MockBackfillStorage) GetTaskByID(taskID int) (backfill.Task, error) { + ret := _m.Called(taskID) + + if len(ret) == 0 { + panic("no return value specified for GetTaskByID") + } + + var r0 backfill.Task + var r1 error + if rf, ok := ret.Get(0).(func(int) (backfill.Task, error)); ok { + return rf(taskID) + } + if rf, ok := ret.Get(0).(func(int) backfill.Task); ok { + r0 = rf(taskID) + } else { + r0 = 
ret.Get(0).(backfill.Task) + } + + if rf, ok := ret.Get(1).(func(int) error); ok { + r1 = rf(taskID) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// Update provides a mock function with given fields: backfilled +func (_m *MockBackfillStorage) Update(backfilled uint64) error { + ret := _m.Called(backfilled) + + if len(ret) == 0 { + panic("no return value specified for Update") + } + + var r0 error + if rf, ok := ret.Get(0).(func(uint64) error); ok { + r0 = rf(backfilled) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// UpdateTask provides a mock function with given fields: id, block, status +func (_m *MockBackfillStorage) UpdateTask(id int, block *uint64, status string) error { + ret := _m.Called(id, block, status) + + if len(ret) == 0 { + panic("no return value specified for UpdateTask") + } + + var r0 error + if rf, ok := ret.Get(0).(func(int, *uint64, string) error); ok { + r0 = rf(id, block, status) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// NewMockBackfillStorage creates a new instance of MockBackfillStorage. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewMockBackfillStorage(t interface { + mock.TestingT + Cleanup(func()) +}) *MockBackfillStorage { + mock := &MockBackfillStorage{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/v2/mocks/Parser.go b/v2/mocks/Parser.go new file mode 100644 index 0000000..3ddccea --- /dev/null +++ b/v2/mocks/Parser.go @@ -0,0 +1,184 @@ +// Code generated by mockery v2.46.2. DO NOT EDIT. + +package mocks + +import ( + mock "github.com/stretchr/testify/mock" + + pkgtypes "github.com/KyberNetwork/tradelogs/v2/pkg/types" + + tradelogstypes "github.com/KyberNetwork/tradelogs/v2/pkg/storage/tradelogs/types" + + types "github.com/ethereum/go-ethereum/core/types" +) + +// MockParser is an autogenerated mock type for the Parser type +type MockParser struct { + mock.Mock +} + +// Address provides a mock function with given fields: +func (_m *MockParser) Address() string { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for Address") + } + + var r0 string + if rf, ok := ret.Get(0).(func() string); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(string) + } + + return r0 +} + +// Exchange provides a mock function with given fields: +func (_m *MockParser) Exchange() string { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for Exchange") + } + + var r0 string + if rf, ok := ret.Get(0).(func() string); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(string) + } + + return r0 +} + +// LogFromExchange provides a mock function with given fields: log +func (_m *MockParser) LogFromExchange(log types.Log) bool { + ret := _m.Called(log) + + if len(ret) == 0 { + panic("no return value specified for LogFromExchange") + } + + var r0 bool + if rf, ok := ret.Get(0).(func(types.Log) bool); ok { + r0 = rf(log) + } else { + r0 = ret.Get(0).(bool) + } + + return r0 +} + +// Parse provides a mock function with given fields: log, blockTime +func (_m *MockParser) Parse(log types.Log, blockTime uint64) ([]tradelogstypes.TradeLog, error) { + ret := _m.Called(log, blockTime) + + if len(ret) == 0 { + panic("no return value specified for Parse") + } + + var r0 []tradelogstypes.TradeLog + var r1 error + if rf, ok := ret.Get(0).(func(types.Log, uint64) ([]tradelogstypes.TradeLog, error)); ok { + return rf(log, blockTime) 
+ } + if rf, ok := ret.Get(0).(func(types.Log, uint64) []tradelogstypes.TradeLog); ok { + r0 = rf(log, blockTime) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]tradelogstypes.TradeLog) + } + } + + if rf, ok := ret.Get(1).(func(types.Log, uint64) error); ok { + r1 = rf(log, blockTime) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// ParseWithCallFrame provides a mock function with given fields: callFrame, log, blockTime +func (_m *MockParser) ParseWithCallFrame(callFrame pkgtypes.CallFrame, log types.Log, blockTime uint64) ([]tradelogstypes.TradeLog, error) { + ret := _m.Called(callFrame, log, blockTime) + + if len(ret) == 0 { + panic("no return value specified for ParseWithCallFrame") + } + + var r0 []tradelogstypes.TradeLog + var r1 error + if rf, ok := ret.Get(0).(func(pkgtypes.CallFrame, types.Log, uint64) ([]tradelogstypes.TradeLog, error)); ok { + return rf(callFrame, log, blockTime) + } + if rf, ok := ret.Get(0).(func(pkgtypes.CallFrame, types.Log, uint64) []tradelogstypes.TradeLog); ok { + r0 = rf(callFrame, log, blockTime) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]tradelogstypes.TradeLog) + } + } + + if rf, ok := ret.Get(1).(func(pkgtypes.CallFrame, types.Log, uint64) error); ok { + r1 = rf(callFrame, log, blockTime) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// Topics provides a mock function with given fields: +func (_m *MockParser) Topics() []string { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for Topics") + } + + var r0 []string + if rf, ok := ret.Get(0).(func() []string); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]string) + } + } + + return r0 +} + +// UseTraceCall provides a mock function with given fields: +func (_m *MockParser) UseTraceCall() bool { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for UseTraceCall") + } + + var r0 bool + if rf, ok := ret.Get(0).(func() bool); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(bool) + } + + return r0 +} + +// NewMockParser creates a new instance of MockParser. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewMockParser(t interface { + mock.TestingT + Cleanup(func()) +}) *MockParser { + mock := &MockParser{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/v2/mocks/RPCClient.go b/v2/mocks/RPCClient.go new file mode 100644 index 0000000..e541e1d --- /dev/null +++ b/v2/mocks/RPCClient.go @@ -0,0 +1,179 @@ +// Code generated by mockery v2.46.2. DO NOT EDIT. 
+ +package mocks + +import ( + context "context" + + pkgtypes "github.com/KyberNetwork/tradelogs/v2/pkg/types" + mock "github.com/stretchr/testify/mock" + + types "github.com/ethereum/go-ethereum/core/types" +) + +// MockRPCClient is an autogenerated mock type for the IClient type +type MockRPCClient struct { + mock.Mock +} + +// BlockByNumber provides a mock function with given fields: ctx, blockNumber +func (_m *MockRPCClient) BlockByNumber(ctx context.Context, blockNumber uint64) (*types.Block, error) { + ret := _m.Called(ctx, blockNumber) + + if len(ret) == 0 { + panic("no return value specified for BlockByNumber") + } + + var r0 *types.Block + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, uint64) (*types.Block, error)); ok { + return rf(ctx, blockNumber) + } + if rf, ok := ret.Get(0).(func(context.Context, uint64) *types.Block); ok { + r0 = rf(ctx, blockNumber) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*types.Block) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, uint64) error); ok { + r1 = rf(ctx, blockNumber) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// FetchLogs provides a mock function with given fields: ctx, from, to, address, topics +func (_m *MockRPCClient) FetchLogs(ctx context.Context, from uint64, to uint64, address string, topics []string) ([]types.Log, error) { + ret := _m.Called(ctx, from, to, address, topics) + + if len(ret) == 0 { + panic("no return value specified for FetchLogs") + } + + var r0 []types.Log + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, uint64, uint64, string, []string) ([]types.Log, error)); ok { + return rf(ctx, from, to, address, topics) + } + if rf, ok := ret.Get(0).(func(context.Context, uint64, uint64, string, []string) []types.Log); ok { + r0 = rf(ctx, from, to, address, topics) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]types.Log) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, uint64, uint64, string, []string) error); ok { + r1 = rf(ctx, from, to, address, topics) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// FetchLogsByBlockHash provides a mock function with given fields: ctx, blockHash +func (_m *MockRPCClient) FetchLogsByBlockHash(ctx context.Context, blockHash string) ([]types.Log, error) { + ret := _m.Called(ctx, blockHash) + + if len(ret) == 0 { + panic("no return value specified for FetchLogsByBlockHash") + } + + var r0 []types.Log + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, string) ([]types.Log, error)); ok { + return rf(ctx, blockHash) + } + if rf, ok := ret.Get(0).(func(context.Context, string) []types.Log); ok { + r0 = rf(ctx, blockHash) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]types.Log) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, string) error); ok { + r1 = rf(ctx, blockHash) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// FetchTraceCalls provides a mock function with given fields: ctx, blockHash +func (_m *MockRPCClient) FetchTraceCalls(ctx context.Context, blockHash string) ([]pkgtypes.TransactionCallFrame, error) { + ret := _m.Called(ctx, blockHash) + + if len(ret) == 0 { + panic("no return value specified for FetchTraceCalls") + } + + var r0 []pkgtypes.TransactionCallFrame + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, string) ([]pkgtypes.TransactionCallFrame, error)); ok { + return rf(ctx, blockHash) + } + if rf, ok := ret.Get(0).(func(context.Context, string) []pkgtypes.TransactionCallFrame); ok { + r0 = rf(ctx, blockHash) + } else 
{ + if ret.Get(0) != nil { + r0 = ret.Get(0).([]pkgtypes.TransactionCallFrame) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, string) error); ok { + r1 = rf(ctx, blockHash) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// GetBlockNumber provides a mock function with given fields: ctx +func (_m *MockRPCClient) GetBlockNumber(ctx context.Context) (uint64, error) { + ret := _m.Called(ctx) + + if len(ret) == 0 { + panic("no return value specified for GetBlockNumber") + } + + var r0 uint64 + var r1 error + if rf, ok := ret.Get(0).(func(context.Context) (uint64, error)); ok { + return rf(ctx) + } + if rf, ok := ret.Get(0).(func(context.Context) uint64); ok { + r0 = rf(ctx) + } else { + r0 = ret.Get(0).(uint64) + } + + if rf, ok := ret.Get(1).(func(context.Context) error); ok { + r1 = rf(ctx) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// NewMockRPCClient creates a new instance of MockRPCClient. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewMockRPCClient(t interface { + mock.TestingT + Cleanup(func()) +}) *MockRPCClient { + mock := &MockRPCClient{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/v2/pkg/handler/trade_logs.go b/v2/pkg/handler/trade_logs.go index 0b687ce..6c958c3 100644 --- a/v2/pkg/handler/trade_logs.go +++ b/v2/pkg/handler/trade_logs.go @@ -15,11 +15,12 @@ import ( "github.com/ethereum/go-ethereum/common/hexutil" ethereumTypes "github.com/ethereum/go-ethereum/core/types" "go.uber.org/zap" + "k8s.io/apimachinery/pkg/util/sets" ) type TradeLogHandler struct { l *zap.SugaredLogger - rpcClient *rpcnode.Client + rpcClient rpcnode.IClient storage *tradelogs.Manager parsers []parser.Parser kafkaTopic string @@ -34,7 +35,7 @@ type logMetadata struct { timestamp uint64 } -func NewTradeLogHandler(l *zap.SugaredLogger, rpc *rpcnode.Client, storage *tradelogs.Manager, parsers []parser.Parser, +func NewTradeLogHandler(l *zap.SugaredLogger, rpc rpcnode.IClient, storage *tradelogs.Manager, parsers []parser.Parser, kafkaTopic string, publisher kafka.Publisher) *TradeLogHandler { return &TradeLogHandler{ l: l, @@ -47,8 +48,12 @@ func NewTradeLogHandler(l *zap.SugaredLogger, rpc *rpcnode.Client, storage *trad } func (h *TradeLogHandler) ProcessBlock(blockHash string, blockNumber uint64, timestamp uint64) error { + return h.ProcessBlockWithExclusion(blockHash, blockNumber, timestamp, sets.New[string]()) +} + +func (h *TradeLogHandler) ProcessBlockWithExclusion(blockHash string, blockNumber uint64, timestamp uint64, exclusions sets.Set[string]) error { // remove old trade log in db of processing block - err := h.storage.Delete([]uint64{blockNumber}) + err := h.storage.DeleteWithExclusions([]uint64{blockNumber}, exclusions) if err != nil { return fmt.Errorf("delete blocks error: %w", err) } @@ -70,7 +75,7 @@ func (h *TradeLogHandler) ProcessBlock(blockHash string, blockNumber uint64, tim timestamp: timestamp, } - tradeLogs := h.processCallFrame(call.CallFrame, metadata) + tradeLogs := h.processCallFrame(call.CallFrame, metadata, exclusions) if len(tradeLogs) == 0 { continue } @@ -113,12 +118,12 @@ func (h *TradeLogHandler) ProcessBlock(blockHash string, blockNumber uint64, tim return nil } -func (h *TradeLogHandler) processCallFrame(call types.CallFrame, metadata logMetadata) []storageTypes.TradeLog { +func (h *TradeLogHandler) processCallFrame(call types.CallFrame, metadata 
logMetadata, exclusions sets.Set[string]) []storageTypes.TradeLog { result := make([]storageTypes.TradeLog, 0) // process the sub trace calls for _, traceCall := range call.Calls { - tradeLogs := h.processCallFrame(traceCall, metadata) + tradeLogs := h.processCallFrame(traceCall, metadata, exclusions) result = append(result, tradeLogs...) } @@ -137,7 +142,7 @@ func (h *TradeLogHandler) processCallFrame(call types.CallFrame, metadata logMet // find the corresponding parser p := h.findMatchingParser(ethLog) - if p == nil { + if p == nil || exclusions.Has(p.Exchange()) { continue } diff --git a/v2/pkg/rpcnode/client.go b/v2/pkg/rpcnode/client.go index c83ba84..cbf17e8 100644 --- a/v2/pkg/rpcnode/client.go +++ b/v2/pkg/rpcnode/client.go @@ -13,6 +13,14 @@ import ( "go.uber.org/zap" ) +type IClient interface { + FetchTraceCalls(ctx context.Context, blockHash string) ([]types.TransactionCallFrame, error) + FetchLogsByBlockHash(ctx context.Context, blockHash string) ([]ethereumTypes.Log, error) + GetBlockNumber(ctx context.Context) (uint64, error) + BlockByNumber(ctx context.Context, blockNumber uint64) (*ethereumTypes.Block, error) + FetchLogs(ctx context.Context, from, to uint64, address string, topics []string) ([]ethereumTypes.Log, error) +} + type Client struct { l *zap.SugaredLogger ethClient []*ethclient.Client diff --git a/v2/pkg/storage/backfill/storage.go b/v2/pkg/storage/backfill/storage.go index b4effa8..997aa0f 100644 --- a/v2/pkg/storage/backfill/storage.go +++ b/v2/pkg/storage/backfill/storage.go @@ -2,6 +2,9 @@ package backfill import ( "fmt" + "time" + + "github.com/Masterminds/squirrel" "github.com/jmoiron/sqlx" "go.uber.org/zap" ) @@ -9,6 +12,13 @@ import ( type IStorage interface { Get() ([]State, error) Update(backfilled uint64) error + + CreateTask(task Task) (int, error) + UpdateTask(id int, block *uint64, status string) error + GetTaskByID(taskID int) (Task, error) + GetTask() ([]Task, error) + DeleteTask(taskID int) error + GetRunningTaskNumber() (int, error) } type Storage struct { @@ -16,11 +26,30 @@ type Storage struct { l *zap.SugaredLogger } -type State struct { - Exchange string `db:"exchange" json:"exchange"` - DeployBlock uint64 `db:"deploy_block" json:"deploy_block"` - BackFilledBlock uint64 `db:"backfilled_block" json:"backfilled_block"` -} +type ( + State struct { + Exchange string `db:"exchange" json:"exchange"` + DeployBlock uint64 `db:"deploy_block" json:"deploy_block"` + BackFilledBlock uint64 `db:"backfilled_block" json:"backfilled_block"` + } + Task struct { + ID int `db:"id" json:"id"` + Exchange string `db:"exchange" json:"exchange"` + FromBlock uint64 `db:"from_block" json:"from_block"` + ToBlock uint64 `db:"to_block" json:"to_block"` + ProcessedBlock uint64 `db:"processed_block" json:"processed_block"` + CreatedAt time.Time `db:"created_at" json:"created_at,omitempty"` + UpdatedAt time.Time `db:"updated_at" json:"updated_at,omitempty"` + Status string `db:"status" json:"status"` + } +) + +const ( + StatusTypeProcessing = "processing" + StatusTypeFailed = "failed" + StatusTypeDone = "done" + StatusTypeCanceled = "canceled" +) func New(l *zap.SugaredLogger, db *sqlx.DB) *Storage { return &Storage{ @@ -31,8 +60,7 @@ func New(l *zap.SugaredLogger, db *sqlx.DB) *Storage { func (s *Storage) Get() ([]State, error) { var infos []State - // we only need exchanges that not fully filled - err := s.db.Select(&infos, "SELECT * FROM backfill WHERE backfilled_block > deploy_block OR backfilled_block = 0") + err := s.db.Select(&infos, "SELECT * FROM backfill ORDER BY 
deploy_block ASC") if err != nil { s.l.Errorw("failed to fetch all backfill", "err", err) return nil, fmt.Errorf("failed to fetch all backfill state: %w", err) @@ -55,3 +83,92 @@ func (s *Storage) Update(backfilled uint64) error { s.l.Infow("backfill updated", "rowsAffected", rowsAffected) return nil } + +func (s *Storage) CreateTask(task Task) (int, error) { + b := squirrel.StatementBuilder.PlaceholderFormat(squirrel.Dollar). + Insert("backfill_task"). + Columns([]string{"exchange", "from_block", "to_block"}...). + Values(task.Exchange, task.FromBlock, task.ToBlock). + Suffix("RETURNING id") + q, p, err := b.ToSql() + if err != nil { + s.l.Errorw("fail to build insert task statement", "error", err) + return 0, fmt.Errorf("fail to build insert task statement: %w", err) + } + var id int + if err = s.db.QueryRow(q, p...).Scan(&id); err != nil { + s.l.Errorw("fail to exec insert backfill task", "sql", q, "arg", p, "error", err) + return 0, fmt.Errorf("fail to exec insert backfill task: %w", err) + } + return id, nil +} + +func (s *Storage) UpdateTask(id int, block *uint64, status string) error { + b := squirrel.StatementBuilder.PlaceholderFormat(squirrel.Dollar). + Update("backfill_task"). + Set("updated_at", time.Now()) + + if block != nil { + b = b.Set("processed_block", *block) + } + if len(status) != 0 { + b = b.Set("status", status) + } + b = b.Where(squirrel.Eq{"id": id}) + + q, p, err := b.ToSql() + if err != nil { + s.l.Errorw("fail to build update task statement", "error", err) + return fmt.Errorf("failed to build update task statement: %w", err) + } + + _, err = s.db.Exec(q, p...) + if err != nil { + s.l.Errorw("failed to update backfill task", "err", err, "id", id) + return fmt.Errorf("failed to update backfill task: %w", err) + } + return nil +} + +func (s *Storage) GetTaskByID(taskID int) (Task, error) { + var tasks []Task + err := s.db.Select(&tasks, "SELECT * FROM backfill_task WHERE id=$1", taskID) + if err != nil { + s.l.Errorw("failed to fetch backfill task", "err", err, "id", taskID) + return Task{}, fmt.Errorf("failed to fetch all backfill task: %w", err) + } + if len(tasks) != 1 { + s.l.Errorw("invalid result length", "id", taskID, "length", len(tasks)) + return Task{}, fmt.Errorf("cannot query task with id %d", taskID) + } + return tasks[0], nil +} + +func (s *Storage) GetTask() ([]Task, error) { + var task []Task + err := s.db.Select(&task, "SELECT * FROM backfill_task") + if err != nil { + s.l.Errorw("failed to fetch all backfill", "err", err) + return nil, fmt.Errorf("failed to fetch all backfill task: %w", err) + } + return task, nil +} + +func (s *Storage) DeleteTask(taskID int) error { + _, err := s.db.Exec("DELETE FROM backfill_task WHERE id=$1", taskID) + if err != nil { + s.l.Errorw("failed to delete backfill task", "err", err, "id", taskID) + return fmt.Errorf("failed to delete backfill task: %w", err) + } + return nil +} + +func (s *Storage) GetRunningTaskNumber() (int, error) { + var count int + err := s.db.Get(&count, "SELECT count(*) FROM backfill_task WHERE status = $1", StatusTypeProcessing) + if err != nil { + s.l.Errorw("failed to count backfill", "err", err) + return 0, fmt.Errorf("failed to count backfill task: %w", err) + } + return count, nil +} diff --git a/v2/pkg/storage/tradelogs/manager.go b/v2/pkg/storage/tradelogs/manager.go index d24ce18..598bc66 100644 --- a/v2/pkg/storage/tradelogs/manager.go +++ b/v2/pkg/storage/tradelogs/manager.go @@ -5,6 +5,7 @@ import ( "github.com/KyberNetwork/tradelogs/v2/pkg/storage/tradelogs/types" 
"go.uber.org/zap" + "k8s.io/apimachinery/pkg/util/sets" ) type Manager struct { @@ -64,3 +65,20 @@ func (m *Manager) Delete(blocks []uint64) error { m.l.Infow("deleted trade logs", "blocks", blocks) return nil } + +func (m *Manager) DeleteWithExclusions(blocks []uint64, exclusions sets.Set[string]) error { + if len(blocks) == 0 { + return nil + } + for _, storage := range m.storages { + if exclusions.Has(storage.Exchange()) { + continue + } + err := storage.Delete(blocks) + if err != nil { + return fmt.Errorf("delete trade logs failed: %w", err) + } + } + m.l.Infow("deleted trade logs", "blocks", blocks) + return nil +}