diff --git a/go.mod b/go.mod index c04ab4dd..2732f221 100644 --- a/go.mod +++ b/go.mod @@ -14,7 +14,7 @@ require ( github.com/streamingfast/dbin v0.9.1-0.20231117225723-59790c798e2c github.com/streamingfast/derr v0.0.0-20230515163924-8570aaa43fe1 github.com/streamingfast/dgrpc v0.0.0-20240219152146-57bb131c39ca - github.com/streamingfast/dstore v0.1.1-0.20240826190906-91345d4a31f2 + github.com/streamingfast/dstore v0.1.1-0.20241011152904-9acd6205dc14 github.com/streamingfast/logging v0.0.0-20230608130331-f22c91403091 github.com/streamingfast/pbgo v0.0.6-0.20240823134334-812f6a16c5cb github.com/stretchr/testify v1.8.4 diff --git a/go.sum b/go.sum index 54cd50a9..6790b5a7 100644 --- a/go.sum +++ b/go.sum @@ -537,8 +537,8 @@ github.com/streamingfast/dmetering v0.0.0-20240816165719-51768d3da951 h1:6o6MS3J github.com/streamingfast/dmetering v0.0.0-20240816165719-51768d3da951/go.mod h1:UqWuX3REU/IInBUaymFN2eLjuvz+/0SsoUFjeQlLNyI= github.com/streamingfast/dmetrics v0.0.0-20230919161904-206fa8ebd545 h1:SUl04bZKGAv207lp7/6CHOJIRpjUKunwItrno3K463Y= github.com/streamingfast/dmetrics v0.0.0-20230919161904-206fa8ebd545/go.mod h1:JbxEDbzWRG1dHdNIPrYfuPllEkktZMgm40AwVIBENcw= -github.com/streamingfast/dstore v0.1.1-0.20240826190906-91345d4a31f2 h1:BB3VSDl8/OHBSvjqfgufwqr4tD5l7XPjXybDm6uudj4= -github.com/streamingfast/dstore v0.1.1-0.20240826190906-91345d4a31f2/go.mod h1:kNzxgv2MzYFn2T4kelBVpGp/yP/1njtr3+csWuqxK3w= +github.com/streamingfast/dstore v0.1.1-0.20241011152904-9acd6205dc14 h1:/2HxIOzAgUBKyxjDO4IJPzBBaEAtzwipb/2/JGsOArA= +github.com/streamingfast/dstore v0.1.1-0.20241011152904-9acd6205dc14/go.mod h1:kNzxgv2MzYFn2T4kelBVpGp/yP/1njtr3+csWuqxK3w= github.com/streamingfast/dtracing v0.0.0-20220305214756-b5c0e8699839 h1:K6mJPvh1jAL+/gBS7Bh9jyzWaTib6N47m06gZOTUPwQ= github.com/streamingfast/dtracing v0.0.0-20220305214756-b5c0e8699839/go.mod h1:huOJyjMYS6K8upTuxDxaNd+emD65RrXoVBvh8f1/7Ns= github.com/streamingfast/graph v0.0.0-20220329181048-a5710712d873 h1:8+TXT26c2p6f8JMg0w4JQ8t4fAbfMmR11zLN4KFVRC8= diff --git a/metering/metering.go b/metering/metering.go index 7794fc1e..6c4843be 100644 --- a/metering/metering.go +++ b/metering/metering.go @@ -7,9 +7,6 @@ import ( "github.com/streamingfast/bstream" pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1" - - "github.com/streamingfast/substreams/metrics" - "github.com/streamingfast/dmetering" "github.com/streamingfast/dstore" "github.com/streamingfast/substreams/reqctx" @@ -18,15 +15,11 @@ import ( ) const ( - MeterLiveUncompressedReadBytes = "live_uncompressed_read_bytes" - MeterLiveUncompressedReadForkedBytes = "live_uncompressed_read_forked_bytes" + MeterLiveUncompressedReadBytes = "live_uncompressed_read_bytes" MeterFileUncompressedReadBytes = "file_uncompressed_read_bytes" MeterFileCompressedReadBytes = "file_compressed_read_bytes" - MeterFileUncompressedReadForkedBytes = "file_uncompressed_read_forked_bytes" - MeterFileCompressedReadForkedBytes = "file_compressed_read_forked_bytes" - MeterFileUncompressedWriteBytes = "file_uncompressed_write_bytes" MeterFileCompressedWriteBytes = "file_compressed_write_bytes" @@ -42,14 +35,9 @@ func WithBlockBytesReadMeteringOptions(meter dmetering.Meter, logger *zap.Logger meter.CountInc(MeterFileCompressedReadBytes, n) })) - return opts -} + // uncompressed read bytes is measured in the file source middleware. -func WithForkedBlockBytesReadMeteringOptions(meter dmetering.Meter, logger *zap.Logger) []dstore.Option { - var opts []dstore.Option - opts = append(opts, dstore.WithCompressedReadCallback(func(ctx context.Context, n int) { - meter.CountInc(MeterFileCompressedReadForkedBytes, n) - })) + // no writes are done to this store, so no need to measure write bytes return opts } @@ -85,13 +73,10 @@ func GetTotalBytesWritten(meter dmetering.Meter) uint64 { func LiveSourceMiddlewareHandlerFactory(ctx context.Context) func(handler bstream.Handler) bstream.Handler { return func(next bstream.Handler) bstream.Handler { return bstream.HandlerFunc(func(blk *pbbstream.Block, obj interface{}) error { - stepable, ok := obj.(bstream.Stepable) - if ok { + if stepable, ok := obj.(bstream.Stepable); ok { step := stepable.Step() if step.Matches(bstream.StepNew) { dmetering.GetBytesMeter(ctx).CountInc(MeterLiveUncompressedReadBytes, len(blk.GetPayload().GetValue())) - } else { - dmetering.GetBytesMeter(ctx).CountInc(MeterLiveUncompressedReadForkedBytes, len(blk.GetPayload().GetValue())) } } return next.ProcessBlock(blk, obj) @@ -102,13 +87,10 @@ func LiveSourceMiddlewareHandlerFactory(ctx context.Context) func(handler bstrea func FileSourceMiddlewareHandlerFactory(ctx context.Context) func(handler bstream.Handler) bstream.Handler { return func(next bstream.Handler) bstream.Handler { return bstream.HandlerFunc(func(blk *pbbstream.Block, obj interface{}) error { - stepable, ok := obj.(bstream.Stepable) - if ok { + if stepable, ok := obj.(bstream.Stepable); ok { step := stepable.Step() if step.Matches(bstream.StepNew) { dmetering.GetBytesMeter(ctx).CountInc(MeterFileUncompressedReadBytes, len(blk.GetPayload().GetValue())) - } else { - dmetering.GetBytesMeter(ctx).CountInc(MeterFileUncompressedReadForkedBytes, len(blk.GetPayload().GetValue())) } } return next.ProcessBlock(blk, obj) @@ -130,16 +112,13 @@ func Send(ctx context.Context, userID, apiKeyID, ip, userMeta, endpoint string, inputBytes := meter.GetCountAndReset(MeterWasmInputBytes) liveUncompressedReadBytes := meter.GetCountAndReset(MeterLiveUncompressedReadBytes) - liveUncompressedReadForkedBytes := meter.GetCountAndReset(MeterLiveUncompressedReadForkedBytes) fileUncompressedReadBytes := meter.GetCountAndReset(MeterFileUncompressedReadBytes) - fileUncompressedReadForkedBytes := meter.GetCountAndReset(MeterFileUncompressedReadForkedBytes) - fileCompressedReadForkedBytes := meter.GetCountAndReset(MeterFileCompressedReadForkedBytes) fileCompressedReadBytes := meter.GetCountAndReset(MeterFileCompressedReadBytes) fileUncompressedWriteBytes := meter.GetCountAndReset(MeterFileUncompressedWriteBytes) fileCompressedWriteBytes := meter.GetCountAndReset(MeterFileCompressedWriteBytes) - totalReadBytes := fileCompressedReadBytes + fileCompressedReadForkedBytes + liveUncompressedReadBytes + liveUncompressedReadForkedBytes + totalReadBytes := fileUncompressedReadBytes + liveUncompressedReadBytes totalWriteBytes := fileUncompressedWriteBytes meter.CountInc(TotalReadBytes, int(totalReadBytes)) @@ -153,19 +132,16 @@ func Send(ctx context.Context, userID, apiKeyID, ip, userMeta, endpoint string, Endpoint: endpoint, Metrics: map[string]float64{ - "egress_bytes": float64(egressBytes), - "written_bytes": float64(bytesWritten), - "read_bytes": float64(bytesRead), - MeterWasmInputBytes: float64(inputBytes), - MeterLiveUncompressedReadBytes: float64(liveUncompressedReadBytes), - MeterLiveUncompressedReadForkedBytes: float64(liveUncompressedReadForkedBytes), - MeterFileUncompressedReadBytes: float64(fileUncompressedReadBytes), - MeterFileUncompressedReadForkedBytes: float64(fileUncompressedReadForkedBytes), - MeterFileCompressedReadForkedBytes: float64(fileCompressedReadForkedBytes), - MeterFileCompressedReadBytes: float64(fileCompressedReadBytes), - MeterFileUncompressedWriteBytes: float64(fileUncompressedWriteBytes), - MeterFileCompressedWriteBytes: float64(fileCompressedWriteBytes), - "message_count": 1, + "egress_bytes": float64(egressBytes), + "written_bytes": float64(bytesWritten), + "read_bytes": float64(bytesRead), + MeterWasmInputBytes: float64(inputBytes), + MeterLiveUncompressedReadBytes: float64(liveUncompressedReadBytes), + MeterFileUncompressedReadBytes: float64(fileUncompressedReadBytes), + MeterFileCompressedReadBytes: float64(fileCompressedReadBytes), + MeterFileUncompressedWriteBytes: float64(fileUncompressedWriteBytes), + MeterFileCompressedWriteBytes: float64(fileCompressedWriteBytes), + "message_count": 1, }, Timestamp: time.Now(), } @@ -176,6 +152,4 @@ func Send(ctx context.Context, userID, apiKeyID, ip, userMeta, endpoint string, } else { emitter.Emit(context.WithoutCancel(ctx), event) } - - metrics.MeteringEvents.Inc() } diff --git a/metering/metering_test.go b/metering/metering_test.go new file mode 100644 index 00000000..0c5ffb2b --- /dev/null +++ b/metering/metering_test.go @@ -0,0 +1,379 @@ +package metering + +import ( + "bytes" + "context" + "io" + "testing" + + "github.com/streamingfast/bstream" + pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1" + "github.com/streamingfast/dmetering" + "github.com/streamingfast/dstore" + pbsubstreamstest "github.com/streamingfast/substreams/pb/sf/substreams/v1/test" + "github.com/stretchr/testify/assert" + "google.golang.org/protobuf/types/known/anypb" + "google.golang.org/protobuf/types/known/timestamppb" +) + +func TestWithBlockBytesReadMeteringOptions(t *testing.T) { + meter := dmetering.NewBytesMeter() + + opts := WithBlockBytesReadMeteringOptions(meter, nil) + + store, err := dstore.NewStore("memory://test", ".test", "zstd", false, opts...) + if err != nil { + t.Fatal(err) + } + + err = store.WriteObject(nil, "test", bytes.NewReader([]byte("1111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111"))) + if err != nil { + t.Fatal(err) + } + + r, err := store.OpenObject(nil, "test") + if err != nil { + t.Fatal(err) + } + + _, err = io.ReadAll(r) + if err != nil { + t.Fatal(err) + } + _ = r.Close() + + assert.Equal(t, 24, meter.GetCount(MeterFileCompressedReadBytes)) + assert.Equal(t, 0, meter.GetCount(MeterFileUncompressedReadBytes)) + assert.Equal(t, 0, meter.GetCount(MeterFileUncompressedWriteBytes)) + assert.Equal(t, 0, meter.GetCount(MeterFileCompressedWriteBytes)) + assert.Equal(t, 0, meter.GetCount(MeterLiveUncompressedReadBytes)) +} + +func TestWithBytesReadMeteringOptions(t *testing.T) { + meter := dmetering.NewBytesMeter() + + opts := WithBytesMeteringOptions(meter, nil) + + store, err := dstore.NewStore("memory://test", ".test", "zstd", false, opts...) + if err != nil { + t.Fatal(err) + } + + err = store.WriteObject(nil, "test", bytes.NewReader([]byte("1111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111"))) + if err != nil { + t.Fatal(err) + } + + r, err := store.OpenObject(nil, "test") + if err != nil { + t.Fatal(err) + } + + _, err = io.ReadAll(r) + if err != nil { + t.Fatal(err) + } + _ = r.Close() + + assert.Equal(t, 24, meter.GetCount(MeterFileCompressedReadBytes)) + assert.Equal(t, 727, meter.GetCount(MeterFileUncompressedReadBytes)) + assert.Equal(t, 727, meter.GetCount(MeterFileUncompressedWriteBytes)) + assert.Equal(t, 24, meter.GetCount(MeterFileCompressedWriteBytes)) + assert.Equal(t, 0, meter.GetCount(MeterLiveUncompressedReadBytes)) +} + +func TestFileSourceMiddlewareHandlerFactory(t *testing.T) { + type test struct { + Name string + Block *pbsubstreamstest.Block + Obj bstream.Stepable + ExpectedMetrics map[string]int + } + + for _, tt := range []test{ + { + Name: "step new", + Block: &pbsubstreamstest.Block{ + Id: "abc", + Number: 123, + }, + Obj: &testStepableObject{ + bstream.StepNew, + }, + ExpectedMetrics: map[string]int{ + MeterFileCompressedReadBytes: 0, + MeterFileUncompressedReadBytes: 7, + MeterFileUncompressedWriteBytes: 0, + MeterFileCompressedWriteBytes: 0, + MeterLiveUncompressedReadBytes: 0, + }, + }, + { + Name: "step new irreversible", + Block: &pbsubstreamstest.Block{ + Id: "abc", + Number: 123, + }, + Obj: &testStepableObject{ + bstream.StepNewIrreversible, + }, + ExpectedMetrics: map[string]int{ + MeterFileCompressedReadBytes: 0, + MeterFileUncompressedReadBytes: 7, + MeterFileUncompressedWriteBytes: 0, + MeterFileCompressedWriteBytes: 0, + MeterLiveUncompressedReadBytes: 0, + }, + }, + { + Name: "step undo", + Block: &pbsubstreamstest.Block{ + Id: "abc", + Number: 123, + }, + Obj: &testStepableObject{ + bstream.StepUndo, + }, + ExpectedMetrics: map[string]int{ + MeterFileCompressedReadBytes: 0, + MeterFileUncompressedReadBytes: 0, + MeterFileUncompressedWriteBytes: 0, + MeterFileCompressedWriteBytes: 0, + MeterLiveUncompressedReadBytes: 0, + }, + }, + { + Name: "step stalled", + Block: &pbsubstreamstest.Block{ + Id: "abc", + Number: 123, + }, + Obj: &testStepableObject{ + bstream.StepStalled, + }, + ExpectedMetrics: map[string]int{ + MeterFileCompressedReadBytes: 0, + MeterFileUncompressedReadBytes: 0, + MeterFileUncompressedWriteBytes: 0, + MeterFileCompressedWriteBytes: 0, + MeterLiveUncompressedReadBytes: 0, + }, + }, + { + Name: "step undo", + Block: &pbsubstreamstest.Block{ + Id: "abc", + Number: 123, + }, + Obj: &testStepableObject{ + bstream.StepUndo, + }, + ExpectedMetrics: map[string]int{ + MeterFileCompressedReadBytes: 0, + MeterFileUncompressedReadBytes: 0, + MeterFileUncompressedWriteBytes: 0, + MeterFileCompressedWriteBytes: 0, + MeterLiveUncompressedReadBytes: 0, + }, + }, + { + Name: "step irreversible", + Block: &pbsubstreamstest.Block{ + Id: "abc", + Number: 123, + }, + Obj: &testStepableObject{ + bstream.StepIrreversible, + }, + ExpectedMetrics: map[string]int{ + MeterFileCompressedReadBytes: 0, + MeterFileUncompressedReadBytes: 0, + MeterFileUncompressedWriteBytes: 0, + MeterFileCompressedWriteBytes: 0, + MeterLiveUncompressedReadBytes: 0, + }, + }, + } { + t.Run(tt.Name, func(t *testing.T) { + ctx := dmetering.WithBytesMeter(context.Background()) + meter := dmetering.GetBytesMeter(ctx) + + handler := bstream.HandlerFunc(func(blk *pbbstream.Block, obj interface{}) error { + return nil + }) + + testHandler := FileSourceMiddlewareHandlerFactory(ctx)(handler) + + err := testHandler.ProcessBlock(bstreamBlk(t, tt.Block), tt.Obj) + assert.NoError(t, err) + + for k, v := range tt.ExpectedMetrics { + assert.Equal(t, v, meter.GetCount(k)) + } + }) + } +} + +func TestLiveSourceMiddlewareHandlerFactory(t *testing.T) { + type test struct { + Name string + Block *pbsubstreamstest.Block + Obj bstream.Stepable + ExpectedMetrics map[string]int + } + + for _, tt := range []test{ + { + Name: "step new", + Block: &pbsubstreamstest.Block{ + Id: "abc", + Number: 123, + }, + Obj: &testStepableObject{ + bstream.StepNew, + }, + ExpectedMetrics: map[string]int{ + MeterFileCompressedReadBytes: 0, + MeterFileUncompressedReadBytes: 0, + MeterFileUncompressedWriteBytes: 0, + MeterFileCompressedWriteBytes: 0, + MeterLiveUncompressedReadBytes: 7, + }, + }, + { + Name: "step new irreversible", + Block: &pbsubstreamstest.Block{ + Id: "abc", + Number: 123, + }, + Obj: &testStepableObject{ + bstream.StepNewIrreversible, + }, + ExpectedMetrics: map[string]int{ + MeterFileCompressedReadBytes: 0, + MeterFileUncompressedReadBytes: 0, + MeterFileUncompressedWriteBytes: 0, + MeterFileCompressedWriteBytes: 0, + MeterLiveUncompressedReadBytes: 7, + }, + }, + { + Name: "step undo", + Block: &pbsubstreamstest.Block{ + Id: "abc", + Number: 123, + }, + Obj: &testStepableObject{ + bstream.StepUndo, + }, + ExpectedMetrics: map[string]int{ + MeterFileCompressedReadBytes: 0, + MeterFileUncompressedReadBytes: 0, + MeterFileUncompressedWriteBytes: 0, + MeterFileCompressedWriteBytes: 0, + MeterLiveUncompressedReadBytes: 0, + }, + }, + { + Name: "step stalled", + Block: &pbsubstreamstest.Block{ + Id: "abc", + Number: 123, + }, + Obj: &testStepableObject{ + bstream.StepStalled, + }, + ExpectedMetrics: map[string]int{ + MeterFileCompressedReadBytes: 0, + MeterFileUncompressedReadBytes: 0, + MeterFileUncompressedWriteBytes: 0, + MeterFileCompressedWriteBytes: 0, + MeterLiveUncompressedReadBytes: 0, + }, + }, + { + Name: "step undo", + Block: &pbsubstreamstest.Block{ + Id: "abc", + Number: 123, + }, + Obj: &testStepableObject{ + bstream.StepUndo, + }, + ExpectedMetrics: map[string]int{ + MeterFileCompressedReadBytes: 0, + MeterFileUncompressedReadBytes: 0, + MeterFileUncompressedWriteBytes: 0, + MeterFileCompressedWriteBytes: 0, + MeterLiveUncompressedReadBytes: 0, + }, + }, + { + Name: "step irreversible", + Block: &pbsubstreamstest.Block{ + Id: "abc", + Number: 123, + }, + Obj: &testStepableObject{ + bstream.StepIrreversible, + }, + ExpectedMetrics: map[string]int{ + MeterFileCompressedReadBytes: 0, + MeterFileUncompressedReadBytes: 0, + MeterFileUncompressedWriteBytes: 0, + MeterFileCompressedWriteBytes: 0, + MeterLiveUncompressedReadBytes: 0, + }, + }, + } { + t.Run(tt.Name, func(t *testing.T) { + ctx := dmetering.WithBytesMeter(context.Background()) + meter := dmetering.GetBytesMeter(ctx) + + handler := bstream.HandlerFunc(func(blk *pbbstream.Block, obj interface{}) error { + return nil + }) + + testHandler := LiveSourceMiddlewareHandlerFactory(ctx)(handler) + + err := testHandler.ProcessBlock(bstreamBlk(t, tt.Block), tt.Obj) + assert.NoError(t, err) + + for k, v := range tt.ExpectedMetrics { + assert.Equal(t, v, meter.GetCount(k)) + } + }) + } +} + +func bstreamBlk(t *testing.T, blk *pbsubstreamstest.Block) *pbbstream.Block { + payload, err := anypb.New(blk) + assert.NoError(t, err) + + bb := &pbbstream.Block{ + Id: blk.Id, + Number: blk.Number, + ParentId: "", + Timestamp: ×tamppb.Timestamp{}, + LibNum: 0, + PayloadKind: 0, + PayloadVersion: 0, + Payload: payload, + } + + return bb +} + +type testStepableObject struct { + step bstream.StepType +} + +func (t *testStepableObject) Step() bstream.StepType { + return t.step +} +func (t *testStepableObject) FinalBlockHeight() uint64 { + return 0 +} +func (t *testStepableObject) ReorgJunctionBlock() bstream.BlockRef { + return nil +} diff --git a/metrics/metrics.go b/metrics/metrics.go index 0b02fb91..71db6cee 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -30,8 +30,6 @@ var Tier2RequestCounter = MetricSet.NewCounter("substreams_tier2_request_counter var AppReadinessTier1 = MetricSet.NewAppReadiness("substreams_tier1") var AppReadinessTier2 = MetricSet.NewAppReadiness("substreams_tier2") -var MeteringEvents = MetricSet.NewCounter("substreams_metering_events_emitted_counter", "Number of metering events emitted") - var registerOnce sync.Once func RegisterMetricSet(zlog *zap.Logger) { diff --git a/service/stream.go b/service/stream.go index e40878ea..035222e2 100644 --- a/service/stream.go +++ b/service/stream.go @@ -56,7 +56,7 @@ func (sf *StreamFactory) New( forkedBlocksStore := sf.forkedBlocksStore if clonable, ok := forkedBlocksStore.(dstore.Clonable); ok { var err error - forkedBlocksStore, err = clonable.Clone(ctx, metering.WithForkedBlockBytesReadMeteringOptions(dmetering.GetBytesMeter(ctx), logger)...) + forkedBlocksStore, err = clonable.Clone(ctx) if err != nil { return nil, err }