Skip to content

Commit

Permalink
perf(client): optimize with atomic.Int64 for lastSubscribeAt in pkg/v…
Browse files Browse the repository at this point in the history
…arlog.(subscriber)

Switches the type of `pkg/varlog.(subscriber).lastSubscribeAt` from
`atomic.Value` to `atomic.Int64` to eliminate heap escapes. The frequent updates
with new `time.Time` values to `lastSubscribeAt` previously led to significant
heap allocation due to heap escape. By adopting `atomic.Int64`, we can prevent
heap escape, thereby reducing heap allocations and enhancing performance.
  • Loading branch information
ijsong committed Feb 28, 2024
1 parent cd283ae commit c43198f
Showing 1 changed file with 5 additions and 4 deletions.
9 changes: 5 additions & 4 deletions pkg/varlog/subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ type subscriber struct {
closed atomic.Bool
complete atomic.Bool

lastSubscribeAt atomic.Value
lastSubscribeAt atomic.Int64

logger *zap.Logger
}
Expand All @@ -199,7 +199,7 @@ func newSubscriber(ctx context.Context, topicID types.TopicID, logStreamID types
done: make(chan struct{}),
logger: logger.Named("subscriber").With(zap.Int32("lsid", int32(logStreamID))),
}
s.lastSubscribeAt.Store(time.Now())
s.lastSubscribeAt.Store(time.Now().UnixNano())

Check warning on line 202 in pkg/varlog/subscribe.go

View check run for this annotation

Codecov / codecov/patch

pkg/varlog/subscribe.go#L202

Added line #L202 was not covered by tests
s.closed.Store(false)
s.complete.Store(false)
return s, nil
Expand Down Expand Up @@ -237,7 +237,7 @@ func (s *subscriber) subscribe(ctx context.Context) {
needExit := r.result.Error != nil

if res.GLSN != types.InvalidGLSN {
s.lastSubscribeAt.Store(time.Now())
s.lastSubscribeAt.Store(time.Now().UnixNano())

Check warning on line 240 in pkg/varlog/subscribe.go

View check run for this annotation

Codecov / codecov/patch

pkg/varlog/subscribe.go#L240

Added line #L240 was not covered by tests
} else if res.Error == io.EOF || errors.Is(res.Error, verrors.ErrTrimmed) {
s.complete.Store(true)
}
Expand All @@ -256,7 +256,8 @@ func (s *subscriber) subscribe(ctx context.Context) {
}

func (s *subscriber) getLastSubscribeAt() time.Time {
return s.lastSubscribeAt.Load().(time.Time)
nsec := s.lastSubscribeAt.Load()
return time.Unix(0, nsec)

Check warning on line 260 in pkg/varlog/subscribe.go

View check run for this annotation

Codecov / codecov/patch

pkg/varlog/subscribe.go#L259-L260

Added lines #L259 - L260 were not covered by tests
}

type transmitter struct {
Expand Down

0 comments on commit c43198f

Please sign in to comment.