Skip to content

Commit

Permalink
Fix tier2call (#447)
Browse files Browse the repository at this point in the history
* Fix tier2call

* Renaming flags and handling dstore error

* Hope it will pass steph

* Rename state bundle size
  • Loading branch information
ArnaudBger authored Apr 3, 2024
1 parent 3d026f4 commit 209622e
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 7 deletions.
5 changes: 4 additions & 1 deletion service/tier2.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,6 @@ func (s *Tier2Service) ProcessRange(request *pbssinternal.ProcessRangeRequest, s
logger.Info("incoming substreams ProcessRange request", fields...)

switch {

case request.MeteringConfig == "":
return fmt.Errorf("metering config is required in request")
case request.BlockType == "":
Expand Down Expand Up @@ -222,6 +221,10 @@ func (s *Tier2Service) processRange(ctx context.Context, request *pbssinternal.P
s.runtimeConfig.StateBundleSize = request.StateBundleSize

mergedBlocksStore, err := dstore.NewDBinStore(request.MergedBlocksStore)
if err != nil {
return fmt.Errorf("setting up block store from url %q: %w", request.MergedBlocksStore, err)
}

if cloned, ok := mergedBlocksStore.(dstore.Clonable); ok {
mergedBlocksStore, err = cloned.Clone(ctx)
if err != nil {
Expand Down
30 changes: 24 additions & 6 deletions tools/tier2call.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,13 @@ func init() {
tier2CallCmd.Flags().Bool("insecure", false, "Skip certificate validation on GRPC connection")
tier2CallCmd.Flags().Bool("plaintext", false, "Establish GRPC connection in plaintext")
tier2CallCmd.Flags().StringSliceP("header", "H", nil, "Additional headers to be sent in the substreams request")

tier2CallCmd.Flags().StringArrayP("params", "p", nil, "Set a params for parameterizable modules. Can be specified multiple times. Ex: -p module1=valA -p module2=valX&valY")
tier2CallCmd.Flags().String("metering-plugin", "null://", "Metering configuration")
tier2CallCmd.Flags().String("block-type", "sf.ethereum.type.v2.Block", "Block type")
tier2CallCmd.Flags().String("merged-blocks-store-url", "./firehose-data/storage/merged-blocks", "Merged blocks store")
tier2CallCmd.Flags().Uint64("state-bundle-size", uint64(1_000), "State segment size")
tier2CallCmd.Flags().String("state-store-url", "./firehose-data/localdata", "Substreams state data storage")
tier2CallCmd.Flags().String("state-store-default-tag", "", "Substreams state store default tag")

Cmd.AddCommand(tier2CallCmd)
}
Expand Down Expand Up @@ -92,12 +97,25 @@ func tier2CallE(cmd *cobra.Command, args []string) error {
ctx = metadata.AppendToOutgoingContext(ctx, headerArray...)
}

meteringConfig := mustGetString(cmd, "metering-plugin")
blockType := mustGetString(cmd, "block-type")
stateStore := mustGetString(cmd, "state-store-url")
stateStoreDefaultTag := mustGetString(cmd, "state-store-default-tag")
mergedBlocksStore := mustGetString(cmd, "merged-blocks-store-url")
stateBundleSize := mustGetUint64(cmd, "state-bundle-size")

req, err := ssClient.ProcessRange(ctx, &pbssinternal.ProcessRangeRequest{
StartBlockNum: uint64(startBlock),
StopBlockNum: uint64(stopBlock),
OutputModule: outputModule,
Modules: pkg.Modules,
Stage: uint32(stage),
StartBlockNum: uint64(startBlock),
StopBlockNum: uint64(stopBlock),
OutputModule: outputModule,
Modules: pkg.Modules,
Stage: uint32(stage),
MeteringConfig: meteringConfig,
BlockType: blockType,
MergedBlocksStore: mergedBlocksStore,
StateBundleSize: stateBundleSize,
StateStore: stateStore,
StateStoreDefaultTag: stateStoreDefaultTag,
}, callOpts...)
if err != nil {
return fmt.Errorf("process range request: %w", err)
Expand Down

0 comments on commit 209622e

Please sign in to comment.