diff --git a/pkg/kv/kvclient/kvcoord/BUILD.bazel b/pkg/kv/kvclient/kvcoord/BUILD.bazel index b6e008b26605..be3f610669ea 100644 --- a/pkg/kv/kvclient/kvcoord/BUILD.bazel +++ b/pkg/kv/kvclient/kvcoord/BUILD.bazel @@ -143,6 +143,7 @@ go_test( "txn_correctness_test.go", "txn_interceptor_committer_test.go", "txn_interceptor_heartbeater_test.go", + "txn_interceptor_metric_recorder_test.go", "txn_interceptor_pipeliner_client_test.go", "txn_interceptor_pipeliner_test.go", "txn_interceptor_seq_num_allocator_test.go", diff --git a/pkg/kv/kvclient/kvcoord/txn_coord_sender.go b/pkg/kv/kvclient/kvcoord/txn_coord_sender.go index ad17a52ee111..3099c812a703 100644 --- a/pkg/kv/kvclient/kvcoord/txn_coord_sender.go +++ b/pkg/kv/kvclient/kvcoord/txn_coord_sender.go @@ -25,6 +25,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" "github.com/cockroachdb/logtags" @@ -258,9 +259,9 @@ func newRootTxnCoordSender( mu: &tcs.mu.Mutex, } tcs.interceptorAlloc.txnMetricRecorder = txnMetricRecorder{ - metrics: &tcs.metrics, - clock: tcs.clock, - txn: &tcs.mu.txn, + metrics: &tcs.metrics, + timeSource: timeutil.DefaultTimeSource{}, + txn: &tcs.mu.txn, } tcs.initCommonInterceptors(tcf, txn, kv.RootTxn) diff --git a/pkg/kv/kvclient/kvcoord/txn_interceptor_metric_recorder.go b/pkg/kv/kvclient/kvcoord/txn_interceptor_metric_recorder.go index 3fd2b42bcee4..ac957ea1bce2 100644 --- a/pkg/kv/kvclient/kvcoord/txn_interceptor_metric_recorder.go +++ b/pkg/kv/kvclient/kvcoord/txn_interceptor_metric_recorder.go @@ -12,10 +12,10 @@ package kvcoord import ( "context" + "time" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/roachpb" - "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/timeutil" ) @@ -23,15 +23,13 @@ import ( // the behavior and outcome of a transaction. It records information about the // requests that a transaction sends and updates counters and histograms when // the transaction completes. -// -// TODO(nvanbenschoten): Unit test this file. type txnMetricRecorder struct { - wrapped lockedSender - metrics *TxnMetrics - clock *hlc.Clock + wrapped lockedSender + metrics *TxnMetrics + timeSource timeutil.TimeSource txn *roachpb.Transaction - txnStartNanos int64 + txnStart time.Time onePCCommit bool parallelCommit bool } @@ -40,8 +38,8 @@ type txnMetricRecorder struct { func (m *txnMetricRecorder) SendLocked( ctx context.Context, ba *kvpb.BatchRequest, ) (*kvpb.BatchResponse, *kvpb.Error) { - if m.txnStartNanos == 0 { - m.txnStartNanos = timeutil.Now().UnixNano() + if m.txnStart.IsZero() { + m.txnStart = m.timeSource.Now() } br, pErr := m.wrapped.SendLocked(ctx, ba) @@ -93,10 +91,10 @@ func (m *txnMetricRecorder) closeLocked() { m.metrics.ParallelCommits.Inc(1) } - if m.txnStartNanos != 0 { - duration := timeutil.Now().UnixNano() - m.txnStartNanos - if duration >= 0 { - m.metrics.Durations.RecordValue(duration) + if !m.txnStart.IsZero() { + dur := m.timeSource.Since(m.txnStart) + if dur >= 0 { + m.metrics.Durations.RecordValue(dur.Nanoseconds()) } } restarts := int64(m.txn.Epoch) diff --git a/pkg/kv/kvclient/kvcoord/txn_interceptor_metric_recorder_test.go b/pkg/kv/kvclient/kvcoord/txn_interceptor_metric_recorder_test.go new file mode 100644 index 000000000000..fda5d049f337 --- /dev/null +++ b/pkg/kv/kvclient/kvcoord/txn_interceptor_metric_recorder_test.go @@ -0,0 +1,223 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package kvcoord + +import ( + "context" + "testing" + + "github.com/cockroachdb/cockroach/pkg/kv/kvpb" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/storage/enginepb" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/metric" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func makeMockTxnMetricRecorder( + txn *roachpb.Transaction, +) (txnMetricRecorder, *mockLockedSender, *timeutil.ManualTime) { + mockSender := &mockLockedSender{} + metrics := MakeTxnMetrics(metric.TestSampleInterval) + timeSource := timeutil.NewManualTime(timeutil.Unix(0, 123)) + return txnMetricRecorder{ + wrapped: mockSender, + metrics: &metrics, + timeSource: timeSource, + txn: txn, + }, mockSender, timeSource +} + +func TestTxnMetricRecorder(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + ctx := context.Background() + + type metrics struct { + aborts, commits, commits1PC, parallelCommits, rollbacksFailed, duration, restarts int + } + check := func(t *testing.T, tm *txnMetricRecorder, m metrics) { + t.Helper() + assert.Equal(t, int64(m.aborts), tm.metrics.Aborts.Count(), "TxnMetrics.Aborts") + assert.Equal(t, int64(m.commits), tm.metrics.Commits.Count(), "TxnMetrics.Commits") + assert.Equal(t, int64(m.commits1PC), tm.metrics.Commits1PC.Count(), "TxnMetrics.Commits1PC") + assert.Equal(t, int64(m.parallelCommits), tm.metrics.ParallelCommits.Count(), "TxnMetrics.ParallelCommits") + assert.Equal(t, int64(m.rollbacksFailed), tm.metrics.RollbacksFailed.Count(), "TxnMetrics.RollbacksFailed") + // NOTE: histograms don't retain full precision, so we don't check the exact + // value. We just check whether the value is non-zero. + _, sum := tm.metrics.Durations.Total() + assert.Equal(t, m.duration != 0, sum != 0, "TxnMetrics.Durations") + _, sum = tm.metrics.Restarts.Total() + assert.Equal(t, m.restarts != 0, sum != 0, "TxnMetrics.Restarts") + } + + t.Run("no-op", func(t *testing.T) { + txn := makeTxnProto() + tm, _, _ := makeMockTxnMetricRecorder(&txn) + tm.closeLocked() + + check(t, &tm, metrics{aborts: 1, rollbacksFailed: 1}) + }) + + t.Run("commit (1pc)", func(t *testing.T) { + txn := makeTxnProto() + tm, mockSender, timeSource := makeMockTxnMetricRecorder(&txn) + + ba := &kvpb.BatchRequest{} + ba.Header = kvpb.Header{Txn: txn.Clone()} + ba.Add(&kvpb.EndTxnRequest{Commit: true}) + + mockSender.MockSend(func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) { + require.Len(t, ba.Requests, 1) + require.IsType(t, &kvpb.EndTxnRequest{}, ba.Requests[0].GetInner()) + + // Simulate delay. + timeSource.Advance(234) + + br := ba.CreateReply() + br.Txn = ba.Txn + br.Txn.Status = roachpb.COMMITTED + br.Responses[0].GetEndTxn().OnePhaseCommit = true + return br, nil + }) + br, pErr := tm.SendLocked(ctx, ba) + require.Nil(t, pErr) + require.NotNil(t, br) + + // Acting as TxnCoordSender. + txn.Update(br.Txn) + tm.closeLocked() + + check(t, &tm, metrics{commits: 1, commits1PC: 1, duration: 234}) + }) + + t.Run("commit (parallel)", func(t *testing.T) { + txn := makeTxnProto() + tm, mockSender, timeSource := makeMockTxnMetricRecorder(&txn) + + ba := &kvpb.BatchRequest{} + ba.Header = kvpb.Header{Txn: txn.Clone()} + ba.Add(&kvpb.EndTxnRequest{Commit: true}) + + mockSender.MockSend(func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) { + require.Len(t, ba.Requests, 1) + require.IsType(t, &kvpb.EndTxnRequest{}, ba.Requests[0].GetInner()) + + // Simulate delay. + timeSource.Advance(234) + + br := ba.CreateReply() + br.Txn = ba.Txn + br.Txn.Status = roachpb.COMMITTED + br.Responses[0].GetEndTxn().StagingTimestamp = br.Txn.WriteTimestamp + return br, nil + }) + br, pErr := tm.SendLocked(ctx, ba) + require.Nil(t, pErr) + require.NotNil(t, br) + + // Acting as TxnCoordSender. + txn.Update(br.Txn) + tm.closeLocked() + + check(t, &tm, metrics{commits: 1, parallelCommits: 1, duration: 234}) + }) + + t.Run("abort", func(t *testing.T) { + txn := makeTxnProto() + tm, mockSender, timeSource := makeMockTxnMetricRecorder(&txn) + + ba := &kvpb.BatchRequest{} + ba.Header = kvpb.Header{Txn: txn.Clone()} + ba.Add(&kvpb.EndTxnRequest{Commit: true}) + + mockSender.MockSend(func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) { + require.Len(t, ba.Requests, 1) + require.IsType(t, &kvpb.EndTxnRequest{}, ba.Requests[0].GetInner()) + + // Simulate delay. + timeSource.Advance(234) + + br := ba.CreateReply() + br.Txn = ba.Txn + br.Txn.Status = roachpb.ABORTED + return br, nil + }) + br, pErr := tm.SendLocked(ctx, ba) + require.Nil(t, pErr) + require.NotNil(t, br) + + // Acting as TxnCoordSender. + txn.Update(br.Txn) + tm.closeLocked() + + check(t, &tm, metrics{aborts: 1, duration: 234}) + }) + + t.Run("restart", func(t *testing.T) { + txn := makeTxnProto() + tm, mockSender, timeSource := makeMockTxnMetricRecorder(&txn) + + ba := &kvpb.BatchRequest{} + ba.Header = kvpb.Header{Txn: txn.Clone()} + ba.Add(&kvpb.EndTxnRequest{Commit: true}) + + mockSender.MockSend(func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) { + require.Equal(t, enginepb.TxnEpoch(0), ba.Txn.Epoch) + require.Len(t, ba.Requests, 1) + require.IsType(t, &kvpb.EndTxnRequest{}, ba.Requests[0].GetInner()) + + // Simulate delay. + timeSource.Advance(234) + + wtoErr := &kvpb.WriteTooOldError{ActualTimestamp: txn.WriteTimestamp.Add(0, 10)} + return nil, kvpb.NewErrorWithTxn(wtoErr, ba.Txn) + }) + br, pErr := tm.SendLocked(ctx, ba) + require.Nil(t, br) + require.NotNil(t, pErr) + require.NotNil(t, pErr.GetTxn()) + + // Acting as TxnCoordSender. + txn.Update(pErr.GetTxn()) + txn.Restart(0, 0, hlc.Timestamp{}) + + // Resend the batch at the new epoch. + ba.Header = kvpb.Header{Txn: txn.Clone()} + + mockSender.MockSend(func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) { + require.Equal(t, enginepb.TxnEpoch(1), ba.Txn.Epoch) + require.Len(t, ba.Requests, 1) + require.IsType(t, &kvpb.EndTxnRequest{}, ba.Requests[0].GetInner()) + + // Simulate delay. + timeSource.Advance(234) + + br = ba.CreateReply() + br.Txn = ba.Txn + br.Txn.Status = roachpb.COMMITTED + return br, nil + }) + br, pErr = tm.SendLocked(ctx, ba) + require.Nil(t, pErr) + require.NotNil(t, br) + + // Acting as TxnCoordSender. + txn.Update(br.Txn) + tm.closeLocked() + + check(t, &tm, metrics{commits: 1, duration: 468, restarts: 1}) + }) +}