Inspect command
kompotkot committed Jun 13, 2024
1 parent a06cf14 commit 0e786cb
Showing 5 changed files with 221 additions and 1 deletion.
115 changes: 114 additions & 1 deletion cmd.go
@@ -1,6 +1,7 @@
package main

import (
"context"
"encoding/json"
"errors"
"fmt"
@@ -9,6 +10,7 @@ import (
"log"
"os"
"path/filepath"
"strconv"
"strings"
"text/template"

@@ -39,9 +41,10 @@ func CreateRootCommand() *cobra.Command {
starknetCmd := CreateStarknetCommand()
crawlerCmd := CreateCrawlerCommand()
indexCmd := CreateIndexCommand()
inspectorCmd := CreateInspectorCommand()
evmCmd := CreateEVMCommand()
synchronizerCmd := CreateSynchronizerCommand()
rootCmd.AddCommand(completionCmd, versionCmd, blockchainCmd, starknetCmd, evmCmd, crawlerCmd, indexCmd, synchronizerCmd)
rootCmd.AddCommand(completionCmd, versionCmd, blockchainCmd, starknetCmd, evmCmd, crawlerCmd, indexCmd, inspectorCmd, synchronizerCmd)

// By default, cobra Command objects write to stderr. We have to forcibly set them to output to
// stdout.
@@ -316,6 +319,116 @@ func CreateSynchronizerCommand() *cobra.Command {
return synchronizerCmd
}

type BlockInspectItem struct {
StartBlock int64
EndBlock int64
}

func CreateInspectorCommand() *cobra.Command {
inspectorCmd := &cobra.Command{
Use: "inspector",
Short: "Inspect storage and database consistency",
}

var chain, baseDir, delim, returnFunc string
var timeout int

storageCommand := &cobra.Command{
Use: "storage",
Short: "Inspect filesystem, gcp-storage, aws-bucket consistency",
PreRunE: func(cmd *cobra.Command, args []string) error {
storageErr := storage.CheckVariablesForStorage()
if storageErr != nil {
return storageErr
}

crawlerErr := crawler.CheckVariablesForCrawler()
if crawlerErr != nil {
return crawlerErr
}

return nil
},
RunE: func(cmd *cobra.Command, args []string) error {
ctx := context.Background()

basePath := filepath.Join(baseDir, crawler.SeerCrawlerStoragePrefix, "data", chain)
storageInstance, newStorageErr := storage.NewStorage(storage.SeerCrawlerStorageType, basePath)
if newStorageErr != nil {
return newStorageErr
}

// Only for the gcp-storage type.
// Selects how listed items are returned; if return-func is set to
// "prefix", the delim flag must also be set to "/".
var listReturnFunc storage.ListReturnFunc
switch storage.SeerCrawlerStorageType {
case "gcp-storage":
switch returnFunc {
case "prefix":
listReturnFunc = storage.GCSListReturnPrefixFunc
default:
listReturnFunc = storage.GCSListReturnNameFunc
}
default:
listReturnFunc = func(item any) string { return fmt.Sprintf("%v", item) }
}

items, listErr := storageInstance.List(ctx, delim, timeout, listReturnFunc)
if listErr != nil {
return listErr
}

itemsMap := make(map[string]BlockInspectItem)
previousMapItemKey := ""

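// The second-to-last path segment of each listed item is the "<startBlock>-<endBlock>"
// batch name; adjacent batches are compared to detect out-of-order ranges and gaps.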
for _, item := range items {
itemSlice := strings.Split(item, "/")
blockNums := itemSlice[len(itemSlice)-2]

blockNumsSlice := strings.Split(blockNums, "-")

blockNumS, atoiErrS := strconv.ParseInt(blockNumsSlice[0], 10, 64)
if atoiErrS != nil {
log.Printf("Unable to parse blockNumS from %s", blockNumsSlice[0])
continue
}
blockNumF, atoiErrF := strconv.ParseInt(blockNumsSlice[1], 10, 64)
if atoiErrF != nil {
log.Printf("Unable to parse blockNumS from %s", blockNumsSlice[1])
continue
}

if previousMapItemKey != blockNums && previousMapItemKey != "" {
diff := blockNumS - itemsMap[previousMapItemKey].EndBlock
if diff <= 0 {
fmt.Printf("Found incorrect blocks order between batches: %s -> %s\n", previousMapItemKey, blockNums)
} else if diff > 1 {
fmt.Printf("Found missing %d blocks during batches: %s -> %s\n", diff, previousMapItemKey, blockNums)
}
}

previousMapItemKey = blockNums
itemsMap[blockNums] = BlockInspectItem{StartBlock: blockNumS, EndBlock: blockNumF}
}

log.Printf("Processed %d items", len(itemsMap))

return nil
},
}

storageCommand.Flags().StringVar(&chain, "chain", "ethereum", "The blockchain to crawl (default: ethereum)")
storageCommand.Flags().StringVar(&baseDir, "base-dir", "", "The base directory to store the crawled data (default: '')")
storageCommand.Flags().StringVar(&delim, "delim", "", "Only for gcp-storage. The delimiter argument can be used to restrict the results to only the objects in the given 'directory'")
storageCommand.Flags().StringVar(&returnFunc, "return-func", "", "Which function use for return")
storageCommand.Flags().IntVar(&timeout, "timeout", 180, "List timeout (default: 180)")

inspectorCmd.AddCommand(storageCommand)

return inspectorCmd
}

func CreateIndexCommand() *cobra.Command {

indexCommand := &cobra.Command{
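The consistency check added in CreateInspectorCommand above boils down to one subtraction per adjacent pair of batch directories: the start block of the current batch minus the end block of the previous one. A gap of exactly 1 means the batches are contiguous, 0 or less means overlap or wrong ordering, and more than 1 means blocks were skipped. A minimal standalone sketch of that arithmetic (the helper name and sample keys are illustrative, not part of this commit):

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// batchGap parses two "<start>-<end>" keys and returns currStart - prevEnd:
// 1 means contiguous, <= 0 means overlap or wrong order, > 1 means missing blocks.
func batchGap(prev, curr string) (int64, error) {
	prevEnd, err := strconv.ParseInt(strings.Split(prev, "-")[1], 10, 64)
	if err != nil {
		return 0, err
	}
	currStart, err := strconv.ParseInt(strings.Split(curr, "-")[0], 10, 64)
	if err != nil {
		return 0, err
	}
	return currStart - prevEnd, nil
}

func main() {
	pairs := [][2]string{
		{"100-199", "200-299"}, // contiguous
		{"100-199", "205-299"}, // blocks 200..204 missing
		{"100-199", "150-249"}, // overlapping / out of order
	}
	for _, p := range pairs {
		gap, err := batchGap(p[0], p[1])
		if err != nil {
			fmt.Println("parse error:", err)
			continue
		}
		switch {
		case gap <= 0:
			fmt.Printf("%s -> %s: incorrect block order\n", p[0], p[1])
		case gap > 1:
			fmt.Printf("%s -> %s: %d missing blocks\n", p[0], p[1], gap-1)
		default:
			fmt.Printf("%s -> %s: contiguous\n", p[0], p[1])
		}
	}
}
```

With the root command wired up as above, the check is exposed as `inspector storage`, with flags for the chain, base directory, delimiter, return function, and list timeout.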
5 changes: 5 additions & 0 deletions storage/aws_bucket.go
@@ -2,6 +2,7 @@ package storage

import (
"bytes"
"context"
"path/filepath"

"github.com/aws/aws-sdk-go/aws"
@@ -88,6 +89,10 @@ func (s *S3) Delete(key string) error {

}

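// List is a stub for the aws-bucket backend: it satisfies the extended Storer
// interface but does not return any items yet.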
func (s *S3) List(ctx context.Context, delim string, timeout int, returnFunc ListReturnFunc) ([]string, error) {
return []string{}, nil
}

func (s *S3) ReadBatch(readItems []ReadItem) (map[string][]string, error) {
// Implement the ReadBatch method
return nil, nil
24 changes: 24 additions & 0 deletions storage/filesystem.go
@@ -2,6 +2,8 @@ package storage

import (
"bufio"
"context"
"fmt"
"log"
"os"
"path/filepath"
@@ -115,6 +117,28 @@ func (fs *FileStorage) ReadBatch(readItems []ReadItem) (map[string][]string, err
return result, nil
}

func (fs *FileStorage) List(ctx context.Context, delim string, timeout int, returnFunc ListReturnFunc) ([]string, error) {
prefix := fmt.Sprintf("%s/", fs.BasePath)
log.Printf("Loading directory items with prefix: %s", prefix)

dirs, readDirErr := os.ReadDir(prefix)
if readDirErr != nil {
return []string{}, readDirErr
}

var items []string
itemsLen := 0

for _, d := range dirs {
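// Keep the trailing "/" so that callers splitting on "/" (such as the
// inspector command) find the batch directory name at the second-to-last
// index, matching the layout of GCS object prefixes.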
items = append(items, fmt.Sprintf("%s%s/", prefix, d.Name()))
itemsLen++
}

log.Printf("Listed %d items", itemsLen)

return items, nil
}

func (fs *FileStorage) Delete(key string) error {

// Implement the Delete method
73 changes: 73 additions & 0 deletions storage/gcp_storage.go
@@ -5,10 +5,13 @@ import (
"context"
"fmt"
"io"
"log"
"path/filepath"
"strings"
"time"

"cloud.google.com/go/storage"
"google.golang.org/api/iterator"
)

// GCS implements the Storer interface for Google Cloud Storage
@@ -85,6 +88,76 @@ func (g *GCS) Read(key string) ([]string, error) {
return result, nil
}

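// The two predefined return functions adapt *storage.ObjectAttrs items:
// GCSListReturnNameFunc yields full object names, while GCSListReturnPrefixFunc
// yields the synthetic "directory" prefixes produced when a delimiter is set.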
var (
GCSListReturnNameFunc = func(item any) string {
if attr, ok := item.(*storage.ObjectAttrs); ok {
return attr.Name
}
return ""
}

GCSListReturnPrefixFunc = func(item any) string {
if attr, ok := item.(*storage.ObjectAttrs); ok {
return attr.Prefix
}
return ""
}
)

func (g *GCS) List(ctx context.Context, delim string, timeout int, returnFunc ListReturnFunc) ([]string, error) {
ctx, cancel := context.WithTimeout(ctx, time.Second*time.Duration(timeout))
defer cancel()

// Prefixes and delimiters can be used to emulate directory listings.
// Prefixes can be used to filter objects starting with prefix.
// The delimiter argument can be used to restrict the results to only the
// objects in the given "directory". Without the delimiter, the entire tree
// under the prefix is returned.
//
// For example, given these blobs:
// /a/1.txt
// /a/b/2.txt
//
// If you just specify prefix="a/", you'll get back:
// /a/1.txt
// /a/b/2.txt
//
// However, if you specify prefix="a/" and delim="/", you'll get back:
// /a/1.txt
prefix := fmt.Sprintf("%s/", g.BasePath)
log.Printf("Loading bucket items with prefix: %s and delim: %s", prefix, delim)

it := g.Client.Bucket(SeerCrawlerStorageBucket).Objects(ctx, &storage.Query{
Prefix: prefix,
Delimiter: delim,
})

var items []string
itemsLen := 0

for {
attrs, err := it.Next()
if err == iterator.Done {
break
}
if err != nil {
return []string{}, fmt.Errorf("Bucket(%q).Objects: %w", SeerCrawlerStorageBucket, err)
}

returnVal := returnFunc(attrs)
if returnVal == "" {
continue
}

itemsLen++
items = append(items, returnVal)
}

log.Printf("Listed %d items", itemsLen)

return items, nil
}

func (g *GCS) Delete(key string) error {

ctx := context.Background()
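For the gcp-storage backend, the listing behaviour documented in the comment above depends on how the delimiter and return function are combined. A minimal sketch of the intended call for the inspector use case, written as a hypothetical helper inside the storage package (the function name is illustrative, not part of this commit):

```go
package storage

import (
	"context"
	"fmt"
)

// listBatchPrefixes is a hypothetical helper: with delim set to "/" and the
// prefix return function, List yields only the immediate "directory" prefixes
// under the base path instead of every object in the tree.
func listBatchPrefixes(g *GCS, timeout int) error {
	prefixes, err := g.List(context.Background(), "/", timeout, GCSListReturnPrefixFunc)
	if err != nil {
		return err
	}
	for _, p := range prefixes {
		fmt.Println(p)
	}
	return nil
}
```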
5 changes: 5 additions & 0 deletions storage/storage.go
@@ -1,10 +1,15 @@
package storage

import "context"

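// ListReturnFunc converts a backend-specific list item (for example a
// *storage.ObjectAttrs from GCS) into the string value the caller wants back;
// an empty string tells the backend to skip the item.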
type ListReturnFunc func(any) string

type Storer interface {
Save(batchDir, filename string, data []string) error
Read(key string) ([]string, error)
ReadBatch(readItems []ReadItem) (map[string][]string, error)
Delete(key string) error
List(ctx context.Context, delim string, timeout int, returnFunc ListReturnFunc) ([]string, error)
}

type ReadItem struct {
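Any backend that participates in the inspector now has to provide the new List method. A minimal sketch of what that looks like for a hypothetical in-memory backend (the memStore type is illustrative and not part of this commit; the remaining Storer methods are omitted):

```go
package storage

import "context"

// memStore is a hypothetical in-memory backend used only to illustrate the
// new List signature; the other Storer methods are not shown here.
type memStore struct {
	keys []string
}

func (m *memStore) List(ctx context.Context, delim string, timeout int, returnFunc ListReturnFunc) ([]string, error) {
	var items []string
	for _, k := range m.keys {
		// An empty result from returnFunc means "skip this item",
		// mirroring the behaviour of the GCS implementation.
		if v := returnFunc(k); v != "" {
			items = append(items, v)
		}
	}
	return items, nil
}
```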
