Skip to content

Commit

Permalink
branch-3.0: [enchement](iceberg)support read iceberg partition evolut…
Browse files Browse the repository at this point in the history
…ion table. #45367 (#45563)

Cherry-picked from #45367

Co-authored-by: daidai <[email protected]>
  • Loading branch information
github-actions[bot] and hubgeter authored Dec 20, 2024
1 parent e76c612 commit 0671640
Show file tree
Hide file tree
Showing 6 changed files with 517 additions and 43 deletions.
2 changes: 1 addition & 1 deletion be/src/vec/sink/writer/iceberg/partition_transformers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ std::string PartitionColumnTransform::get_partition_value(const TypeDescriptor&
if (value.has_value()) {
switch (type.type) {
case TYPE_BOOLEAN: {
return std::to_string(std::any_cast<bool>(value));
return std::any_cast<bool>(value) ? "true" : "false";
}
case TYPE_TINYINT: {
return std::to_string(std::any_cast<Int8>(value));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@

use demo.test_db;
CREATE TABLE iceberg_add_partition (
id INT,
name STRING,
age INT
) USING iceberg;
INSERT INTO iceberg_add_partition VALUES(1, 'Alice', 30),(2, 'Bob', 25);
ALTER TABLE iceberg_add_partition ADD PARTITION FIELD age;
ALTER TABLE iceberg_add_partition ADD COLUMNS address STRING;
INSERT INTO iceberg_add_partition VALUES (4, 'Charlie', 45, '123 Street Name');
ALTER TABLE iceberg_add_partition ADD PARTITION FIELD bucket(10, id);
INSERT INTO iceberg_add_partition VALUES (5, 'Eve', 29, '789 Third St');
ALTER TABLE iceberg_add_partition ADD PARTITION FIELD truncate(5, address);
INSERT INTO iceberg_add_partition VALUES (6, 'Frank', 33,"xx"),(7, 'Grace', 28,"yyyyyyyyy");



CREATE TABLE iceberg_drop_partition (
id INT,
name STRING,
amount DOUBLE,
created_date DATE
)
USING iceberg
PARTITIONED BY (year(created_date),bucket(10,created_date));
INSERT INTO iceberg_drop_partition VALUES
(1, 'Alice', 100.0, DATE '2023-12-01'),
(2, 'Bob', 200.0, DATE '2023-12-02'),
(3, 'Charlie', 300.0, DATE '2024-12-03');
ALTER TABLE iceberg_drop_partition DROP PARTITION FIELD year(created_date);
INSERT INTO iceberg_drop_partition VALUES
(4, 'David', 400.0, DATE '2023-12-02'),
(5, 'Eve', 500.0, DATE '2024-12-03');
ALTER TABLE iceberg_drop_partition DROP PARTITION FIELD bucket(10,created_date);
INSERT INTO iceberg_drop_partition VALUES
(6, 'David', 400.0, DATE '2025-12-12'),
(7, 'Eve', 500.0, DATE '2025-12-23');


CREATE TABLE iceberg_replace_partition (
id INT,
name STRING,
amount DOUBLE,
created_date DATE
)
USING iceberg
PARTITIONED BY (year(created_date),bucket(10,created_date));
INSERT INTO iceberg_replace_partition VALUES
(1, 'Alice', 100.0, DATE '2023-01-01'),
(2, 'Bob', 200.0, DATE '2023-12-02'),
(3, 'Charlie', 300.0, DATE '2024-12-03');
ALTER TABLE iceberg_replace_partition REPLACE PARTITION FIELD year(created_date) WITH month(created_date);
INSERT INTO iceberg_replace_partition VALUES
(4, 'David', 400.0, DATE '2023-12-02'),
(5, 'Eve', 500.0, DATE '2024-07-03');
ALTER TABLE iceberg_replace_partition REPLACE PARTITION FIELD bucket(10,created_date) WITH bucket(10,id);
INSERT INTO iceberg_replace_partition VALUES
(6, 'David', 400.0, DATE '2025-10-12'),
(7, 'Eve', 500.0, DATE '2025-09-23');




CREATE TABLE iceberg_evolution_partition (
id INT,
name STRING,
age INT
) USING iceberg;
INSERT INTO iceberg_evolution_partition VALUES(1, 'Alice', 30),(2, 'Bob', 25);
ALTER TABLE iceberg_evolution_partition ADD PARTITION FIELD age;
ALTER TABLE iceberg_evolution_partition ADD COLUMNS address STRING;
INSERT INTO iceberg_evolution_partition VALUES (4, 'Charlie', 45, '123 Street Name');
ALTER TABLE iceberg_evolution_partition ADD PARTITION FIELD bucket(10, id);
INSERT INTO iceberg_evolution_partition VALUES (5, 'Eve', 29, '789 Third St');
ALTER TABLE iceberg_evolution_partition REPLACE PARTITION FIELD bucket(10, id) WITH truncate(5, address);
INSERT INTO iceberg_evolution_partition VALUES (6, 'Frank', 33,"xx"),(7, 'Grace', 28,"yyyyyyyyy");
ALTER TABLE iceberg_evolution_partition DROP PARTITION FIELD truncate(5, address);
INSERT INTO iceberg_evolution_partition VALUES (8, 'Hank', 40, "zz"), (9, 'Ivy', 22, "aaaaaa");
ALTER TABLE iceberg_evolution_partition DROP COLUMNS address;
-- INSERT INTO iceberg_evolution_partition VALUES (10, 'Jack', 35), (11, 'Kara', 30);
-- spark error.

Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.doris.datasource.iceberg.IcebergExternalCatalog;
import org.apache.doris.datasource.iceberg.IcebergExternalTable;
import org.apache.doris.datasource.iceberg.IcebergUtils;
import org.apache.doris.nereids.exceptions.NotSupportedException;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.spi.Split;
Expand All @@ -56,19 +57,17 @@
import org.apache.iceberg.FileContent;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.types.Conversions;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.DateTimeUtil;
import org.apache.iceberg.util.SnapshotUtil;
import org.apache.iceberg.util.TableScanUtil;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.IOException;
import java.util.ArrayList;
Expand All @@ -78,11 +77,11 @@
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.stream.Collectors;

public class IcebergScanNode extends FileQueryScanNode {

public static final int MIN_DELETE_FILE_SUPPORT_VERSION = 2;
private static final Logger LOG = LogManager.getLogger(IcebergScanNode.class);

private IcebergSource source;
private Table icebergTable;
Expand Down Expand Up @@ -217,32 +216,44 @@ private List<Split> doGetSplits(int numBackends) throws UserException {
boolean isPartitionedTable = icebergTable.spec().isPartitioned();

long realFileSplitSize = getRealFileSplitSize(0);
CloseableIterable<FileScanTask> fileScanTasks = TableScanUtil.splitFiles(scan.planFiles(), realFileSplitSize);
CloseableIterable<FileScanTask> fileScanTasks = null;
try {
fileScanTasks = TableScanUtil.splitFiles(scan.planFiles(), realFileSplitSize);
} catch (NullPointerException e) {
/*
Caused by: java.lang.NullPointerException: Type cannot be null
at org.apache.iceberg.relocated.com.google.common.base.Preconditions.checkNotNull
(Preconditions.java:921) ~[iceberg-bundled-guava-1.4.3.jar:?]
at org.apache.iceberg.types.Types$NestedField.<init>(Types.java:447) ~[iceberg-api-1.4.3.jar:?]
at org.apache.iceberg.types.Types$NestedField.optional(Types.java:416) ~[iceberg-api-1.4.3.jar:?]
at org.apache.iceberg.PartitionSpec.partitionType(PartitionSpec.java:132) ~[iceberg-api-1.4.3.jar:?]
at org.apache.iceberg.DeleteFileIndex.lambda$new$0(DeleteFileIndex.java:97) ~[iceberg-core-1.4.3.jar:?]
at org.apache.iceberg.relocated.com.google.common.collect.RegularImmutableMap.forEach
(RegularImmutableMap.java:297) ~[iceberg-bundled-guava-1.4.3.jar:?]
at org.apache.iceberg.DeleteFileIndex.<init>(DeleteFileIndex.java:97) ~[iceberg-core-1.4.3.jar:?]
at org.apache.iceberg.DeleteFileIndex.<init>(DeleteFileIndex.java:71) ~[iceberg-core-1.4.3.jar:?]
at org.apache.iceberg.DeleteFileIndex$Builder.build(DeleteFileIndex.java:578) ~[iceberg-core-1.4.3.jar:?]
at org.apache.iceberg.ManifestGroup.plan(ManifestGroup.java:183) ~[iceberg-core-1.4.3.jar:?]
at org.apache.iceberg.ManifestGroup.planFiles(ManifestGroup.java:170) ~[iceberg-core-1.4.3.jar:?]
at org.apache.iceberg.DataTableScan.doPlanFiles(DataTableScan.java:89) ~[iceberg-core-1.4.3.jar:?]
at org.apache.iceberg.SnapshotScan.planFiles(SnapshotScan.java:139) ~[iceberg-core-1.4.3.jar:?]
at org.apache.doris.datasource.iceberg.source.IcebergScanNode.doGetSplits
(IcebergScanNode.java:209) ~[doris-fe.jar:1.2-SNAPSHOT]
EXAMPLE:
CREATE TABLE iceberg_tb(col1 INT,col2 STRING) USING ICEBERG PARTITIONED BY (bucket(10,col2));
INSERT INTO iceberg_tb VALUES( ... );
ALTER TABLE iceberg_tb DROP PARTITION FIELD bucket(10,col2);
ALTER TABLE iceberg_tb DROP COLUMNS col2 STRING;
Link: https://github.com/apache/iceberg/pull/10755
*/
LOG.warn("Iceberg TableScanUtil.splitFiles throw NullPointerException. Cause : ", e);
throw new NotSupportedException("Unable to read Iceberg table with dropped old partition column.");
}
try (CloseableIterable<CombinedScanTask> combinedScanTasks =
TableScanUtil.planTasks(fileScanTasks, realFileSplitSize, 1, 0)) {
combinedScanTasks.forEach(taskGrp -> taskGrp.files().forEach(splitTask -> {
List<String> partitionValues = new ArrayList<>();
if (isPartitionedTable) {
StructLike structLike = splitTask.file().partition();
List<PartitionField> fields = splitTask.spec().fields();
Types.StructType structType = icebergTable.schema().asStruct();

// set partitionValue for this IcebergSplit
for (int i = 0; i < structLike.size(); i++) {
Object obj = structLike.get(i, Object.class);
String value = String.valueOf(obj);
PartitionField partitionField = fields.get(i);
if (partitionField.transform().isIdentity()) {
Type type = structType.fieldType(partitionField.name());
if (type != null && type.typeId().equals(Type.TypeID.DATE)) {
// iceberg use integer to store date,
// we need transform it to string
value = DateTimeUtil.daysToIsoDate((Integer) obj);
}
}
partitionValues.add(value);
}

// Counts the number of partitions read
partitionPathSet.add(structLike.toString());
}
Expand All @@ -256,7 +267,7 @@ private List<Split> doGetSplits(int numBackends) throws UserException {
new String[0],
formatVersion,
source.getCatalog().getProperties(),
partitionValues,
new ArrayList<>(),
originalPath);
split.setTargetSplitSize(realFileSplitSize);
if (formatVersion >= MIN_DELETE_FILE_SUPPORT_VERSION) {
Expand Down Expand Up @@ -289,7 +300,6 @@ private List<Split> doGetSplits(int numBackends) throws UserException {
return pushDownCountSplits;
}
}

selectedPartitionNum = partitionPathSet.size();
return splits;
}
Expand Down Expand Up @@ -351,8 +361,20 @@ public TFileFormatType getFileFormatType() throws UserException {

@Override
public List<String> getPathPartitionKeys() throws UserException {
return icebergTable.spec().fields().stream().map(PartitionField::name).map(String::toLowerCase)
.collect(Collectors.toList());
// return icebergTable.spec().fields().stream().map(PartitionField::name).map(String::toLowerCase)
// .collect(Collectors.toList());
/**First, iceberg partition columns are based on existing fields, which will be stored in the actual data file.
* Second, iceberg partition columns support Partition transforms. In this case, the path partition key is not
* equal to the column name of the partition column, so remove this code and get all the columns you want to
* read from the file.
* Related code:
* be/src/vec/exec/scan/vfile_scanner.cpp:
* VFileScanner::_init_expr_ctxes()
* if (slot_info.is_file_slot) {
* xxxx
* }
*/
return new ArrayList<>();
}

@Override
Expand Down
Loading

0 comments on commit 0671640

Please sign in to comment.