diff --git a/pkg/ccr/base/spec.go b/pkg/ccr/base/spec.go index 163e386b..f1a70b76 100644 --- a/pkg/ccr/base/spec.go +++ b/pkg/ccr/base/spec.go @@ -297,6 +297,67 @@ func (s *Spec) GetAllTables() ([]string, error) { return tables, nil } +func (s *Spec) queryResult(querySQL string, queryColumn string, errMsg string) ([]string, error) { + db, err := s.ConnectDB() + if err != nil { + return nil, err + } + + rows, err := db.Query(querySQL) + if err != nil { + return nil, xerror.Wrap(err, xerror.Normal, querySQL+" failed") + } + defer rows.Close() + + var results []string + for rows.Next() { + rowParser := utils.NewRowParser() + if err := rowParser.Parse(rows); err != nil { + return nil, xerror.Wrap(err, xerror.Normal, errMsg) + } + result, err := rowParser.GetString(queryColumn) + if err != nil { + return nil, xerror.Wrap(err, xerror.Normal, errMsg) + } + results = append(results, result) + } + + return results, nil +} + +func (s *Spec) GetAllViewsFromTable(tableName string) ([]string, error) { + log.Debugf("get all view from table %s", tableName) + + var results []string + // first, query information_schema.tables with table_schema and table_type, get all views' name + querySql := fmt.Sprintf("SELECT table_name FROM information_schema.tables WHERE table_schema = '%s' AND table_type = 'VIEW'", s.Database) + viewsFromQuery, err := s.queryResult(querySql, "table_name", "QUERY VIEWS") + if err != nil { + return nil, xerror.Wrap(err, xerror.Normal, "query views from information schema failed") + } + + // then query view's create sql, if create sql contains tableName, this view is wanted + viewRegex := regexp.MustCompile("`internal`.`(\\w+)`.`" + strings.TrimSpace(tableName) + "`") + for _, eachViewName := range viewsFromQuery { + showCreateViewSql := fmt.Sprintf("SHOW CREATE VIEW %s", eachViewName) + createViewSqlList, err := s.queryResult(showCreateViewSql, "Create View", "SHOW CREATE VIEW") + if err != nil { + return nil, xerror.Wrap(err, xerror.Normal, "show create view failed") + } + + // a view has only one create sql, so use createViewSqlList[0] as the only sql + if len(createViewSqlList) > 0 { + found := viewRegex.MatchString(createViewSqlList[0]) + if found { + results = append(results, eachViewName) + } + } + } + + log.Debugf("get view result is %s", results) + return results, nil +} + func (s *Spec) dropTable(table string) error { log.Infof("drop table %s.%s", s.Database, table) @@ -787,6 +848,12 @@ func (s *Spec) DropTable(tableName string) error { return s.DbExec(dropSql) } +func (s *Spec) DropView(viewName string) error { + dropView := fmt.Sprintf("DROP VIEW IF EXISTS %s ", utils.FormatKeywordName(viewName)) + log.Infof("drop view sql: %s", dropView) + return s.DbExec(dropView) +} + func (s *Spec) AddPartition(destTableName string, addPartition *record.AddPartition) error { addPartitionSql := addPartition.GetSql(destTableName) addPartitionSql = correctAddPartitionSql(addPartitionSql, addPartition) diff --git a/pkg/ccr/base/specer.go b/pkg/ccr/base/specer.go index 85fa47f4..1986d86a 100644 --- a/pkg/ccr/base/specer.go +++ b/pkg/ccr/base/specer.go @@ -18,6 +18,7 @@ type Specer interface { IsDatabaseEnableBinlog() (bool, error) IsTableEnableBinlog() (bool, error) GetAllTables() ([]string, error) + GetAllViewsFromTable(tableName string) ([]string, error) ClearDB() error CreateDatabase() error CreateTable(createTable *record.CreateTable) error @@ -31,6 +32,7 @@ type Specer interface { LightningSchemaChange(srcDatabase string, changes *record.ModifyTableAddOrDropColumns) error TruncateTable(destTableName string, truncateTable *record.TruncateTable) error DropTable(tableName string) error + DropView(viewName string) error AddPartition(destTableName string, addPartition *record.AddPartition) error DropPartition(destTableName string, dropPartition *record.DropPartition) error diff --git a/pkg/ccr/job.go b/pkg/ccr/job.go index ebda53fd..a59cd0c7 100644 --- a/pkg/ccr/job.go +++ b/pkg/ccr/job.go @@ -946,6 +946,24 @@ func (j *Job) handleCreateTable(binlog *festruct.TBinlog) error { return err } + /* + Creating table will only occur when sync db. + When create view, the db name of sql is source db name, we should use dest db name to create view + */ + createSql := createTable.Sql + viewRegex := regexp.MustCompile("(?i)^CREATE(\\s+)VIEW") + isCreateView := viewRegex.MatchString(createSql) + if isCreateView { + log.Debugf("create view, use dest db name to replace source db name") + + // replace `internal`.`source_db_name`. to `internal`.`dest_db_name`. + originalName := "`internal`.`" + strings.TrimSpace(j.Src.Database) + "`." + dbNameRegex := regexp.MustCompile(originalName) + replaceName := "`internal`.`" + strings.TrimSpace(j.Dest.Database) + "`." + createTable.Sql = dbNameRegex.ReplaceAllString(createSql, replaceName) + log.Debugf("original create view sql is %s, after repalce, now sql is %s", createSql, createTable.Sql) + } + if err := j.IDest.CreateTable(createTable); err != nil { return xerror.Wrapf(err, xerror.Normal, "create table %d", createTable.TableId) } @@ -1034,14 +1052,31 @@ func (j *Job) handleAlterJob(binlog *festruct.TBinlog) error { return nil } + // drop table dropTableSql + var destTableName string + if j.SyncType == TableSync { + destTableName = j.Dest.Table + } else { + destTableName = alterJob.TableName + } + + var allViewDeleted bool = false for { - // drop table dropTableSql - var destTableName string - if j.SyncType == TableSync { - destTableName = j.Dest.Table - } else { - destTableName = alterJob.TableName + // before drop table, drop related view firstly + if !allViewDeleted { + views, err := j.IDest.GetAllViewsFromTable(destTableName) + if err != nil { + return err + } + + for _, view := range views { + if err := j.IDest.DropView(view); err != nil { + return err + } + } + allViewDeleted = true } + if err := j.IDest.DropTable(destTableName); err == nil { break }