Skip to content

Commit

Permalink
test setup and reorg tests
Browse files Browse the repository at this point in the history
  • Loading branch information
iuwqyir committed Oct 8, 2024
1 parent df18b17 commit 7ade93f
Show file tree
Hide file tree
Showing 15 changed files with 2,050 additions and 38 deletions.
1 change: 1 addition & 0 deletions .dockerignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
.env
.mockery.yaml

configs/secrets*
2 changes: 1 addition & 1 deletion .github/workflows/format.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v3
uses: actions/checkout@v4

- name: Set up Go
uses: actions/setup-go@v4
Expand Down
29 changes: 29 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
name: Go Unit Tests

on:
push:
branches: [main]
pull_request:
types: [opened, synchronize]
workflow_dispatch:

jobs:
test:
name: Test
timeout-minutes: 15
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4

- name: Set up Go
uses: actions/setup-go@v4
with:
go-version: '1.23'

- name: Install dependencies
run: go mod download

- name: Run Unit Tests
run: go test ./... -v
env:
GO_ENV: "test"
16 changes: 16 additions & 0 deletions .mockery.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
with-expecter: true
mockname: "Mock{{.InterfaceName}}"
filename: "{{.MockName}}.go"
outpkg: mocks
dir: test/mocks
mock-build-tags: "!production"
packages:
github.com/thirdweb-dev/indexer/internal/rpc:
interfaces:
IRPCClient:
github.com/thirdweb-dev/indexer/internal/storage:
interfaces:
IStorage:
IMainStorage:
IStagingStorage:
IOrchestratorStorage:
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ RUN go mod download

COPY . .

RUN go build -o /bin/app main.go
RUN go build -o /bin/app -tags=production main.go

