Skip to content

Commit

Permalink
Fixed race in ddl commands cleanup (#481)
Browse files Browse the repository at this point in the history
* Fixed race in ddl commands cleanup

* Call cleanup on error
  • Loading branch information
purplefox authored Jul 18, 2022
1 parent 2364c13 commit 26608b4
Show file tree
Hide file tree
Showing 6 changed files with 63 additions and 56 deletions.
54 changes: 54 additions & 0 deletions cluster/dragon/logadaptor/logrus_adaptor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package logadaptor

import (
"github.com/lni/dragonboat/v3/logger"
log "github.com/sirupsen/logrus"
)

/*
This adaptor allows us to plug the dragonboat logging into the logrus logger we use in Prana.
*/

func init() {
logger.SetLoggerFactory(logrusLogFactory)
}

func logrusLogFactory(pkgName string) logger.ILogger {
return &LogrusILogger{}
}

type LogrusILogger struct {
level logger.LogLevel
}

func (l *LogrusILogger) SetLevel(level logger.LogLevel) {
l.level = level
}

func (l *LogrusILogger) Debugf(format string, args ...interface{}) {
if l.level >= logger.DEBUG {
log.Debugf(format, args...)
}
}

func (l *LogrusILogger) Infof(format string, args ...interface{}) {
if l.level >= logger.INFO {
log.Infof(format, args...)
}
}

func (l *LogrusILogger) Warningf(format string, args ...interface{}) {
if l.level >= logger.WARNING {
log.Warnf(format, args...)
}
}

func (l *LogrusILogger) Errorf(format string, args ...interface{}) {
if l.level >= logger.ERROR {
log.Errorf(format, args...)
}
}

func (l *LogrusILogger) Panicf(format string, args ...interface{}) {
log.Fatalf(format, args...)
}
56 changes: 0 additions & 56 deletions cluster/dragon/logrus_adaptor.go

This file was deleted.

2 changes: 2 additions & 0 deletions command/create_index_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,8 @@ func (c *CreateIndexCommand) AfterPhase(phase int32) error {
}

func (c *CreateIndexCommand) Cleanup() {
c.lock.Lock()
defer c.lock.Unlock()
if c.indexInfo == nil {
return
}
Expand Down
2 changes: 2 additions & 0 deletions command/create_mv_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,8 @@ func (c *CreateMVCommand) AfterPhase(phase int32) error {
}

func (c *CreateMVCommand) Cleanup() {
c.lock.Lock()
defer c.lock.Unlock()
if c.mv == nil {
return
}
Expand Down
2 changes: 2 additions & 0 deletions command/create_source_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,8 @@ func (c *CreateSourceCommand) AfterPhase(phase int32) error {
}

func (c *CreateSourceCommand) Cleanup() {
c.lock.Lock()
defer c.lock.Unlock()
if c.sourceInfo == nil {
return
}
Expand Down
3 changes: 3 additions & 0 deletions command/ddl_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,9 @@ func (d *DDLCommandRunner) HandleDdlMessage(notification remoting.ClusterMessage
return nil
}
err := com.OnPhase(phase)
if err != nil {
com.Cleanup()
}
if phase == int32(com.NumPhases()-1) {
// Final phase so delete the command
d.commands.Delete(skey)
Expand Down

0 comments on commit 26608b4

Please sign in to comment.