[flink] Support truncate table for flink (apache#2139)
FangYongs authored Nov 8, 2023
1 parent f09f63f commit 99e28b1
Showing 6 changed files with 340 additions and 189 deletions.
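For orientation, here is a minimal, hedged sketch of how the new ability is exercised from the user side: on Flink 1.18+, a batch-mode TRUNCATE TABLE statement is planned against the sink's SupportsTruncate#executeTruncation. The catalog name, warehouse path, and table name below are hypothetical placeholders, not part of this commit.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class TruncateTableExample {
    public static void main(String[] args) {
        // TRUNCATE TABLE is a batch-only statement in Flink.
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());

        // Hypothetical Paimon catalog; adjust the warehouse path to your environment.
        tEnv.executeSql(
                "CREATE CATALOG paimon WITH ('type' = 'paimon', 'warehouse' = 'file:///tmp/paimon')");
        tEnv.executeSql("USE CATALOG paimon");

        // The planner delegates this statement to FlinkTableSink#executeTruncation,
        // which purges all data from the Paimon table.
        tEnv.executeSql("TRUNCATE TABLE t");
    }
}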
7 changes: 7 additions & 0 deletions paimon-flink/paimon-flink-1.17/pom.xml
@@ -38,6 +38,13 @@ under the License.
</properties>

<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.paimon</groupId>
<artifactId>paimon-flink-common</artifactId>
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.sink;

import org.apache.paimon.flink.log.LogStoreTableFactory;
import org.apache.paimon.table.Table;

import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.factories.DynamicTableFactory;

import javax.annotation.Nullable;

/** Table sink to create sink. */
public class FlinkTableSink extends SupportsRowLevelOperationFlinkTableSink {

public FlinkTableSink(
ObjectIdentifier tableIdentifier,
Table table,
DynamicTableFactory.Context context,
@Nullable LogStoreTableFactory logStoreTableFactory) {
super(tableIdentifier, table, context, logStoreTableFactory);
}
}
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.sink;

import org.apache.paimon.flink.log.LogStoreTableFactory;
import org.apache.paimon.table.Table;

import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.factories.DynamicTableFactory;

import javax.annotation.Nullable;

/** Table sink to create sink. */
public class FlinkTableSink extends SupportsRowLevelOperationFlinkTableSink {

public FlinkTableSink(
ObjectIdentifier tableIdentifier,
Table table,
DynamicTableFactory.Context context,
@Nullable LogStoreTableFactory logStoreTableFactory) {
super(tableIdentifier, table, context, logStoreTableFactory);
}
}
@@ -18,54 +18,23 @@

package org.apache.paimon.flink.sink;

import org.apache.paimon.CoreOptions.MergeEngine;
import org.apache.paimon.flink.LogicalTypeConversion;
import org.apache.paimon.flink.PredicateConverter;
import org.apache.paimon.flink.log.LogStoreTableFactory;
import org.apache.paimon.operation.FileStoreCommit;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.AllPrimaryKeyEqualVisitor;
import org.apache.paimon.predicate.OnlyPartitionKeyEqualVisitor;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.table.AbstractFileStoreTable;
import org.apache.paimon.table.AppendOnlyFileStoreTable;
import org.apache.paimon.table.PrimaryKeyFileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.TableUtils;
import org.apache.paimon.table.sink.BatchWriteBuilder;

import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.connector.RowLevelModificationScanContext;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.abilities.SupportsDeletePushDown;
import org.apache.flink.table.connector.sink.abilities.SupportsRowLevelDelete;
import org.apache.flink.table.connector.sink.abilities.SupportsRowLevelUpdate;
import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.connector.sink.abilities.SupportsTruncate;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.types.logical.RowType;

import javax.annotation.Nullable;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;

import static org.apache.paimon.CoreOptions.MERGE_ENGINE;

/** Table sink to create sink. */
public class FlinkTableSink extends FlinkTableSinkBase
implements SupportsRowLevelUpdate, SupportsRowLevelDelete, SupportsDeletePushDown {

@Nullable protected Predicate deletePredicate;
public class FlinkTableSink extends SupportsRowLevelOperationFlinkTableSink
implements SupportsTruncate {

public FlinkTableSink(
ObjectIdentifier tableIdentifier,
@@ -76,163 +45,10 @@ public FlinkTableSink(
}

@Override
public DynamicTableSink copy() {
FlinkTableSink copied =
new FlinkTableSink(tableIdentifier, table, context, logStoreTableFactory);
copied.staticPartitions = new HashMap<>(staticPartitions);
copied.overwrite = overwrite;
copied.deletePredicate = deletePredicate;
return copied;
}

@Override
public RowLevelUpdateInfo applyRowLevelUpdate(
List<Column> updatedColumns, @Nullable RowLevelModificationScanContext context) {
// Since only UPDATE_AFTER type messages can be received at present,
// AppendOnlyFileStoreTable cannot correctly handle old data, so they are marked as
// unsupported. Similarly, it is not allowed to update the primary key column when updating
// the column of PrimaryKeyFileStoreTable, because the old data cannot be handled
// correctly.
if (table instanceof PrimaryKeyFileStoreTable) {
Options options = Options.fromMap(table.options());
Set<String> primaryKeys = new HashSet<>(table.primaryKeys());
updatedColumns.forEach(
column -> {
if (primaryKeys.contains(column.getName())) {
String errMsg =
String.format(
"Updates to primary keys are not supported, primaryKeys (%s), updatedColumns (%s)",
primaryKeys,
updatedColumns.stream()
.map(Column::getName)
.collect(Collectors.toList()));
throw new UnsupportedOperationException(errMsg);
}
});
if (options.get(MERGE_ENGINE) == MergeEngine.DEDUPLICATE
|| options.get(MERGE_ENGINE) == MergeEngine.PARTIAL_UPDATE) {
// Even with partial-update we still need all columns. Because the topology
// structure is source -> cal -> constraintEnforcer -> sink, in the
// constraintEnforcer operator, the constraint check will be performed according to
// the index, not according to the column. So we can't return only some columns,
// which will cause problems like ArrayIndexOutOfBoundsException.
// TODO: return partial columns after FLINK-32001 is resolved.
return new RowLevelUpdateInfo() {};
}
throw new UnsupportedOperationException(
String.format(
"%s can not support update, currently only %s of %s and %s can support update.",
options.get(MERGE_ENGINE),
MERGE_ENGINE.key(),
MergeEngine.DEDUPLICATE,
MergeEngine.PARTIAL_UPDATE));
} else if (table instanceof AppendOnlyFileStoreTable) {
throw new UnsupportedOperationException(
String.format(
"%s can not support update, because there is no primary key.",
table.getClass().getName()));
} else {
throw new UnsupportedOperationException(
String.format(
"%s can not support update, because it is an unknown subclass of FileStoreTable.",
table.getClass().getName()));
}
}

@Override
public RowLevelDeleteInfo applyRowLevelDelete(
@Nullable RowLevelModificationScanContext rowLevelModificationScanContext) {
validateDeletable();
return new RowLevelDeleteInfo() {};
}

// supported filters push down please refer DeletePushDownVisitorTest

@Override
public boolean applyDeleteFilters(List<ResolvedExpression> list) {
validateDeletable();
List<Predicate> predicates = new ArrayList<>();
RowType rowType = LogicalTypeConversion.toLogicalType(table.rowType());
for (ResolvedExpression filter : list) {
Optional<Predicate> predicate = PredicateConverter.convert(rowType, filter);
if (predicate.isPresent()) {
predicates.add(predicate.get());
} else {
// convert failed, leave it to flink
return false;
}
}
deletePredicate = predicates.isEmpty() ? null : PredicateBuilder.and(predicates);
return canPushDownDeleteFilter();
}

@Override
public Optional<Long> executeDeletion() {
public void executeTruncation() {
FileStoreCommit commit =
((AbstractFileStoreTable) table).store().newCommit(UUID.randomUUID().toString());
long identifier = BatchWriteBuilder.COMMIT_IDENTIFIER;
if (deletePredicate == null) {
commit.purgeTable(identifier);
return Optional.empty();
} else if (deleteIsDropPartition()) {
commit.dropPartitions(Collections.singletonList(deletePartitions()), identifier);
return Optional.empty();
} else {
return Optional.of(
TableUtils.deleteWhere(table, Collections.singletonList(deletePredicate)));
}
}

private void validateDeletable() {
if (table instanceof PrimaryKeyFileStoreTable) {
Options options = Options.fromMap(table.options());
if (options.get(MERGE_ENGINE) == MergeEngine.DEDUPLICATE) {
return;
}
throw new UnsupportedOperationException(
String.format(
"merge engine '%s' can not support delete, currently only %s can support delete.",
options.get(MERGE_ENGINE), MergeEngine.DEDUPLICATE));
} else if (table instanceof AppendOnlyFileStoreTable) {
throw new UnsupportedOperationException(
String.format(
"table '%s' can not support delete, because there is no primary key.",
table.getClass().getName()));
} else {
throw new UnsupportedOperationException(
String.format(
"%s can not support delete, because it is an unknown subclass of FileStoreTable.",
table.getClass().getName()));
}
}

private boolean canPushDownDeleteFilter() {
return deletePredicate == null || deleteIsDropPartition() || deleteInSingleNode();
}

private boolean deleteIsDropPartition() {
if (deletePredicate == null) {
return false;
}
return deletePredicate.visit(new OnlyPartitionKeyEqualVisitor(table.partitionKeys()));
}

private Map<String, String> deletePartitions() {
if (deletePredicate == null) {
return null;
}
OnlyPartitionKeyEqualVisitor visitor =
new OnlyPartitionKeyEqualVisitor(table.partitionKeys());
deletePredicate.visit(visitor);
return visitor.partitions();
}

private boolean deleteInSingleNode() {
if (deletePredicate == null) {
return false;
}
return deletePredicate
.visit(new AllPrimaryKeyEqualVisitor(table.primaryKeys()))
.containsAll(table.primaryKeys());
commit.purgeTable(identifier);
}
}
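
For context, the Flink ability interface that the reworked sink implements here (org.apache.flink.table.connector.sink.abilities.SupportsTruncate, introduced in Flink 1.18) is, as far as I know, a single-method contract; a sketch of its shape:

package org.apache.flink.table.connector.sink.abilities;

/**
 * Ability interface (Flink 1.18+) that lets a DynamicTableSink handle
 * TRUNCATE TABLE statements. Sketched here for context only; see the Flink
 * javadoc for the authoritative definition.
 */
public interface SupportsTruncate {

    /**
     * Deletes all rows of the target table. In this commit, Paimon implements
     * it by committing a FileStoreCommit#purgeTable operation.
     */
    void executeTruncation();
}

This is presumably also why the two thin FlinkTableSink subclasses appear earlier in the diff: modules built against Flink versions that predate SupportsTruncate keep a sink without the ability, while the common sink adds it.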