From 63d0537ce8f6610de0d341fa7c24fd8008240bd9 Mon Sep 17 00:00:00 2001
From: w41ter <w41ter.l@gmail.com>
Date: Wed, 18 Dec 2024 17:24:27 +0800
Subject: [PATCH 1/2] Force partial snapshot if a session variable is required

---
 pkg/ccr/base/spec.go                          | 20 ++---
 pkg/ccr/job.go                                | 16 +++-
 pkg/ccr/job_test.go                           | 23 ++++++
 .../test_cds_tbl_create_vars.groovy           | 82 +++++++++++++++++++
 4 files changed, 124 insertions(+), 17 deletions(-)
 create mode 100644 pkg/ccr/job_test.go
 create mode 100644 regression-test/suites/cross_ds/table/create_vars/test_cds_tbl_create_vars.groovy

diff --git a/pkg/ccr/base/spec.go b/pkg/ccr/base/spec.go
index 7b60d695..83eba67f 100644
--- a/pkg/ccr/base/spec.go
+++ b/pkg/ccr/base/spec.go
@@ -641,15 +641,7 @@ func (s *Spec) CreateTableOrView(createTable *record.CreateTable, srcDatabase st
 	}
 
 	log.Infof("create table or view sql: %s", createSql)
-
-	// FIXME(walter) avoid set session variables in the reusable connection.
-	list := []string{}
-	if strings.Contains(createSql, "agg_state<") {
-		log.Infof("agg_state is exists in the create table sql, set enable_agg_state=true")
-		list = append(list, "SET enable_agg_state=true")
-	}
-	list = append(list, createSql)
-	return s.Exec(list...)
+	return s.Exec(createSql)
 }
 
 func (s *Spec) CheckDatabaseExists() (bool, error) {
@@ -1161,17 +1153,15 @@ func (s *Spec) WaitTransactionDone(txnId int64) {
 }
 
 // Exec sql
-func (s *Spec) Exec(sqls ...string) error {
+func (s *Spec) Exec(sql string) error {
 	db, err := s.Connect()
 	if err != nil {
 		return err
 	}
 
-	for _, sql := range sqls {
-		_, err = db.Exec(sql)
-		if err != nil {
-			return xerror.Wrapf(err, xerror.Normal, "exec sql %s failed", sql)
-		}
+	_, err = db.Exec(sql)
+	if err != nil {
+		return xerror.Wrapf(err, xerror.Normal, "exec sql %s failed", sql)
 	}
 	return nil
 }
diff --git a/pkg/ccr/job.go b/pkg/ccr/job.go
index a4aa6fb1..c4a38b76 100644
--- a/pkg/ccr/job.go
+++ b/pkg/ccr/job.go
@@ -1865,10 +1865,17 @@ func (j *Job) handleCreateTable(binlog *festruct.TBinlog) error {
 	}
 
 	if err = j.IDest.CreateTableOrView(createTable, j.Src.Database); err != nil {
-		if strings.Contains(err.Error(), "Can not found function") {
-			log.Warnf("skip creating table/view because the UDF function is not supported yet: %s", err.Error())
+		errMsg := err.Error()
+		if strings.Contains(errMsg, "Can not found function") {
+			log.Warnf("skip creating table/view because the UDF function is not supported yet: %s", errMsg)
 			return nil
 		}
+		if len(createTable.TableName) > 0 && IsSessionVariableRequired(errMsg) { // ignore doris 2.0.3
+			log.Infof("a session variable is required to create table %s, force partial snapshot, commit seq: %d, msg: %s",
+				createTable.TableName, binlog.GetCommitSeq(), errMsg)
+			replace := false // new table no need to replace
+			return j.newPartialSnapshot(createTable.TableId, createTable.TableName, nil, replace)
+		}
 		return xerror.Wrapf(err, xerror.Normal, "create table %d", createTable.TableId)
 	}
 
@@ -3514,3 +3521,8 @@ func isStatusContainsAny(status *tstatus.TStatus, patterns ...string) bool {
 	}
 	return false
 }
+
+func IsSessionVariableRequired(msg string) bool {
+	re := regexp.MustCompile(`set enable_.+=.+`)
+	return re.MatchString(msg)
+}
diff --git a/pkg/ccr/job_test.go b/pkg/ccr/job_test.go
new file mode 100644
index 00000000..1295e368
--- /dev/null
+++ b/pkg/ccr/job_test.go
@@ -0,0 +1,23 @@
+package ccr_test
+
+import (
+	"testing"
+
+	"github.com/selectdb/ccr_syncer/pkg/ccr"
+)
+
+func TestIsSessionVariableRequired(t *testing.T) {
+	tests := []string{
+		"If you want to specify column names, please `set enable_nereids_planner=true`",
+		"set enable_variant_access_in_original_planner = true in session variable",
+		"Please enable the session variable 'enable_projection' through `set enable_projection = true",
+		"agg state not enable, need set enable_agg_state=true",
+		"which is greater than 38 is disabled by default. set enable_decimal256 = true to enable it",
+		"if we have a column with decimalv3 type and set enable_decimal_conversion = false",
+	}
+	for i, test := range tests {
+		if !ccr.IsSessionVariableRequired(test) {
+			t.Errorf("test %d failed", i)
+		}
+	}
+}
diff --git a/regression-test/suites/cross_ds/table/create_vars/test_cds_tbl_create_vars.groovy b/regression-test/suites/cross_ds/table/create_vars/test_cds_tbl_create_vars.groovy
new file mode 100644
index 00000000..609743ed
--- /dev/null
+++ b/regression-test/suites/cross_ds/table/create_vars/test_cds_tbl_create_vars.groovy
@@ -0,0 +1,82 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+suite('test_cds_tbl_create_vars') {
+    def helper = new GroovyShell(new Binding(['suite': delegate]))
+            .evaluate(new File("${context.config.suitePath}/../common", 'helper.groovy'))
+
+    def tableName = 'tbl_' + helper.randomSuffix()
+    def exist = { res -> Boolean
+        return res.size() != 0
+    }
+    def notExist = { res -> Boolean
+        return res.size() == 0
+    }
+
+    helper.enableDbBinlog()
+    helper.ccrJobDelete()
+    helper.ccrJobCreate()
+
+    try {
+        logger.info('create table with agg state')
+        sql 'set enable_agg_state=true'
+        sql """
+        CREATE TABLE ${tableName}_agg (
+          `k1` largeint NOT NULL,
+          `k2` varchar(20) NULL,
+          `v_sum` bigint SUM NULL DEFAULT "0",
+          `v_max` int MAX NULL DEFAULT "0",
+          `v_min` int MIN NULL DEFAULT "99999",
+          `v_generic` agg_state<avg(int null)> GENERIC NOT NULL,
+          `v_hll` hll HLL_UNION NOT NULL,
+          `v_bitmap` bitmap BITMAP_UNION NOT NULL DEFAULT BITMAP_EMPTY,
+          `v_quantile_union` quantile_state QUANTILE_UNION NOT NULL
+        ) ENGINE=OLAP
+        AGGREGATE KEY(`k1`, `k2`)
+        COMMENT 'OLAP'
+        DISTRIBUTED BY RANDOM BUCKETS 10
+        PROPERTIES (
+        "replication_allocation" = "tag.location.default: 1",
+        "min_load_replica_num" = "-1",
+        "light_schema_change" = "true",
+        "binlog.enable" = "true"
+        )
+        """
+
+        assertTrue(helper.checkShowTimesOf("SHOW CREATE TABLE ${tableName}_agg", exist, 30, 'target'))
+    } finally { }
+
+    try {
+        logger.info('create table with decimal 256')
+        sql 'set enable_decimal256 = true'
+        sql """
+        CREATE TABLE ${tableName}_decimal_256 (
+          `id1` int NULL,
+          `id2` int NULL,
+          `result` decimal(76,20) NULL
+        ) ENGINE=OLAP
+        DUPLICATE KEY(`id1`, `id2`)
+        DISTRIBUTED BY HASH(`id1`) BUCKETS 10
+        PROPERTIES (
+        "replication_allocation" = "tag.location.default: 1",
+        "min_load_replica_num" = "-1",
+        "light_schema_change" = "true",
+        "binlog.enable" = "true"
+        )
+        """
+        assertTrue(helper.checkShowTimesOf("SHOW CREATE TABLE ${tableName}_decimal_256", exist, 30, 'target'))
+    } finally { }
+    }

From 0fc99dcf6d3eed5a4a852e30220e512446ab9811 Mon Sep 17 00:00:00 2001
From: w41ter <w41ter.l@gmail.com>
Date: Wed, 18 Dec 2024 18:26:41 +0800
Subject: [PATCH 2/2] fixup

---
 .../table/create_vars/test_cds_tbl_create_vars.groovy         | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/regression-test/suites/cross_ds/table/create_vars/test_cds_tbl_create_vars.groovy b/regression-test/suites/cross_ds/table/create_vars/test_cds_tbl_create_vars.groovy
index 609743ed..8588fa48 100644
--- a/regression-test/suites/cross_ds/table/create_vars/test_cds_tbl_create_vars.groovy
+++ b/regression-test/suites/cross_ds/table/create_vars/test_cds_tbl_create_vars.groovy
@@ -57,7 +57,7 @@ suite('test_cds_tbl_create_vars') {
         """
 
         assertTrue(helper.checkShowTimesOf("SHOW CREATE TABLE ${tableName}_agg", exist, 30, 'target'))
-    } finally { }
+    } catch (Exception) { }
 
     try {
         logger.info('create table with decimal 256')
@@ -78,5 +78,5 @@ suite('test_cds_tbl_create_vars') {
         )
         """
         assertTrue(helper.checkShowTimesOf("SHOW CREATE TABLE ${tableName}_decimal_256", exist, 30, 'target'))
-    } finally { }
+    } catch (Exception) { }
     }