Skip to content

Commit

Permalink
feat(sn): add metrics for Append and Replicate RPCs
Browse files Browse the repository at this point in the history
This pull request defines metrics for measuring RPCs such as Append and
Replicate. It introduces four metrics:

- `log_rpc.server.duration` measures the time spent processing inbound RPC calls
  in microseconds. It is very similar to the `rpc.server.duration` defined by
  OpenTelemetry, but our metric also measures the processing time triggered by
  each call on a gRPC stream.
- `log_rpc.server.log_entry.size` measures the size of appended log entries. It
  is similar to the `rpc.server.request.size` metric, but our metric measures
  the size of each log entry included in the appended batch.
- `log_rpc.server.batch.size` measures the size of log entry batches appended.
- `log_rpc.server.log_entries_per_batch` measures the number of log entries per
  appended batch.

These metrics are histogram-type, allowing us to compute percentiles and analyze
histograms and heat maps. Users can leverage these metrics to analyze the
duration of RPCs, the distribution of log entry sizes, and the length of
batches. We expect users to find better configurations to optimize storage node
performance.
  • Loading branch information
ijsong committed Jun 6, 2024
1 parent ccee2ed commit 2d750b5
Show file tree
Hide file tree
Showing 3 changed files with 313 additions and 5 deletions.
24 changes: 22 additions & 2 deletions internal/storagenode/log_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"io"
"time"

pbtypes "github.com/gogo/protobuf/types"
"go.uber.org/multierr"
Expand All @@ -13,6 +14,7 @@ import (

snerrors "github.com/kakao/varlog/internal/storagenode/errors"
"github.com/kakao/varlog/internal/storagenode/logstream"
"github.com/kakao/varlog/internal/storagenode/telemetry"
"github.com/kakao/varlog/pkg/types"
"github.com/kakao/varlog/pkg/verrors"
"github.com/kakao/varlog/proto/snpb"
Expand Down Expand Up @@ -111,6 +113,9 @@ func (ls *logServer) appendStreamRecvLoop(stream snpb.LogIO_AppendServer, cq cha
lsid = req.LogStreamID
}

appendTask.LogStreamID = lsid
appendTask.RPCStartTime = time.Now()

if req.TopicID != tpid || req.LogStreamID != lsid {
err = status.Error(codes.InvalidArgument, "unmatched topic or logstream")
goto Out
Expand Down Expand Up @@ -149,17 +154,32 @@ func (ls *logServer) appendStreamSendLoop(stream snpb.LogIO_AppendServer, cq <-c
if !ok {
return nil
}

lsid := appendTask.LogStreamID
res, err = appendTask.WaitForCompletion(ctx)
elapsed := time.Since(appendTask.RPCStartTime)
if err != nil {
appendTask.Release()
return err
goto RecordMetric
}

appendTask.ReleaseWriteWaitGroups()
appendTask.Release()

rsp.Results = res
err = stream.Send(rsp)

RecordMetric:
code := codes.OK
if err != nil {
// TODO: Set the correct status code.
code = codes.Internal
}
if !lsid.Invalid() {
metrics, ok := ls.sn.metrics.GetLogStreamMetrics(lsid)
if ok {
metrics.LogRPCServerDuration.Record(ctx, telemetry.RPCKindAppend, code, elapsed.Microseconds())
}
}
if err != nil {
return err
}
Expand Down
20 changes: 19 additions & 1 deletion internal/storagenode/logstream/append.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,12 @@ import (
"time"

"go.uber.org/zap"
"google.golang.org/grpc/codes"

"github.com/kakao/varlog/internal/batchlet"
snerrors "github.com/kakao/varlog/internal/storagenode/errors"
"github.com/kakao/varlog/internal/storagenode/telemetry"
"github.com/kakao/varlog/pkg/types"
"github.com/kakao/varlog/pkg/verrors"
"github.com/kakao/varlog/proto/snpb"
)
Expand All @@ -27,6 +30,9 @@ type AppendTask struct {
start time.Time
apc appendContext
dataBatchLen int

LogStreamID types.LogStreamID
RPCStartTime time.Time
}

func NewAppendTask() *AppendTask {
Expand Down Expand Up @@ -159,6 +165,9 @@ func (lse *Executor) AppendAsync(ctx context.Context, dataBatch [][]byte, append
lse.lsm.AppendBytes.Add(appendTask.apc.totalBytes)
lse.lsm.AppendOperations.Add(1)
lse.lsm.AppendPreparationMicro.Add(preparationDuration.Microseconds())

lse.lsm.LogRPCServerBatchSize.Record(context.Background(), telemetry.RPCKindAppend, codes.OK, appendTask.apc.totalBytes)
lse.lsm.LogRPCServerLogEntriesPerBatch.Record(context.Background(), telemetry.RPCKindAppend, codes.OK, int64(dataBatchLen))
}
}()

Expand Down Expand Up @@ -214,6 +223,10 @@ func (lse *Executor) Append(ctx context.Context, dataBatch [][]byte) ([]snpb.App
lse.lsm.AppendDuration.Add(time.Since(startTime).Microseconds())
lse.lsm.AppendOperations.Add(1)
lse.lsm.AppendPreparationMicro.Add(preparationDuration.Microseconds())

// TODO: Set a correct error code.
lse.lsm.LogRPCServerBatchSize.Record(context.Background(), telemetry.RPCKindAppend, codes.OK, apc.totalBytes)
lse.lsm.LogRPCServerLogEntriesPerBatch.Record(context.Background(), telemetry.RPCKindAppend, codes.OK, int64(dataBatchLen))

Check warning on line 229 in internal/storagenode/logstream/append.go

View check run for this annotation

Codecov / codecov/patch

internal/storagenode/logstream/append.go#L228-L229

Added lines #L228 - L229 were not covered by tests
}()

lse.prepareAppendContext(dataBatch, &apc)
Expand Down Expand Up @@ -271,7 +284,12 @@ func (lse *Executor) prepareAppendContextInternal(dataBatch [][]byte, begin, end
st.cwts = newListQueue()
for i := 0; i < len(batchletData); i++ {
// st.dwb.PutData(batchletData[i])
apc.totalBytes += int64(len(batchletData[i]))
logEntrySize := int64(len(batchletData[i]))
apc.totalBytes += logEntrySize
if lse.lsm != nil {
// TODO: Set the correct status code.
lse.lsm.LogRPCServerLogEntrySize.Record(context.Background(), telemetry.RPCKindAppend, codes.OK, logEntrySize)
}
awg := newAppendWaitGroup(st.wwg)
st.cwts.PushFront(newCommitWaitTask(awg))
apc.awgs = append(apc.awgs, awg)
Expand Down
Loading

0 comments on commit 2d750b5

Please sign in to comment.