feat(storagenode): add pipelined Append RPC handler
This patch changes the Append RPC handler to support pipelined requests without changing the
client API, so users can keep using the Append API transparently.
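
Below is a minimal sketch of the pipelining pattern the new handler uses: one goroutine receives
requests and enqueues in-flight tasks into a bounded completion queue, while the handler goroutine
waits for each task and streams its result back in order. All names here (`request`, `task`,
`stream`, `appendAsync`, `fakeStream`, `handleAppend`) are hypothetical stand-ins, not varlog's
actual API; the real handler is in `internal/storagenode/log_server.go` below.

```go
package main

import (
	"context"
	"fmt"
	"io"
)

type request struct{ payload []byte }

type result struct {
	res []byte
	err error
}

// task carries the eventual result of one asynchronous append.
type task struct{ done chan result }

// stream mimics the Recv/Send halves of a bidirectional gRPC stream.
type stream interface {
	Recv() (*request, error)
	Send(*result) error
}

// appendAsync stands in for the log stream executor's asynchronous append.
func appendAsync(_ context.Context, payload []byte, t *task) {
	go func() { t.done <- result{res: payload} }() // pretend the write completed
}

// handleAppend pipelines up to pipelineSize requests between the receive and
// send sides of the stream.
func handleAppend(ctx context.Context, st stream, pipelineSize int) error {
	cq := make(chan *task, pipelineSize) // completion queue bounds in-flight appends

	// Receiver: read requests and enqueue tasks until the client closes the stream.
	go func() {
		defer close(cq)
		for {
			req, err := st.Recv()
			if err == io.EOF { // client called CloseSend
				return
			}
			t := &task{done: make(chan result, 1)}
			if err != nil {
				t.done <- result{err: err} // propagate the receive error in order
				cq <- t
				return
			}
			appendAsync(ctx, req.payload, t)
			cq <- t
		}
	}()

	// Sender: wait for each task in FIFO order and stream its result back.
	// (The real handler also drains cq in a separate goroutine when this loop
	// fails early; omitted here for brevity.)
	for t := range cq {
		r := <-t.done
		if r.err != nil {
			return r.err
		}
		if err := st.Send(&r); err != nil {
			return err
		}
	}
	return nil
}

// fakeStream feeds a fixed batch of requests and prints each response.
type fakeStream struct{ reqs []*request }

func (s *fakeStream) Recv() (*request, error) {
	if len(s.reqs) == 0 {
		return nil, io.EOF
	}
	r := s.reqs[0]
	s.reqs = s.reqs[1:]
	return r, nil
}

func (s *fakeStream) Send(r *result) error {
	fmt.Printf("appended %q\n", r.res)
	return nil
}

func main() {
	st := &fakeStream{reqs: []*request{{payload: []byte("a")}, {payload: []byte("b")}}}
	if err := handleAppend(context.Background(), st, 8); err != nil {
		fmt.Println("append failed:", err)
	}
}
```

Because the completion queue is bounded by the pipeline size (configurable in this patch via
`--append-pipeline-size`), the server caps how many appends a single stream can have in flight.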

Supporting pipelined requests adds some overhead because it requires additional goroutines and
concurrent queues. To reduce that overhead, this change uses a [reader-biased
mutex](https://github.com/puzpuzpuz/xsync#rbmutex) instead of the built-in RWMutex to avoid shared
lock contention. In our experiments, this PR showed very little overhead. Furthermore, we can make
the existing Append API even more efficient
[using a long-lived stream](https://grpc.io/docs/guides/performance/#general): the current
implementation creates a new stream for every Append call, which incurs unnecessary work such as
RPC initiation. We can reuse long-lived streams by changing the client API; see #458.
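
For illustration, here is a small sketch of the reader-biased locking pattern around the appender
wait group, using `xsync.NewRBMutex` from github.com/puzpuzpuz/xsync/v2 (the module this patch
adds). The struct and field names (`node`, `wgAppenders`, `closed`) are illustrative, not the
storage node's actual fields; the point is that handler goroutines take the cheap reader lock to
register before shutdown takes the exclusive lock and waits.

```go
package main

import (
	"fmt"
	"sync"

	"github.com/puzpuzpuz/xsync/v2"
)

type node struct {
	mu          *xsync.RBMutex // reader-biased: frequent RLocks avoid shared-cacheline contention
	wgAppenders sync.WaitGroup
	closed      bool
}

func newNode() *node { return &node{mu: xsync.NewRBMutex()} }

// startAppender registers one appender goroutine; holding the reader lock keeps
// the WaitGroup.Add from racing with close's WaitGroup.Wait.
func (n *node) startAppender(f func()) bool {
	t := n.mu.RLock() // RLock returns a token that must be handed back to RUnlock
	defer n.mu.RUnlock(t)
	if n.closed {
		return false
	}
	n.wgAppenders.Add(1)
	go func() {
		defer n.wgAppenders.Done()
		f()
	}()
	return true
}

// close takes the exclusive lock so no new appender can register, then waits
// for the registered ones to finish.
func (n *node) close() {
	n.mu.Lock()
	n.closed = true
	n.mu.Unlock()
	n.wgAppenders.Wait()
}

func main() {
	n := newNode()
	n.startAppender(func() { fmt.Println("appending") })
	n.close()
}
```

The trade-off of a reader-biased lock is that the exclusive path becomes more expensive, which
fits here because the write lock is only taken on rare paths such as shutdown, while every Append
stream takes the read lock.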

This PR implements the server-side parts of the LogStreamAppender mentioned in #433. It can also be
used to pipeline the generic Append RPC described in #441.
ijsong committed Jun 1, 2023

1 parent dd3061c commit 5bc3f8b
Showing 22 changed files with 2,541 additions and 42 deletions.
4 changes: 4 additions & 0 deletions bin/start_varlogsn.py
@@ -172,6 +172,9 @@ def start(args: argparse.Namespace) -> None:
if args.ballast_size:
cmd.append(f"--ballast-size={args.ballast_size}")

if args.append_pipeline_size:
cmd.append(f"--append-pipeline-size={args.append_pipeline_size}")

# grpc options
if args.server_read_buffer_size:
cmd.append(
@@ -268,6 +271,7 @@ def main() -> None:
parser.add_argument("--volumes", nargs="+", required=True, action="extend",
type=str)
parser.add_argument("--ballast-size", type=str)
parser.add_argument("--append-pipeline-size", type=int)

# grpc options
parser.add_argument("--server-read-buffer-size", type=str)
1 change: 1 addition & 0 deletions cmd/varlogsn/cli.go
@@ -57,6 +57,7 @@ func newStartCommand() *cli.Command {
flagLogStreamExecutorCommitQueueCapacity.IntFlag(false, logstream.DefaultCommitQueueCapacity),
flagLogStreamExecutorReplicateclientQueueCapacity.IntFlag(false, logstream.DefaultReplicateClientQueueCapacity),
flagMaxLogStreamReplicasCount,
flagAppendPipelineSize,

// storage options
flagExperimentalStorageSeparateDB,
14 changes: 14 additions & 0 deletions cmd/varlogsn/flags.go
@@ -1,6 +1,8 @@
package main

import (
"fmt"

"github.com/urfave/cli/v2"

"github.com/kakao/varlog/internal/flags"
@@ -39,6 +41,18 @@ var (
Value: storagenode.DefaultMaxLogStreamReplicasCount,
}

flagAppendPipelineSize = &cli.IntFlag{
Name: "append-pipeline-size",
Usage: "Append pipeline size",
Value: storagenode.DefaultAppendPipelineSize,
Action: func(_ *cli.Context, value int) error {
if value < storagenode.MinAppendPipelineSize || value > storagenode.MaxAppendPipelineSize {
return fmt.Errorf("invalid value \"%d\" for flag --append-pipeline-size", value)
}
return nil
},
}

// flags for grpc options.
flagServerReadBufferSize = flags.FlagDesc{
Name: "server-read-buffer-size",
1 change: 1 addition & 0 deletions cmd/varlogsn/varlogsn.go
@@ -137,6 +137,7 @@ func start(c *cli.Context) error {
logstream.WithReplicateClientQueueCapacity(c.Int(flagLogStreamExecutorReplicateclientQueueCapacity.Name)),
),
storagenode.WithMaxLogStreamReplicasCount(int32(c.Int(flagMaxLogStreamReplicasCount.Name))),
storagenode.WithAppendPipelineSize(int32(c.Int(flagAppendPipelineSize.Name))),
storagenode.WithDefaultStorageOptions(storageOpts...),
storagenode.WithLogger(logger),
}
1 change: 1 addition & 0 deletions go.mod
@@ -12,6 +12,7 @@ require (
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
github.com/lib/pq v1.10.9
github.com/pkg/errors v0.9.1
github.com/puzpuzpuz/xsync/v2 v2.4.0
github.com/smartystreets/assertions v1.13.1
github.com/smartystreets/goconvey v1.8.0
github.com/soheilhy/cmux v0.1.5
2 changes: 2 additions & 0 deletions go.sum
@@ -388,6 +388,8 @@ github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4O
github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA=
github.com/prometheus/procfs v0.7.3 h1:4jVXhlkAyzOScmCkXBTOLRLTz8EeU+eyjrwB/EPq0VU=
github.com/prometheus/procfs v0.7.3/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA=
github.com/puzpuzpuz/xsync/v2 v2.4.0 h1:5sXAMHrtx1bg9nbRZTOn8T4MkWe5V+o8yKRH02Eznag=
github.com/puzpuzpuz/xsync/v2 v2.4.0/go.mod h1:gD2H2krq/w52MfPLE+Uy64TzJDVY7lP2znR9qmR35kU=
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc=
15 changes: 15 additions & 0 deletions internal/storagenode/config.go
@@ -22,6 +22,10 @@ const (
DefaultReplicateClientReadBufferSize = 32 << 10
DefaultReplicateClientWriteBufferSize = 32 << 10
DefaultMaxLogStreamReplicasCount = -1

DefaultAppendPipelineSize = 8
MinAppendPipelineSize = 1
MaxAppendPipelineSize = 16
)

type config struct {
@@ -44,6 +48,7 @@ type config struct {
replicateClientReadBufferSize int64
replicateClientWriteBufferSize int64
maxLogStreamReplicasCount int32
appendPipelineSize int32
volumes []string
defaultLogStreamExecutorOptions []logstream.ExecutorOption
pprofOpts []pprof.Option
@@ -59,6 +64,7 @@ func newConfig(opts []Option) (config, error) {
replicateClientReadBufferSize: DefaultReplicateClientReadBufferSize,
replicateClientWriteBufferSize: DefaultReplicateClientWriteBufferSize,
maxLogStreamReplicasCount: DefaultMaxLogStreamReplicasCount,
appendPipelineSize: DefaultAppendPipelineSize,
logger: zap.NewNop(),
}
for _, opt := range opts {
@@ -85,6 +91,9 @@ func (cfg *config) validate() error {
if err := cfg.validateVolumes(); err != nil {
return fmt.Errorf("storage node: invalid volume: %w", err)
}
if cfg.appendPipelineSize < MinAppendPipelineSize || cfg.appendPipelineSize > MaxAppendPipelineSize {
return fmt.Errorf("storage node: invalid append pipeline size \"%d\"", cfg.appendPipelineSize)
}
return nil
}

@@ -214,6 +223,12 @@ func WithMaxLogStreamReplicasCount(maxLogStreamReplicasCount int32) Option {
})
}

func WithAppendPipelineSize(appendPipelineSize int32) Option {
return newFuncOption(func(cfg *config) {
cfg.appendPipelineSize = appendPipelineSize
})
}

func WithVolumes(volumes ...string) Option {
return newFuncOption(func(cfg *config) {
cfg.volumes = volumes
165 changes: 136 additions & 29 deletions internal/storagenode/log_server.go
@@ -7,6 +7,7 @@ import (

pbtypes "github.com/gogo/protobuf/types"
"go.uber.org/multierr"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

@@ -23,55 +24,161 @@ type logServer struct {

var _ snpb.LogIOServer = (*logServer)(nil)

func (ls logServer) Append(stream snpb.LogIO_AppendServer) (err error) {
req, rsp := &snpb.AppendRequest{}, &snpb.AppendResponse{}
func (ls *logServer) Append(stream snpb.LogIO_AppendServer) error {
// Avoid race of Add and Wait of wgAppenders.
rt := ls.sn.mu.RLock()
ls.sn.wgAppenders.Add(2)
ls.sn.mu.RUnlock(rt)

cq := make(chan *logstream.AppendTask, ls.sn.appendPipelineSize)

go ls.appendStreamRecvLoop(stream, cq)

var eg errgroup.Group
eg.Go(func() error {
return ls.appendStreamSendLoop(stream, cq)
})
err := eg.Wait()
// The stream is finished by the client, which invokes CloseSend.
// A nil result from appendStreamSendLoop means the following:
// - RecvMsg's return value is io.EOF.
// - Completion queue is closed.
// - AppendTasks in the completion queue are exhausted.
if err == nil {
ls.sn.wgAppenders.Done()
return nil
}

// Drain completion queue.
go ls.appendStreamDrainCQLoop(cq)

// The stream is finished by returning io.EOF after calling SendMsg.
if err == io.EOF {
return nil
}

var code codes.Code
switch err {
case verrors.ErrSealed:
code = codes.FailedPrecondition
case snerrors.ErrNotPrimary:
code = codes.Unavailable
default:
code = status.Code(err)
if code == codes.Unknown {
code = status.FromContextError(err).Code()
}

}
return status.Error(code, err.Error())
}

func (ls *logServer) appendStreamRecvLoop(stream snpb.LogIO_AppendServer, cq chan<- *logstream.AppendTask) {
defer func() {
close(cq)
ls.sn.wgAppenders.Done()
}()

var (
appendTask *logstream.AppendTask
lse *logstream.Executor
err error
loaded bool
tpid types.TopicID
lsid types.LogStreamID
)
req := &snpb.AppendRequest{}
ctx := stream.Context()

for {
req.Reset()
err = stream.RecvMsg(req)
if err == io.EOF {
return nil
return
}
appendTask = logstream.NewAppendTask()
if err != nil {
return err
goto Out
}

err = snpb.ValidateTopicLogStream(req)
if err != nil {
return status.Error(codes.InvalidArgument, err.Error())
if tpid.Invalid() && lsid.Invalid() {
err = snpb.ValidateTopicLogStream(req)
if err != nil {
err = status.Error(codes.InvalidArgument, err.Error())
goto Out
}
tpid = req.TopicID
lsid = req.LogStreamID
}

lse, loaded := ls.sn.executors.Load(req.TopicID, req.LogStreamID)
if !loaded {
return status.Error(codes.NotFound, "no such log stream")
if req.TopicID != tpid || req.LogStreamID != lsid {
err = status.Error(codes.InvalidArgument, "unmatched topic or logstream")
goto Out
}
res, err := lse.Append(stream.Context(), req.Payload)
if err != nil {
var code codes.Code
switch err {
case verrors.ErrSealed:
code = codes.FailedPrecondition
case snerrors.ErrNotPrimary:
code = codes.Unavailable
default:
code = status.FromContextError(err).Code()

if lse == nil {
lse, loaded = ls.sn.executors.Load(tpid, lsid)
if !loaded {
err = status.Error(codes.NotFound, "no such log stream")
goto Out
}
return status.Error(code, err.Error())
}

rsp.Results = res
err = stream.Send(rsp)
err = lse.AppendAsync(ctx, req.Payload, appendTask)
Out:
if err != nil {
appendTask.SetError(err)
}
cq <- appendTask
if err != nil {
return err
return
}
}
}

func (ls logServer) Read(context.Context, *snpb.ReadRequest) (*snpb.ReadResponse, error) {
func (ls *logServer) appendStreamSendLoop(stream snpb.LogIO_AppendServer, cq <-chan *logstream.AppendTask) (err error) {
var res []snpb.AppendResult
rsp := &snpb.AppendResponse{}
ctx := stream.Context()

for {
select {
case <-ctx.Done():
return ctx.Err()
case appendTask, ok := <-cq:
if !ok {
return nil
}
res, err = appendTask.WaitForCompletion(ctx)
if err != nil {
appendTask.Release()
return err
}

appendTask.ReleaseWriteWaitGroups()
appendTask.Release()

rsp.Results = res
err = stream.Send(rsp)
if err != nil {
return err
}
}
}
}

func (ls *logServer) appendStreamDrainCQLoop(cq <-chan *logstream.AppendTask) {
defer ls.sn.wgAppenders.Done()
for appendTask := range cq {
appendTask.Release()
}
}

func (ls *logServer) Read(context.Context, *snpb.ReadRequest) (*snpb.ReadResponse, error) {
return nil, status.Error(codes.Unimplemented, "deprecated")
}

func (ls logServer) Subscribe(req *snpb.SubscribeRequest, stream snpb.LogIO_SubscribeServer) error {
func (ls *logServer) Subscribe(req *snpb.SubscribeRequest, stream snpb.LogIO_SubscribeServer) error {
if err := snpb.ValidateTopicLogStream(req); err != nil {
return status.Error(codes.InvalidArgument, err.Error())
}
@@ -128,7 +235,7 @@ Loop:
return status.Error(status.FromContextError(sr.Err()).Code(), sr.Err().Error())
}

func (ls logServer) SubscribeTo(req *snpb.SubscribeToRequest, stream snpb.LogIO_SubscribeToServer) (err error) {
func (ls *logServer) SubscribeTo(req *snpb.SubscribeToRequest, stream snpb.LogIO_SubscribeToServer) (err error) {
err = snpb.ValidateTopicLogStream(req)
if err != nil {
return status.Error(codes.InvalidArgument, err.Error())
@@ -177,7 +284,7 @@ Loop:
return multierr.Append(err, sr.Err())
}

func (ls logServer) TrimDeprecated(ctx context.Context, req *snpb.TrimDeprecatedRequest) (*pbtypes.Empty, error) {
func (ls *logServer) TrimDeprecated(ctx context.Context, req *snpb.TrimDeprecatedRequest) (*pbtypes.Empty, error) {
ls.sn.executors.Range(func(_ types.LogStreamID, tpid types.TopicID, lse *logstream.Executor) bool {
if req.TopicID != tpid {
return true
@@ -188,7 +295,7 @@ func (ls logServer) TrimDeprecated(ctx context.Context, req *snpb.TrimDeprecated
return &pbtypes.Empty{}, nil
}

func (ls logServer) LogStreamReplicaMetadata(_ context.Context, req *snpb.LogStreamReplicaMetadataRequest) (*snpb.LogStreamReplicaMetadataResponse, error) {
func (ls *logServer) LogStreamReplicaMetadata(_ context.Context, req *snpb.LogStreamReplicaMetadataRequest) (*snpb.LogStreamReplicaMetadataResponse, error) {
if err := snpb.ValidateTopicLogStream(req); err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
}