Commit
Support WHERE SQL for the compaction action of Flink.
wg1026688210 committed May 27, 2024
1 parent 0c0b2cd commit 46ce303
Showing 9 changed files with 136 additions and 24 deletions.
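For context, here is a minimal usage sketch of the new option, assembled from the APIs added in this commit; the warehouse path, database, table name, and filter string are illustrative assumptions rather than values from the change itself.

    // Illustrative sketch only: compact the data selected by a WHERE-style filter
    // instead of an explicit partition specification.
    import org.apache.paimon.flink.action.CompactAction;

    public class CompactWithWhereExample {
        public static void main(String[] args) throws Exception {
            CompactAction action =
                    new CompactAction("/path/to/warehouse", "my_db", "my_table")
                            .withWhereSql("dt = '2024-05-27'"); // assumed partition column "dt"
            action.run();
        }
    }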
@@ -29,6 +29,8 @@
import java.util.Collections;
import java.util.Map;

import static org.apache.paimon.utils.Preconditions.checkArgument;

/**
* Stay compatible with 1.18 procedure which doesn't support named arguments. Usage:
*
@@ -56,7 +58,7 @@ public String[] call(ProcedureContext procedureContext, String tableId) throws E

public String[] call(ProcedureContext procedureContext, String tableId, String partitions)
throws Exception {
return call(procedureContext, tableId, partitions, "", "", "");
return call(procedureContext, tableId, partitions, "", "", "", "");
}

public String[] call(
@@ -66,7 +68,7 @@ public String[] call(
String orderStrategy,
String orderByColumns)
throws Exception {
return call(procedureContext, tableId, partitions, orderStrategy, orderByColumns, "");
return call(procedureContext, tableId, partitions, orderStrategy, orderByColumns, "", "");
}

public String[] call(
@@ -75,8 +77,12 @@
String partitions,
String orderStrategy,
String orderByColumns,
String tableOptions)
String tableOptions,
String whereSql)
throws Exception {
checkArgument(
StringUtils.isBlank(partitions) || StringUtils.isBlank(whereSql),
"partitions and where cannot be used together.");
String warehouse = catalog.warehouse();
Map<String, String> catalogOptions = catalog.options();
Map<String, String> tableConf =
@@ -115,6 +121,10 @@ public String[] call(
action.withPartitions(ParameterUtils.getPartitions(partitions.split(";")));
}

if (!StringUtils.isBlank(whereSql)) {
action.withWhereSql(whereSql);
}

return execute(procedureContext, action, jobName);
}

@@ -22,7 +22,11 @@
import org.apache.paimon.flink.compact.UnawareBucketCompactionTopoBuilder;
import org.apache.paimon.flink.sink.CompactorSinkBuilder;
import org.apache.paimon.flink.source.CompactorSourceBuilder;
import org.apache.paimon.predicate.PartitionPredicateVisitor;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.utils.Preconditions;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.ExecutionOptions;
@@ -41,6 +45,8 @@ public class CompactAction extends TableActionBase {

private List<Map<String, String>> partitions;

private String whereSql;

public CompactAction(String warehouse, String database, String tableName) {
this(warehouse, database, tableName, Collections.emptyMap(), Collections.emptyMap());
}
@@ -72,8 +78,13 @@ public CompactAction withPartitions(List<Map<String, String>> partitions) {
return this;
}

public CompactAction withWhereSql(String whereSql) {
this.whereSql = whereSql;
return this;
}

@Override
public void build() {
public void build() throws Exception {
ReadableConfig conf = env.getConfiguration();
boolean isStreaming =
conf.get(ExecutionOptions.RUNTIME_MODE) == RuntimeExecutionMode.STREAMING;
@@ -100,6 +111,7 @@ private void buildForTraditionalCompaction(
CompactorSinkBuilder sinkBuilder = new CompactorSinkBuilder(table);

sourceBuilder.withPartitions(partitions);
sourceBuilder.withWhereSql(whereSql);
DataStreamSource<RowData> source =
sourceBuilder.withEnv(env).withContinuousMode(isStreaming).build();
sinkBuilder.withInput(source).build();
@@ -111,6 +123,7 @@ private void buildForUnawareBucketCompaction(
new UnawareBucketCompactionTopoBuilder(env, identifier.getFullName(), table);

unawareBucketCompactionTopoBuilder.withPartitions(partitions);
unawareBucketCompactionTopoBuilder.withWhereSql(whereSql);
unawareBucketCompactionTopoBuilder.withContinuousMode(isStreaming);
unawareBucketCompactionTopoBuilder.build();
}
@@ -124,4 +137,19 @@ public void run() throws Exception {
public List<Map<String, String>> getPartitions() {
return partitions;
}

public String getWhereSql() {
return whereSql;
}

// Check whether the predicate contains a non-partition key.
public static void checkPredicateOnlyFilterParition(Predicate predicate, Table table) {
if (predicate != null) {
PartitionPredicateVisitor partitionPredicateVisitor =
new PartitionPredicateVisitor(table.partitionKeys());
Preconditions.checkArgument(
predicate.visit(partitionPredicateVisitor),
"Only parition key can be specialized in compaction action.");
}
}
}
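A brief sketch of what the new check enforces, using the identifiers introduced above; the FileStoreTable named table, the column names, and the filter strings are assumptions, and SqlParseException handling is omitted for brevity.

    // Illustrative sketch: assumes "dt" is a partition key of table and "price" is a data column.
    SimpleSqlPredicateConvertor convertor = new SimpleSqlPredicateConvertor(table.rowType());

    Predicate onPartitionKey = convertor.convertSqlToPredicate("dt = '2024-05-27'");
    CompactAction.checkPredicateOnlyFilterParition(onPartitionKey, table); // accepted

    Predicate onDataColumn = convertor.convertSqlToPredicate("price > 100");
    CompactAction.checkPredicateOnlyFilterParition(onDataColumn, table); // throws IllegalArgumentException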
@@ -18,6 +18,8 @@

package org.apache.paimon.flink.action;

import org.apache.paimon.utils.Preconditions;

import org.apache.flink.api.java.tuple.Tuple3;

import java.util.List;
@@ -32,13 +34,20 @@ public class CompactActionFactory implements ActionFactory {
private static final String ORDER_STRATEGY = "order_strategy";
private static final String ORDER_BY = "order_by";

private static final String WHERE = "where";

@Override
public String identifier() {
return IDENTIFIER;
}

@Override
public Optional<Action> create(MultipleParameterToolAdapter params) {

Preconditions.checkArgument(
!(params.has(PARTITION) && params.has(WHERE)),
"partitions and where cannot be used together.");

Tuple3<String, String, String> tablePath = getTablePath(params);

Map<String, String> catalogConfig = optionalConfigMap(params, CATALOG_CONF);
@@ -67,6 +76,8 @@ public Optional<Action> create(MultipleParameterToolAdapter params) {
if (params.has(PARTITION)) {
List<Map<String, String>> partitions = getPartitions(params);
action.withPartitions(partitions);
} else if (params.has(WHERE)) {
action.withWhereSql(params.get(WHERE));
}

return Optional.of(action);
@@ -20,6 +20,7 @@

import org.apache.paimon.CoreOptions;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.predicate.SimpleSqlPredicateConvertor;
import org.apache.paimon.flink.sink.SortCompactSinkBuilder;
import org.apache.paimon.flink.sorter.TableSorter;
import org.apache.paimon.flink.source.FlinkSourceBuilder;
@@ -28,6 +29,7 @@
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.FileStoreTable;

import org.apache.calcite.sql.parser.SqlParseException;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.streaming.api.datastream.DataStream;
@@ -68,7 +70,7 @@ public void run() throws Exception {
}

@Override
public void build() {
public void build() throws SqlParseException {
// only support batch sort yet
if (env.getConfiguration().get(ExecutionOptions.RUNTIME_MODE)
!= RuntimeExecutionMode.BATCH) {
@@ -92,15 +94,22 @@ public void build() {
identifier.getObjectName())
.asSummaryString());

Predicate partitionPredicate = null;
if (getPartitions() != null) {
Predicate partitionPredicate =
partitionPredicate =
PredicateBuilder.or(
getPartitions().stream()
.map(p -> PredicateBuilder.partition(p, table.rowType()))
.toArray(Predicate[]::new));
sourceBuilder.predicate(partitionPredicate);
} else if (getWhereSql() != null) {
SimpleSqlPredicateConvertor convertor =
new SimpleSqlPredicateConvertor(table.rowType());
partitionPredicate = convertor.convertSqlToPredicate(getWhereSql());
}

checkPredicateOnlyFilterParition(partitionPredicate, table);
sourceBuilder.predicate(partitionPredicate);

String scanParallelism = tableConfig.get(FlinkConnectorOptions.SCAN_PARALLELISM.key());
if (scanParallelism != null) {
sourceBuilder.sourceParallelism(Integer.parseInt(scanParallelism));
@@ -20,13 +20,19 @@

import org.apache.paimon.append.AppendOnlyCompactionTask;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.action.CompactAction;
import org.apache.paimon.flink.predicate.SimpleSqlPredicateConvertor;
import org.apache.paimon.flink.sink.Committable;
import org.apache.paimon.flink.sink.UnawareBucketCompactionSink;
import org.apache.paimon.flink.source.BucketUnawareCompactSource;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.table.FileStoreTable;

import org.apache.paimon.shade.guava30.com.google.common.util.concurrent.UncheckedExecutionException;

import org.apache.calcite.sql.parser.SqlParseException;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -53,6 +59,8 @@ public class UnawareBucketCompactionTopoBuilder {
private final String tableIdentifier;
private final FileStoreTable table;
@Nullable private List<Map<String, String>> specifiedPartitions = null;

@Nullable private String whereSql;
private boolean isContinuous = false;

public UnawareBucketCompactionTopoBuilder(
@@ -70,6 +78,10 @@ public void withPartitions(List<Map<String, String>> partitions) {
this.specifiedPartitions = partitions;
}

public void withWhereSql(String whereSql) {
this.whereSql = whereSql;
}

public void build() {
// build source from UnawareSourceFunction
DataStreamSource<AppendOnlyCompactionTask> source = buildSource();
@@ -90,14 +102,23 @@ public DataStream<Committable> fetchUncommitted(String commitUser) {

private DataStreamSource<AppendOnlyCompactionTask> buildSource() {
long scanInterval = table.coreOptions().continuousDiscoveryInterval().toMillis();
Predicate predicate = null;
if (specifiedPartitions != null) {
predicate = PredicateBuilder.partitions(specifiedPartitions, table.rowType());
} else if (whereSql != null) {
SimpleSqlPredicateConvertor convertor =
new SimpleSqlPredicateConvertor(table.rowType());
try {
predicate = convertor.convertSqlToPredicate(whereSql);
} catch (SqlParseException e) {
throw new UncheckedExecutionException(e);
}
}

// Check whether the predicate contains a non-partition key.
CompactAction.checkPredicateOnlyFilterParition(predicate, table);
BucketUnawareCompactSource source =
new BucketUnawareCompactSource(
table,
isContinuous,
scanInterval,
specifiedPartitions != null
? PredicateBuilder.partitions(specifiedPartitions, table.rowType())
: null);
new BucketUnawareCompactSource(table, isContinuous, scanInterval, predicate);

return BucketUnawareCompactSource.buildSource(env, source, isContinuous, tableIdentifier);
}
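For reference, a minimal wiring sketch of this builder on the unaware-bucket path, mirroring the calls made from CompactAction above; the StreamExecutionEnvironment env, the identifier, the FileStoreTable table, and the filter string are assumptions.

    // Illustrative sketch: set either withPartitions(...) or withWhereSql(...), not both.
    UnawareBucketCompactionTopoBuilder builder =
            new UnawareBucketCompactionTopoBuilder(env, identifier.getFullName(), table);
    builder.withWhereSql("dt = '2024-05-27'"); // assumed partition column "dt"
    builder.withContinuousMode(false);         // batch compaction
    builder.build();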
@@ -92,10 +92,10 @@ public Predicate convert(SqlBasicCall sqlBasicCall) {
case IN:
{
int index = getfieldIndex(left.toString());
SqlNodeList Elementslist = (SqlNodeList) right;
SqlNodeList elementslist = (SqlNodeList) right;

List<Object> list = new ArrayList<>();
for (SqlNode sqlNode : Elementslist) {
for (SqlNode sqlNode : elementslist) {
Object literal =
TypeUtils.castFromString(
((SqlLiteral) sqlNode).toValue(),
@@ -145,7 +145,8 @@ public Predicate visitBiFunction(
((SqlLiteral) left).toValue(), rowType.getFieldTypes().get(index)));
}

throw new UnsupportedOperationException(String.format("%s or %s not been supported.", left, right));
throw new UnsupportedOperationException(
String.format("%s or %s is not supported.", left, right));
}
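For reference, the IN branch handled above would be exercised by a filter such as the following; this is a hedged sketch where the row type, column name, and values are assumptions, and SqlParseException handling is omitted.

    // Illustrative sketch: an IN list over a column is converted into a Paimon Predicate
    // covering the listed values.
    SimpleSqlPredicateConvertor convertor = new SimpleSqlPredicateConvertor(table.rowType());
    Predicate predicate = convertor.convertSqlToPredicate("dt IN ('2024-05-26', '2024-05-27')");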

public int getfieldIndex(String field) {
@@ -32,6 +32,7 @@

import static org.apache.paimon.utils.ParameterUtils.getPartitions;
import static org.apache.paimon.utils.ParameterUtils.parseCommaSeparatedKeyValues;
import static org.apache.paimon.utils.Preconditions.checkArgument;
import static org.apache.paimon.utils.StringUtils.isBlank;

/** Compact procedure. */
@@ -51,16 +52,22 @@ public class CompactProcedure extends ProcedureBase {
type = @DataTypeHint("STRING"),
isOptional = true),
@ArgumentHint(name = "order_by", type = @DataTypeHint("STRING"), isOptional = true),
@ArgumentHint(name = "options", type = @DataTypeHint("STRING"), isOptional = true)
@ArgumentHint(name = "options", type = @DataTypeHint("STRING"), isOptional = true),
@ArgumentHint(name = "where", type = @DataTypeHint("STRING"), isOptional = true)
})
public String[] call(
ProcedureContext procedureContext,
String tableId,
String partitions,
String orderStrategy,
String orderByColumns,
String tableOptions)
String tableOptions,
String where)
throws Exception {
checkArgument(
isBlank(partitions) || isBlank(where),
"partitions and where cannot be used together.");

String warehouse = catalog.warehouse();
Map<String, String> catalogOptions = catalog.options();
Map<String, String> tableConf =
@@ -99,6 +106,10 @@ public String[] call(
action.withPartitions(getPartitions(partitions.split(";")));
}

if (!(isBlank(where))) {
action.withWhereSql(where);
}

return execute(procedureContext, action, jobName);
}
