Skip to content

Commit

Permalink
feat(storagenode): add pipelined Append RPC handler
Browse files Browse the repository at this point in the history
This patch changes the Append RPC handler to support pipelined requests and does not change the
client's API. Therefore, users can use Append API transparently.

Supporting pipelined requests can lead to overhead since it is necessary to have additional
goroutines and concurrent queues. To lower additional overhead, this change uses [reader-biased
mutex](https://github.com/puzpuzpuz/xsync#rbmutex) instead of built-in RWMutex to avoid shared lock
contention. As a result of experimentations, this PR showed very little overhead. Furthermore, we
can improve the existing Append API more efficiently
[using a long-lived stream](https://grpc.io/docs/guides/performance/#general): the current
implementation creates a new stream whenever calling Append API, which leads to unnecessary tasks
such as RPC initiation. We can reuse long-lived streams by changing client API. See this issue at #458.

This PR implements server-side parts of LogStreamAppender mentioned in #433. It also can be used for
pipelining generic Append RPC said in #441.
  • Loading branch information
ijsong committed May 24, 2023
1 parent 82c1e97 commit 80abf51
Show file tree
Hide file tree
Showing 22 changed files with 2,541 additions and 42 deletions.
4 changes: 4 additions & 0 deletions bin/start_varlogsn.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,9 @@ def start(args: argparse.Namespace) -> None:
if args.ballast_size:
cmd.append(f"--ballast-size={args.ballast_size}")

if args.append_pipeline_size:
cmd.append(f"--append-pipeline-size={args.append_pipeline_size}")

# grpc options
if args.server_read_buffer_size:
cmd.append(
Expand Down Expand Up @@ -268,6 +271,7 @@ def main() -> None:
parser.add_argument("--volumes", nargs="+", required=True, action="extend",
type=str)
parser.add_argument("--ballast-size", type=str)
parser.add_argument("--append-pipeline-size", type=int)

# grpc options
parser.add_argument("--server-read-buffer-size", type=str)
Expand Down
1 change: 1 addition & 0 deletions cmd/varlogsn/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ func newStartCommand() *cli.Command {
flagLogStreamExecutorCommitQueueCapacity.IntFlag(false, logstream.DefaultCommitQueueCapacity),
flagLogStreamExecutorReplicateclientQueueCapacity.IntFlag(false, logstream.DefaultReplicateClientQueueCapacity),
flagMaxLogStreamReplicasCount,
flagAppendPipelineSize,

// storage options
flagExperimentalStorageSeparateDB,
Expand Down
14 changes: 14 additions & 0 deletions cmd/varlogsn/flags.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package main

import (
"fmt"

"github.com/urfave/cli/v2"

"github.com/kakao/varlog/internal/flags"
Expand Down Expand Up @@ -39,6 +41,18 @@ var (
Value: storagenode.DefaultMaxLogStreamReplicasCount,
}

flagAppendPipelineSize = &cli.IntFlag{
Name: "append-pipeline-size",
Usage: "Append pipleline size",
Value: storagenode.DefaultAppendPipelineSize,
Action: func(_ *cli.Context, value int) error {
if value < storagenode.MinAppendPipelineSize || value > storagenode.MaxAppendPipelineSize {
return fmt.Errorf("invalid value \"%d\" for flag --append-pipeline-size", value)
}
return nil
},
}

// flags for grpc options.
flagServerReadBufferSize = flags.FlagDesc{
Name: "server-read-buffer-size",
Expand Down
1 change: 1 addition & 0 deletions cmd/varlogsn/varlogsn.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ func start(c *cli.Context) error {
logstream.WithReplicateClientQueueCapacity(c.Int(flagLogStreamExecutorReplicateclientQueueCapacity.Name)),
),
storagenode.WithMaxLogStreamReplicasCount(int32(c.Int(flagMaxLogStreamReplicasCount.Name))),
storagenode.WithAppendPipelineSize(int32(c.Int(flagAppendPipelineSize.Name))),
storagenode.WithDefaultStorageOptions(storageOpts...),
storagenode.WithLogger(logger),
}
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ require (
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
github.com/lib/pq v1.10.9
github.com/pkg/errors v0.9.1
github.com/puzpuzpuz/xsync/v2 v2.4.0
github.com/smartystreets/assertions v1.13.1
github.com/smartystreets/goconvey v1.8.0
github.com/soheilhy/cmux v0.1.5
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,8 @@ github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4O
github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA=
github.com/prometheus/procfs v0.7.3 h1:4jVXhlkAyzOScmCkXBTOLRLTz8EeU+eyjrwB/EPq0VU=
github.com/prometheus/procfs v0.7.3/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA=
github.com/puzpuzpuz/xsync/v2 v2.4.0 h1:5sXAMHrtx1bg9nbRZTOn8T4MkWe5V+o8yKRH02Eznag=
github.com/puzpuzpuz/xsync/v2 v2.4.0/go.mod h1:gD2H2krq/w52MfPLE+Uy64TzJDVY7lP2znR9qmR35kU=
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc=
Expand Down
15 changes: 15 additions & 0 deletions internal/storagenode/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ const (
DefaultReplicateClientReadBufferSize = 32 << 10
DefaultReplicateClientWriteBufferSize = 32 << 10
DefaultMaxLogStreamReplicasCount = -1

DefaultAppendPipelineSize = 8
MinAppendPipelineSize = 1
MaxAppendPipelineSize = 16
)

type config struct {
Expand All @@ -44,6 +48,7 @@ type config struct {
replicateClientReadBufferSize int64
replicateClientWriteBufferSize int64
maxLogStreamReplicasCount int32
appendPipelineSize int32
volumes []string
defaultLogStreamExecutorOptions []logstream.ExecutorOption
pprofOpts []pprof.Option
Expand All @@ -59,6 +64,7 @@ func newConfig(opts []Option) (config, error) {
replicateClientReadBufferSize: DefaultReplicateClientReadBufferSize,
replicateClientWriteBufferSize: DefaultReplicateClientWriteBufferSize,
maxLogStreamReplicasCount: DefaultMaxLogStreamReplicasCount,
appendPipelineSize: DefaultAppendPipelineSize,
logger: zap.NewNop(),
}
for _, opt := range opts {
Expand All @@ -85,6 +91,9 @@ func (cfg *config) validate() error {
if err := cfg.validateVolumes(); err != nil {
return fmt.Errorf("storage node: invalid volume: %w", err)
}
if cfg.appendPipelineSize < MinAppendPipelineSize || cfg.appendPipelineSize > MaxAppendPipelineSize {
return fmt.Errorf("storage node: invalid append pipeline size \"%d\"", cfg.appendPipelineSize)
}
return nil
}

Expand Down Expand Up @@ -214,6 +223,12 @@ func WithMaxLogStreamReplicasCount(maxLogStreamReplicasCount int32) Option {
})
}

func WithAppendPipelineSize(appendPipelineSize int32) Option {
return newFuncOption(func(cfg *config) {
cfg.appendPipelineSize = appendPipelineSize
})
}

func WithVolumes(volumes ...string) Option {
return newFuncOption(func(cfg *config) {
cfg.volumes = volumes
Expand Down
165 changes: 136 additions & 29 deletions internal/storagenode/log_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

pbtypes "github.com/gogo/protobuf/types"
"go.uber.org/multierr"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

Expand All @@ -23,55 +24,161 @@ type logServer struct {

var _ snpb.LogIOServer = (*logServer)(nil)

func (ls logServer) Append(stream snpb.LogIO_AppendServer) (err error) {
req, rsp := &snpb.AppendRequest{}, &snpb.AppendResponse{}
func (ls *logServer) Append(stream snpb.LogIO_AppendServer) error {
// Avoid race of Add and Wait of wgAppenders.
rt := ls.sn.mu.RLock()
ls.sn.wgAppenders.Add(2)
ls.sn.mu.RUnlock(rt)

cq := make(chan *logstream.AppendTask, ls.sn.appendPipelineSize)

go ls.appendStreamRecvLoop(stream, cq)

var eg errgroup.Group
eg.Go(func() error {
return ls.appendStreamSendLoop(stream, cq)
})
err := eg.Wait()
// The stream is finished by the client, which invokes CloseSend.
// That result from appendStreamSendLoop is nil means follows:
// - RecvMsg's return value is io.EOF.
// - Completion queue is closed.
// - AppendTasks in the completion queue are exhausted.
if err == nil {
ls.sn.wgAppenders.Done()
return nil
}

// Drain completion queue.
go ls.appendStreamDrainCQLoop(cq)

// The stream is finished by returning io.EOF after calling SendMsg.
if err == io.EOF {
return nil
}

var code codes.Code
switch err {
case verrors.ErrSealed:
code = codes.FailedPrecondition
case snerrors.ErrNotPrimary:
code = codes.Unavailable
default:
code = status.Code(err)
if code == codes.Unknown {
code = status.FromContextError(err).Code()
}

}
return status.Error(code, err.Error())
}

func (ls *logServer) appendStreamRecvLoop(stream snpb.LogIO_AppendServer, cq chan<- *logstream.AppendTask) {
defer func() {
close(cq)
ls.sn.wgAppenders.Done()
}()

var (
appendTask *logstream.AppendTask
lse *logstream.Executor
err error
loaded bool
tpid types.TopicID
lsid types.LogStreamID
)
req := &snpb.AppendRequest{}
ctx := stream.Context()

for {
req.Reset()
err = stream.RecvMsg(req)
if err == io.EOF {
return nil
return
}
appendTask = logstream.NewAppendTask()
if err != nil {
return err
goto Out
}

err = snpb.ValidateTopicLogStream(req)
if err != nil {
return status.Error(codes.InvalidArgument, err.Error())
if tpid.Invalid() && lsid.Invalid() {
err = snpb.ValidateTopicLogStream(req)
if err != nil {
err = status.Error(codes.InvalidArgument, err.Error())
goto Out
}
tpid = req.TopicID
lsid = req.LogStreamID
}

lse, loaded := ls.sn.executors.Load(req.TopicID, req.LogStreamID)
if !loaded {
return status.Error(codes.NotFound, "no such log stream")
if req.TopicID != tpid || req.LogStreamID != lsid {
err = status.Error(codes.InvalidArgument, "unmatched topic or logstream")
goto Out
}
res, err := lse.Append(stream.Context(), req.Payload)
if err != nil {
var code codes.Code
switch err {
case verrors.ErrSealed:
code = codes.FailedPrecondition
case snerrors.ErrNotPrimary:
code = codes.Unavailable
default:
code = status.FromContextError(err).Code()

if lse == nil {
lse, loaded = ls.sn.executors.Load(tpid, lsid)
if !loaded {
err = status.Error(codes.NotFound, "no such log stream")
goto Out
}
return status.Error(code, err.Error())
}

rsp.Results = res
err = stream.Send(rsp)
err = lse.AppendAsync(ctx, req.Payload, appendTask)
Out:
if err != nil {
appendTask.SetError(err)
}
cq <- appendTask
if err != nil {
return err
return
}
}
}

func (ls logServer) Read(context.Context, *snpb.ReadRequest) (*snpb.ReadResponse, error) {
func (ls *logServer) appendStreamSendLoop(stream snpb.LogIO_AppendServer, cq <-chan *logstream.AppendTask) (err error) {
var res []snpb.AppendResult
rsp := &snpb.AppendResponse{}
ctx := stream.Context()

for {
select {
case <-ctx.Done():
return ctx.Err()
case appendTask, ok := <-cq:
if !ok {
return nil
}
res, err = appendTask.WaitForCompletion(ctx)
if err != nil {
appendTask.Release()
return err
}

appendTask.ReleaseWriteWaitGroups()
appendTask.Release()

rsp.Results = res
err = stream.Send(rsp)
if err != nil {
return err
}
}
}
}

func (ls *logServer) appendStreamDrainCQLoop(cq <-chan *logstream.AppendTask) {
defer ls.sn.wgAppenders.Done()
for appendTask := range cq {
appendTask.Release()
}
}

func (ls *logServer) Read(context.Context, *snpb.ReadRequest) (*snpb.ReadResponse, error) {
return nil, status.Error(codes.Unimplemented, "deprecated")
}

func (ls logServer) Subscribe(req *snpb.SubscribeRequest, stream snpb.LogIO_SubscribeServer) error {
func (ls *logServer) Subscribe(req *snpb.SubscribeRequest, stream snpb.LogIO_SubscribeServer) error {
if err := snpb.ValidateTopicLogStream(req); err != nil {
return status.Error(codes.InvalidArgument, err.Error())
}
Expand Down Expand Up @@ -128,7 +235,7 @@ Loop:
return status.Error(status.FromContextError(sr.Err()).Code(), sr.Err().Error())
}

func (ls logServer) SubscribeTo(req *snpb.SubscribeToRequest, stream snpb.LogIO_SubscribeToServer) (err error) {
func (ls *logServer) SubscribeTo(req *snpb.SubscribeToRequest, stream snpb.LogIO_SubscribeToServer) (err error) {
err = snpb.ValidateTopicLogStream(req)
if err != nil {
return status.Error(codes.InvalidArgument, err.Error())
Expand Down Expand Up @@ -177,7 +284,7 @@ Loop:
return multierr.Append(err, sr.Err())
}

func (ls logServer) TrimDeprecated(ctx context.Context, req *snpb.TrimDeprecatedRequest) (*pbtypes.Empty, error) {
func (ls *logServer) TrimDeprecated(ctx context.Context, req *snpb.TrimDeprecatedRequest) (*pbtypes.Empty, error) {
ls.sn.executors.Range(func(_ types.LogStreamID, tpid types.TopicID, lse *logstream.Executor) bool {
if req.TopicID != tpid {
return true
Expand All @@ -188,7 +295,7 @@ func (ls logServer) TrimDeprecated(ctx context.Context, req *snpb.TrimDeprecated
return &pbtypes.Empty{}, nil
}

func (ls logServer) LogStreamReplicaMetadata(_ context.Context, req *snpb.LogStreamReplicaMetadataRequest) (*snpb.LogStreamReplicaMetadataResponse, error) {
func (ls *logServer) LogStreamReplicaMetadata(_ context.Context, req *snpb.LogStreamReplicaMetadataRequest) (*snpb.LogStreamReplicaMetadataResponse, error) {
if err := snpb.ValidateTopicLogStream(req); err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
}
Expand Down
Loading

0 comments on commit 80abf51

Please sign in to comment.