Skip to content

Commit

Permalink
Merge pull request #36 from pinax-network/fix/add_missing_metering_ev…
Browse files Browse the repository at this point in the history
…ents

add metering events for sf.firehose.v2.Fetch/Block responses
  • Loading branch information
maoueh authored Feb 23, 2024
2 parents c8ea7c0 + 07aa162 commit 1a52d69
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 2 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ If you were at `firehose-core` version `1.0.0` and are bumping to `1.1.0`, you s

## Unreleased
* update to latest `dgrpc`
* Add missing metering events for `sf.firehose.v2.Fetch/Block` responses.

### Substreams
* Fixed bug in scheduler ramp-up function sometimes waiting before raising the number of workers
Expand Down
31 changes: 29 additions & 2 deletions firehose/server/blocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ func (s *Server) Block(ctx context.Context, request *pbfirehose.SingleBlockReque
blockNum = ref.BlockNumber.Num
}

ctx = dmetering.WithBytesMeter(ctx)
blk, err := s.blockGetter.Get(ctx, blockNum, blockHash, s.logger)
if err != nil {
if _, ok := status.FromError(err); ok {
Expand All @@ -53,9 +54,35 @@ func (s *Server) Block(ctx context.Context, request *pbfirehose.SingleBlockReque
return nil, status.Errorf(codes.NotFound, "block %s not found", bstream.NewBlockRef(blockHash, blockNum))
}

return &pbfirehose.SingleBlockResponse{
resp := &pbfirehose.SingleBlockResponse{
Block: blk.Payload,
}, nil
}

//////////////////////////////////////////////////////////////////////
meter := dmetering.GetBytesMeter(ctx)
bytesRead := meter.BytesReadDelta()
bytesWritten := meter.BytesWrittenDelta()
size := proto.Size(resp)

auth := dauth.FromContext(ctx)
event := dmetering.Event{
UserID: auth.UserID(),
ApiKeyID: auth.APIKeyID(),
IpAddress: auth.RealIP(),
Meta: auth.Meta(),
Endpoint: "sf.firehose.v2.Firehose/Block",
Metrics: map[string]float64{
"egress_bytes": float64(size),
"written_bytes": float64(bytesWritten),
"read_bytes": float64(bytesRead),
"block_count": 1,
},
Timestamp: time.Now(),
}
dmetering.Emit(ctx, event)
//////////////////////////////////////////////////////////////////////

return resp, nil
}

func (s *Server) Blocks(request *pbfirehose.Request, streamSrv pbfirehose.Stream_BlocksServer) error {
Expand Down

0 comments on commit 1a52d69

Please sign in to comment.