Skip to content

Commit

Permalink
TRD-637 update backfill task update function
Browse files Browse the repository at this point in the history
  • Loading branch information
linhnt3400 committed Oct 16, 2024
1 parent b1ab5fc commit 0e738ed
Show file tree
Hide file tree
Showing 8 changed files with 248 additions and 39 deletions.
2 changes: 1 addition & 1 deletion v2/cmd/backfill/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func run(c *cli.Context) error {
return fmt.Errorf("cannot create backfill service: %w", err)
}

s := server.NewBackfill(l, c.String(libapp.HTTPBackfillServerFlag.Name), srv, parsers)
s := server.NewBackfill(l, c.String(libapp.HTTPBackfillServerFlag.Name), srv)

return s.Run()
}
Expand Down
19 changes: 1 addition & 18 deletions v2/internal/server/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"strconv"

"github.com/KyberNetwork/tradelogs/v2/internal/service"
"github.com/KyberNetwork/tradelogs/v2/pkg/parser"
"github.com/gin-contrib/pprof"
"github.com/gin-gonic/gin"
"github.com/rs/xid"
Expand All @@ -19,7 +18,6 @@ type BackfillServer struct {
r *gin.Engine
bindAddr string
service *service.Backfill
parsers []parser.Parser
}

type Query struct {
Expand All @@ -28,7 +26,7 @@ type Query struct {
Exchange string `json:"exchange" binding:"required"`
}

func NewBackfill(l *zap.SugaredLogger, bindAddr string, s *service.Backfill, parsers []parser.Parser) *BackfillServer {
func NewBackfill(l *zap.SugaredLogger, bindAddr string, s *service.Backfill) *BackfillServer {
engine := gin.New()
engine.Use(gin.Recovery())

Expand All @@ -37,7 +35,6 @@ func NewBackfill(l *zap.SugaredLogger, bindAddr string, s *service.Backfill, par
r: engine,
bindAddr: bindAddr,
service: s,
parsers: parsers,
}

gin.SetMode(gin.ReleaseMode)
Expand Down Expand Up @@ -89,11 +86,6 @@ func (s *BackfillServer) backfill(c *gin.Context) {
return
}

if !s.isValidExchange(params.Exchange) {
responseErr(c, fmt.Errorf("invalid exchange %s", params.Exchange))
return
}

l := s.l.With("reqID", xid.New().String())
l.Infow("receive backfill params", "params", params)

Expand All @@ -111,15 +103,6 @@ func (s *BackfillServer) backfill(c *gin.Context) {
})
}

func (s *BackfillServer) isValidExchange(exchange string) bool {
for _, p := range s.parsers {
if p.Exchange() == exchange {
return true
}
}
return false
}

func (s *BackfillServer) getAllTask(c *gin.Context) {
tasks, err := s.service.ListTask()
if err != nil {
Expand Down
11 changes: 6 additions & 5 deletions v2/internal/service/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ func (s *Backfill) rerunAllTasks() error {
func (s *Backfill) NewBackfillTask(from, to uint64, exchange string) (int, string, error) {
var message string

if !s.worker.IsValidExchange(exchange) {
return 0, "", fmt.Errorf("invalid exchange %s", exchange)
}

// limit max 10 tasks running at the same time
count, err := s.storage.GetRunningTaskNumber()
if err != nil {
Expand Down Expand Up @@ -99,7 +103,7 @@ func (s *Backfill) CancelBackfillTask(id int) error {
if task.Status != backfill.StatusTypeProcessing {
return fmt.Errorf("task with id %d is not processing, current status: %s", id, task.Status)
}
err = s.storage.UpdateTask(task.ID, task.ProcessedBlock, backfill.StatusTypeCanceled)
err = s.storage.UpdateTask(task.ID, nil, backfill.StatusTypeCanceled)
if err != nil {
return fmt.Errorf("cannot cancel backfill task with id %d: %w", id, err)
}
Expand All @@ -124,10 +128,7 @@ func (s *Backfill) RestartBackfillTask(id int) error {
if task.Status == backfill.StatusTypeProcessing || task.Status == backfill.StatusTypeDone {
return fmt.Errorf("cannot restart task with id %d, current status: %s", id, task.Status)
}
err = s.storage.UpdateTask(task.ID, task.ProcessedBlock, backfill.StatusTypeProcessing)
if err != nil {
return fmt.Errorf("cannot cancel backfill task with id %d: %w", id, err)
}

go s.worker.BackfillByExchange(task)
return nil
}
Expand Down
8 changes: 7 additions & 1 deletion v2/internal/service/backfill_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/KyberNetwork/tradelogs/v2/internal/worker"
"github.com/KyberNetwork/tradelogs/v2/mocks"
"github.com/KyberNetwork/tradelogs/v2/pkg/parser"
"github.com/KyberNetwork/tradelogs/v2/pkg/storage/backfill"
"github.com/ethereum/go-ethereum/core/types"
"github.com/stretchr/testify/assert"
Expand All @@ -30,6 +31,7 @@ func TestBackfill_NewBackfillTask(t *testing.T) {
backfillStorage := &mocks.MockBackfillStorage{}
stateStorage := &mocks.MockState{}
rpcClient := &mocks.MockRPCClient{}
mockParser := &mocks.MockParser{}

backfillStorage.On("GetTask").Return([]backfill.Task{}, nil).
On("CreateTask", mock.Anything).Return(0, nil).
Expand All @@ -46,7 +48,11 @@ func TestBackfill_NewBackfillTask(t *testing.T) {

rpcClient.On("FetchLogs", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]types.Log{}, nil)

w := worker.NewBackFiller(nil, backfillStorage, stateStorage, zap.S(), rpcClient, nil)
mockParser.On("Exchange").Return("test").
On("Address").Return("").
On("Topics").Return([]string{})

w := worker.NewBackFiller(nil, backfillStorage, stateStorage, zap.S(), rpcClient, []parser.Parser{mockParser})
srv, err := NewBackfillService(backfillStorage, zap.S(), w)
assert.NoError(t, err)

Expand Down
33 changes: 25 additions & 8 deletions v2/internal/worker/backfiller.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,17 @@ package worker
import (
"context"
"fmt"
"sort"
"strconv"
"strings"

"github.com/KyberNetwork/tradelogs/v2/pkg/handler"
"github.com/KyberNetwork/tradelogs/v2/pkg/parser"
"github.com/KyberNetwork/tradelogs/v2/pkg/rpcnode"
"github.com/KyberNetwork/tradelogs/v2/pkg/storage/backfill"
"github.com/KyberNetwork/tradelogs/v2/pkg/storage/state"
"go.uber.org/zap"
"k8s.io/apimachinery/pkg/util/sets"
"sort"
"strconv"
"strings"
)

type BackFiller struct {
Expand Down Expand Up @@ -60,6 +61,15 @@ func (w *BackFiller) Run() error {
}
}

func (w *BackFiller) IsValidExchange(exchange string) bool {
for _, p := range w.parsers {
if p.Exchange() == exchange {
return true
}
}
return false
}

// GetBlockRanges return separated and non-overlapping ranges that cover all backfill ranges
func (w *BackFiller) GetBlockRanges() (uint64, uint64, sets.Set[string], error) {
states, err := w.backfillStorage.Get()
Expand Down Expand Up @@ -145,10 +155,17 @@ func (w *BackFiller) BackfillByExchange(task backfill.Task) {
return
}

// update the status before starting to backfill
err := w.backfillStorage.UpdateTask(task.ID, nil, backfill.StatusTypeProcessing)
if err != nil {
w.l.Errorw("cannot update task status", "task", task.ID, "err", err)
return
}

blocks, exclusions, err := w.getBlockByExchange(from, to, task.Exchange)
if err != nil {
w.l.Errorw("cannot get block", "task", task, "err", err)
err = w.backfillStorage.UpdateTask(task.ID, task.ToBlock, backfill.StatusTypeFailed)
err = w.backfillStorage.UpdateTask(task.ID, nil, backfill.StatusTypeFailed)
if err != nil {
w.l.Errorw("cannot update task status", "task", task, "err", err)
}
Expand All @@ -173,22 +190,22 @@ func (w *BackFiller) BackfillByExchange(task backfill.Task) {
err = w.processBlock(b, exclusions)
if err != nil {
w.l.Errorw("cannot backfill block", "task_id", task.ID, "block", b, "err", err)
err = w.backfillStorage.UpdateTask(task.ID, task.ProcessedBlock, backfill.StatusTypeFailed)
err = w.backfillStorage.UpdateTask(task.ID, nil, backfill.StatusTypeFailed)
if err != nil {
w.l.Errorw("cannot update task status", "task_id", task.ID, "err", err)
}
return
}

// update the processing block
err = w.backfillStorage.UpdateTask(task.ID, b, backfill.StatusTypeProcessing)
// update the processed block
err = w.backfillStorage.UpdateTask(task.ID, &b, "")
if err != nil {
w.l.Errorw("cannot update task in db", "id", task.ID, "err", err)
return
}
}

err = w.backfillStorage.UpdateTask(task.ID, task.FromBlock, backfill.StatusTypeDone)
err = w.backfillStorage.UpdateTask(task.ID, &task.FromBlock, backfill.StatusTypeDone)
if err != nil {
w.l.Errorw("cannot update task status", "task_id", task.ID, "err", err)
}
Expand Down
4 changes: 2 additions & 2 deletions v2/mocks/BackfillStorage.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 0e738ed

Please sign in to comment.