From 354b4229a9de7f6bf0c208aadd7e7fc60a75dbbf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Wed, 10 Jan 2024 23:07:55 +0000 Subject: [PATCH 1/2] Replace go.uber.org/atomic with sync/atomic --- benchmark/internal_server.go | 6 +++--- benchmark/internal_tcp_server.go | 5 ++--- benchmark/real_relay.go | 4 ++-- benchmark/tcp_raw_relay.go | 5 ++--- channel.go | 4 ++-- close_test.go | 6 +++--- connection.go | 16 +++++++++------- examples/bench/client/client.go | 6 +++--- frame_pool_b_test.go | 3 +-- go.mod | 3 +-- go.sum | 17 ++--------------- handlers_with_skip_test.go | 4 ++-- mex.go | 8 ++++---- peer.go | 4 ++-- peer_test.go | 2 +- peers/prefer_test.go | 6 +++--- relay.go | 6 +++--- relay_test.go | 7 +++---- testutils/channel.go | 4 ++-- testutils/channel_opts.go | 2 +- testutils/counter.go | 13 ++++++------- testutils/logger.go | 5 ++--- testutils/mockhyperbahn/hyperbahn_test.go | 2 +- testutils/relay.go | 4 ++-- testutils/test_server.go | 4 ++-- thrift/thrift_bench_test.go | 2 +- tracing_test.go | 9 +++++---- 27 files changed, 70 insertions(+), 87 deletions(-) diff --git a/benchmark/internal_server.go b/benchmark/internal_server.go index 134730190..cb45aca53 100644 --- a/benchmark/internal_server.go +++ b/benchmark/internal_server.go @@ -23,6 +23,7 @@ package benchmark import ( "fmt" "os" + "sync/atomic" "github.com/uber/tchannel-go" "github.com/uber/tchannel-go/hyperbahn" @@ -30,7 +31,6 @@ import ( "github.com/uber/tchannel-go/thrift" gen "github.com/uber/tchannel-go/thrift/gen-go/test" - "go.uber.org/atomic" "golang.org/x/net/context" ) @@ -118,7 +118,7 @@ func (rawHandler) OnError(ctx context.Context, err error) { } func (h rawHandler) Handle(ctx context.Context, args *raw.Args) (*raw.Res, error) { - h.calls.Inc() + h.calls.Add(1) return &raw.Res{ Arg2: args.Arg2, Arg3: args.Arg3, @@ -130,6 +130,6 @@ type handler struct { } func (h handler) Echo(ctx thrift.Context, arg1 string) (string, error) { - h.calls.Inc() + h.calls.Add(1) return arg1, nil } diff --git a/benchmark/internal_tcp_server.go b/benchmark/internal_tcp_server.go index 58c5ab0df..656e79b0c 100644 --- a/benchmark/internal_tcp_server.go +++ b/benchmark/internal_tcp_server.go @@ -23,10 +23,9 @@ package benchmark import ( "log" "net" + "sync/atomic" "github.com/uber/tchannel-go" - - "go.uber.org/atomic" ) // internalTCPServer represents a TCP server responds to TChannel @@ -98,7 +97,7 @@ func (s *internalTCPServer) writeResponses(conn net.Conn, ids chan uint32) { continue } - s.rawCalls.Inc() + s.rawCalls.Add(1) if _, err := frames.writeCallRes(id, conn); err != nil { log.Printf("writeCallRes failed: %v", err) return diff --git a/benchmark/real_relay.go b/benchmark/real_relay.go index 2bef98af9..5e87a162b 100644 --- a/benchmark/real_relay.go +++ b/benchmark/real_relay.go @@ -23,11 +23,11 @@ package benchmark import ( "errors" "os" + "sync/atomic" "github.com/uber/tchannel-go" "github.com/uber/tchannel-go/relay" "github.com/uber/tchannel-go/relay/relaytest" - "go.uber.org/atomic" ) type fixedHosts struct { @@ -46,7 +46,7 @@ func (fh *fixedHosts) Get(cf relay.CallFrame, _ *relay.Conn) (string, error) { cf.Arg2Append(kv.Key, kv.Val) } - pickI := int(fh.pickI.Inc()-1) % len(peers) + pickI := int(fh.pickI.Add(1)-1) % len(peers) return peers[pickI], nil } diff --git a/benchmark/tcp_raw_relay.go b/benchmark/tcp_raw_relay.go index c257d2661..c109e0401 100644 --- a/benchmark/tcp_raw_relay.go +++ b/benchmark/tcp_raw_relay.go @@ -24,8 +24,7 @@ import ( "io" "log" "net" - - "go.uber.org/atomic" + "sync/atomic" ) type tcpRelay struct { @@ -87,7 +86,7 @@ func (r *tcpRelay) handleIncoming(src net.Conn) { } func (r *tcpRelay) nextDestination() string { - i := int(r.destI.Inc()-1) % len(r.dests) + i := int(r.destI.Add(1)-1) % len(r.dests) return r.dests[i] } diff --git a/channel.go b/channel.go index 1b76277d2..1da41fa9d 100644 --- a/channel.go +++ b/channel.go @@ -29,12 +29,12 @@ import ( "runtime" "strings" "sync" + "sync/atomic" "time" "github.com/uber/tchannel-go/tnet" "github.com/opentracing/opentracing-go" - "go.uber.org/atomic" "golang.org/x/net/context" ) @@ -264,7 +264,7 @@ func NewChannel(serviceName string, opts *ChannelOptions) (*Channel, error) { timeTicker = time.NewTicker } - chID := _nextChID.Inc() + chID := _nextChID.Add(1) logger = logger.WithFields( LogField{"serviceName", serviceName}, LogField{"process", processName}, diff --git a/close_test.go b/close_test.go index 3955a7655..68c567eca 100644 --- a/close_test.go +++ b/close_test.go @@ -23,6 +23,7 @@ package tchannel_test import ( "math/rand" "sync" + "sync/atomic" "testing" "time" @@ -33,7 +34,6 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "go.uber.org/atomic" "golang.org/x/net/context" ) @@ -601,10 +601,10 @@ func TestCloseSendError(t *testing.T) { opts := testutils.NewOpts().DisableLogVerification() serverCh := testutils.NewServer(t, opts) testutils.RegisterEcho(serverCh, func() { - if counter.Inc() > 10 { + if counter.Add(1) > 10 { // Close the server in a goroutine to possibly trigger more race conditions. go func() { - closed.Inc() + closed.Add(1) serverCh.Close() }() } diff --git a/connection.go b/connection.go index 36cf64ab5..42212337b 100644 --- a/connection.go +++ b/connection.go @@ -27,12 +27,12 @@ import ( "net" "strings" "sync" + "sync/atomic" "syscall" "time" "github.com/uber/tchannel-go/tos" - "go.uber.org/atomic" "golang.org/x/net/context" "golang.org/x/net/ipv4" "golang.org/x/net/ipv6" @@ -315,7 +315,7 @@ func (ch *Channel) setConnectionTosPriority(tosPriority tos.ToS, c net.Conn) err func (ch *Channel) newConnection(baseCtx context.Context, conn net.Conn, initialID uint32, outboundHP string, remotePeer PeerInfo, remotePeerAddress peerAddressComponents, events connectionEvents) *Connection { opts := ch.connectionOptions.withDefaults() - connID := _nextConnID.Inc() + connID := _nextConnID.Add(1) connDirection := inbound log := ch.log.WithFields(LogFields{ {"connID", connID}, @@ -356,10 +356,12 @@ func (ch *Channel) newConnection(baseCtx context.Context, conn net.Conn, initial events: events, commonStatsTags: ch.commonStatsTags, healthCheckHistory: newHealthHistory(), - lastActivityRead: *atomic.NewInt64(timeNow), - lastActivityWrite: *atomic.NewInt64(timeNow), + lastActivityRead: atomic.Int64{}, + lastActivityWrite: atomic.Int64{}, baseContext: ch.connContext(baseCtx, conn), } + c.lastActivityRead.Store(timeNow) + c.lastActivityWrite.Store(timeNow) if tosPriority := opts.TosPriority; tosPriority > 0 { if err := ch.setConnectionTosPriority(tosPriority, conn); err != nil { @@ -522,7 +524,7 @@ func (c *Connection) RemotePeerInfo() PeerInfo { // NextMessageID reserves the next available message id for this connection func (c *Connection) NextMessageID() uint32 { - return c.nextMessageID.Inc() + return c.nextMessageID.Add(1) } // SendSystemError sends an error frame for the given system error. @@ -617,7 +619,7 @@ func (c *Connection) connectionError(site string, err error) error { c.close(closeLogFields...) // On any connection error, notify the exchanges of this error. - if c.stoppedExchanges.CAS(false, true) { + if c.stoppedExchanges.CompareAndSwap(false, true) { c.outbound.stopExchanges(err) c.inbound.stopExchanges(err) } @@ -638,7 +640,7 @@ func (c *Connection) protocolError(id uint32, err error) error { ) // On any connection error, notify the exchanges of this error. - if c.stoppedExchanges.CAS(false, true) { + if c.stoppedExchanges.CompareAndSwap(false, true) { c.outbound.stopExchanges(sysErr) c.inbound.stopExchanges(sysErr) } diff --git a/examples/bench/client/client.go b/examples/bench/client/client.go index 38f9d9b06..781d3be13 100644 --- a/examples/bench/client/client.go +++ b/examples/bench/client/client.go @@ -26,12 +26,12 @@ import ( "net/http" _ "net/http/pprof" "runtime" + "sync/atomic" "time" "github.com/uber/tchannel-go" "github.com/uber/tchannel-go/raw" - "go.uber.org/atomic" "golang.org/x/net/context" ) @@ -83,14 +83,14 @@ func worker(ch *tchannel.Channel) { log.Fatalf("set failed: %v", err) continue } - counter.Inc() + counter.Add(1) for i := 0; i < *getToSetRatio; i++ { _, err := getRequest(ch, "key") if err != nil { log.Fatalf("get failed: %v", err) } - counter.Inc() + counter.Add(1) } } } diff --git a/frame_pool_b_test.go b/frame_pool_b_test.go index 6a5d71272..87c65293d 100644 --- a/frame_pool_b_test.go +++ b/frame_pool_b_test.go @@ -23,11 +23,10 @@ package tchannel_test import ( "math/rand" "sync" + "sync/atomic" "testing" . "github.com/uber/tchannel-go" - - "go.uber.org/atomic" ) func benchmarkUsing(b *testing.B, pool FramePool) { diff --git a/go.mod b/go.mod index e90e2465f..88925d0ea 100644 --- a/go.mod +++ b/go.mod @@ -15,7 +15,6 @@ require ( github.com/stretchr/testify v1.5.1 github.com/uber-go/tally v3.3.15+incompatible github.com/uber/jaeger-client-go v2.22.1+incompatible - go.uber.org/atomic v1.6.0 go.uber.org/multierr v1.2.0 golang.org/x/net v0.14.0 golang.org/x/sys v0.11.0 @@ -29,6 +28,6 @@ require ( github.com/pmezard/go-difflib v1.0.0 // indirect github.com/stretchr/objx v0.3.0 // indirect github.com/uber/jaeger-lib v2.4.1+incompatible // indirect - golang.org/x/tools v0.1.12 // indirect + go.uber.org/atomic v1.11.0 // indirect gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f // indirect ) diff --git a/go.sum b/go.sum index cb5ba584c..e5be74ae2 100644 --- a/go.sum +++ b/go.sum @@ -40,27 +40,14 @@ github.com/uber/jaeger-client-go v2.22.1+incompatible h1:NHcubEkVbahf9t3p75TOCR8 github.com/uber/jaeger-client-go v2.22.1+incompatible/go.mod h1:WVhlPFC8FDjOFMMWRy2pZqQJSXxYSwNYOkTr/Z6d3Kk= github.com/uber/jaeger-lib v2.4.1+incompatible h1:td4jdvLcExb4cBISKIpHuGoVXh+dVKhn2Um6rjCsSsg= github.com/uber/jaeger-lib v2.4.1+incompatible/go.mod h1:ComeNDZlWwrWnDv8aPp0Ba6+uUTzImX/AauajbLI56U= -go.uber.org/atomic v1.6.0 h1:Ezj3JGmsOnG1MoRWQkPBsKLe9DwWD9QeXzTRzzldNVk= -go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= +go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= +go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= go.uber.org/multierr v1.2.0 h1:6I+W7f5VwC5SV9dNrZ3qXrDB9mD0dyGOi/ZJmYw03T4= go.uber.org/multierr v1.2.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= -golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= -golang.org/x/lint v0.0.0-20190930215403-16217165b5de h1:5hukYrvBGR8/eNkX5mdUezrA6JiaEZDtJb9Ei+1LlBs= -golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= -golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= -golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.14.0 h1:BONx9s002vGdD9umnlX1Po8vOZmrgH34qlHcD1MfK14= golang.org/x/net v0.14.0/go.mod h1:PpSgVXXLK0OxS0F31C1/tv6XNguvCrnXIDrFMspZIUI= -golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.11.0 h1:eG7RXZHdqOJ1i+0lgLgCpSXAp6M3LYlAo6osgSi0xOM= golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= -golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= -golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= -golang.org/x/tools v0.1.12 h1:VveCTK38A2rkS8ZqFY25HIDFscX5X9OoEhJd3quQmXU= -golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= -golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f h1:BLraFXnmrev5lT+xlilqcH8XK9/i0At2xKjWk4p6zsU= gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/handlers_with_skip_test.go b/handlers_with_skip_test.go index 7a4edb419..2b968a329 100644 --- a/handlers_with_skip_test.go +++ b/handlers_with_skip_test.go @@ -22,6 +22,7 @@ package tchannel_test import ( "fmt" + "sync/atomic" "testing" "time" @@ -30,7 +31,6 @@ import ( . "github.com/uber/tchannel-go" "github.com/uber/tchannel-go/raw" "github.com/uber/tchannel-go/testutils" - "go.uber.org/atomic" "golang.org/x/net/context" ) @@ -90,5 +90,5 @@ func TestUserHandlerWithSkipInvalidInput(t *testing.T) { type recordHandler struct{ c atomic.Uint32 } func (r *recordHandler) Handle(ctx context.Context, call *InboundCall) { - r.c.Inc() + r.c.Add(1) } diff --git a/mex.go b/mex.go index f38fb3ae5..4849b25ac 100644 --- a/mex.go +++ b/mex.go @@ -24,10 +24,10 @@ import ( "errors" "fmt" "sync" + "sync/atomic" "github.com/uber/tchannel-go/typed" - "go.uber.org/atomic" "golang.org/x/net/context" ) @@ -255,11 +255,11 @@ func (mex *messageExchange) onCtxErr(err error) { func (mex *messageExchange) shutdown() { // The reader and writer side can both hit errors and try to shutdown the mex, // so we ensure that it's only shut down once. - if !mex.shutdownAtomic.CAS(false, true) { + if !mex.shutdownAtomic.CompareAndSwap(false, true) { return } - if mex.errChNotified.CAS(false, true) { + if mex.errChNotified.CompareAndSwap(false, true) { mex.errCh.Notify(errMexShutdown) } @@ -529,7 +529,7 @@ func (mexset *messageExchangeSet) stopExchanges(err error) { // on sendChRefs that there's no references to sendCh is violated since // readers/writers could still have a reference to sendCh even though // we shutdown the exchange and called Done on sendChRefs. - if mex.errChNotified.CAS(false, true) { + if mex.errChNotified.CompareAndSwap(false, true) { mex.errCh.Notify(err) } } diff --git a/peer.go b/peer.go index 03bee6c4d..146605912 100644 --- a/peer.go +++ b/peer.go @@ -25,11 +25,11 @@ import ( "errors" "strings" "sync" + "sync/atomic" "time" "github.com/uber/tchannel-go/trand" - "go.uber.org/atomic" "golang.org/x/net/context" ) @@ -210,7 +210,7 @@ func (l *PeerList) choosePeer(prevSelected map[string]struct{}, avoidHost bool) } l.peerHeap.pushPeer(ps) - ps.chosenCount.Inc() + ps.chosenCount.Add(1) return ps.Peer } diff --git a/peer_test.go b/peer_test.go index e5bc3fcc3..ba8f6bff9 100644 --- a/peer_test.go +++ b/peer_test.go @@ -24,6 +24,7 @@ import ( "fmt" "sort" "sync" + "sync/atomic" "testing" "time" @@ -35,7 +36,6 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "go.uber.org/atomic" ) func fakePeer(t *testing.T, ch *Channel, hostPort string) *Peer { diff --git a/peers/prefer_test.go b/peers/prefer_test.go index 0a926883d..38de1cd53 100644 --- a/peers/prefer_test.go +++ b/peers/prefer_test.go @@ -23,6 +23,7 @@ package peers import ( "fmt" "hash/fnv" + "sync/atomic" "testing" "time" @@ -32,7 +33,6 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "go.uber.org/atomic" "golang.org/x/net/context" ) @@ -86,7 +86,7 @@ func TestHRWScorerDistribution(t *testing.T) { func countingServer(t *testing.T, opts *testutils.ChannelOpts) (*tchannel.Channel, *atomic.Int32) { var cnt atomic.Int32 server := testutils.NewServer(t, opts) - testutils.RegisterEcho(server, func() { cnt.Inc() }) + testutils.RegisterEcho(server, func() { cnt.Add(1) }) return server, &cnt } @@ -149,7 +149,7 @@ func TestHRWScorerIntegration(t *testing.T) { // And if s1 comes back, calls should resume to s1. s1Up := testutils.NewClient(t, sOpts) - testutils.RegisterEcho(s1Up, func() { s1Count.Inc() }) + testutils.RegisterEcho(s1Up, func() { s1Count.Add(1) }) err = s1Up.ListenAndServe(s1.PeerInfo().HostPort) require.NoError(t, err, "Failed to bring up a new channel as s1") diff --git a/relay.go b/relay.go index c2486a7cc..f39d14e6e 100644 --- a/relay.go +++ b/relay.go @@ -28,11 +28,11 @@ import ( "io" "math" "sync" + "sync/atomic" "time" "github.com/uber/tchannel-go/relay" "github.com/uber/tchannel-go/typed" - "go.uber.org/atomic" ) const ( @@ -378,7 +378,7 @@ func (r *Relayer) canHandleNewCall() (bool, connectionState) { curState = r.conn.state canHandle = curState == connectionActive if canHandle { - r.pending.Inc() + r.pending.Add(1) } return nil }) @@ -675,7 +675,7 @@ func (r *Relayer) finishRelayItem(items *relayItems, id uint32) { } func (r *Relayer) decrementPending() { - r.pending.Dec() + r.pending.Add(^uint32(0)) r.conn.checkExchanges() } diff --git a/relay_test.go b/relay_test.go index f261c2290..be5d8da9f 100644 --- a/relay_test.go +++ b/relay_test.go @@ -25,12 +25,12 @@ import ( "errors" "fmt" "io" - "io/ioutil" "net" "os" "runtime" "strings" "sync" + "sync/atomic" "testing" "time" @@ -48,7 +48,6 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "go.uber.org/atomic" "golang.org/x/net/context" ) @@ -329,7 +328,7 @@ func TestRaceCloseWithNewCall(t *testing.T) { assert.True(t, closed, "Relay did not close within timeout") // Now stop all calls, and wait for the calling goroutine to end. - stopCalling.Inc() + stopCalling.Add(1) callers.Wait() }) } @@ -2114,7 +2113,7 @@ func decodeThriftHeaders(t testing.TB, bs []byte) map[string]string { require.NoError(t, err, "Failed to read headers") // Ensure there are no remaining bytes left. - remaining, err := ioutil.ReadAll(r) + remaining, err := io.ReadAll(r) require.NoError(t, err, "failed to read from arg2 reader") assert.Empty(t, remaining, "expected no bytes after reading headers") diff --git a/testutils/channel.go b/testutils/channel.go index 49ab011d0..b4b6dbe5b 100644 --- a/testutils/channel.go +++ b/testutils/channel.go @@ -25,12 +25,12 @@ import ( "encoding/json" "fmt" "net" + "sync/atomic" "github.com/uber/tchannel-go" "github.com/uber/tchannel-go/internal/testcert" "github.com/uber/tchannel-go/raw" - "go.uber.org/atomic" "golang.org/x/net/context" ) @@ -70,7 +70,7 @@ var totalClients atomic.Uint32 func NewClientChannel(opts *ChannelOpts) (*tchannel.Channel, error) { opts = opts.Copy() - clientNum := totalClients.Inc() + clientNum := totalClients.Add(1) serviceName := defaultString(opts.ServiceName, DefaultClientName) opts.ProcessName = defaultString(opts.ProcessName, serviceName+"-"+fmt.Sprint(clientNum)) updateOptsLogger(opts) diff --git a/testutils/channel_opts.go b/testutils/channel_opts.go index c84aca29e..6b4a12ed0 100644 --- a/testutils/channel_opts.go +++ b/testutils/channel_opts.go @@ -23,13 +23,13 @@ package testutils import ( "flag" "net" + "sync/atomic" "testing" "time" "github.com/uber/tchannel-go" "github.com/uber/tchannel-go/tos" - "go.uber.org/atomic" "golang.org/x/net/context" ) diff --git a/testutils/counter.go b/testutils/counter.go index 3ca0c92fc..40dd3b415 100644 --- a/testutils/counter.go +++ b/testutils/counter.go @@ -22,8 +22,7 @@ package testutils import ( "sync" - - "go.uber.org/atomic" + "sync/atomic" ) // Decrement is the interface returned by Decrementor. @@ -41,11 +40,11 @@ type decrementor struct { } func (d *decrementor) Single() bool { - return d.n.Dec() >= 0 + return d.n.Add(-1) >= 0 } func (d *decrementor) Multiple(n int) int { - decBy := -1 * int64(n) + decBy := -int64(n) decremented := d.n.Add(decBy) if decremented <= decBy { // Already out of tokens before this decrement. @@ -61,9 +60,9 @@ func (d *decrementor) Multiple(n int) int { // Decrementor returns a function that can be called from multiple goroutines and ensures // it will only return true n times. func Decrementor(n int) Decrement { - return &decrementor{ - n: *atomic.NewInt64(int64(n)), - } + d := &decrementor{} + d.n.Store(int64(n)) + return d } // Batch returns a slice with n broken into batches of size batchSize. diff --git a/testutils/logger.go b/testutils/logger.go index 0deb729b2..7ea3bdbd7 100644 --- a/testutils/logger.go +++ b/testutils/logger.go @@ -26,12 +26,11 @@ import ( "os" "strings" "sync" + "sync/atomic" "testing" "time" "github.com/uber/tchannel-go" - - "go.uber.org/atomic" ) // writer is shared between multiple loggers, and serializes acccesses to @@ -180,7 +179,7 @@ func (l errorLogger) checkFilters(msg string) bool { return false } - matchCount := l.s.matchCount[match].Inc() + matchCount := l.s.matchCount[match].Add(1) return uint(matchCount) <= l.v.Filters[match].Count } diff --git a/testutils/mockhyperbahn/hyperbahn_test.go b/testutils/mockhyperbahn/hyperbahn_test.go index dca31ea0b..92fc3092a 100644 --- a/testutils/mockhyperbahn/hyperbahn_test.go +++ b/testutils/mockhyperbahn/hyperbahn_test.go @@ -21,6 +21,7 @@ package mockhyperbahn_test import ( + "sync/atomic" "testing" "time" @@ -32,7 +33,6 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "go.uber.org/atomic" ) var config = struct { diff --git a/testutils/relay.go b/testutils/relay.go index 229badeca..ae57c5ea8 100644 --- a/testutils/relay.go +++ b/testutils/relay.go @@ -24,13 +24,13 @@ import ( "io" "net" "sync" + "sync/atomic" "testing" "github.com/uber/tchannel-go" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "go.uber.org/atomic" ) type frameRelay struct { @@ -67,7 +67,7 @@ func (r *frameRelay) listen() (listenHostPort string, cancel func()) { }() return conn.Addr().String(), func() { - r.closed.Inc() + r.closed.Add(1) conn.Close() r.Lock() for _, c := range r.conns { diff --git a/testutils/test_server.go b/testutils/test_server.go index f3abae47f..0ffd6e87b 100644 --- a/testutils/test_server.go +++ b/testutils/test_server.go @@ -25,6 +25,7 @@ import ( "os" "strings" "sync" + "sync/atomic" "testing" "time" @@ -36,7 +37,6 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "go.uber.org/atomic" "golang.org/x/net/context" ) @@ -455,7 +455,7 @@ func (ts *TestServer) verifyNoGoroutinesLeaked() { // No leaks, nothing to do. return } - if isFirstLeak := _leakedGoroutine.CAS(false, true); !isFirstLeak { + if isFirstLeak := _leakedGoroutine.CompareAndSwap(false, true); !isFirstLeak { ts.Log("Skipping check for leaked goroutines because of a previous leak.") return } diff --git a/thrift/thrift_bench_test.go b/thrift/thrift_bench_test.go index 890276442..6a8011658 100644 --- a/thrift/thrift_bench_test.go +++ b/thrift/thrift_bench_test.go @@ -22,6 +22,7 @@ package thrift_test import ( "flag" + "sync/atomic" "testing" "time" @@ -29,7 +30,6 @@ import ( "github.com/uber/tchannel-go/testutils" "github.com/stretchr/testify/require" - "go.uber.org/atomic" ) const callBatch = 100 diff --git a/tracing_test.go b/tracing_test.go index f1a3da4e4..6fa5fe85b 100644 --- a/tracing_test.go +++ b/tracing_test.go @@ -22,6 +22,7 @@ package tchannel_test import ( "sync" + "sync/atomic" "testing" "time" @@ -29,7 +30,6 @@ import ( "github.com/uber/tchannel-go/json" "github.com/uber/tchannel-go/testutils" "github.com/uber/tchannel-go/testutils/testtracing" - "go.uber.org/atomic" "github.com/opentracing/opentracing-go" "github.com/opentracing/opentracing-go/ext" @@ -69,13 +69,14 @@ func TestTracingSpanAttributes(t *testing.T) { customAppHeaderKey = "futurama" customAppHeaderExpectedValue = "simpsons" ) - var customAppHeaderValue atomic.String + var customAppHeaderValue atomic.Pointer[string] // Register JSON handler jsonHandler := &JSONHandler{ TraceHandler: testtracing.TraceHandler{Ch: ch}, t: t, sideEffect: func(ctx json.Context) { - customAppHeaderValue.Store(ctx.Headers()[customAppHeaderKey]) + header := ctx.Headers()[customAppHeaderKey] + customAppHeaderValue.Store(&header) }, } json.Register(ch, json.Handlers{"call": jsonHandler.callJSON}, jsonHandler.onError) @@ -104,7 +105,7 @@ func TestTracingSpanAttributes(t *testing.T) { require.NoError(t, json.CallPeer(json.WithHeaders(ctx, requestHeaders), peer, ch.PeerInfo().ServiceName, "call", &testtracing.TracingRequest{}, &response)) - assert.Equal(t, customAppHeaderExpectedValue, customAppHeaderValue.Load(), "custom header was propagated") + assert.Equal(t, customAppHeaderExpectedValue, *customAppHeaderValue.Load(), "custom header was propagated") // Spans are finished in inbound.doneSending() or outbound.doneReading(), // which are called on different go-routines and may execute *after* the From 37b75e1ff00a510d42f85a1485080422f0158ed4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Thu, 11 Jan 2024 15:22:55 +0000 Subject: [PATCH 2/2] pr review --- connection.go | 2 -- relay.go | 2 ++ relay_test.go | 3 ++- 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/connection.go b/connection.go index 42212337b..2046e13a9 100644 --- a/connection.go +++ b/connection.go @@ -356,8 +356,6 @@ func (ch *Channel) newConnection(baseCtx context.Context, conn net.Conn, initial events: events, commonStatsTags: ch.commonStatsTags, healthCheckHistory: newHealthHistory(), - lastActivityRead: atomic.Int64{}, - lastActivityWrite: atomic.Int64{}, baseContext: ch.connContext(baseCtx, conn), } c.lastActivityRead.Store(timeNow) diff --git a/relay.go b/relay.go index f39d14e6e..4aca6cf20 100644 --- a/relay.go +++ b/relay.go @@ -675,6 +675,8 @@ func (r *Relayer) finishRelayItem(items *relayItems, id uint32) { } func (r *Relayer) decrementPending() { + // This is the documented way to decrement atomic unsigned integers + // https://pkg.go.dev/sync/atomic#AddUint32 r.pending.Add(^uint32(0)) r.conn.checkExchanges() } diff --git a/relay_test.go b/relay_test.go index be5d8da9f..1cea0f3cc 100644 --- a/relay_test.go +++ b/relay_test.go @@ -25,6 +25,7 @@ import ( "errors" "fmt" "io" + "io/ioutil" "net" "os" "runtime" @@ -2113,7 +2114,7 @@ func decodeThriftHeaders(t testing.TB, bs []byte) map[string]string { require.NoError(t, err, "Failed to read headers") // Ensure there are no remaining bytes left. - remaining, err := io.ReadAll(r) + remaining, err := ioutil.ReadAll(r) require.NoError(t, err, "failed to read from arg2 reader") assert.Empty(t, remaining, "expected no bytes after reading headers")