diff --git a/core/trino-server/src/main/provisio/presto.xml b/core/trino-server/src/main/provisio/presto.xml
index 48190e7b21a7..02575a105ae4 100644
--- a/core/trino-server/src/main/provisio/presto.xml
+++ b/core/trino-server/src/main/provisio/presto.xml
@@ -74,6 +74,12 @@
+    <artifactSet to="plugin/hudi">
+        <artifact id="${project.groupId}:trino-hudi:zip:${project.version}">
+            <unpack />
+        </artifact>
+    </artifactSet>
+
diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/BackgroundHiveSplitLoader.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/BackgroundHiveSplitLoader.java
index 5546808b2bdb..37f33bc6cfaa 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/BackgroundHiveSplitLoader.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/BackgroundHiveSplitLoader.java
@@ -28,7 +28,6 @@
import io.trino.plugin.hive.HiveSplit.BucketConversion;
import io.trino.plugin.hive.HiveSplit.BucketValidation;
import io.trino.plugin.hive.acid.AcidTransaction;
-import io.trino.plugin.hive.metastore.Column;
import io.trino.plugin.hive.metastore.Partition;
import io.trino.plugin.hive.metastore.Table;
import io.trino.plugin.hive.util.HiveBucketing.BucketingVersion;
@@ -100,8 +99,6 @@
import static io.trino.plugin.hive.HiveErrorCode.HIVE_BAD_DATA;
import static io.trino.plugin.hive.HiveErrorCode.HIVE_FILESYSTEM_ERROR;
import static io.trino.plugin.hive.HiveErrorCode.HIVE_INVALID_BUCKET_FILES;
-import static io.trino.plugin.hive.HiveErrorCode.HIVE_INVALID_METADATA;
-import static io.trino.plugin.hive.HiveErrorCode.HIVE_INVALID_PARTITION_VALUE;
import static io.trino.plugin.hive.HiveErrorCode.HIVE_UNKNOWN_ERROR;
import static io.trino.plugin.hive.HivePartitionManager.partitionMatches;
import static io.trino.plugin.hive.HiveSessionProperties.getMaxInitialSplitSize;
@@ -114,11 +111,11 @@
import static io.trino.plugin.hive.util.HiveFileIterator.NestedDirectoryPolicy.FAIL;
import static io.trino.plugin.hive.util.HiveFileIterator.NestedDirectoryPolicy.IGNORED;
import static io.trino.plugin.hive.util.HiveFileIterator.NestedDirectoryPolicy.RECURSE;
-import static io.trino.plugin.hive.util.HiveUtil.checkCondition;
import static io.trino.plugin.hive.util.HiveUtil.getFooterCount;
import static io.trino.plugin.hive.util.HiveUtil.getHeaderCount;
import static io.trino.plugin.hive.util.HiveUtil.getInputFormat;
import static io.trino.plugin.hive.util.HiveUtil.getPartitionKeyColumnHandles;
+import static io.trino.plugin.hive.util.HiveUtil.getPartitionKeys;
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
import static java.lang.Integer.parseInt;
import static java.lang.Math.max;
@@ -942,28 +939,6 @@ private static List<Path> getTargetPathsFromSymlink(FileSystem fileSystem, Path
}
}
- private static List<HivePartitionKey> getPartitionKeys(Table table, Optional<Partition> partition)
- {
- if (partition.isEmpty()) {
- return ImmutableList.of();
- }
- ImmutableList.Builder<HivePartitionKey> partitionKeys = ImmutableList.builder();
- List<Column> keys = table.getPartitionColumns();
- List<String> values = partition.get().getValues();
- checkCondition(keys.size() == values.size(), HIVE_INVALID_METADATA, "Expected %s partition key values, but got %s", keys.size(), values.size());
- for (int i = 0; i < keys.size(); i++) {
- String name = keys.get(i).getName();
- HiveType hiveType = keys.get(i).getType();
- if (!hiveType.isSupportedType(table.getStorage().getStorageFormat())) {
- throw new TrinoException(NOT_SUPPORTED, format("Unsupported Hive type %s found in partition keys of table %s.%s", hiveType, table.getDatabaseName(), table.getTableName()));
- }
- String value = values.get(i);
- checkCondition(value != null, HIVE_INVALID_PARTITION_VALUE, "partition key value cannot be null for field: %s", name);
- partitionKeys.add(new HivePartitionKey(name, value));
- }
- return partitionKeys.build();
- }
-
private static Properties getPartitionSchema(Table table, Optional<Partition> partition)
{
if (partition.isEmpty()) {
diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetPageSourceFactory.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetPageSourceFactory.java
index 1aeee10e3e85..8ee548f8a89d 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetPageSourceFactory.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetPageSourceFactory.java
@@ -365,7 +365,7 @@ public static Optional<org.apache.parquet.schema.Type> getColumnType(HiveColumnH
return Optional.of(new GroupType(baseType.getRepetition(), baseType.getName(), ImmutableList.of(type)));
}
- private static Optional<ColumnIndexStore> getColumnIndexStore(
+ public static Optional<ColumnIndexStore> getColumnIndexStore(
+ ParquetDataSource dataSource,
+ BlockMetaData blockMetadata,
+ Map<List<String>, RichColumnDescriptor> descriptorsByPath,
@@ -440,7 +440,7 @@ public static TupleDomain<ColumnDescriptor> getParquetTupleDomain(
return TupleDomain.withColumnDomains(predicate.buildOrThrow());
}
- private static org.apache.parquet.schema.Type getParquetType(HiveColumnHandle column, MessageType messageType, boolean useParquetColumnNames)
+ public static org.apache.parquet.schema.Type getParquetType(HiveColumnHandle column, MessageType messageType, boolean useParquetColumnNames)
{
if (useParquetColumnNames) {
return getParquetTypeByName(column.getBaseColumnName(), messageType);
diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/util/HiveUtil.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/util/HiveUtil.java
index 9486f51f3d8d..0a8e611088c9 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/util/HiveUtil.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/util/HiveUtil.java
@@ -32,6 +32,7 @@
import io.trino.plugin.hive.HiveType;
import io.trino.plugin.hive.avro.TrinoAvroSerDe;
import io.trino.plugin.hive.metastore.Column;
+import io.trino.plugin.hive.metastore.Partition;
import io.trino.plugin.hive.metastore.SortingColumn;
import io.trino.plugin.hive.metastore.Table;
import io.trino.spi.ErrorCodeSupplier;
@@ -1128,4 +1129,26 @@ public static boolean isIcebergTable(Table table)
{
return ICEBERG_TABLE_TYPE_VALUE.equalsIgnoreCase(table.getParameters().get(ICEBERG_TABLE_TYPE_NAME));
}
+
+ public static List<HivePartitionKey> getPartitionKeys(Table table, Optional<Partition> partition)
+ {
+ if (partition.isEmpty()) {
+ return ImmutableList.of();
+ }
+ ImmutableList.Builder<HivePartitionKey> partitionKeys = ImmutableList.builder();
+ List<Column> keys = table.getPartitionColumns();
+ List<String> values = partition.get().getValues();
+ checkCondition(keys.size() == values.size(), HIVE_INVALID_METADATA, "Expected %s partition key values, but got %s", keys.size(), values.size());
+ for (int i = 0; i < keys.size(); i++) {
+ String name = keys.get(i).getName();
+ HiveType hiveType = keys.get(i).getType();
+ if (!hiveType.isSupportedType(table.getStorage().getStorageFormat())) {
+ throw new TrinoException(NOT_SUPPORTED, format("Unsupported Hive type %s found in partition keys of table %s.%s", hiveType, table.getDatabaseName(), table.getTableName()));
+ }
+ String value = values.get(i);
+ checkCondition(value != null, HIVE_INVALID_PARTITION_VALUE, "partition key value cannot be null for field: %s", name);
+ partitionKeys.add(new HivePartitionKey(name, value));
+ }
+ return partitionKeys.build();
+ }
}
diff --git a/plugin/trino-hudi/pom.xml b/plugin/trino-hudi/pom.xml
new file mode 100644
index 000000000000..a98763cd29c2
--- /dev/null
+++ b/plugin/trino-hudi/pom.xml
@@ -0,0 +1,371 @@
+<?xml version="1.0"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <artifactId>trino-root</artifactId>
+        <groupId>io.trino</groupId>
+        <version>370-SNAPSHOT</version>
+        <relativePath>../../pom.xml</relativePath>
+    </parent>
+
+    <artifactId>trino-hudi</artifactId>
+    <description>Trino - Hudi Connector</description>
+    <packaging>trino-plugin</packaging>
+
+    <properties>
+        <air.main.basedir>${project.parent.basedir}</air.main.basedir>
+        <dep.hudi.version>0.10.0</dep.hudi.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>io.trino</groupId>
+            <artifactId>trino-hive</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.trino</groupId>
+            <artifactId>trino-memory-context</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.trino</groupId>
+            <artifactId>trino-parquet</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.trino</groupId>
+            <artifactId>trino-plugin-toolkit</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.trino.hadoop</groupId>
+            <artifactId>hadoop-apache</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.trino.hive</groupId>
+            <artifactId>hive-apache</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.airlift</groupId>
+            <artifactId>bootstrap</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.airlift</groupId>
+            <artifactId>configuration</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.airlift</groupId>
+            <artifactId>event</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.airlift</groupId>
+            <artifactId>json</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.airlift</groupId>
+            <artifactId>log</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.airlift</groupId>
+            <artifactId>units</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.google.inject</groupId>
+            <artifactId>guice</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>javax.inject</groupId>
+            <artifactId>javax.inject</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>javax.validation</groupId>
+            <artifactId>validation-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>joda-time</groupId>
+            <artifactId>joda-time</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hudi</groupId>
+            <artifactId>hudi-common</artifactId>
+            <version>${dep.hudi.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.hbase</groupId>
+                    <artifactId>hbase-server</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.orc</groupId>
+                    <artifactId>orc-core</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.fasterxml.jackson.core</groupId>
+                    <artifactId>jackson-annotations</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.fasterxml.jackson.core</groupId>
+                    <artifactId>jackson-databind</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.httpcomponents</groupId>
+                    <artifactId>httpclient</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.httpcomponents</groupId>
+                    <artifactId>fluent-hc</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.rocksdb</groupId>
+                    <artifactId>rocksdbjni</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.esotericsoftware</groupId>
+                    <artifactId>kryo-shaded</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-client</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.httpcomponents</groupId>
+                    <artifactId>httpcore</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hive</groupId>
+                    <artifactId>hive-exec</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hive</groupId>
+                    <artifactId>hive-jdbc</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hudi</groupId>
+            <artifactId>hudi-hadoop-mr</artifactId>
+            <version>${dep.hudi.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.hbase</groupId>
+                    <artifactId>hbase-server</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.orc</groupId>
+                    <artifactId>orc-core</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.fasterxml.jackson.core</groupId>
+                    <artifactId>jackson-annotations</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.fasterxml.jackson.core</groupId>
+                    <artifactId>jackson-databind</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.httpcomponents</groupId>
+                    <artifactId>httpclient</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.httpcomponents</groupId>
+                    <artifactId>fluent-hc</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.rocksdb</groupId>
+                    <artifactId>rocksdbjni</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.esotericsoftware</groupId>
+                    <artifactId>kryo-shaded</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-client</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.httpcomponents</groupId>
+                    <artifactId>httpcore</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hive</groupId>
+                    <artifactId>hive-exec</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hive</groupId>
+                    <artifactId>hive-jdbc</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hudi</groupId>
+            <artifactId>hudi-hive-sync</artifactId>
+            <version>${dep.hudi.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-common</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-client</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-hdfs</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-auth</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hive</groupId>
+                    <artifactId>hive-common</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hive</groupId>
+                    <artifactId>hive-jdbc</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hive</groupId>
+                    <artifactId>hive-metastore</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hive</groupId>
+                    <artifactId>hive-service</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hudi</groupId>
+                    <artifactId>hudi-common</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hudi</groupId>
+                    <artifactId>hudi-hadoop-mr</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hudi</groupId>
+                    <artifactId>hudi-sync-common</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.parquet</groupId>
+                    <artifactId>parquet-avro</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.beust</groupId>
+                    <artifactId>jcommander</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>servletapi</groupId>
+                    <artifactId>servletapi</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.weakref</groupId>
+            <artifactId>jmxutils</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>io.trino</groupId>
+            <artifactId>trino-spi</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>io.airlift</groupId>
+            <artifactId>slice</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-annotations</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.openjdk.jol</groupId>
+            <artifactId>jol-core</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>io.trino</groupId>
+            <artifactId>trino-hive</artifactId>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>io.trino</groupId>
+            <artifactId>trino-hive-hadoop2</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>io.trino</groupId>
+            <artifactId>trino-main</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>io.trino</groupId>
+            <artifactId>trino-main</artifactId>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>io.trino</groupId>
+            <artifactId>trino-parser</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>io.trino</groupId>
+            <artifactId>trino-spi</artifactId>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>io.trino</groupId>
+            <artifactId>trino-testing</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>io.trino</groupId>
+            <artifactId>trino-testing-services</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>io.trino</groupId>
+            <artifactId>trino-tpch</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>io.trino.tpch</groupId>
+            <artifactId>tpch</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>io.airlift</groupId>
+            <artifactId>testing</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.assertj</groupId>
+            <artifactId>assertj-core</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.jetbrains</groupId>
+            <artifactId>annotations</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.testng</groupId>
+            <artifactId>testng</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+</project>
diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiConfig.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiConfig.java
new file mode 100644
index 000000000000..1b84fa5bc635
--- /dev/null
+++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiConfig.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.trino.plugin.hudi;
+
+import io.airlift.configuration.Config;
+import io.airlift.configuration.ConfigDescription;
+import io.airlift.units.DataSize;
+import org.apache.hudi.common.model.HoodieFileFormat;
+
+import javax.validation.constraints.DecimalMax;
+import javax.validation.constraints.DecimalMin;
+import javax.validation.constraints.Min;
+import javax.validation.constraints.NotNull;
+
+import static io.airlift.units.DataSize.Unit.MEGABYTE;
+import static org.apache.hudi.common.model.HoodieFileFormat.PARQUET;
+
+public class HudiConfig
+{
+ private HoodieFileFormat baseFileFormat = PARQUET;
+ private boolean metadataEnabled;
+ private boolean shouldSkipMetaStoreForPartition;
+ private boolean shouldUseParquetColumnNames = true;
+ private int partitionScannerParallelism = 16;
+ private int splitGeneratorParallelism = 16;
+ private int minPartitionBatchSize = 10;
+ private int maxPartitionBatchSize = 100;
+ private boolean sizeBasedSplitWeightsEnabled = true;
+ private DataSize standardSplitWeightSize = DataSize.of(128, MEGABYTE);
+ private double minimumAssignedSplitWeight = 0.05;
+
+ @NotNull
+ public HoodieFileFormat getBaseFileFormat()
+ {
+ return HoodieFileFormat.valueOf(baseFileFormat.name());
+ }
+
+ @Config("hudi.base-file-format")
+ public HudiConfig setBaseFileFormat(HoodieFileFormat baseFileFormat)
+ {
+ this.baseFileFormat = baseFileFormat;
+ return this;
+ }
+
+ @Config("hudi.metadata-enabled")
+ @ConfigDescription("Fetch the list of file names and sizes from metadata rather than storage")
+ public HudiConfig setMetadataEnabled(boolean metadataEnabled)
+ {
+ this.metadataEnabled = metadataEnabled;
+ return this;
+ }
+
+ @NotNull
+ public boolean isMetadataEnabled()
+ {
+ return this.metadataEnabled;
+ }
+
+ @Config("hudi.skip-metastore-for-partition")
+ @ConfigDescription("By default, partition info is fetched from the metastore. " +
+ "When this config is enabled, then the partition info is fetched using Hudi's partition extractor and relative partition path.")
+ public HudiConfig setSkipMetaStoreForPartition(boolean shouldSkipMetaStoreForPartition)
+ {
+ this.shouldSkipMetaStoreForPartition = shouldSkipMetaStoreForPartition;
+ return this;
+ }
+
+ @NotNull
+ public boolean getSkipMetaStoreForPartition()
+ {
+ return this.shouldSkipMetaStoreForPartition;
+ }
+
+ @Config("hudi.use-parquet-column-names")
+ @ConfigDescription("Access parquet columns using names from the file. If disabled, then columns are accessed using index."
+ + "Only applicable to parquet file format.")
+ public HudiConfig setUseParquetColumnNames(boolean shouldUseParquetColumnNames)
+ {
+ this.shouldUseParquetColumnNames = shouldUseParquetColumnNames;
+ return this;
+ }
+
+ @NotNull
+ public boolean getUseParquetColumnNames()
+ {
+ return this.shouldUseParquetColumnNames;
+ }
+
+ @Config("hudi.partition-scanner-parallelism")
+ @ConfigDescription("Number of threads to use for partition scanners")
+ public HudiConfig setPartitionScannerParallelism(int partitionScannerParallelism)
+ {
+ this.partitionScannerParallelism = partitionScannerParallelism;
+ return this;
+ }
+
+ @NotNull
+ public int getPartitionScannerParallelism()
+ {
+ return this.partitionScannerParallelism;
+ }
+
+ @Config("hudi.split-generator-parallelism")
+ @ConfigDescription("Number of threads to use for split generators")
+ public HudiConfig setSplitGeneratorParallelism(int splitGeneratorParallelism)
+ {
+ this.splitGeneratorParallelism = splitGeneratorParallelism;
+ return this;
+ }
+
+ @NotNull
+ public int getSplitGeneratorParallelism()
+ {
+ return this.splitGeneratorParallelism;
+ }
+
+ @Config("hudi.min-partition-batch-size")
+ public HudiConfig setMinPartitionBatchSize(int minPartitionBatchSize)
+ {
+ this.minPartitionBatchSize = minPartitionBatchSize;
+ return this;
+ }
+
+ @Min(1)
+ public int getMinPartitionBatchSize()
+ {
+ return minPartitionBatchSize;
+ }
+
+ @Config("hudi.max-partition-batch-size")
+ public HudiConfig setMaxPartitionBatchSize(int maxPartitionBatchSize)
+ {
+ this.maxPartitionBatchSize = maxPartitionBatchSize;
+ return this;
+ }
+
+ @Min(1)
+ public int getMaxPartitionBatchSize()
+ {
+ return maxPartitionBatchSize;
+ }
+
+ @Config("hudi.size-based-split-weights-enabled")
+ @ConfigDescription("Unlike uniform splitting, size-based splitting ensures that each batch of splits has enough data to process. " +
+ "By default, it is enabled to improve performance")
+ public HudiConfig setSizeBasedSplitWeightsEnabled(boolean sizeBasedSplitWeightsEnabled)
+ {
+ this.sizeBasedSplitWeightsEnabled = sizeBasedSplitWeightsEnabled;
+ return this;
+ }
+
+ public boolean isSizeBasedSplitWeightsEnabled()
+ {
+ return sizeBasedSplitWeightsEnabled;
+ }
+
+ @Config("hudi.standard-split-weight-size")
+ @ConfigDescription("The split size corresponding to the standard weight (1.0) "
+ + "when size based split weights are enabled")
+ public HudiConfig setStandardSplitWeightSize(DataSize standardSplitWeightSize)
+ {
+ this.standardSplitWeightSize = standardSplitWeightSize;
+ return this;
+ }
+
+ @NotNull
+ public DataSize getStandardSplitWeightSize()
+ {
+ return standardSplitWeightSize;
+ }
+
+ @Config("hudi.minimum-assigned-split-weight")
+ @ConfigDescription("Minimum weight that a split can be assigned when size based split weights are enabled")
+ public HudiConfig setMinimumAssignedSplitWeight(double minimumAssignedSplitWeight)
+ {
+ this.minimumAssignedSplitWeight = minimumAssignedSplitWeight;
+ return this;
+ }
+
+ @DecimalMax("1")
+ @DecimalMin(value = "0", inclusive = false)
+ public double getMinimumAssignedSplitWeight()
+ {
+ return minimumAssignedSplitWeight;
+ }
+}
diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiConnector.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiConnector.java
new file mode 100644
index 000000000000..ec7e0e39fa96
--- /dev/null
+++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiConnector.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.trino.plugin.hudi;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import io.airlift.bootstrap.LifeCycleManager;
+import io.airlift.log.Logger;
+import io.trino.plugin.base.classloader.ClassLoaderSafeConnectorMetadata;
+import io.trino.plugin.base.session.SessionPropertiesProvider;
+import io.trino.plugin.hive.HiveTransactionHandle;
+import io.trino.spi.classloader.ThreadContextClassLoader;
+import io.trino.spi.connector.Connector;
+import io.trino.spi.connector.ConnectorAccessControl;
+import io.trino.spi.connector.ConnectorHandleResolver;
+import io.trino.spi.connector.ConnectorMetadata;
+import io.trino.spi.connector.ConnectorNodePartitioningProvider;
+import io.trino.spi.connector.ConnectorPageSourceProvider;
+import io.trino.spi.connector.ConnectorSplitManager;
+import io.trino.spi.connector.ConnectorTransactionHandle;
+import io.trino.spi.connector.SystemTable;
+import io.trino.spi.session.PropertyMetadata;
+import io.trino.spi.transaction.IsolationLevel;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+
+import static com.google.common.collect.ImmutableList.toImmutableList;
+import static io.trino.spi.transaction.IsolationLevel.SERIALIZABLE;
+import static io.trino.spi.transaction.IsolationLevel.checkConnectorSupports;
+import static java.util.Objects.requireNonNull;
+
+public class HudiConnector
+ implements Connector
+{
+ private static final Logger log = Logger.get(HudiConnector.class);
+
+ private final LifeCycleManager lifeCycleManager;
+ private final HudiTransactionManager transactionManager;
+ private final HudiMetadataFactory metadataFactory;
+ private final ConnectorSplitManager splitManager;
+ private final ConnectorPageSourceProvider pageSourceProvider;
+ private final ConnectorNodePartitioningProvider nodePartitioningProvider;
+ private final Set<SystemTable> systemTables;
+ private final List<PropertyMetadata<?>> sessionProperties;
+ private final List<PropertyMetadata<?>> tableProperties;
+ private final Optional<ConnectorAccessControl> accessControl;
+
+ public HudiConnector(
+ LifeCycleManager lifeCycleManager,
+ HudiTransactionManager transactionManager,
+ HudiMetadataFactory metadataFactory,
+ ConnectorSplitManager splitManager,
+ ConnectorPageSourceProvider pageSourceProvider,
+ ConnectorNodePartitioningProvider nodePartitioningProvider,
+ Set<SystemTable> systemTables,
+ Set<SessionPropertiesProvider> sessionPropertiesProviders,
+ List<PropertyMetadata<?>> tableProperties,
+ Optional<ConnectorAccessControl> accessControl)
+ {
+ this.lifeCycleManager = requireNonNull(lifeCycleManager, "lifeCycleManager is null");
+ this.transactionManager = requireNonNull(transactionManager, "transactionManager is null");
+ this.metadataFactory = requireNonNull(metadataFactory, "metadataFactory is null");
+ this.splitManager = requireNonNull(splitManager, "splitManager is null");
+ this.pageSourceProvider = requireNonNull(pageSourceProvider, "pageSourceProvider is null");
+ this.nodePartitioningProvider = requireNonNull(nodePartitioningProvider, "nodePartitioningProvider is null");
+ this.systemTables = ImmutableSet.copyOf(requireNonNull(systemTables, "systemTables is null"));
+ this.sessionProperties = requireNonNull(sessionPropertiesProviders, "sessionPropertiesProviders is null").stream()
+ .flatMap(sessionPropertiesProvider -> sessionPropertiesProvider.getSessionProperties().stream())
+ .collect(toImmutableList());
+ this.tableProperties = ImmutableList.copyOf(requireNonNull(tableProperties, "tableProperties is null"));
+ this.accessControl = requireNonNull(accessControl, "accessControl is null");
+ }
+
+ @Override
+ public Optional<ConnectorHandleResolver> getHandleResolver()
+ {
+ return Optional.of(new HudiHandleResolver());
+ }
+
+ @Override
+ public ConnectorMetadata getMetadata(ConnectorTransactionHandle transactionHandle)
+ {
+ ConnectorMetadata metadata = transactionManager.get(transactionHandle);
+ return new ClassLoaderSafeConnectorMetadata(metadata, getClass().getClassLoader());
+ }
+
+ @Override
+ public ConnectorSplitManager getSplitManager()
+ {
+ return splitManager;
+ }
+
+ @Override
+ public ConnectorPageSourceProvider getPageSourceProvider()
+ {
+ return pageSourceProvider;
+ }
+
+ @Override
+ public ConnectorNodePartitioningProvider getNodePartitioningProvider()
+ {
+ return nodePartitioningProvider;
+ }
+
+ @Override
+ public Set<SystemTable> getSystemTables()
+ {
+ return systemTables;
+ }
+
+ @Override
+ public List<PropertyMetadata<?>> getSessionProperties()
+ {
+ return sessionProperties;
+ }
+
+ @Override
+ public List<PropertyMetadata<?>> getTableProperties()
+ {
+ return tableProperties;
+ }
+
+ @Override
+ public ConnectorAccessControl getAccessControl()
+ {
+ return accessControl.orElseThrow(UnsupportedOperationException::new);
+ }
+
+ @Override
+ public ConnectorTransactionHandle beginTransaction(IsolationLevel isolationLevel, boolean readOnly, boolean autoCommit)
+ {
+ checkConnectorSupports(SERIALIZABLE, isolationLevel);
+ ConnectorTransactionHandle transaction = new HiveTransactionHandle();
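+ // Create the metadata with the plugin classloader as the thread context classloader, so that Hudi and Hive classes resolve against the connector's own dependencies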
+ try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(getClass().getClassLoader())) {
+ transactionManager.put(transaction, metadataFactory.create());
+ }
+ return transaction;
+ }
+
+ @Override
+ public void commit(ConnectorTransactionHandle transaction)
+ {
+ transactionManager.remove(transaction);
+ }
+
+ @Override
+ public void rollback(ConnectorTransactionHandle transaction)
+ {
+ HudiMetadata metadata = transactionManager.remove(transaction);
+ try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(getClass().getClassLoader())) {
+ metadata.rollback();
+ }
+ }
+
+ @Override
+ public final void shutdown()
+ {
+ try {
+ lifeCycleManager.stop();
+ }
+ catch (Exception e) {
+ log.error(e, "Error shutting down connector");
+ }
+ }
+}
diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiConnectorFactory.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiConnectorFactory.java
new file mode 100644
index 000000000000..34d077f756a5
--- /dev/null
+++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiConnectorFactory.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.trino.plugin.hudi;
+
+import com.google.inject.Binder;
+import com.google.inject.Module;
+import io.trino.spi.connector.Connector;
+import io.trino.spi.connector.ConnectorContext;
+import io.trino.spi.connector.ConnectorFactory;
+
+import java.lang.reflect.InvocationTargetException;
+import java.util.Map;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Strings.isNullOrEmpty;
+import static com.google.common.base.Throwables.throwIfUnchecked;
+import static java.util.Objects.requireNonNull;
+
+public class HudiConnectorFactory
+ implements ConnectorFactory
+{
+ private final String name;
+ private final Class<? extends Module> module;
+
+ public HudiConnectorFactory(String name)
+ {
+ this(name, EmptyModule.class);
+ }
+
+ public HudiConnectorFactory(String name, Class<? extends Module> module)
+ {
+ checkArgument(!isNullOrEmpty(name), "name is null or empty");
+ this.name = name;
+ this.module = requireNonNull(module, "module is null");
+ }
+
+ @Override
+ public String getName()
+ {
+ return name;
+ }
+
+ @Override
+ public Connector create(String catalogName, Map<String, String> config, ConnectorContext context)
+ {
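+ // Instantiate the connector through a duplicated plugin classloader, so each catalog gets its own isolated copy of the connector classes and their static state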
+ ClassLoader classLoader = context.duplicatePluginClassLoader();
+ try {
+ Object moduleInstance = classLoader.loadClass(module.getName()).getConstructor().newInstance();
+ Class<?> moduleClass = classLoader.loadClass(Module.class.getName());
+ return (Connector) classLoader.loadClass(InternalHudiConnectorFactory.class.getName())
+ .getMethod("createConnector", String.class, Map.class, ConnectorContext.class, moduleClass)
+ .invoke(null, catalogName, config, context, moduleInstance);
+ }
+ catch (InvocationTargetException e) {
+ Throwable targetException = e.getTargetException();
+ throwIfUnchecked(targetException);
+ throw new RuntimeException(targetException);
+ }
+ catch (ReflectiveOperationException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public static class EmptyModule
+ implements Module
+ {
+ @Override
+ public void configure(Binder binder) {}
+ }
+}
diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiErrorCode.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiErrorCode.java
new file mode 100644
index 000000000000..d38724451c22
--- /dev/null
+++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiErrorCode.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.trino.plugin.hudi;
+
+import io.trino.spi.ErrorCode;
+import io.trino.spi.ErrorCodeSupplier;
+import io.trino.spi.ErrorType;
+
+import static io.trino.spi.ErrorType.EXTERNAL;
+import static io.trino.spi.ErrorType.INTERNAL_ERROR;
+import static io.trino.spi.ErrorType.USER_ERROR;
+
+public enum HudiErrorCode
+ implements ErrorCodeSupplier
+{
+ HUDI_UNKNOWN_TABLE_TYPE(0, EXTERNAL),
+ HUDI_INVALID_METADATA(1, EXTERNAL),
+ HUDI_TOO_MANY_OPEN_PARTITIONS(2, USER_ERROR),
+ HUDI_INVALID_PARTITION_VALUE(3, EXTERNAL),
+ HUDI_BAD_DATA(4, EXTERNAL),
+ HUDI_MISSING_DATA(5, EXTERNAL),
+ HUDI_CANNOT_OPEN_SPLIT(6, EXTERNAL),
+ HUDI_WRITER_OPEN_ERROR(7, EXTERNAL),
+ HUDI_FILESYSTEM_ERROR(8, EXTERNAL),
+ HUDI_CURSOR_ERROR(9, EXTERNAL),
+ HUDI_WRITE_VALIDATION_FAILED(10, INTERNAL_ERROR),
+ HUDI_INVALID_SNAPSHOT_ID(11, USER_ERROR);
+
+ private final ErrorCode errorCode;
+
+ HudiErrorCode(int code, ErrorType type)
+ {
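+ // Offset by 0x0100_0000 so these codes land in their own numeric range, following the convention used for Trino connector error codes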
+ errorCode = new ErrorCode(code + 0x0100_0000, name(), type);
+ }
+
+ @Override
+ public ErrorCode toErrorCode()
+ {
+ return errorCode;
+ }
+}
diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiHandleResolver.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiHandleResolver.java
new file mode 100644
index 000000000000..fd72934f825a
--- /dev/null
+++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiHandleResolver.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.trino.plugin.hudi;
+
+import io.trino.plugin.hive.HiveColumnHandle;
+import io.trino.plugin.hive.HiveInsertTableHandle;
+import io.trino.plugin.hive.HiveOutputTableHandle;
+import io.trino.plugin.hive.HiveTransactionHandle;
+import io.trino.spi.connector.ColumnHandle;
+import io.trino.spi.connector.ConnectorHandleResolver;
+import io.trino.spi.connector.ConnectorInsertTableHandle;
+import io.trino.spi.connector.ConnectorOutputTableHandle;
+import io.trino.spi.connector.ConnectorSplit;
+import io.trino.spi.connector.ConnectorTableHandle;
+import io.trino.spi.connector.ConnectorTransactionHandle;
+
+public class HudiHandleResolver
+ implements ConnectorHandleResolver
+{
+ @Override
+ public Class<? extends ConnectorTableHandle> getTableHandleClass()
+ {
+ return HudiTableHandle.class;
+ }
+
+ @Override
+ public Class<? extends ColumnHandle> getColumnHandleClass()
+ {
+ return HiveColumnHandle.class;
+ }
+
+ @Override
+ public Class<? extends ConnectorSplit> getSplitClass()
+ {
+ return HudiSplit.class;
+ }
+
+ @Override
+ public Class<? extends ConnectorOutputTableHandle> getOutputTableHandleClass()
+ {
+ return HiveOutputTableHandle.class;
+ }
+
+ @Override
+ public Class<? extends ConnectorInsertTableHandle> getInsertTableHandleClass()
+ {
+ return HiveInsertTableHandle.class;
+ }
+
+ @Override
+ public Class<? extends ConnectorTransactionHandle> getTransactionHandleClass()
+ {
+ return HiveTransactionHandle.class;
+ }
+}
diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiInputInfo.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiInputInfo.java
new file mode 100644
index 000000000000..36843d75540f
--- /dev/null
+++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiInputInfo.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.trino.plugin.hudi;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.List;
+
+public class HudiInputInfo
+{
+ private final List<String> partitionIds;
+ // Code that serializes HudiInputInfo into a log entry often needs to limit the entry's length.
+ // This boolean field lets such code mark the log entry as truncated.
+ private final boolean truncated;
+
+ @JsonCreator
+ public HudiInputInfo(
+ @JsonProperty("partitionIds") List partitionIds,
+ @JsonProperty("truncated") boolean truncated)
+ {
+ this.partitionIds = partitionIds;
+ this.truncated = truncated;
+ }
+
+ @JsonProperty
+ public List<String> getPartitionIds()
+ {
+ return partitionIds;
+ }
+
+ @JsonProperty
+ public boolean isTruncated()
+ {
+ return truncated;
+ }
+}
diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiMetadata.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiMetadata.java
new file mode 100644
index 000000000000..e03aa4a01ac5
--- /dev/null
+++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiMetadata.java
@@ -0,0 +1,341 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.trino.plugin.hudi;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Sets;
+import io.airlift.log.Logger;
+import io.trino.plugin.hive.HdfsEnvironment;
+import io.trino.plugin.hive.HiveColumnHandle;
+import io.trino.plugin.hive.HivePartition;
+import io.trino.plugin.hive.acid.AcidSchema;
+import io.trino.plugin.hive.authentication.HiveIdentity;
+import io.trino.plugin.hive.metastore.Column;
+import io.trino.plugin.hive.metastore.HiveMetastore;
+import io.trino.plugin.hive.metastore.Table;
+import io.trino.spi.TrinoException;
+import io.trino.spi.connector.ColumnHandle;
+import io.trino.spi.connector.ColumnMetadata;
+import io.trino.spi.connector.ConnectorMetadata;
+import io.trino.spi.connector.ConnectorSession;
+import io.trino.spi.connector.ConnectorTableHandle;
+import io.trino.spi.connector.ConnectorTableMetadata;
+import io.trino.spi.connector.ConnectorTableProperties;
+import io.trino.spi.connector.Constraint;
+import io.trino.spi.connector.ConstraintApplicationResult;
+import io.trino.spi.connector.SchemaTableName;
+import io.trino.spi.connector.SchemaTablePrefix;
+import io.trino.spi.connector.TableNotFoundException;
+import io.trino.spi.predicate.TupleDomain;
+import io.trino.spi.type.TypeManager;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Function;
+
+import static com.google.common.collect.ImmutableList.toImmutableList;
+import static com.google.common.collect.ImmutableMap.toImmutableMap;
+import static com.google.common.collect.Iterables.concat;
+import static io.trino.plugin.hive.HiveColumnHandle.BUCKET_COLUMN_NAME;
+import static io.trino.plugin.hive.HiveColumnHandle.FILE_MODIFIED_TIME_COLUMN_NAME;
+import static io.trino.plugin.hive.HiveColumnHandle.FILE_SIZE_COLUMN_NAME;
+import static io.trino.plugin.hive.HiveColumnHandle.PARTITION_COLUMN_NAME;
+import static io.trino.plugin.hive.HiveColumnHandle.PATH_COLUMN_NAME;
+import static io.trino.plugin.hive.HiveErrorCode.HIVE_INVALID_METADATA;
+import static io.trino.plugin.hive.HiveMetadata.TABLE_COMMENT;
+import static io.trino.plugin.hive.HiveTableProperties.EXTERNAL_LOCATION_PROPERTY;
+import static io.trino.plugin.hive.HiveTableProperties.PARTITIONED_BY_PROPERTY;
+import static io.trino.plugin.hive.HiveTimestampPrecision.NANOSECONDS;
+import static io.trino.plugin.hive.util.HiveUtil.columnExtraInfo;
+import static io.trino.plugin.hive.util.HiveUtil.hiveColumnHandles;
+import static io.trino.plugin.hive.util.HiveUtil.isHiveSystemSchema;
+import static io.trino.plugin.hudi.HudiErrorCode.HUDI_UNKNOWN_TABLE_TYPE;
+import static io.trino.plugin.hudi.HudiUtil.splitPredicate;
+import static java.lang.String.format;
+import static java.util.Collections.singletonList;
+import static java.util.Objects.requireNonNull;
+import static java.util.function.Function.identity;
+import static org.apache.hadoop.hive.metastore.TableType.EXTERNAL_TABLE;
+import static org.apache.hadoop.hive.ql.io.AcidUtils.isFullAcidTable;
+import static org.apache.hudi.common.fs.FSUtils.getFs;
+import static org.apache.hudi.common.table.HoodieTableMetaClient.METAFOLDER_NAME;
+import static org.apache.hudi.exception.TableNotFoundException.checkTableValidity;
+
+public class HudiMetadata
+ implements ConnectorMetadata
+{
+ private static final Logger log = Logger.get(HudiMetadata.class);
+ private final HiveMetastore metastore;
+ private final HdfsEnvironment hdfsEnvironment;
+ private final TypeManager typeManager;
+ private Table hiveTable;
+
+ public HudiMetadata(HiveMetastore metastore, HdfsEnvironment hdfsEnvironment, TypeManager typeManager)
+ {
+ this.metastore = requireNonNull(metastore, "metastore is null");
+ this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
+ this.typeManager = requireNonNull(typeManager, "typeManager is null");
+ }
+
+ @Override
+ public List<String> listSchemaNames(ConnectorSession session)
+ {
+ return metastore.getAllDatabases();
+ }
+
+ @Override
+ public HudiTableHandle getTableHandle(ConnectorSession session, SchemaTableName tableName)
+ {
+ requireNonNull(tableName, "tableName is null");
+ if (isHiveSystemSchema(tableName.getSchemaName())) {
+ return null;
+ }
+ Optional<Table> table = metastore.getTable(new HiveIdentity(session), tableName.getSchemaName(), tableName.getTableName());
+ if (table.isEmpty()) {
+ return null;
+ }
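+ // Remember the metastore table so that later calls such as getColumnHandles can reuse it without another metastore lookup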
+ hiveTable = table.get();
+ if (!isHudiTable(session, hiveTable)) {
+ throw new TrinoException(HUDI_UNKNOWN_TABLE_TYPE, format("Not a Hudi table: %s", tableName));
+ }
+ return new HudiTableHandle(
+ tableName.getSchemaName(),
+ tableName.getTableName(),
+ hiveTable.getStorage().getLocation(),
+ HoodieTableType.COPY_ON_WRITE,
+ TupleDomain.all(),
+ TupleDomain.all(),
+ Optional.of(getTableMetaClient(session, table.get())));
+ }
+
+ @Override
+ public ConnectorTableMetadata getTableMetadata(ConnectorSession session, ConnectorTableHandle table)
+ {
+ HudiTableHandle hudiTableHandle = (HudiTableHandle) table;
+ return getTableMetadata(hudiTableHandle.getSchemaTableName());
+ }
+
+ @Override
+ public Optional<ConstraintApplicationResult<ConnectorTableHandle>> applyFilter(ConnectorSession session, ConnectorTableHandle tableHandle, Constraint constraint)
+ {
+ HudiTableHandle handle = (HudiTableHandle) tableHandle;
+ HudiPredicates predicates = splitPredicate(constraint.getSummary());
+ HudiTableHandle newHudiTableHandle = handle.withPredicates(predicates);
+
+ if (handle.getPartitionPredicates().equals(newHudiTableHandle.getPartitionPredicates())
+ && handle.getRegularPredicates().equals(newHudiTableHandle.getRegularPredicates())) {
+ log.info("No new predicates to apply");
+ return Optional.empty();
+ }
+
+ return Optional.of(new ConstraintApplicationResult<>(
+ newHudiTableHandle,
+ newHudiTableHandle.getRegularPredicates().transformKeys(ColumnHandle.class::cast),
+ false));
+ }
+
+ @Override
+ public ConnectorTableProperties getTableProperties(ConnectorSession session, ConnectorTableHandle tableHandle)
+ {
+ return new ConnectorTableProperties();
+ }
+
+ @Override
+ public Map<String, ColumnHandle> getColumnHandles(ConnectorSession session, ConnectorTableHandle tableHandle)
+ {
+ requireNonNull(hiveTable, "hiveTable is null");
+ return hiveColumnHandles(hiveTable, typeManager, NANOSECONDS).stream()
+ .collect(toImmutableMap(HiveColumnHandle::getName, identity()));
+ }
+
+ @Override
+ public ColumnMetadata getColumnMetadata(ConnectorSession session, ConnectorTableHandle tableHandle, ColumnHandle columnHandle)
+ {
+ return ((HiveColumnHandle) columnHandle).getColumnMetadata();
+ }
+
+ @Override
+ public Optional