Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Force partial snapshot if a session variable is required #331

Merged
merged 2 commits into from
Dec 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 5 additions & 15 deletions pkg/ccr/base/spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -641,15 +641,7 @@ func (s *Spec) CreateTableOrView(createTable *record.CreateTable, srcDatabase st
}

log.Infof("create table or view sql: %s", createSql)

// FIXME(walter) avoid set session variables in the reusable connection.
list := []string{}
if strings.Contains(createSql, "agg_state<") {
log.Infof("agg_state is exists in the create table sql, set enable_agg_state=true")
list = append(list, "SET enable_agg_state=true")
}
list = append(list, createSql)
return s.Exec(list...)
return s.Exec(createSql)
}

func (s *Spec) CheckDatabaseExists() (bool, error) {
Expand Down Expand Up @@ -1161,17 +1153,15 @@ func (s *Spec) WaitTransactionDone(txnId int64) {
}

// Exec sql
func (s *Spec) Exec(sqls ...string) error {
func (s *Spec) Exec(sql string) error {
db, err := s.Connect()
if err != nil {
return err
}

for _, sql := range sqls {
_, err = db.Exec(sql)
if err != nil {
return xerror.Wrapf(err, xerror.Normal, "exec sql %s failed", sql)
}
_, err = db.Exec(sql)
if err != nil {
return xerror.Wrapf(err, xerror.Normal, "exec sql %s failed", sql)
}
return nil
}
Expand Down
16 changes: 14 additions & 2 deletions pkg/ccr/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -1865,10 +1865,17 @@ func (j *Job) handleCreateTable(binlog *festruct.TBinlog) error {
}

if err = j.IDest.CreateTableOrView(createTable, j.Src.Database); err != nil {
if strings.Contains(err.Error(), "Can not found function") {
log.Warnf("skip creating table/view because the UDF function is not supported yet: %s", err.Error())
errMsg := err.Error()
if strings.Contains(errMsg, "Can not found function") {
log.Warnf("skip creating table/view because the UDF function is not supported yet: %s", errMsg)
return nil
}
if len(createTable.TableName) > 0 && IsSessionVariableRequired(errMsg) { // ignore doris 2.0.3
log.Infof("a session variable is required to create table %s, force partial snapshot, commit seq: %d, msg: %s",
createTable.TableName, binlog.GetCommitSeq(), errMsg)
replace := false // new table no need to replace
return j.newPartialSnapshot(createTable.TableId, createTable.TableName, nil, replace)
}
return xerror.Wrapf(err, xerror.Normal, "create table %d", createTable.TableId)
}

Expand Down Expand Up @@ -3514,3 +3521,8 @@ func isStatusContainsAny(status *tstatus.TStatus, patterns ...string) bool {
}
return false
}

func IsSessionVariableRequired(msg string) bool {
re := regexp.MustCompile(`set enable_.+=.+`)
return re.MatchString(msg)
}
23 changes: 23 additions & 0 deletions pkg/ccr/job_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package ccr_test

import (
"testing"

"github.com/selectdb/ccr_syncer/pkg/ccr"
)

func TestIsSessionVariableRequired(t *testing.T) {
tests := []string{
"If you want to specify column names, please `set enable_nereids_planner=true`",
"set enable_variant_access_in_original_planner = true in session variable",
"Please enable the session variable 'enable_projection' through `set enable_projection = true",
"agg state not enable, need set enable_agg_state=true",
"which is greater than 38 is disabled by default. set enable_decimal256 = true to enable it",
"if we have a column with decimalv3 type and set enable_decimal_conversion = false",
}
for i, test := range tests {
if !ccr.IsSessionVariableRequired(test) {
t.Errorf("test %d failed", i)
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
suite('test_cds_tbl_create_vars') {
def helper = new GroovyShell(new Binding(['suite': delegate]))
.evaluate(new File("${context.config.suitePath}/../common", 'helper.groovy'))

def tableName = 'tbl_' + helper.randomSuffix()
def exist = { res -> Boolean
return res.size() != 0
}
def notExist = { res -> Boolean
return res.size() == 0
}

helper.enableDbBinlog()
helper.ccrJobDelete()
helper.ccrJobCreate()

try {
logger.info('create table with agg state')
sql 'set enable_agg_state=true'
sql """
CREATE TABLE ${tableName}_agg (
`k1` largeint NOT NULL,
`k2` varchar(20) NULL,
`v_sum` bigint SUM NULL DEFAULT "0",
`v_max` int MAX NULL DEFAULT "0",
`v_min` int MIN NULL DEFAULT "99999",
`v_generic` agg_state<avg(int null)> GENERIC NOT NULL,
`v_hll` hll HLL_UNION NOT NULL,
`v_bitmap` bitmap BITMAP_UNION NOT NULL DEFAULT BITMAP_EMPTY,
`v_quantile_union` quantile_state QUANTILE_UNION NOT NULL
) ENGINE=OLAP
AGGREGATE KEY(`k1`, `k2`)
COMMENT 'OLAP'
DISTRIBUTED BY RANDOM BUCKETS 10
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
"min_load_replica_num" = "-1",
"light_schema_change" = "true",
"binlog.enable" = "true"
)
"""

assertTrue(helper.checkShowTimesOf("SHOW CREATE TABLE ${tableName}_agg", exist, 30, 'target'))
} catch (Exception) { }

try {
logger.info('create table with decimal 256')
sql 'set enable_decimal256 = true'
sql """
CREATE TABLE ${tableName}_decimal_256 (
`id1` int NULL,
`id2` int NULL,
`result` decimal(76,20) NULL
) ENGINE=OLAP
DUPLICATE KEY(`id1`, `id2`)
DISTRIBUTED BY HASH(`id1`) BUCKETS 10
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
"min_load_replica_num" = "-1",
"light_schema_change" = "true",
"binlog.enable" = "true"
)
"""
assertTrue(helper.checkShowTimesOf("SHOW CREATE TABLE ${tableName}_decimal_256", exist, 30, 'target'))
} catch (Exception) { }
}
Loading