Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TRD-637 Backfill Task Management for Tradelogs v2 #83

Merged
merged 1 commit into from
Oct 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
183 changes: 181 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,185 @@
## This is service to indexing tradelogs from event
# Tradelogs

### Re-generate mock file
This is service to indexing tradelogs from event

# Tradelogs V2

## Backfill Server API Documentation

This server serve the endpoints to manage backfill task: list, create, cancel, restart tasks
### 1. **Backfill Task Creation**

- **URL**: `/backfill`
- **Method**: `POST`
- **Description**: Creates a new backfill task.
- **Request Body**:
- `from_block` (uint64, required): The starting block number.
- `to_block` (uint64, required): The ending block number.
- `exchange` (string, required): The exchange name.
- **Response**:
- **200 OK**: On success.
```json
{
"success": true,
"id": "<task_id>",
"message": "<message>"
}
```
- **400 Bad Request**: If there is a validation error (e.g., missing fields, invalid exchange).
```json
{
"success": false,
"error": "<error_message>"
}
```
- **500 Internal Server Error**: If there is an error during task creation.
```json
{
"success": false,
"error": "<error_message>"
}
```

### 2. **Get All Backfill Tasks**

- **URL**: `/backfill`
- **Method**: `GET`
- **Description**: Retrieves all backfill tasks.
- **Response**:
- **200 OK**: On success. The task with id -1 is the service's default backfill flow.
```json
{
"success": true,
"tasks": [
{
"id": -1,
"exchange": "",
"from_block": 20926953,
"to_block": 20962657,
"processed_block": 20962657,
"created_at": "0001-01-01T00:00:00Z",
"updated_at": "0001-01-01T00:00:00Z",
"status": "processing"
},
{
"id": 1,
"exchange": "zerox",
"from_block": 20962657,
"to_block": 20962658,
"processed_block": 20962657,
"created_at": "2024-10-14T09:07:01.059135Z",
"updated_at": "2024-10-14T17:18:32.814065Z",
"status": "done"
}
]
}
```
- **500 Internal Server Error**: If there is an error retrieving the tasks.
```json
{
"success": false,
"error": "<error_message>"
}
```

### 3. **Get Backfill Task By ID**

- **URL**: `/backfill/:id`
- **Method**: `GET`
- **Description**: Retrieves a specific backfill task by its ID.
- **URL Parameters**:
- `id` (int, required): The task ID.
- **Response**:
- **200 OK**: On success.
```json
{
"success": true,
"task": {
"id": 1,
"exchange": "zerox",
"from_block": 20962657,
"to_block": 20962658,
"processed_block": 20962657,
"created_at": "2024-10-14T09:07:01.059135Z",
"updated_at": "2024-10-14T17:18:32.814065Z",
"status": "done"
}
}
```
- **400 Bad Request**: If the task ID is invalid.
```json
{
"success": false,
"error": "invalid task id: <id>"
}
```
- **500 Internal Server Error**: If there is an error retrieving the task.
```json
{
"success": false,
"error": "<error_message>"
}
```

### 4. **Cancel Backfill Task**

- **URL**: `/backfill/cancel/:id`
- **Method**: `GET`
- **Description**: Cancels a specific backfill task by its ID.
- **URL Parameters**:
- `id` (int, required): The task ID.
- **Response**:
- **200 OK**: On success.
```json
{
"success": true
}
```
- **400 Bad Request**: If the task ID is invalid.
```json
{
"success": false,
"error": "invalid task id: <error_message>"
}
```
- **500 Internal Server Error**: If there is an error canceling the task.
```json
{
"success": false,
"error": "<error_message>"
}
```

### 5. **Restart Backfill Task**

- **URL**: `/backfill/restart/:id`
- **Method**: `GET`
- **Description**: Restarts a specific backfill task by its ID.
- **URL Parameters**:
- `id` (int, required): The task ID.
- **Response**:
- **200 OK**: On success.
```json
{
"success": true
}
```
- **400 Bad Request**: If the task ID is invalid.
```json
{
"success": false,
"error": "invalid task id: <error_message>"
}
```
- **500 Internal Server Error**: If there is an error restarting the task.
```json
{
"success": false,
"error": "<error_message>"
}
```

## Re-generate mock file

First, you need to install `mockery`

Expand Down
9 changes: 8 additions & 1 deletion v2/cmd/backfill/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"os"

"github.com/KyberNetwork/tradelogs/v2/internal/server"
"github.com/KyberNetwork/tradelogs/v2/internal/service"
"github.com/KyberNetwork/tradelogs/v2/internal/worker"
libapp "github.com/KyberNetwork/tradelogs/v2/pkg/app"
"github.com/KyberNetwork/tradelogs/v2/pkg/handler"
Expand Down Expand Up @@ -53,6 +54,7 @@ func run(c *cli.Context) error {
l.Infow("Starting backfill service")

db, err := initDB(c)
l.Infow("init db successfully")
if err != nil {
return fmt.Errorf("cannot init DB: %w", err)
}
Expand Down Expand Up @@ -123,7 +125,12 @@ func run(c *cli.Context) error {
}
}()

s := server.NewBackfill(l, c.String(libapp.HTTPBackfillServerFlag.Name), w)
srv, err := service.NewBackfillService(backfillStorage, l, w)
if err != nil {
return fmt.Errorf("cannot create backfill service: %w", err)
}

s := server.NewBackfill(l, c.String(libapp.HTTPBackfillServerFlag.Name), srv)

return s.Run()
}
Expand Down
14 changes: 14 additions & 0 deletions v2/cmd/migrations/00003_add_backfill_task.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
CREATE TYPE backfill_status AS ENUM ('processing', 'failed', 'done', 'canceled');

