Skip to content

Commit

Permalink
fix create/drop view bug (#100)
Browse files Browse the repository at this point in the history

Co-authored-by: walter <[email protected]>
  • Loading branch information
lsy3993 and w41ter committed Jun 7, 2024
1 parent e7e8eb1 commit a4c5bbd
Show file tree
Hide file tree
Showing 3 changed files with 123 additions and 7 deletions.
89 changes: 89 additions & 0 deletions pkg/ccr/base/spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,67 @@ func (s *Spec) GetAllTables() ([]string, error) {
return tables, nil
}

func (s *Spec) queryResult(querySQL string, queryColumn string, errMsg string) ([]string, error) {
db, err := s.ConnectDB()
if err != nil {
return nil, err
}

rows, err := db.Query(querySQL)
if err != nil {
return nil, xerror.Wrap(err, xerror.Normal, querySQL+" failed")
}
defer rows.Close()

var results []string
for rows.Next() {
rowParser := utils.NewRowParser()
if err := rowParser.Parse(rows); err != nil {
return nil, xerror.Wrap(err, xerror.Normal, errMsg)
}
result, err := rowParser.GetString(queryColumn)
if err != nil {
return nil, xerror.Wrap(err, xerror.Normal, errMsg)
}
results = append(results, result)
}

return results, nil
}

func (s *Spec) GetAllViewsFromTable(tableName string) ([]string, error) {
log.Debugf("get all view from table %s", tableName)

var results []string
// first, query information_schema.tables with table_schema and table_type, get all views' name
querySql := fmt.Sprintf("SELECT table_name FROM information_schema.tables WHERE table_schema = '%s' AND table_type = 'VIEW'", s.Database)
viewsFromQuery, err := s.queryResult(querySql, "table_name", "QUERY VIEWS")
if err != nil {
return nil, xerror.Wrap(err, xerror.Normal, "query views from information schema failed")
}

// then query view's create sql, if create sql contains tableName, this view is wanted
viewRegex := regexp.MustCompile("`internal`.`(\\w+)`.`" + strings.TrimSpace(tableName) + "`")
for _, eachViewName := range viewsFromQuery {
showCreateViewSql := fmt.Sprintf("SHOW CREATE VIEW %s", eachViewName)
createViewSqlList, err := s.queryResult(showCreateViewSql, "Create View", "SHOW CREATE VIEW")
if err != nil {
return nil, xerror.Wrap(err, xerror.Normal, "show create view failed")
}

// a view has only one create sql, so use createViewSqlList[0] as the only sql
if len(createViewSqlList) > 0 {
found := viewRegex.MatchString(createViewSqlList[0])
if found {
results = append(results, eachViewName)
}
}
}

log.Debugf("get view result is %s", results)
return results, nil
}

func (s *Spec) dropTable(table string) error {
log.Infof("drop table %s.%s", s.Database, table)

Expand Down Expand Up @@ -354,6 +415,28 @@ func (s *Spec) CreateTable(createTable *record.CreateTable) error {
return s.DbExec(sql)
}

func (s *Spec) CreateTableOrView(createTable *record.CreateTable, srcDatabase string) error {
// Creating table will only occur when sync db.
// When create view, the db name of sql is source db name, we should use dest db name to create view
createSql := createTable.Sql
viewRegex := regexp.MustCompile("(?i)^CREATE(\\s+)VIEW")
isCreateView := viewRegex.MatchString(createSql)
if isCreateView {
log.Debugf("create view, use dest db name to replace source db name")

// replace `internal`.`source_db_name`. to `internal`.`dest_db_name`.
originalName := "`internal`.`" + strings.TrimSpace(srcDatabase) + "`."
replaceName := "`internal`.`" + strings.TrimSpace(s.Database) + "`."
createTable.Sql = strings.ReplaceAll(createTable.Sql, originalName, replaceName)
log.Debugf("original create view sql is %s, after repalce, now sql is %s", createSql, createTable.Sql)
}

sql := createTable.Sql
log.Infof("createTableSql: %s", sql)
// HACK: for drop table
return s.DbExec(sql)
}

func (s *Spec) CheckDatabaseExists() (bool, error) {
log.Debugf("check database exist by spec: %s", s.String())
db, err := s.Connect()
Expand Down Expand Up @@ -779,6 +862,12 @@ func (s *Spec) DropTable(tableName string) error {
return s.DbExec(dropSql)
}

func (s *Spec) DropView(viewName string) error {
dropView := fmt.Sprintf("DROP VIEW IF EXISTS %s ", utils.FormatKeywordName(viewName))
log.Infof("drop view sql: %s", dropView)
return s.DbExec(dropView)
}

func (s *Spec) AddPartition(destTableName string, addPartition *record.AddPartition) error {
addPartitionSql := addPartition.GetSql(destTableName)
addPartitionSql = correctAddPartitionSql(addPartitionSql, addPartition)
Expand Down
3 changes: 3 additions & 0 deletions pkg/ccr/base/specer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@ type Specer interface {
IsDatabaseEnableBinlog() (bool, error)
IsTableEnableBinlog() (bool, error)
GetAllTables() ([]string, error)
GetAllViewsFromTable(tableName string) ([]string, error)
ClearDB() error
CreateDatabase() error
CreateTable(createTable *record.CreateTable) error
CreateTableOrView(createTable *record.CreateTable, srcDatabase string) error
CheckDatabaseExists() (bool, error)
CheckTableExists() (bool, error)
CreateSnapshotAndWaitForDone(tables []string) (string, error)
Expand All @@ -31,6 +33,7 @@ type Specer interface {
LightningSchemaChange(srcDatabase string, changes *record.ModifyTableAddOrDropColumns) error
TruncateTable(destTableName string, truncateTable *record.TruncateTable) error
DropTable(tableName string) error
DropView(viewName string) error

AddPartition(destTableName string, addPartition *record.AddPartition) error
DropPartition(destTableName string, dropPartition *record.DropPartition) error
Expand Down
38 changes: 31 additions & 7 deletions pkg/ccr/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -949,7 +949,7 @@ func (j *Job) handleCreateTable(binlog *festruct.TBinlog) error {
return err
}

if err := j.IDest.CreateTable(createTable); err != nil {
if err := j.IDest.CreateTableOrView(createTable, j.Src.Database); err != nil {
return xerror.Wrapf(err, xerror.Normal, "create table %d", createTable.TableId)
}

Expand Down Expand Up @@ -1037,14 +1037,38 @@ func (j *Job) handleAlterJob(binlog *festruct.TBinlog) error {
return nil
}

// drop table dropTableSql
var destTableName string
if j.SyncType == TableSync {
destTableName = j.Dest.Table
} else {
destTableName = alterJob.TableName
}

var allViewDeleted bool = false
for {
// drop table dropTableSql
var destTableName string
if j.SyncType == TableSync {
destTableName = j.Dest.Table
} else {
destTableName = alterJob.TableName
// before drop table, drop related view firstly
if !allViewDeleted {
views, err := j.IDest.GetAllViewsFromTable(destTableName)
if err != nil {
log.Errorf("when alter job, get view from table failed, err : %v", err)
continue
}

var dropViewFailed bool = false
for _, view := range views {
if err := j.IDest.DropView(view); err != nil {
log.Errorf("when alter job, drop view %s failed, err : %v", view, err)
dropViewFailed = true
}
}
if dropViewFailed {
continue
}

allViewDeleted = true
}

if err := j.IDest.DropTable(destTableName); err == nil {
break
}
Expand Down

0 comments on commit a4c5bbd

Please sign in to comment.