Skip to content

Commit

Permalink
[flink] Fix CompactDatabaseAction hasn't copied dynamic options in co…
Browse files Browse the repository at this point in the history
…mbined mode
  • Loading branch information
yuzelin committed Apr 2, 2024
1 parent 6921d41 commit b4f4097
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.paimon.flink.sink;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.flink.VersionedSerializerWrapper;
import org.apache.paimon.manifest.WrappedManifestCommittable;
Expand All @@ -38,6 +39,7 @@
import org.apache.flink.table.data.RowData;

import java.io.Serializable;
import java.util.Map;
import java.util.UUID;

import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_MANAGED_WRITER_BUFFER_MEMORY;
Expand Down Expand Up @@ -172,8 +174,10 @@ protected OneInputStreamOperator<RowData, MultiTableCommittable> createWriteOper

protected Committer.Factory<MultiTableCommittable, WrappedManifestCommittable>
createCommitterFactory() {
Map<String, String> dynamicOptions = options.toMap();
dynamicOptions.put(CoreOptions.WRITE_ONLY.key(), "false");
return (user, metricGroup) ->
new StoreMultiCommitter(catalogLoader, user, metricGroup, true);
new StoreMultiCommitter(catalogLoader, user, metricGroup, true, dynamicOptions);
}

protected CommittableStateManager<WrappedManifestCommittable> createCommittableStateManager() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

package org.apache.paimon.flink.sink;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.manifest.ManifestCommittable;
Expand Down Expand Up @@ -57,26 +56,29 @@ public class StoreMultiCommitter
// referenced by table id.
private final Map<Identifier, StoreCommitter> tableCommitters;

// compact job needs set "write-only" of a table to false
private final boolean isCompactJob;
// Currently, only compact_database job needs to ignore empty commit and set dynamic options
private final boolean ignoreEmptyCommit;
private final Map<String, String> dynamicOptions;

public StoreMultiCommitter(
Catalog.Loader catalogLoader,
String commitUser,
@Nullable OperatorMetricGroup flinkMetricGroup) {
this(catalogLoader, commitUser, flinkMetricGroup, false);
this(catalogLoader, commitUser, flinkMetricGroup, false, Collections.emptyMap());
}

public StoreMultiCommitter(
Catalog.Loader catalogLoader,
String commitUser,
@Nullable OperatorMetricGroup flinkMetricGroup,
boolean isCompactJob) {
boolean ignoreEmptyCommit,
Map<String, String> dynamicOptions) {
this.catalog = catalogLoader.load();
this.commitUser = commitUser;
this.flinkMetricGroup = flinkMetricGroup;
this.ignoreEmptyCommit = ignoreEmptyCommit;
this.dynamicOptions = dynamicOptions;
this.tableCommitters = new HashMap<>();
this.isCompactJob = isCompactJob;
}

@Override
Expand Down Expand Up @@ -191,13 +193,7 @@ private StoreCommitter getStoreCommitter(Identifier tableId) {
if (committer == null) {
FileStoreTable table;
try {
table = (FileStoreTable) catalog.getTable(tableId);
if (isCompactJob) {
table =
table.copy(
Collections.singletonMap(
CoreOptions.WRITE_ONLY.key(), "false"));
}
table = (FileStoreTable) catalog.getTable(tableId).copy(dynamicOptions);
} catch (Catalog.TableNotExistException e) {
throw new RuntimeException(
String.format(
Expand All @@ -206,7 +202,7 @@ private StoreCommitter getStoreCommitter(Identifier tableId) {
}
committer =
new StoreCommitter(
table.newCommit(commitUser).ignoreEmptyCommit(isCompactJob),
table.newCommit(commitUser).ignoreEmptyCommit(ignoreEmptyCommit),
flinkMetricGroup);
tableCommitters.put(tableId, committer);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.paimon.utils.CommonTestUtils;
import org.apache.paimon.utils.SnapshotManager;

import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
Expand Down Expand Up @@ -294,8 +295,8 @@ public void testStreamingCompact(String mode) throws Exception {
() ->
snapshotManager.latestSnapshotId() - 2
== snapshotManager.earliestSnapshotId(),
Duration.ofSeconds(60_000),
Duration.ofSeconds(100),
Duration.ofSeconds(60),
Duration.ofMillis(100),
String.format(
"Cannot validate snapshot expiration in %s milliseconds.", 60_000));
write.close();
Expand Down Expand Up @@ -387,8 +388,8 @@ public void testStreamingCompact(String mode) throws Exception {
() ->
snapshotManager.latestSnapshotId() - 2
== snapshotManager.earliestSnapshotId(),
Duration.ofSeconds(60_000),
Duration.ofSeconds(100),
Duration.ofSeconds(60),
Duration.ofMillis(100),
String.format(
"Cannot validate snapshot expiration in %s milliseconds.", 60_000));
write.close();
Expand Down Expand Up @@ -702,6 +703,66 @@ public void testUnawareBucketBatchCompact() throws Exception {
}
}

@Test
public void testCombinedModeWithDynamicOptions() throws Exception {
// create table and commit data
Map<String, String> options = new HashMap<>();
options.put(CoreOptions.WRITE_ONLY.key(), "true");
options.put(CoreOptions.BUCKET.key(), "1");
options.put(CoreOptions.SNAPSHOT_NUM_RETAINED_MIN.key(), "1000");
FileStoreTable table =
createTable(
"test_db",
"t",
Arrays.asList("dt", "hh"),
Arrays.asList("dt", "hh", "k"),
options);

StreamWriteBuilder streamWriteBuilder =
table.newStreamWriteBuilder().withCommitUser(commitUser);
write = streamWriteBuilder.newWrite();
commit = streamWriteBuilder.newCommit();

for (int i = 0; i < 10; i++) {
writeData(rowData(1, i, 15, BinaryString.fromString("20221208")));
}
SnapshotManager snapshotManager = table.snapshotManager();
assertThat(snapshotManager.latestSnapshotId()).isEqualTo(10);

// if CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL.key() use default value, the cost
// time in combined mode will be over 1 min
CompactDatabaseAction action =
createAction(
CompactDatabaseAction.class,
"compact_database",
"--warehouse",
warehouse,
"--mode",
"combined",
"--table_conf",
CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL.key() + "=1s",
// test dynamic options will be copied in commit
"--table_conf",
CoreOptions.SNAPSHOT_NUM_RETAINED_MIN.key() + "=3",
"--table_conf",
CoreOptions.SNAPSHOT_NUM_RETAINED_MAX.key() + "=3");

StreamExecutionEnvironment env = buildDefaultEnv(true);
action.withStreamExecutionEnvironment(env).build();
JobClient jobClient = env.executeAsync();

CommonTestUtils.waitUtil(
() -> snapshotManager.latestSnapshotId() == 11L,
Duration.ofSeconds(60),
Duration.ofMillis(100));
jobClient.cancel();

Snapshot latest = snapshotManager.latestSnapshot();
Snapshot earliest = snapshotManager.earliestSnapshot();
assertThat(latest.commitKind()).isEqualTo(Snapshot.CommitKind.COMPACT);
assertThat(latest.id() - earliest.id()).isEqualTo(2);
}

private void writeData(
StreamTableWrite write,
StreamTableCommit commit,
Expand Down

0 comments on commit b4f4097

Please sign in to comment.