Skip to content

Commit

Permalink
fix view bug
Browse files Browse the repository at this point in the history
  • Loading branch information
lsy3993 committed Jun 2, 2024
1 parent 2dc9ed1 commit cd85ab3
Show file tree
Hide file tree
Showing 3 changed files with 110 additions and 6 deletions.
67 changes: 67 additions & 0 deletions pkg/ccr/base/spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,67 @@ func (s *Spec) GetAllTables() ([]string, error) {
return tables, nil
}

func (s *Spec) queryResult(querySQL string, queryColumn string, errMsg string) ([]string, error) {
db, err := s.ConnectDB()
if err != nil {
return nil, err
}

rows, err := db.Query(querySQL)
if err != nil {
return nil, xerror.Wrap(err, xerror.Normal, querySQL+" failed")
}
defer rows.Close()

var results []string
for rows.Next() {
rowParser := utils.NewRowParser()
if err := rowParser.Parse(rows); err != nil {
return nil, xerror.Wrap(err, xerror.Normal, errMsg)
}
result, err := rowParser.GetString(queryColumn)
if err != nil {
return nil, xerror.Wrap(err, xerror.Normal, errMsg)
}
results = append(results, result)
}

return results, nil
}

func (s *Spec) GetAllViewsFromTable(tableName string) ([]string, error) {
log.Debugf("get all view from table %s", tableName)

var results []string
// first, query information_schema.tables with table_schema and table_type, get all views' name
querySql := fmt.Sprintf("SELECT table_name FROM information_schema.tables WHERE table_schema = '%s' AND table_type = 'VIEW'", s.Database)
viewsFromQuery, err := s.queryResult(querySql, "table_name", "QUERY VIEWS")
if err != nil {
return nil, xerror.Wrap(err, xerror.Normal, "query views from information schema failed")
}

// then query view's create sql, if create sql contains tableName, this view is wanted
viewRegex := regexp.MustCompile("`internal`.`(\\w+)`.`" + strings.TrimSpace(tableName) + "`")
for _, eachViewName := range viewsFromQuery {
showCreateViewSql := fmt.Sprintf("SHOW CREATE VIEW %s", eachViewName)
createViewSqlList, err := s.queryResult(showCreateViewSql, "Create View", "SHOW CREATE VIEW")
if err != nil {
return nil, xerror.Wrap(err, xerror.Normal, "show create view failed")
}

// a view has only one create sql, so use createViewSqlList[0] as the only sql
if len(createViewSqlList) > 0 {
found := viewRegex.MatchString(createViewSqlList[0])
if found {
results = append(results, eachViewName)
}
}
}

log.Debugf("get view result is %s", results)
return results, nil
}

func (s *Spec) dropTable(table string) error {
log.Infof("drop table %s.%s", s.Database, table)

Expand Down Expand Up @@ -787,6 +848,12 @@ func (s *Spec) DropTable(tableName string) error {
return s.DbExec(dropSql)
}

func (s *Spec) DropView(viewName string) error {
dropView := fmt.Sprintf("DROP VIEW IF EXISTS %s ", utils.FormatKeywordName(viewName))
log.Infof("drop view sql: %s", dropView)
return s.DbExec(dropView)
}

func (s *Spec) AddPartition(destTableName string, addPartition *record.AddPartition) error {
addPartitionSql := addPartition.GetSql(destTableName)
addPartitionSql = correctAddPartitionSql(addPartitionSql, addPartition)
Expand Down
2 changes: 2 additions & 0 deletions pkg/ccr/base/specer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type Specer interface {
IsDatabaseEnableBinlog() (bool, error)
IsTableEnableBinlog() (bool, error)
GetAllTables() ([]string, error)
GetAllViewsFromTable(tableName string) ([]string, error)
ClearDB() error
CreateDatabase() error
CreateTable(createTable *record.CreateTable) error
Expand All @@ -31,6 +32,7 @@ type Specer interface {
LightningSchemaChange(srcDatabase string, changes *record.ModifyTableAddOrDropColumns) error
TruncateTable(destTableName string, truncateTable *record.TruncateTable) error
DropTable(tableName string) error
DropView(viewName string) error

AddPartition(destTableName string, addPartition *record.AddPartition) error
DropPartition(destTableName string, dropPartition *record.DropPartition) error
Expand Down
47 changes: 41 additions & 6 deletions pkg/ccr/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -946,6 +946,24 @@ func (j *Job) handleCreateTable(binlog *festruct.TBinlog) error {
return err
}

/*
Creating table will only occur when sync db.
When create view, the db name of sql is source db name, we should use dest db name to create view
*/
createSql := createTable.Sql
viewRegex := regexp.MustCompile("(?i)^CREATE(\\s+)VIEW")
isCreateView := viewRegex.MatchString(createSql)
if isCreateView {
log.Debugf("create view, use dest db name to replace source db name")

// replace `internal`.`source_db_name`. to `internal`.`dest_db_name`.
originalName := "`internal`.`" + strings.TrimSpace(j.Src.Database) + "`."
dbNameRegex := regexp.MustCompile(originalName)
replaceName := "`internal`.`" + strings.TrimSpace(j.Dest.Database) + "`."
createTable.Sql = dbNameRegex.ReplaceAllString(createSql, replaceName)
log.Debugf("original create view sql is %s, after repalce, now sql is %s", createSql, createTable.Sql)
}

if err := j.IDest.CreateTable(createTable); err != nil {
return xerror.Wrapf(err, xerror.Normal, "create table %d", createTable.TableId)
}
Expand Down Expand Up @@ -1034,14 +1052,31 @@ func (j *Job) handleAlterJob(binlog *festruct.TBinlog) error {
return nil
}

// drop table dropTableSql
var destTableName string
if j.SyncType == TableSync {
destTableName = j.Dest.Table
} else {
destTableName = alterJob.TableName
}

var allViewDeleted bool = false
for {
// drop table dropTableSql
var destTableName string
if j.SyncType == TableSync {
destTableName = j.Dest.Table
} else {
destTableName = alterJob.TableName
// before drop table, drop related view firstly
if !allViewDeleted {
views, err := j.IDest.GetAllViewsFromTable(destTableName)
if err != nil {
return err
}

for _, view := range views {
if err := j.IDest.DropView(view); err != nil {
return err
}
}
allViewDeleted = true
}

if err := j.IDest.DropTable(destTableName); err == nil {
break
}
Expand Down

0 comments on commit cd85ab3

Please sign in to comment.