Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Synchronizer improvments and fixes #30

Merged
merged 8 commits into from
Jun 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 27 additions & 47 deletions cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,10 @@ func CreateRootCommand() *cobra.Command {
blockchainCmd := CreateBlockchainCommand()
starknetCmd := CreateStarknetCommand()
crawlerCmd := CreateCrawlerCommand()
indexCmd := CreateIndexCommand()
inspectorCmd := CreateInspectorCommand()
evmCmd := CreateEVMCommand()
synchronizerCmd := CreateSynchronizerCommand()
rootCmd.AddCommand(completionCmd, versionCmd, blockchainCmd, starknetCmd, evmCmd, crawlerCmd, indexCmd, inspectorCmd, synchronizerCmd)
rootCmd.AddCommand(completionCmd, versionCmd, blockchainCmd, starknetCmd, evmCmd, crawlerCmd, inspectorCmd, synchronizerCmd)

// By default, cobra Command objects write to stderr. We have to forcibly set them to output to
// stdout.
Expand Down Expand Up @@ -284,9 +283,9 @@ func CreateCrawlerCommand() *cobra.Command {
}

func CreateSynchronizerCommand() *cobra.Command {
var startBlock, endBlock uint64
var startBlock, endBlock, batchSize uint64
var timeout int
var chain, baseDir, output, abi_source string
var chain, baseDir, customerDbUriFlag string

synchronizerCmd := &cobra.Command{
Use: "synchronizer",
Expand Down Expand Up @@ -321,12 +320,23 @@ func CreateSynchronizerCommand() *cobra.Command {
RunE: func(cmd *cobra.Command, args []string) error {
indexer.InitDBConnection()

newSynchronizer, synchonizerErr := synchronizer.NewSynchronizer(chain, baseDir, startBlock, endBlock, timeout)
newSynchronizer, synchonizerErr := synchronizer.NewSynchronizer(chain, baseDir, startBlock, endBlock, batchSize, timeout)
if synchonizerErr != nil {
return synchonizerErr
}

newSynchronizer.SyncCustomers()
latestBlockNumber, latestErr := newSynchronizer.Client.GetLatestBlockNumber()
if latestErr != nil {
return fmt.Errorf("Failed to get latest block number: %v", latestErr)
}

if startBlock > latestBlockNumber.Uint64() {
log.Fatalf("Start block could not be greater then latest block number at blockchain")
}

crawler.CurrentBlockchainState.SetLatestBlockNumber(latestBlockNumber)

newSynchronizer.Start(customerDbUriFlag)

return nil
},
Expand All @@ -337,8 +347,8 @@ func CreateSynchronizerCommand() *cobra.Command {
synchronizerCmd.Flags().Uint64Var(&endBlock, "end-block", 0, "The block number to end decoding at (default: latest block)")
synchronizerCmd.Flags().StringVar(&baseDir, "base-dir", "", "The base directory to store the crawled data (default: '')")
synchronizerCmd.Flags().IntVar(&timeout, "timeout", 30, "The timeout for the crawler in seconds (default: 30)")
synchronizerCmd.Flags().StringVar(&output, "output", "output", "The output directory to store the decoded data (default: output)")
synchronizerCmd.Flags().StringVar(&abi_source, "abi-source", "abi", "The source of the ABI (default: abi)")
synchronizerCmd.Flags().Uint64Var(&batchSize, "batch-size", 100, "The number of blocks to crawl in each batch (default: 100)")
synchronizerCmd.Flags().StringVar(&customerDbUriFlag, "customer-db-uri", "", "Set customer database URI for development. This workflow bypass fetching customer IDs and its database URL connection strings from mdb-v3-controller API")

return synchronizerCmd
}
Expand All @@ -357,9 +367,9 @@ func CreateInspectorCommand() *cobra.Command {
var chain, baseDir, delim, returnFunc, batch, target string
var timeout, row int

decodeCommand := &cobra.Command{
Use: "decode",
Short: "Decode proto data from storage",
readCommand := &cobra.Command{
Use: "read",
Short: "Read and decode indexed proto data from storage",
PreRunE: func(cmd *cobra.Command, args []string) error {
storageErr := storage.CheckVariablesForStorage()
if storageErr != nil {
Expand Down Expand Up @@ -414,11 +424,11 @@ func CreateInspectorCommand() *cobra.Command {
},
}

decodeCommand.Flags().StringVar(&chain, "chain", "ethereum", "The blockchain to crawl (default: ethereum)")
decodeCommand.Flags().StringVar(&baseDir, "base-dir", "", "The base directory to store the crawled data (default: '')")
decodeCommand.Flags().StringVar(&batch, "batch", "", "What batch to read")
decodeCommand.Flags().StringVar(&target, "target", "", "What to read: blocks, logs or transactions")
decodeCommand.Flags().IntVar(&row, "row", 0, "Row to read (default: 0)")
readCommand.Flags().StringVar(&chain, "chain", "ethereum", "The blockchain to crawl (default: ethereum)")
readCommand.Flags().StringVar(&baseDir, "base-dir", "", "The base directory to store the crawled data (default: '')")
readCommand.Flags().StringVar(&batch, "batch", "", "What batch to read")
readCommand.Flags().StringVar(&target, "target", "", "What to read: blocks, logs or transactions")
readCommand.Flags().IntVar(&row, "row", 0, "Row to read (default: 0)")

var storageVerify bool

Expand Down Expand Up @@ -599,41 +609,11 @@ func CreateInspectorCommand() *cobra.Command {
storageCommand.Flags().StringVar(&returnFunc, "return-func", "", "Which function use for return")
storageCommand.Flags().IntVar(&timeout, "timeout", 180, "List timeout (default: 180)")

inspectorCmd.AddCommand(storageCommand, decodeCommand, dbCommand)
inspectorCmd.AddCommand(storageCommand, readCommand, dbCommand)

return inspectorCmd
}

func CreateIndexCommand() *cobra.Command {

indexCommand := &cobra.Command{
Use: "index",
Short: "Index storage of moonstream blockstore",
}

// subcommands

initializeCommand := &cobra.Command{
Use: "initialize",
Short: "Initialize the index storage",
Run: func(cmd *cobra.Command, args []string) {
fmt.Println("Initializing index storage from go")
},
}

readCommand := &cobra.Command{
Use: "read",
Short: "Read the index storage",
Run: func(cmd *cobra.Command, args []string) {
// index.Read()
},
}

indexCommand.AddCommand(initializeCommand, readCommand)

return indexCommand
}

func CreateStarknetParseCommand() *cobra.Command {
var infile string
var rawABI []byte
Expand Down
9 changes: 4 additions & 5 deletions crawler/crawler.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ type Crawler struct {
blockchain string
startBlock int64
endBlock int64
blocksBatch int64
batchSize int64
confirmations int64
force bool
baseDir string
Expand Down Expand Up @@ -75,7 +75,7 @@ func NewCrawler(blockchain string, startBlock, endBlock, batchSize, confirmation
blockchain: blockchain,
startBlock: startBlock,
endBlock: endBlock,
blocksBatch: batchSize,
batchSize: batchSize,
confirmations: confirmations,
force: force,
baseDir: baseDir,
Expand Down Expand Up @@ -129,7 +129,6 @@ func SetDefaultStartBlock(confirmations int64, latestBlockNumber *big.Int) int64
func (c *Crawler) Start(threads int) {
latestBlockNumber := CurrentBlockchainState.GetLatestBlockNumber()
if c.force {
// Start form specified startBlock if it is set and not 0
if c.startBlock == 0 {
c.startBlock = SetDefaultStartBlock(c.confirmations, latestBlockNumber)
}
Expand All @@ -153,7 +152,7 @@ func (c *Crawler) Start(threads int) {
}
}

tempEndBlock := c.startBlock + c.blocksBatch
tempEndBlock := c.startBlock + c.batchSize
var safeBlock int64

retryWaitTime := 10 * time.Second
Expand Down Expand Up @@ -181,7 +180,7 @@ func (c *Crawler) Start(threads int) {

safeBlock = latestBlockNumber.Int64() - c.confirmations

tempEndBlock = c.startBlock + c.blocksBatch
tempEndBlock = c.startBlock + c.batchSize
if c.endBlock != 0 {
if c.endBlock <= tempEndBlock {
tempEndBlock = c.endBlock
Expand Down
6 changes: 3 additions & 3 deletions indexer/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -602,7 +602,7 @@ func (p *PostgreSQLpgx) ReadABIJobs(blockchain string) ([]AbiJob, error) {
return nil, nil // or return an appropriate error if this is considered an error state
}

log.Println("Parsed abiJobs:", len(abiJobs), " for blockchain:", blockchain)
log.Println("Parsed abiJobs:", len(abiJobs), "for blockchain:", blockchain)
// If you need to process or log the first ABI job separately, do it here

return abiJobs, nil
Expand Down Expand Up @@ -1001,7 +1001,7 @@ func (p *PostgreSQLpgx) WriteEvents(blockchain string, events []EventLabel) erro
}
}

log.Printf("Records %d events inserted into %s", len(events), tableName)
log.Printf("Pushed %d events into %s", len(events), tableName)

return nil
}
Expand Down Expand Up @@ -1073,6 +1073,6 @@ func (p *PostgreSQLpgx) WriteTransactions(blockchain string, transactions []Tran
}
}

log.Printf("Records %d transactions inserted into %s", len(transactions), tableName)
log.Printf("Pushed %d transactions into %s", len(transactions), tableName)
return nil
}
3 changes: 1 addition & 2 deletions sample.env
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,4 @@ export SEER_CRAWLER_STORAGE_PREFIX="<dev_or_prod>"

# Environment variables for local development
export MOONSTREAM_STORAGE_GCP_SERVICE_ACCOUNT_CREDS_PATH="<path_to_json_credentials_file_for_service_accout_at_google_cloud>"

export SEER_CRAWLER_DEBUG=false
export SEER_CRAWLER_DEBUG=false
Loading
Loading