From 26608b486f42852c5f572f2fb2d69ecf3fcdd562 Mon Sep 17 00:00:00 2001 From: Tim Fox Date: Mon, 18 Jul 2022 10:33:07 +0100 Subject: [PATCH] Fixed race in ddl commands cleanup (#481) * Fixed race in ddl commands cleanup * Call cleanup on error --- cluster/dragon/logadaptor/logrus_adaptor.go | 54 ++++++++++++++++++++ cluster/dragon/logrus_adaptor.go | 56 --------------------- command/create_index_command.go | 2 + command/create_mv_command.go | 2 + command/create_source_command.go | 2 + command/ddl_runner.go | 3 ++ 6 files changed, 63 insertions(+), 56 deletions(-) create mode 100644 cluster/dragon/logadaptor/logrus_adaptor.go delete mode 100644 cluster/dragon/logrus_adaptor.go diff --git a/cluster/dragon/logadaptor/logrus_adaptor.go b/cluster/dragon/logadaptor/logrus_adaptor.go new file mode 100644 index 00000000..18c8f8dc --- /dev/null +++ b/cluster/dragon/logadaptor/logrus_adaptor.go @@ -0,0 +1,54 @@ +package logadaptor + +import ( + "github.com/lni/dragonboat/v3/logger" + log "github.com/sirupsen/logrus" +) + +/* +This adaptor allows us to plug the dragonboat logging into the logrus logger we use in Prana. +*/ + +func init() { + logger.SetLoggerFactory(logrusLogFactory) +} + +func logrusLogFactory(pkgName string) logger.ILogger { + return &LogrusILogger{} +} + +type LogrusILogger struct { + level logger.LogLevel +} + +func (l *LogrusILogger) SetLevel(level logger.LogLevel) { + l.level = level +} + +func (l *LogrusILogger) Debugf(format string, args ...interface{}) { + if l.level >= logger.DEBUG { + log.Debugf(format, args...) + } +} + +func (l *LogrusILogger) Infof(format string, args ...interface{}) { + if l.level >= logger.INFO { + log.Infof(format, args...) + } +} + +func (l *LogrusILogger) Warningf(format string, args ...interface{}) { + if l.level >= logger.WARNING { + log.Warnf(format, args...) + } +} + +func (l *LogrusILogger) Errorf(format string, args ...interface{}) { + if l.level >= logger.ERROR { + log.Errorf(format, args...) + } +} + +func (l *LogrusILogger) Panicf(format string, args ...interface{}) { + log.Fatalf(format, args...) +} diff --git a/cluster/dragon/logrus_adaptor.go b/cluster/dragon/logrus_adaptor.go deleted file mode 100644 index 5ad3cf6b..00000000 --- a/cluster/dragon/logrus_adaptor.go +++ /dev/null @@ -1,56 +0,0 @@ -package dragon - -import ( - "github.com/lni/dragonboat/v3/logger" - log "github.com/sirupsen/logrus" -) - -/* -This adaptor allows us to plug the dragonboat logging into the logrus logger we use in Prana. -*/ - -func init() { - logger.SetLoggerFactory(logrusLogFactory) -} - -func logrusLogFactory(pkgName string) logger.ILogger { - return &logrusILogger{} -} - -type logrusILogger struct { -} - -func (l *logrusILogger) SetLevel(level logger.LogLevel) { - switch level { - case logger.CRITICAL: - log.SetLevel(log.FatalLevel) - case logger.ERROR: - log.SetLevel(log.ErrorLevel) - case logger.WARNING: - log.SetLevel(log.WarnLevel) - case logger.DEBUG: - log.SetLevel(log.DebugLevel) - case logger.INFO: - log.SetLevel(log.InfoLevel) - } -} - -func (l *logrusILogger) Debugf(format string, args ...interface{}) { - log.Debugf(format, args...) -} - -func (l *logrusILogger) Infof(format string, args ...interface{}) { - log.Infof(format, args...) -} - -func (l *logrusILogger) Warningf(format string, args ...interface{}) { - log.Warnf(format, args...) -} - -func (l *logrusILogger) Errorf(format string, args ...interface{}) { - log.Errorf(format, args...) -} - -func (l *logrusILogger) Panicf(format string, args ...interface{}) { - log.Fatalf(format, args...) -} diff --git a/command/create_index_command.go b/command/create_index_command.go index 4a3e9162..cb26ca6e 100644 --- a/command/create_index_command.go +++ b/command/create_index_command.go @@ -148,6 +148,8 @@ func (c *CreateIndexCommand) AfterPhase(phase int32) error { } func (c *CreateIndexCommand) Cleanup() { + c.lock.Lock() + defer c.lock.Unlock() if c.indexInfo == nil { return } diff --git a/command/create_mv_command.go b/command/create_mv_command.go index e0c63011..d71e3eed 100644 --- a/command/create_mv_command.go +++ b/command/create_mv_command.go @@ -218,6 +218,8 @@ func (c *CreateMVCommand) AfterPhase(phase int32) error { } func (c *CreateMVCommand) Cleanup() { + c.lock.Lock() + defer c.lock.Unlock() if c.mv == nil { return } diff --git a/command/create_source_command.go b/command/create_source_command.go index 798b4922..7a2f6c44 100644 --- a/command/create_source_command.go +++ b/command/create_source_command.go @@ -211,6 +211,8 @@ func (c *CreateSourceCommand) AfterPhase(phase int32) error { } func (c *CreateSourceCommand) Cleanup() { + c.lock.Lock() + defer c.lock.Unlock() if c.sourceInfo == nil { return } diff --git a/command/ddl_runner.go b/command/ddl_runner.go index 22fe8869..2fcc9db3 100644 --- a/command/ddl_runner.go +++ b/command/ddl_runner.go @@ -216,6 +216,9 @@ func (d *DDLCommandRunner) HandleDdlMessage(notification remoting.ClusterMessage return nil } err := com.OnPhase(phase) + if err != nil { + com.Cleanup() + } if phase == int32(com.NumPhases()-1) { // Final phase so delete the command d.commands.Delete(skey)