Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support txn insert when db sync #290

Open
wants to merge 1 commit into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 53 additions & 19 deletions pkg/ccr/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func init() {
"compress the snapshot job info and meta")
flag.BoolVar(&featureSkipRollupBinlogs, "feature_skip_rollup_binlogs", false,
"skip the rollup related binlogs")
flag.BoolVar(&featureTxnInsert, "feature_txn_insert", false,
flag.BoolVar(&featureTxnInsert, "feature_txn_insert", true,
"enable txn insert support")
}

Expand Down Expand Up @@ -1339,6 +1339,40 @@ func (j *Job) getDbSyncTableRecords(upsert *record.Upsert) []*record.TableRecord
return tableRecords
}

// getStidsByDestTableId gathers the destination sub-transaction ids (stids)
// that belong to the destination table destTableId.
//
// Every entry of j.progress.TableMapping whose value equals destTableId names
// a source table id. For each element of tableRecords carrying that id, the
// Stid of each of its partition records is translated through stidMaps
// (source stid -> dest stid). A translation of 0, which is also what a missing
// key yields, is ignored.
//
// Each distinct dest stid is returned exactly once. The order of the result is
// unspecified because the ids are collected from a map. The returned error is
// always nil.
func (j *Job) getStidsByDestTableId(destTableId int64, tableRecords []*record.TableRecord, stidMaps map[int64]int64) ([]int64, error) {
	seen := make(map[int64]struct{})

	// addFrom remembers the dest stid of every partition of rec that has a
	// non-zero translation in stidMaps.
	addFrom := func(rec *record.TableRecord) {
		for _, part := range rec.PartitionRecords {
			if mapped := stidMaps[part.Stid]; mapped != 0 {
				seen[mapped] = struct{}{}
			}
		}
	}

	for srcTableId, mappedTableId := range j.progress.TableMapping {
		if mappedTableId != destTableId {
			continue
		}
		// srcTableId syncs into destTableId: visit all of its table records.
		for _, rec := range tableRecords {
			if rec.Id == srcTableId {
				addFrom(rec)
			}
		}
	}

	// Flatten the set; several partitions may share one dest stid.
	result := make([]int64, 0, len(seen))
	for stid := range seen {
		result = append(result, stid)
	}
	return result, nil
}

