From 2d750b523faa62482057d121790eba21b5da5e39 Mon Sep 17 00:00:00 2001 From: Injun Song Date: Thu, 30 May 2024 18:54:13 +0900 Subject: [PATCH] feat(sn): add metrics for Append and Replicate RPCs This pull request defines metrics for measuring RPCs such as Append and Replicate. It introduces four metrics: - `log_rpc.server.duration` measures the time spent processing inbound RPC calls in microseconds. It is very similar to the `rpc.server.duration` defined by OpenTelemetry, but our metric also measures the processing time triggered by each call on a gRPC stream. - `log_rpc.server.log_entry.size` measures the size of appended log entries. It is similar to the `rpc.server.request.size` metric, but our metric measures the size of each log entry included in the appended batch. - `log_rpc.server.batch.size` measures the size of log entry batches appended. - `log_rpc.server.log_entries_per_batch` measures the number of log entries per appended batch. These metrics are histogram-type, allowing us to compute percentiles and analyze histograms and heat maps. Users can leverage these metrics to analyze the duration of RPCs, the distribution of log entry sizes, and the length of batches. We expect users to find better configurations to optimize storage node performance. 
--- internal/storagenode/log_server.go | 24 +- internal/storagenode/logstream/append.go | 20 +- internal/storagenode/telemetry/metrics.go | 274 +++++++++++++++++++++- 3 files changed, 313 insertions(+), 5 deletions(-) diff --git a/internal/storagenode/log_server.go b/internal/storagenode/log_server.go index 91ab27673..fa58be2df 100644 --- a/internal/storagenode/log_server.go +++ b/internal/storagenode/log_server.go @@ -4,6 +4,7 @@ import ( "context" "errors" "io" + "time" pbtypes "github.com/gogo/protobuf/types" "go.uber.org/multierr" @@ -13,6 +14,7 @@ import ( snerrors "github.com/kakao/varlog/internal/storagenode/errors" "github.com/kakao/varlog/internal/storagenode/logstream" + "github.com/kakao/varlog/internal/storagenode/telemetry" "github.com/kakao/varlog/pkg/types" "github.com/kakao/varlog/pkg/verrors" "github.com/kakao/varlog/proto/snpb" @@ -111,6 +113,9 @@ func (ls *logServer) appendStreamRecvLoop(stream snpb.LogIO_AppendServer, cq cha lsid = req.LogStreamID } + appendTask.LogStreamID = lsid + appendTask.RPCStartTime = time.Now() + if req.TopicID != tpid || req.LogStreamID != lsid { err = status.Error(codes.InvalidArgument, "unmatched topic or logstream") goto Out @@ -149,17 +154,32 @@ func (ls *logServer) appendStreamSendLoop(stream snpb.LogIO_AppendServer, cq <-c if !ok { return nil } + + lsid := appendTask.LogStreamID res, err = appendTask.WaitForCompletion(ctx) + elapsed := time.Since(appendTask.RPCStartTime) if err != nil { appendTask.Release() - return err + goto RecordMetric } - appendTask.ReleaseWriteWaitGroups() appendTask.Release() rsp.Results = res err = stream.Send(rsp) + + RecordMetric: + code := codes.OK + if err != nil { + // TODO: Set the correct status code. 
+ code = codes.Internal + } + if !lsid.Invalid() { + metrics, ok := ls.sn.metrics.GetLogStreamMetrics(lsid) + if ok { + metrics.LogRPCServerDuration.Record(ctx, telemetry.RPCKindAppend, code, elapsed.Microseconds()) + } + } if err != nil { return err } diff --git a/internal/storagenode/logstream/append.go b/internal/storagenode/logstream/append.go index 92fa1dd07..58d3df9ac 100644 --- a/internal/storagenode/logstream/append.go +++ b/internal/storagenode/logstream/append.go @@ -7,9 +7,12 @@ import ( "time" "go.uber.org/zap" + "google.golang.org/grpc/codes" "github.com/kakao/varlog/internal/batchlet" snerrors "github.com/kakao/varlog/internal/storagenode/errors" + "github.com/kakao/varlog/internal/storagenode/telemetry" + "github.com/kakao/varlog/pkg/types" "github.com/kakao/varlog/pkg/verrors" "github.com/kakao/varlog/proto/snpb" ) @@ -27,6 +30,9 @@ type AppendTask struct { start time.Time apc appendContext dataBatchLen int + + LogStreamID types.LogStreamID + RPCStartTime time.Time } func NewAppendTask() *AppendTask { @@ -159,6 +165,9 @@ func (lse *Executor) AppendAsync(ctx context.Context, dataBatch [][]byte, append lse.lsm.AppendBytes.Add(appendTask.apc.totalBytes) lse.lsm.AppendOperations.Add(1) lse.lsm.AppendPreparationMicro.Add(preparationDuration.Microseconds()) + + lse.lsm.LogRPCServerBatchSize.Record(context.Background(), telemetry.RPCKindAppend, codes.OK, appendTask.apc.totalBytes) + lse.lsm.LogRPCServerLogEntriesPerBatch.Record(context.Background(), telemetry.RPCKindAppend, codes.OK, int64(dataBatchLen)) } }() @@ -214,6 +223,10 @@ func (lse *Executor) Append(ctx context.Context, dataBatch [][]byte) ([]snpb.App lse.lsm.AppendDuration.Add(time.Since(startTime).Microseconds()) lse.lsm.AppendOperations.Add(1) lse.lsm.AppendPreparationMicro.Add(preparationDuration.Microseconds()) + + // TODO: Set a correct error code. 
+ lse.lsm.LogRPCServerBatchSize.Record(context.Background(), telemetry.RPCKindAppend, codes.OK, apc.totalBytes) + lse.lsm.LogRPCServerLogEntriesPerBatch.Record(context.Background(), telemetry.RPCKindAppend, codes.OK, int64(dataBatchLen)) }() lse.prepareAppendContext(dataBatch, &apc) @@ -271,7 +284,12 @@ func (lse *Executor) prepareAppendContextInternal(dataBatch [][]byte, begin, end st.cwts = newListQueue() for i := 0; i < len(batchletData); i++ { // st.dwb.PutData(batchletData[i]) - apc.totalBytes += int64(len(batchletData[i])) + logEntrySize := int64(len(batchletData[i])) + apc.totalBytes += logEntrySize + if lse.lsm != nil { + // TODO: Set the correct status code. + lse.lsm.LogRPCServerLogEntrySize.Record(context.Background(), telemetry.RPCKindAppend, codes.OK, logEntrySize) + } awg := newAppendWaitGroup(st.wwg) st.cwts.PushFront(newCommitWaitTask(awg)) apc.awgs = append(apc.awgs, awg) diff --git a/internal/storagenode/telemetry/metrics.go b/internal/storagenode/telemetry/metrics.go index 7f6b09b47..f2f8e96b5 100644 --- a/internal/storagenode/telemetry/metrics.go +++ b/internal/storagenode/telemetry/metrics.go @@ -3,19 +3,68 @@ package telemetry import ( "context" "fmt" + "slices" "sync" "sync/atomic" + "github.com/puzpuzpuz/xsync/v2" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric" + semconv "go.opentelemetry.io/otel/semconv/v1.21.0" + "google.golang.org/grpc/codes" "github.com/kakao/varlog/pkg/types" "github.com/kakao/varlog/pkg/util/telemetry" ) +// LogStreamMetrics is a set of metrics measured in each log stream. +// +// Synchronous metrics are of instrumentation types defined in OpenTelemetry, +// such as Int64Histogram. +// +// Asynchronous metrics are defined as built-in atomic types, meaning they are +// just counters. Each log stream increments these counters, which are then +// collected by a measurement cycle managed by OpenTelemetry. 
type LogStreamMetrics struct { attrs attribute.Set + // LogRPCServerDuration records the time spent processing inbound RPC calls + // in microseconds. It helps to monitor and analyze the performance of RPC + // calls in the Varlog system. + // + // Internally, it is a wrapper of Metrics.logRPCServerDuration with + // attributes cache. It avoids creating a new attribute set for each + // observation. + LogRPCServerDuration *Int64HistogramRecorder + + // LogRPCServerLogEntrySize records the size of individual log entries + // appended. It is useful for tracking the amount of data being processed + // and stored in the Varlog system. + // + // Internally, it is a wrapper of Metrics.logRPCServerLogEntrySize with + // attributes cache. It avoids creating a new attribute set for each + // observation. + LogRPCServerLogEntrySize *Int64HistogramRecorder + + // LogRPCServerBatchSize records the size of log entry batches appended. + // This metric helps to understand the batch sizes being handled and + // optimize the batching process in the Varlog system. + // + // Internally, it is a wrapper of Metrics.logRPCServerBatchSize with + // attributes cache. It avoids creating a new attribute set for each + // observation. + LogRPCServerBatchSize *Int64HistogramRecorder + + // LogRPCServerLogEntriesPerBatch records the number of log entries per + // appended batch. It provides insights into the batch processing + // efficiency and the average number of log entries per batch in the Varlog + // system. + // + // Internally, it is a wrapper of Metrics.logRPCServerLogEntriesPerBatch + // with attributes cache. It avoids creating a new attribute set for each + // observation. 
+ LogRPCServerLogEntriesPerBatch *Int64HistogramRecorder + AppendLogs atomic.Int64 AppendBytes atomic.Int64 AppendDuration atomic.Int64 @@ -49,13 +98,217 @@ type LogStreamMetrics struct { ReplicatePreparationMicro atomic.Int64 } +type Int64HistogramRecorder struct { + recorder metric.Int64Histogram + defaultAttrs []attribute.KeyValue + options map[uint32][]metric.RecordOption + mu *xsync.RBMutex +} + +func NewInt64HistogramRecorder(recorder metric.Int64Histogram, defaultAttrs ...attribute.KeyValue) *Int64HistogramRecorder { + return &Int64HistogramRecorder{ + recorder: recorder, + defaultAttrs: defaultAttrs, + options: make(map[uint32][]metric.RecordOption), + mu: xsync.NewRBMutex(), + } +} + +// Record records incr using attributes cached per (rpcKind, code) pair. +func (r *Int64HistogramRecorder) Record(ctx context.Context, rpcKind RPCKind, code codes.Code, incr int64) { + key := uint32(rpcKind)<<16 | uint32(code) + rt := r.mu.RLock() + opts, ok := r.options[key] + r.mu.RUnlock(rt) + if !ok { + r.mu.Lock() + opts, ok = r.options[key] + if !ok { + attrs := slices.Concat(r.defaultAttrs, []attribute.KeyValue{ + semconv.RPCService(serviceNames[rpcKind]), + semconv.RPCMethod(methodNames[rpcKind]), + semconv.RPCGRPCStatusCodeKey.Int64(int64(code)), + }) + // Keep the new options in opts so this first call records with them too. + opts = []metric.RecordOption{metric.WithAttributeSet(attribute.NewSet(attrs...))} + r.options[key] = opts + } + r.mu.Unlock() + } + r.recorder.Record(ctx, incr, opts...) +} + +type RPCKind uint8 + +const ( + RPCKindAppend RPCKind = iota + RPCKindReplicate +) + +var serviceNames = []string{ + "varlog.snpb.LogIO", + "varlog.snpb.Replicator", +} + +var methodNames = []string{ + "Append", + "Replicate", +} + +// Metrics are a set of measurements taken in the storage node. They encompass +// all measurements taken from each log stream. type Metrics struct { - metricsMap sync.Map + metricsMap sync.Map // map[types.LogStreamID]*LogStreamMetrics + + // logRPCServerDuration measures the time spent processing inbound RPC + // calls in microseconds. 
Its name is log_rpc.server.duration. + // + // It is similar to rpc.server.duration defined by OpenTelemetry, but it + // differs from the specification in the following ways: + // - The unit is microseconds instead of milliseconds. + // - The bucket boundaries are customized. + // - It measures the processing time triggered by each request on the + // gRPC stream rather than the duration of the RPC itself. + // - It includes additional attributes related to Varlog. + // + // Attributes: + // - varlog.topic.id + // - varlog.logstream.id + // - rpc.system + // - rpc.method + // - rpc.service + // - rpc.grpc.status_code + // + // References: + // - https://opentelemetry.io/docs/specs/semconv/rpc/rpc-metrics/#metric-rpcserverduration + // - https://opentelemetry.io/docs/specs/semconv/rpc/rpc-metrics/#attributes + // - https://opentelemetry.io/docs/specs/semconv/rpc/grpc/#grpc-attributes + // - https://github.com/grpc/proposal/blob/master/A66-otel-stats.md#units + logRPCServerDuration metric.Int64Histogram + + // logRPCServerLogEntrySize measures the sizes of appended log entries in + // bytes. Its name is log_rpc.server.log_entry.size. + // + // It is similar to rpc.server.request.size defined by OpenTelemetry, but + // it differs from the specification in the following ways: + // - The bucket boundaries are customized. + // - It measures the size of each log entry in the appended batch rather than the size of the request message itself. + // - It includes additional attributes related to Varlog. + // + // Attributes: + // - varlog.topic.id + // - varlog.logstream.id + // - rpc.system + // - rpc.method + // - rpc.service + // - rpc.grpc.status_code + // + // References: + // - https://opentelemetry.io/docs/specs/semconv/rpc/rpc-metrics/#metric-rpcserverrequestsize + // - https://github.com/grpc/proposal/blob/master/A66-otel-stats.md#units + logRPCServerLogEntrySize metric.Int64Histogram + + // logRPCServerBatchSize measures the size of appended batches in bytes. 
+ // Its name is log_rpc.server.batch.size. It also includes additional + // attributes related to Varlog. + // + // Attributes: + // - varlog.topic.id + // - varlog.logstream.id + // - rpc.system + // - rpc.method + // - rpc.service + // - rpc.grpc.status_code + // + // References: + // - https://opentelemetry.io/docs/specs/semconv/rpc/rpc-metrics/#metric-rpcserverrequestsize + // - https://github.com/grpc/proposal/blob/master/A66-otel-stats.md#units + logRPCServerBatchSize metric.Int64Histogram + + // logRPCServerLogEntriesPerBatch measures the number of log entries per + // batch appended. Its name is log_rpc.server.log_entries_per_batch. + // + // It is similar to rpc.server.requests_per_rpc defined by OpenTelemetry, + // but it differs from the specification in the following ways: + // - The bucket boundaries are customized. + // - It measures the number of log entries in the appended batch rather + // than the number of requests on the gRPC stream. + // - It includes additional attributes related to Varlog. 
+ // + // Attributes: + // - varlog.topic.id + // - varlog.logstream.id + // - rpc.system + // - rpc.method + // - rpc.service + // - rpc.grpc.status_code + // + // References: + // - https://opentelemetry.io/docs/specs/semconv/rpc/rpc-metrics/#metric-rpcserverrequests_per_rpc + // - https://github.com/grpc/proposal/blob/master/A66-otel-stats.md#units + logRPCServerLogEntriesPerBatch metric.Int64Histogram } func RegisterMetrics(meter metric.Meter) (m *Metrics, err error) { m = &Metrics{} + m.logRPCServerDuration, err = meter.Int64Histogram( + "log_rpc.server.duration", + metric.WithDescription("Time spent processing inbound RPC in microseconds."), + metric.WithUnit("us"), + metric.WithExplicitBucketBoundaries( + 200, 400, 600, 800, // 200us, 400us, 600us, 800us + 1_000, 2_000, 4_000, 6_000, 8_000, // 1ms, 2ms, 4ms, 6ms, 8ms + 10_000, 20_000, 40_000, 60_000, 80_000, // 10ms, 20ms, 40ms, 60ms, 80ms + 100_000, 150_000, 200_000, 250_000, // 100ms, 150ms, 200ms, 250ms + 500_000, 600_000, 700_000, 800_000, 900_000, // 500ms, 600ms, 700ms, 800ms, 900ms + 1_000_000, 2_000_000, 4_000_000, 6_000_000, 8_000_000, 10_000_000, // 1s, 2s, 4s, 6s, 8s, 10s + ), + ) + if err != nil { + return nil, err + } + + m.logRPCServerLogEntrySize, err = meter.Int64Histogram( + "log_rpc.server.log_entry.size", + metric.WithDescription("Size of appended log entries."), + metric.WithUnit("By"), + metric.WithExplicitBucketBoundaries( + 1<<10, 2<<10, 4<<10, 8<<10, 16<<10, // 1KiB, 2KiB, 4KiB, 8KiB, 16KiB + 32<<10, 64<<10, 128<<10, 256<<10, 512<<10, // 32KiB, 64KiB, 128KiB, 256KiB, 512KiB + 1<<20, 2<<20, 4<<20, 8<<20, 16<<20, 32<<20, 64<<20, // 1MiB, 2MiB, 4MiB, 8MiB, 16MiB, 32MiB, 64MiB + ), + ) + if err != nil { + return nil, err + } + + m.logRPCServerBatchSize, err = meter.Int64Histogram( + "log_rpc.server.batch.size", + metric.WithDescription("Size of appended log entry batches."), + metric.WithUnit("By"), + metric.WithExplicitBucketBoundaries( + 1<<10, 2<<10, 4<<10, 8<<10, 16<<10, // 1KiB, 2KiB, 4KiB, 
8KiB, 16KiB + 32<<10, 64<<10, 128<<10, 256<<10, 512<<10, // 32KiB, 64KiB, 128KiB, 256KiB, 512KiB + 1<<20, 2<<20, 4<<20, 8<<20, 16<<20, 32<<20, 64<<20, // 1MiB, 2MiB, 4MiB, 8MiB, 16MiB, 32MiB, 64MiB + ), + ) + if err != nil { + return nil, err + } + + m.logRPCServerLogEntriesPerBatch, err = meter.Int64Histogram( + "log_rpc.server.log_entries_per_batch", + metric.WithDescription("Number of log entries per appended batch."), + metric.WithUnit("{count}"), + metric.WithExplicitBucketBoundaries( + 1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024, 2048, 4096, 8192, 16384, 32768, 65536, + ), + ) + if err != nil { + return nil, err + } + var ( appendLogs metric.Int64ObservableCounter appendBytes metric.Int64ObservableCounter @@ -338,13 +591,30 @@ func RegisterMetrics(meter metric.Meter) (m *Metrics, err error) { return m, nil } +func (m *Metrics) GetLogStreamMetrics(lsid types.LogStreamID) (*LogStreamMetrics, bool) { + v, ok := m.metricsMap.Load(lsid) + if !ok { + return nil, false + } + return v.(*LogStreamMetrics), true +} + func RegisterLogStreamMetrics(m *Metrics, tpid types.TopicID, lsid types.LogStreamID) (*LogStreamMetrics, error) { attrs := attribute.NewSet( telemetry.TopicID(tpid), telemetry.LogStreamID(lsid), ) + kvs := []attribute.KeyValue{ + telemetry.TopicID(tpid), + telemetry.LogStreamID(lsid), + semconv.RPCSystemGRPC, + } lsm, loaded := m.metricsMap.LoadOrStore(lsid, &LogStreamMetrics{ - attrs: attrs, + attrs: attrs, + LogRPCServerDuration: NewInt64HistogramRecorder(m.logRPCServerDuration, kvs...), + LogRPCServerLogEntrySize: NewInt64HistogramRecorder(m.logRPCServerLogEntrySize, kvs...), + LogRPCServerBatchSize: NewInt64HistogramRecorder(m.logRPCServerBatchSize, kvs...), + LogRPCServerLogEntriesPerBatch: NewInt64HistogramRecorder(m.logRPCServerLogEntriesPerBatch, kvs...), }) if loaded { return nil, fmt.Errorf("storagenode: already registered %v", lsid)