Skip to content

Commit

Permalink
feat(storagenode): add pipelined Append RPC handler
Browse files Browse the repository at this point in the history
This patch changes the Append RPC handler to support pipelined requests without changing the
client's API. Therefore, users can use the Append API transparently.

Supporting pipelined requests can introduce overhead, since it requires additional
goroutines and concurrent queues. However, experiments showed that this change adds little overhead.
This change uses [reader-biased mutex](https://github.com/puzpuzpuz/xsync#rbmutex) instead of
built-in RWMutex to avoid shared lock contention.

This PR implements the server-side parts of the LogStreamAppender mentioned in #433. It can also be
used for pipelining the generic Append RPC described in #441.
  • Loading branch information
ijsong committed May 23, 2023
1 parent 82c1e97 commit 2a8118a
Show file tree
Hide file tree
Showing 17 changed files with 2,498 additions and 44 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ require (
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
github.com/lib/pq v1.10.9
github.com/pkg/errors v0.9.1
github.com/puzpuzpuz/xsync/v2 v2.4.0
github.com/smartystreets/assertions v1.13.1
github.com/smartystreets/goconvey v1.8.0
github.com/soheilhy/cmux v0.1.5
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,8 @@ github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4O
github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA=
github.com/prometheus/procfs v0.7.3 h1:4jVXhlkAyzOScmCkXBTOLRLTz8EeU+eyjrwB/EPq0VU=
github.com/prometheus/procfs v0.7.3/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA=
github.com/puzpuzpuz/xsync/v2 v2.4.0 h1:5sXAMHrtx1bg9nbRZTOn8T4MkWe5V+o8yKRH02Eznag=
github.com/puzpuzpuz/xsync/v2 v2.4.0/go.mod h1:gD2H2krq/w52MfPLE+Uy64TzJDVY7lP2znR9qmR35kU=
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc=
Expand Down
159 changes: 128 additions & 31 deletions internal/storagenode/log_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

pbtypes "github.com/gogo/protobuf/types"
"go.uber.org/multierr"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

Expand All @@ -23,55 +24,151 @@ type logServer struct {

var _ snpb.LogIOServer = (*logServer)(nil)

func (ls logServer) Append(stream snpb.LogIO_AppendServer) (err error) {
req, rsp := &snpb.AppendRequest{}, &snpb.AppendResponse{}
// Append handles a bidirectional Append stream in a pipelined fashion: a
// receiver goroutine converts incoming requests into AppendTasks and pushes
// them into cq, while the send loop waits for each task's completion and
// streams responses back in arrival order.
func (ls *logServer) Append(stream snpb.LogIO_AppendServer) error {
	// Avoid race of Add and Wait of wgAppenders.
	rt := ls.sn.mu.RLock()
	defer ls.sn.mu.RUnlock(rt)

	// cq carries append tasks from the receive loop to the send loop; the
	// small buffer lets the two loops overlap.
	cq := make(chan *logstream.AppendTask, 10)

	ls.sn.wgAppenders.Add(1)
	go func() {
		defer ls.sn.wgAppenders.Done()
		ls.appendStreamRecvLoop(stream, cq)
	}()

	var eg errgroup.Group
	eg.Go(func() error {
		return ls.appendStreamSendLoop(stream, cq)
	})
	err := eg.Wait()
	if err == nil {
		return nil
	}

	// drain cq
	// The receive loop may still be producing tasks; release them in the
	// background so it does not block on a full channel.
	ls.sn.wgAppenders.Add(1)
	go func() {
		defer ls.sn.wgAppenders.Done()
		for appendTask := range cq {
			appendTask.Release()
		}
	}()

	// io.EOF indicates the client closed the stream normally.
	if err == io.EOF {
		return nil
	}

	// Map well-known errors to gRPC status codes.
	var code codes.Code
	switch err {
	case verrors.ErrSealed:
		code = codes.FailedPrecondition
	case snerrors.ErrNotPrimary:
		code = codes.Unavailable
	default:
		code = status.Code(err)
		if code == codes.Unknown {
			code = status.FromContextError(err).Code()
		}

	}
	return status.Error(code, err.Error())
}

func (ls *logServer) appendStreamRecvLoop(stream snpb.LogIO_AppendServer, cq chan<- *logstream.AppendTask) {
defer close(cq)

var (
appendTask *logstream.AppendTask
lse *logstream.Executor
err error
loaded bool
tpid types.TopicID
lsid types.LogStreamID
)
req := &snpb.AppendRequest{}
ctx := stream.Context()

for {
appendTask = logstream.NewAppendTask()

req.Reset()
err = stream.RecvMsg(req)
if err == io.EOF {
return nil
}
if err != nil {
return err
goto Out
}

err = snpb.ValidateTopicLogStream(req)
if err != nil {
return status.Error(codes.InvalidArgument, err.Error())
if tpid.Invalid() && lsid.Invalid() {
err = snpb.ValidateTopicLogStream(req)
if err != nil {
err = status.Error(codes.InvalidArgument, err.Error())
goto Out
}
tpid = req.TopicID
lsid = req.LogStreamID
}

lse, loaded := ls.sn.executors.Load(req.TopicID, req.LogStreamID)
if !loaded {
return status.Error(codes.NotFound, "no such log stream")
if req.TopicID != tpid || req.LogStreamID != lsid {
err = status.Error(codes.InvalidArgument, "unmatched topic or logstream")
goto Out
}
res, err := lse.Append(stream.Context(), req.Payload)
if err != nil {
var code codes.Code
switch err {
case verrors.ErrSealed:
code = codes.FailedPrecondition
case snerrors.ErrNotPrimary:
code = codes.Unavailable
default:
code = status.FromContextError(err).Code()

if lse == nil {
lse, loaded = ls.sn.executors.Load(tpid, lsid)
if !loaded {
err = status.Error(codes.NotFound, "no such log stream")
goto Out
}
return status.Error(code, err.Error())
}

rsp.Results = res
err = stream.Send(rsp)
err = lse.AppendAsync(ctx, req.Payload, appendTask)
Out:
if err != nil {
appendTask.SetError(err)
}
cq <- appendTask
if err != nil {
return err
return
}
}
}

// appendStreamSendLoop consumes append tasks from cq in FIFO order, waits for
// each one to complete, and sends its results back to the client. It returns
// nil when cq is closed by the receive loop, or the first error encountered:
// a context error, a task failure, or a stream send failure.
func (ls *logServer) appendStreamSendLoop(stream snpb.LogIO_AppendServer, cq <-chan *logstream.AppendTask) (err error) {
	var res []snpb.AppendResult
	rsp := &snpb.AppendResponse{}
	ctx := stream.Context()

	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case appendTask, ok := <-cq:
			if !ok {
				// The receive loop closed cq: no more requests.
				return nil
			}
			res, err = appendTask.WaitForCompletion(ctx)
			if err != nil {
				appendTask.Release()
				return err
			}

			// NOTE(review): write wait groups are released only on the
			// success path here — confirm failed tasks release them
			// elsewhere.
			appendTask.ReleaseWriteWaitGroups()
			appendTask.Release()

			// Release the task before sending so pooled resources are
			// returned even if Send fails.
			rsp.Results = res
			err = stream.Send(rsp)
			if err != nil {
				return err
			}
		}
	}
}

func (ls logServer) Read(context.Context, *snpb.ReadRequest) (*snpb.ReadResponse, error) {
// Read is deprecated and always returns an Unimplemented error.
func (ls *logServer) Read(context.Context, *snpb.ReadRequest) (*snpb.ReadResponse, error) {
	return nil, status.Error(codes.Unimplemented, "deprecated")
}

func (ls logServer) Subscribe(req *snpb.SubscribeRequest, stream snpb.LogIO_SubscribeServer) error {
func (ls *logServer) Subscribe(req *snpb.SubscribeRequest, stream snpb.LogIO_SubscribeServer) error {
if err := snpb.ValidateTopicLogStream(req); err != nil {
return status.Error(codes.InvalidArgument, err.Error())
}
Expand Down Expand Up @@ -128,7 +225,7 @@ Loop:
return status.Error(status.FromContextError(sr.Err()).Code(), sr.Err().Error())
}

func (ls logServer) SubscribeTo(req *snpb.SubscribeToRequest, stream snpb.LogIO_SubscribeToServer) (err error) {
func (ls *logServer) SubscribeTo(req *snpb.SubscribeToRequest, stream snpb.LogIO_SubscribeToServer) (err error) {
err = snpb.ValidateTopicLogStream(req)
if err != nil {
return status.Error(codes.InvalidArgument, err.Error())
Expand Down Expand Up @@ -177,7 +274,7 @@ Loop:
return multierr.Append(err, sr.Err())
}

func (ls logServer) TrimDeprecated(ctx context.Context, req *snpb.TrimDeprecatedRequest) (*pbtypes.Empty, error) {
func (ls *logServer) TrimDeprecated(ctx context.Context, req *snpb.TrimDeprecatedRequest) (*pbtypes.Empty, error) {
ls.sn.executors.Range(func(_ types.LogStreamID, tpid types.TopicID, lse *logstream.Executor) bool {
if req.TopicID != tpid {
return true
Expand All @@ -188,7 +285,7 @@ func (ls logServer) TrimDeprecated(ctx context.Context, req *snpb.TrimDeprecated
return &pbtypes.Empty{}, nil
}

func (ls logServer) LogStreamReplicaMetadata(_ context.Context, req *snpb.LogStreamReplicaMetadataRequest) (*snpb.LogStreamReplicaMetadataResponse, error) {
func (ls *logServer) LogStreamReplicaMetadata(_ context.Context, req *snpb.LogStreamReplicaMetadataRequest) (*snpb.LogStreamReplicaMetadataResponse, error) {
if err := snpb.ValidateTopicLogStream(req); err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
}
Expand Down
127 changes: 127 additions & 0 deletions internal/storagenode/logstream/append.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package logstream

import (
"context"
"sync"
"time"

"go.uber.org/zap"
Expand All @@ -12,13 +13,139 @@ import (
"github.com/kakao/varlog/proto/snpb"
)

// appendTaskPool recycles AppendTask objects to reduce per-request
// allocations on the pipelined Append path.
var appendTaskPool = sync.Pool{
	New: func() any {
		return &AppendTask{}
	},
}

// AppendTask carries the state of a single pipelined Append request from the
// RPC handler to the log stream executor and back. Instances are pooled: use
// NewAppendTask to obtain one and Release to return it.
type AppendTask struct {
	lse          *Executor          // executor the batch was submitted to
	deferredFunc func(*AppendTask)  // cleanup run by Release
	err          error              // error set via SetError; fails WaitForCompletion fast
	start        time.Time          // submission time, used for duration metrics
	apc          appendContext      // per-batch sequencing/waiting state
	dataBatchLen int                // number of log entries in the batch
}

// NewAppendTask fetches a zero-valued AppendTask from the pool.
func NewAppendTask() *AppendTask {
	return appendTaskPool.Get().(*AppendTask)
}

// SetError records err on the task so that WaitForCompletion fails fast
// with it.
func (at *AppendTask) SetError(err error) {
	at.err = err
}

// Release runs the task's deferred cleanup (if any), zeroes the task, and
// returns it to the pool. The task must not be used after Release.
func (at *AppendTask) Release() {
	if at.deferredFunc != nil {
		at.deferredFunc(at)
	}
	// Reset every field before pooling to avoid leaking references.
	*at = AppendTask{}
	appendTaskPool.Put(at)
}

// ReleaseWriteWaitGroups releases every write wait group held by the task's
// append context.
func (at *AppendTask) ReleaseWriteWaitGroups() {
	for i := range at.apc.wwgs {
		at.apc.wwgs[i].release()
	}
}

// WaitForCompletion blocks until every append in the batch has completed (or
// ctx is done) and returns one AppendResult per log entry. If an error was
// recorded via SetError, it returns that error immediately. Successes must
// not follow failures within a batch; such interleaving is a fatal invariant
// violation.
func (at *AppendTask) WaitForCompletion(ctx context.Context) (res []snpb.AppendResult, err error) {
	if at.err != nil {
		return nil, at.err
	}

	res = make([]snpb.AppendResult, at.dataBatchLen)
	for i := range at.apc.awgs {
		cerr := at.apc.awgs[i].wait(ctx)
		if cerr != nil {
			// Report the per-entry error and remember the first one.
			res[i].Error = cerr.Error()
			if err == nil {
				err = cerr
			}
			continue
		}
		if err != nil {
			at.lse.logger.Panic("Results of batch requests of Append RPC must not be interleaved with success and failure", zap.Error(err))
		}
		res[i].Meta.TopicID = at.lse.tpid
		res[i].Meta.LogStreamID = at.lse.lsid
		res[i].Meta.GLSN = at.apc.awgs[i].glsn
		res[i].Meta.LLSN = at.apc.awgs[i].llsn
		at.apc.awgs[i].release()
	}
	// Guard the empty batch (avoids an index-out-of-range panic) and report
	// failure when the first entry did not commit.
	if len(res) == 0 || res[0].Meta.GLSN.Invalid() {
		return nil, err
	}
	return res, nil
}

// appendTaskDeferredFunc is attached to every AppendTask by AppendAsync; it
// runs when the task is released, decrementing the executor's inflight
// counters and recording the total append duration metric.
func appendTaskDeferredFunc(at *AppendTask) {
	at.lse.inflight.Add(-1)
	at.lse.inflightAppend.Add(-1)
	if at.lse.lsm != nil {
		at.lse.lsm.AppendDuration.Add(time.Since(at.start).Microseconds())
	}
}

// appendContext accumulates the intermediate products of one append batch:
// sequence tasks, write wait groups, append wait groups, and the total
// payload size in bytes.
type appendContext struct {
	sts        []*sequenceTask
	wwgs       []*writeWaitGroup
	awgs       []*appendWaitGroup
	totalBytes int64
}

// AppendAsync submits dataBatch to be appended to the log stream without
// waiting for the result. Progress is tracked by appendTask: callers obtain
// results with AppendTask.WaitForCompletion and must return resources with
// AppendTask.Release.
//
// It returns verrors.ErrSealed when the executor is sealing, sealed, or
// learning; verrors.ErrClosed when it is closed; and snerrors.ErrNotPrimary
// when this replica is not the primary.
func (lse *Executor) AppendAsync(ctx context.Context, dataBatch [][]byte, appendTask *AppendTask) error {
	lse.inflight.Add(1)
	lse.inflightAppend.Add(1)

	startTime := time.Now()
	dataBatchLen := len(dataBatch)

	appendTask.start = startTime
	appendTask.lse = lse
	appendTask.dataBatchLen = dataBatchLen
	// The deferred func undoes the inflight counters above; it runs when the
	// task is released, including on the early-error returns below.
	appendTask.deferredFunc = appendTaskDeferredFunc

	switch lse.esm.load() {
	case executorStateSealing, executorStateSealed, executorStateLearning:
		return verrors.ErrSealed
	case executorStateClosed:
		return verrors.ErrClosed
	}
	if !lse.isPrimary() {
		return snerrors.ErrNotPrimary
	}

	// Split the batch into batchlets of the selected length class, rounding
	// the count up for a partial final batchlet.
	_, batchletLen := batchlet.SelectLengthClass(dataBatchLen)
	batchletCount := dataBatchLen / batchletLen
	if dataBatchLen%batchletLen > 0 {
		batchletCount++
	}

	appendTask.apc = appendContext{
		sts: make([]*sequenceTask, 0, batchletCount),
		wwgs: make([]*writeWaitGroup, 0, batchletCount),
		awgs: make([]*appendWaitGroup, 0, dataBatchLen),
	}

	var preparationDuration time.Duration
	defer func() {
		// Record per-call metrics when monitoring is enabled.
		if lse.lsm != nil {
			lse.lsm.AppendLogs.Add(int64(dataBatchLen))
			lse.lsm.AppendBytes.Add(appendTask.apc.totalBytes)
			lse.lsm.AppendOperations.Add(1)
			lse.lsm.AppendPreparationMicro.Add(preparationDuration.Microseconds())
		}
	}()

	lse.prepareAppendContext(dataBatch, &appendTask.apc)
	preparationDuration = time.Since(startTime)
	lse.sendSequenceTasks(ctx, appendTask.apc.sts)
	return nil
}

// Append appends a batch of logs to the log stream.
func (lse *Executor) Append(ctx context.Context, dataBatch [][]byte) ([]snpb.AppendResult, error) {
lse.inflight.Add(1)
Expand Down
Loading

0 comments on commit 2a8118a

Please sign in to comment.