FROM alpine:3.18

Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ git clone https://github.com/thirdweb-dev/insight.git
4. Create `secrets.yml` from `secrects.example.yml` and set the needed credentials
5. Build an instance
```
go build -o main
go build -o main -tags=production
```
6. Run insight
```
Expand Down
2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ services:
image: clickhouse/clickhouse-server:latest
ports:
- "8123:8123"
- "9000:9000"
- "9440:9000"
volumes:
- clickhouse_data:/var/lib/clickhouse
ulimits:
Expand Down
4 changes: 4 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ require (
github.com/rs/zerolog v1.33.0
github.com/spf13/cobra v1.8.1
github.com/spf13/viper v1.18.0
github.com/stretchr/testify v1.9.0
github.com/swaggo/files v1.0.1
github.com/swaggo/gin-swagger v1.6.0
github.com/swaggo/swag v1.16.3
Expand All @@ -35,6 +36,7 @@ require (
github.com/consensys/gnark-crypto v0.12.1 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.5 // indirect
github.com/crate-crypto/go-kzg-4844 v1.0.0 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/deckarep/golang-set/v2 v2.6.0 // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
Expand Down Expand Up @@ -76,6 +78,7 @@ require (
github.com/pelletier/go-toml/v2 v2.2.3 // indirect
github.com/pierrec/lz4/v4 v4.1.21 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/prometheus/client_model v0.6.1 // indirect
github.com/prometheus/common v0.55.0 // indirect
github.com/prometheus/procfs v0.15.1 // indirect
Expand All @@ -88,6 +91,7 @@ require (
github.com/spf13/afero v1.11.0 // indirect
github.com/spf13/cast v1.6.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/stretchr/objx v0.5.2 // indirect
github.com/subosito/gotenv v1.6.0 // indirect
github.com/supranational/blst v0.3.11 // indirect
github.com/tklauser/go-sysconf v0.3.12 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,8 @@ github.com/status-im/keycard-go v0.2.0/go.mod h1:wlp8ZLbsmrF6g6WjugPAx+IzoLrkdf9
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY=
github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
Expand Down
81 changes: 47 additions & 34 deletions internal/orchestrator/reorg_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,45 +71,54 @@ func (rh *ReorgHandler) Start() {
log.Debug().Msgf("Reorg handler running")
go func() {
for range ticker.C {
lookbackFrom := new(big.Int).Add(rh.lastCheckedBlock, big.NewInt(int64(rh.blocksPerScan)))
blockHeaders, err := rh.storage.MainStorage.LookbackBlockHeaders(rh.rpc.GetChainID(), rh.blocksPerScan, lookbackFrom)
// need to include lastCheckedBlock to check if next block's parent matches
lookbackFrom := new(big.Int).Add(rh.lastCheckedBlock, big.NewInt(int64(rh.blocksPerScan-1)))
mostRecentBlockChecked, err := rh.RunFromBlock(lookbackFrom)
if err != nil {
log.Error().Err(err).Msg("Error getting recent block headers")
log.Error().Err(err).Msg("Error during reorg handling")
continue
}
if len(blockHeaders) == 0 {
log.Warn().Msg("No block headers found")
if mostRecentBlockChecked == nil {
continue
}
mostRecentBlockHeader := blockHeaders[0]
reorgEndIndex := findReorgEndIndex(blockHeaders)
if reorgEndIndex == -1 {
rh.lastCheckedBlock = mostRecentBlockHeader.Number
rh.storage.OrchestratorStorage.SetLastReorgCheckedBlockNumber(rh.rpc.GetChainID(), mostRecentBlockHeader.Number)
metrics.ReorgHandlerLastCheckedBlock.Set(float64(mostRecentBlockHeader.Number.Int64()))
continue
}
metrics.ReorgCounter.Inc()
forkPoint, err := rh.findForkPoint(blockHeaders[reorgEndIndex:])
if err != nil {
log.Error().Err(err).Msg("Error while finding fork point")
continue
}
err = rh.handleReorg(forkPoint, lookbackFrom)
if err != nil {
log.Error().Err(err).Msg("Error while handling reorg")
continue
}
rh.lastCheckedBlock = mostRecentBlockHeader.Number
rh.storage.OrchestratorStorage.SetLastReorgCheckedBlockNumber(rh.rpc.GetChainID(), mostRecentBlockHeader.Number)
metrics.ReorgHandlerLastCheckedBlock.Set(float64(mostRecentBlockHeader.Number.Int64()))

rh.lastCheckedBlock = mostRecentBlockChecked
rh.storage.OrchestratorStorage.SetLastReorgCheckedBlockNumber(rh.rpc.GetChainID(), mostRecentBlockChecked)
metrics.ReorgHandlerLastCheckedBlock.Set(float64(mostRecentBlockChecked.Int64()))
}
}()

// Keep the program running (otherwise it will exit)
select {}
}

func (rh *ReorgHandler) RunFromBlock(lookbackFrom *big.Int) (lastCheckedBlock *big.Int, err error) {
blockHeaders, err := rh.storage.MainStorage.LookbackBlockHeaders(rh.rpc.GetChainID(), rh.blocksPerScan, lookbackFrom)
if err != nil {
return nil, fmt.Errorf("error getting recent block headers: %w", err)
}
if len(blockHeaders) == 0 {
log.Warn().Msg("No block headers found during reorg handling")
return nil, nil
}
mostRecentBlockHeader := blockHeaders[0]
reorgEndIndex := findReorgEndIndex(blockHeaders)
if reorgEndIndex == -1 {
return mostRecentBlockHeader.Number, nil
}
reorgEndBlock := blockHeaders[reorgEndIndex].Number
metrics.ReorgCounter.Inc()
forkPoint, err := rh.findFirstForkedBlockNumber(blockHeaders[reorgEndIndex:])
if err != nil {
return nil, fmt.Errorf("error while finding fork point: %w", err)
}
err = rh.handleReorg(forkPoint, reorgEndBlock)
if err != nil {
return nil, fmt.Errorf("error while handling reorg: %w", err)
}
return mostRecentBlockHeader.Number, nil
}

func findReorgEndIndex(reversedBlockHeaders []common.BlockHeader) (index int) {
for i := 0; i < len(reversedBlockHeaders)-1; i++ {
currentBlock := reversedBlockHeaders[i]
Expand All @@ -129,20 +138,21 @@ func findReorgEndIndex(reversedBlockHeaders []common.BlockHeader) (index int) {
return -1
}

func (rh *ReorgHandler) findForkPoint(reversedBlockHeaders []common.BlockHeader) (forkPoint *big.Int, err error) {
func (rh *ReorgHandler) findFirstForkedBlockNumber(reversedBlockHeaders []common.BlockHeader) (forkPoint *big.Int, err error) {
newBlocksByNumber, err := rh.getNewBlocksByNumber(reversedBlockHeaders)
if err != nil {
return nil, err
}

for i := 0; i < len(reversedBlockHeaders)-1; i++ {
// skip first because that is the reorg end block
for i := 1; i < len(reversedBlockHeaders); i++ {
blockHeader := reversedBlockHeaders[i]
block, ok := (*newBlocksByNumber)[blockHeader.Number.String()]
if !ok {
return nil, fmt.Errorf("block not found: %s", blockHeader.Number.String())
}
if block.Hash == blockHeader.Hash {
previousBlock := reversedBlockHeaders[i+1]
if blockHeader.ParentHash == block.ParentHash && blockHeader.Hash == block.Hash {
previousBlock := reversedBlockHeaders[i-1]
return previousBlock.Number, nil
}
}
Expand All @@ -151,7 +161,7 @@ func (rh *ReorgHandler) findForkPoint(reversedBlockHeaders []common.BlockHeader)
if err != nil {
return nil, fmt.Errorf("error getting next headers batch: %w", err)
}
return rh.findForkPoint(nextHeadersBatch)
return rh.findFirstForkedBlockNumber(nextHeadersBatch)
}

func (rh *ReorgHandler) getNewBlocksByNumber(reversedBlockHeaders []common.BlockHeader) (*map[string]common.Block, error) {
Expand All @@ -171,13 +181,15 @@ func (rh *ReorgHandler) getNewBlocksByNumber(reversedBlockHeaders []common.Block
}

func (rh *ReorgHandler) handleReorg(reorgStart *big.Int, reorgEnd *big.Int) error {
log.Debug().Msgf("Handling reorg from block %s to %s", reorgStart.String(), reorgEnd.String())
blockRange := make([]*big.Int, 0, new(big.Int).Sub(reorgEnd, reorgStart).Int64())
for i := new(big.Int).Set(reorgStart); i.Cmp(reorgEnd) <= 0; i.Add(i, big.NewInt(1)) {
blockRange = append(blockRange, new(big.Int).Set(i))
}

results := rh.worker.Run(blockRange)
data := make([]common.BlockData, 0, len(results))
blocksToDelete := make([]*big.Int, 0, len(results))
for _, result := range results {
if result.Error != nil {
return fmt.Errorf("cannot fix reorg: failed block %s: %w", result.BlockNumber.String(), result.Error)
Expand All @@ -188,10 +200,11 @@ func (rh *ReorgHandler) handleReorg(reorgStart *big.Int, reorgEnd *big.Int) erro
Transactions: result.Data.Transactions,
Traces: result.Data.Traces,
})
blocksToDelete = append(blocksToDelete, result.BlockNumber)
}
// TODO make delete and insert atomic
if err := rh.storage.MainStorage.DeleteBlockData(rh.rpc.GetChainID(), blockRange); err != nil {
return fmt.Errorf("error deleting data for blocks %v: %w", blockRange, err)
if err := rh.storage.MainStorage.DeleteBlockData(rh.rpc.GetChainID(), blocksToDelete); err != nil {
return fmt.Errorf("error deleting data for blocks %v: %w", blocksToDelete, err)
}
if err := rh.storage.MainStorage.InsertBlockData(&data); err != nil {
return fmt.Errorf("error saving data to main storage: %w", err)
Expand Down
Loading

0 comments on commit 7ade93f

Please sign in to comment.