diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/lakehouse/AbstractBucketAssigner.java b/fluss-client/src/main/java/com/alibaba/fluss/client/lakehouse/AbstractBucketAssigner.java new file mode 100644 index 00000000..cd586259 --- /dev/null +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/lakehouse/AbstractBucketAssigner.java @@ -0,0 +1,28 @@ +/* + * Copyright (c) 2024 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.client.lakehouse; + +import com.alibaba.fluss.row.InternalRow; + +/** + * Assigns a bucket to a row. Fluss uses the bucket chosen by this assigner to align with the lake + * table when data lake is enabled. 
+ */ +public interface AbstractBucketAssigner { + + int assignBucket(InternalRow row); +} diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/lakehouse/LakeTableBucketAssigner.java b/fluss-client/src/main/java/com/alibaba/fluss/client/lakehouse/LakeTableBucketAssigner.java index 4bf79e0a..d94abdac 100644 --- a/fluss-client/src/main/java/com/alibaba/fluss/client/lakehouse/LakeTableBucketAssigner.java +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/lakehouse/LakeTableBucketAssigner.java @@ -19,6 +19,7 @@ import com.alibaba.fluss.client.lakehouse.paimon.PaimonBucketAssigner; import com.alibaba.fluss.client.write.BucketAssigner; import com.alibaba.fluss.cluster.Cluster; +import com.alibaba.fluss.lakehouse.DataLakeType; import com.alibaba.fluss.metadata.TableDescriptor; import com.alibaba.fluss.row.InternalRow; import com.alibaba.fluss.types.RowType; @@ -30,21 +31,34 @@ /** A bucket assigner for table with data lake enabled. */ public class LakeTableBucketAssigner implements BucketAssigner { - // the bucket extractor of bucket, fluss will use the the bucket - // that paimon assign to align with paimon when data lake is enabled - // todo: make it pluggable - private final PaimonBucketAssigner paimonBucketAssigner; + private final AbstractBucketAssigner bucketAssigner; public LakeTableBucketAssigner(TableDescriptor tableDescriptor, int bucketNum) { - this.paimonBucketAssigner = - new PaimonBucketAssigner( - tableDescriptor.getSchema().toRowType(), - tableDescriptor.getBucketKey(), - bucketNum); + DataLakeType dataLakeType = tableDescriptor.getDataLakeType(); + switch (dataLakeType) { + case PAIMON: + this.bucketAssigner = + new PaimonBucketAssigner( + tableDescriptor.getSchema().toRowType(), + tableDescriptor.getBucketKey(), + bucketNum); + break; + default: + throw new UnsupportedOperationException( + "Data lake type " + dataLakeType + " is not supported."); + } } - public LakeTableBucketAssigner(RowType rowType, List bucketKey, int bucketNum) { - 
this.paimonBucketAssigner = new PaimonBucketAssigner(rowType, bucketKey, bucketNum); + public LakeTableBucketAssigner( + RowType rowType, List bucketKey, int bucketNum, DataLakeType dataLakeType) { + switch (dataLakeType) { + case PAIMON: + this.bucketAssigner = new PaimonBucketAssigner(rowType, bucketKey, bucketNum); + break; + default: + throw new UnsupportedOperationException( + "Data lake type " + dataLakeType + " is not supported."); + } } @Override @@ -57,7 +71,7 @@ public int assignBucket(@Nullable byte[] key, Cluster cluster) { @Override public int assignBucket(@Nullable byte[] key, InternalRow row, Cluster cluster) { - return paimonBucketAssigner.assignBucket(row); + return bucketAssigner.assignBucket(row); } @Override diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/lakehouse/paimon/PaimonBucketAssigner.java b/fluss-client/src/main/java/com/alibaba/fluss/client/lakehouse/paimon/PaimonBucketAssigner.java index 3661db29..aafeb807 100644 --- a/fluss-client/src/main/java/com/alibaba/fluss/client/lakehouse/paimon/PaimonBucketAssigner.java +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/lakehouse/paimon/PaimonBucketAssigner.java @@ -16,6 +16,7 @@ package com.alibaba.fluss.client.lakehouse.paimon; +import com.alibaba.fluss.client.lakehouse.AbstractBucketAssigner; import com.alibaba.fluss.row.InternalRow; import com.alibaba.fluss.row.ProjectedRow; import com.alibaba.fluss.types.RowType; @@ -28,7 +29,7 @@ import java.util.List; /** A bucket assigner to align with Paimon. 
*/ -public class PaimonBucketAssigner { +public class PaimonBucketAssigner implements AbstractBucketAssigner { private final int bucketNum; @@ -56,6 +57,7 @@ private static int[] getBucketKeyIndex(RowType rowType, List bucketKey) return bucketKeyIndex; } + @Override public int assignBucket(InternalRow row) { BinaryRow bucketKey = getBucketRow(row); return KeyAndBucketExtractor.bucket( diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/table/FlussTable.java b/fluss-client/src/main/java/com/alibaba/fluss/client/table/FlussTable.java index 9815eeb4..6489a5d8 100644 --- a/fluss-client/src/main/java/com/alibaba/fluss/client/table/FlussTable.java +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/table/FlussTable.java @@ -201,7 +201,8 @@ private int getBucketId(byte[] keyBytes, InternalRow key) { new LakeTableBucketAssigner( keyRowType, tableInfo.getTableDescriptor().getBucketKey(), - numBuckets); + numBuckets, + tableInfo.getTableDescriptor().getDataLakeType()); } return lakeTableBucketAssigner.assignBucket( keyBytes, key, metadataUpdater.getCluster()); diff --git a/fluss-common/src/main/java/com/alibaba/fluss/config/ConfigOptions.java b/fluss-common/src/main/java/com/alibaba/fluss/config/ConfigOptions.java index 468deedd..e057db78 100644 --- a/fluss-common/src/main/java/com/alibaba/fluss/config/ConfigOptions.java +++ b/fluss-common/src/main/java/com/alibaba/fluss/config/ConfigOptions.java @@ -18,6 +18,7 @@ import com.alibaba.fluss.annotation.Internal; import com.alibaba.fluss.annotation.PublicEvolving; +import com.alibaba.fluss.lakehouse.DataLakeType; import com.alibaba.fluss.metadata.KvFormat; import com.alibaba.fluss.metadata.LogFormat; import com.alibaba.fluss.utils.ArrayUtils; @@ -884,6 +885,12 @@ public class ConfigOptions { + "When this option is set to ture and the datalake tiering service is up," + " the table will be tiered and compacted into datalake format stored on lakehouse storage."); + public static final ConfigOption 
TABLE_DATALAKE_TYPE = + key("table.datalake.type") + .enumType(DataLakeType.class) + .defaultValue(DataLakeType.PAIMON) + .withDescription("Datalake type, currently only supports paimon."); + // ------------------------------------------------------------------------ // ConfigOptions for Kv // ------------------------------------------------------------------------ diff --git a/fluss-common/src/main/java/com/alibaba/fluss/lakehouse/DataLakeType.java b/fluss-common/src/main/java/com/alibaba/fluss/lakehouse/DataLakeType.java new file mode 100644 index 00000000..eab1fee9 --- /dev/null +++ b/fluss-common/src/main/java/com/alibaba/fluss/lakehouse/DataLakeType.java @@ -0,0 +1,22 @@ +/* + * Copyright (c) 2024 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.lakehouse; + +/** The type of data lake, as configured by the {@code table.datalake.type} option. 
*/ +public enum DataLakeType { + PAIMON +} diff --git a/fluss-common/src/main/java/com/alibaba/fluss/metadata/TableDescriptor.java b/fluss-common/src/main/java/com/alibaba/fluss/metadata/TableDescriptor.java index d3737b93..f36ece35 100644 --- a/fluss-common/src/main/java/com/alibaba/fluss/metadata/TableDescriptor.java +++ b/fluss-common/src/main/java/com/alibaba/fluss/metadata/TableDescriptor.java @@ -22,6 +22,7 @@ import com.alibaba.fluss.config.ConfigOptions; import com.alibaba.fluss.config.Configuration; import com.alibaba.fluss.config.ConfigurationUtils; +import com.alibaba.fluss.lakehouse.DataLakeType; import com.alibaba.fluss.utils.AutoPartitionStrategy; import com.alibaba.fluss.utils.Preconditions; import com.alibaba.fluss.utils.json.JsonSerdeUtils; @@ -244,6 +245,10 @@ public boolean isDataLakeEnabled() { return configuration().get(ConfigOptions.TABLE_DATALAKE_ENABLED); } + public DataLakeType getDataLakeType() { + return configuration().get(ConfigOptions.TABLE_DATALAKE_TYPE); + } + public TableDescriptor copy(Map newProperties) { return new TableDescriptor( schema, comment, partitionKeys, tableDistribution, newProperties, customProperties);