diff --git a/cmd.go b/cmd.go
index 52a2255..e53081f 100644
--- a/cmd.go
+++ b/cmd.go
@@ -1,6 +1,7 @@
 package main
 
 import (
+	"context"
 	"encoding/json"
 	"errors"
 	"fmt"
@@ -9,6 +10,7 @@ import (
 	"log"
 	"os"
 	"path/filepath"
+	"strconv"
 	"strings"
 	"text/template"
 
@@ -39,9 +41,10 @@ func CreateRootCommand() *cobra.Command {
 	starknetCmd := CreateStarknetCommand()
 	crawlerCmd := CreateCrawlerCommand()
 	indexCmd := CreateIndexCommand()
+	inspectorCmd := CreateInspectorCommand()
 	evmCmd := CreateEVMCommand()
 	synchronizerCmd := CreateSynchronizerCommand()
-	rootCmd.AddCommand(completionCmd, versionCmd, blockchainCmd, starknetCmd, evmCmd, crawlerCmd, indexCmd, synchronizerCmd)
+	rootCmd.AddCommand(completionCmd, versionCmd, blockchainCmd, starknetCmd, evmCmd, crawlerCmd, indexCmd, inspectorCmd, synchronizerCmd)
 
 	// By default, cobra Command objects write to stderr. We have to forcibly set them to output to
 	// stdout.
@@ -316,6 +319,116 @@ func CreateSynchronizerCommand() *cobra.Command {
 	return synchronizerCmd
 }
 
+type BlockInspectItem struct {
+	StartBlock int64
+	EndBlock   int64
+}
+
+func CreateInspectorCommand() *cobra.Command {
+	inspectorCmd := &cobra.Command{
+		Use:   "inspector",
+		Short: "Inspect storage and database consistency",
+	}
+
+	var chain, baseDir, delim, returnFunc string
+	var timeout int
+
+	storageCommand := &cobra.Command{
+		Use:   "storage",
+		Short: "Inspect filesystem, gcp-storage, aws-bucket consistency",
+		PreRunE: func(cmd *cobra.Command, args []string) error {
+			storageErr := storage.CheckVariablesForStorage()
+			if storageErr != nil {
+				return storageErr
+			}
+
+			crawlerErr := crawler.CheckVariablesForCrawler()
+			if crawlerErr != nil {
+				return crawlerErr
+			}
+
+			return nil
+		},
+		RunE: func(cmd *cobra.Command, args []string) error {
+			ctx := context.Background()
+
+			basePath := filepath.Join(baseDir, crawler.SeerCrawlerStoragePrefix, "data", chain)
+			storageInstance, newStorageErr := storage.NewStorage(storage.SeerCrawlerStorageType, basePath)
+			if newStorageErr != nil {
+				return newStorageErr
+			}
+
+			// Only for gcp-storage type.
+			// The return function selects which field is returned for each listed item;
+			// if it is set to "prefix", delim must also be set to "/".
+			var listReturnFunc storage.ListReturnFunc
+			switch storage.SeerCrawlerStorageType {
+			case "gcp-storage":
+				switch returnFunc {
+				case "prefix":
+					listReturnFunc = storage.GCSListReturnPrefixFunc
+				default:
+					listReturnFunc = storage.GCSListReturnNameFunc
+				}
+			default:
+				listReturnFunc = func(item any) string { return fmt.Sprintf("%v", item) }
+			}
+
+			items, listErr := storageInstance.List(ctx, delim, timeout, listReturnFunc)
+			if listErr != nil {
+				return listErr
+			}
+
+			itemsMap := make(map[string]BlockInspectItem)
+			previousMapItemKey := ""
+
+			for _, item := range items {
+				itemSlice := strings.Split(item, "/")
+				blockNums := itemSlice[len(itemSlice)-2]
+
+				blockNumsSlice := strings.Split(blockNums, "-")
+
+				blockNumS, atoiErrS := strconv.ParseInt(blockNumsSlice[0], 10, 64)
+				if atoiErrS != nil {
+					log.Printf("Unable to parse blockNumS from %s", blockNumsSlice[0])
+					continue
+				}
+				blockNumF, atoiErrF := strconv.ParseInt(blockNumsSlice[1], 10, 64)
+				if atoiErrF != nil {
+					log.Printf("Unable to parse blockNumF from %s", blockNumsSlice[1])
+					continue
+				}
+
+				if previousMapItemKey != blockNums && previousMapItemKey != "" {
+					diff := blockNumS - itemsMap[previousMapItemKey].EndBlock
+					if diff <= 0 {
+						fmt.Printf("Found incorrect block order between batches: %s -> %s\n", previousMapItemKey, blockNums)
+					} else if diff > 1 {
+						fmt.Printf("Found %d missing blocks between batches: %s -> %s\n", diff-1, previousMapItemKey, blockNums)
+					}
+				}
+
+				previousMapItemKey = blockNums
+				itemsMap[blockNums] = BlockInspectItem{StartBlock: blockNumS, EndBlock: blockNumF}
+			}
+
+			log.Printf("Processed %d items", len(itemsMap))
+
+			return nil
+		},
+	}
+
+	storageCommand.Flags().StringVar(&chain, "chain", "ethereum", "The blockchain to inspect (default: ethereum)")
+	storageCommand.Flags().StringVar(&baseDir, "base-dir", "", "The base directory where the crawled data is stored (default: '')")
+	storageCommand.Flags().StringVar(&delim, "delim", "", "Only for gcp-storage. The delimiter argument can be used to restrict the results to only the objects in the given 'directory'")
+	storageCommand.Flags().StringVar(&returnFunc, "return-func", "", "Which return function to use for listed items (for gcp-storage, 'prefix' returns prefixes instead of object names)")
+	storageCommand.Flags().IntVar(&timeout, "timeout", 180, "List operation timeout in seconds (default: 180)")
+
+	inspectorCmd.AddCommand(storageCommand)
+
+	return inspectorCmd
+}
+
 func CreateIndexCommand() *cobra.Command {
 	indexCommand := &cobra.Command{
diff --git a/storage/aws_bucket.go b/storage/aws_bucket.go
index 817e999..a757e5c 100644
--- a/storage/aws_bucket.go
+++ b/storage/aws_bucket.go
@@ -2,6 +2,7 @@ package storage
 
 import (
 	"bytes"
+	"context"
 	"path/filepath"
 
 	"github.com/aws/aws-sdk-go/aws"
@@ -88,6 +89,10 @@ func (s *S3) Delete(key string) error {
 
 }
 
+func (s *S3) List(ctx context.Context, delim string, timeout int, returnFunc ListReturnFunc) ([]string, error) {
+	return []string{}, nil
+}
+
 func (s *S3) ReadBatch(readItems []ReadItem) (map[string][]string, error) {
 	// Implement the ReadBatch method
 	return nil, nil
diff --git a/storage/filesystem.go b/storage/filesystem.go
index 2e81440..903fcb1 100644
--- a/storage/filesystem.go
+++ b/storage/filesystem.go
@@ -2,6 +2,8 @@ package storage
 
 import (
 	"bufio"
+	"context"
+	"fmt"
 	"log"
 	"os"
 	"path/filepath"
@@ -115,6 +117,28 @@ func (fs *FileStorage) ReadBatch(readItems []ReadItem) (map[string][]string, err
 	return result, nil
 }
 
+func (fs *FileStorage) List(ctx context.Context, delim string, timeout int, returnFunc ListReturnFunc) ([]string, error) {
+	prefix := fmt.Sprintf("%s/", fs.BasePath)
+	log.Printf("Loading directory items with prefix: %s", prefix)
+
+	dirs, readDirErr := os.ReadDir(prefix)
+	if readDirErr != nil {
+		return []string{}, readDirErr
+	}
+
+	var items []string
+	itemsLen := 0
+
+	for _, d := range dirs {
+		items = append(items, fmt.Sprintf("%s%s/", prefix, d.Name()))
+		itemsLen++
+	}
+
+	log.Printf("Listed %d items", itemsLen)
+
+	return items, nil
+}
+
 func (fs *FileStorage) Delete(key string) error {
 	// Implement the Delete method
diff --git a/storage/gcp_storage.go b/storage/gcp_storage.go
index 7c53453..f6ca27d 100644
--- a/storage/gcp_storage.go
+++ b/storage/gcp_storage.go
@@ -5,10 +5,13 @@ import (
 	"context"
 	"fmt"
 	"io"
+	"log"
 	"path/filepath"
 	"strings"
+	"time"
 
 	"cloud.google.com/go/storage"
+	"google.golang.org/api/iterator"
 )
 
 // GCS implements the Storer interface for Google Cloud Storage
@@ -85,6 +88,76 @@ func (g *GCS) Read(key string) ([]string, error) {
 	return result, nil
 }
 
+var (
+	GCSListReturnNameFunc = func(item any) string {
+		if attr, ok := item.(*storage.ObjectAttrs); ok {
+			return attr.Name
+		}
+		return ""
+	}
+
+	GCSListReturnPrefixFunc = func(item any) string {
+		if attr, ok := item.(*storage.ObjectAttrs); ok {
+			return attr.Prefix
+		}
+		return ""
+	}
+)
+
+func (g *GCS) List(ctx context.Context, delim string, timeout int, returnFunc ListReturnFunc) ([]string, error) {
+	ctx, cancel := context.WithTimeout(ctx, time.Second*time.Duration(timeout))
+	defer cancel()
+
+	// Prefixes and delimiters can be used to emulate directory listings.
+	// Prefixes can be used to filter objects starting with prefix.
+	// The delimiter argument can be used to restrict the results to only the
+	// objects in the given "directory". Without the delimiter, the entire tree
+	// under the prefix is returned.
+	//
+	// For example, given these blobs:
+	//   /a/1.txt
+	//   /a/b/2.txt
+	//
+	// If you just specify prefix="a/", you'll get back:
+	//   /a/1.txt
+	//   /a/b/2.txt
+	//
+	// However, if you specify prefix="a/" and delim="/", you'll get back:
+	//   /a/1.txt
+	prefix := fmt.Sprintf("%s/", g.BasePath)
+	log.Printf("Loading bucket items with prefix: %s and delim: %s", prefix, delim)
+
+	it := g.Client.Bucket(SeerCrawlerStorageBucket).Objects(ctx, &storage.Query{
+		Prefix:    prefix,
+		Delimiter: delim,
+	})
+
+	var items []string
+	itemsLen := 0
+
+	for {
+		attrs, err := it.Next()
+		if err == iterator.Done {
+			break
+		}
+		if err != nil {
+			return []string{}, fmt.Errorf("Bucket(%q).Objects: %w", SeerCrawlerStorageBucket, err)
+		}
+
+		returnVal := returnFunc(attrs)
+		if returnVal == "" {
+			continue
+		}
+
+		itemsLen++
+		items = append(items, returnVal)
+	}
+
+	log.Printf("Listed %d items", itemsLen)
+
+	return items, nil
+}
+
 func (g *GCS) Delete(key string) error {
 	ctx := context.Background()
diff --git a/storage/storage.go b/storage/storage.go
index b543451..cf948c9 100644
--- a/storage/storage.go
+++ b/storage/storage.go
@@ -1,10 +1,15 @@
 package storage
 
+import "context"
+
+type ListReturnFunc func(any) string
+
 type Storer interface {
 	Save(batchDir, filename string, data []string) error
 	Read(key string) ([]string, error)
 	ReadBatch(readItems []ReadItem) (map[string][]string, error)
 	Delete(key string) error
+	List(ctx context.Context, delim string, timeout int, returnFunc ListReturnFunc) ([]string, error)
 }
 
 type ReadItem struct {
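Note (reviewer aid, not part of the diff): the consistency check in the new 'inspector storage' command reduces to comparing the start of each batch key against the end of the previous one. Below is a minimal, self-contained sketch of that logic with made-up batch keys of the form '<start>-<end>'; it mirrors the loop in RunE above but strips away the storage listing.

package main

import (
	"fmt"
	"strconv"
	"strings"
)

func main() {
	// Hypothetical batch keys, as they would be parsed out of listed paths
	// such as ".../data/ethereum/100-199/".
	keys := []string{"100-199", "200-299", "310-399", "390-499"}

	prevKey := ""
	prevEnd := int64(0)
	for _, key := range keys {
		bounds := strings.Split(key, "-")
		start, startErr := strconv.ParseInt(bounds[0], 10, 64)
		end, endErr := strconv.ParseInt(bounds[1], 10, 64)
		if startErr != nil || endErr != nil {
			continue
		}

		if prevKey != "" {
			diff := start - prevEnd
			if diff <= 0 {
				fmt.Printf("incorrect block order between batches: %s -> %s\n", prevKey, key)
			} else if diff > 1 {
				fmt.Printf("%d missing blocks between batches: %s -> %s\n", diff-1, prevKey, key)
			}
		}

		prevKey, prevEnd = key, end
	}
	// Prints:
	//   10 missing blocks between batches: 200-299 -> 310-399
	//   incorrect block order between batches: 310-399 -> 390-499
}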
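Note (not part of the diff): (s *S3).List is added only as a stub so that S3 keeps satisfying the extended Storer interface. If it is implemented later, one possible shape that mirrors the GCS version is sketched below; the helper name, the explicit client/bucket parameters, and the use of ListObjectsV2 paging are assumptions, not existing code in this repository.

package storage

import (
	"context"
	"time"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/service/s3"
)

// listS3Objects is a hypothetical helper: it pages through the objects under
// prefix and applies returnFunc to each *s3.Object, so the same ListReturnFunc
// abstraction used for GCS could be reused for S3.
func listS3Objects(ctx context.Context, client *s3.S3, bucket, prefix, delim string, timeout int, returnFunc ListReturnFunc) ([]string, error) {
	ctx, cancel := context.WithTimeout(ctx, time.Second*time.Duration(timeout))
	defer cancel()

	input := &s3.ListObjectsV2Input{
		Bucket:    aws.String(bucket),
		Prefix:    aws.String(prefix),
		Delimiter: aws.String(delim),
	}

	var items []string
	pageErr := client.ListObjectsV2PagesWithContext(ctx, input, func(page *s3.ListObjectsV2Output, lastPage bool) bool {
		for _, obj := range page.Contents {
			if val := returnFunc(obj); val != "" {
				items = append(items, val)
			}
		}
		return true
	})
	if pageErr != nil {
		return []string{}, pageErr
	}

	return items, nil
}

Whether the S3 version should also surface common prefixes (the CommonPrefixes field of ListObjectsV2Output), the way GCSListReturnPrefixFunc exposes prefixes for GCS, is a separate decision for when the stub is actually filled in.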