[flink] Bypass operator should never block checkpoint (apache#4039)
JingsongLi authored Aug 23, 2024
1 parent 813cb6f commit 16cf881
Showing 3 changed files with 119 additions and 15 deletions.

@@ -18,7 +18,7 @@
 
 package org.apache.paimon.flink.sink;
 
-import org.apache.paimon.flink.source.AppendBypassCoordinateOperator;
+import org.apache.paimon.flink.source.AppendBypassCoordinateOperatorFactory;
 import org.apache.paimon.table.FileStoreTable;
 
 import org.apache.flink.api.common.RuntimeExecutionMode;
@@ -73,7 +73,7 @@ public DataStream<Committable> doWrite(
                         new EitherTypeInfo<>(
                                 new CommittableTypeInfo(),
                                 new CompactionTaskTypeInfo()),
-                        new AppendBypassCoordinateOperator<>(table))
+                        new AppendBypassCoordinateOperatorFactory<>(table))
                 .forceNonParallel()
                 .transform(
                         "Compact Worker: " + table.name(),

@@ -21,17 +21,26 @@
 import org.apache.paimon.append.UnawareAppendCompactionTask;
 import org.apache.paimon.append.UnawareAppendTableCompactionCoordinator;
 import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.utils.ExecutorUtils;
 
-import org.apache.flink.api.common.operators.ProcessingTimeService;
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.api.common.operators.ProcessingTimeService.ProcessingTimeCallback;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.ChainingStrategy;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl;
 import org.apache.flink.types.Either;
 
 import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
 import static org.apache.paimon.utils.Preconditions.checkArgument;
+import static org.apache.paimon.utils.ThreadUtils.newDaemonThreadFactory;
 
 /**
  * A {@link OneInputStreamOperator} to accept commit messages and send append compact coordinate
@@ -40,14 +49,24 @@
 public class AppendBypassCoordinateOperator<CommitT>
         extends AbstractStreamOperator<Either<CommitT, UnawareAppendCompactionTask>>
         implements OneInputStreamOperator<CommitT, Either<CommitT, UnawareAppendCompactionTask>>,
-                ProcessingTimeService.ProcessingTimeCallback {
+                ProcessingTimeCallback {
+
+    private static final long MAX_PENDING_TASKS = 5000;
+    private static final long EMIT_PER_BATCH = 100;
 
     private final FileStoreTable table;
+    private final MailboxExecutorImpl mailbox;
 
-    private transient UnawareAppendTableCompactionCoordinator coordinator;
+    private transient ScheduledExecutorService executorService;
+    private transient LinkedBlockingQueue<UnawareAppendCompactionTask> compactTasks;
 
-    public AppendBypassCoordinateOperator(FileStoreTable table) {
+    public AppendBypassCoordinateOperator(
+            FileStoreTable table,
+            ProcessingTimeService processingTimeService,
+            MailboxExecutor mailbox) {
         this.table = table;
+        this.processingTimeService = processingTimeService;
+        this.mailbox = (MailboxExecutorImpl) mailbox;
         this.chainingStrategy = ChainingStrategy.NEVER;
     }
 
@@ -57,27 +76,49 @@ public void open() throws Exception {
         checkArgument(
                 getRuntimeContext().getNumberOfParallelSubtasks() == 1,
                 "Compaction Coordinator parallelism in paimon MUST be one.");
-        this.coordinator = new UnawareAppendTableCompactionCoordinator(table, true, null);
         long intervalMs = table.coreOptions().continuousDiscoveryInterval().toMillis();
-        getProcessingTimeService().scheduleWithFixedDelay(this, 0, intervalMs);
+        this.compactTasks = new LinkedBlockingQueue<>();
+        UnawareAppendTableCompactionCoordinator coordinator =
+                new UnawareAppendTableCompactionCoordinator(table, true, null);
+        this.executorService =
+                Executors.newSingleThreadScheduledExecutor(
+                        newDaemonThreadFactory("Compaction Coordinator"));
+        this.executorService.scheduleWithFixedDelay(
+                () -> asyncPlan(coordinator), 0, intervalMs, TimeUnit.MILLISECONDS);
+        this.getProcessingTimeService().scheduleWithFixedDelay(this, 0, intervalMs);
     }
 
-    @Override
-    public void onProcessingTime(long time) {
-        while (true) {
+    private void asyncPlan(UnawareAppendTableCompactionCoordinator coordinator) {
+        while (compactTasks.size() < MAX_PENDING_TASKS) {
             List<UnawareAppendCompactionTask> tasks = coordinator.run();
-            for (UnawareAppendCompactionTask task : tasks) {
-                output.collect(new StreamRecord<>(Either.Right(task)));
-            }
-
+            compactTasks.addAll(tasks);
             if (tasks.isEmpty()) {
                 break;
             }
         }
     }
 
+    @Override
+    public void onProcessingTime(long time) {
+        while (mailbox.isIdle()) {
+            for (int i = 0; i < EMIT_PER_BATCH; i++) {
+                UnawareAppendCompactionTask task = compactTasks.poll();
+                if (task == null) {
+                    return;
+                }
+                output.collect(new StreamRecord<>(Either.Right(task)));
+            }
+        }
+    }
+
     @Override
     public void processElement(StreamRecord<CommitT> record) throws Exception {
         output.collect(new StreamRecord<>(Either.Left(record.getValue())));
     }
+
+    @Override
+    public void close() throws Exception {
+        ExecutorUtils.gracefulShutdown(1, TimeUnit.MINUTES, executorService);
+        super.close();
+    }
 }

@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.source;
+
+import org.apache.paimon.append.UnawareAppendCompactionTask;
+import org.apache.paimon.table.FileStoreTable;
+
+import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
+import org.apache.flink.streaming.api.operators.YieldingOperatorFactory;
+import org.apache.flink.types.Either;
+
+/** Factory of {@link AppendBypassCoordinateOperator}. */
+public class AppendBypassCoordinateOperatorFactory<CommitT>
+        extends AbstractStreamOperatorFactory<Either<CommitT, UnawareAppendCompactionTask>>
+        implements YieldingOperatorFactory<Either<CommitT, UnawareAppendCompactionTask>>,
+                OneInputStreamOperatorFactory<
+                        CommitT, Either<CommitT, UnawareAppendCompactionTask>> {
+
+    private final FileStoreTable table;
+
+    public AppendBypassCoordinateOperatorFactory(FileStoreTable table) {
+        this.table = table;
+    }
+
+    @Override
+    public <T extends StreamOperator<Either<CommitT, UnawareAppendCompactionTask>>>
+            T createStreamOperator(
+                    StreamOperatorParameters<Either<CommitT, UnawareAppendCompactionTask>>
+                            parameters) {
+        AppendBypassCoordinateOperator<CommitT> operator =
+                new AppendBypassCoordinateOperator<>(
+                        table, processingTimeService, getMailboxExecutor());
+        operator.setup(
+                parameters.getContainingTask(),
+                parameters.getStreamConfig(),
+                parameters.getOutput());
+        return (T) operator;
+    }
+
+    @Override
+    public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
+        return AppendBypassCoordinateOperator.class;
+    }
+}
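
The pattern behind the change, as a minimal self-contained sketch (plain Java, no Flink or Paimon APIs; the class name, pendingMails mailbox stand-in, planOnce, and emit below are illustrative names, not from the commit): compaction planning runs on a daemon scheduler thread and only fills a bounded queue, while the timer callback on the main thread drains at most a small batch per pass and keeps draining only while no other mailbox work (such as a checkpoint trigger) is pending, so checkpoints never wait behind planning or a long emit loop.

// Illustrative sketch only; stand-in names, no Flink/Paimon dependencies.
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class NonBlockingBypassSketch {

    private static final int MAX_PENDING_TASKS = 5000; // bound the backlog of planned tasks
    private static final int EMIT_PER_BATCH = 100; // small batches keep the main thread responsive

    // Stand-in for the task mailbox: a value > 0 means other work (e.g. a checkpoint) is waiting.
    private final AtomicInteger pendingMails = new AtomicInteger();
    private final LinkedBlockingQueue<String> compactTasks = new LinkedBlockingQueue<>();
    private final ScheduledExecutorService planner =
            Executors.newSingleThreadScheduledExecutor(
                    r -> {
                        Thread t = new Thread(r, "compaction-planner");
                        t.setDaemon(true);
                        return t;
                    });

    void open(long intervalMs) {
        // Planning is scheduled off the main thread; it only fills the queue.
        planner.scheduleWithFixedDelay(this::asyncPlan, 0, intervalMs, TimeUnit.MILLISECONDS);
    }

    private void asyncPlan() {
        while (compactTasks.size() < MAX_PENDING_TASKS) {
            List<String> tasks = planOnce(); // stand-in for coordinator.run()
            compactTasks.addAll(tasks);
            if (tasks.isEmpty()) {
                break;
            }
        }
    }

    // Runs on the main (mailbox) thread from a processing-time timer.
    void onTimer() {
        while (pendingMails.get() == 0) { // yield as soon as other mailbox work shows up
            for (int i = 0; i < EMIT_PER_BATCH; i++) {
                String task = compactTasks.poll();
                if (task == null) {
                    return; // backlog drained
                }
                emit(task);
            }
        }
    }

    void close() {
        planner.shutdownNow();
    }

    private List<String> planOnce() {
        return Collections.emptyList(); // placeholder plan source
    }

    private void emit(String task) {
        System.out.println("emit " + task);
    }

    public static void main(String[] args) {
        NonBlockingBypassSketch sketch = new NonBlockingBypassSketch();
        sketch.open(100);
        sketch.onTimer(); // returns immediately here, since planOnce() yields nothing
        sketch.close();
    }
}

In the commit itself, the idleness check is MailboxExecutorImpl#isIdle(), and the operator obtains the executor through the new AppendBypassCoordinateOperatorFactory: the factory implements YieldingOperatorFactory, whose getMailboxExecutor() is passed into the operator's new constructor, which is why it replaces direct construction of AppendBypassCoordinateOperator in the first changed file.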
