Skip to content

Commit

Permalink
metering: remove forked bytes metrics, refactor all metering code int…
Browse files Browse the repository at this point in the history
…o the metering package and add unit tests
  • Loading branch information
colindickson committed Oct 11, 2024
1 parent 599fda8 commit 460c5c3
Show file tree
Hide file tree
Showing 6 changed files with 399 additions and 48 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ require (
github.com/streamingfast/dbin v0.9.1-0.20231117225723-59790c798e2c
github.com/streamingfast/derr v0.0.0-20230515163924-8570aaa43fe1
github.com/streamingfast/dgrpc v0.0.0-20240219152146-57bb131c39ca
github.com/streamingfast/dstore v0.1.1-0.20240826190906-91345d4a31f2
github.com/streamingfast/dstore v0.1.1-0.20241011152904-9acd6205dc14
github.com/streamingfast/logging v0.0.0-20230608130331-f22c91403091
github.com/streamingfast/pbgo v0.0.6-0.20240823134334-812f6a16c5cb
github.com/stretchr/testify v1.8.4
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -537,8 +537,8 @@ github.com/streamingfast/dmetering v0.0.0-20240816165719-51768d3da951 h1:6o6MS3J
github.com/streamingfast/dmetering v0.0.0-20240816165719-51768d3da951/go.mod h1:UqWuX3REU/IInBUaymFN2eLjuvz+/0SsoUFjeQlLNyI=
github.com/streamingfast/dmetrics v0.0.0-20230919161904-206fa8ebd545 h1:SUl04bZKGAv207lp7/6CHOJIRpjUKunwItrno3K463Y=
github.com/streamingfast/dmetrics v0.0.0-20230919161904-206fa8ebd545/go.mod h1:JbxEDbzWRG1dHdNIPrYfuPllEkktZMgm40AwVIBENcw=
github.com/streamingfast/dstore v0.1.1-0.20240826190906-91345d4a31f2 h1:BB3VSDl8/OHBSvjqfgufwqr4tD5l7XPjXybDm6uudj4=
github.com/streamingfast/dstore v0.1.1-0.20240826190906-91345d4a31f2/go.mod h1:kNzxgv2MzYFn2T4kelBVpGp/yP/1njtr3+csWuqxK3w=
github.com/streamingfast/dstore v0.1.1-0.20241011152904-9acd6205dc14 h1:/2HxIOzAgUBKyxjDO4IJPzBBaEAtzwipb/2/JGsOArA=
github.com/streamingfast/dstore v0.1.1-0.20241011152904-9acd6205dc14/go.mod h1:kNzxgv2MzYFn2T4kelBVpGp/yP/1njtr3+csWuqxK3w=
github.com/streamingfast/dtracing v0.0.0-20220305214756-b5c0e8699839 h1:K6mJPvh1jAL+/gBS7Bh9jyzWaTib6N47m06gZOTUPwQ=
github.com/streamingfast/dtracing v0.0.0-20220305214756-b5c0e8699839/go.mod h1:huOJyjMYS6K8upTuxDxaNd+emD65RrXoVBvh8f1/7Ns=
github.com/streamingfast/graph v0.0.0-20220329181048-a5710712d873 h1:8+TXT26c2p6f8JMg0w4JQ8t4fAbfMmR11zLN4KFVRC8=
Expand Down
58 changes: 16 additions & 42 deletions metering/metering.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,6 @@ import (

"github.com/streamingfast/bstream"
pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1"

"github.com/streamingfast/substreams/metrics"

"github.com/streamingfast/dmetering"
"github.com/streamingfast/dstore"
"github.com/streamingfast/substreams/reqctx"
Expand All @@ -18,15 +15,11 @@ import (
)

const (
MeterLiveUncompressedReadBytes = "live_uncompressed_read_bytes"
MeterLiveUncompressedReadForkedBytes = "live_uncompressed_read_forked_bytes"
MeterLiveUncompressedReadBytes = "live_uncompressed_read_bytes"

MeterFileUncompressedReadBytes = "file_uncompressed_read_bytes"
MeterFileCompressedReadBytes = "file_compressed_read_bytes"

MeterFileUncompressedReadForkedBytes = "file_uncompressed_read_forked_bytes"
MeterFileCompressedReadForkedBytes = "file_compressed_read_forked_bytes"

MeterFileUncompressedWriteBytes = "file_uncompressed_write_bytes"
MeterFileCompressedWriteBytes = "file_compressed_write_bytes"

Expand All @@ -42,14 +35,9 @@ func WithBlockBytesReadMeteringOptions(meter dmetering.Meter, logger *zap.Logger
meter.CountInc(MeterFileCompressedReadBytes, n)
}))

return opts
}
// uncompressed read bytes is measured in the file source middleware.

