Skip to content

Commit

Permalink
Metering update: more detailed metering with addition of new metrics.…
Browse files Browse the repository at this point in the history
… *DEPRECATION WARNING*: `bytes_read` and `bytes_written` metrics will be removed in the future, please use the new metrics for metering instead.
  • Loading branch information
colindickson committed Aug 27, 2024
1 parent 3cf18ca commit 05a4548
Show file tree
Hide file tree
Showing 13 changed files with 246 additions and 92 deletions.
2 changes: 2 additions & 0 deletions docs/release-notes/change-log.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

## Unreleased

* Metering update: more detailed metering with addition of new metrics (`live_uncompressed_read_bytes`, `live_uncompressed_read_forked_bytes`, `file_uncompressed_read_bytes`, `file_uncompressed_read_forked_bytes`, `file_compressed_read_forked_bytes`, `file_compressed_read_bytes`, `file_uncompressed_write_bytes`, `file_compressed_write_bytes`). *DEPRECATION WARNING*: `bytes_read` and `bytes_written` metrics will be removed in the future, please use the new metrics for metering instead.

### Server
* Add `sf.substreams.rpc.v2.EndpointInfo/Info` endpoint (if the infoserver is given as a module, i.e. from firehose-core)
* Add an execution timeout of 3 minutes per block by default (can be overriden in tier1/tier2 Configs) -- this is useful when an external (eth_call) is stuck on a forked block hash.
Expand Down
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@ require (
github.com/jhump/protoreflect v1.14.0
github.com/spf13/cobra v1.7.0
github.com/spf13/pflag v1.0.5 // indirect
github.com/streamingfast/bstream v0.0.2-0.20240603153252-ec8d37625188
github.com/streamingfast/bstream v0.0.2-0.20240819202225-ca1b790abf0b
github.com/streamingfast/cli v0.0.4-0.20230825151644-8cc84512cd80
github.com/streamingfast/dauth v0.0.0-20240219205130-bfe428489338
github.com/streamingfast/dbin v0.9.1-0.20231117225723-59790c798e2c
github.com/streamingfast/derr v0.0.0-20230515163924-8570aaa43fe1
github.com/streamingfast/dgrpc v0.0.0-20240219152146-57bb131c39ca
github.com/streamingfast/dstore v0.1.1-0.20240311181234-470a7a84936f
github.com/streamingfast/dstore v0.1.1-0.20240826190906-91345d4a31f2
github.com/streamingfast/logging v0.0.0-20230608130331-f22c91403091
github.com/streamingfast/pbgo v0.0.6-0.20240823134334-812f6a16c5cb
github.com/stretchr/testify v1.8.4
Expand Down Expand Up @@ -57,7 +57,7 @@ require (
github.com/rs/cors v1.10.0
github.com/schollz/closestmatch v2.1.0+incompatible
github.com/shopspring/decimal v1.3.1
github.com/streamingfast/dmetering v0.0.0-20240403142935-dc8bb3bb32c3
github.com/streamingfast/dmetering v0.0.0-20240816165719-51768d3da951
github.com/streamingfast/dmetrics v0.0.0-20230919161904-206fa8ebd545
github.com/streamingfast/sf-tracing v0.0.0-20240430173521-888827872b90
github.com/streamingfast/shutter v1.5.0
Expand Down
12 changes: 6 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -521,8 +521,8 @@ github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
github.com/spf13/viper v1.15.0 h1:js3yy885G8xwJa6iOISGFwd+qlUo5AvyXb7CiihdtiU=
github.com/spf13/viper v1.15.0/go.mod h1:fFcTBJxvhhzSJiZy8n+PeW6t8l+KeT/uTARa0jHOQLA=
github.com/streamingfast/bstream v0.0.2-0.20240603153252-ec8d37625188 h1:aso0Q0qT9h1ICG6//G9P0VQpql2c2A87J8YMUaJipQs=
github.com/streamingfast/bstream v0.0.2-0.20240603153252-ec8d37625188/go.mod h1:n5wy+Vmwp4xbjXO7B81MAkAgjnf1vJ/lI2y6hWWyFbg=
github.com/streamingfast/bstream v0.0.2-0.20240819202225-ca1b790abf0b h1:LbT8xpXFY5bsZbQfhQJGcXUBXbl/QZZ7CqfN6nLtpwM=
github.com/streamingfast/bstream v0.0.2-0.20240819202225-ca1b790abf0b/go.mod h1:n5wy+Vmwp4xbjXO7B81MAkAgjnf1vJ/lI2y6hWWyFbg=
github.com/streamingfast/cli v0.0.4-0.20230825151644-8cc84512cd80 h1:UxJUTcEVkdZy8N77E3exz0iNlgQuxl4m220GPvzdZ2s=
github.com/streamingfast/cli v0.0.4-0.20230825151644-8cc84512cd80/go.mod h1:QxjVH73Lkqk+mP8bndvhMuQDUINfkgsYhdCH/5TJFKI=
github.com/streamingfast/dauth v0.0.0-20240219205130-bfe428489338 h1:o3Imquu+RhIdF62OSr/ZxVPsn6jpKHwBV/Upl6P28o0=
Expand All @@ -533,12 +533,12 @@ github.com/streamingfast/derr v0.0.0-20230515163924-8570aaa43fe1 h1:xJB7rXnOHLes
github.com/streamingfast/derr v0.0.0-20230515163924-8570aaa43fe1/go.mod h1:QSm/AfaDsE0k1xBYi0lW580YJ/WDV/FKZI628tkZR0Y=
github.com/streamingfast/dgrpc v0.0.0-20240219152146-57bb131c39ca h1:/k5H6MUo5Vi8AKPsSr+TMeA/XJ0uMyEX6feHpOozTlQ=
github.com/streamingfast/dgrpc v0.0.0-20240219152146-57bb131c39ca/go.mod h1:NuKCwOHjbT0nRji0O+7+c70AiBfLHEKNoovs/gFfMPY=
github.com/streamingfast/dmetering v0.0.0-20240403142935-dc8bb3bb32c3 h1:u3C2jzTc7d58PvVjlZew4HmZ1g1xr9yWBd8eWjmQNig=
github.com/streamingfast/dmetering v0.0.0-20240403142935-dc8bb3bb32c3/go.mod h1:UqWuX3REU/IInBUaymFN2eLjuvz+/0SsoUFjeQlLNyI=
github.com/streamingfast/dmetering v0.0.0-20240816165719-51768d3da951 h1:6o6MS3JHrp9A7V6EBHbR7W7mzVCFmXc8U0AjTfvz7PI=
github.com/streamingfast/dmetering v0.0.0-20240816165719-51768d3da951/go.mod h1:UqWuX3REU/IInBUaymFN2eLjuvz+/0SsoUFjeQlLNyI=
github.com/streamingfast/dmetrics v0.0.0-20230919161904-206fa8ebd545 h1:SUl04bZKGAv207lp7/6CHOJIRpjUKunwItrno3K463Y=
github.com/streamingfast/dmetrics v0.0.0-20230919161904-206fa8ebd545/go.mod h1:JbxEDbzWRG1dHdNIPrYfuPllEkktZMgm40AwVIBENcw=
github.com/streamingfast/dstore v0.1.1-0.20240311181234-470a7a84936f h1:a7ANk6z1IiEMMUEV02Y5QoczDy/bOxeu9py5d3Kmw6E=
github.com/streamingfast/dstore v0.1.1-0.20240311181234-470a7a84936f/go.mod h1:kNzxgv2MzYFn2T4kelBVpGp/yP/1njtr3+csWuqxK3w=
github.com/streamingfast/dstore v0.1.1-0.20240826190906-91345d4a31f2 h1:BB3VSDl8/OHBSvjqfgufwqr4tD5l7XPjXybDm6uudj4=
github.com/streamingfast/dstore v0.1.1-0.20240826190906-91345d4a31f2/go.mod h1:kNzxgv2MzYFn2T4kelBVpGp/yP/1njtr3+csWuqxK3w=
github.com/streamingfast/dtracing v0.0.0-20220305214756-b5c0e8699839 h1:K6mJPvh1jAL+/gBS7Bh9jyzWaTib6N47m06gZOTUPwQ=
github.com/streamingfast/dtracing v0.0.0-20220305214756-b5c0e8699839/go.mod h1:huOJyjMYS6K8upTuxDxaNd+emD65RrXoVBvh8f1/7Ns=
github.com/streamingfast/graph v0.0.0-20220329181048-a5710712d873 h1:8+TXT26c2p6f8JMg0w4JQ8t4fAbfMmR11zLN4KFVRC8=
Expand Down
133 changes: 133 additions & 0 deletions metering/metering.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
package metering

import (
"context"
"time"

"github.com/streamingfast/dmetering"
"github.com/streamingfast/dstore"
"github.com/streamingfast/substreams/reqctx"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"
)

const (
MeterLiveUncompressedReadBytes = "live_uncompressed_read_bytes"
MeterLiveUncompressedReadForkedBytes = "live_uncompressed_read_forked_bytes"

MeterFileUncompressedReadBytes = "file_uncompressed_read_bytes"
MeterFileCompressedReadBytes = "file_compressed_read_bytes"

MeterFileUncompressedReadForkedBytes = "file_uncompressed_read_forked_bytes"
MeterFileCompressedReadForkedBytes = "file_compressed_read_forked_bytes"

MeterFileUncompressedWriteBytes = "file_uncompressed_write_bytes"
MeterFileCompressedWriteBytes = "file_compressed_write_bytes"

MeterWasmInputBytes = "wasm_input_bytes"

TotalReadBytes = "total_read_bytes"
TotalWriteBytes = "total_write_bytes"
)

func WithBlockBytesReadMeteringOptions(meter dmetering.Meter, logger *zap.Logger) []dstore.Option {
var opts []dstore.Option
opts = append(opts, dstore.WithCompressedReadCallback(func(ctx context.Context, n int) {
meter.CountInc(MeterFileCompressedReadBytes, n)
}))

return opts
}

func WithForkedBlockBytesReadMeteringOptions(meter dmetering.Meter, logger *zap.Logger) []dstore.Option {
var opts []dstore.Option
opts = append(opts, dstore.WithCompressedReadCallback(func(ctx context.Context, n int) {
meter.CountInc(MeterFileCompressedReadForkedBytes, n)
}))

return opts
}

func WithBytesMeteringOptions(meter dmetering.Meter, logger *zap.Logger) []dstore.Option {
var opts []dstore.Option
opts = append(opts, dstore.WithUncompressedReadCallback(func(ctx context.Context, n int) {
meter.CountInc(MeterFileUncompressedReadBytes, n)
}))
opts = append(opts, dstore.WithCompressedReadCallback(func(ctx context.Context, n int) {
meter.CountInc(MeterFileCompressedReadBytes, n)
}))
opts = append(opts, dstore.WithUncompressedWriteCallback(func(ctx context.Context, n int) {
meter.CountInc(MeterFileUncompressedWriteBytes, n)
}))
opts = append(opts, dstore.WithCompressedWriteCallback(func(ctx context.Context, n int) {
meter.CountInc(MeterFileCompressedWriteBytes, n)
}))

return opts
}

func GetTotalBytesRead(meter dmetering.Meter) uint64 {
total := uint64(meter.GetCount(TotalReadBytes))
return total
}

func GetTotalBytesWritten(meter dmetering.Meter) uint64 {
total := uint64(meter.GetCount(TotalWriteBytes))
return total
}

func Send(ctx context.Context, meter dmetering.Meter, userID, apiKeyID, ip, userMeta, endpoint string, resp proto.Message) {
bytesRead := meter.BytesReadDelta()
bytesWritten := meter.BytesWrittenDelta()
egressBytes := proto.Size(resp)

inputBytes := meter.GetCountAndReset(MeterWasmInputBytes)

liveUncompressedReadBytes := meter.GetCountAndReset(MeterLiveUncompressedReadBytes)
liveUncompressedReadForkedBytes := meter.GetCountAndReset(MeterLiveUncompressedReadForkedBytes)
fileUncompressedReadBytes := meter.GetCountAndReset(MeterFileUncompressedReadBytes)
fileUncompressedReadForkedBytes := meter.GetCountAndReset(MeterFileUncompressedReadForkedBytes)
fileCompressedReadForkedBytes := meter.GetCountAndReset(MeterFileCompressedReadForkedBytes)
fileCompressedReadBytes := meter.GetCountAndReset(MeterFileCompressedReadBytes)

fileUncompressedWriteBytes := meter.GetCountAndReset(MeterFileUncompressedWriteBytes)
fileCompressedWriteBytes := meter.GetCountAndReset(MeterFileCompressedWriteBytes)

totalReadBytes := fileCompressedReadBytes + fileCompressedReadForkedBytes + liveUncompressedReadBytes + liveUncompressedReadForkedBytes
totalWriteBytes := fileUncompressedWriteBytes

meter.CountInc(TotalReadBytes, int(totalReadBytes))
meter.CountInc(TotalWriteBytes, int(totalWriteBytes))

event := dmetering.Event{
UserID: userID,
ApiKeyID: apiKeyID,
IpAddress: ip,
Meta: userMeta,

Endpoint: endpoint,
Metrics: map[string]float64{
"egress_bytes": float64(egressBytes),
"written_bytes": float64(bytesWritten),
"read_bytes": float64(bytesRead),
MeterWasmInputBytes: float64(inputBytes),
MeterLiveUncompressedReadBytes: float64(liveUncompressedReadBytes),
MeterLiveUncompressedReadForkedBytes: float64(liveUncompressedReadForkedBytes),
MeterFileUncompressedReadBytes: float64(fileUncompressedReadBytes),
MeterFileUncompressedReadForkedBytes: float64(fileUncompressedReadForkedBytes),
MeterFileCompressedReadForkedBytes: float64(fileCompressedReadForkedBytes),
MeterFileCompressedReadBytes: float64(fileCompressedReadBytes),
MeterFileUncompressedWriteBytes: float64(fileUncompressedWriteBytes),
MeterFileCompressedWriteBytes: float64(fileCompressedWriteBytes),
"message_count": 1,
},
Timestamp: time.Now(),
}

emitter := reqctx.Emitter(ctx)
if emitter == nil {
dmetering.Emit(context.WithoutCancel(ctx), event)
} else {
emitter.Emit(context.WithoutCancel(ctx), event)
}
}
11 changes: 7 additions & 4 deletions pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"strings"
"time"

"github.com/streamingfast/substreams/metering"

"github.com/RoaringBitmap/roaring/roaring64"
"github.com/streamingfast/bstream"
"github.com/streamingfast/dmetering"
Expand Down Expand Up @@ -286,6 +288,7 @@ func (p *Pipeline) runParallelProcess(ctx context.Context, reqPlan *plan.Request
stream := response.New(p.respFunc)

meter := dmetering.GetBytesMeter(ctx)

for {
select {
case <-time.After(time.Millisecond * 500):
Expand All @@ -294,7 +297,7 @@ func (p *Pipeline) runParallelProcess(ctx context.Context, reqPlan *plan.Request
modStats := stats.AggregatedModulesStats()
remoteBytesRead, remoteBytesWritten := stats.RemoteBytesConsumption()

stream.SendModulesStats(modStats, stagesProgress, jobs, meter.BytesRead()+remoteBytesRead, meter.BytesWritten()+remoteBytesWritten)
stream.SendModulesStats(modStats, stagesProgress, jobs, metering.GetTotalBytesRead(meter)+remoteBytesRead, metering.GetTotalBytesWritten(meter)+remoteBytesWritten)
case <-progressCtx.Done():
return
}
Expand Down Expand Up @@ -412,7 +415,7 @@ func (p *Pipeline) returnRPCModuleProgressOutputs(forceOutput bool) error {

meter := dmetering.GetBytesMeter(p.ctx)
remoteBytesRead, remoteBytesWritten := stats.RemoteBytesConsumption()
return stream.SendModulesStats(modStats, stagesProgress, jobs, meter.BytesRead()+remoteBytesRead, meter.BytesWritten()+remoteBytesWritten)
return stream.SendModulesStats(modStats, stagesProgress, jobs, metering.GetTotalBytesRead(meter)+remoteBytesRead, metering.GetTotalBytesWritten(meter)+remoteBytesWritten)

}

Expand All @@ -421,8 +424,8 @@ func (p *Pipeline) toInternalUpdate(clock *pbsubstreams.Clock) *pbssinternal.Upd

out := &pbssinternal.Update{
DurationMs: uint64(time.Since(p.startTime).Milliseconds()),
TotalBytesRead: meter.BytesRead(),
TotalBytesWritten: meter.BytesWritten(),
TotalBytesRead: metering.GetTotalBytesRead(meter),
TotalBytesWritten: metering.GetTotalBytesWritten(meter),
ModulesStats: reqctx.ReqStats(p.ctx).LocalModulesStats(),
}

Expand Down
6 changes: 5 additions & 1 deletion pipeline/process_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/streamingfast/bstream"
pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1"
"github.com/streamingfast/dmetering"
"github.com/streamingfast/substreams/metering"
"github.com/streamingfast/substreams/metrics"
pbssinternal "github.com/streamingfast/substreams/pb/sf/substreams/intern/v2"
pbsubstreamsrpc "github.com/streamingfast/substreams/pb/sf/substreams/rpc/v2"
Expand Down Expand Up @@ -125,7 +126,10 @@ func (p *Pipeline) processBlock(
case bstream.StepNew:
p.blockStepMap[bstream.StepNew]++

// legacy metering
//todo: (deprecated)
dmetering.GetBytesMeter(ctx).AddBytesRead(execOutput.Len())

err = p.handleStepNew(ctx, clock, cursor, execOutput)
if err != nil && err != io.EOF {
return err
Expand Down Expand Up @@ -254,7 +258,7 @@ func (p *Pipeline) handleStepNew(ctx context.Context, clock *pbsubstreams.Clock,
return fmt.Errorf("pre block hook: %w", err)
}

dmetering.GetBytesMeter(ctx).CountInc("wasm_input_bytes", execOutput.Len())
dmetering.GetBytesMeter(ctx).CountInc(metering.MeterWasmInputBytes, execOutput.Len())
if err := p.executeModules(ctx, execOutput); err != nil {
return fmt.Errorf("execute modules: %w", err)
}
Expand Down
4 changes: 3 additions & 1 deletion service/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"

"github.com/streamingfast/bstream"
"github.com/streamingfast/bstream/stream"
"go.uber.org/zap"
)

Expand All @@ -14,7 +15,8 @@ type StreamFactoryFunc func(ctx context.Context,
cursor string,
finalBlocksOnly bool,
cursorIsTarget bool,
logger *zap.Logger) (Streamable, error)
logger *zap.Logger,
extraOpts ...stream.Option) (Streamable, error)

type Streamable interface {
Run(ctx context.Context) error
Expand Down
43 changes: 0 additions & 43 deletions service/metering.go

This file was deleted.

Loading

0 comments on commit 05a4548

Please sign in to comment.