Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix view bug #100

Merged
merged 8 commits into from
Jun 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 89 additions & 0 deletions pkg/ccr/base/spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,67 @@ func (s *Spec) GetAllTables() ([]string, error) {
return tables, nil
}

func (s *Spec) queryResult(querySQL string, queryColumn string, errMsg string) ([]string, error) {
db, err := s.ConnectDB()
if err != nil {
return nil, err
}

rows, err := db.Query(querySQL)
if err != nil {
return nil, xerror.Wrap(err, xerror.Normal, querySQL+" failed")
}
defer rows.Close()

var results []string
for rows.Next() {
rowParser := utils.NewRowParser()
if err := rowParser.Parse(rows); err != nil {
return nil, xerror.Wrap(err, xerror.Normal, errMsg)
}
result, err := rowParser.GetString(queryColumn)
if err != nil {
return nil, xerror.Wrap(err, xerror.Normal, errMsg)
}
results = append(results, result)
}

return results, nil
}

func (s *Spec) GetAllViewsFromTable(tableName string) ([]string, error) {
log.Debugf("get all view from table %s", tableName)

var results []string
// first, query information_schema.tables with table_schema and table_type, get all views' name
querySql := fmt.Sprintf("SELECT table_name FROM information_schema.tables WHERE table_schema = '%s' AND table_type = 'VIEW'", s.Database)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

desc ${tableName} all might be easier and more efficient than this.

See https://doris.apache.org/docs/1.2/query-acceleration/materialized-view/#query-materialized-views.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

'desc ${tableName} all' don't support common view, materialized view is supported

viewsFromQuery, err := s.queryResult(querySql, "table_name", "QUERY VIEWS")
if err != nil {
return nil, xerror.Wrap(err, xerror.Normal, "query views from information schema failed")
}

// then query view's create sql, if create sql contains tableName, this view is wanted
viewRegex := regexp.MustCompile("`internal`.`(\\w+)`.`" + strings.TrimSpace(tableName) + "`")
for _, eachViewName := range viewsFromQuery {
showCreateViewSql := fmt.Sprintf("SHOW CREATE VIEW %s", eachViewName)
createViewSqlList, err := s.queryResult(showCreateViewSql, "Create View", "SHOW CREATE VIEW")
if err != nil {
return nil, xerror.Wrap(err, xerror.Normal, "show create view failed")
}

// a view has only one create sql, so use createViewSqlList[0] as the only sql
if len(createViewSqlList) > 0 {
found := viewRegex.MatchString(createViewSqlList[0])
if found {
results = append(results, eachViewName)
}
}
}

log.Debugf("get view result is %s", results)
return results, nil
}

func (s *Spec) dropTable(table string) error {
log.Infof("drop table %s.%s", s.Database, table)

Expand Down Expand Up @@ -354,6 +415,28 @@ func (s *Spec) CreateTable(createTable *record.CreateTable) error {
return s.DbExec(sql)
}

func (s *Spec) CreateTableOrView(createTable *record.CreateTable, srcDatabase string) error {
// Creating table will only occur when sync db.
// When create view, the db name of sql is source db name, we should use dest db name to create view
createSql := createTable.Sql
viewRegex := regexp.MustCompile("(?i)^CREATE(\\s+)VIEW")
isCreateView := viewRegex.MatchString(createSql)
if isCreateView {
log.Debugf("create view, use dest db name to replace source db name")

// replace `internal`.`source_db_name`. to `internal`.`dest_db_name`.
originalName := "`internal`.`" + strings.TrimSpace(srcDatabase) + "`."
replaceName := "`internal`.`" + strings.TrimSpace(s.Database) + "`."
createTable.Sql = strings.ReplaceAll(createTable.Sql, originalName, replaceName)
log.Debugf("original create view sql is %s, after repalce, now sql is %s", createSql, createTable.Sql)
}

sql := createTable.Sql
log.Infof("createTableSql: %s", sql)
// HACK: for drop table
return s.DbExec(sql)
}

func (s *Spec) CheckDatabaseExists() (bool, error) {
log.Debugf("check database exist by spec: %s", s.String())
db, err := s.Connect()
Expand Down Expand Up @@ -787,6 +870,12 @@ func (s *Spec) DropTable(tableName string) error {
return s.DbExec(dropSql)
}

func (s *Spec) DropView(viewName string) error {
dropView := fmt.Sprintf("DROP VIEW IF EXISTS %s ", utils.FormatKeywordName(viewName))
log.Infof("drop view sql: %s", dropView)
return s.DbExec(dropView)
}

func (s *Spec) AddPartition(destTableName string, addPartition *record.AddPartition) error {
addPartitionSql := addPartition.GetSql(destTableName)
addPartitionSql = correctAddPartitionSql(addPartitionSql, addPartition)
Expand Down
3 changes: 3 additions & 0 deletions pkg/ccr/base/specer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@ type Specer interface {
IsDatabaseEnableBinlog() (bool, error)
IsTableEnableBinlog() (bool, error)
GetAllTables() ([]string, error)
GetAllViewsFromTable(tableName string) ([]string, error)
ClearDB() error
CreateDatabase() error
CreateTable(createTable *record.CreateTable) error
CreateTableOrView(createTable *record.CreateTable, srcDatabase string) error
CheckDatabaseExists() (bool, error)
CheckTableExists() (bool, error)
CreateSnapshotAndWaitForDone(tables []string) (string, error)
Expand All @@ -31,6 +33,7 @@ type Specer interface {
LightningSchemaChange(srcDatabase string, changes *record.ModifyTableAddOrDropColumns) error
TruncateTable(destTableName string, truncateTable *record.TruncateTable) error
DropTable(tableName string) error
DropView(viewName string) error

AddPartition(destTableName string, addPartition *record.AddPartition) error
DropPartition(destTableName string, dropPartition *record.DropPartition) error
Expand Down
38 changes: 31 additions & 7 deletions pkg/ccr/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -946,7 +946,7 @@ func (j *Job) handleCreateTable(binlog *festruct.TBinlog) error {
return err
}

if err := j.IDest.CreateTable(createTable); err != nil {
if err := j.IDest.CreateTableOrView(createTable, j.Src.Database); err != nil {
return xerror.Wrapf(err, xerror.Normal, "create table %d", createTable.TableId)
}

Expand Down Expand Up @@ -1034,14 +1034,38 @@ func (j *Job) handleAlterJob(binlog *festruct.TBinlog) error {
return nil
}

// drop table dropTableSql
var destTableName string
if j.SyncType == TableSync {
destTableName = j.Dest.Table
} else {
destTableName = alterJob.TableName
}

var allViewDeleted bool = false
for {
// drop table dropTableSql
var destTableName string
if j.SyncType == TableSync {
destTableName = j.Dest.Table
} else {
destTableName = alterJob.TableName
// before drop table, drop related view firstly
if !allViewDeleted {
views, err := j.IDest.GetAllViewsFromTable(destTableName)
if err != nil {
log.Errorf("when alter job, get view from table failed, err : %v", err)
continue
}

var dropViewFailed bool = false
for _, view := range views {
if err := j.IDest.DropView(view); err != nil {
log.Errorf("when alter job, drop view %s failed, err : %v", view, err)
dropViewFailed = true
}
}
if dropViewFailed {
continue
}

allViewDeleted = true
}

if err := j.IDest.DropTable(destTableName); err == nil {
break
}
Expand Down
Loading