Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[enhancement](iceberg) Support reading Iceberg partition evolution tables. #45367

Merged
merged 6 commits into from
Dec 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion be/src/vec/sink/writer/iceberg/partition_transformers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ std::string PartitionColumnTransform::get_partition_value(const TypeDescriptor&
if (value.has_value()) {
switch (type.type) {
case TYPE_BOOLEAN: {
return std::to_string(std::any_cast<bool>(value));
return std::any_cast<bool>(value) ? "true" : "false";
}
case TYPE_TINYINT: {
return std::to_string(std::any_cast<Int8>(value));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@

use demo.test_db;
CREATE TABLE iceberg_add_partition (
id INT,
name STRING,
age INT
) USING iceberg;
INSERT INTO iceberg_add_partition VALUES(1, 'Alice', 30),(2, 'Bob', 25);
ALTER TABLE iceberg_add_partition ADD PARTITION FIELD age;
ALTER TABLE iceberg_add_partition ADD COLUMNS address STRING;
INSERT INTO iceberg_add_partition VALUES (4, 'Charlie', 45, '123 Street Name');
ALTER TABLE iceberg_add_partition ADD PARTITION FIELD bucket(10, id);
INSERT INTO iceberg_add_partition VALUES (5, 'Eve', 29, '789 Third St');
ALTER TABLE iceberg_add_partition ADD PARTITION FIELD truncate(5, address);
INSERT INTO iceberg_add_partition VALUES (6, 'Frank', 33,"xx"),(7, 'Grace', 28,"yyyyyyyyy");



CREATE TABLE iceberg_drop_partition (
id INT,
name STRING,
amount DOUBLE,
created_date DATE
)
USING iceberg
PARTITIONED BY (year(created_date),bucket(10,created_date));
INSERT INTO iceberg_drop_partition VALUES
(1, 'Alice', 100.0, DATE '2023-12-01'),
(2, 'Bob', 200.0, DATE '2023-12-02'),
(3, 'Charlie', 300.0, DATE '2024-12-03');
ALTER TABLE iceberg_drop_partition DROP PARTITION FIELD year(created_date);
INSERT INTO iceberg_drop_partition VALUES
(4, 'David', 400.0, DATE '2023-12-02'),
(5, 'Eve', 500.0, DATE '2024-12-03');
ALTER TABLE iceberg_drop_partition DROP PARTITION FIELD bucket(10,created_date);
INSERT INTO iceberg_drop_partition VALUES
(6, 'David', 400.0, DATE '2025-12-12'),
(7, 'Eve', 500.0, DATE '2025-12-23');


CREATE TABLE iceberg_replace_partition (
id INT,
name STRING,
amount DOUBLE,
created_date DATE
)
USING iceberg
PARTITIONED BY (year(created_date),bucket(10,created_date));
INSERT INTO iceberg_replace_partition VALUES
(1, 'Alice', 100.0, DATE '2023-01-01'),
(2, 'Bob', 200.0, DATE '2023-12-02'),
(3, 'Charlie', 300.0, DATE '2024-12-03');
ALTER TABLE iceberg_replace_partition REPLACE PARTITION FIELD year(created_date) WITH month(created_date);
INSERT INTO iceberg_replace_partition VALUES
(4, 'David', 400.0, DATE '2023-12-02'),
(5, 'Eve', 500.0, DATE '2024-07-03');
ALTER TABLE iceberg_replace_partition REPLACE PARTITION FIELD bucket(10,created_date) WITH bucket(10,id);
INSERT INTO iceberg_replace_partition VALUES
(6, 'David', 400.0, DATE '2025-10-12'),
(7, 'Eve', 500.0, DATE '2025-09-23');




CREATE TABLE iceberg_evolution_partition (
id INT,
name STRING,
age INT
) USING iceberg;
INSERT INTO iceberg_evolution_partition VALUES(1, 'Alice', 30),(2, 'Bob', 25);
ALTER TABLE iceberg_evolution_partition ADD PARTITION FIELD age;
ALTER TABLE iceberg_evolution_partition ADD COLUMNS address STRING;
INSERT INTO iceberg_evolution_partition VALUES (4, 'Charlie', 45, '123 Street Name');
ALTER TABLE iceberg_evolution_partition ADD PARTITION FIELD bucket(10, id);
INSERT INTO iceberg_evolution_partition VALUES (5, 'Eve', 29, '789 Third St');
ALTER TABLE iceberg_evolution_partition REPLACE PARTITION FIELD bucket(10, id) WITH truncate(5, address);
INSERT INTO iceberg_evolution_partition VALUES (6, 'Frank', 33,"xx"),(7, 'Grace', 28,"yyyyyyyyy");
ALTER TABLE iceberg_evolution_partition DROP PARTITION FIELD truncate(5, address);
INSERT INTO iceberg_evolution_partition VALUES (8, 'Hank', 40, "zz"), (9, 'Ivy', 22, "aaaaaa");
ALTER TABLE iceberg_evolution_partition DROP COLUMNS address;
-- INSERT INTO iceberg_evolution_partition VALUES (10, 'Jack', 35), (11, 'Kara', 30);
-- spark error.

Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.doris.datasource.iceberg.IcebergExternalCatalog;
import org.apache.doris.datasource.iceberg.IcebergExternalTable;
import org.apache.doris.datasource.iceberg.IcebergUtils;
import org.apache.doris.nereids.exceptions.NotSupportedException;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.spi.Split;
Expand All @@ -56,19 +57,17 @@
import org.apache.iceberg.FileContent;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.types.Conversions;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.DateTimeUtil;
import org.apache.iceberg.util.SnapshotUtil;
import org.apache.iceberg.util.TableScanUtil;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.IOException;
import java.util.ArrayList;
Expand All @@ -78,11 +77,11 @@
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.stream.Collectors;

public class IcebergScanNode extends FileQueryScanNode {

public static final int MIN_DELETE_FILE_SUPPORT_VERSION = 2;
private static final Logger LOG = LogManager.getLogger(IcebergScanNode.class);

private IcebergSource source;
private Table icebergTable;
Expand Down Expand Up @@ -217,32 +216,44 @@ private List<Split> doGetSplits(int numBackends) throws UserException {
boolean isPartitionedTable = icebergTable.spec().isPartitioned();

long realFileSplitSize = getRealFileSplitSize(0);
CloseableIterable<FileScanTask> fileScanTasks = TableScanUtil.splitFiles(scan.planFiles(), realFileSplitSize);
CloseableIterable<FileScanTask> fileScanTasks = null;
try {
fileScanTasks = TableScanUtil.splitFiles(scan.planFiles(), realFileSplitSize);
} catch (NullPointerException e) {
/*
Caused by: java.lang.NullPointerException: Type cannot be null
at org.apache.iceberg.relocated.com.google.common.base.Preconditions.checkNotNull
(Preconditions.java:921) ~[iceberg-bundled-guava-1.4.3.jar:?]
at org.apache.iceberg.types.Types$NestedField.<init>(Types.java:447) ~[iceberg-api-1.4.3.jar:?]
at org.apache.iceberg.types.Types$NestedField.optional(Types.java:416) ~[iceberg-api-1.4.3.jar:?]
at org.apache.iceberg.PartitionSpec.partitionType(PartitionSpec.java:132) ~[iceberg-api-1.4.3.jar:?]
at org.apache.iceberg.DeleteFileIndex.lambda$new$0(DeleteFileIndex.java:97) ~[iceberg-core-1.4.3.jar:?]
at org.apache.iceberg.relocated.com.google.common.collect.RegularImmutableMap.forEach
(RegularImmutableMap.java:297) ~[iceberg-bundled-guava-1.4.3.jar:?]
at org.apache.iceberg.DeleteFileIndex.<init>(DeleteFileIndex.java:97) ~[iceberg-core-1.4.3.jar:?]
at org.apache.iceberg.DeleteFileIndex.<init>(DeleteFileIndex.java:71) ~[iceberg-core-1.4.3.jar:?]
at org.apache.iceberg.DeleteFileIndex$Builder.build(DeleteFileIndex.java:578) ~[iceberg-core-1.4.3.jar:?]
at org.apache.iceberg.ManifestGroup.plan(ManifestGroup.java:183) ~[iceberg-core-1.4.3.jar:?]
at org.apache.iceberg.ManifestGroup.planFiles(ManifestGroup.java:170) ~[iceberg-core-1.4.3.jar:?]
at org.apache.iceberg.DataTableScan.doPlanFiles(DataTableScan.java:89) ~[iceberg-core-1.4.3.jar:?]
at org.apache.iceberg.SnapshotScan.planFiles(SnapshotScan.java:139) ~[iceberg-core-1.4.3.jar:?]
at org.apache.doris.datasource.iceberg.source.IcebergScanNode.doGetSplits
(IcebergScanNode.java:209) ~[doris-fe.jar:1.2-SNAPSHOT]
EXAMPLE:
CREATE TABLE iceberg_tb(col1 INT,col2 STRING) USING ICEBERG PARTITIONED BY (bucket(10,col2));
INSERT INTO iceberg_tb VALUES( ... );
ALTER TABLE iceberg_tb DROP PARTITION FIELD bucket(10,col2);
ALTER TABLE iceberg_tb DROP COLUMNS col2;
Link: https://github.com/apache/iceberg/pull/10755
*/
LOG.warn("Iceberg TableScanUtil.splitFiles throw NullPointerException. Cause : ", e);
throw new NotSupportedException("Unable to read Iceberg table with dropped old partition column.");
}
try (CloseableIterable<CombinedScanTask> combinedScanTasks =
TableScanUtil.planTasks(fileScanTasks, realFileSplitSize, 1, 0)) {
combinedScanTasks.forEach(taskGrp -> taskGrp.files().forEach(splitTask -> {
List<String> partitionValues = new ArrayList<>();
if (isPartitionedTable) {
StructLike structLike = splitTask.file().partition();
List<PartitionField> fields = splitTask.spec().fields();
Types.StructType structType = icebergTable.schema().asStruct();

// set partitionValue for this IcebergSplit
for (int i = 0; i < structLike.size(); i++) {
Object obj = structLike.get(i, Object.class);
String value = String.valueOf(obj);
PartitionField partitionField = fields.get(i);
if (partitionField.transform().isIdentity()) {
Type type = structType.fieldType(partitionField.name());
if (type != null && type.typeId().equals(Type.TypeID.DATE)) {
// iceberg use integer to store date,
// we need transform it to string
value = DateTimeUtil.daysToIsoDate((Integer) obj);
}
}
partitionValues.add(value);
}

// Counts the number of partitions read
partitionPathSet.add(structLike.toString());
}
Expand All @@ -256,7 +267,7 @@ private List<Split> doGetSplits(int numBackends) throws UserException {
new String[0],
formatVersion,
source.getCatalog().getProperties(),
partitionValues,
new ArrayList<>(),
originalPath);
split.setTargetSplitSize(realFileSplitSize);
if (formatVersion >= MIN_DELETE_FILE_SUPPORT_VERSION) {
Expand Down Expand Up @@ -289,7 +300,6 @@ private List<Split> doGetSplits(int numBackends) throws UserException {
return pushDownCountSplits;
}
}

selectedPartitionNum = partitionPathSet.size();
return splits;
}
Expand Down Expand Up @@ -351,8 +361,20 @@ public TFileFormatType getFileFormatType() throws UserException {

@Override
public List<String> getPathPartitionKeys() throws UserException {
return icebergTable.spec().fields().stream().map(PartitionField::name).map(String::toLowerCase)
.collect(Collectors.toList());
// return icebergTable.spec().fields().stream().map(PartitionField::name).map(String::toLowerCase)
// .collect(Collectors.toList());
/**First, iceberg partition columns are based on existing fields, which will be stored in the actual data file.
* Second, iceberg partition columns support Partition transforms. In this case, the path partition key is not
* equal to the column name of the partition column, so remove this code and get all the columns you want to
* read from the file.
* Related code:
* be/src/vec/exec/scan/vfile_scanner.cpp:
* VFileScanner::_init_expr_ctxes()
* if (slot_info.is_file_slot) {
* xxxx
* }
*/
return new ArrayList<>();
}

@Override
Expand Down
Loading
Loading