Skip to content

Commit

Permalink
refactor(admin): prevent the large lock from mrmanager
Browse files Browse the repository at this point in the history
This PR changes the role of `internal/admin/mrmanager.(*mrManager).mu`. The large lock in the
mrmanager made the mrmanager run sequentially. However, from now, it only guards the dirty flag for
the cluster metadata. Since the metadata repository works concurrently, adopting a large lock on the
client side is unnecessary.
  • Loading branch information
ijsong committed Oct 4, 2023
1 parent 2480922 commit efe2d44
Showing 1 changed file with 44 additions and 107 deletions.
151 changes: 44 additions & 107 deletions internal/admin/mrmanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,14 +85,14 @@ var (

type mrManager struct {
config

mu sync.RWMutex
connector mrconnector.Connector

sfg singleflight.Group
sfg singleflight.Group
// mu guards two fields - dirty and updated.
mu sync.RWMutex
dirty bool
updated time.Time
meta *varlogpb.MetadataDescriptor
// meta is a cached metadata descriptor.
meta *varlogpb.MetadataDescriptor
}

const (
Expand Down Expand Up @@ -145,9 +145,6 @@ func (mrm *mrManager) mc() (mrc.MetadataRepositoryManagementClient, error) {
}

func (mrm *mrManager) Close() error {
mrm.mu.Lock()
defer mrm.mu.Unlock()

return errors.Wrap(mrm.connector.Close(), "mrmanager")
}

Expand All @@ -171,14 +168,6 @@ func (mrm *mrManager) clusterMetadata(ctx context.Context) (*varlogpb.MetadataDe
}

func (mrm *mrManager) RegisterStorageNode(ctx context.Context, storageNodeMeta *varlogpb.StorageNodeDescriptor) (err error) {
mrm.mu.Lock()
defer func() {
if err == nil {
mrm.dirty = true
}
mrm.mu.Unlock()
}()

cli, err := mrm.c()
if err != nil {
return errors.WithMessage(err, "mrmanager: not accessible")
Expand All @@ -190,18 +179,11 @@ func (mrm *mrManager) RegisterStorageNode(ctx context.Context, storageNodeMeta *
return err
}

return err
mrm.setDirty()
return nil
}

func (mrm *mrManager) UnregisterStorageNode(ctx context.Context, storageNodeID types.StorageNodeID) (err error) {
mrm.mu.Lock()
defer func() {
if err == nil {
mrm.dirty = true
}
mrm.mu.Unlock()
}()

func (mrm *mrManager) UnregisterStorageNode(ctx context.Context, storageNodeID types.StorageNodeID) error {
cli, err := mrm.c()
if err != nil {
return errors.WithMessage(err, "mrmanager: not accessible")
Expand All @@ -213,18 +195,11 @@ func (mrm *mrManager) UnregisterStorageNode(ctx context.Context, storageNodeID t
return err
}

return err
mrm.setDirty()
return nil
}

func (mrm *mrManager) RegisterTopic(ctx context.Context, topicID types.TopicID) (err error) {
mrm.mu.Lock()
defer func() {
if err == nil {
mrm.dirty = true
}
mrm.mu.Unlock()
}()

func (mrm *mrManager) RegisterTopic(ctx context.Context, topicID types.TopicID) error {
cli, err := mrm.c()
if err != nil {
return errors.WithMessage(err, "mrmanager: not accessible")
Expand All @@ -236,18 +211,11 @@ func (mrm *mrManager) RegisterTopic(ctx context.Context, topicID types.TopicID)
return err
}

return err
mrm.setDirty()
return nil
}

func (mrm *mrManager) UnregisterTopic(ctx context.Context, topicID types.TopicID) (err error) {
mrm.mu.Lock()
defer func() {
if err == nil {
mrm.dirty = true
}
mrm.mu.Unlock()
}()

func (mrm *mrManager) UnregisterTopic(ctx context.Context, topicID types.TopicID) error {
cli, err := mrm.c()
if err != nil {
return errors.WithMessage(err, "mrmanager: not accessible")
Expand All @@ -259,18 +227,11 @@ func (mrm *mrManager) UnregisterTopic(ctx context.Context, topicID types.TopicID
return err
}

return err
mrm.setDirty()
return nil
}

func (mrm *mrManager) RegisterLogStream(ctx context.Context, logStreamDesc *varlogpb.LogStreamDescriptor) (err error) {
mrm.mu.Lock()
defer func() {
if err == nil {
mrm.dirty = true
}
mrm.mu.Unlock()
}()

func (mrm *mrManager) RegisterLogStream(ctx context.Context, logStreamDesc *varlogpb.LogStreamDescriptor) error {
cli, err := mrm.c()
if err != nil {
return errors.WithMessage(err, "mrmanager: not accessible")
Expand All @@ -281,18 +242,12 @@ func (mrm *mrManager) RegisterLogStream(ctx context.Context, logStreamDesc *varl
_ = cli.Close()
return err
}

mrm.setDirty()
return nil
}

func (mrm *mrManager) UnregisterLogStream(ctx context.Context, logStreamID types.LogStreamID) (err error) {
mrm.mu.Lock()
defer func() {
if err == nil {
mrm.dirty = true
}
mrm.mu.Unlock()
}()

func (mrm *mrManager) UnregisterLogStream(ctx context.Context, logStreamID types.LogStreamID) error {
cli, err := mrm.c()
if err != nil {
return errors.WithMessage(err, "mrmanager: not accessible")
Expand All @@ -303,18 +258,12 @@ func (mrm *mrManager) UnregisterLogStream(ctx context.Context, logStreamID types
_ = cli.Close()
return err
}
return err
}

func (mrm *mrManager) UpdateLogStream(ctx context.Context, logStreamDesc *varlogpb.LogStreamDescriptor) (err error) {
mrm.mu.Lock()
defer func() {
if err == nil {
mrm.dirty = true
}
mrm.mu.Unlock()
}()
mrm.setDirty()
return nil
}

func (mrm *mrManager) UpdateLogStream(ctx context.Context, logStreamDesc *varlogpb.LogStreamDescriptor) error {
cli, err := mrm.c()
if err != nil {
return errors.WithMessage(err, "mrmanager: not accessible")
Expand All @@ -325,40 +274,29 @@ func (mrm *mrManager) UpdateLogStream(ctx context.Context, logStreamDesc *varlog
_ = cli.Close()
return err
}
return err

mrm.setDirty()
return nil
}

// It implements MetadataRepositoryManager.Seal method.
func (mrm *mrManager) Seal(ctx context.Context, logStreamID types.LogStreamID) (lastCommittedGLSN types.GLSN, err error) {
mrm.mu.Lock()
defer func() {
if err == nil {
mrm.dirty = true
}
mrm.mu.Unlock()
}()

func (mrm *mrManager) Seal(ctx context.Context, logStreamID types.LogStreamID) (types.GLSN, error) {
cli, err := mrm.c()
if err != nil {
return types.InvalidGLSN, errors.WithMessage(err, "mrmanager: not accessible")
}

if lastCommittedGLSN, err = cli.Seal(ctx, logStreamID); err != nil {
lastCommittedGLSN, err := cli.Seal(ctx, logStreamID)
if err != nil {
_ = cli.Close()
return types.InvalidGLSN, err
}
return lastCommittedGLSN, err
}

func (mrm *mrManager) Unseal(ctx context.Context, logStreamID types.LogStreamID) (err error) {
mrm.mu.Lock()
defer func() {
if err == nil {
mrm.dirty = true
}
mrm.mu.Unlock()
}()
mrm.setDirty()
return lastCommittedGLSN, nil
}

func (mrm *mrManager) Unseal(ctx context.Context, logStreamID types.LogStreamID) error {
cli, err := mrm.c()
if err != nil {
return errors.WithMessage(err, "mrmanager: not accessible")
Expand All @@ -369,13 +307,12 @@ func (mrm *mrManager) Unseal(ctx context.Context, logStreamID types.LogStreamID)
_ = cli.Close()
return err
}
return err

mrm.setDirty()
return nil
}

func (mrm *mrManager) GetClusterInfo(ctx context.Context) (*mrpb.ClusterInfo, error) {
mrm.mu.RLock()
defer mrm.mu.RUnlock()

cli, err := mrm.mc()
if err != nil {
return nil, errors.WithMessage(err, "mrmanager: not accessible")
Expand All @@ -386,13 +323,10 @@ func (mrm *mrManager) GetClusterInfo(ctx context.Context) (*mrpb.ClusterInfo, er
_ = cli.Close()
return nil, err
}
return rsp.GetClusterInfo(), err
return rsp.GetClusterInfo(), nil
}

func (mrm *mrManager) AddPeer(ctx context.Context, nodeID types.NodeID, peerURL, rpcURL string) error {
mrm.mu.Lock()
defer mrm.mu.Unlock()

cli, err := mrm.mc()
if err != nil {
return errors.WithMessage(err, "mrmanager: not accessible")
Expand All @@ -411,9 +345,6 @@ func (mrm *mrManager) AddPeer(ctx context.Context, nodeID types.NodeID, peerURL,
}

func (mrm *mrManager) RemovePeer(ctx context.Context, nodeID types.NodeID) error {
mrm.mu.Lock()
defer mrm.mu.Unlock()

cli, err := mrm.mc()
if err != nil {
return errors.WithMessage(err, "mrmanager: not accessible")
Expand Down Expand Up @@ -473,3 +404,9 @@ func (mrm *mrManager) StorageNode(ctx context.Context, storageNodeID types.Stora
func (mrm *mrManager) NumberOfMR() int {
return mrm.connector.NumberOfMR()
}

func (mrm *mrManager) setDirty() {
mrm.mu.Lock()
mrm.dirty = true
mrm.mu.Unlock()
}

0 comments on commit efe2d44

Please sign in to comment.