Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Test setup and reorg handling tests #95

Merged
merged 1 commit into from
Oct 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .dockerignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
.env
.mockery.yaml

configs/secrets*
2 changes: 1 addition & 1 deletion .github/workflows/format.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v3
uses: actions/checkout@v4

- name: Set up Go
uses: actions/setup-go@v4
Expand Down
29 changes: 29 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
name: Go Unit Tests

on:
push:
branches: [main]
pull_request:
types: [opened, synchronize]
workflow_dispatch:

jobs:
test:
name: Test
timeout-minutes: 15
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4

- name: Set up Go
uses: actions/setup-go@v4
with:
go-version: '1.23'

- name: Install dependencies
run: go mod download

- name: Run Unit Tests
run: go test ./... -v
env:
GO_ENV: "test"
16 changes: 16 additions & 0 deletions .mockery.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
with-expecter: true
mockname: "Mock{{.InterfaceName}}"
filename: "{{.MockName}}.go"
outpkg: mocks
dir: test/mocks
mock-build-tags: "!production"
packages:
github.com/thirdweb-dev/indexer/internal/rpc:
interfaces:
IRPCClient:
github.com/thirdweb-dev/indexer/internal/storage:
interfaces:
IStorage:
IMainStorage:
IStagingStorage:
IOrchestratorStorage:
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ RUN go mod download

COPY . .

RUN go build -o /bin/app main.go
RUN go build -o /bin/app -tags=production main.go

