Skip to content

Commit

Permalink
TRD-573 using tradelog as struct, add dockerfile & readme
Browse files Browse the repository at this point in the history
  • Loading branch information
linhnt3400 committed Sep 17, 2024
1 parent 02656de commit a2867e2
Show file tree
Hide file tree
Showing 23 changed files with 281 additions and 318 deletions.
8 changes: 8 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ COPY . .

RUN go build -o app ./cmd/tradelogs

RUN go build -o parse_log ./v2/cmd/parse_log


## DEPLOY
FROM debian:bullseye
Expand All @@ -15,10 +17,16 @@ RUN apt-get update && \
apt install -y ca-certificates && \
rm -rf /var/lib/apt/lists/*

### tradelogs v1
WORKDIR /cmd

COPY --from=builder /src/app /cmd/app

COPY cmd/tradelogs/migrations migrations

### tradelogs v2
WORKDIR /parse_log
COPY --from=builder /src/parse_log /parse_log/parse_log
COPY v2/cmd/parse_log/migrations migrations

ENTRYPOINT /cmd/app
21 changes: 21 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -1 +1,22 @@
## This is service to indexing tradelogs from event

### Re-generate mock file

First, you need to install `mockery`

- https://vektra.github.io/mockery/latest/installation/
- https://github.com/vektra/mockery

Then you use the `mockery` to generate mock files from interface

```
mockery --dir=<interface directory> --name=<interface name> --output=<output dir> --structname=<name of output struct> --filename=<output filename>
```

Example: generate `Storage` interface to a struct name `MockStorage`

```
mockery --dir=v2/pkg/storage/state --name=Storage --output=v2/mocks/ --structname=MockState --filename=State.go
```

For more information `mockery --help`
11 changes: 6 additions & 5 deletions v2/cmd/parse_log/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func run(c *cli.Context) error {

db, err := initDB(c)
if err != nil {
l.Panicw("cannot init DB", "err", err)
return fmt.Errorf("cannot init DB: %w", err)
}

// trade log manager
Expand All @@ -78,7 +78,7 @@ func run(c *cli.Context) error {
for i, url := range rpcURL {
client, err := ethclient.Dial(url)
if err != nil {
panic(fmt.Errorf("cannot dial eth client: %w", err))
return fmt.Errorf("cannot dial eth client: %w", err)
}
ethClients[i] = client
}
Expand All @@ -101,13 +101,13 @@ func run(c *cli.Context) error {
broadcastTopic := c.String(libapp.KafkaBroadcastTopic.Name)
err = kafka.ValidateTopicName(broadcastTopic)
if err != nil {
panic(fmt.Errorf("invalid kafka topic: %w", err))
return fmt.Errorf("invalid kafka topic: %w", err)
}

// kafka publisher for broadcasting trade logs
kafkaPublisher, err := kafka.NewPublisher(libapp.KafkaConfigFromFlags(c))
if err != nil {
panic(fmt.Errorf("cannot create kafka publisher: %w", err))
return fmt.Errorf("cannot create kafka publisher: %w", err)
}

// trade log handler
Expand All @@ -118,7 +118,7 @@ func run(c *cli.Context) error {

mostRecentBlock, err := getMostRecentBlock(l, s, rpcNode)
if err != nil {
panic(fmt.Errorf("cannot get most recent block: %w", err))
return fmt.Errorf("cannot get most recent block: %w", err)
}

// subscribe evm listener with worker as a consumer
Expand All @@ -129,6 +129,7 @@ func run(c *cli.Context) error {
c.Int(libapp.EVMListenerMaxTrackingBlock.Name),
w,
mostRecentBlock,
c.Int(libapp.EVMListenerClientTimeoutSecondFlag.Name),
)
}

Expand Down
18 changes: 11 additions & 7 deletions v2/internal/app/evmlistener.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ import (
)

const (
EVMHTTPRPCUrlFlagName = "http-rpc-url"
EVMwsRPCUrlFlagName = "ws-rpc-url"
EVMMaxTrackingBlock = "max-tracking-block"
EVMBlockExpiration = "block-expiration"
EVMHTTPRPCUrlFlagName = "http-rpc-url"
EVMwsRPCUrlFlagName = "ws-rpc-url"
EVMMaxTrackingBlock = "max-tracking-block"
EVMClientTimeoutSecond = "timeout-second"
)

//nolint:gochecknoglobals
Expand All @@ -17,26 +17,30 @@ var (
Name: EVMHTTPRPCUrlFlagName,
EnvVar: "EVM_HTTP_RPC_URL",
Usage: "HTTP RPC node url for EVM Listener",
Value: "https://ethereum.kyberengineering.io/trading-tokyo",
}
EVMListenerWsRPCUrlFlag = &cli.StringFlag{
Name: EVMwsRPCUrlFlagName,
EnvVar: "EVM_WS_RPC_URL",
Usage: "Web socket RPC node url for EVM Listener",
Value: "wss://ethereum.kyberengineering.io/trading-tokyo",
}
EVMListenerMaxTrackingBlock = &cli.IntFlag{
Name: EVMMaxTrackingBlock,
EnvVar: "EVM_MAX_TRACKING_BLOCK",
Usage: "Max tracking block number for block keeper",
Value: 32,
}
EVMListenerClientTimeoutSecondFlag = &cli.IntFlag{
Name: EVMClientTimeoutSecond,
EnvVar: "EVM_CLIENT_TIMEOUT_SECOND",
Usage: "Client timeout second",
Value: 15,
}
)

// NewEvmListenerFlags returns flags for evmlistener.
func EvmListenerFlags() []cli.Flag {
return []cli.Flag{
EVMListenerHTTPRPCUrlFlag, EVMListenerWsRPCUrlFlag,
EVMListenerMaxTrackingBlock,
EVMListenerMaxTrackingBlock, EVMListenerClientTimeoutSecondFlag,
}
}
4 changes: 2 additions & 2 deletions v2/mocks/Storage.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

33 changes: 33 additions & 0 deletions v2/pkg/constant/exchange.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package constant

const (
ExZeroX = "zerox"
EXZeroXV3 = "zeroxV3"
ExParaswap = "paraswap"
ExTokenlon = "tokenlon"
ExZeroXRFQ = "zerox_rfq"
ExKs = "kyberswap"
ExHashflow = "hashflow"
ExKsRFQ = "kyberswapRFQ"
Ex1Inch = "1inch"
Ex1InchV6 = "1inchV6"
ExHashflowV3 = "hashflowV3"
ExParaswapTaker = "paraswap_taker"
ExUniswapX = "uniswapx"
ExNative = "native"
ExBebop = "bebop"
ExUniswapXV1 = "uniswapxV1"

Addr1InchV6 = "0x111111125421cA6dc452d289314280a0f8842A65"
AddrBebop = "0xbbbbbBB520d69a9775E85b458C58c648259FAD5F"
AddrHashflowV3 = "0x24b9d98FABF4DA1F69eE10775F240AE3dA6856fd"
AddrKyberswap = "0x6131B5fae19EA4f9D964eAc0408E4408b66337b5"
AddrKyberswapRFQ = "0x7A819Fa46734a49D0112796f9377E024c350FB26"
AddrParaswap = "0xe92b586627ccA7a83dC919cc7127196d70f55a06"
AddrUniswapX = "0x00000011F84B9aa48e5f8aA8B9897600006289Be"
AddrUniswapXV1 = "0x6000da47483062A0D734Ba3dc7576Ce6A0B645C4"
Addr0x = "0xDef1C0ded9bec7F1a1670819833240f027b25EfF"
Deployer0xV3 = "0x00000000000004533Fe15556B1E086BB1A72cEae"

TableZeroX = "tradelogs_zerox"
)
4 changes: 2 additions & 2 deletions v2/pkg/evmlistenerclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ import (
)

func SubscribeEvent(l *zap.SugaredLogger, wsRPC, httpRPC string, maxTrackingBlock int,
publisher pubsub.Publisher, processedBlock uint64) error {
publisher pubsub.Publisher, processedBlock uint64, timeoutSecond int) error {
httpClient := &http.Client{
Timeout: time.Second * 15,
Timeout: time.Second * time.Duration(timeoutSecond),
Transport: &http.Transport{},
}

Expand Down
58 changes: 38 additions & 20 deletions v2/pkg/handler/trade_logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,11 @@ type TradeLogHandler struct {
}

type logMetadata struct {
blockNumber uint64
blockHash string
txHash string
txIndex int
timestamp uint64
messageSender string
interactContract string
blockNumber uint64
blockHash string
txHash string
txIndex int
timestamp uint64
}

func NewTradeLogHandler(l *zap.SugaredLogger, rpc *rpcnode.Client, storage *tradelogs.Manager, parsers []parser.Parser,
Expand All @@ -61,20 +59,23 @@ func (h *TradeLogHandler) ProcessBlock(blockHash string, blockNumber uint64, tim
for i, call := range calls {
logIndexStart = assignLogIndexes(&call.CallFrame, logIndexStart)
metadata := logMetadata{
blockNumber: blockNumber,
blockHash: blockHash,
txHash: call.TxHash,
txIndex: i,
timestamp: timestamp,
messageSender: call.CallFrame.From,
interactContract: call.CallFrame.To,
blockNumber: blockNumber,
blockHash: blockHash,
txHash: call.TxHash,
txIndex: i,
timestamp: timestamp,
}

tradeLogs := h.processCallFrame(call.CallFrame, metadata)
if len(tradeLogs) == 0 {
continue
}

for j := range tradeLogs {
tradeLogs[j].MessageSender = call.CallFrame.From
tradeLogs[j].InteractContract = call.CallFrame.To
}

err = h.storage.Insert(tradeLogs)
if err != nil {
return fmt.Errorf("write to storage error: %w", err)
Expand All @@ -83,7 +84,10 @@ func (h *TradeLogHandler) ProcessBlock(blockHash string, blockNumber uint64, tim

passCount, failCount := 0, 0
for _, log := range tradeLogs {
msgBytes, err := json.Marshal(log)
msgBytes, err := json.Marshal(kafka.Message{
Type: kafka.MessageTypeTradeLog,
Data: log,
})
if err != nil {
h.l.Errorw(" error when marshal trade log to json", "blockNumber", blockNumber, "log", log, "err", err)
failCount++
Expand Down Expand Up @@ -140,11 +144,7 @@ func (h *TradeLogHandler) processCallFrame(call types.CallFrame, metadata logMet
}

// parse trade log
tradeLogs, err := p.ParseWithCallFrame(
call, ethLog, metadata.timestamp,
storageTypes.WithMessageSender(metadata.messageSender),
storageTypes.WithInteractContract(metadata.interactContract),
)
tradeLogs, err := p.ParseWithCallFrame(call, ethLog, metadata.timestamp)
if err != nil {
h.l.Errorw("error when parse log", "log", ethLog, "err", err, "parser", p.Exchange())
continue
Expand All @@ -164,10 +164,28 @@ func (h *TradeLogHandler) findMatchingParser(log ethereumTypes.Log) parser.Parse
}

func (h *TradeLogHandler) RevertBlock(blocks []uint64) error {
if len(blocks) == 0 {
return nil
}

err := h.storage.Delete(blocks)
if err != nil {
return fmt.Errorf("delete blocks error: %w", err)
}

msgBytes, err := json.Marshal(kafka.Message{
Type: kafka.MessageTypeRevert,
Data: blocks,
})
if err != nil {
h.l.Errorw(" error when marshal revert message to json", "err", err)
}
err = h.publisher.Publish(h.kafkaTopic, msgBytes)
if err != nil {
h.l.Errorw("error when publish revert message", "err", err)
}
h.l.Infow("published revert message", "message", msgBytes)

return nil
}

Expand Down
2 changes: 1 addition & 1 deletion v2/pkg/handler/trade_logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func TestTradeLogHandler_ProcessBlock(t *testing.T) {
client := rpcnode.NewClient(zap.S(), ethClient)

mockStorage := &mocks.MockStorage{}
mockStorage.On("Type").Return("zerox").
mockStorage.On("Exchange").Return("zerox").
On("Insert", mock.Anything).Return(nil)
s := tradelogs.NewManager(zap.S(), []types.Storage{mockStorage})

Expand Down
9 changes: 0 additions & 9 deletions v2/pkg/kafka/config.go

This file was deleted.

21 changes: 21 additions & 0 deletions v2/pkg/kafka/types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package kafka

type Config struct {
Addresses []string

UseAuthentication bool
Username string
Password string
}

type Message struct {
Type MessageType `json:"type"`
Data interface{} `json:"data"`
}

type MessageType string

const (
MessageTypeTradeLog MessageType = "trade_log"
MessageTypeRevert MessageType = "revert"
)
32 changes: 1 addition & 31 deletions v2/pkg/parser/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,36 +8,6 @@ import (
ethereumTypes "github.com/ethereum/go-ethereum/core/types"
)

const (
ExZeroX = "zerox"
EXZeroXV3 = "zeroxV3"
ExParaswap = "paraswap"
ExTokenlon = "tokenlon"
ExZeroXRFQ = "zerox_rfq"
ExKs = "kyberswap"
ExHashflow = "hashflow"
ExKsRFQ = "kyberswapRFQ"
Ex1Inch = "1inch"
Ex1InchV6 = "1inchV6"
ExHashflowV3 = "hashflowV3"
ExParaswapTaker = "paraswap_taker"
ExUniswapX = "uniswapx"
ExNative = "native"
ExBebop = "bebop"
ExUniswapXV1 = "uniswapxV1"

Addr1InchV6 = "0x111111125421cA6dc452d289314280a0f8842A65"
AddrBebop = "0xbbbbbBB520d69a9775E85b458C58c648259FAD5F"
AddrHashflowV3 = "0x24b9d98FABF4DA1F69eE10775F240AE3dA6856fd"
AddrKyberswap = "0x6131B5fae19EA4f9D964eAc0408E4408b66337b5"
AddrKyberswapRFQ = "0x7A819Fa46734a49D0112796f9377E024c350FB26"
AddrParaswap = "0xe92b586627ccA7a83dC919cc7127196d70f55a06"
AddrUniswapX = "0x00000011F84B9aa48e5f8aA8B9897600006289Be"
AddrUniswapXV1 = "0x6000da47483062A0D734Ba3dc7576Ce6A0B645C4"
Addr0x = "0xDef1C0ded9bec7F1a1670819833240f027b25EfF"
Deployer0xV3 = "0x00000000000004533Fe15556B1E086BB1A72cEae"
)

var (
ErrInvalidTopic = errors.New("invalid order topic")
ErrNotFoundTrade = errors.New("not found log")
Expand All @@ -48,6 +18,6 @@ type Parser interface {
Topics() []string
Exchange() string
UseTraceCall() bool
ParseWithCallFrame(callFrame types.CallFrame, log ethereumTypes.Log, blockTime uint64, options ...storageTypes.Option) ([]storageTypes.TradeLog, error)
ParseWithCallFrame(callFrame types.CallFrame, log ethereumTypes.Log, blockTime uint64) ([]storageTypes.TradeLog, error)
LogFromExchange(log ethereumTypes.Log) bool
}
Loading

0 comments on commit a2867e2

Please sign in to comment.