diff --git a/v2/cmd/backfill/main.go b/v2/cmd/backfill/main.go index 7edf911..5afe9a6 100644 --- a/v2/cmd/backfill/main.go +++ b/v2/cmd/backfill/main.go @@ -130,7 +130,7 @@ func run(c *cli.Context) error { return fmt.Errorf("cannot create backfill service: %w", err) } - s := server.NewBackfill(l, c.String(libapp.HTTPBackfillServerFlag.Name), srv, parsers) + s := server.NewBackfill(l, c.String(libapp.HTTPBackfillServerFlag.Name), srv) return s.Run() } diff --git a/v2/internal/server/backfill.go b/v2/internal/server/backfill.go index a5febd0..022f4e6 100644 --- a/v2/internal/server/backfill.go +++ b/v2/internal/server/backfill.go @@ -6,7 +6,6 @@ import ( "strconv" "github.com/KyberNetwork/tradelogs/v2/internal/service" - "github.com/KyberNetwork/tradelogs/v2/pkg/parser" "github.com/gin-contrib/pprof" "github.com/gin-gonic/gin" "github.com/rs/xid" @@ -19,7 +18,6 @@ type BackfillServer struct { r *gin.Engine bindAddr string service *service.Backfill - parsers []parser.Parser } type Query struct { @@ -28,7 +26,7 @@ type Query struct { Exchange string `json:"exchange" binding:"required"` } -func NewBackfill(l *zap.SugaredLogger, bindAddr string, s *service.Backfill, parsers []parser.Parser) *BackfillServer { +func NewBackfill(l *zap.SugaredLogger, bindAddr string, s *service.Backfill) *BackfillServer { engine := gin.New() engine.Use(gin.Recovery()) @@ -37,7 +35,6 @@ func NewBackfill(l *zap.SugaredLogger, bindAddr string, s *service.Backfill, par r: engine, bindAddr: bindAddr, service: s, - parsers: parsers, } gin.SetMode(gin.ReleaseMode) @@ -89,11 +86,6 @@ func (s *BackfillServer) backfill(c *gin.Context) { return } - if !s.isValidExchange(params.Exchange) { - responseErr(c, fmt.Errorf("invalid exchange %s", params.Exchange)) - return - } - l := s.l.With("reqID", xid.New().String()) l.Infow("receive backfill params", "params", params) @@ -111,15 +103,6 @@ func (s *BackfillServer) backfill(c *gin.Context) { }) } -func (s *BackfillServer) isValidExchange(exchange string) bool { - for _, p := range s.parsers { - if p.Exchange() == exchange { - return true - } - } - return false -} - func (s *BackfillServer) getAllTask(c *gin.Context) { tasks, err := s.service.ListTask() if err != nil { diff --git a/v2/internal/service/backfill.go b/v2/internal/service/backfill.go index 5714105..d9a7a6e 100644 --- a/v2/internal/service/backfill.go +++ b/v2/internal/service/backfill.go @@ -45,6 +45,10 @@ func (s *Backfill) rerunAllTasks() error { func (s *Backfill) NewBackfillTask(from, to uint64, exchange string) (int, string, error) { var message string + if !s.worker.IsValidExchange(exchange) { + return 0, "", fmt.Errorf("invalid exchange %s", exchange) + } + // limit max 10 tasks running at the same time count, err := s.storage.GetRunningTaskNumber() if err != nil { @@ -99,7 +103,7 @@ func (s *Backfill) CancelBackfillTask(id int) error { if task.Status != backfill.StatusTypeProcessing { return fmt.Errorf("task with id %d is not processing, current status: %s", id, task.Status) } - err = s.storage.UpdateTask(task.ID, task.ProcessedBlock, backfill.StatusTypeCanceled) + err = s.storage.UpdateTask(task.ID, nil, backfill.StatusTypeCanceled) if err != nil { return fmt.Errorf("cannot cancel backfill task with id %d: %w", id, err) } @@ -124,10 +128,7 @@ func (s *Backfill) RestartBackfillTask(id int) error { if task.Status == backfill.StatusTypeProcessing || task.Status == backfill.StatusTypeDone { return fmt.Errorf("cannot restart task with id %d, current status: %s", id, task.Status) } - err = s.storage.UpdateTask(task.ID, task.ProcessedBlock, backfill.StatusTypeProcessing) - if err != nil { - return fmt.Errorf("cannot cancel backfill task with id %d: %w", id, err) - } + go s.worker.BackfillByExchange(task) return nil } diff --git a/v2/internal/service/backfill_test.go b/v2/internal/service/backfill_test.go index e1d0bc9..ff38051 100644 --- a/v2/internal/service/backfill_test.go +++ b/v2/internal/service/backfill_test.go @@ -5,6 +5,7 @@ import ( "github.com/KyberNetwork/tradelogs/v2/internal/worker" "github.com/KyberNetwork/tradelogs/v2/mocks" + "github.com/KyberNetwork/tradelogs/v2/pkg/parser" "github.com/KyberNetwork/tradelogs/v2/pkg/storage/backfill" "github.com/ethereum/go-ethereum/core/types" "github.com/stretchr/testify/assert" @@ -30,6 +31,7 @@ func TestBackfill_NewBackfillTask(t *testing.T) { backfillStorage := &mocks.MockBackfillStorage{} stateStorage := &mocks.MockState{} rpcClient := &mocks.MockRPCClient{} + mockParser := &mocks.MockParser{} backfillStorage.On("GetTask").Return([]backfill.Task{}, nil). On("CreateTask", mock.Anything).Return(0, nil). @@ -46,7 +48,11 @@ func TestBackfill_NewBackfillTask(t *testing.T) { rpcClient.On("FetchLogs", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]types.Log{}, nil) - w := worker.NewBackFiller(nil, backfillStorage, stateStorage, zap.S(), rpcClient, nil) + mockParser.On("Exchange").Return("test"). + On("Address").Return(""). + On("Topics").Return([]string{}) + + w := worker.NewBackFiller(nil, backfillStorage, stateStorage, zap.S(), rpcClient, []parser.Parser{mockParser}) srv, err := NewBackfillService(backfillStorage, zap.S(), w) assert.NoError(t, err) diff --git a/v2/internal/worker/backfiller.go b/v2/internal/worker/backfiller.go index 9e14bc9..a0576f3 100644 --- a/v2/internal/worker/backfiller.go +++ b/v2/internal/worker/backfiller.go @@ -3,6 +3,10 @@ package worker import ( "context" "fmt" + "sort" + "strconv" + "strings" + "github.com/KyberNetwork/tradelogs/v2/pkg/handler" "github.com/KyberNetwork/tradelogs/v2/pkg/parser" "github.com/KyberNetwork/tradelogs/v2/pkg/rpcnode" @@ -10,9 +14,6 @@ import ( "github.com/KyberNetwork/tradelogs/v2/pkg/storage/state" "go.uber.org/zap" "k8s.io/apimachinery/pkg/util/sets" - "sort" - "strconv" - "strings" ) type BackFiller struct { @@ -60,6 +61,15 @@ func (w *BackFiller) Run() error { } } +func (w *BackFiller) IsValidExchange(exchange string) bool { + for _, p := range w.parsers { + if p.Exchange() == exchange { + return true + } + } + return false +} + // GetBlockRanges return separated and non-overlapping ranges that cover all backfill ranges func (w *BackFiller) GetBlockRanges() (uint64, uint64, sets.Set[string], error) { states, err := w.backfillStorage.Get() @@ -145,10 +155,17 @@ func (w *BackFiller) BackfillByExchange(task backfill.Task) { return } + // update the status before starting to backfill + err := w.backfillStorage.UpdateTask(task.ID, nil, backfill.StatusTypeProcessing) + if err != nil { + w.l.Errorw("cannot update task status", "task", task.ID, "err", err) + return + } + blocks, exclusions, err := w.getBlockByExchange(from, to, task.Exchange) if err != nil { w.l.Errorw("cannot get block", "task", task, "err", err) - err = w.backfillStorage.UpdateTask(task.ID, task.ToBlock, backfill.StatusTypeFailed) + err = w.backfillStorage.UpdateTask(task.ID, nil, backfill.StatusTypeFailed) if err != nil { w.l.Errorw("cannot update task status", "task", task, "err", err) } @@ -173,22 +190,22 @@ func (w *BackFiller) BackfillByExchange(task backfill.Task) { err = w.processBlock(b, exclusions) if err != nil { w.l.Errorw("cannot backfill block", "task_id", task.ID, "block", b, "err", err) - err = w.backfillStorage.UpdateTask(task.ID, task.ProcessedBlock, backfill.StatusTypeFailed) + err = w.backfillStorage.UpdateTask(task.ID, nil, backfill.StatusTypeFailed) if err != nil { w.l.Errorw("cannot update task status", "task_id", task.ID, "err", err) } return } - // update the processing block - err = w.backfillStorage.UpdateTask(task.ID, b, backfill.StatusTypeProcessing) + // update the processed block + err = w.backfillStorage.UpdateTask(task.ID, &b, "") if err != nil { w.l.Errorw("cannot update task in db", "id", task.ID, "err", err) return } } - err = w.backfillStorage.UpdateTask(task.ID, task.FromBlock, backfill.StatusTypeDone) + err = w.backfillStorage.UpdateTask(task.ID, &task.FromBlock, backfill.StatusTypeDone) if err != nil { w.l.Errorw("cannot update task status", "task_id", task.ID, "err", err) } diff --git a/v2/mocks/BackfillStorage.go b/v2/mocks/BackfillStorage.go index 0131030..f9ac054 100644 --- a/v2/mocks/BackfillStorage.go +++ b/v2/mocks/BackfillStorage.go @@ -193,7 +193,7 @@ func (_m *MockBackfillStorage) Update(backfilled uint64) error { } // UpdateTask provides a mock function with given fields: id, block, status -func (_m *MockBackfillStorage) UpdateTask(id int, block uint64, status string) error { +func (_m *MockBackfillStorage) UpdateTask(id int, block *uint64, status string) error { ret := _m.Called(id, block, status) if len(ret) == 0 { @@ -201,7 +201,7 @@ func (_m *MockBackfillStorage) UpdateTask(id int, block uint64, status string) e } var r0 error - if rf, ok := ret.Get(0).(func(int, uint64, string) error); ok { + if rf, ok := ret.Get(0).(func(int, *uint64, string) error); ok { r0 = rf(id, block, status) } else { r0 = ret.Error(0) diff --git a/v2/mocks/Parser.go b/v2/mocks/Parser.go new file mode 100644 index 0000000..3ddccea --- /dev/null +++ b/v2/mocks/Parser.go @@ -0,0 +1,184 @@ +// Code generated by mockery v2.46.2. DO NOT EDIT. + +package mocks + +import ( + mock "github.com/stretchr/testify/mock" + + pkgtypes "github.com/KyberNetwork/tradelogs/v2/pkg/types" + + tradelogstypes "github.com/KyberNetwork/tradelogs/v2/pkg/storage/tradelogs/types" + + types "github.com/ethereum/go-ethereum/core/types" +) + +// MockParser is an autogenerated mock type for the Parser type +type MockParser struct { + mock.Mock +} + +// Address provides a mock function with given fields: +func (_m *MockParser) Address() string { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for Address") + } + + var r0 string + if rf, ok := ret.Get(0).(func() string); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(string) + } + + return r0 +} + +// Exchange provides a mock function with given fields: +func (_m *MockParser) Exchange() string { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for Exchange") + } + + var r0 string + if rf, ok := ret.Get(0).(func() string); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(string) + } + + return r0 +} + +// LogFromExchange provides a mock function with given fields: log +func (_m *MockParser) LogFromExchange(log types.Log) bool { + ret := _m.Called(log) + + if len(ret) == 0 { + panic("no return value specified for LogFromExchange") + } + + var r0 bool + if rf, ok := ret.Get(0).(func(types.Log) bool); ok { + r0 = rf(log) + } else { + r0 = ret.Get(0).(bool) + } + + return r0 +} + +// Parse provides a mock function with given fields: log, blockTime +func (_m *MockParser) Parse(log types.Log, blockTime uint64) ([]tradelogstypes.TradeLog, error) { + ret := _m.Called(log, blockTime) + + if len(ret) == 0 { + panic("no return value specified for Parse") + } + + var r0 []tradelogstypes.TradeLog + var r1 error + if rf, ok := ret.Get(0).(func(types.Log, uint64) ([]tradelogstypes.TradeLog, error)); ok { + return rf(log, blockTime) + } + if rf, ok := ret.Get(0).(func(types.Log, uint64) []tradelogstypes.TradeLog); ok { + r0 = rf(log, blockTime) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]tradelogstypes.TradeLog) + } + } + + if rf, ok := ret.Get(1).(func(types.Log, uint64) error); ok { + r1 = rf(log, blockTime) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// ParseWithCallFrame provides a mock function with given fields: callFrame, log, blockTime +func (_m *MockParser) ParseWithCallFrame(callFrame pkgtypes.CallFrame, log types.Log, blockTime uint64) ([]tradelogstypes.TradeLog, error) { + ret := _m.Called(callFrame, log, blockTime) + + if len(ret) == 0 { + panic("no return value specified for ParseWithCallFrame") + } + + var r0 []tradelogstypes.TradeLog + var r1 error + if rf, ok := ret.Get(0).(func(pkgtypes.CallFrame, types.Log, uint64) ([]tradelogstypes.TradeLog, error)); ok { + return rf(callFrame, log, blockTime) + } + if rf, ok := ret.Get(0).(func(pkgtypes.CallFrame, types.Log, uint64) []tradelogstypes.TradeLog); ok { + r0 = rf(callFrame, log, blockTime) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]tradelogstypes.TradeLog) + } + } + + if rf, ok := ret.Get(1).(func(pkgtypes.CallFrame, types.Log, uint64) error); ok { + r1 = rf(callFrame, log, blockTime) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// Topics provides a mock function with given fields: +func (_m *MockParser) Topics() []string { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for Topics") + } + + var r0 []string + if rf, ok := ret.Get(0).(func() []string); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]string) + } + } + + return r0 +} + +// UseTraceCall provides a mock function with given fields: +func (_m *MockParser) UseTraceCall() bool { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for UseTraceCall") + } + + var r0 bool + if rf, ok := ret.Get(0).(func() bool); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(bool) + } + + return r0 +} + +// NewMockParser creates a new instance of MockParser. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewMockParser(t interface { + mock.TestingT + Cleanup(func()) +}) *MockParser { + mock := &MockParser{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/v2/pkg/storage/backfill/storage.go b/v2/pkg/storage/backfill/storage.go index a72cb62..997aa0f 100644 --- a/v2/pkg/storage/backfill/storage.go +++ b/v2/pkg/storage/backfill/storage.go @@ -14,7 +14,7 @@ type IStorage interface { Update(backfilled uint64) error CreateTask(task Task) (int, error) - UpdateTask(id int, block uint64, status string) error + UpdateTask(id int, block *uint64, status string) error GetTaskByID(taskID int) (Task, error) GetTask() ([]Task, error) DeleteTask(taskID int) error @@ -103,8 +103,26 @@ func (s *Storage) CreateTask(task Task) (int, error) { return id, nil } -func (s *Storage) UpdateTask(id int, block uint64, status string) error { - _, err := s.db.Exec("UPDATE backfill_task SET processed_block = $1, updated_at = $2, status = $3 WHERE id = $4", block, time.Now(), status, id) +func (s *Storage) UpdateTask(id int, block *uint64, status string) error { + b := squirrel.StatementBuilder.PlaceholderFormat(squirrel.Dollar). + Update("backfill_task"). + Set("updated_at", time.Now()) + + if block != nil { + b = b.Set("processed_block", *block) + } + if len(status) != 0 { + b = b.Set("status", status) + } + b = b.Where(squirrel.Eq{"id": id}) + + q, p, err := b.ToSql() + if err != nil { + s.l.Errorw("fail to build update task statement", "error", err) + return fmt.Errorf("failed to build update task statement: %w", err) + } + + _, err = s.db.Exec(q, p...) if err != nil { s.l.Errorw("failed to update backfill task", "err", err, "id", id) return fmt.Errorf("failed to update backfill task: %w", err) @@ -147,7 +165,7 @@ func (s *Storage) DeleteTask(taskID int) error { func (s *Storage) GetRunningTaskNumber() (int, error) { var count int - err := s.db.Select(&count, "SELECT count(*) FROM backfill_task WHERE status = $1", StatusTypeProcessing) + err := s.db.Get(&count, "SELECT count(*) FROM backfill_task WHERE status = $1", StatusTypeProcessing) if err != nil { s.l.Errorw("failed to count backfill", "err", err) return 0, fmt.Errorf("failed to count backfill task: %w", err)