FROM alpine:3.18

Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ git clone https://github.com/thirdweb-dev/insight.git
4. Create `secrets.yml` from `secrects.example.yml` and set the needed credentials
5. Build an instance
```
go build -o main
go build -o main -tags=production
```
6. Run insight
```
Expand Down
2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ services:
image: clickhouse/clickhouse-server:latest
ports:
- "8123:8123"
- "9000:9000"
- "9440:9000"
catalyst17 marked this conversation as resolved.
Show resolved Hide resolved
volumes:
- clickhouse_data:/var/lib/clickhouse
ulimits:
Expand Down
4 changes: 4 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ require (
github.com/rs/zerolog v1.33.0
github.com/spf13/cobra v1.8.1
github.com/spf13/viper v1.18.0
github.com/stretchr/testify v1.9.0
github.com/swaggo/files v1.0.1
github.com/swaggo/gin-swagger v1.6.0
github.com/swaggo/swag v1.16.3
Expand All @@ -35,6 +36,7 @@ require (
github.com/consensys/gnark-crypto v0.12.1 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.5 // indirect
github.com/crate-crypto/go-kzg-4844 v1.0.0 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/deckarep/golang-set/v2 v2.6.0 // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
Expand Down Expand Up @@ -76,6 +78,7 @@ require (
github.com/pelletier/go-toml/v2 v2.2.3 // indirect
github.com/pierrec/lz4/v4 v4.1.21 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/prometheus/client_model v0.6.1 // indirect
github.com/prometheus/common v0.55.0 // indirect
github.com/prometheus/procfs v0.15.1 // indirect
Expand All @@ -88,6 +91,7 @@ require (
github.com/spf13/afero v1.11.0 // indirect
github.com/spf13/cast v1.6.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/stretchr/objx v0.5.2 // indirect
github.com/subosito/gotenv v1.6.0 // indirect
github.com/supranational/blst v0.3.11 // indirect
github.com/tklauser/go-sysconf v0.3.12 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,8 @@ github.com/status-im/keycard-go v0.2.0/go.mod h1:wlp8ZLbsmrF6g6WjugPAx+IzoLrkdf9
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY=
github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
Expand Down
82 changes: 48 additions & 34 deletions internal/orchestrator/reorg_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,45 +71,55 @@ func (rh *ReorgHandler) Start() {
log.Debug().Msgf("Reorg handler running")
go func() {
for range ticker.C {
lookbackFrom := new(big.Int).Add(rh.lastCheckedBlock, big.NewInt(int64(rh.blocksPerScan)))
blockHeaders, err := rh.storage.MainStorage.LookbackBlockHeaders(rh.rpc.GetChainID(), rh.blocksPerScan, lookbackFrom)
// need to include lastCheckedBlock to check if next block's parent matches
lookbackFrom := new(big.Int).Add(rh.lastCheckedBlock, big.NewInt(int64(rh.blocksPerScan-1)))
mostRecentBlockChecked, err := rh.RunFromBlock(lookbackFrom)
if err != nil {
log.Error().Err(err).Msg("Error getting recent block headers")
log.Error().Err(err).Msg("Error during reorg handling")
continue
}
if len(blockHeaders) == 0 {
log.Warn().Msg("No block headers found")
if mostRecentBlockChecked == nil {
continue
}
mostRecentBlockHeader := blockHeaders[0]
reorgEndIndex := findReorgEndIndex(blockHeaders)
if reorgEndIndex == -1 {
rh.lastCheckedBlock = mostRecentBlockHeader.Number
rh.storage.OrchestratorStorage.SetLastReorgCheckedBlockNumber(rh.rpc.GetChainID(), mostRecentBlockHeader.Number)
metrics.ReorgHandlerLastCheckedBlock.Set(float64(mostRecentBlockHeader.Number.Int64()))
continue
}
metrics.ReorgCounter.Inc()
forkPoint, err := rh.findForkPoint(blockHeaders[reorgEndIndex:])
if err != nil {
log.Error().Err(err).Msg("Error while finding fork point")
continue
}
err = rh.handleReorg(forkPoint, lookbackFrom)
if err != nil {
log.Error().Err(err).Msg("Error while handling reorg")
continue
}
rh.lastCheckedBlock = mostRecentBlockHeader.Number
rh.storage.OrchestratorStorage.SetLastReorgCheckedBlockNumber(rh.rpc.GetChainID(), mostRecentBlockHeader.Number)
metrics.ReorgHandlerLastCheckedBlock.Set(float64(mostRecentBlockHeader.Number.Int64()))

rh.lastCheckedBlock = mostRecentBlockChecked
rh.storage.OrchestratorStorage.SetLastReorgCheckedBlockNumber(rh.rpc.GetChainID(), mostRecentBlockChecked)
metrics.ReorgHandlerLastCheckedBlock.Set(float64(mostRecentBlockChecked.Int64()))
}
}()

// Keep the program running (otherwise it will exit)
select {}
}

func (rh *ReorgHandler) RunFromBlock(lookbackFrom *big.Int) (lastCheckedBlock *big.Int, err error) {
blockHeaders, err := rh.storage.MainStorage.LookbackBlockHeaders(rh.rpc.GetChainID(), rh.blocksPerScan, lookbackFrom)
if err != nil {
return nil, fmt.Errorf("error getting recent block headers: %w", err)
}
if len(blockHeaders) == 0 {
log.Warn().Msg("No block headers found during reorg handling")
return nil, nil
}
mostRecentBlockHeader := blockHeaders[0]
log.Debug().Msgf("Checking for reorgs from block %s to %s", mostRecentBlockHeader.Number.String(), blockHeaders[len(blockHeaders)-1].Number.String())
reorgEndIndex := findReorgEndIndex(blockHeaders)
if reorgEndIndex == -1 {
return mostRecentBlockHeader.Number, nil
}
reorgEndBlock := blockHeaders[reorgEndIndex].Number
metrics.ReorgCounter.Inc()
forkPoint, err := rh.findFirstForkedBlockNumber(blockHeaders[reorgEndIndex:])
if err != nil {
return nil, fmt.Errorf("error while finding fork point: %w", err)
}
err = rh.handleReorg(forkPoint, reorgEndBlock)
if err != nil {
return nil, fmt.Errorf("error while handling reorg: %w", err)
}
return mostRecentBlockHeader.Number, nil
}

func findReorgEndIndex(reversedBlockHeaders []common.BlockHeader) (index int) {
for i := 0; i < len(reversedBlockHeaders)-1; i++ {
currentBlock := reversedBlockHeaders[i]
Expand All @@ -129,20 +139,21 @@ func findReorgEndIndex(reversedBlockHeaders []common.BlockHeader) (index int) {
return -1
}

func (rh *ReorgHandler) findForkPoint(reversedBlockHeaders []common.BlockHeader) (forkPoint *big.Int, err error) {
func (rh *ReorgHandler) findFirstForkedBlockNumber(reversedBlockHeaders []common.BlockHeader) (forkPoint *big.Int, err error) {
newBlocksByNumber, err := rh.getNewBlocksByNumber(reversedBlockHeaders)
if err != nil {
return nil, err
}

for i := 0; i < len(reversedBlockHeaders)-1; i++ {
// skip first because that is the reorg end block
for i := 1; i < len(reversedBlockHeaders); i++ {
blockHeader := reversedBlockHeaders[i]
block, ok := (*newBlocksByNumber)[blockHeader.Number.String()]
if !ok {
return nil, fmt.Errorf("block not found: %s", blockHeader.Number.String())
}
if block.Hash == blockHeader.Hash {
previousBlock := reversedBlockHeaders[i+1]
if blockHeader.ParentHash == block.ParentHash && blockHeader.Hash == block.Hash {
previousBlock := reversedBlockHeaders[i-1]
return previousBlock.Number, nil
}
}
Expand All @@ -151,7 +162,7 @@ func (rh *ReorgHandler) findForkPoint(reversedBlockHeaders []common.BlockHeader)
if err != nil {
return nil, fmt.Errorf("error getting next headers batch: %w", err)
}
return rh.findForkPoint(nextHeadersBatch)
return rh.findFirstForkedBlockNumber(nextHeadersBatch)
}

func (rh *ReorgHandler) getNewBlocksByNumber(reversedBlockHeaders []common.BlockHeader) (*map[string]common.Block, error) {
Expand All @@ -171,13 +182,15 @@ func (rh *ReorgHandler) getNewBlocksByNumber(reversedBlockHeaders []common.Block
}

func (rh *ReorgHandler) handleReorg(reorgStart *big.Int, reorgEnd *big.Int) error {
log.Debug().Msgf("Handling reorg from block %s to %s", reorgStart.String(), reorgEnd.String())
blockRange := make([]*big.Int, 0, new(big.Int).Sub(reorgEnd, reorgStart).Int64())
for i := new(big.Int).Set(reorgStart); i.Cmp(reorgEnd) <= 0; i.Add(i, big.NewInt(1)) {
blockRange = append(blockRange, new(big.Int).Set(i))
}

results := rh.worker.Run(blockRange)
data := make([]common.BlockData, 0, len(results))
blocksToDelete := make([]*big.Int, 0, len(results))
for _, result := range results {
if result.Error != nil {
return fmt.Errorf("cannot fix reorg: failed block %s: %w", result.BlockNumber.String(), result.Error)
Expand All @@ -188,10 +201,11 @@ func (rh *ReorgHandler) handleReorg(reorgStart *big.Int, reorgEnd *big.Int) erro
Transactions: result.Data.Transactions,
Traces: result.Data.Traces,
})
blocksToDelete = append(blocksToDelete, result.BlockNumber)
}
// TODO make delete and insert atomic
if err := rh.storage.MainStorage.DeleteBlockData(rh.rpc.GetChainID(), blockRange); err != nil {
return fmt.Errorf("error deleting data for blocks %v: %w", blockRange, err)
if err := rh.storage.MainStorage.DeleteBlockData(rh.rpc.GetChainID(), blocksToDelete); err != nil {
return fmt.Errorf("error deleting data for blocks %v: %w", blocksToDelete, err)
}
if err := rh.storage.MainStorage.InsertBlockData(&data); err != nil {
return fmt.Errorf("error saving data to main storage: %w", err)
Expand Down
Loading
Loading