diff --git a/internal/storagenode/log_server.go b/internal/storagenode/log_server.go index 91ab27673..fa58be2df 100644 --- a/internal/storagenode/log_server.go +++ b/internal/storagenode/log_server.go @@ -4,6 +4,7 @@ import ( "context" "errors" "io" + "time" pbtypes "github.com/gogo/protobuf/types" "go.uber.org/multierr" @@ -13,6 +14,7 @@ import ( snerrors "github.com/kakao/varlog/internal/storagenode/errors" "github.com/kakao/varlog/internal/storagenode/logstream" + "github.com/kakao/varlog/internal/storagenode/telemetry" "github.com/kakao/varlog/pkg/types" "github.com/kakao/varlog/pkg/verrors" "github.com/kakao/varlog/proto/snpb" @@ -111,6 +113,9 @@ func (ls *logServer) appendStreamRecvLoop(stream snpb.LogIO_AppendServer, cq cha lsid = req.LogStreamID } + appendTask.LogStreamID = lsid + appendTask.RPCStartTime = time.Now() + if req.TopicID != tpid || req.LogStreamID != lsid { err = status.Error(codes.InvalidArgument, "unmatched topic or logstream") goto Out @@ -149,17 +154,32 @@ func (ls *logServer) appendStreamSendLoop(stream snpb.LogIO_AppendServer, cq <-c if !ok { return nil } + + lsid := appendTask.LogStreamID res, err = appendTask.WaitForCompletion(ctx) + elapsed := time.Since(appendTask.RPCStartTime) if err != nil { appendTask.Release() - return err + goto RecordMetric } - appendTask.ReleaseWriteWaitGroups() appendTask.Release() rsp.Results = res err = stream.Send(rsp) + + RecordMetric: + code := codes.OK + if err != nil { + // TODO: Set the correct status code. + code = codes.Internal + } + if !lsid.Invalid() { + metrics, ok := ls.sn.metrics.GetLogStreamMetrics(lsid) + if ok { + metrics.LogRPCServerDuration.Record(ctx, telemetry.RPCKindAppend, code, elapsed.Microseconds()) + } + } if err != nil { return err } diff --git a/internal/storagenode/logstream/append.go b/internal/storagenode/logstream/append.go index 92fa1dd07..58d3df9ac 100644 --- a/internal/storagenode/logstream/append.go +++ b/internal/storagenode/logstream/append.go @@ -7,9 +7,12 @@ import ( "time" "go.uber.org/zap" + "google.golang.org/grpc/codes" "github.com/kakao/varlog/internal/batchlet" snerrors "github.com/kakao/varlog/internal/storagenode/errors" + "github.com/kakao/varlog/internal/storagenode/telemetry" + "github.com/kakao/varlog/pkg/types" "github.com/kakao/varlog/pkg/verrors" "github.com/kakao/varlog/proto/snpb" ) @@ -27,6 +30,9 @@ type AppendTask struct { start time.Time apc appendContext dataBatchLen int + + LogStreamID types.LogStreamID + RPCStartTime time.Time } func NewAppendTask() *AppendTask { @@ -159,6 +165,9 @@ func (lse *Executor) AppendAsync(ctx context.Context, dataBatch [][]byte, append lse.lsm.AppendBytes.Add(appendTask.apc.totalBytes) lse.lsm.AppendOperations.Add(1) lse.lsm.AppendPreparationMicro.Add(preparationDuration.Microseconds()) + + lse.lsm.LogRPCServerBatchSize.Record(context.Background(), telemetry.RPCKindAppend, codes.OK, appendTask.apc.totalBytes) + lse.lsm.LogRPCServerLogEntriesPerBatch.Record(context.Background(), telemetry.RPCKindAppend, codes.OK, int64(dataBatchLen)) } }() @@ -214,6 +223,10 @@ func (lse *Executor) Append(ctx context.Context, dataBatch [][]byte) ([]snpb.App lse.lsm.AppendDuration.Add(time.Since(startTime).Microseconds()) lse.lsm.AppendOperations.Add(1) lse.lsm.AppendPreparationMicro.Add(preparationDuration.Microseconds()) + + // TODO: Set a correct error code. + lse.lsm.LogRPCServerBatchSize.Record(context.Background(), telemetry.RPCKindAppend, codes.OK, apc.totalBytes) + lse.lsm.LogRPCServerLogEntriesPerBatch.Record(context.Background(), telemetry.RPCKindAppend, codes.OK, int64(dataBatchLen)) }() lse.prepareAppendContext(dataBatch, &apc) @@ -271,7 +284,12 @@ func (lse *Executor) prepareAppendContextInternal(dataBatch [][]byte, begin, end st.cwts = newListQueue() for i := 0; i < len(batchletData); i++ { // st.dwb.PutData(batchletData[i]) - apc.totalBytes += int64(len(batchletData[i])) + logEntrySize := int64(len(batchletData[i])) + apc.totalBytes += logEntrySize + if lse.lsm != nil { + // TODO: Set the correct status code. + lse.lsm.LogRPCServerLogEntrySize.Record(context.Background(), telemetry.RPCKindAppend, codes.OK, logEntrySize) + } awg := newAppendWaitGroup(st.wwg) st.cwts.PushFront(newCommitWaitTask(awg)) apc.awgs = append(apc.awgs, awg) diff --git a/internal/storagenode/telemetry/metrics.go b/internal/storagenode/telemetry/metrics.go index 7f6b09b47..f2f8e96b5 100644 --- a/internal/storagenode/telemetry/metrics.go +++ b/internal/storagenode/telemetry/metrics.go @@ -3,19 +3,68 @@ package telemetry import ( "context" "fmt" + "slices" "sync" "sync/atomic" + "github.com/puzpuzpuz/xsync/v2" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric" + semconv "go.opentelemetry.io/otel/semconv/v1.21.0" + "google.golang.org/grpc/codes" "github.com/kakao/varlog/pkg/types" "github.com/kakao/varlog/pkg/util/telemetry" ) +// LogStreamMetrics is a set of metrics measured in each log stream. +// +// Synchronous metrics are of instrumentation types defined in OpenTelemetry, +// such as Int64Histogram. +// +// Asynchronous metrics are defined as built-in atomic types, meaning they are +// just counters. Each log stream increments these counters, which are then +// collected by a measurement cycle managed by OpenTelemetry. type LogStreamMetrics struct { attrs attribute.Set + // LogRPCServerDuration records the time spent processing inbound RPC calls + // in microseconds. It helps to monitor and analyze the performance of RPC + // calls in the Varlog system. + // + // Internally, it is a wrapper of Metrics.logRPCServerDuration with + // attributes cache. It avoids creating a new attribute set for each + // observation. + LogRPCServerDuration *Int64HistogramRecorder + + // LogRPCServerLogEntrySize records the size of individual log entries + // appended. It is useful for tracking the amount of data being processed + // and stored in the Varlog system. + // + // Internally, it is a wrapper of Metrics.logRPCServerLogEntrySize with + // attributes cache. It avoids creating a new attribute set for each + // observation. + LogRPCServerLogEntrySize *Int64HistogramRecorder + + // LogRPCServerBatchSize records the size of log entry batches appended. + // This metric helps to understand the batch sizes being handled and + // optimize the batching process in the Varlog system. + // + // Internally, it is a wrapper of Metrics.logRPCServerBatchSize with + // attributes cache. It avoids creating a new attribute set for each + // observation. + LogRPCServerBatchSize *Int64HistogramRecorder + + // LogRPCServerLogEntriesPerBatch records the number of log entries per + // appended batch. It provides insights into the batch processing + // efficiency and the average number of log entries per batch in the Varlog + // system. + // + // Internally, it is a wrapper of Metrics.logRPCServerLogEntriesPerBatch + // with attributes cache. It avoids creating a new attribute set for each + // observation. + LogRPCServerLogEntriesPerBatch *Int64HistogramRecorder + AppendLogs atomic.Int64 AppendBytes atomic.Int64 AppendDuration atomic.Int64 @@ -49,13 +98,217 @@ type LogStreamMetrics struct { ReplicatePreparationMicro atomic.Int64 } +type Int64HistogramRecorder struct { + recorder metric.Int64Histogram + defaultAttrs []attribute.KeyValue + options map[uint32][]metric.RecordOption + mu *xsync.RBMutex +} + +func NewInt64HistogramRecorder(recorder metric.Int64Histogram, defaultAttrs ...attribute.KeyValue) *Int64HistogramRecorder { + return &Int64HistogramRecorder{ + recorder: recorder, + defaultAttrs: defaultAttrs, + options: make(map[uint32][]metric.RecordOption), + mu: xsync.NewRBMutex(), + } +} + +func (r *Int64HistogramRecorder) Record(ctx context.Context, rpcKind RPCKind, code codes.Code, incr int64) { + key := uint32(rpcKind)<<16 | uint32(code) + var opts []metric.RecordOption + rt := r.mu.RLock() + opts, ok := r.options[key] + r.mu.RUnlock(rt) + if !ok { + r.mu.Lock() + opts, ok = r.options[key] + if !ok { + attrs := slices.Concat(r.defaultAttrs, []attribute.KeyValue{ + semconv.RPCService(serviceNames[rpcKind]), + semconv.RPCMethod(methodNames[rpcKind]), + semconv.RPCGRPCStatusCodeKey.Int64(int64(code)), + }) + r.options[key] = []metric.RecordOption{ + metric.WithAttributeSet(attribute.NewSet(attrs...)), + } + } + r.mu.Unlock() + } + r.recorder.Record(ctx, incr, opts...) +} + +type RPCKind uint8 + +const ( + RPCKindAppend RPCKind = iota + RPCKindReplicate +) + +var serviceNames = []string{ + "varlog.snpb.LogIO", + "varlog.snpb.Replicator", +} + +var methodNames = []string{ + "Append", + "Replicate", +} + +// Metrics are a set of measurements taken in the storage node. They encompass +// all measurements taken from each log stream. type Metrics struct { - metricsMap sync.Map + metricsMap sync.Map // map[types.LogStreamID]*LogStreamMetrics + + // logRPCServerDuration measures the time spent processing inbound RPC + // calls in microseconds. Its name is log_rpc.server.duration. + // + // It is similar to rpc.server.duration defined by OpenTelemetry, but it + // differs from the specification in the following ways: + // - The unit is microseconds instead of milliseconds. + // - The bucket boundaries are customized. + // - It measures the processing time triggered by each request on the + // gRPC stream rather than the duration of the RPC itself. + // - It includes additional attributes related to Varlog. + // + // Attributes: + // - varlog.topic.id + // - varlog.logstream.id + // - rpc.system + // - rpc.method + // - rpc.service + // - rpc.grpc.status_code + // + // References: + // - https://opentelemetry.io/docs/specs/semconv/rpc/rpc-metrics/#metric-rpcserverduration + // - https://opentelemetry.io/docs/specs/semconv/rpc/rpc-metrics/#attributes + // - https://opentelemetry.io/docs/specs/semconv/rpc/grpc/#grpc-attributes + // - https://github.com/grpc/proposal/blob/master/A66-otel-stats.md#units + logRPCServerDuration metric.Int64Histogram + + // logRPCServerLogEntrySize measures the sizes of appended log entries in + // bytes. Its name is log_rpc.server.log_entry.size. + // + // It is similar to rpc.server.request.size defined by OpenTelemetry, but + // it differs from the specification in the following ways: + // - The bucket boundaries are customized. + // - It measures the size of each log entry in the appended batch rather than the size of the request message itself. + // - It includes additional attributes related to Varlog. + // + // Attributes: + // - varlog.topic.id + // - varlog.logstream.id + // - rpc.system + // - rpc.method + // - rpc.service + // - rpc.grpc.status_code + // + // References: + // - https://opentelemetry.io/docs/specs/semconv/rpc/rpc-metrics/#metric-rpcserverrequestsize + // - https://github.com/grpc/proposal/blob/master/A66-otel-stats.md#units + logRPCServerLogEntrySize metric.Int64Histogram + + // logRPCServerBatchSize measures the size of appended batches in bytes. + // Its name is log_rpc.server.batch.size. It also includes additional + // attributes related to Varlog. + // + // Attributes: + // - varlog.topic.id + // - varlog.logstream.id + // - rpc.system + // - rpc.method + // - rpc.service + // - rpc.grpc.status_code + // + // References: + // - https://opentelemetry.io/docs/specs/semconv/rpc/rpc-metrics/#metric-rpcserverrequestsize + // - https://github.com/grpc/proposal/blob/master/A66-otel-stats.md#units + logRPCServerBatchSize metric.Int64Histogram + + // logRPCServerLogEntriesPerBatch measures the number of log entries per + // batch appended. Its name is log_rpc.server.log_entries_per_batch. + // + // It is similar to rpc.server.requests_per_rpc defined by OpenTelemetry, + // but it differs from the specification in the following ways: + // - The bucket boundaries are customized. + // - It measures the number of log entries in the appended batch rather + // than the number of requests on the gRPC stream. + // - It includes additional attributes related to Varlog. + // + // Attributes: + // - varlog.topic.id + // - varlog.logstream.id + // - rpc.system + // - rpc.method + // - rpc.service + // - rpc.grpc.status_code + // + // References: + // - https://opentelemetry.io/docs/specs/semconv/rpc/rpc-metrics/#metric-rpcserverrequests_per_rpc + // - https://github.com/grpc/proposal/blob/master/A66-otel-stats.md#units + logRPCServerLogEntriesPerBatch metric.Int64Histogram } func RegisterMetrics(meter metric.Meter) (m *Metrics, err error) { m = &Metrics{} + m.logRPCServerDuration, err = meter.Int64Histogram( + "log_rpc.server.duration", + metric.WithDescription("Time spent processing inbound RPC in microseconds."), + metric.WithUnit("us"), + metric.WithExplicitBucketBoundaries( + 200, 400, 600, 800, // 200us, 400us, 600us, 800us + 1_000, 2_000, 4_000, 6_000, 8_000, // 1ms, 2ms, 4ms, 6ms, 8ms + 10_000, 20_000, 40_000, 60_000, 80_000, // 10ms, 20ms, 40ms, 60ms, 80ms + 100_000, 150_000, 200_000, 250_000, // 100ms, 150ms, 200ms, 250ms + 500_000, 600_000, 700_000, 800_000, 900_000, // 500ms, 600ms, 700ms, 800ms + 1_000_000, 2_000_000, 4_000_000, 6_000_000, 8_000_000, 10_000_000, // 1s, 2s, 4s, 6s, 8s, 10s + ), + ) + if err != nil { + return nil, err + } + + m.logRPCServerLogEntrySize, err = meter.Int64Histogram( + "log_rpc.server.log_entry.size", + metric.WithDescription("Size of appended log entries."), + metric.WithUnit("By"), + metric.WithExplicitBucketBoundaries( + 1<<10, 2<<10, 4<<10, 8<<10, 16<<10, // 1KiB, 2KiB, 4KiB, 8KiB, 16KiB + 32<<10, 64<<10, 128<<10, 256<<10, 512<<10, // 32KiB, 64KiB, 128KiB, 256KiB, 512KiB + 1<<20, 2<<20, 4<<20, 8<<20, 16<<20, 32<<20, 64<<20, // 1MiB, 2MiB, 4MiB, 8MiB, 16MiB, 32MiB, 64MiB + ), + ) + if err != nil { + return nil, err + } + + m.logRPCServerBatchSize, err = meter.Int64Histogram( + "log_rpc.server.batch.size", + metric.WithDescription("Size of appended log entry batches."), + metric.WithUnit("By"), + metric.WithExplicitBucketBoundaries( + 1<<10, 2<<10, 4<<10, 8<<10, 16<<10, // 1KiB, 2KiB, 4KiB, 8KiB, 16KiB + 32<<10, 64<<10, 128<<10, 256<<10, 512<<10, // 32KiB, 64KiB, 128KiB, 256KiB, 512KiB + 1<<20, 2<<20, 4<<20, 8<<20, 16<<20, 32<<20, 64<<20, // 1MiB, 2MiB, 4MiB, 8MiB, 16MiB, 32MiB, 64MiB + ), + ) + if err != nil { + return nil, err + } + + m.logRPCServerLogEntriesPerBatch, err = meter.Int64Histogram( + "log_rpc.server.log_entries_per_batch", + metric.WithDescription("Number of log entries per appended batch."), + metric.WithUnit("{count}"), + metric.WithExplicitBucketBoundaries( + 1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024, 2048, 4096, 8192, 16384, 32768, 65536, + ), + ) + if err != nil { + return nil, err + } + var ( appendLogs metric.Int64ObservableCounter appendBytes metric.Int64ObservableCounter @@ -338,13 +591,30 @@ func RegisterMetrics(meter metric.Meter) (m *Metrics, err error) { return m, nil } +func (m *Metrics) GetLogStreamMetrics(lsid types.LogStreamID) (*LogStreamMetrics, bool) { + v, ok := m.metricsMap.Load(lsid) + if !ok { + return nil, false + } + return v.(*LogStreamMetrics), true +} + func RegisterLogStreamMetrics(m *Metrics, tpid types.TopicID, lsid types.LogStreamID) (*LogStreamMetrics, error) { attrs := attribute.NewSet( telemetry.TopicID(tpid), telemetry.LogStreamID(lsid), ) + kvs := []attribute.KeyValue{ + telemetry.TopicID(tpid), + telemetry.LogStreamID(lsid), + semconv.RPCSystemGRPC, + } lsm, loaded := m.metricsMap.LoadOrStore(lsid, &LogStreamMetrics{ - attrs: attrs, + attrs: attrs, + LogRPCServerDuration: NewInt64HistogramRecorder(m.logRPCServerDuration, kvs...), + LogRPCServerLogEntrySize: NewInt64HistogramRecorder(m.logRPCServerLogEntrySize, kvs...), + LogRPCServerBatchSize: NewInt64HistogramRecorder(m.logRPCServerBatchSize, kvs...), + LogRPCServerLogEntriesPerBatch: NewInt64HistogramRecorder(m.logRPCServerLogEntriesPerBatch, kvs...), }) if loaded { return nil, fmt.Errorf("storagenode: already registered %v", lsid)