Skip to content

Commit

Permalink
persist error parsing log (#40)
Browse files Browse the repository at this point in the history
* persist error parsing log
  • Loading branch information
ngocthanh1389 authored Apr 17, 2024
1 parent 9857c9a commit b4a13e9
Show file tree
Hide file tree
Showing 6 changed files with 158 additions and 34 deletions.
12 changes: 12 additions & 0 deletions cmd/tradelogs/migrations/00005_error_parsing_log.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
CREATE TABLE errorlogs (
address text NOT NULL,
topics text NOT NULL,
data bytea NOT NULL,
block_number int8 NOT NULL,
tx_hash text NOT NULL,
tx_index int8 NOT NULL,
block_hash text NOT NULL,
log_index int8 NOT NULL,
time int8 NOT NULL,
CONSTRAINT errorlogs_pkey PRIMARY KEY (block_number, log_index)
);
1 change: 1 addition & 0 deletions internal/app/rpcnode.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ var (
Name: RPCUrlFlagName,
EnvVar: "RPC_URL",
Usage: "RPC node url",
Value: "https://ethereum.kyberengineering.io/trading-ethereum",
}
)

Expand Down
1 change: 1 addition & 0 deletions internal/server/tradelogs/broadcaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ func (b *Broadcaster) addConn(event, maker string, conn *websocket.Conn) {
if !ok {
return
}
conn.Close()
delete(e, id)
b.mu.Unlock()
}
Expand Down
13 changes: 12 additions & 1 deletion internal/server/tradelogs/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,20 @@ const (
func TestWSClient(t *testing.T) {
t.Skip()
for {
conn, _, err := websocket.DefaultDialer.Dial(url2, nil)
conn, _, err := websocket.DefaultDialer.Dial(url1, nil)
if err != nil {
log.Fatal("dial:", err)
}
conn.SetPongHandler(func(appData string) error {
log.Println("pong", appData)
return nil
})
go func() {
for range time.NewTicker(time.Second).C {
log.Println("ping")
_ = conn.WriteMessage(websocket.PingMessage, []byte{})
}
}()
for range time.NewTicker(time.Second).C {
if _, data, err := conn.ReadMessage(); err == nil {
var l storage.TradeLog
Expand All @@ -31,6 +41,7 @@ func TestWSClient(t *testing.T) {
log.Println("receive:", l)
} else {
conn.Close()
log.Println("err", err)
break
}
}
Expand Down
73 changes: 41 additions & 32 deletions internal/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,29 +2,22 @@ package worker

import (
"context"
"fmt"
"strings"
"time"

etype "github.com/KyberNetwork/evmlistener/pkg/types"
"github.com/KyberNetwork/evmlistener/pkg/types"
"github.com/KyberNetwork/tradelogs/pkg/convert"
"github.com/KyberNetwork/tradelogs/pkg/evmlistenerclient"
"github.com/KyberNetwork/tradelogs/pkg/parser"
"github.com/KyberNetwork/tradelogs/pkg/storage"
"github.com/ethereum/go-ethereum/common/lru"
"go.uber.org/zap"
)

type EVMLog struct {
log etype.Log
ts uint64
}

type Worker struct {
listener *evmlistenerclient.Client
l *zap.SugaredLogger
s *storage.Storage
p map[string]parser.Parser
errLogs lru.BasicLRU[string, EVMLog]
tradeLogChan chan storage.TradeLog
}

Expand All @@ -41,7 +34,6 @@ func New(l *zap.SugaredLogger, s *storage.Storage, listener *evmlistenerclient.C
l: l,
s: s,
p: p,
errLogs: lru.NewBasicLRU[string, EVMLog](1000),
tradeLogChan: tradeLogChan,
}, nil
}
Expand Down Expand Up @@ -85,14 +77,8 @@ func (w *Worker) processMessages(m []evmlistenerclient.Message) error {
for _, block := range message.NewBlocks {
for _, block := range message.RevertedBlocks {
deleteBlocks = append(deleteBlocks, block.Number.Uint64())
for _, k := range w.errLogs.Keys() {
l, ok := w.errLogs.Peek(k)
if !ok {
continue
}
if l.log.BlockHash == block.Hash {
w.errLogs.Remove(k)
}
if err := w.s.DeleteErrorLogsWithBlock(block.Hash); err != nil {
return err
}
}
for _, log := range block.Logs {
Expand All @@ -106,10 +92,19 @@ func (w *Worker) processMessages(m []evmlistenerclient.Message) error {
order, err := ps.Parse(convert.ToETHLog(log), block.Timestamp)
if err != nil {
w.l.Errorw("error when parse log", "log", log, "order", order, "err", err)
w.errLogs.Add(fmt.Sprintf("%d-%d", log.BlockNumber, log.Index), EVMLog{
log: log,
ts: block.Timestamp,
})
if err := w.s.InsertErrorLog(storage.EVMLog{
Address: log.Address,
Topics: strings.Join(log.Topics, ","),
Data: log.Data,
BlockNumber: log.BlockNumber,
TxHash: log.TxHash,
TxIndex: log.TxIndex,
BlockHash: log.BlockHash,
Index: log.Index,
Time: block.Timestamp,
}); err != nil {
return err
}
continue
}
insertOrders = append(insertOrders, order)
Expand All @@ -132,25 +127,39 @@ func (w *Worker) processMessages(m []evmlistenerclient.Message) error {

func (w *Worker) retryParseLog() error {
insertOrders := []storage.TradeLog{}
keys := w.errLogs.Keys()
w.l.Infow("start retry logs", "len", len(keys))
for _, k := range keys {
l, ok := w.errLogs.Peek(k)
if !ok {
logs, err := w.s.GetErrorLogs()
if err != nil {
return err
}
w.l.Infow("start retry logs", "len", len(logs))
for _, l := range logs {
topics := strings.Split(l.Topics, ",")
if len(topics) == 0 {
continue
}
ps := w.p[l.log.Topics[0]]
ps := w.p[topics[0]]
if ps == nil {
continue
}
order, err := ps.Parse(convert.ToETHLog(l.log), l.ts)
order, err := ps.Parse(convert.ToETHLog(types.Log{
Address: l.Address,
Topics: topics,
Data: l.Data,
BlockNumber: l.BlockNumber,
TxHash: l.TxHash,
TxIndex: l.TxIndex,
BlockHash: l.BlockHash,
Index: l.Index,
}), l.Time)
if err != nil {
w.l.Errorw("error when retry log", "log", l.log, "err", err)
w.l.Errorw("error when retry log", "log", l, "err", err)
continue
}

w.l.Infow("retry log successfully", "key", k, "parser", ps.Exchange())
w.errLogs.Remove(k)
w.l.Infow("retry log successfully", "tx", order.TxHash, "index", order.LogIndex, "parser", ps.Exchange())
if err := w.s.DeleteErrorLogsWithLogIndex(l.BlockNumber, l.Index); err != nil {
return err
}
insertOrders = append(insertOrders, order)
}

Expand Down
92 changes: 91 additions & 1 deletion pkg/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,23 @@ import (
"go.uber.org/zap"
)

const tradeLogsTable = "tradelogs"
const (
tradeLogsTable = "tradelogs"
errorLogsTable = "errorlogs"
)

type EVMLog struct {
Address string `db:"address"`
Topics string `db:"topics"`
Data []byte `db:"data"`
BlockNumber uint64 `db:"block_number"`
TxHash string `db:"tx_hash"`
TxIndex uint `db:"tx_index"`
BlockHash string `db:"block_hash"`
Index uint `db:"log_index"`
Removed bool `db:"removed"`
Time uint64 `db:"time"`
}

type Storage struct {
db *sqlx.DB
Expand Down Expand Up @@ -132,3 +148,77 @@ func tradelogsColumns() []string {
"maker_traits",
}
}

func (s *Storage) InsertErrorLog(log EVMLog) error {
b := squirrel.StatementBuilder.PlaceholderFormat(squirrel.Dollar).Insert(errorLogsTable).Columns(
[]string{"address", "topics", "data", "block_number", "tx_hash",
"tx_index", "block_hash", "log_index", "time"}...,
).Values(log.Address, log.Topics, log.Data, log.BlockNumber, log.TxHash,
log.TxIndex, log.BlockHash, log.Index, log.Time)
q, p, err := b.Suffix(`ON CONFLICT(block_number, log_index) DO UPDATE
SET
address=excluded.address,
topics=excluded.topics,
data=excluded.data,
block_number=excluded.block_number,
tx_hash=excluded.tx_hash,
tx_index=excluded.tx_index,
block_hash=excluded.block_hash,
log_index=excluded.log_index,
time=excluded.time
`).ToSql()
if err != nil {
s.l.Errorw("Error build insert", "error", err)
return err
}
if _, err := s.db.Exec(q, p...); err != nil {
s.l.Errorw("Error exec insert", "sql", q, "arg", p, "error", err)
return err
}
return nil
}

func (s *Storage) GetErrorLogs() ([]EVMLog, error) {
q, p, err := squirrel.StatementBuilder.PlaceholderFormat(squirrel.Dollar).
Select("*").
From(errorLogsTable).ToSql()
if err != nil {
return nil, err
}

var result []EVMLog
if err := s.db.Select(&result, q, p...); err != nil {
return nil, err
}
return result, nil
}

func (s *Storage) DeleteErrorLogsWithBlock(block_hash string) error {
q, p, err := squirrel.StatementBuilder.PlaceholderFormat(squirrel.Dollar).
Delete(errorLogsTable).
Where(squirrel.Eq{"block_hash": block_hash}).
ToSql()
if err != nil {
s.l.Errorw("Error while delete", "block_number", block_hash, "error", err)
return err
}
if _, err := s.db.Exec(q, p...); err != nil {
return err
}
return nil
}

func (s *Storage) DeleteErrorLogsWithLogIndex(block uint64, logIndex uint) error {
q, p, err := squirrel.StatementBuilder.PlaceholderFormat(squirrel.Dollar).
Delete(errorLogsTable).
Where(squirrel.Eq{"block_number": block, "log_index": logIndex}).
ToSql()
if err != nil {
s.l.Errorw("Error while delete", "block_number", block, "log_index", logIndex, "error", err)
return err
}
if _, err := s.db.Exec(q, p...); err != nil {
return err
}
return nil
}

0 comments on commit b4a13e9

Please sign in to comment.