diff --git a/cmd/fireantelope/main.go b/cmd/fireantelope/main.go index b0cc04f..e1e9805 100644 --- a/cmd/fireantelope/main.go +++ b/cmd/fireantelope/main.go @@ -60,6 +60,7 @@ func Chain() *firecore.Chain[*pbantelope.Block] { //toolsCmd.AddCommand(newToolsBackfillCmd(zlog)) parent.AddCommand(newPollerCmd(zlog, tracer)) parent.AddCommand(newSilkwormPollerCmd(zlog, tracer)) + parent.AddCommand(newCheckBlocksCmd(zlog)) return nil }, diff --git a/cmd/fireantelope/tools_fix_invalid_utf8.go b/cmd/fireantelope/tools_fix_invalid_utf8.go new file mode 100644 index 0000000..86d16eb --- /dev/null +++ b/cmd/fireantelope/tools_fix_invalid_utf8.go @@ -0,0 +1,109 @@ +package main + +import ( + "errors" + "fmt" + pbantelope "github.com/pinax-network/firehose-antelope/types/pb/sf/antelope/type/v1" + "github.com/spf13/cobra" + "github.com/streamingfast/bstream" + "github.com/streamingfast/dstore" + firecore "github.com/streamingfast/firehose-core" + "go.uber.org/zap" + "io" +) + +func newCheckBlocksCmd(logger *zap.Logger) *cobra.Command { + return &cobra.Command{ + Use: "check-blocks ", + Short: "checks all blocks for decoding issues (detects the invalid UTF-8 issue).", + Args: cobra.ExactArgs(3), + RunE: createCheckBlocksE(logger), + } +} + +func createCheckBlocksE(logger *zap.Logger) firecore.CommandExecutor { + return func(cmd *cobra.Command, args []string) error { + ctx := cmd.Context() + + srcStore, err := dstore.NewDBinStore(args[0]) + if err != nil { + return fmt.Errorf("unable to create source store: %w", err) + } + + start := mustParseUint64(args[1]) + stop := mustParseUint64(args[2]) + + if stop <= start { + return fmt.Errorf("stop block must be greater than start block") + } + + logger.Info("checking antelope blocks", zap.Uint64("start_block", start), zap.Uint64("stop_block", stop)) + + lastFileProcessed := "" + startWalkFrom := fmt.Sprintf("%010d", start-(start%100)) + err = srcStore.WalkFrom(ctx, "", startWalkFrom, func(filename string) error { + logger.Debug("checking merged block file", zap.String("filename", filename)) + + startBlock := mustParseUint64(filename) + + if startBlock > stop { + logger.Debug("stopping at merged block file above stop block", zap.String("filename", filename), zap.Uint64("stop", stop)) + return io.EOF + } + + if startBlock+100 < start { + logger.Debug("skipping merged block file below start block", zap.String("filename", filename)) + return nil + } + + rc, err := srcStore.OpenObject(ctx, filename) + if err != nil { + return fmt.Errorf("failed to open %s: %w", filename, err) + } + defer rc.Close() + + br, err := bstream.NewDBinBlockReader(rc) + if err != nil { + return fmt.Errorf("creating block reader: %w", err) + } + + i := 0 + for { + block, err := br.Read() + if err == io.EOF { + break + } + + var antelopeBlock pbantelope.Block + err = block.Payload.UnmarshalTo(&antelopeBlock) + if err != nil { + fmt.Printf("block_num: %d - unable to decode: %s\n", block.Number, err) + } else { + logger.Debug("successfully decoded antelope block", zap.Any("block_num", block.Number)) + } + + i++ + } + + if i != 100 { + return fmt.Errorf("expected to have read 100 blocks, we have read %d. Bailing out.", i) + } + + lastFileProcessed = filename + logger.Info("finished merged block", zap.String("filename", filename)) + + return nil + }) + fmt.Printf("Last file processed: %s.dbin.zst\n", lastFileProcessed) + + if errors.Is(err, io.EOF) { + return nil + } + + if err != nil { + return err + } + + return nil + } +} diff --git a/cmd/fireantelope/util.go b/cmd/fireantelope/util.go new file mode 100644 index 0000000..e65fca8 --- /dev/null +++ b/cmd/fireantelope/util.go @@ -0,0 +1,15 @@ +package main + +import ( + "fmt" + "strconv" +) + +func mustParseUint64(in string) uint64 { + out, err := strconv.ParseUint(in, 10, 64) + if err != nil { + panic(fmt.Errorf("unable to parse %q as uint64: %w", in, err)) + } + + return out +} diff --git a/codec/block.go b/codec/block.go deleted file mode 100644 index 41c15bf..0000000 --- a/codec/block.go +++ /dev/null @@ -1,50 +0,0 @@ -// Copyright 2019 dfuse Platform Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package codec - -// todo legacy function, bstream.StartBlockResolver has been removed, not sure if and how we need to replace -//func BlockstoreStartBlockResolver(blocksStore dstore.Store) bstream.StartBlockResolver { -// var errFound = errors.New("found") -// -// return func(ctx context.Context, targetBlockNum uint64) (uint64, string, error) { -// var dposLibNum uint32 -// num := uint32(targetBlockNum) -// -// handlerFunc := bstream.HandlerFunc(func(block *bstream.Block, _ interface{}) error { -// if block.Number == uint64(num) { -// dposLibNum = uint32(block.LibNum) -// return errFound -// } -// -// return nil -// }) -// -// fs := bstream.NewFileSource(blocksStore, targetBlockNum, handlerFunc, zlog, nil) -// -// go fs.Run() -// -// select { -// case <-ctx.Done(): -// fs.Shutdown(context.Canceled) -// return 0, "", ctx.Err() -// case <-fs.Terminated(): -// } -// -// if dposLibNum != 0 { -// return uint64(dposLibNum), "", nil -// } -// return 0, "", fs.Err() -// } -//} diff --git a/codec/init.go b/codec/init.go deleted file mode 100644 index 2dc40be..0000000 --- a/codec/init.go +++ /dev/null @@ -1,11 +0,0 @@ -package codec - -//func init() { -// bstream.GetBlockWriterFactory = bstream.BlockWriterFactoryFunc(BlockWriterFactory) -// bstream.GetBlockReaderFactory = bstream.BlockReaderFactoryFunc(BlockReaderFactory) -// bstream.GetBlockDecoder = bstream.BlockDecoderFunc(BlockDecoder) -// bstream.GetProtocolFirstStreamableBlock = 2 -// // todo legacy option, check if needs replacement -// // bstream.GetProtocolGenesisBlock = 1 -// bstream.GetBlockWriterHeaderLen = 10 -//}