create table backfill_task
(
id serial primary key,
exchange text not null,
from_block bigint not null,
to_block bigint not null,
processed_block bigint default 0 not null,
created_at timestamptz default now() not null,
updated_at timestamptz default now() not null,
status backfill_status default 'processing'::backfill_status not null
);

3 changes: 2 additions & 1 deletion v2/cmd/parse_log/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ func run(c *cli.Context) error {
l.Infow("Starting log parser service")

db, err := initDB(c)
l.Infow("init db successfully")
if err != nil {
return fmt.Errorf("cannot init DB: %w", err)
}
Expand Down Expand Up @@ -151,7 +152,7 @@ func initDB(c *cli.Context) (*sqlx.DB, error) {
return db, nil
}

func getMostRecentBlock(l *zap.SugaredLogger, s state.Storage, rpcClient *rpcnode.Client) (uint64, error) {
func getMostRecentBlock(l *zap.SugaredLogger, s state.Storage, rpcClient rpcnode.IClient) (uint64, error) {
var blockNumber uint64
block, err := s.GetState(state.ProcessedBlockKey)
if err == nil {
Expand Down
94 changes: 84 additions & 10 deletions v2/internal/server/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ package server
import (
"fmt"
"net/http"
"strconv"

"github.com/KyberNetwork/tradelogs/v2/internal/worker"
"github.com/KyberNetwork/tradelogs/v2/internal/service"
"github.com/gin-contrib/pprof"
"github.com/gin-gonic/gin"
"github.com/rs/xid"
Expand All @@ -16,7 +17,7 @@ type BackfillServer struct {
l *zap.SugaredLogger
r *gin.Engine
bindAddr string
worker *worker.BackFiller
service *service.Backfill
}

type Query struct {
Expand All @@ -25,15 +26,15 @@ type Query struct {
Exchange string `json:"exchange" binding:"required"`
}

func NewBackfill(l *zap.SugaredLogger, bindAddr string, w *worker.BackFiller) *BackfillServer {
func NewBackfill(l *zap.SugaredLogger, bindAddr string, s *service.Backfill) *BackfillServer {
engine := gin.New()
engine.Use(gin.Recovery())

server := &BackfillServer{
l: l,
r: engine,
bindAddr: bindAddr,
worker: w,
service: s,
}

gin.SetMode(gin.ReleaseMode)
Expand All @@ -53,6 +54,10 @@ func (s *BackfillServer) Run() error {
func (s *BackfillServer) register() {
pprof.Register(s.r, "/debug")
s.r.POST("/backfill", s.backfill)
s.r.GET("/backfill", s.getAllTask)
s.r.GET("/backfill/:id", s.getTask)
s.r.GET("/backfill/cancel/:id", s.cancelTask)
s.r.GET("/backfill/restart/:id", s.restartTask)
}

func responseErr(c *gin.Context, err error) {
Expand All @@ -62,9 +67,10 @@ func responseErr(c *gin.Context, err error) {
})
}

func responseOK(c *gin.Context) {
c.JSON(http.StatusOK, gin.H{
"success": true,
func internalServerError(c *gin.Context, err error) {
c.JSON(http.StatusInternalServerError, gin.H{
"success": false,
"error": err.Error(),
})
}

Expand All @@ -83,12 +89,80 @@ func (s *BackfillServer) backfill(c *gin.Context) {
l := s.l.With("reqID", xid.New().String())
l.Infow("receive backfill params", "params", params)

err := s.worker.BackfillByExchange(params.FromBlock, params.ToBlock, params.Exchange)
id, message, err := s.service.NewBackfillTask(params.FromBlock, params.ToBlock, params.Exchange)
if err != nil {
l.Errorw("error when backfill", "error", err)
responseErr(c, err)
internalServerError(c, err)
return
}

c.JSON(http.StatusOK, gin.H{
"success": true,
"id": id,
"message": message,
})
}

func (s *BackfillServer) getAllTask(c *gin.Context) {
tasks, err := s.service.ListTask()
if err != nil {
internalServerError(c, err)
return
}
c.JSON(http.StatusOK, gin.H{
"success": true,
"tasks": tasks,
})
}

func (s *BackfillServer) getTask(c *gin.Context) {
id := c.Param("id")
taskID, err := strconv.ParseInt(id, 10, 32)
if err != nil || len(id) == 0 {
responseErr(c, fmt.Errorf("invalid task id: %s", id))
return
}
task, err := s.service.GetTask(int(taskID))
Fixed Show fixed Hide fixed
if err != nil {
internalServerError(c, err)
return
}
c.JSON(http.StatusOK, gin.H{
"success": true,
"task": task,
})
}

func (s *BackfillServer) cancelTask(c *gin.Context) {
id := c.Param("id")
taskID, err := strconv.ParseInt(id, 10, 32)
if err != nil {
responseErr(c, fmt.Errorf("invalid task id: %w", err))
return
}
err = s.service.CancelBackfillTask(int(taskID))
Fixed Show fixed Hide fixed
if err != nil {
internalServerError(c, err)
return
}
c.JSON(http.StatusOK, gin.H{
"success": true,
})
}

responseOK(c)
func (s *BackfillServer) restartTask(c *gin.Context) {
id := c.Param("id")
taskID, err := strconv.ParseInt(id, 10, 32)
if err != nil {
responseErr(c, fmt.Errorf("invalid task id: %w", err))
return
}
err = s.service.RestartBackfillTask(int(taskID))
Fixed Show fixed Hide fixed
if err != nil {
internalServerError(c, err)
return
}
c.JSON(http.StatusOK, gin.H{
"success": true,
})
}
Loading
Loading