From 4567ba507e527016dcde4b22b9468462b9f57c6e Mon Sep 17 00:00:00 2001 From: jiweixiao <43225348@qq.com> Date: Mon, 12 Aug 2024 19:07:20 +0800 Subject: [PATCH 1/4] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E5=9C=A8SQL=E5=AE=A1?= =?UTF-8?q?=E6=A0=B8=E9=98=B6=E6=AE=B5=E8=87=AA=E5=8A=A8=E8=AF=86=E5=88=AB?= =?UTF-8?q?=E5=B9=B6=E5=90=88=E5=B9=B6=E7=9B=B8=E5=90=8C=E8=A1=A8=E7=9A=84?= =?UTF-8?q?alter=20table=E8=AF=AD=E5=8F=A5=E7=9A=84=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- session/common.go | 3 + session/inception_result.go | 13 +++- session/session.go | 12 ++++ session/session_inception.go | 116 +++++++++++++++++++++++++++++++++-- 4 files changed, 139 insertions(+), 5 deletions(-) diff --git a/session/common.go b/session/common.go index 2b2ddb6b3..4f40a7a96 100644 --- a/session/common.go +++ b/session/common.go @@ -159,6 +159,9 @@ type SourceOptions struct { // // 扩展参数,支持一次性会话设置 // extendParams string + + // jwx added 查看提交的SQL中是否有一些可以合并成一条 + checkMerge bool } // ExplainInfo 执行计划信息 diff --git a/session/inception_result.go b/session/inception_result.go index 1d4a14746..eb9804890 100644 --- a/session/inception_result.go +++ b/session/inception_result.go @@ -297,7 +297,7 @@ func NewRecordSets() *MyRecordSets { fieldCount: 0, } - rc.fields = make([]*ast.ResultField, 12) + rc.fields = make([]*ast.ResultField, 13) // 序号 rc.CreateFiled("order_id", mysql.TypeLong) @@ -321,6 +321,8 @@ func NewRecordSets() *MyRecordSets { rc.CreateFiled("sqlsha1", mysql.TypeString) // 备份用时 rc.CreateFiled("backup_time", mysql.TypeString) + // SQL语句类型(select, alter, create index .....) + rc.CreateFiled("type", mysql.TypeString) t.rc = rc return t @@ -394,6 +396,15 @@ func (s *MyRecordSets) setFields(r *Record) { row[11].SetString(r.BackupCostTime) } + _, isAlterTable := r.Type.(*ast.AlterTableStmt) + _, isCreateIndex := r.Type.(*ast.CreateIndexStmt) + _, isDropIndex := r.Type.(*ast.DropIndexStmt) + if isAlterTable || isCreateIndex || isDropIndex { + row[12].SetString("alterTable") + } else { + row[12].SetNull() + } + s.rc.data[s.rc.count] = row s.rc.count++ } diff --git a/session/session.go b/session/session.go index d8de3f24f..f7eb3078a 100644 --- a/session/session.go +++ b/session/session.go @@ -135,7 +135,19 @@ func (h *StmtHistory) Count() int { return len(h.history) } +// jwx added +type alterTableInfo struct { + Name string + alterCount int + alterStmtList []ast.AlterTableStmt + mergedSql string +} + type session struct { + + //jwx added + alterTableInfoList []alterTableInfo + // processInfo is used by ShowProcess(), and should be modified atomically. processInfo atomic.Value txn TxnState diff --git a/session/session_inception.go b/session/session_inception.go index bb7c0805f..65cf3f0dc 100644 --- a/session/session_inception.go +++ b/session/session_inception.go @@ -318,6 +318,30 @@ func (s *session) executeInc(ctx context.Context, sql string) (recordSets []sqle s.initDisableTypes() continue case *ast.InceptionCommitStmt: + /******* jwx added 将对同一个表的多条alter语句合并成一条 ******/ + if s.opt.checkMerge { + for _, info := range s.alterTableInfoList { + merged := info.alterStmtList[0] + for seq, alterStmt := range info.alterStmtList { + if seq > 0 { + merged.Specs = append(merged.Specs, alterStmt.Specs...) + } + } + var builder strings.Builder + _ = merged.Restore(format.NewRestoreCtx(format.DefaultRestoreFlags, &builder)) + info.mergedSql = builder.String() + mergedRecord := &Record{ + Sql: info.mergedSql, + Buf: new(bytes.Buffer), + Type: &merged, + Stage: StageCheck, + ErrorMessage: "MERGED", + } + + s.recordSets.Append(mergedRecord) + } + } + /****************/ if !s.haveBegin { s.appendErrorMsg("Must start as begin statement.") @@ -629,11 +653,24 @@ func (s *session) processCommand(ctx context.Context, stmtNode ast.StmtNode, if node.KeyType == ast.IndexKeyTypeFullText { tp = ast.ConstraintFulltext } - s.checkCreateIndex(node.Table, node.IndexName, - node.IndexColNames, node.IndexOption, nil, node.Unique, tp) + if s.opt == nil || !s.opt.checkMerge { // jwx added + s.checkCreateIndex(node.Table, node.IndexName, + node.IndexColNames, node.IndexOption, nil, node.Unique, tp) + } else { + alter := s.convertCreateIndexToAddConstrain(node) + s.checkAlterTable(alter, node.Text()) + s.checkCreateIndex(node.Table, node.IndexName, + node.IndexColNames, node.IndexOption, nil, node.Unique, tp) + } case *ast.DropIndexStmt: - s.checkDropIndex(node, currentSql) + if s.opt == nil || !s.opt.checkMerge { // jwx added + s.checkDropIndex(node, currentSql) + } else { + alter := s.convertDropIndexToDropIndex(node) + s.checkAlterTable(alter, node.Text()) + s.checkDropIndex(node, currentSql) + } case *ast.CreateViewStmt: s.checkCreateView(node, currentSql) @@ -2134,7 +2171,8 @@ func (s *session) parseOptions(sql string) { sslKey: viper.GetString("sslKey"), // 开启事务功能,设置一次提交多少记录 - tranBatch: viper.GetInt("trans"), + tranBatch: viper.GetInt("trans"), + checkMerge: viper.GetBool("checkMerge"), // jwx added } if s.opt.split || s.opt.Check || s.opt.Print || s.opt.Masking { @@ -3310,6 +3348,30 @@ func (s *session) checkAlterTable(node *ast.AlterTableStmt, sql string) { return } + /*********** jwx added **********/ + if s.opt != nil && s.opt.checkMerge { + tableNameInString := fmt.Sprintf("%s.%s", node.Table.Schema.O, node.Table.Name.O) + var found bool = false + var seq int = 0 + for j, i := range s.alterTableInfoList { + if tableNameInString == i.Name { + found = true + seq = j + break + } + } + if found { + s.alterTableInfoList[seq].alterCount++ + s.alterTableInfoList[seq].alterStmtList = append(s.alterTableInfoList[seq].alterStmtList, *node) + } else { + var info alterTableInfo = alterTableInfo{Name: tableNameInString, alterCount: 0} + info.alterStmtList = append(info.alterStmtList, *node) + s.alterTableInfoList = append(s.alterTableInfoList, info) + } + return + } + /******************************/ + table.AlterCount += 1 if table.AlterCount > 1 { @@ -5508,6 +5570,52 @@ func (s *session) checkAddConstraint(t *TableInfo, c *ast.AlterTableSpec) { } } +func (s *session) convertCreateIndexToAddConstrain(node *ast.CreateIndexStmt) *ast.AlterTableStmt { + log.Debug("convertCreateIndexToAddConstrain") + var alter *ast.AlterTableStmt = &ast.AlterTableStmt{Specs: []*ast.AlterTableSpec{}} + var spec *ast.AlterTableSpec = &ast.AlterTableSpec{Tp: ast.AlterTableAddConstraint, Constraint: &ast.Constraint{}} + spec.IfNotExists = node.IfNotExists + spec.Constraint.Name = node.IndexName + if node.Unique { + spec.Constraint.Tp = ast.ConstraintUniq + } else { + spec.Constraint.Tp = ast.ConstraintIndex + } + spec.Constraint.Keys = node.IndexColNames + spec.Constraint.Option = node.IndexOption + if node.LockAlg != nil { + spec.LockType = node.LockAlg.LockTp + spec.Algorithm = node.LockAlg.AlgorithmTp + } else { + spec.LockType = 0 + spec.Algorithm = 0 + } + spec.Partition = node.Partition + alter.SetText(node.Text()) + alter.Table = node.Table + alter.Specs = append(alter.Specs, spec) + return alter +} + +func (s *session) convertDropIndexToDropIndex(node *ast.DropIndexStmt) *ast.AlterTableStmt { + log.Debug("convertDropIndexToDropIndex") + var alter *ast.AlterTableStmt = &ast.AlterTableStmt{Specs: []*ast.AlterTableSpec{}} + var spec *ast.AlterTableSpec = &ast.AlterTableSpec{Tp: ast.AlterTableDropIndex} + spec.IfExists = node.IfExists + spec.Name = node.IndexName + if node.LockAlg != nil { + spec.LockType = node.LockAlg.LockTp + spec.Algorithm = node.LockAlg.AlgorithmTp + } else { + spec.LockType = 0 + spec.Algorithm = 0 + } + alter.SetText(node.Text()) + alter.Table = node.Table + alter.Specs = append(alter.Specs, spec) + return alter +} + func (s *session) checkDBExists(db string, reportNotExists bool) bool { if db == "" { From d4b2c627a188bd9fc93dd20b333eeb68254ceb6c Mon Sep 17 00:00:00 2001 From: jiweixiao <43225348@qq.com> Date: Mon, 12 Aug 2024 20:55:36 +0800 Subject: [PATCH 2/4] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E4=BA=86convertCreateInd?= =?UTF-8?q?exToAddConstrain=E5=92=8CconvertDropIndexToDropIndex=E6=96=B9?= =?UTF-8?q?=E6=B3=95=E7=9A=84=E6=96=B9=E6=B3=95=E5=90=8D=E3=80=82=20checkA?= =?UTF-8?q?lterTable=E6=96=B9=E6=B3=95=E5=A2=9E=E5=8A=A0=E4=BA=86mergeOnly?= =?UTF-8?q?=E5=8F=82=E6=95=B0=EF=BC=8C=E8=AE=BE=E7=BD=AE=E4=B8=BAtrue?= =?UTF-8?q?=E6=97=B6=E4=B8=8D=E8=BF=9B=E8=A1=8C=E5=90=8E=E7=BB=AD=E7=9A=84?= =?UTF-8?q?=E6=A3=80=E6=9F=A5=E3=80=82=20=E5=8E=BB=E6=8E=89=E4=BA=86alterT?= =?UTF-8?q?ableInfo=E4=B8=AD=E7=9A=84alterCount=E5=B1=9E=E6=80=A7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- session/session_inception.go | 27 ++++++++++++++------------- 1 file changed, 14 insertions(+), 13 deletions(-) diff --git a/session/session_inception.go b/session/session_inception.go index 65cf3f0dc..9ec4cda6d 100644 --- a/session/session_inception.go +++ b/session/session_inception.go @@ -630,7 +630,7 @@ func (s *session) processCommand(ctx context.Context, stmtNode ast.StmtNode, case *ast.CreateTableStmt: s.checkCreateTable(node, currentSql) case *ast.AlterTableStmt: - s.checkAlterTable(node, currentSql) + s.checkAlterTable(node, currentSql, false) case *ast.DropTableStmt: s.checkDropTable(node, currentSql) case *ast.RenameTableStmt: @@ -657,8 +657,8 @@ func (s *session) processCommand(ctx context.Context, stmtNode ast.StmtNode, s.checkCreateIndex(node.Table, node.IndexName, node.IndexColNames, node.IndexOption, nil, node.Unique, tp) } else { - alter := s.convertCreateIndexToAddConstrain(node) - s.checkAlterTable(alter, node.Text()) + alter := s.convertCreateIndexToAlterTable(node) + s.checkAlterTable(alter, node.Text(), true) s.checkCreateIndex(node.Table, node.IndexName, node.IndexColNames, node.IndexOption, nil, node.Unique, tp) } @@ -667,8 +667,8 @@ func (s *session) processCommand(ctx context.Context, stmtNode ast.StmtNode, if s.opt == nil || !s.opt.checkMerge { // jwx added s.checkDropIndex(node, currentSql) } else { - alter := s.convertDropIndexToDropIndex(node) - s.checkAlterTable(alter, node.Text()) + alter := s.convertDropIndexToAlterTable(node) + s.checkAlterTable(alter, node.Text(), true) s.checkDropIndex(node, currentSql) } @@ -3332,7 +3332,7 @@ func (s *session) checkTableCharsetCollation(character, collation string) { } } -func (s *session) checkAlterTable(node *ast.AlterTableStmt, sql string) { +func (s *session) checkAlterTable(node *ast.AlterTableStmt, sql string, mergeOnly bool) { log.Debug("checkAlterTable") if node.Table.Schema.O == "" { @@ -3361,14 +3361,15 @@ func (s *session) checkAlterTable(node *ast.AlterTableStmt, sql string) { } } if found { - s.alterTableInfoList[seq].alterCount++ s.alterTableInfoList[seq].alterStmtList = append(s.alterTableInfoList[seq].alterStmtList, *node) } else { - var info alterTableInfo = alterTableInfo{Name: tableNameInString, alterCount: 0} + var info alterTableInfo = alterTableInfo{Name: tableNameInString} info.alterStmtList = append(info.alterStmtList, *node) s.alterTableInfoList = append(s.alterTableInfoList, info) } - return + if mergeOnly { + return + } } /******************************/ @@ -5570,8 +5571,8 @@ func (s *session) checkAddConstraint(t *TableInfo, c *ast.AlterTableSpec) { } } -func (s *session) convertCreateIndexToAddConstrain(node *ast.CreateIndexStmt) *ast.AlterTableStmt { - log.Debug("convertCreateIndexToAddConstrain") +func (s *session) convertCreateIndexToAlterTable(node *ast.CreateIndexStmt) *ast.AlterTableStmt { + log.Debug("convertCreateIndexToAlterTable") var alter *ast.AlterTableStmt = &ast.AlterTableStmt{Specs: []*ast.AlterTableSpec{}} var spec *ast.AlterTableSpec = &ast.AlterTableSpec{Tp: ast.AlterTableAddConstraint, Constraint: &ast.Constraint{}} spec.IfNotExists = node.IfNotExists @@ -5597,8 +5598,8 @@ func (s *session) convertCreateIndexToAddConstrain(node *ast.CreateIndexStmt) *a return alter } -func (s *session) convertDropIndexToDropIndex(node *ast.DropIndexStmt) *ast.AlterTableStmt { - log.Debug("convertDropIndexToDropIndex") +func (s *session) convertDropIndexToAlterTable(node *ast.DropIndexStmt) *ast.AlterTableStmt { + log.Debug("convertDropIndexToAlterTable") var alter *ast.AlterTableStmt = &ast.AlterTableStmt{Specs: []*ast.AlterTableSpec{}} var spec *ast.AlterTableSpec = &ast.AlterTableSpec{Tp: ast.AlterTableDropIndex} spec.IfExists = node.IfExists From bfbeb8b72150e3acc39847f88e0f343f9bec2ee8 Mon Sep 17 00:00:00 2001 From: jiweixiao <43225348@qq.com> Date: Tue, 13 Aug 2024 15:43:48 +0800 Subject: [PATCH 3/4] =?UTF-8?q?=E5=B0=86=E5=8F=82=E6=95=B0=E5=90=8D?= =?UTF-8?q?=E6=94=B9=E4=B8=BAalter-auto-merge,=20=E4=BB=A5=E4=BE=BF?= =?UTF-8?q?=E5=92=8Ccheck-*=E5=8F=82=E6=95=B0=E5=8C=BA=E5=88=86=E5=BC=80?= =?UTF-8?q?=20=E5=8F=82=E6=95=B0=E4=BB=8ESession=E8=B0=83=E6=95=B4?= =?UTF-8?q?=E5=88=B0=E4=BA=86config.Inc=EF=BC=8C=E5=8F=AF=E4=BB=A5?= =?UTF-8?q?=E5=8A=A8=E6=80=81=E9=85=8D=E7=BD=AE=20=E5=B0=86recordSets?= =?UTF-8?q?=E8=BF=94=E5=9B=9E=E7=9A=84type=E5=AD=97=E6=AE=B5=E6=94=B9?= =?UTF-8?q?=E4=B8=BAneedMerge=E5=AD=97=E6=AE=B5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- config/config.go | 1 + session/common.go | 3 --- session/inception_result.go | 20 +++++++++++--------- session/session.go | 1 - session/session_inception.go | 11 +++++------ session/tidb.go | 3 ++- 6 files changed, 19 insertions(+), 20 deletions(-) diff --git a/config/config.go b/config/config.go index 84ce2fedd..de15bb05d 100644 --- a/config/config.go +++ b/config/config.go @@ -221,6 +221,7 @@ type Binlog struct { // Inc is the inception section of the config. type Inc struct { + AlterAutoMerge bool `toml:"alter_auto_merge" json:"alter_auto_merge"` BackupHost string `toml:"backup_host" json:"backup_host"` // 远程备份库信息 BackupPassword string `toml:"backup_password" json:"backup_password"` BackupPort uint `toml:"backup_port" json:"backup_port"` diff --git a/session/common.go b/session/common.go index 4f40a7a96..2b2ddb6b3 100644 --- a/session/common.go +++ b/session/common.go @@ -159,9 +159,6 @@ type SourceOptions struct { // // 扩展参数,支持一次性会话设置 // extendParams string - - // jwx added 查看提交的SQL中是否有一些可以合并成一条 - checkMerge bool } // ExplainInfo 执行计划信息 diff --git a/session/inception_result.go b/session/inception_result.go index eb9804890..7e3a759b2 100644 --- a/session/inception_result.go +++ b/session/inception_result.go @@ -321,8 +321,8 @@ func NewRecordSets() *MyRecordSets { rc.CreateFiled("sqlsha1", mysql.TypeString) // 备份用时 rc.CreateFiled("backup_time", mysql.TypeString) - // SQL语句类型(select, alter, create index .....) - rc.CreateFiled("type", mysql.TypeString) + // 该语句是否是需要被合并的(只有 alter table, create index, drop index三种语句需要被合并),需要为1,不需要为0,已经被合并过的SQL会被设置为2 + rc.CreateFiled("needMerge", mysql.TypeTiny) t.rc = rc return t @@ -396,13 +396,15 @@ func (s *MyRecordSets) setFields(r *Record) { row[11].SetString(r.BackupCostTime) } - _, isAlterTable := r.Type.(*ast.AlterTableStmt) - _, isCreateIndex := r.Type.(*ast.CreateIndexStmt) - _, isDropIndex := r.Type.(*ast.DropIndexStmt) - if isAlterTable || isCreateIndex || isDropIndex { - row[12].SetString("alterTable") - } else { - row[12].SetNull() + switch r.Type.(type) { + case *ast.AlterTableStmt, *ast.CreateIndexStmt, *ast.DropIndexStmt: + if r.ErrorMessage == "MERGED" { + row[12].SetValue(2) + } else { + row[12].SetValue(1) + } + default: + row[12].SetValue(0) } s.rc.data[s.rc.count] = row diff --git a/session/session.go b/session/session.go index f7eb3078a..0584fa424 100644 --- a/session/session.go +++ b/session/session.go @@ -138,7 +138,6 @@ func (h *StmtHistory) Count() int { // jwx added type alterTableInfo struct { Name string - alterCount int alterStmtList []ast.AlterTableStmt mergedSql string } diff --git a/session/session_inception.go b/session/session_inception.go index 9ec4cda6d..5c879f70a 100644 --- a/session/session_inception.go +++ b/session/session_inception.go @@ -319,7 +319,7 @@ func (s *session) executeInc(ctx context.Context, sql string) (recordSets []sqle continue case *ast.InceptionCommitStmt: /******* jwx added 将对同一个表的多条alter语句合并成一条 ******/ - if s.opt.checkMerge { + if s.inc.AlterAutoMerge { for _, info := range s.alterTableInfoList { merged := info.alterStmtList[0] for seq, alterStmt := range info.alterStmtList { @@ -653,7 +653,7 @@ func (s *session) processCommand(ctx context.Context, stmtNode ast.StmtNode, if node.KeyType == ast.IndexKeyTypeFullText { tp = ast.ConstraintFulltext } - if s.opt == nil || !s.opt.checkMerge { // jwx added + if !s.inc.AlterAutoMerge { // jwx added s.checkCreateIndex(node.Table, node.IndexName, node.IndexColNames, node.IndexOption, nil, node.Unique, tp) } else { @@ -664,7 +664,7 @@ func (s *session) processCommand(ctx context.Context, stmtNode ast.StmtNode, } case *ast.DropIndexStmt: - if s.opt == nil || !s.opt.checkMerge { // jwx added + if !s.inc.AlterAutoMerge { // jwx added s.checkDropIndex(node, currentSql) } else { alter := s.convertDropIndexToAlterTable(node) @@ -2171,8 +2171,7 @@ func (s *session) parseOptions(sql string) { sslKey: viper.GetString("sslKey"), // 开启事务功能,设置一次提交多少记录 - tranBatch: viper.GetInt("trans"), - checkMerge: viper.GetBool("checkMerge"), // jwx added + tranBatch: viper.GetInt("trans"), } if s.opt.split || s.opt.Check || s.opt.Print || s.opt.Masking { @@ -3349,7 +3348,7 @@ func (s *session) checkAlterTable(node *ast.AlterTableStmt, sql string, mergeOnl } /*********** jwx added **********/ - if s.opt != nil && s.opt.checkMerge { + if s.inc.AlterAutoMerge { tableNameInString := fmt.Sprintf("%s.%s", node.Table.Schema.O, node.Table.Name.O) var found bool = false var seq int = 0 diff --git a/session/tidb.go b/session/tidb.go index 9df1c734e..78ecff67d 100644 --- a/session/tidb.go +++ b/session/tidb.go @@ -238,7 +238,8 @@ func RegisterStore(name string, driver kv.Driver) error { // session.Open() but with the dbname cut off. // Examples: // goleveldb://relative/path -// boltdb:///absolute/path + +// boltdb:///absolute/path // // The engine should be registered before creating storage. func NewStore(path string) (kv.Storage, error) { From ff687572d164890bdde4e2cf443c903c71f6c5b7 Mon Sep 17 00:00:00 2001 From: jiweixiao <43225348@qq.com> Date: Thu, 15 Aug 2024 15:17:01 +0800 Subject: [PATCH 4/4] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E8=BF=94=E5=9B=9E?= =?UTF-8?q?=E5=80=BC=E4=B8=AD=E6=9C=80=E5=90=8E=E4=B8=80=E4=B8=AA=E5=AD=97?= =?UTF-8?q?=E6=AE=B5=EF=BC=88=E5=8D=B3needMerge=E6=98=AF=E5=90=A6=E9=9C=80?= =?UTF-8?q?=E8=A6=81=E5=90=88=E5=B9=B6=E8=AF=AD=E5=8F=A5=EF=BC=89=E7=9A=84?= =?UTF-8?q?=E5=90=AB=E4=B9=89=E3=80=820=E4=B8=BA=E4=B8=8D=E9=9C=80?= =?UTF-8?q?=E8=A6=81=EF=BC=8C-1=E4=BB=A3=E8=A1=A8=E8=AF=A5=E8=A1=8C?= =?UTF-8?q?=E4=B8=BA=E5=90=88=E5=B9=B6=E5=90=8E=E7=9A=84SQL=EF=BC=8C?= =?UTF-8?q?=E9=9C=80=E8=A6=81=E5=90=88=E5=B9=B6=E7=9A=84=E8=A1=8CneedMerge?= =?UTF-8?q?=E5=80=BC=E4=B8=BA=E5=90=88=E5=B9=B6=E5=90=8E=E7=9A=84SQL?= =?UTF-8?q?=E8=A1=8C=E5=8F=B7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- session/inception_result.go | 16 +++++---------- session/session.go | 7 ++++--- session/session_inception.go | 39 ++++++++++++++++++++++-------------- 3 files changed, 33 insertions(+), 29 deletions(-) diff --git a/session/inception_result.go b/session/inception_result.go index 7e3a759b2..ec8abed94 100644 --- a/session/inception_result.go +++ b/session/inception_result.go @@ -97,6 +97,9 @@ type Record struct { // delete多表时,默认delete后第一个表为主表,其余表才会记录到该处 // 仅在发现多表操作时,初始化该参数 MultiTables map[string]*TableInfo + + // 判断该语句是否是需要被合并的(只有 alter table, create index, drop index三种语句需要被合并),不需要为0,已经被合并过的SQL会被设置为-1,需要的数字为对应的合并后的SQL的行号 + NeedMerge int } func (r *Record) appendWarningMessage(msg string) { @@ -321,7 +324,7 @@ func NewRecordSets() *MyRecordSets { rc.CreateFiled("sqlsha1", mysql.TypeString) // 备份用时 rc.CreateFiled("backup_time", mysql.TypeString) - // 该语句是否是需要被合并的(只有 alter table, create index, drop index三种语句需要被合并),需要为1,不需要为0,已经被合并过的SQL会被设置为2 + // 判断该语句是否是需要被合并的(只有 alter table, create index, drop index三种语句需要被合并),不需要为0,已经被合并过的SQL会被设置为-1,需要的数字为对应的合并后的SQL的行号 rc.CreateFiled("needMerge", mysql.TypeTiny) t.rc = rc @@ -396,16 +399,7 @@ func (s *MyRecordSets) setFields(r *Record) { row[11].SetString(r.BackupCostTime) } - switch r.Type.(type) { - case *ast.AlterTableStmt, *ast.CreateIndexStmt, *ast.DropIndexStmt: - if r.ErrorMessage == "MERGED" { - row[12].SetValue(2) - } else { - row[12].SetValue(1) - } - default: - row[12].SetValue(0) - } + row[12].SetValue(r.NeedMerge) s.rc.data[s.rc.count] = row s.rc.count++ diff --git a/session/session.go b/session/session.go index 0584fa424..80227fef1 100644 --- a/session/session.go +++ b/session/session.go @@ -137,9 +137,10 @@ func (h *StmtHistory) Count() int { // jwx added type alterTableInfo struct { - Name string - alterStmtList []ast.AlterTableStmt - mergedSql string + Name string + alterStmtList []ast.AlterTableStmt + mergedSql string + recordSetsPosList []int // 记录当前语句在s.recordSets里的位置,用于修改needMerge字段 } type session struct { diff --git a/session/session_inception.go b/session/session_inception.go index 5c879f70a..007cac74b 100644 --- a/session/session_inception.go +++ b/session/session_inception.go @@ -321,24 +321,30 @@ func (s *session) executeInc(ctx context.Context, sql string) (recordSets []sqle /******* jwx added 将对同一个表的多条alter语句合并成一条 ******/ if s.inc.AlterAutoMerge { for _, info := range s.alterTableInfoList { - merged := info.alterStmtList[0] - for seq, alterStmt := range info.alterStmtList { - if seq > 0 { - merged.Specs = append(merged.Specs, alterStmt.Specs...) + if len(info.alterStmtList) >= 2 { + merged := info.alterStmtList[0] + for seq, alterStmt := range info.alterStmtList { + if seq > 0 { + merged.Specs = append(merged.Specs, alterStmt.Specs...) + } + } + var builder strings.Builder + _ = merged.Restore(format.NewRestoreCtx(format.DefaultRestoreFlags, &builder)) + info.mergedSql = builder.String() + mergedRecord := &Record{ + Sql: info.mergedSql, + Buf: new(bytes.Buffer), + Type: &merged, + Stage: StageCheck, + ErrorMessage: "MERGED", + NeedMerge: -1, + } + s.recordSets.Append(mergedRecord) + for _, pos := range info.recordSetsPosList { + s.recordSets.records[pos].NeedMerge = s.recordSets.SeqNo } - } - var builder strings.Builder - _ = merged.Restore(format.NewRestoreCtx(format.DefaultRestoreFlags, &builder)) - info.mergedSql = builder.String() - mergedRecord := &Record{ - Sql: info.mergedSql, - Buf: new(bytes.Buffer), - Type: &merged, - Stage: StageCheck, - ErrorMessage: "MERGED", } - s.recordSets.Append(mergedRecord) } } /****************/ @@ -3361,11 +3367,14 @@ func (s *session) checkAlterTable(node *ast.AlterTableStmt, sql string, mergeOnl } if found { s.alterTableInfoList[seq].alterStmtList = append(s.alterTableInfoList[seq].alterStmtList, *node) + s.alterTableInfoList[seq].recordSetsPosList = append(s.alterTableInfoList[seq].recordSetsPosList, s.recordSets.SeqNo) } else { var info alterTableInfo = alterTableInfo{Name: tableNameInString} info.alterStmtList = append(info.alterStmtList, *node) + info.recordSetsPosList = append(info.recordSetsPosList, s.recordSets.SeqNo) s.alterTableInfoList = append(s.alterTableInfoList, info) } + if mergeOnly { return }