Skip to content

Commit

Permalink
add output module hash to metering
Browse files Browse the repository at this point in the history
  • Loading branch information
colindickson committed Oct 30, 2024
1 parent c9d10c6 commit ebdc718
Show file tree
Hide file tree
Showing 8 changed files with 44 additions and 19 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ require (
github.com/rs/cors v1.10.0
github.com/schollz/closestmatch v2.1.0+incompatible
github.com/shopspring/decimal v1.3.1
github.com/streamingfast/dmetering v0.0.0-20240816165719-51768d3da951
github.com/streamingfast/dmetering v0.0.0-20241028183059-d11d4ec85e05
github.com/streamingfast/dmetrics v0.0.0-20230919161904-206fa8ebd545
github.com/streamingfast/sf-tracing v0.0.0-20240430173521-888827872b90
github.com/streamingfast/shutter v1.5.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -533,8 +533,8 @@ github.com/streamingfast/derr v0.0.0-20230515163924-8570aaa43fe1 h1:xJB7rXnOHLes
github.com/streamingfast/derr v0.0.0-20230515163924-8570aaa43fe1/go.mod h1:QSm/AfaDsE0k1xBYi0lW580YJ/WDV/FKZI628tkZR0Y=
github.com/streamingfast/dgrpc v0.0.0-20240219152146-57bb131c39ca h1:/k5H6MUo5Vi8AKPsSr+TMeA/XJ0uMyEX6feHpOozTlQ=
github.com/streamingfast/dgrpc v0.0.0-20240219152146-57bb131c39ca/go.mod h1:NuKCwOHjbT0nRji0O+7+c70AiBfLHEKNoovs/gFfMPY=
github.com/streamingfast/dmetering v0.0.0-20240816165719-51768d3da951 h1:6o6MS3JHrp9A7V6EBHbR7W7mzVCFmXc8U0AjTfvz7PI=
github.com/streamingfast/dmetering v0.0.0-20240816165719-51768d3da951/go.mod h1:UqWuX3REU/IInBUaymFN2eLjuvz+/0SsoUFjeQlLNyI=
github.com/streamingfast/dmetering v0.0.0-20241028183059-d11d4ec85e05 h1:jjx6kO2z0mNSKElL8YHnjt65+NB/CSsJH5C9efyrzw8=
github.com/streamingfast/dmetering v0.0.0-20241028183059-d11d4ec85e05/go.mod h1:UqWuX3REU/IInBUaymFN2eLjuvz+/0SsoUFjeQlLNyI=
github.com/streamingfast/dmetrics v0.0.0-20230919161904-206fa8ebd545 h1:SUl04bZKGAv207lp7/6CHOJIRpjUKunwItrno3K463Y=
github.com/streamingfast/dmetrics v0.0.0-20230919161904-206fa8ebd545/go.mod h1:JbxEDbzWRG1dHdNIPrYfuPllEkktZMgm40AwVIBENcw=
github.com/streamingfast/dstore v0.1.1-0.20241011152904-9acd6205dc14 h1:/2HxIOzAgUBKyxjDO4IJPzBBaEAtzwipb/2/JGsOArA=
Expand Down
11 changes: 7 additions & 4 deletions metering/metering.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,8 @@ func (ms *MetricsSender) Send(ctx context.Context, userID, apiKeyID, ip, userMet
endpoint = fmt.Sprintf("%s%s", endpoint, "Backfill")
}

outputModuleHash := reqctx.OutputModuleHash(ctx)

meter := dmetering.GetBytesMeter(ctx)

bytesRead := meter.BytesReadDelta()
Expand All @@ -143,10 +145,11 @@ func (ms *MetricsSender) Send(ctx context.Context, userID, apiKeyID, ip, userMet
meter.CountInc(TotalWriteBytes, int(totalWriteBytes))

event := dmetering.Event{
UserID: userID,
ApiKeyID: apiKeyID,
IpAddress: ip,
Meta: userMeta,
UserID: userID,
ApiKeyID: apiKeyID,
IpAddress: ip,
Meta: userMeta,
OutputModuleHash: outputModuleHash,

Endpoint: endpoint,
Metrics: map[string]float64{
Expand Down
2 changes: 2 additions & 0 deletions reqctx/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ var tracerKey = contextKeyType(2)
var spanKey = contextKeyType(3)
var reqStatsKey = contextKeyType(4)
var moduleExecutionTracingConfigKey = contextKeyType(5)
var outputModuleHashKey = contextKeyType(6)
var tier2RequestParametersKeyKey = contextKeyType(7)

func Logger(ctx context.Context) *zap.Logger {
return logging.Logger(ctx, zap.NewNop())
Expand Down
14 changes: 14 additions & 0 deletions reqctx/metering.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,17 @@ func IsBackfillerRequest(ctx context.Context) bool {
_, ok = md[backFillerKey]
return ok
}

type outputModuleKeyType int

func WithOutputModuleHash(ctx context.Context, hash string) context.Context {
return context.WithValue(ctx, outputModuleHashKey, hash)
}

func OutputModuleHash(ctx context.Context) string {
hash, ok := ctx.Value(outputModuleHashKey).(string)
if !ok {
return ""
}
return hash
}
4 changes: 0 additions & 4 deletions reqctx/tier2request.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,6 @@ type Tier2RequestParameters struct {
WASMModules map[string]string
}

type tier2RequestParametersKey int

const tier2RequestParametersKeyKey = tier2RequestParametersKey(0)

func WithTier2RequestParameters(ctx context.Context, parameters Tier2RequestParameters) context.Context {
return context.WithValue(ctx, tier2RequestParametersKeyKey, parameters)
}
Expand Down
17 changes: 9 additions & 8 deletions service/tier1.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,24 +219,25 @@ func (s *Tier1Service) Blocks(
mut.Unlock()
}()

respFunc := tier1ResponseHandler(respContext, &mut, logger, stream)

span.SetAttributes(attribute.Int64("substreams.tier", 1))

request := req.Msg
if request.Modules == nil {
return connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("missing modules in request"))
}

if err := ValidateTier1Request(request, s.blockType); err != nil {
return connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("validate request: %w", err))
}

execGraph, err := exec.NewOutputModuleGraph(request.OutputModule, request.ProductionMode, request.Modules, bstream.GetProtocolFirstStreamableBlock)
if err != nil {
return bsstream.NewErrInvalidArg(err.Error())
}
outputModuleHash := execGraph.ModuleHashes().Get(request.OutputModule)
ctx = reqctx.WithOutputModuleHash(ctx, outputModuleHash)

respFunc := tier1ResponseHandler(respContext, &mut, logger, stream)

span.SetAttributes(attribute.Int64("substreams.tier", 1))

if err := ValidateTier1Request(request, s.blockType); err != nil {
return connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("validate request: %w", err))
}

moduleNames := make([]string, len(request.Modules.Modules))
for i := 0; i < len(moduleNames); i++ {
Expand Down
9 changes: 9 additions & 0 deletions service/tier2.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"sync"
"time"

"github.com/streamingfast/bstream"

"connectrpc.com/connect"
"github.com/RoaringBitmap/roaring/roaring64"
"github.com/streamingfast/bstream/stream"
Expand Down Expand Up @@ -206,6 +208,13 @@ func (s *Tier2Service) ProcessRange(request *pbssinternal.ProcessRangeRequest, s
return connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("validate request: %w", err))
}

execGraph, err := exec.NewOutputModuleGraph(request.OutputModule, true, request.Modules, bstream.GetProtocolFirstStreamableBlock)
if err != nil {
return bsstream.NewErrInvalidArg(err.Error())
}
outputModuleHash := execGraph.ModuleHashes().Get(request.OutputModule)
ctx = reqctx.WithOutputModuleHash(ctx, outputModuleHash)

emitter, err := dmetering.New(request.MeteringConfig, logger)
if err != nil {
return connect.NewError(connect.CodeInternal, fmt.Errorf("unable to initialize dmetering: %w", err))
Expand Down

0 comments on commit ebdc718

Please sign in to comment.