diff --git a/pkg/ccr/base/spec.go b/pkg/ccr/base/spec.go index 163e386b..f5589875 100644 --- a/pkg/ccr/base/spec.go +++ b/pkg/ccr/base/spec.go @@ -297,6 +297,67 @@ func (s *Spec) GetAllTables() ([]string, error) { return tables, nil } +func (s *Spec) queryResult(querySQL string, queryColumn string, errMsg string) ([]string, error) { + db, err := s.ConnectDB() + if err != nil { + return nil, err + } + + rows, err := db.Query(querySQL) + if err != nil { + return nil, xerror.Wrap(err, xerror.Normal, querySQL+" failed") + } + defer rows.Close() + + var results []string + for rows.Next() { + rowParser := utils.NewRowParser() + if err := rowParser.Parse(rows); err != nil { + return nil, xerror.Wrap(err, xerror.Normal, errMsg) + } + result, err := rowParser.GetString(queryColumn) + if err != nil { + return nil, xerror.Wrap(err, xerror.Normal, errMsg) + } + results = append(results, result) + } + + return results, nil +} + +func (s *Spec) GetAllViewsFromTable(tableName string) ([]string, error) { + log.Debugf("get all view from table %s", tableName) + + var results []string + // first, query information_schema.tables with table_schema and table_type, get all views' name + querySql := fmt.Sprintf("SELECT table_name FROM information_schema.tables WHERE table_schema = '%s' AND table_type = 'VIEW'", s.Database) + viewsFromQuery, err := s.queryResult(querySql, "table_name", "QUERY VIEWS") + if err != nil { + return nil, xerror.Wrap(err, xerror.Normal, "query views from information schema failed") + } + + // then query view's create sql, if create sql contains tableName, this view is wanted + viewRegex := regexp.MustCompile("`internal`.`(\\w+)`.`" + strings.TrimSpace(tableName) + "`") + for _, eachViewName := range viewsFromQuery { + showCreateViewSql := fmt.Sprintf("SHOW CREATE VIEW %s", eachViewName) + createViewSqlList, err := s.queryResult(showCreateViewSql, "Create View", "SHOW CREATE VIEW") + if err != nil { + return nil, xerror.Wrap(err, xerror.Normal, "show create view failed") + } + + // a view has only one create sql, so use createViewSqlList[0] as the only sql + if len(createViewSqlList) > 0 { + found := viewRegex.MatchString(createViewSqlList[0]) + if found { + results = append(results, eachViewName) + } + } + } + + log.Debugf("get view result is %s", results) + return results, nil +} + func (s *Spec) dropTable(table string) error { log.Infof("drop table %s.%s", s.Database, table) @@ -354,6 +415,28 @@ func (s *Spec) CreateTable(createTable *record.CreateTable) error { return s.DbExec(sql) } +func (s *Spec) CreateTableOrView(createTable *record.CreateTable, srcDatabase string) error { + // Creating table will only occur when sync db. + // When create view, the db name of sql is source db name, we should use dest db name to create view + createSql := createTable.Sql + viewRegex := regexp.MustCompile("(?i)^CREATE(\\s+)VIEW") + isCreateView := viewRegex.MatchString(createSql) + if isCreateView { + log.Debugf("create view, use dest db name to replace source db name") + + // replace `internal`.`source_db_name`. to `internal`.`dest_db_name`. + originalName := "`internal`.`" + strings.TrimSpace(srcDatabase) + "`." + replaceName := "`internal`.`" + strings.TrimSpace(s.Database) + "`." + createTable.Sql = strings.ReplaceAll(createTable.Sql, originalName, replaceName) + log.Debugf("original create view sql is %s, after repalce, now sql is %s", createSql, createTable.Sql) + } + + sql := createTable.Sql + log.Infof("createTableSql: %s", sql) + // HACK: for drop table + return s.DbExec(sql) +} + func (s *Spec) CheckDatabaseExists() (bool, error) { log.Debugf("check database exist by spec: %s", s.String()) db, err := s.Connect() @@ -787,6 +870,12 @@ func (s *Spec) DropTable(tableName string) error { return s.DbExec(dropSql) } +func (s *Spec) DropView(viewName string) error { + dropView := fmt.Sprintf("DROP VIEW IF EXISTS %s ", utils.FormatKeywordName(viewName)) + log.Infof("drop view sql: %s", dropView) + return s.DbExec(dropView) +} + func (s *Spec) AddPartition(destTableName string, addPartition *record.AddPartition) error { addPartitionSql := addPartition.GetSql(destTableName) addPartitionSql = correctAddPartitionSql(addPartitionSql, addPartition) diff --git a/pkg/ccr/base/specer.go b/pkg/ccr/base/specer.go index 85fa47f4..75bb5ba7 100644 --- a/pkg/ccr/base/specer.go +++ b/pkg/ccr/base/specer.go @@ -18,9 +18,11 @@ type Specer interface { IsDatabaseEnableBinlog() (bool, error) IsTableEnableBinlog() (bool, error) GetAllTables() ([]string, error) + GetAllViewsFromTable(tableName string) ([]string, error) ClearDB() error CreateDatabase() error CreateTable(createTable *record.CreateTable) error + CreateTableOrView(createTable *record.CreateTable, srcDatabase string) error CheckDatabaseExists() (bool, error) CheckTableExists() (bool, error) CreateSnapshotAndWaitForDone(tables []string) (string, error) @@ -31,6 +33,7 @@ type Specer interface { LightningSchemaChange(srcDatabase string, changes *record.ModifyTableAddOrDropColumns) error TruncateTable(destTableName string, truncateTable *record.TruncateTable) error DropTable(tableName string) error + DropView(viewName string) error AddPartition(destTableName string, addPartition *record.AddPartition) error DropPartition(destTableName string, dropPartition *record.DropPartition) error diff --git a/pkg/ccr/job.go b/pkg/ccr/job.go index ebda53fd..8664c922 100644 --- a/pkg/ccr/job.go +++ b/pkg/ccr/job.go @@ -946,7 +946,7 @@ func (j *Job) handleCreateTable(binlog *festruct.TBinlog) error { return err } - if err := j.IDest.CreateTable(createTable); err != nil { + if err := j.IDest.CreateTableOrView(createTable, j.Src.Database); err != nil { return xerror.Wrapf(err, xerror.Normal, "create table %d", createTable.TableId) } @@ -1034,14 +1034,38 @@ func (j *Job) handleAlterJob(binlog *festruct.TBinlog) error { return nil } + // drop table dropTableSql + var destTableName string + if j.SyncType == TableSync { + destTableName = j.Dest.Table + } else { + destTableName = alterJob.TableName + } + + var allViewDeleted bool = false for { - // drop table dropTableSql - var destTableName string - if j.SyncType == TableSync { - destTableName = j.Dest.Table - } else { - destTableName = alterJob.TableName + // before drop table, drop related view firstly + if !allViewDeleted { + views, err := j.IDest.GetAllViewsFromTable(destTableName) + if err != nil { + log.Errorf("when alter job, get view from table failed, err : %v", err) + continue + } + + var dropViewFailed bool = false + for _, view := range views { + if err := j.IDest.DropView(view); err != nil { + log.Errorf("when alter job, drop view %s failed, err : %v", view, err) + dropViewFailed = true + } + } + if dropViewFailed { + continue + } + + allViewDeleted = true } + if err := j.IDest.DropTable(destTableName); err == nil { break }