Flink: support limit pushdown in FLIP-27 source (apache#10748)
stevenzwu authored Jul 29, 2024
1 parent e0a464f commit f758593
Showing 6 changed files with 228 additions and 7 deletions.
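
With this change the limit read option is pushed down into the FLIP-27 IcebergSource readers rather than only the legacy source. The option can be supplied either as a session default or per query via a SQL hint, as the updated TestFlinkSourceConfig exercises below. A minimal sketch, assuming an Iceberg table already registered as t (the table name and environment setup are placeholders, not part of this commit):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.iceberg.flink.FlinkReadOptions;

public class LimitPushdownSketch {
  public static void main(String[] args) {
    TableEnvironment tEnv =
        TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());

    // Session-wide default: push a limit of 1 down to every Iceberg scan.
    tEnv.getConfig().set(FlinkReadOptions.LIMIT_OPTION, 1L);

    // Per-query override via a SQL hint, as in testReadOptionHierarchy below.
    tEnv.executeSql("SELECT * FROM t /*+ OPTIONS('limit'='3') */").print();
  }
}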
@@ -521,7 +521,8 @@ public IcebergSource<T> build() {
context.caseSensitive(),
table.io(),
table.encryption(),
context.filters());
context.filters(),
context.limit());
this.readerFunction = (ReaderFunction<T>) rowDataReaderFunction;
}
}
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.flink.source.reader;

import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.flink.source.DataIterator;
import org.apache.iceberg.flink.source.FileScanTaskReader;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

class LimitableDataIterator<T> extends DataIterator<T> {
private final RecordLimiter limiter;

LimitableDataIterator(
FileScanTaskReader<T> fileScanTaskReader,
CombinedScanTask task,
FileIO io,
EncryptionManager encryption,
RecordLimiter limiter) {
super(fileScanTaskReader, task, io, encryption);
Preconditions.checkArgument(limiter != null, "Invalid record limiter: null");
this.limiter = limiter;
}

@Override
public boolean hasNext() {
if (limiter.reachedLimit()) {
return false;
}

return super.hasNext();
}

@Override
public T next() {
limiter.increment();
return super.next();
}
}
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.flink.source.reader;

import java.util.concurrent.atomic.AtomicLong;
import org.apache.flink.annotation.Internal;

@Internal
class RecordLimiter {
private final long limit;
private final AtomicLong counter;

static RecordLimiter create(long limit) {
return new RecordLimiter(limit);
}

private RecordLimiter(long limit) {
this.limit = limit;
this.counter = new AtomicLong(0);
}

public boolean reachedLimit() {
return limit > 0 && counter.get() >= limit;
}

public void increment() {
counter.incrementAndGet();
}
}
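
RecordLimiter treats a non-positive limit as "no limit": reachedLimit() only fires when limit > 0, which is why the single-argument RowDataReaderFunction constructor below delegates with -1L. A small illustrative sketch of the semantics (same package, since RecordLimiter is package-private; not part of the commit):

package org.apache.iceberg.flink.source.reader;

class RecordLimiterSemanticsSketch {
  public static void main(String[] args) {
    RecordLimiter unlimited = RecordLimiter.create(-1L);
    unlimited.increment();
    System.out.println(unlimited.reachedLimit()); // false: non-positive limit disables the check

    RecordLimiter limited = RecordLimiter.create(2L);
    limited.increment();
    limited.increment();
    System.out.println(limited.reachedLimit()); // true: LimitableDataIterator#hasNext() now short-circuits
  }
}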
@@ -39,6 +39,9 @@ public class RowDataReaderFunction extends DataIteratorReaderFunction<RowData> {
private final FileIO io;
private final EncryptionManager encryption;
private final List<Expression> filters;
private final long limit;

private transient RecordLimiter recordLimiter = null;

public RowDataReaderFunction(
ReadableConfig config,
@@ -49,6 +52,28 @@ public RowDataReaderFunction(
FileIO io,
EncryptionManager encryption,
List<Expression> filters) {
this(
config,
tableSchema,
projectedSchema,
nameMapping,
caseSensitive,
io,
encryption,
filters,
-1L);
}

public RowDataReaderFunction(
ReadableConfig config,
Schema tableSchema,
Schema projectedSchema,
String nameMapping,
boolean caseSensitive,
FileIO io,
EncryptionManager encryption,
List<Expression> filters,
long limit) {
super(
new ArrayPoolDataIteratorBatcher<>(
config,
@@ -61,19 +86,30 @@ public RowDataReaderFunction(
this.io = io;
this.encryption = encryption;
this.filters = filters;
this.limit = limit;
}

@Override
public DataIterator<RowData> createDataIterator(IcebergSourceSplit split) {
return new DataIterator<>(
return new LimitableDataIterator<>(
new RowDataFileScanTaskReader(tableSchema, readSchema, nameMapping, caseSensitive, filters),
split.task(),
io,
encryption);
encryption,
lazyLimiter());
}

private static Schema readSchema(Schema tableSchema, Schema projectedSchema) {
Preconditions.checkNotNull(tableSchema, "Table schema can't be null");
return projectedSchema == null ? tableSchema : projectedSchema;
}

/** Lazily create RecordLimiter to avoid the need to make it serializable */
private RecordLimiter lazyLimiter() {
if (recordLimiter == null) {
this.recordLimiter = RecordLimiter.create(limit);
}

return recordLimiter;
}
}
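
For callers wiring the reader function directly rather than through the IcebergSource builder change above, the new overload takes the limit as its last argument, and -1L keeps the previous unlimited behavior. A hedged sketch, where the Table argument and the helper method are placeholders for illustration only:

import java.util.Collections;
import org.apache.flink.configuration.Configuration;
import org.apache.iceberg.Table;
import org.apache.iceberg.flink.source.reader.RowDataReaderFunction;

class ReaderFunctionSketch {
  // The Table is assumed to be loaded elsewhere; how it is obtained is out of scope here.
  static RowDataReaderFunction limitedReader(Table table, long limit) {
    return new RowDataReaderFunction(
        new Configuration(),     // ReadableConfig passed through to the batcher
        table.schema(),          // table schema
        null,                    // no projection: read the full schema
        null,                    // no name mapping
        true,                    // case sensitive
        table.io(),
        table.encryption(),
        Collections.emptyList(), // no residual filters
        limit);                  // non-positive value means no limit
  }
}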
@@ -20,7 +20,6 @@

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.assertj.core.api.Assumptions.assumeThat;

import java.util.List;
import org.apache.flink.types.Row;
@@ -49,11 +48,11 @@ public void testFlinkHintConfig() {

@TestTemplate
public void testReadOptionHierarchy() {
// TODO: FLIP-27 source doesn't implement limit pushdown yet
assumeThat(useFlip27Source).isFalse();

getTableEnv().getConfig().set(FlinkReadOptions.LIMIT_OPTION, 1L);
List<Row> result = sql("SELECT * FROM %s", TABLE);
// Note that this query doesn't have the limit clause in the SQL.
// This assertion works because limit is pushed down to the reader and
// reader parallelism is 1.
assertThat(result).hasSize(1);

result = sql("SELECT * FROM %s /*+ OPTIONS('limit'='3')*/", TABLE);
@@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.flink.source.reader;

import static org.assertj.core.api.Assertions.assertThat;

import java.nio.file.Path;
import java.util.Collections;
import java.util.List;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.data.GenericAppenderFactory;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.encryption.PlaintextEncryptionManager;
import org.apache.iceberg.flink.TestFixtures;
import org.apache.iceberg.flink.source.RowDataFileScanTaskReader;
import org.apache.iceberg.hadoop.HadoopFileIO;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

public class TestLimitableDataIterator {
@TempDir private static Path temporaryFolder;

private final RowDataFileScanTaskReader reader =
new RowDataFileScanTaskReader(
TestFixtures.SCHEMA, TestFixtures.SCHEMA, null, true, Collections.emptyList());
private final HadoopFileIO fileIO = new HadoopFileIO(new org.apache.hadoop.conf.Configuration());
private final EncryptionManager encryptionManager = PlaintextEncryptionManager.instance();

private static CombinedScanTask combinedScanTask;
private static int totalRecords;

@BeforeAll
public static void beforeClass() throws Exception {
GenericAppenderFactory appenderFactory = new GenericAppenderFactory(TestFixtures.SCHEMA);
List<List<Record>> recordBatchList =
ReaderUtil.createRecordBatchList(TestFixtures.SCHEMA, 3, 2);
combinedScanTask =
ReaderUtil.createCombinedScanTask(
recordBatchList, temporaryFolder, FileFormat.PARQUET, appenderFactory);
totalRecords = 3 * 2;
}

@ParameterizedTest
@ValueSource(longs = {-1L, 0L, 1L, 6L, 7L})
public void testUnlimited(long limit) {
LimitableDataIterator<RowData> dataIterator =
new LimitableDataIterator<>(
reader, combinedScanTask, fileIO, encryptionManager, RecordLimiter.create(limit));

List<RowData> result = Lists.newArrayList();
while (dataIterator.hasNext()) {
result.add(dataIterator.next());
}

if (limit <= 0 || limit > totalRecords) {
// read all records
assertThat(result).hasSize(totalRecords);
} else {
assertThat(result).hasSize((int) limit);
}
}
}
