Skip to content

Commit

Permalink
[fix](suites) Fix syncer ingest binlog with multiple replicas (#44444)
Browse files Browse the repository at this point in the history
  • Loading branch information
w41ter authored and Your Name committed Nov 22, 2024
1 parent eeda1ce commit 152e50f
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -700,8 +700,13 @@ class Syncer {
// step 3.2: get partition/indexId/tabletId
partitionSQl += "/" + meta.indexId.toString()
sqlInfo = sendSql.call(partitionSQl, toSrc)
Map<Long, Long> replicaMap = Maps.newHashMap()
for (List<Object> row : sqlInfo) {
meta.tabletMeta.put(row[0] as Long, row[2] as Long)
Long tabletId = row[0] as Long
if (!meta.tabletMeta.containsKey(tabletId)) {
meta.tabletMeta.put(tabletId, new TabletMeta())
}
meta.tabletMeta[tabletId].replicas.put(row[1] as Long, row[2] as Long)
}
if (meta.tabletMeta.isEmpty()) {
logger.error("Target cluster get (partitionId/indexId)-(${info.key}/${meta.indexId}) tabletIds fault.")
Expand Down Expand Up @@ -816,49 +821,57 @@ class Syncer {
while (srcTabletIter.hasNext()) {
Entry srcTabletMap = srcTabletIter.next()
Entry tarTabletMap = tarTabletIter.next()
TabletMeta srcTabletMeta = srcTabletMap.value
TabletMeta tarTabletMeta = tarTabletMap.value

Iterator srcReplicaIter = srcTabletMeta.replicas.iterator()
Iterator tarReplicaIter = tarTabletMeta.replicas.iterator()
while (srcReplicaIter.hasNext()) {
Entry srcReplicaMap = srcReplicaIter.next()
Entry tarReplicaMap = tarReplicaIter.next()
BackendClientImpl srcClient = context.sourceBackendClients.get(srcReplicaMap.value)
if (srcClient == null) {
logger.error("Can't find src tabletId-${srcReplicaMap.key} -> beId-${srcReplicaMap.value}")
return false
}
BackendClientImpl tarClient = context.targetBackendClients.get(tarReplicaMap.value)
if (tarClient == null) {
logger.error("Can't find target tabletId-${tarReplicaMap.key} -> beId-${tarReplicaMap.value}")
return false
}

BackendClientImpl srcClient = context.sourceBackendClients.get(srcTabletMap.value)
if (srcClient == null) {
logger.error("Can't find src tabletId-${srcTabletMap.key} -> beId-${srcTabletMap.value}")
return false
}
BackendClientImpl tarClient = context.targetBackendClients.get(tarTabletMap.value)
if (tarClient == null) {
logger.error("Can't find target tabletId-${tarTabletMap.key} -> beId-${tarTabletMap.value}")
return false
}

tarPartition.value.version = srcPartition.value.version
long partitionId = fakePartitionId == -1 ? tarPartition.key : fakePartitionId
long version = fakeVersion == -1 ? partitionRecord.version : fakeVersion

TIngestBinlogRequest request = new TIngestBinlogRequest()
TUniqueId uid = new TUniqueId(-1, -1)
request.setTxnId(txnId)
request.setRemoteTabletId(srcTabletMap.key)
request.setBinlogVersion(version)
request.setRemoteHost(srcClient.address.hostname)
request.setRemotePort(srcClient.httpPort.toString())
request.setPartitionId(partitionId)
request.setLocalTabletId(tarTabletMap.key)
request.setLoadId(uid)
logger.info("request -> ${request}")
TIngestBinlogResult result = tarClient.client.ingestBinlog(request)
if (!checkIngestBinlog(result)) {
logger.error("Ingest binlog error! result: ${result}")
return false
}
tarPartition.value.version = srcPartition.value.version
long partitionId = fakePartitionId == -1 ? tarPartition.key : fakePartitionId
long version = fakeVersion == -1 ? partitionRecord.version : fakeVersion

TIngestBinlogRequest request = new TIngestBinlogRequest()
TUniqueId uid = new TUniqueId(-1, -1)
request.setTxnId(txnId)
request.setRemoteTabletId(srcTabletMap.key)
request.setBinlogVersion(version)
request.setRemoteHost(srcClient.address.hostname)
request.setRemotePort(srcClient.httpPort.toString())
request.setPartitionId(partitionId)
request.setLocalTabletId(tarTabletMap.key)
request.setLoadId(uid)
logger.info("request -> ${request}")
TIngestBinlogResult result = tarClient.client.ingestBinlog(request)
if (!checkIngestBinlog(result)) {
logger.error("Ingest binlog error! result: ${result}")
return false
}

if (context.txnInsert) {
List<TTabletCommitInfo> tabletCommitInfos = subTxnIdToTabletCommitInfos.get(txnId)
if (tabletCommitInfos == null) {
tabletCommitInfos = new ArrayList<TTabletCommitInfo>()
subTxnIdToTabletCommitInfos.put(txnId, tabletCommitInfos)
subTxnIdToTableId.put(txnId, tarTableMeta.id)
if (context.txnInsert) {
List<TTabletCommitInfo> tabletCommitInfos = subTxnIdToTabletCommitInfos.get(txnId)
if (tabletCommitInfos == null) {
tabletCommitInfos = new ArrayList<TTabletCommitInfo>()
subTxnIdToTabletCommitInfos.put(txnId, tabletCommitInfos)
subTxnIdToTableId.put(txnId, tarTableMeta.id)
}
tabletCommitInfos.add(new TTabletCommitInfo(tarTabletMap.key, tarReplicaMap.value))
} else {
addCommitInfo(tarTabletMap.key, tarReplicaMap.value)
}
tabletCommitInfos.add(new TTabletCommitInfo(tarTabletMap.key, tarTabletMap.value))
} else {
addCommitInfo(tarTabletMap.key, tarTabletMap.value)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,22 @@ import groovy.util.logging.Slf4j

import java.sql.Connection

class TabletMeta {
public TreeMap<Long, Long> replicas

TabletMeta() {
this.replicas = new TreeMap<Long, Long>()
}

String toString() {
return "TabletMeta: { replicas: " + replicas.toString() + " }"
}
}

class PartitionMeta {
public long version
public long indexId
public TreeMap<Long, Long> tabletMeta
public TreeMap<Long, TabletMeta> tabletMeta

PartitionMeta(long indexId, long version) {
this.indexId = indexId
Expand Down Expand Up @@ -219,6 +231,19 @@ class SyncerContext {
} else if (srcTabletMeta.size() != tarTabletMeta.size()) {
return false
}

Iterator srcTabletIter = srcTabletMeta.iterator()
Iterator tarTabletIter = tarTabletMeta.iterator()
while (srcTabletIter.hasNext()) {
Map srcReplicaMap = srcTabletIter.next().value.replicas
Map tarReplicaMap = tarTabletIter.next().value.replicas

if (srcReplicaMap.isEmpty() || tarReplicaMap.isEmpty()) {
return false
} else if (srcReplicaMap.size() != tarReplicaMap.size()) {
return false
}
}
}
})

Expand Down
36 changes: 18 additions & 18 deletions regression-test/suites/ccr_mow_syncer_p0/test_ingest_binlog.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -27,36 +27,36 @@ suite("test_mow_ingest_binlog") {
def test_num = 0
sql "DROP TABLE IF EXISTS ${tableName}"
sql """
CREATE TABLE if NOT EXISTS ${tableName}
CREATE TABLE if NOT EXISTS ${tableName}
(
`test` INT,
`id` INT
)
ENGINE=OLAP
UNIQUE KEY(`test`, `id`)
DISTRIBUTED BY HASH(id) BUCKETS 1
PROPERTIES (
"enable_unique_key_merge_on_write" = "true",
DISTRIBUTED BY HASH(id) BUCKETS 1
PROPERTIES (
"enable_unique_key_merge_on_write" = "true",
"replication_allocation" = "tag.location.default: 1"
)
"""
sql """ALTER TABLE ${tableName} set ("binlog.enable" = "true")"""

target_sql "DROP TABLE IF EXISTS ${tableName}"
target_sql """
CREATE TABLE if NOT EXISTS ${tableName}
(
`test` INT,
`id` INT
)
ENGINE=OLAP
UNIQUE KEY(`test`, `id`)
DISTRIBUTED BY HASH(id) BUCKETS 1
PROPERTIES (
"enable_unique_key_merge_on_write" = "true",
"replication_allocation" = "tag.location.default: 1"
)
"""
CREATE TABLE if NOT EXISTS ${tableName}
(
`test` INT,
`id` INT
)
ENGINE=OLAP
UNIQUE KEY(`test`, `id`)
DISTRIBUTED BY HASH(id) BUCKETS 1
PROPERTIES (
"enable_unique_key_merge_on_write" = "true",
"replication_allocation" = "tag.location.default: 1"
)
"""
assertTrue(syncer.getTargetMeta("${tableName}"))


Expand Down Expand Up @@ -124,4 +124,4 @@ suite("test_mow_ingest_binlog") {

// End Test 2
syncer.closeBackendClients()
}
}

0 comments on commit 152e50f

Please sign in to comment.