[flink] Postpone sink materializer check from compile time to runtime (
tsreaper authored Mar 6, 2024
1 parent 1502e83 commit d61ed7e
Showing 3 changed files with 86 additions and 43 deletions.
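For context: when a Flink SQL job writes an updating stream into an upsert sink, the planner can insert a "SinkMaterializer" operator in front of the sink, governed by the table.exec.sink.upsert-materialize option. Paimon sinks resolve upserts themselves, so the combination is rejected with the error seen below. Before this commit the rejection ran while the sink topology was being assembled; the commit instead records the result of that graph inspection in a serializable flag and asserts it when the write operator is instantiated at runtime. A minimal user-side sketch of disabling the materializer, using only Flink's public Table API (the class name is illustrative, not part of the commit):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.config.ExecutionConfigOptions;

public class DisableSinkMaterializer {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());
        // Paimon handles upsert semantics itself, so Flink's sink materializer
        // must be turned off before inserting into a Paimon table.
        tEnv.getConfig()
                .set(
                        ExecutionConfigOptions.TABLE_EXEC_SINK_UPSERT_MATERIALIZE,
                        ExecutionConfigOptions.UpsertMaterialize.NONE);
    }
}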
SerializableRunnable.java (new file)
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import java.io.Serializable;
+
+/** A serializable {@link Runnable}. */
+@FunctionalInterface
+public interface SerializableRunnable extends Runnable, Serializable {}
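A lambda written against plain java.lang.Runnable is not Serializable, so it could not travel inside Flink's serialized operator factories; targeting this interface makes the compiler emit a serializable lambda whose captured values are serialized along with it. A small sketch of that round trip, assuming only JDK serialization (the demo class is hypothetical):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

import org.apache.paimon.utils.SerializableRunnable;

public class SerializableRunnableDemo {
    public static void main(String[] args) throws Exception {
        boolean hasSinkMaterializer = false; // effectively final, captured by the lambda
        SerializableRunnable check =
                () -> {
                    if (hasSinkMaterializer) {
                        throw new IllegalArgumentException("sink materializer detected");
                    }
                };

        // Round-trip through Java serialization, as happens when Flink ships
        // the sink's write provider to the task managers.
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
            out.writeObject(check);
        }
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
            SerializableRunnable restored = (SerializableRunnable) in.readObject();
            restored.run(); // runs the captured check on the receiving side
        }
    }
}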
FlinkSink.java
@@ -26,6 +26,7 @@
 import org.apache.paimon.options.Options;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.utils.Preconditions;
+import org.apache.paimon.utils.SerializableRunnable;
 
 import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.api.common.operators.SlotSharingGroup;
@@ -82,7 +83,18 @@ public FlinkSink(FileStoreTable table, boolean ignorePreviousFiles) {
     }
 
     private StoreSinkWrite.Provider createWriteProvider(
-            CheckpointConfig checkpointConfig, boolean isStreaming) {
+            CheckpointConfig checkpointConfig, boolean isStreaming, boolean hasSinkMaterializer) {
+        SerializableRunnable assertNoSinkMaterializer =
+                () ->
+                        Preconditions.checkArgument(
+                                !hasSinkMaterializer,
+                                String.format(
+                                        "Sink materializer must not be used with Paimon sink. "
+                                                + "Please set '%s' to '%s' in Flink's config.",
+                                        ExecutionConfigOptions.TABLE_EXEC_SINK_UPSERT_MATERIALIZE
+                                                .key(),
+                                        ExecutionConfigOptions.UpsertMaterialize.NONE.name()));
+
         boolean waitCompaction;
         if (table.coreOptions().writeOnly()) {
             waitCompaction = false;
@@ -107,32 +119,36 @@ private StoreSinkWrite.Provider createWriteProvider(
 
         if (changelogProducer == ChangelogProducer.FULL_COMPACTION || deltaCommits >= 0) {
             int finalDeltaCommits = Math.max(deltaCommits, 1);
-            return (table, commitUser, state, ioManager, memoryPool, metricGroup) ->
-                    new GlobalFullCompactionSinkWrite(
-                            table,
-                            commitUser,
-                            state,
-                            ioManager,
-                            ignorePreviousFiles,
-                            waitCompaction,
-                            finalDeltaCommits,
-                            isStreaming,
-                            memoryPool,
-                            metricGroup);
+            return (table, commitUser, state, ioManager, memoryPool, metricGroup) -> {
+                assertNoSinkMaterializer.run();
+                return new GlobalFullCompactionSinkWrite(
+                        table,
+                        commitUser,
+                        state,
+                        ioManager,
+                        ignorePreviousFiles,
+                        waitCompaction,
+                        finalDeltaCommits,
+                        isStreaming,
+                        memoryPool,
+                        metricGroup);
+            };
         }
     }
 
-        return (table, commitUser, state, ioManager, memoryPool, metricGroup) ->
-                new StoreSinkWriteImpl(
-                        table,
-                        commitUser,
-                        state,
-                        ioManager,
-                        ignorePreviousFiles,
-                        waitCompaction,
-                        isStreaming,
-                        memoryPool,
-                        metricGroup);
+        return (table, commitUser, state, ioManager, memoryPool, metricGroup) -> {
+            assertNoSinkMaterializer.run();
+            return new StoreSinkWriteImpl(
+                    table,
+                    commitUser,
+                    state,
+                    ioManager,
+                    ignorePreviousFiles,
+                    waitCompaction,
+                    isStreaming,
+                    memoryPool,
+                    metricGroup);
+        };
     }
 
     public DataStreamSink<?> sinkFrom(DataStream<T> input) {
@@ -146,37 +162,31 @@ public DataStreamSink<?> sinkFrom(DataStream<T> input) {
     }
 
     public DataStreamSink<?> sinkFrom(DataStream<T> input, String initialCommitUser) {
-        assertNoSinkMaterializer(input);
-
         // do the actually writing action, no snapshot generated in this stage
         DataStream<Committable> written = doWrite(input, initialCommitUser, input.getParallelism());
 
         // commit the committable to generate a new snapshot
        return doCommit(written, initialCommitUser);
     }
 
-    private void assertNoSinkMaterializer(DataStream<T> input) {
+    private boolean hasSinkMaterializer(DataStream<T> input) {
         // traverse the transformation graph with breadth first search
         Set<Integer> visited = new HashSet<>();
         Queue<Transformation<?>> queue = new LinkedList<>();
         queue.add(input.getTransformation());
         visited.add(input.getTransformation().getId());
         while (!queue.isEmpty()) {
             Transformation<?> transformation = queue.poll();
-            Preconditions.checkArgument(
-                    !transformation.getName().startsWith("SinkMaterializer"),
-                    String.format(
-                            "Sink materializer must not be used with Paimon sink. "
-                                    + "Please set '%s' to '%s' in Flink's config.",
-                            ExecutionConfigOptions.TABLE_EXEC_SINK_UPSERT_MATERIALIZE.key(),
-                            ExecutionConfigOptions.UpsertMaterialize.NONE.name()));
+            if (transformation.getName().startsWith("SinkMaterializer")) {
+                return true;
+            }
             for (Transformation<?> prev : transformation.getInputs()) {
                 if (!visited.contains(prev.getId())) {
                     queue.add(prev);
                     visited.add(prev.getId());
                 }
             }
         }
+        return false;
     }
 
     public DataStream<Committable> doWrite(
@@ -195,7 +205,10 @@ public DataStream<Committable> doWrite(
                                         + table.name(),
                                 new CommittableTypeInfo(),
                                 createWriteOperator(
-                                        createWriteProvider(env.getCheckpointConfig(), isStreaming),
+                                        createWriteProvider(
+                                                env.getCheckpointConfig(),
+                                                isStreaming,
+                                                hasSinkMaterializer(input)),
                                         commitUser))
                 .setParallelism(parallelism == null ? input.getParallelism() : parallelism);
 
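The shape of the change above: hasSinkMaterializer(input) is still evaluated on the client while the graph is assembled, but the resulting boolean now travels inside a SerializableRunnable that every write provider runs first, so the failure is raised on the task manager when the writer is instantiated. A distilled sketch of that pattern; Provider here is a simplified stand-in, not Paimon's actual StoreSinkWrite.Provider signature:

import java.io.Serializable;

public class DeferredCheckPattern {

    /** Mirrors org.apache.paimon.utils.SerializableRunnable. */
    @FunctionalInterface
    interface SerializableRunnable extends Runnable, Serializable {}

    /** Simplified stand-in for StoreSinkWrite.Provider, not the real signature. */
    @FunctionalInterface
    interface Provider extends Serializable {
        Object create(String commitUser);
    }

    static Provider createProvider(boolean hasSinkMaterializer) {
        // The boolean is computed on the client, but the check only runs when
        // the provider is invoked, i.e. on the task manager at runtime.
        SerializableRunnable assertNoSinkMaterializer =
                () -> {
                    if (hasSinkMaterializer) {
                        throw new IllegalArgumentException(
                                "Sink materializer must not be used with Paimon sink.");
                    }
                };
        return commitUser -> {
            assertNoSinkMaterializer.run();
            return new Object(); // the real provider returns a StoreSinkWrite
        };
    }

    public static void main(String[] args) {
        Provider provider = createProvider(true); // building the graph succeeds
        try {
            provider.create("commit-user"); // the failure surfaces only here
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}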
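The traversal itself is an ordinary breadth-first search over Transformation.getInputs(), deduplicated by transformation id. For reference, the same walk as a self-contained program against an arbitrary DataStream (the pipeline and the names FindOperatorByName/MyMap are illustrative):

import java.util.HashSet;
import java.util.LinkedList;
import java.util.Queue;
import java.util.Set;

import org.apache.flink.api.dag.Transformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FindOperatorByName {
    static boolean hasOperatorNamed(DataStream<?> stream, String prefix) {
        Set<Integer> visited = new HashSet<>();
        Queue<Transformation<?>> queue = new LinkedList<>();
        queue.add(stream.getTransformation());
        visited.add(stream.getTransformation().getId());
        while (!queue.isEmpty()) {
            Transformation<?> t = queue.poll();
            if (t.getName().startsWith(prefix)) {
                return true;
            }
            for (Transformation<?> prev : t.getInputs()) {
                // Set.add returns false for already-visited ids.
                if (visited.add(prev.getId())) {
                    queue.add(prev);
                }
            }
        }
        return false;
    }

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> mapped =
                env.fromElements("a", "b").map(String::toUpperCase).name("MyMap");
        System.out.println(hasOperatorNamed(mapped, "MyMap")); // true
        System.out.println(hasOperatorNamed(mapped, "SinkMaterializer")); // false
    }
}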
Test file
@@ -20,6 +20,7 @@
 
 import org.apache.paimon.utils.BlockingIterator;
 
+import org.apache.flink.configuration.RestartStrategyOptions;
 import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.flink.table.planner.factories.TestValuesTableFactory;
 import org.apache.flink.types.Row;
@@ -336,16 +337,20 @@ public void testNoSinkMaterializer() {
                 .set(
                         ExecutionConfigOptions.TABLE_EXEC_SINK_UPSERT_MATERIALIZE,
                         ExecutionConfigOptions.UpsertMaterialize.FORCE);
-        try (CloseableIterator<Row> ignored =
-                streamSqlIter(
-                        "INSERT INTO dwd_orders "
-                                + "SELECT OrderID, OrderNumber, PersonID, CAST(NULL AS STRING), CAST(NULL AS STRING), CAST(NULL AS INT) FROM ods_orders "
-                                + "UNION ALL "
-                                + "SELECT OrderID, CAST(NULL AS INT), dim_persons.PersonID, LastName, FirstName, Age FROM dim_persons JOIN ods_orders ON dim_persons.PersonID = ods_orders.PersonID;")) {
+        sEnv.getConfig().set(RestartStrategyOptions.RESTART_STRATEGY, "none");
+        String sql =
+                "INSERT INTO dwd_orders "
+                        + "SELECT OrderID, OrderNumber, PersonID, CAST(NULL AS STRING), CAST(NULL AS STRING), CAST(NULL AS INT) FROM ods_orders "
+                        + "UNION ALL "
+                        + "SELECT OrderID, CAST(NULL AS INT), dim_persons.PersonID, LastName, FirstName, Age FROM dim_persons JOIN ods_orders ON dim_persons.PersonID = ods_orders.PersonID;";
+        try {
+            sEnv.executeSql(sql).await();
             fail("Expecting exception");
         } catch (Exception e) {
             assertThat(e)
-                    .hasMessageContaining("Sink materializer must not be used with Paimon sink.");
+                    .hasRootCauseMessage(
+                            "Sink materializer must not be used with Paimon sink. "
+                                    + "Please set 'table.exec.sink.upsert-materialize' to 'NONE' in Flink's config.");
         }
     }
 
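Two details of the reworked test: RestartStrategyOptions.RESTART_STRATEGY is set to "none" so the runtime failure is not retried and surfaces directly through executeSql(...).await(), and the assertion switches from hasMessageContaining to AssertJ's hasRootCauseMessage, since the error now arrives wrapped in Flink's job-execution exception chain and only the innermost cause carries the Paimon message. A minimal illustration of the root-cause assertion (the wrapping exception here is simulated, not Flink's actual wrapper):

import static org.assertj.core.api.Assertions.assertThatThrownBy;

public class RootCauseAssertionDemo {
    public static void main(String[] args) {
        String expected =
                "Sink materializer must not be used with Paimon sink. "
                        + "Please set 'table.exec.sink.upsert-materialize' to 'NONE' in Flink's config.";
        // hasRootCauseMessage walks getCause() to the innermost throwable and
        // compares its message exactly, so the wrapping layers do not matter.
        assertThatThrownBy(
                        () -> {
                            throw new RuntimeException(
                                    "job failed", new IllegalArgumentException(expected));
                        })
                .hasRootCauseMessage(expected);
    }
}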
