Skip to content

Commit

Permalink
kv: unit test txnMetricRecorder
Browse files Browse the repository at this point in the history
Addresses a TODO to wrap unit tests directly around this struct, like we
do for the other txnInterceptor. I plan to add a new metric to
`txnMetricRecorder`, so it makes sense to improve testing first.

Epic: None
Release note: None
  • Loading branch information
nvanbenschoten committed Nov 28, 2023
1 parent 04a04ca commit d3ef9c8
Show file tree
Hide file tree
Showing 4 changed files with 239 additions and 16 deletions.
1 change: 1 addition & 0 deletions pkg/kv/kvclient/kvcoord/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ go_test(
"txn_correctness_test.go",
"txn_interceptor_committer_test.go",
"txn_interceptor_heartbeater_test.go",
"txn_interceptor_metric_recorder_test.go",
"txn_interceptor_pipeliner_client_test.go",
"txn_interceptor_pipeliner_test.go",
"txn_interceptor_seq_num_allocator_test.go",
Expand Down
7 changes: 4 additions & 3 deletions pkg/kv/kvclient/kvcoord/txn_coord_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/logtags"
Expand Down Expand Up @@ -258,9 +259,9 @@ func newRootTxnCoordSender(
mu: &tcs.mu.Mutex,
}
tcs.interceptorAlloc.txnMetricRecorder = txnMetricRecorder{
metrics: &tcs.metrics,
clock: tcs.clock,
txn: &tcs.mu.txn,
metrics: &tcs.metrics,
timeSource: timeutil.DefaultTimeSource{},
txn: &tcs.mu.txn,
}
tcs.initCommonInterceptors(tcf, txn, kv.RootTxn)

Expand Down
24 changes: 11 additions & 13 deletions pkg/kv/kvclient/kvcoord/txn_interceptor_metric_recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,26 +12,24 @@ package kvcoord

import (
"context"
"time"

"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
)

// txnMetricRecorder is a txnInterceptor in charge of updating metrics about
// the behavior and outcome of a transaction. It records information about the
// requests that a transaction sends and updates counters and histograms when
// the transaction completes.
//
// TODO(nvanbenschoten): Unit test this file.
type txnMetricRecorder struct {
wrapped lockedSender
metrics *TxnMetrics
clock *hlc.Clock
wrapped lockedSender
metrics *TxnMetrics
timeSource timeutil.TimeSource

txn *roachpb.Transaction
txnStartNanos int64
txnStart time.Time
onePCCommit bool
parallelCommit bool
}
Expand All @@ -40,8 +38,8 @@ type txnMetricRecorder struct {
func (m *txnMetricRecorder) SendLocked(
ctx context.Context, ba *kvpb.BatchRequest,
) (*kvpb.BatchResponse, *kvpb.Error) {
if m.txnStartNanos == 0 {
m.txnStartNanos = timeutil.Now().UnixNano()
if m.txnStart.IsZero() {
m.txnStart = m.timeSource.Now()
}

br, pErr := m.wrapped.SendLocked(ctx, ba)
Expand Down Expand Up @@ -93,10 +91,10 @@ func (m *txnMetricRecorder) closeLocked() {
m.metrics.ParallelCommits.Inc(1)
}

if m.txnStartNanos != 0 {
duration := timeutil.Now().UnixNano() - m.txnStartNanos
if duration >= 0 {
m.metrics.Durations.RecordValue(duration)
if !m.txnStart.IsZero() {
dur := m.timeSource.Since(m.txnStart)
if dur >= 0 {
m.metrics.Durations.RecordValue(dur.Nanoseconds())
}
}
restarts := int64(m.txn.Epoch)
Expand Down
223 changes: 223 additions & 0 deletions pkg/kv/kvclient/kvcoord/txn_interceptor_metric_recorder_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,223 @@
// Copyright 2023 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package kvcoord

import (
"context"
"testing"

"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func makeMockTxnMetricRecorder(
txn *roachpb.Transaction,
) (txnMetricRecorder, *mockLockedSender, *timeutil.ManualTime) {
mockSender := &mockLockedSender{}
metrics := MakeTxnMetrics(metric.TestSampleInterval)
timeSource := timeutil.NewManualTime(timeutil.Unix(0, 123))
return txnMetricRecorder{
wrapped: mockSender,
metrics: &metrics,
timeSource: timeSource,
txn: txn,
}, mockSender, timeSource
}

func TestTxnMetricRecorder(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
ctx := context.Background()

type metrics struct {
aborts, commits, commits1PC, parallelCommits, rollbacksFailed, duration, restarts int
}
check := func(t *testing.T, tm *txnMetricRecorder, m metrics) {
t.Helper()
assert.Equal(t, int64(m.aborts), tm.metrics.Aborts.Count(), "TxnMetrics.Aborts")
assert.Equal(t, int64(m.commits), tm.metrics.Commits.Count(), "TxnMetrics.Commits")
assert.Equal(t, int64(m.commits1PC), tm.metrics.Commits1PC.Count(), "TxnMetrics.Commits1PC")
assert.Equal(t, int64(m.parallelCommits), tm.metrics.ParallelCommits.Count(), "TxnMetrics.ParallelCommits")
assert.Equal(t, int64(m.rollbacksFailed), tm.metrics.RollbacksFailed.Count(), "TxnMetrics.RollbacksFailed")
// NOTE: histograms don't retain full precision, so we don't check the exact
// value. We just check whether the value is non-zero.
_, sum := tm.metrics.Durations.Total()
assert.Equal(t, m.duration != 0, sum != 0, "TxnMetrics.Durations")
_, sum = tm.metrics.Restarts.Total()
assert.Equal(t, m.restarts != 0, sum != 0, "TxnMetrics.Restarts")
}

t.Run("no-op", func(t *testing.T) {
txn := makeTxnProto()
tm, _, _ := makeMockTxnMetricRecorder(&txn)
tm.closeLocked()

check(t, &tm, metrics{aborts: 1, rollbacksFailed: 1})
})

t.Run("commit (1pc)", func(t *testing.T) {
txn := makeTxnProto()
tm, mockSender, timeSource := makeMockTxnMetricRecorder(&txn)

ba := &kvpb.BatchRequest{}
ba.Header = kvpb.Header{Txn: txn.Clone()}
ba.Add(&kvpb.EndTxnRequest{Commit: true})

mockSender.MockSend(func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) {
require.Len(t, ba.Requests, 1)
require.IsType(t, &kvpb.EndTxnRequest{}, ba.Requests[0].GetInner())

// Simulate delay.
timeSource.Advance(234)

br := ba.CreateReply()
br.Txn = ba.Txn
br.Txn.Status = roachpb.COMMITTED
br.Responses[0].GetEndTxn().OnePhaseCommit = true
return br, nil
})
br, pErr := tm.SendLocked(ctx, ba)
require.Nil(t, pErr)
require.NotNil(t, br)

// Acting as TxnCoordSender.
txn.Update(br.Txn)
tm.closeLocked()

check(t, &tm, metrics{commits: 1, commits1PC: 1, duration: 234})
})

t.Run("commit (parallel)", func(t *testing.T) {
txn := makeTxnProto()
tm, mockSender, timeSource := makeMockTxnMetricRecorder(&txn)

ba := &kvpb.BatchRequest{}
ba.Header = kvpb.Header{Txn: txn.Clone()}
ba.Add(&kvpb.EndTxnRequest{Commit: true})

mockSender.MockSend(func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) {
require.Len(t, ba.Requests, 1)
require.IsType(t, &kvpb.EndTxnRequest{}, ba.Requests[0].GetInner())

// Simulate delay.
timeSource.Advance(234)

br := ba.CreateReply()
br.Txn = ba.Txn
br.Txn.Status = roachpb.COMMITTED
br.Responses[0].GetEndTxn().StagingTimestamp = br.Txn.WriteTimestamp
return br, nil
})
br, pErr := tm.SendLocked(ctx, ba)
require.Nil(t, pErr)
require.NotNil(t, br)

// Acting as TxnCoordSender.
txn.Update(br.Txn)
tm.closeLocked()

check(t, &tm, metrics{commits: 1, parallelCommits: 1, duration: 234})
})

t.Run("abort", func(t *testing.T) {
txn := makeTxnProto()
tm, mockSender, timeSource := makeMockTxnMetricRecorder(&txn)

ba := &kvpb.BatchRequest{}
ba.Header = kvpb.Header{Txn: txn.Clone()}
ba.Add(&kvpb.EndTxnRequest{Commit: true})

mockSender.MockSend(func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) {
require.Len(t, ba.Requests, 1)
require.IsType(t, &kvpb.EndTxnRequest{}, ba.Requests[0].GetInner())

// Simulate delay.
timeSource.Advance(234)

br := ba.CreateReply()
br.Txn = ba.Txn
br.Txn.Status = roachpb.ABORTED
return br, nil
})
br, pErr := tm.SendLocked(ctx, ba)
require.Nil(t, pErr)
require.NotNil(t, br)

// Acting as TxnCoordSender.
txn.Update(br.Txn)
tm.closeLocked()

check(t, &tm, metrics{aborts: 1, duration: 234})
})

t.Run("restart", func(t *testing.T) {
txn := makeTxnProto()
tm, mockSender, timeSource := makeMockTxnMetricRecorder(&txn)

ba := &kvpb.BatchRequest{}
ba.Header = kvpb.Header{Txn: txn.Clone()}
ba.Add(&kvpb.EndTxnRequest{Commit: true})

mockSender.MockSend(func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) {
require.Equal(t, enginepb.TxnEpoch(0), ba.Txn.Epoch)
require.Len(t, ba.Requests, 1)
require.IsType(t, &kvpb.EndTxnRequest{}, ba.Requests[0].GetInner())

// Simulate delay.
timeSource.Advance(234)

wtoErr := &kvpb.WriteTooOldError{ActualTimestamp: txn.WriteTimestamp.Add(0, 10)}
return nil, kvpb.NewErrorWithTxn(wtoErr, ba.Txn)
})
br, pErr := tm.SendLocked(ctx, ba)
require.Nil(t, br)
require.NotNil(t, pErr)
require.NotNil(t, pErr.GetTxn())

// Acting as TxnCoordSender.
txn.Update(pErr.GetTxn())
txn.Restart(0, 0, hlc.Timestamp{})

// Resend the batch at the new epoch.
ba.Header = kvpb.Header{Txn: txn.Clone()}

mockSender.MockSend(func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) {
require.Equal(t, enginepb.TxnEpoch(1), ba.Txn.Epoch)
require.Len(t, ba.Requests, 1)
require.IsType(t, &kvpb.EndTxnRequest{}, ba.Requests[0].GetInner())

// Simulate delay.
timeSource.Advance(234)

br = ba.CreateReply()
br.Txn = ba.Txn
br.Txn.Status = roachpb.COMMITTED
return br, nil
})
br, pErr = tm.SendLocked(ctx, ba)
require.Nil(t, pErr)
require.NotNil(t, br)

// Acting as TxnCoordSender.
txn.Update(br.Txn)
tm.closeLocked()

check(t, &tm, metrics{commits: 1, duration: 468, restarts: 1})
})
}

0 comments on commit d3ef9c8

Please sign in to comment.