Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Client] Make bucketAssigner for lake table pluggable #123

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright (c) 2024 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.fluss.client.lakehouse;

import com.alibaba.fluss.row.InternalRow;

/**
 * Assigns a bucket id for a row so that Fluss buckets align with the buckets of the
 * corresponding lake table when data lake storage is enabled.
 *
 * <p>Implementations encapsulate the bucketing logic of one lake format (e.g. Paimon) so that
 * the assigner used by the client is pluggable per data lake type.
 *
 * <p>NOTE(review): despite the {@code Abstract} prefix this is an interface, not an abstract
 * class — consider renaming (e.g. {@code LakeBucketAssigner}) before the API stabilizes.
 */
@FunctionalInterface
public interface AbstractBucketAssigner {

    /**
     * Returns the bucket id the given row belongs to.
     *
     * @param row the row to assign a bucket for
     * @return the bucket id computed by the lake-format specific bucketing function
     */
    int assignBucket(InternalRow row);
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.alibaba.fluss.client.lakehouse.paimon.PaimonBucketAssigner;
import com.alibaba.fluss.client.write.BucketAssigner;
import com.alibaba.fluss.cluster.Cluster;
import com.alibaba.fluss.lakehouse.DataLakeType;
import com.alibaba.fluss.metadata.TableDescriptor;
import com.alibaba.fluss.row.InternalRow;
import com.alibaba.fluss.types.RowType;
Expand All @@ -30,21 +31,34 @@
/** A bucket assigner for table with data lake enabled. */
public class LakeTableBucketAssigner implements BucketAssigner {

// the bucket extractor of bucket, fluss will use the the bucket
// that paimon assign to align with paimon when data lake is enabled
// todo: make it pluggable
private final PaimonBucketAssigner paimonBucketAssigner;
private final AbstractBucketAssigner bucketAssigner;

/**
 * Creates a bucket assigner for the given table, selecting the lake-format specific
 * implementation from the table's configured data lake type.
 *
 * @param tableDescriptor descriptor of the table with data lake enabled
 * @param bucketNum total number of buckets of the table
 * @throws UnsupportedOperationException if no assigner exists for the table's data lake type
 */
public LakeTableBucketAssigner(TableDescriptor tableDescriptor, int bucketNum) {
    DataLakeType dataLakeType = tableDescriptor.getDataLakeType();
    switch (dataLakeType) {
        case PAIMON:
            this.bucketAssigner =
                    new PaimonBucketAssigner(
                            tableDescriptor.getSchema().toRowType(),
                            tableDescriptor.getBucketKey(),
                            bucketNum);
            break;
        default:
            // Fail fast for lake types added to the enum but not yet wired up here.
            throw new UnsupportedOperationException(
                    "Data lake type " + dataLakeType + " is not supported.");
    }
}

/**
 * Creates a bucket assigner from an explicit row type and bucket key, selecting the
 * lake-format specific implementation from the given data lake type.
 *
 * @param rowType row type of the table schema
 * @param bucketKey column names forming the bucket key
 * @param bucketNum total number of buckets of the table
 * @param dataLakeType the lake format the bucket assignment must align with
 * @throws UnsupportedOperationException if no assigner exists for the given data lake type
 */
public LakeTableBucketAssigner(
        RowType rowType, List<String> bucketKey, int bucketNum, DataLakeType dataLakeType) {
    switch (dataLakeType) {
        case PAIMON:
            this.bucketAssigner = new PaimonBucketAssigner(rowType, bucketKey, bucketNum);
            break;
        default:
            // Fail fast for lake types added to the enum but not yet wired up here.
            throw new UnsupportedOperationException(
                    "Data lake type " + dataLakeType + " is not supported.");
    }
}

@Override
Expand All @@ -57,7 +71,7 @@ public int assignBucket(@Nullable byte[] key, Cluster cluster) {

@Override
public int assignBucket(@Nullable byte[] key, InternalRow row, Cluster cluster) {
    // Delegate to the lake-format specific assigner so the chosen bucket matches the
    // bucketing of the lake table; the key and cluster are not consulted here.
    return bucketAssigner.assignBucket(row);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.alibaba.fluss.client.lakehouse.paimon;

import com.alibaba.fluss.client.lakehouse.AbstractBucketAssigner;
import com.alibaba.fluss.row.InternalRow;
import com.alibaba.fluss.row.ProjectedRow;
import com.alibaba.fluss.types.RowType;
Expand All @@ -28,7 +29,7 @@
import java.util.List;

/** A bucket assigner to align with Paimon. */
public class PaimonBucketAssigner {
public class PaimonBucketAssigner implements AbstractBucketAssigner {

private final int bucketNum;

Expand Down Expand Up @@ -56,6 +57,7 @@ private static int[] getBucketKeyIndex(RowType rowType, List<String> bucketKey)
return bucketKeyIndex;
}

@Override
public int assignBucket(InternalRow row) {
BinaryRow bucketKey = getBucketRow(row);
return KeyAndBucketExtractor.bucket(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,8 @@ private int getBucketId(byte[] keyBytes, InternalRow key) {
new LakeTableBucketAssigner(
keyRowType,
tableInfo.getTableDescriptor().getBucketKey(),
numBuckets);
numBuckets,
tableInfo.getTableDescriptor().getDataLakeType());
}
return lakeTableBucketAssigner.assignBucket(
keyBytes, key, metadataUpdater.getCluster());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import com.alibaba.fluss.annotation.Internal;
import com.alibaba.fluss.annotation.PublicEvolving;
import com.alibaba.fluss.lakehouse.DataLakeType;
import com.alibaba.fluss.metadata.KvFormat;
import com.alibaba.fluss.metadata.LogFormat;
import com.alibaba.fluss.utils.ArrayUtils;
Expand Down Expand Up @@ -884,6 +885,12 @@ public class ConfigOptions {
+ "When this option is set to true and the datalake tiering service is up,"
+ " the table will be tiered and compacted into datalake format stored on lakehouse storage.");

public static final ConfigOption<DataLakeType> TABLE_DATALAKE_TYPE =
key("table.datalake.type")
.enumType(DataLakeType.class)
.defaultValue(DataLakeType.PAIMON)
.withDescription("Datalake type, currently only supports paimon.");

// ------------------------------------------------------------------------
// ConfigOptions for Kv
// ------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright (c) 2024 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.fluss.lakehouse;

/**
 * The supported data lake formats a Fluss table can align its bucketing with.
 *
 * <p>Selected per table via the {@code table.datalake.type} configuration option and used to
 * pick the lake-format specific bucket assigner on the client side.
 */
public enum DataLakeType {
    // Apache Paimon — currently the only supported lake format.
    PAIMON
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.alibaba.fluss.config.ConfigOptions;
import com.alibaba.fluss.config.Configuration;
import com.alibaba.fluss.config.ConfigurationUtils;
import com.alibaba.fluss.lakehouse.DataLakeType;
import com.alibaba.fluss.utils.AutoPartitionStrategy;
import com.alibaba.fluss.utils.Preconditions;
import com.alibaba.fluss.utils.json.JsonSerdeUtils;
Expand Down Expand Up @@ -244,6 +245,10 @@ public boolean isDataLakeEnabled() {
return configuration().get(ConfigOptions.TABLE_DATALAKE_ENABLED);
}

/**
 * Returns the data lake type configured for this table via {@code table.datalake.type};
 * defaults to {@code PAIMON} when the option is not set.
 */
public DataLakeType getDataLakeType() {
    return configuration().get(ConfigOptions.TABLE_DATALAKE_TYPE);
}

public TableDescriptor copy(Map<String, String> newProperties) {
return new TableDescriptor(
schema, comment, partitionKeys, tableDistribution, newProperties, customProperties);
Expand Down