Commit
[hotfix] Clean redundant dynamic option copy code in cdc action (apac…
zhuangchong authored Jan 8, 2024
1 parent 75b1cf5 commit d59f119
Showing 7 changed files with 13 additions and 30 deletions.
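Why the removed calls were redundant: in the hunks below, `table.copy(dynamicOptions)` is invoked without using its return value. Paimon's `FileStoreTable.copy(Map<String, String>)` returns a new table instance with the options applied rather than mutating the receiver, so the discarded call had no effect and the `dynamicOptions` plumbing around it could be dropped. A minimal sketch of the pattern, assuming only the calls visible in this diff; the class and method names here are illustrative, not part of the commit:

```java
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.table.FileStoreTable;

import java.util.Map;

class DynamicOptionsCopyExample {

    // Pattern removed by this commit: copy(...) returns a new FileStoreTable
    // instead of mutating the receiver, so a bare call is a no-op.
    static FileStoreTable redundant(Catalog catalog, Identifier id, Map<String, String> dynamicOptions)
            throws Catalog.TableNotExistException {
        FileStoreTable table = (FileStoreTable) catalog.getTable(id);
        table.copy(dynamicOptions); // result discarded -> options never take effect
        return table;
    }

    // If the options were actually needed, the copied instance would have to be used.
    static FileStoreTable effective(Catalog catalog, Identifier id, Map<String, String> dynamicOptions)
            throws Catalog.TableNotExistException {
        return ((FileStoreTable) catalog.getTable(id)).copy(dynamicOptions);
    }
}
```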
@@ -43,12 +43,9 @@ public class CdcMultiplexRecordChannelComputer implements ChannelComputer<CdcMul

private Map<Identifier, CdcRecordChannelComputer> channelComputers;
private Catalog catalog;
- private final Map<String, String> dynamicOptions;

- public CdcMultiplexRecordChannelComputer(
-         Catalog.Loader catalogLoader, Map<String, String> dynamicOptions) {
+ public CdcMultiplexRecordChannelComputer(Catalog.Loader catalogLoader) {
this.catalogLoader = catalogLoader;
- this.dynamicOptions = dynamicOptions;
}

@Override
@@ -76,7 +73,6 @@ private ChannelComputer<CdcRecord> computeChannelComputer(CdcMultiplexRecord rec
FileStoreTable table;
try {
table = (FileStoreTable) catalog.getTable(id);
- table.copy(dynamicOptions);
} catch (Catalog.TableNotExistException e) {
LOG.error("Failed to get table " + id.getFullName());
return null;
@@ -72,19 +72,16 @@ public class CdcRecordStoreMultiWriteOperator
private Map<Identifier, StoreSinkWrite> writes;
private String commitUser;
private ExecutorService compactExecutor;
- private final Map<String, String> dynamicOptions;

public CdcRecordStoreMultiWriteOperator(
Catalog.Loader catalogLoader,
StoreSinkWrite.WithWriteBufferProvider storeSinkWriteProvider,
String initialCommitUser,
-         Options options,
-         Map<String, String> dynamicOptions) {
+         Options options) {
super(options);
this.catalogLoader = catalogLoader;
this.storeSinkWriteProvider = storeSinkWriteProvider;
this.initialCommitUser = initialCommitUser;
- this.dynamicOptions = dynamicOptions;
}

@Override
@@ -181,7 +178,6 @@ private FileStoreTable getTable(Identifier tableId) throws InterruptedException
while (true) {
try {
table = (FileStoreTable) catalog.getTable(tableId);
- table.copy(dynamicOptions);
tables.put(tableId, table);
break;
} catch (Catalog.TableNotExistException e) {
@@ -47,7 +47,6 @@
import javax.annotation.Nullable;

import java.io.Serializable;
- import java.util.Map;
import java.util.UUID;

import static org.apache.paimon.flink.sink.FlinkSink.assertStreamingConfiguration;
@@ -91,30 +90,28 @@ private StoreSinkWrite.WithWriteBufferProvider createWriteProvider() {
metricGroup);
}

- public DataStreamSink<?> sinkFrom(
-         DataStream<CdcMultiplexRecord> input, Map<String, String> dynamicOptions) {
+ public DataStreamSink<?> sinkFrom(DataStream<CdcMultiplexRecord> input) {
// This commitUser is valid only for new jobs.
// After the job starts, this commitUser will be recorded into the states of write and
// commit operators.
// When the job restarts, commitUser will be recovered from states and this value is
// ignored.
String initialCommitUser = UUID.randomUUID().toString();
- return sinkFrom(input, initialCommitUser, createWriteProvider(), dynamicOptions);
+ return sinkFrom(input, initialCommitUser, createWriteProvider());
}

public DataStreamSink<?> sinkFrom(
DataStream<CdcMultiplexRecord> input,
String commitUser,
-         StoreSinkWrite.WithWriteBufferProvider sinkProvider,
-         Map<String, String> dynamicOptions) {
+         StoreSinkWrite.WithWriteBufferProvider sinkProvider) {
StreamExecutionEnvironment env = input.getExecutionEnvironment();
assertStreamingConfiguration(env);
MultiTableCommittableTypeInfo typeInfo = new MultiTableCommittableTypeInfo();
SingleOutputStreamOperator<MultiTableCommittable> written =
input.transform(
WRITER_NAME,
typeInfo,
- createWriteOperator(sinkProvider, commitUser, dynamicOptions))
+ createWriteOperator(sinkProvider, commitUser))
.setParallelism(input.getParallelism());

// shuffle committables by table
@@ -144,11 +141,9 @@ public DataStreamSink<?> sinkFrom(
}

protected OneInputStreamOperator<CdcMultiplexRecord, MultiTableCommittable> createWriteOperator(
-         StoreSinkWrite.WithWriteBufferProvider writeProvider,
-         String commitUser,
-         Map<String, String> dynamicOptions) {
+         StoreSinkWrite.WithWriteBufferProvider writeProvider, String commitUser) {
return new CdcRecordStoreMultiWriteOperator(
- catalogLoader, writeProvider, commitUser, new Options(), dynamicOptions);
+ catalogLoader, writeProvider, commitUser, new Options());
}

// Table committers are dynamically created at runtime
@@ -65,7 +65,6 @@ public class FlinkCdcSyncDatabaseSinkBuilder<T> {
@Nullable private Integer parallelism;
private double committerCpu;
@Nullable private MemorySize committerMemory;
- private Map<String, String> dynamicOptions;

// Paimon catalog used to check and create tables. There will be two
// places where this catalog is used. 1) in processing function,
@@ -98,7 +97,6 @@ public FlinkCdcSyncDatabaseSinkBuilder<T> withTableOptions(Map<String, String> o
}

public FlinkCdcSyncDatabaseSinkBuilder<T> withTableOptions(Options options) {
- this.dynamicOptions = options.toMap();
this.parallelism = options.get(FlinkConnectorOptions.SINK_PARALLELISM);
this.committerCpu = options.get(FlinkConnectorOptions.SINK_COMMITTER_CPU);
this.committerMemory = options.get(FlinkConnectorOptions.SINK_COMMITTER_MEMORY);
@@ -157,12 +155,12 @@ private void buildCombinedCdcSink() {
DataStream<CdcMultiplexRecord> partitioned =
partition(
newlyAddedTableStream,
- new CdcMultiplexRecordChannelComputer(catalogLoader, dynamicOptions),
+ new CdcMultiplexRecordChannelComputer(catalogLoader),
parallelism);

FlinkCdcMultiTableSink sink =
new FlinkCdcMultiTableSink(catalogLoader, committerCpu, committerMemory);
- sink.sinkFrom(partitioned, dynamicOptions);
+ sink.sinkFrom(partitioned);
}

private void buildForFixedBucket(FileStoreTable table, DataStream<CdcRecord> parsed) {
@@ -149,7 +149,7 @@ private void testImpl(Identifier tableId, List<Map<String, String>> input) {

int numChannels = random.nextInt(10) + 1;
CdcMultiplexRecordChannelComputer channelComputer =
- new CdcMultiplexRecordChannelComputer(catalogLoader, new HashMap<>());
+ new CdcMultiplexRecordChannelComputer(catalogLoader);
channelComputer.setup(numChannels);

// assert that insert and delete records are routed into same channel
@@ -810,8 +810,7 @@ public void testSingleTableCompactionMetrics() throws Exception {
memoryPoolFactory,
metricGroup),
commitUser,
-         Options.fromMap(new HashMap<>()),
-         new HashMap<>());
+         Options.fromMap(new HashMap<>()));
TypeSerializer<CdcMultiplexRecord> inputSerializer = new JavaSerializer<>();
TypeSerializer<MultiTableCommittable> outputSerializer =
new MultiTableCommittableTypeInfo().createSerializer(new ExecutionConfig());
@@ -31,7 +31,6 @@
import org.apache.flink.streaming.api.transformations.PartitionTransformation;
import org.junit.jupiter.api.Test;

- import java.util.Collections;
import java.util.concurrent.ThreadLocalRandom;

import static org.assertj.core.api.Assertions.assertThat;
@@ -60,7 +59,7 @@ public void cancel() {}
() -> FlinkCatalogFactory.createPaimonCatalog(new Options()),
FlinkConnectorOptions.SINK_COMMITTER_CPU.defaultValue(),
null);
- DataStreamSink<?> dataStreamSink = sink.sinkFrom(input, Collections.emptyMap());
+ DataStreamSink<?> dataStreamSink = sink.sinkFrom(input);

// check the transformation graph
LegacySinkTransformation<?> end =
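For orientation, here is a condensed sketch of how the combined CDC sink is wired after this commit, based only on the changed lines above. The helper class and method names are illustrative, and the upstream partitioning step from `FlinkCdcSyncDatabaseSinkBuilder` (which now uses `new CdcMultiplexRecordChannelComputer(catalogLoader)`) is assumed to have already produced the input stream:

```java
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.flink.sink.cdc.CdcMultiplexRecord;
import org.apache.paimon.flink.sink.cdc.FlinkCdcMultiTableSink;
import org.apache.paimon.options.MemorySize;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;

class CombinedCdcSinkWiring {

    // Condensed from buildCombinedCdcSink() after this commit: the sink is built
    // and attached without any per-table dynamic-options map. The 'partitioned'
    // stream is assumed to have been shuffled upstream by the channel computer.
    static DataStreamSink<?> attach(
            Catalog.Loader catalogLoader,
            DataStream<CdcMultiplexRecord> partitioned,
            double committerCpu,
            MemorySize committerMemory) {
        FlinkCdcMultiTableSink sink =
                new FlinkCdcMultiTableSink(catalogLoader, committerCpu, committerMemory);
        return sink.sinkFrom(partitioned);
    }
}
```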