func WithForkedBlockBytesReadMeteringOptions(meter dmetering.Meter, logger *zap.Logger) []dstore.Option {
var opts []dstore.Option
opts = append(opts, dstore.WithCompressedReadCallback(func(ctx context.Context, n int) {
meter.CountInc(MeterFileCompressedReadForkedBytes, n)
}))
// no writes are done to this store, so no need to measure write bytes

return opts
}
Expand Down Expand Up @@ -85,13 +73,10 @@ func GetTotalBytesWritten(meter dmetering.Meter) uint64 {
func LiveSourceMiddlewareHandlerFactory(ctx context.Context) func(handler bstream.Handler) bstream.Handler {
return func(next bstream.Handler) bstream.Handler {
return bstream.HandlerFunc(func(blk *pbbstream.Block, obj interface{}) error {
stepable, ok := obj.(bstream.Stepable)
if ok {
if stepable, ok := obj.(bstream.Stepable); ok {
step := stepable.Step()
if step.Matches(bstream.StepNew) {
dmetering.GetBytesMeter(ctx).CountInc(MeterLiveUncompressedReadBytes, len(blk.GetPayload().GetValue()))
} else {
dmetering.GetBytesMeter(ctx).CountInc(MeterLiveUncompressedReadForkedBytes, len(blk.GetPayload().GetValue()))
}
}
return next.ProcessBlock(blk, obj)
Expand All @@ -102,13 +87,10 @@ func LiveSourceMiddlewareHandlerFactory(ctx context.Context) func(handler bstrea
func FileSourceMiddlewareHandlerFactory(ctx context.Context) func(handler bstream.Handler) bstream.Handler {
return func(next bstream.Handler) bstream.Handler {
return bstream.HandlerFunc(func(blk *pbbstream.Block, obj interface{}) error {
stepable, ok := obj.(bstream.Stepable)
if ok {
if stepable, ok := obj.(bstream.Stepable); ok {
step := stepable.Step()
if step.Matches(bstream.StepNew) {
dmetering.GetBytesMeter(ctx).CountInc(MeterFileUncompressedReadBytes, len(blk.GetPayload().GetValue()))
} else {
dmetering.GetBytesMeter(ctx).CountInc(MeterFileUncompressedReadForkedBytes, len(blk.GetPayload().GetValue()))
}
}
return next.ProcessBlock(blk, obj)
Expand All @@ -130,16 +112,13 @@ func Send(ctx context.Context, userID, apiKeyID, ip, userMeta, endpoint string,
inputBytes := meter.GetCountAndReset(MeterWasmInputBytes)

liveUncompressedReadBytes := meter.GetCountAndReset(MeterLiveUncompressedReadBytes)
liveUncompressedReadForkedBytes := meter.GetCountAndReset(MeterLiveUncompressedReadForkedBytes)
fileUncompressedReadBytes := meter.GetCountAndReset(MeterFileUncompressedReadBytes)
fileUncompressedReadForkedBytes := meter.GetCountAndReset(MeterFileUncompressedReadForkedBytes)
fileCompressedReadForkedBytes := meter.GetCountAndReset(MeterFileCompressedReadForkedBytes)
fileCompressedReadBytes := meter.GetCountAndReset(MeterFileCompressedReadBytes)

fileUncompressedWriteBytes := meter.GetCountAndReset(MeterFileUncompressedWriteBytes)
fileCompressedWriteBytes := meter.GetCountAndReset(MeterFileCompressedWriteBytes)

totalReadBytes := fileCompressedReadBytes + fileCompressedReadForkedBytes + liveUncompressedReadBytes + liveUncompressedReadForkedBytes
totalReadBytes := fileUncompressedReadBytes + liveUncompressedReadBytes
totalWriteBytes := fileUncompressedWriteBytes

meter.CountInc(TotalReadBytes, int(totalReadBytes))
Expand All @@ -153,19 +132,16 @@ func Send(ctx context.Context, userID, apiKeyID, ip, userMeta, endpoint string,

Endpoint: endpoint,
Metrics: map[string]float64{
"egress_bytes": float64(egressBytes),
"written_bytes": float64(bytesWritten),
"read_bytes": float64(bytesRead),
MeterWasmInputBytes: float64(inputBytes),
MeterLiveUncompressedReadBytes: float64(liveUncompressedReadBytes),
MeterLiveUncompressedReadForkedBytes: float64(liveUncompressedReadForkedBytes),
MeterFileUncompressedReadBytes: float64(fileUncompressedReadBytes),
MeterFileUncompressedReadForkedBytes: float64(fileUncompressedReadForkedBytes),
MeterFileCompressedReadForkedBytes: float64(fileCompressedReadForkedBytes),
MeterFileCompressedReadBytes: float64(fileCompressedReadBytes),
MeterFileUncompressedWriteBytes: float64(fileUncompressedWriteBytes),
MeterFileCompressedWriteBytes: float64(fileCompressedWriteBytes),
"message_count": 1,
"egress_bytes": float64(egressBytes),
"written_bytes": float64(bytesWritten),
"read_bytes": float64(bytesRead),
MeterWasmInputBytes: float64(inputBytes),
MeterLiveUncompressedReadBytes: float64(liveUncompressedReadBytes),
MeterFileUncompressedReadBytes: float64(fileUncompressedReadBytes),
MeterFileCompressedReadBytes: float64(fileCompressedReadBytes),
MeterFileUncompressedWriteBytes: float64(fileUncompressedWriteBytes),
MeterFileCompressedWriteBytes: float64(fileCompressedWriteBytes),
"message_count": 1,
},
Timestamp: time.Now(),
}
Expand All @@ -176,6 +152,4 @@ func Send(ctx context.Context, userID, apiKeyID, ip, userMeta, endpoint string,
} else {
emitter.Emit(context.WithoutCancel(ctx), event)
}

metrics.MeteringEvents.Inc()
}
Loading

0 comments on commit 460c5c3

Please sign in to comment.