func (j *Job) getReleatedTableRecords(upsert *record.Upsert) ([]*record.TableRecord, error) {
var tableRecords []*record.TableRecord //, 0, len(upsert.TableRecords))

Expand Down Expand Up @@ -1406,11 +1440,13 @@ func (j *Job) ingestBinlogForTxnInsert(txnId int64, tableRecords []*record.Table

stidToCommitInfos := ingestBinlogJob.SubTxnToCommitInfos()
subTxnInfos := make([]*festruct.TSubTxnInfo, 0, len(stidMap))
for sourceStid, destStid := range stidMap {
destStid := destStid // if no this line, every element in subTxnInfos is the last tSubTxnInfo
destStids, err := j.getStidsByDestTableId(destTableId, tableRecords, stidMap)

for _, destStid := range destStids {
destStid := destStid
commitInfos := stidToCommitInfos[destStid]
if commitInfos == nil {
log.Warnf("no commit infos from source stid: %d; dest stid %d, just skip", sourceStid, destStid)
log.Warnf("no commit infos from dest stid %d, just skip", destStid)
continue
}

Expand Down Expand Up @@ -1515,10 +1551,6 @@ func (j *Job) handleUpsert(binlog *festruct.TBinlog) error {
log.Warnf("The txn insert is not supported yet")
return xerror.Errorf(xerror.Normal, "The txn insert is not supported yet")
}
if j.SyncType == DBSync {
log.Warnf("Txn insert is NOT supported when DBSync")
return xerror.Errorf(xerror.Normal, "Txn insert is NOT supported when DBSync")
}
isTxnInsert = true
}

Expand Down Expand Up @@ -1627,18 +1659,20 @@ func (j *Job) handleUpsert(binlog *festruct.TBinlog) error {

// Step 3: ingest binlog
if isTxnInsert {
// When txn insert, only one table can be inserted, so use the first DestTableId
destTableId := inMemoryData.DestTableIds[0]

// When txn insert, use subTxnInfos to commit rather than commitInfos.
subTxnInfos, err := j.ingestBinlogForTxnInsert(txnId, tableRecords, stidMap, destTableId)
if err != nil {
rollback(err, inMemoryData)
return err
} else {
inMemoryData.SubTxnInfos = subTxnInfos
j.progress.NextSubCheckpoint(CommitTransaction, inMemoryData)
var allSubTxnInfos = make([]*festruct.TSubTxnInfo, 0, len(stidMap))
for _, destTableId := range inMemoryData.DestTableIds {
// When txn insert, use subTxnInfos to commit rather than commitInfos.
subTxnInfos, err := j.ingestBinlogForTxnInsert(txnId, tableRecords, stidMap, destTableId)
if err != nil {
rollback(err, inMemoryData)
return err
} else {
subTxnInfos := subTxnInfos
allSubTxnInfos = append(allSubTxnInfos, subTxnInfos...)
j.progress.NextSubCheckpoint(CommitTransaction, inMemoryData)
}
}
inMemoryData.SubTxnInfos = allSubTxnInfos
} else {
commitInfos, err := j.ingestBinlog(txnId, tableRecords)
if err != nil {
Expand Down
186 changes: 186 additions & 0 deletions regression-test/suites/db_sync/txn_insert/test_txn_insert_db.groovy
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// Regression suite: multi-statement transactions (begin ... commit) that touch
// several tables must be replicated when the CCR job is created through
// helper.ccrJobCreate() after helper.enableDbBinlog().
//
// Layout of the suite:
//   1. create four binlog-enabled tables and seed them,
//   2. (re)create the CCR job and wait for the initial restore of every table,
//   3. run transactions mixing insert / delete / update on different tables
//      and check the expected rows through the helper's polling checks.
suite("test_txn_insert_db") {
    def helper = new GroovyShell(new Binding(['suite': delegate]))
            .evaluate(new File("${context.config.suitePath}/../common", "helper.groovy"))

    // Skip unless the helper reports the feature_txn_insert feature.
    if (!helper.has_feature("feature_txn_insert")) {
        logger.info("Skip the test because the feature is not supported.")
        return
    }

    def tableName1 = "t1_" + helper.randomSuffix()
    def tableName2 = "t2_" + helper.randomSuffix()
    def tableName3 = "t3_" + helper.randomSuffix()
    def tableName4 = "t4_" + helper.randomSuffix()

    // Predicate handed to helper.checkShowTimesOf: true once the query
    // result contains at least one row.
    def exist = { res -> res.size() != 0 }

    helper.enableDbBinlog()

    // t1: range-partitioned table (three monthly partitions).
    sql """
        CREATE TABLE IF NOT EXISTS ${tableName1}
        (
            `user_id` LARGEINT NOT NULL COMMENT "用户id",
            `date` DATE NOT NULL COMMENT "数据灌入日期时间",
            `city` VARCHAR(20) COMMENT "用户所在城市"
        ) ENGINE = olap
        unique KEY(`user_id`, `date`)
        PARTITION BY RANGE (`date`)
        (
            PARTITION `p201701` VALUES LESS THAN ("2017-02-01"),
            PARTITION `p201702` VALUES LESS THAN ("2017-03-01"),
            PARTITION `p201703` VALUES LESS THAN ("2017-04-01")
        )
        DISTRIBUTED BY HASH(`user_id`) BUCKETS 1
        PROPERTIES ("replication_num" = "1", "binlog.enable" = "true","enable_unique_key_merge_on_write" = "false");
    """

    // t2: single-column unpartitioned table.
    sql """
        CREATE TABLE IF NOT EXISTS ${tableName2} (`id` int)
        ENGINE = olap unique KEY(`id`)
        DISTRIBUTED BY HASH(`id`) BUCKETS 2
        PROPERTIES
        ("replication_allocation" = "tag.location.default: 1", "binlog.enable" = "true", "enable_unique_key_merge_on_write" = "false");
    """

    // t3: (id, name) table, target of the update in Test 4.
    sql """
        CREATE TABLE IF NOT EXISTS ${tableName3}
        (
            id int,
            name varchar(20)
        ) ENGINE = olap
        UNIQUE KEY(`id`)
        DISTRIBUTED BY HASH(`id`) BUCKETS 3
        PROPERTIES (
        "binlog.enable" = "true", "enable_unique_key_merge_on_write" = "false","replication_allocation" = "tag.location.default: 1");
    """

    // t4: (id, name) table, only read from by the `insert ... select` statements.
    sql """
        CREATE TABLE IF NOT EXISTS ${tableName4}
        (
            id int,
            name varchar(20)
        ) ENGINE = olap
        UNIQUE KEY(`id`)
        DISTRIBUTED BY HASH(`id`) BUCKETS 3
        PROPERTIES (
        "binlog.enable" = "true", "enable_unique_key_merge_on_write" = "false","replication_allocation" = "tag.location.default: 1");
    """

    // Seed data: 3 rows in t1, t2 and t3; 6 rows in t4.
    sql """ insert into ${tableName1} values (1, '2017-03-31', 'a'), (2, '2017-02-28', 'b'), (3, '2017-02-28', 'c'); """
    sql """ insert into ${tableName2} values (3),(4),(5); """
    sql """ insert into ${tableName3} values (111, 'aa'),(222, 'bb'),(333, 'cc'); """
    sql """ insert into ${tableName4} values (12, 'xx'),(23, 'yy'),(34, 'aa'), (45, 'bb'), (56, 'cc'), (67, 'cc') """

    // Start from a fresh CCR job and wait until every table has been restored.
    helper.ccrJobDelete()
    helper.ccrJobCreate()

    assertTrue(helper.checkRestoreFinishTimesOf("${tableName1}", 60))
    assertTrue(helper.checkRestoreFinishTimesOf("${tableName2}", 60))
    assertTrue(helper.checkRestoreFinishTimesOf("${tableName3}", 60))
    assertTrue(helper.checkRestoreFinishTimesOf("${tableName4}", 60))

    // Test 0: the seed rows are visible after the initial sync.
    logger.info("=== Test 0: Db sync ===")
    sql "sync"
    assertTrue(helper.checkShowTimesOf("SELECT * FROM ${tableName1} ", exist, 60, "target"))
    assertTrue(helper.checkSelectTimesOf("SELECT * FROM ${tableName1}", 3, 30))
    assertTrue(helper.checkShowTimesOf("SELECT * FROM ${tableName2} ", exist, 60, "target"))
    assertTrue(helper.checkSelectTimesOf("SELECT * FROM ${tableName2}", 3, 30))
    assertTrue(helper.checkShowTimesOf("SELECT * FROM ${tableName3} ", exist, 60, "target"))
    assertTrue(helper.checkSelectTimesOf("SELECT * FROM ${tableName3}", 3, 30))
    assertTrue(helper.checkShowTimesOf("SELECT * FROM ${tableName4} ", exist, 60, "target"))
    assertTrue(helper.checkSelectTimesOf("SELECT * FROM ${tableName4}", 6, 30))

    // Test 1: one transaction inserting into two tables.
    // Expected afterwards: t1 3 -> 4 rows, t2 3 -> 4 rows.
    logger.info("=== Test 1: insert only ===")
    sql """
        begin;
        insert into ${tableName1} select id, '2017-02-28', 'y1' from ${tableName4} where id = 23;
        insert into ${tableName2} select id from ${tableName4} where id = 12;
        commit;
    """
    assertTrue(helper.checkShowTimesOf("SELECT * FROM ${tableName1} ", exist, 60, "target"))
    assertTrue(helper.checkSelectTimesOf("SELECT * FROM ${tableName1} ", 4, 30))
    assertTrue(helper.checkShowTimesOf("SELECT * FROM ${tableName2} ", exist, 60, "target"))
    assertTrue(helper.checkSelectTimesOf("SELECT * FROM ${tableName2} ", 4, 30))

    // Test 2: insert into t2 and delete from t1 in the same transaction.
    // Expected afterwards: t1 4 -> 3 rows, t2 4 -> 5 rows.
    logger.info("=== Test 2: insert A + delete B ===")
    sql """
        set delete_without_partition = true;
        begin;
        insert into ${tableName2} select id from ${tableName4} where id = 23;
        delete from ${tableName1} where user_id = 1;
        commit;
    """
    assertTrue(helper.checkShowTimesOf("SELECT * FROM ${tableName1} ", exist, 60, "target"))
    assertTrue(helper.checkSelectTimesOf("SELECT * FROM ${tableName1} ", 3, 30))
    assertTrue(helper.checkShowTimesOf("SELECT * FROM ${tableName2} ", exist, 60, "target"))
    assertTrue(helper.checkSelectTimesOf("SELECT * FROM ${tableName2} ", 5, 30))

    // Test 3: insert into t2, then delete from and update t1 in one transaction.
    // Checked through the updated row of t1 and the inserted row of t2.
    logger.info("=== Test 3: insert A + delete B + update B ===")
    sql """
        set delete_without_partition = true;
        begin;
        insert into ${tableName2} select id from ${tableName4} where id = 34;
        delete from ${tableName1} where user_id = 2;
        update ${tableName1} set city = 'new' where user_id = 3;
        commit;
    """
    assertTrue(helper.checkShowTimesOf("SELECT * FROM ${tableName1} ", exist, 60, "target"))
    assertTrue(helper.checkSelectTimesOf("SELECT * FROM ${tableName1} where city = 'new'", 1, 30))
    assertTrue(helper.checkShowTimesOf("SELECT * FROM ${tableName2} ", exist, 60, "target"))
    assertTrue(helper.checkSelectTimesOf("SELECT * FROM ${tableName2} where id = 34", 1, 30))

    // Test 4: three different tables in one transaction:
    // insert into t1, delete from t2, update t3.
    // Expected afterwards: t2 6 -> 5 rows; one new row in t1; one renamed row in t3.
    logger.info("=== Test 4: insert A + delete B + update C ===")
    sql """
        begin;
        insert into ${tableName1} select id,'2017-03-01','xyz' from ${tableName4} where id = 45;
        delete from ${tableName2} where id = 34;
        update ${tableName3} set name = 'new' where id = 111;
        commit;
    """
    assertTrue(helper.checkShowTimesOf("SELECT * FROM ${tableName1} ", exist, 60, "target"))
    assertTrue(helper.checkSelectTimesOf("SELECT * FROM ${tableName1} where city = 'xyz' and date = '2017-03-01'", 1, 30))
    assertTrue(helper.checkShowTimesOf("SELECT * FROM ${tableName2} ", exist, 60, "target"))
    assertTrue(helper.checkSelectTimesOf("SELECT * FROM ${tableName2}", 5, 30))
    assertTrue(helper.checkShowTimesOf("SELECT * FROM ${tableName3} ", exist, 60, "target"))
    assertTrue(helper.checkSelectTimesOf("SELECT * FROM ${tableName3} where name = 'new'", 1, 30))
}


Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

suite("test_txn_insert") {
suite("test_txn_insert_table") {
def helper = new GroovyShell(new Binding(['suite': delegate]))
.evaluate(new File("${context.config.suitePath}/../common", "helper.groovy"))

Expand Down
Loading