diff --git a/core/trino-server/src/main/provisio/presto.xml b/core/trino-server/src/main/provisio/presto.xml
index 48190e7b21a7..02575a105ae4 100644
--- a/core/trino-server/src/main/provisio/presto.xml
+++ b/core/trino-server/src/main/provisio/presto.xml
@@ -74,6 +74,12 @@
+
+    <artifactSet to="plugin/hudi">
+        <artifact id="${project.groupId}:trino-hudi:zip:${project.version}">
+            <unpack />
+        </artifact>
+    </artifactSet>
diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/BackgroundHiveSplitLoader.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/BackgroundHiveSplitLoader.java
index 5546808b2bdb..37f33bc6cfaa 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/BackgroundHiveSplitLoader.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/BackgroundHiveSplitLoader.java
@@ -28,7 +28,6 @@
import io.trino.plugin.hive.HiveSplit.BucketConversion;
import io.trino.plugin.hive.HiveSplit.BucketValidation;
import io.trino.plugin.hive.acid.AcidTransaction;
-import io.trino.plugin.hive.metastore.Column;
import io.trino.plugin.hive.metastore.Partition;
import io.trino.plugin.hive.metastore.Table;
import io.trino.plugin.hive.util.HiveBucketing.BucketingVersion;
@@ -100,8 +99,6 @@
import static io.trino.plugin.hive.HiveErrorCode.HIVE_BAD_DATA;
import static io.trino.plugin.hive.HiveErrorCode.HIVE_FILESYSTEM_ERROR;
import static io.trino.plugin.hive.HiveErrorCode.HIVE_INVALID_BUCKET_FILES;
-import static io.trino.plugin.hive.HiveErrorCode.HIVE_INVALID_METADATA;
-import static io.trino.plugin.hive.HiveErrorCode.HIVE_INVALID_PARTITION_VALUE;
import static io.trino.plugin.hive.HiveErrorCode.HIVE_UNKNOWN_ERROR;
import static io.trino.plugin.hive.HivePartitionManager.partitionMatches;
import static io.trino.plugin.hive.HiveSessionProperties.getMaxInitialSplitSize;
@@ -114,11 +111,11 @@
import static io.trino.plugin.hive.util.HiveFileIterator.NestedDirectoryPolicy.FAIL;
import static io.trino.plugin.hive.util.HiveFileIterator.NestedDirectoryPolicy.IGNORED;
import static io.trino.plugin.hive.util.HiveFileIterator.NestedDirectoryPolicy.RECURSE;
-import static io.trino.plugin.hive.util.HiveUtil.checkCondition;
import static io.trino.plugin.hive.util.HiveUtil.getFooterCount;
import static io.trino.plugin.hive.util.HiveUtil.getHeaderCount;
import static io.trino.plugin.hive.util.HiveUtil.getInputFormat;
import static io.trino.plugin.hive.util.HiveUtil.getPartitionKeyColumnHandles;
+import static io.trino.plugin.hive.util.HiveUtil.getPartitionKeys;
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
import static java.lang.Integer.parseInt;
import static java.lang.Math.max;
@@ -942,28 +939,6 @@ private static List<Path> getTargetPathsFromSymlink(FileSystem fileSystem, Path
}
}
- private static List<HivePartitionKey> getPartitionKeys(Table table, Optional<Partition> partition)
- {
- if (partition.isEmpty()) {
- return ImmutableList.of();
- }
- ImmutableList.Builder<HivePartitionKey> partitionKeys = ImmutableList.builder();
- List<Column> keys = table.getPartitionColumns();
- List<String> values = partition.get().getValues();
- checkCondition(keys.size() == values.size(), HIVE_INVALID_METADATA, "Expected %s partition key values, but got %s", keys.size(), values.size());
- for (int i = 0; i < keys.size(); i++) {
- String name = keys.get(i).getName();
- HiveType hiveType = keys.get(i).getType();
- if (!hiveType.isSupportedType(table.getStorage().getStorageFormat())) {
- throw new TrinoException(NOT_SUPPORTED, format("Unsupported Hive type %s found in partition keys of table %s.%s", hiveType, table.getDatabaseName(), table.getTableName()));
- }
- String value = values.get(i);
- checkCondition(value != null, HIVE_INVALID_PARTITION_VALUE, "partition key value cannot be null for field: %s", name);
- partitionKeys.add(new HivePartitionKey(name, value));
- }
- return partitionKeys.build();
- }
-
private static Properties getPartitionSchema(Table table, Optional<Partition> partition)
{
if (partition.isEmpty()) {
diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetPageSourceFactory.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetPageSourceFactory.java
index d72571d27363..d57d4a3909cf 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetPageSourceFactory.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetPageSourceFactory.java
@@ -290,9 +290,9 @@ && predicateMatches(parquetPredicate, block, dataSource, descriptorsByPath, parq
Optional<ReaderColumns> readerProjections = projectBaseColumns(columns);
List<HiveColumnHandle> baseColumns = readerProjections.map(projection ->
- projection.get().stream()
- .map(HiveColumnHandle.class::cast)
- .collect(toUnmodifiableList()))
+ projection.get().stream()
+ .map(HiveColumnHandle.class::cast)
+ .collect(toUnmodifiableList()))
.orElse(columns);
for (HiveColumnHandle column : baseColumns) {
@@ -365,7 +365,7 @@ public static Optional<org.apache.parquet.schema.Type> getColumnType(HiveColumnH
return Optional.of(new GroupType(baseType.getRepetition(), baseType.getName(), ImmutableList.of(type)));
}
- private static Optional<ColumnIndexStore> getColumnIndexStore(
+ public static Optional<ColumnIndexStore> getColumnIndexStore(
ParquetDataSource dataSource,
BlockMetaData blockMetadata,
Map<List<ColumnPath>, RichColumnDescriptor> descriptorsByPath,
@@ -440,7 +440,7 @@ public static TupleDomain<ColumnDescriptor> getParquetTupleDomain(
return TupleDomain.withColumnDomains(predicate.build());
}
- private static org.apache.parquet.schema.Type getParquetType(HiveColumnHandle column, MessageType messageType, boolean useParquetColumnNames)
+ public static org.apache.parquet.schema.Type getParquetType(HiveColumnHandle column, MessageType messageType, boolean useParquetColumnNames)
{
if (useParquetColumnNames) {
return getParquetTypeByName(column.getBaseColumnName(), messageType);
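
The visibility changes above (`getColumnIndexStore` and `getParquetType` becoming `public`) presumably exist so that code outside this class, such as the new Hudi reader, can reuse them. A minimal sketch of an outside call site, assuming a HiveColumnHandle `column` and a `fileSchema` MessageType already read from the Parquet footer:

```java
// Hypothetical caller; 'column' and 'fileSchema' are assumed inputs, not part of this PR.
org.apache.parquet.schema.Type parquetType =
        ParquetPageSourceFactory.getParquetType(column, fileSchema, true);
if (parquetType == null) {
    // Column is absent from this file; readers typically substitute nulls.
}
```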
diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/util/HiveUtil.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/util/HiveUtil.java
index c6d398402f0b..100809bdef90 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/util/HiveUtil.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/util/HiveUtil.java
@@ -32,6 +32,7 @@
import io.trino.plugin.hive.HiveType;
import io.trino.plugin.hive.avro.TrinoAvroSerDe;
import io.trino.plugin.hive.metastore.Column;
+import io.trino.plugin.hive.metastore.Partition;
import io.trino.plugin.hive.metastore.SortingColumn;
import io.trino.plugin.hive.metastore.Table;
import io.trino.spi.ErrorCodeSupplier;
@@ -1143,4 +1144,26 @@ public static boolean isIcebergTable(Table table)
{
return ICEBERG_TABLE_TYPE_VALUE.equalsIgnoreCase(table.getParameters().get(ICEBERG_TABLE_TYPE_NAME));
}
+
+ public static List<HivePartitionKey> getPartitionKeys(Table table, Optional<Partition> partition)
+ {
+ if (partition.isEmpty()) {
+ return ImmutableList.of();
+ }
+ ImmutableList.Builder<HivePartitionKey> partitionKeys = ImmutableList.builder();
+ List<Column> keys = table.getPartitionColumns();
+ List<String> values = partition.get().getValues();
+ checkCondition(keys.size() == values.size(), HIVE_INVALID_METADATA, "Expected %s partition key values, but got %s", keys.size(), values.size());
+ for (int i = 0; i < keys.size(); i++) {
+ String name = keys.get(i).getName();
+ HiveType hiveType = keys.get(i).getType();
+ if (!hiveType.isSupportedType(table.getStorage().getStorageFormat())) {
+ throw new TrinoException(NOT_SUPPORTED, format("Unsupported Hive type %s found in partition keys of table %s.%s", hiveType, table.getDatabaseName(), table.getTableName()));
+ }
+ String value = values.get(i);
+ checkCondition(value != null, HIVE_INVALID_PARTITION_VALUE, "partition key value cannot be null for field: %s", name);
+ partitionKeys.add(new HivePartitionKey(name, value));
+ }
+ return partitionKeys.build();
+ }
}
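
For illustration, a short sketch of what the relocated `getPartitionKeys` produces; the table, partition, and values here are assumed, not taken from this PR:

```java
// Assume 'table' is partitioned by (ds string, country string) and the
// Partition 'partition' carries the values ["2021-01-01", "US"].
List<HivePartitionKey> keys = getPartitionKeys(table, Optional.of(partition));
// keys -> [HivePartitionKey{name=ds, value=2021-01-01}, HivePartitionKey{name=country, value=US}]
```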
diff --git a/plugin/trino-hudi/pom.xml b/plugin/trino-hudi/pom.xml
new file mode 100644
index 000000000000..8c91dcfa1b24
--- /dev/null
+++ b/plugin/trino-hudi/pom.xml
@@ -0,0 +1,510 @@
+<?xml version="1.0"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <artifactId>trino-root</artifactId>
+        <groupId>io.trino</groupId>
+        <version>368-SNAPSHOT</version>
+        <relativePath>../../pom.xml</relativePath>
+    </parent>
+
+    <artifactId>trino-hudi</artifactId>
+    <description>Trino - Hudi Connector</description>
+    <packaging>trino-plugin</packaging>
+
+    <properties>
+        <air.main.basedir>${project.parent.basedir}</air.main.basedir>
+        <dep.hudi.version>0.11.0-SNAPSHOT</dep.hudi.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>io.trino</groupId>
+            <artifactId>trino-hive</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.trino</groupId>
+            <artifactId>trino-memory-context</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.trino</groupId>
+            <artifactId>trino-parquet</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.trino</groupId>
+            <artifactId>trino-plugin-toolkit</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.trino.hadoop</groupId>
+            <artifactId>hadoop-apache</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.trino.hive</groupId>
+            <artifactId>hive-apache</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.airlift</groupId>
+            <artifactId>bootstrap</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.airlift</groupId>
+            <artifactId>configuration</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.airlift</groupId>
+            <artifactId>event</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.airlift</groupId>
+            <artifactId>json</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.airlift</groupId>
+            <artifactId>log</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.airlift</groupId>
+            <artifactId>units</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.google.inject</groupId>
+            <artifactId>guice</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>javax.inject</groupId>
+            <artifactId>javax.inject</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>javax.validation</groupId>
+            <artifactId>validation-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>joda-time</groupId>
+            <artifactId>joda-time</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hudi</groupId>
+            <artifactId>hudi-common</artifactId>
+            <version>${dep.hudi.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.hbase</groupId>
+                    <artifactId>hbase-server</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.orc</groupId>
+                    <artifactId>orc-core</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.fasterxml.jackson.core</groupId>
+                    <artifactId>jackson-annotations</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.fasterxml.jackson.core</groupId>
+                    <artifactId>jackson-databind</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.httpcomponents</groupId>
+                    <artifactId>httpclient</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.httpcomponents</groupId>
+                    <artifactId>fluent-hc</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.rocksdb</groupId>
+                    <artifactId>rocksdbjni</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.esotericsoftware</groupId>
+                    <artifactId>kryo-shaded</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-client</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.httpcomponents</groupId>
+                    <artifactId>httpcore</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hive</groupId>
+                    <artifactId>hive-exec</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hive</groupId>
+                    <artifactId>hive-jdbc</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hudi</groupId>
+            <artifactId>hudi-hadoop-mr</artifactId>
+            <version>${dep.hudi.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.hbase</groupId>
+                    <artifactId>hbase-server</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.orc</groupId>
+                    <artifactId>orc-core</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.fasterxml.jackson.core</groupId>
+                    <artifactId>jackson-annotations</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.fasterxml.jackson.core</groupId>
+                    <artifactId>jackson-databind</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.httpcomponents</groupId>
+                    <artifactId>httpclient</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.httpcomponents</groupId>
+                    <artifactId>fluent-hc</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.rocksdb</groupId>
+                    <artifactId>rocksdbjni</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.esotericsoftware</groupId>
+                    <artifactId>kryo-shaded</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-client</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.httpcomponents</groupId>
+                    <artifactId>httpcore</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hive</groupId>
+                    <artifactId>hive-exec</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hive</groupId>
+                    <artifactId>hive-jdbc</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.weakref</groupId>
+            <artifactId>jmxutils</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.airlift</groupId>
+            <artifactId>log-manager</artifactId>
+            <scope>runtime</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hbase</groupId>
+            <artifactId>hbase-common</artifactId>
+            <version>1.7.1</version>
+            <scope>runtime</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>commons-codec</groupId>
+                    <artifactId>commons-codec</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>commons-lang</groupId>
+                    <artifactId>commons-lang</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>commons-logging</groupId>
+                    <artifactId>commons-logging</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.commons</groupId>
+                    <artifactId>commons-lang</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-common</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-mapreduce-client-core</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.javassist</groupId>
+                    <artifactId>javassist</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hbase</groupId>
+            <artifactId>hbase-hadoop-compat</artifactId>
+            <version>1.7.1</version>
+            <scope>runtime</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>commons-logging</groupId>
+                    <artifactId>commons-logging</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hbase</groupId>
+            <artifactId>hbase-server</artifactId>
+            <version>1.7.1</version>
+            <scope>runtime</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>asm</groupId>
+                    <artifactId>asm</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.sun.jersey</groupId>
+                    <artifactId>jersey-core</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.sun.jersey</groupId>
+                    <artifactId>jersey-server</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>commons-codec</groupId>
+                    <artifactId>commons-codec</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>commons-lang</groupId>
+                    <artifactId>commons-lang</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>commons-logging</groupId>
+                    <artifactId>commons-logging</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>io.netty</groupId>
+                    <artifactId>netty-handler</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>io.netty</groupId>
+                    <artifactId>netty-transport-native-epoll</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.commons</groupId>
+                    <artifactId>commons-lang3</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>io.dropwizard.metrics</groupId>
+                    <artifactId>metrics-core</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>jakarta.ws.rs</groupId>
+                    <artifactId>jakarta.ws.rs-api</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>jakarta.annotation</groupId>
+                    <artifactId>jakarta.annotation-api</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>jakarta.validation</groupId>
+                    <artifactId>jakarta.validation-api</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hbase.thirdparty</groupId>
+                    <artifactId>hbase-shaded-jetty</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-annotations</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-auth</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-client</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-distcp</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-common</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-hdfs</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-mapreduce-client-common</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-mapreduce-client-core</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hbase</groupId>
+                    <artifactId>hbase-protocol</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hbase.thirdparty</groupId>
+                    <artifactId>hbase-shaded-protobuf</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.httpcomponents</groupId>
+                    <artifactId>httpclient</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.httpcomponents</groupId>
+                    <artifactId>httpcore</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.glassfish.hk2.external</groupId>
+                    <artifactId>jakarta.inject</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.javassist</groupId>
+                    <artifactId>javassist</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.jruby.jcodings</groupId>
+                    <artifactId>jcodings</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.mortbay.jetty</groupId>
+                    <artifactId>servlet-api-2.5</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>tomcat</groupId>
+                    <artifactId>jasper-compiler</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>tomcat</groupId>
+                    <artifactId>jasper-runtime</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>log4j-over-slf4j</artifactId>
+            <version>1.7.32</version>
+            <scope>runtime</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>io.trino</groupId>
+            <artifactId>trino-spi</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>io.airlift</groupId>
+            <artifactId>slice</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-annotations</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.openjdk.jol</groupId>
+            <artifactId>jol-core</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
+        <!-- for testing -->
+        <dependency>
+            <groupId>io.trino</groupId>
+            <artifactId>trino-hive</artifactId>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>io.trino</groupId>
+            <artifactId>trino-hive-hadoop2</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>io.trino</groupId>
+            <artifactId>trino-main</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>io.trino</groupId>
+            <artifactId>trino-main</artifactId>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>io.trino</groupId>
+            <artifactId>trino-parser</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>io.trino</groupId>
+            <artifactId>trino-spi</artifactId>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>io.trino</groupId>
+            <artifactId>trino-testing</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>io.trino</groupId>
+            <artifactId>trino-testing-services</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>io.trino</groupId>
+            <artifactId>trino-tpch</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>io.trino.tpch</groupId>
+            <artifactId>tpch</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>io.airlift</groupId>
+            <artifactId>testing</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.assertj</groupId>
+            <artifactId>assertj-core</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.jetbrains</groupId>
+            <artifactId>annotations</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.testng</groupId>
+            <artifactId>testng</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+</project>
diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiConfig.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiConfig.java
new file mode 100644
index 000000000000..7b10e20d7fa2
--- /dev/null
+++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiConfig.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.trino.plugin.hudi;
+
+import io.airlift.configuration.Config;
+import io.airlift.configuration.ConfigDescription;
+import io.airlift.units.DataSize;
+import org.apache.hudi.common.model.HoodieFileFormat;
+
+import javax.validation.constraints.NotNull;
+
+import static org.apache.hudi.common.model.HoodieFileFormat.PARQUET;
+
+public class HudiConfig
+{
+ private HoodieFileFormat fileFormat = PARQUET;
+ private boolean metadataEnabled;
+ private boolean shouldSkipMetaStoreForPartition = true;
+ private DataSize maxSplitSize = DataSize.ofBytes(128 * 1024 * 1024);
+
+ @NotNull
+ public HoodieFileFormat getFileFormat()
+ {
+ return HoodieFileFormat.valueOf(fileFormat.name());
+ }
+
+ @Config("hudi.file-format")
+ public HudiConfig setFileFormat(HoodieFileFormat fileFormat)
+ {
+ this.fileFormat = fileFormat;
+ return this;
+ }
+
+ @Config("hudi.metadata-enabled")
+ @ConfigDescription("Fetch the list of file names and sizes from metadata rather than storage")
+ public HudiConfig setMetadataEnabled(boolean metadataEnabled)
+ {
+ this.metadataEnabled = metadataEnabled;
+ return this;
+ }
+
+ @NotNull
+ public boolean isMetadataEnabled()
+ {
+ return this.metadataEnabled;
+ }
+
+ @Config("hudi.max-split-size")
+ public HudiConfig setMaxSplitSize(DataSize size)
+ {
+ this.maxSplitSize = size;
+ return this;
+ }
+
+ @NotNull
+ public DataSize getMaxSplitSize()
+ {
+ return this.maxSplitSize;
+ }
+
+ @Config("hudi.skip-metastore-for-partition")
+ @ConfigDescription("Whether to skip metastore for partition")
+ public HudiConfig setSkipMetaStoreForPartition(boolean shouldSkipMetaStoreForPartition)
+ {
+ this.shouldSkipMetaStoreForPartition = shouldSkipMetaStoreForPartition;
+ return this;
+ }
+
+ @NotNull
+ public boolean getSkipMetaStoreForPartition()
+ {
+ return this.shouldSkipMetaStoreForPartition;
+ }
+}
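
A minimal sketch of how these defaults could be verified with airlift's ConfigAssertions; the test class is illustrative and not part of this PR:

```java
package io.trino.plugin.hudi;

import io.airlift.units.DataSize;
import org.testng.annotations.Test;

import static io.airlift.configuration.testing.ConfigAssertions.assertRecordedDefaults;
import static io.airlift.configuration.testing.ConfigAssertions.recordDefaults;
import static org.apache.hudi.common.model.HoodieFileFormat.PARQUET;

public class TestHudiConfig
{
    @Test
    public void testDefaults()
    {
        // Each recorded default mirrors the corresponding field initializer in HudiConfig.
        assertRecordedDefaults(recordDefaults(HudiConfig.class)
                .setFileFormat(PARQUET)
                .setMetadataEnabled(false)
                .setSkipMetaStoreForPartition(true)
                .setMaxSplitSize(DataSize.ofBytes(128 * 1024 * 1024)));
    }
}
```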
diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiConnector.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiConnector.java
new file mode 100644
index 000000000000..401bdf44619c
--- /dev/null
+++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiConnector.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.trino.plugin.hudi;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import io.airlift.bootstrap.LifeCycleManager;
+import io.airlift.log.Logger;
+import io.trino.plugin.base.classloader.ClassLoaderSafeConnectorMetadata;
+import io.trino.plugin.base.session.SessionPropertiesProvider;
+import io.trino.plugin.hive.HiveTransactionHandle;
+import io.trino.spi.classloader.ThreadContextClassLoader;
+import io.trino.spi.connector.Connector;
+import io.trino.spi.connector.ConnectorAccessControl;
+import io.trino.spi.connector.ConnectorHandleResolver;
+import io.trino.spi.connector.ConnectorMetadata;
+import io.trino.spi.connector.ConnectorNodePartitioningProvider;
+import io.trino.spi.connector.ConnectorPageSinkProvider;
+import io.trino.spi.connector.ConnectorPageSourceProvider;
+import io.trino.spi.connector.ConnectorSplitManager;
+import io.trino.spi.connector.ConnectorTransactionHandle;
+import io.trino.spi.connector.SystemTable;
+import io.trino.spi.session.PropertyMetadata;
+import io.trino.spi.transaction.IsolationLevel;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+
+import static com.google.common.collect.ImmutableList.toImmutableList;
+import static io.trino.spi.transaction.IsolationLevel.SERIALIZABLE;
+import static io.trino.spi.transaction.IsolationLevel.checkConnectorSupports;
+import static java.util.Objects.requireNonNull;
+
+public class HudiConnector
+ implements Connector
+{
+ private static final Logger log = Logger.get(HudiConnector.class);
+
+ private final LifeCycleManager lifeCycleManager;
+ private final HudiTransactionManager transactionManager;
+ private final HudiMetadataFactory metadataFactory;
+ private final ConnectorSplitManager splitManager;
+ private final ConnectorPageSourceProvider pageSourceProvider;
+ private final ConnectorPageSinkProvider pageSinkProvider;
+ private final ConnectorNodePartitioningProvider nodePartitioningProvider;
+ private final Set<SystemTable> systemTables;
+ private final List<PropertyMetadata<?>> sessionProperties;
+ private final List<PropertyMetadata<?>> tableProperties;
+ private final Optional<ConnectorAccessControl> accessControl;
+
+ public HudiConnector(
+ LifeCycleManager lifeCycleManager,
+ HudiTransactionManager transactionManager,
+ HudiMetadataFactory metadataFactory,
+ ConnectorSplitManager splitManager,
+ ConnectorPageSourceProvider pageSourceProvider,
+ ConnectorPageSinkProvider pageSinkProvider,
+ ConnectorNodePartitioningProvider nodePartitioningProvider,
+ Set<SystemTable> systemTables,
+ Set<SessionPropertiesProvider> sessionPropertiesProviders,
+ List<PropertyMetadata<?>> tableProperties,
+ Optional<ConnectorAccessControl> accessControl)
+ {
+ this.lifeCycleManager = requireNonNull(lifeCycleManager, "lifeCycleManager is null");
+ this.transactionManager = requireNonNull(transactionManager, "transactionManager is null");
+ this.metadataFactory = requireNonNull(metadataFactory, "metadataFactory is null");
+ this.splitManager = requireNonNull(splitManager, "splitManager is null");
+ this.pageSourceProvider = requireNonNull(pageSourceProvider, "pageSourceProvider is null");
+ this.pageSinkProvider = requireNonNull(pageSinkProvider, "pageSinkProvider is null");
+ this.nodePartitioningProvider = requireNonNull(nodePartitioningProvider, "nodePartitioningProvider is null");
+ this.systemTables = ImmutableSet.copyOf(requireNonNull(systemTables, "systemTables is null"));
+ this.sessionProperties = requireNonNull(sessionPropertiesProviders, "sessionPropertiesProviders is null").stream()
+ .flatMap(sessionPropertiesProvider -> sessionPropertiesProvider.getSessionProperties().stream())
+ .collect(toImmutableList());
+ this.tableProperties = ImmutableList.copyOf(requireNonNull(tableProperties, "tableProperties is null"));
+ this.accessControl = requireNonNull(accessControl, "accessControl is null");
+ }
+
+ @Override
+ public Optional<ConnectorHandleResolver> getHandleResolver()
+ {
+ return Optional.of(new HudiHandleResolver());
+ }
+
+ @Override
+ public ConnectorMetadata getMetadata(ConnectorTransactionHandle transactionHandle)
+ {
+ ConnectorMetadata metadata = transactionManager.get(transactionHandle);
+ return new ClassLoaderSafeConnectorMetadata(metadata, getClass().getClassLoader());
+ }
+
+ @Override
+ public ConnectorSplitManager getSplitManager()
+ {
+ return splitManager;
+ }
+
+ @Override
+ public ConnectorPageSourceProvider getPageSourceProvider()
+ {
+ return pageSourceProvider;
+ }
+
+ @Override
+ public ConnectorPageSinkProvider getPageSinkProvider()
+ {
+ return pageSinkProvider;
+ }
+
+ @Override
+ public ConnectorNodePartitioningProvider getNodePartitioningProvider()
+ {
+ return nodePartitioningProvider;
+ }
+
+ @Override
+ public Set<SystemTable> getSystemTables()
+ {
+ return systemTables;
+ }
+
+ @Override
+ public List<PropertyMetadata<?>> getSessionProperties()
+ {
+ return sessionProperties;
+ }
+
+ @Override
+ public List<PropertyMetadata<?>> getTableProperties()
+ {
+ return tableProperties;
+ }
+
+ @Override
+ public ConnectorAccessControl getAccessControl()
+ {
+ return accessControl.orElseThrow(UnsupportedOperationException::new);
+ }
+
+ @Override
+ public ConnectorTransactionHandle beginTransaction(IsolationLevel isolationLevel, boolean readOnly, boolean autoCommit)
+ {
+ checkConnectorSupports(SERIALIZABLE, isolationLevel);
+ ConnectorTransactionHandle transaction = new HiveTransactionHandle();
+ try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(getClass().getClassLoader())) {
+ transactionManager.put(transaction, metadataFactory.create());
+ }
+ return transaction;
+ }
+
+ @Override
+ public void commit(ConnectorTransactionHandle transaction)
+ {
+ transactionManager.remove(transaction);
+ }
+
+ @Override
+ public void rollback(ConnectorTransactionHandle transaction)
+ {
+ HudiMetadata metadata = transactionManager.remove(transaction);
+ try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(getClass().getClassLoader())) {
+ metadata.rollback();
+ }
+ }
+
+ @Override
+ public final void shutdown()
+ {
+ try {
+ lifeCycleManager.stop();
+ }
+ catch (Exception e) {
+ log.error(e, "Error shutting down connector");
+ }
+ }
+}
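
The transaction flow mirrors other Hive-family connectors; a hedged sketch of the lifecycle a caller would go through ('connector' is an assumed HudiConnector instance):

```java
// beginTransaction registers a fresh HudiMetadata for this transaction
ConnectorTransactionHandle txn = connector.beginTransaction(SERIALIZABLE, true, true);
ConnectorMetadata metadata = connector.getMetadata(txn); // classloader-safe wrapper
try {
    // ... plan and run reads against 'metadata' ...
    connector.commit(txn); // only drops the transaction's metadata entry
}
catch (RuntimeException e) {
    connector.rollback(txn); // also invokes HudiMetadata.rollback()
    throw e;
}
```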
diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiConnectorFactory.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiConnectorFactory.java
new file mode 100644
index 000000000000..34d077f756a5
--- /dev/null
+++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiConnectorFactory.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.trino.plugin.hudi;
+
+import com.google.inject.Binder;
+import com.google.inject.Module;
+import io.trino.spi.connector.Connector;
+import io.trino.spi.connector.ConnectorContext;
+import io.trino.spi.connector.ConnectorFactory;
+
+import java.lang.reflect.InvocationTargetException;
+import java.util.Map;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Strings.isNullOrEmpty;
+import static com.google.common.base.Throwables.throwIfUnchecked;
+import static java.util.Objects.requireNonNull;
+
+public class HudiConnectorFactory
+ implements ConnectorFactory
+{
+ private final String name;
+ private final Class<? extends Module> module;
+
+ public HudiConnectorFactory(String name)
+ {
+ this(name, EmptyModule.class);
+ }
+
+ public HudiConnectorFactory(String name, Class<? extends Module> module)
+ {
+ checkArgument(!isNullOrEmpty(name), "name is null or empty");
+ this.name = name;
+ this.module = requireNonNull(module, "module is null");
+ }
+
+ @Override
+ public String getName()
+ {
+ return name;
+ }
+
+ @Override
+ public Connector create(String catalogName, Map<String, String> config, ConnectorContext context)
+ {
+ ClassLoader classLoader = context.duplicatePluginClassLoader();
+ try {
+ Object moduleInstance = classLoader.loadClass(module.getName()).getConstructor().newInstance();
+ Class<?> moduleClass = classLoader.loadClass(Module.class.getName());
+ return (Connector) classLoader.loadClass(InternalHudiConnectorFactory.class.getName())
+ .getMethod("createConnector", String.class, Map.class, ConnectorContext.class, moduleClass)
+ .invoke(null, catalogName, config, context, moduleInstance);
+ }
+ catch (InvocationTargetException e) {
+ Throwable targetException = e.getTargetException();
+ throwIfUnchecked(targetException);
+ throw new RuntimeException(targetException);
+ }
+ catch (ReflectiveOperationException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public static class EmptyModule
+ implements Module
+ {
+ @Override
+ public void configure(Binder binder) {}
+ }
+}
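
A factory like this is normally exposed through a Plugin implementation; a minimal sketch, assuming the connector is registered under the name "hudi" (the actual HudiPlugin class is not part of this excerpt):

```java
package io.trino.plugin.hudi;

import com.google.common.collect.ImmutableList;
import io.trino.spi.Plugin;
import io.trino.spi.connector.ConnectorFactory;

public class HudiPlugin
        implements Plugin
{
    @Override
    public Iterable<ConnectorFactory> getConnectorFactories()
    {
        // the name here is what catalogs reference via connector.name=hudi
        return ImmutableList.of(new HudiConnectorFactory("hudi"));
    }
}
```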
diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiErrorCode.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiErrorCode.java
new file mode 100644
index 000000000000..d38724451c22
--- /dev/null
+++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiErrorCode.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.trino.plugin.hudi;
+
+import io.trino.spi.ErrorCode;
+import io.trino.spi.ErrorCodeSupplier;
+import io.trino.spi.ErrorType;
+
+import static io.trino.spi.ErrorType.EXTERNAL;
+import static io.trino.spi.ErrorType.INTERNAL_ERROR;
+import static io.trino.spi.ErrorType.USER_ERROR;
+
+public enum HudiErrorCode
+ implements ErrorCodeSupplier
+{
+ HUDI_UNKNOWN_TABLE_TYPE(0, EXTERNAL),
+ HUDI_INVALID_METADATA(1, EXTERNAL),
+ HUDI_TOO_MANY_OPEN_PARTITIONS(2, USER_ERROR),
+ HUDI_INVALID_PARTITION_VALUE(3, EXTERNAL),
+ HUDI_BAD_DATA(4, EXTERNAL),
+ HUDI_MISSING_DATA(5, EXTERNAL),
+ HUDI_CANNOT_OPEN_SPLIT(6, EXTERNAL),
+ HUDI_WRITER_OPEN_ERROR(7, EXTERNAL),
+ HUDI_FILESYSTEM_ERROR(8, EXTERNAL),
+ HUDI_CURSOR_ERROR(9, EXTERNAL),
+ HUDI_WRITE_VALIDATION_FAILED(10, INTERNAL_ERROR),
+ HUDI_INVALID_SNAPSHOT_ID(11, USER_ERROR);
+
+ private final ErrorCode errorCode;
+
+ HudiErrorCode(int code, ErrorType type)
+ {
+ errorCode = new ErrorCode(code + 0x0100_0000, name(), type);
+ }
+
+ @Override
+ public ErrorCode toErrorCode()
+ {
+ return errorCode;
+ }
+}
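
The codes are offset into a dedicated 0x0100_0000 block, following the convention Trino connectors use to keep their error codes distinct. A hedged usage sketch (the message and 'path' variable are invented):

```java
import io.trino.spi.TrinoException;

import static io.trino.plugin.hudi.HudiErrorCode.HUDI_CANNOT_OPEN_SPLIT;

// e.g. in a split-reading path:
throw new TrinoException(HUDI_CANNOT_OPEN_SPLIT, "Error opening Hudi split " + path);
```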
diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiHandleResolver.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiHandleResolver.java
new file mode 100644
index 000000000000..fd72934f825a
--- /dev/null
+++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiHandleResolver.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.trino.plugin.hudi;
+
+import io.trino.plugin.hive.HiveColumnHandle;
+import io.trino.plugin.hive.HiveInsertTableHandle;
+import io.trino.plugin.hive.HiveOutputTableHandle;
+import io.trino.plugin.hive.HiveTransactionHandle;
+import io.trino.spi.connector.ColumnHandle;
+import io.trino.spi.connector.ConnectorHandleResolver;
+import io.trino.spi.connector.ConnectorInsertTableHandle;
+import io.trino.spi.connector.ConnectorOutputTableHandle;
+import io.trino.spi.connector.ConnectorSplit;
+import io.trino.spi.connector.ConnectorTableHandle;
+import io.trino.spi.connector.ConnectorTransactionHandle;
+
+public class HudiHandleResolver
+ implements ConnectorHandleResolver
+{
+ @Override
+ public Class<? extends ConnectorTableHandle> getTableHandleClass()
+ {
+ return HudiTableHandle.class;
+ }
+
+ @Override
+ public Class<? extends ColumnHandle> getColumnHandleClass()
+ {
+ return HiveColumnHandle.class;
+ }
+
+ @Override
+ public Class<? extends ConnectorSplit> getSplitClass()
+ {
+ return HudiSplit.class;
+ }
+
+ @Override
+ public Class<? extends ConnectorOutputTableHandle> getOutputTableHandleClass()
+ {
+ return HiveOutputTableHandle.class;
+ }
+
+ @Override
+ public Class<? extends ConnectorInsertTableHandle> getInsertTableHandleClass()
+ {
+ return HiveInsertTableHandle.class;
+ }
+
+ @Override
+ public Class<? extends ConnectorTransactionHandle> getTransactionHandleClass()
+ {
+ return HiveTransactionHandle.class;
+ }
+}
diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiInputInfo.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiInputInfo.java
new file mode 100644
index 000000000000..36843d75540f
--- /dev/null
+++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiInputInfo.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.trino.plugin.hudi;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.List;
+
+public class HudiInputInfo
+{
+ private final List<String> partitionIds;
+ // Code that serializes HudiInputInfo into a log often needs to limit the length of log entries.
+ // This boolean field lets such code mark the log entry as truncated.
+ private final boolean truncated;
+
+ @JsonCreator
+ public HudiInputInfo(
+ @JsonProperty("partitionIds") List partitionIds,
+ @JsonProperty("truncated") boolean truncated)
+ {
+ this.partitionIds = partitionIds;
+ this.truncated = truncated;
+ }
+
+ @JsonProperty
+ public List<String> getPartitionIds()
+ {
+ return partitionIds;
+ }
+
+ @JsonProperty
+ public boolean isTruncated()
+ {
+ return truncated;
+ }
+}
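
Since the class is annotated for Jackson, it round-trips through airlift's JsonCodec; a small sketch (the partition id value is invented, and the exact field order in the JSON output is not guaranteed):

```java
import com.google.common.collect.ImmutableList;
import io.airlift.json.JsonCodec;

JsonCodec<HudiInputInfo> codec = JsonCodec.jsonCodec(HudiInputInfo.class);
String json = codec.toJson(new HudiInputInfo(ImmutableList.of("ds=2021-01-01"), false));
// json: {"partitionIds":["ds=2021-01-01"],"truncated":false}
HudiInputInfo copy = codec.fromJson(json);
```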
diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiMetadata.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiMetadata.java
new file mode 100644
index 000000000000..5b7819b28791
--- /dev/null
+++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiMetadata.java
@@ -0,0 +1,343 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.trino.plugin.hudi;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Sets;
+import io.airlift.log.Logger;
+import io.trino.plugin.hive.HdfsEnvironment;
+import io.trino.plugin.hive.HiveColumnHandle;
+import io.trino.plugin.hive.HivePartition;
+import io.trino.plugin.hive.acid.AcidSchema;
+import io.trino.plugin.hive.authentication.HiveIdentity;
+import io.trino.plugin.hive.metastore.Column;
+import io.trino.plugin.hive.metastore.HiveMetastore;
+import io.trino.plugin.hive.metastore.Table;
+import io.trino.spi.TrinoException;
+import io.trino.spi.connector.ColumnHandle;
+import io.trino.spi.connector.ColumnMetadata;
+import io.trino.spi.connector.ConnectorMetadata;
+import io.trino.spi.connector.ConnectorSession;
+import io.trino.spi.connector.ConnectorTableHandle;
+import io.trino.spi.connector.ConnectorTableMetadata;
+import io.trino.spi.connector.ConnectorTableProperties;
+import io.trino.spi.connector.Constraint;
+import io.trino.spi.connector.ConstraintApplicationResult;
+import io.trino.spi.connector.SchemaTableName;
+import io.trino.spi.connector.SchemaTablePrefix;
+import io.trino.spi.connector.TableNotFoundException;
+import io.trino.spi.predicate.TupleDomain;
+import io.trino.spi.type.TypeManager;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Function;
+
+import static com.google.common.collect.ImmutableList.toImmutableList;
+import static com.google.common.collect.ImmutableMap.toImmutableMap;
+import static com.google.common.collect.Iterables.concat;
+import static io.trino.plugin.hive.HiveColumnHandle.BUCKET_COLUMN_NAME;
+import static io.trino.plugin.hive.HiveColumnHandle.FILE_MODIFIED_TIME_COLUMN_NAME;
+import static io.trino.plugin.hive.HiveColumnHandle.FILE_SIZE_COLUMN_NAME;
+import static io.trino.plugin.hive.HiveColumnHandle.PARTITION_COLUMN_NAME;
+import static io.trino.plugin.hive.HiveColumnHandle.PATH_COLUMN_NAME;
+import static io.trino.plugin.hive.HiveErrorCode.HIVE_INVALID_METADATA;
+import static io.trino.plugin.hive.HiveMetadata.TABLE_COMMENT;
+import static io.trino.plugin.hive.HiveTableProperties.EXTERNAL_LOCATION_PROPERTY;
+import static io.trino.plugin.hive.HiveTableProperties.PARTITIONED_BY_PROPERTY;
+import static io.trino.plugin.hive.HiveTimestampPrecision.NANOSECONDS;
+import static io.trino.plugin.hive.util.HiveUtil.columnExtraInfo;
+import static io.trino.plugin.hive.util.HiveUtil.hiveColumnHandles;
+import static io.trino.plugin.hive.util.HiveUtil.isHiveSystemSchema;
+import static io.trino.plugin.hudi.HudiErrorCode.HUDI_UNKNOWN_TABLE_TYPE;
+import static io.trino.plugin.hudi.HudiUtil.splitPredicate;
+import static java.lang.String.format;
+import static java.util.Collections.singletonList;
+import static java.util.Objects.requireNonNull;
+import static java.util.function.Function.identity;
+import static org.apache.hadoop.hive.metastore.TableType.EXTERNAL_TABLE;
+import static org.apache.hadoop.hive.ql.io.AcidUtils.isFullAcidTable;
+import static org.apache.hudi.common.fs.FSUtils.getFs;
+import static org.apache.hudi.common.table.HoodieTableMetaClient.METAFOLDER_NAME;
+import static org.apache.hudi.exception.TableNotFoundException.checkTableValidity;
+
+public class HudiMetadata
+ implements ConnectorMetadata
+{
+ private static final Logger LOG = Logger.get(HudiMetadata.class);
+ private final HiveMetastore metastore;
+ private final HdfsEnvironment hdfsEnvironment;
+ private final TypeManager typeManager;
+
+ public HudiMetadata(HiveMetastore metastore, HdfsEnvironment hdfsEnvironment, TypeManager typeManager)
+ {
+ this.metastore = requireNonNull(metastore, "metastore is null");
+ this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
+ this.typeManager = requireNonNull(typeManager, "typeManager is null");
+ }
+
+ @Override
+ public List<String> listSchemaNames(ConnectorSession session)
+ {
+ return metastore.getAllDatabases();
+ }
+
+ @Override
+ public HudiTableHandle getTableHandle(ConnectorSession session, SchemaTableName tableName)
+ {
+ requireNonNull(tableName, "tableName is null");
+ if (isHiveSystemSchema(tableName.getSchemaName())) {
+ return null;
+ }
+ Optional<Table> table = metastore.getTable(new HiveIdentity(session), tableName.getSchemaName(), tableName.getTableName());
+ if (table.isEmpty()) {
+ return null;
+ }
+ if (!isHudiTable(session, table.get())) {
+ throw new TrinoException(HUDI_UNKNOWN_TABLE_TYPE, format("Not a Hudi table: %s", tableName));
+ }
+ return new HudiTableHandle(
+ tableName.getSchemaName(),
+ tableName.getTableName(),
+ table.get().getStorage().getLocation(),
+ HoodieTableType.COPY_ON_WRITE,
+ TupleDomain.all(),
+ Optional.of(getTableMetaClient(session, table.get())));
+ }
+
+ @Override
+ public ConnectorTableMetadata getTableMetadata(ConnectorSession session, ConnectorTableHandle table)
+ {
+ HudiTableHandle hudiTableHandle = (HudiTableHandle) table;
+ return getTableMetadata(session, hudiTableHandle.getSchemaTableName());
+ }
+
+ @Override
+ public Optional<ConstraintApplicationResult<ConnectorTableHandle>> applyFilter(ConnectorSession session, ConnectorTableHandle tableHandle, Constraint constraint)
+ {
+ HudiTableHandle handle = (HudiTableHandle) tableHandle;
+ if (constraint.getSummary().equals(handle.getPredicate())) {
+ return Optional.empty();
+ }
+
+ List<TupleDomain<ColumnHandle>> predicate = splitPredicate(constraint.getSummary());
+ TupleDomain<ColumnHandle> unenforcedPredicate = predicate.get(1);
+ HudiTableHandle newHudiTableHandle = handle.withPredicate(constraint.getSummary().transformKeys(HiveColumnHandle.class::cast));
+
+ return Optional.of(new ConstraintApplicationResult<>(
+ newHudiTableHandle,
+ unenforcedPredicate,
+ false));
+ }
+
+ private ConnectorTableMetadata getTableMetadata(ConnectorSession session, SchemaTableName tableName)
+ {
+ Table table = metastore.getTable(new HiveIdentity(session), tableName.getSchemaName(), tableName.getTableName())
+ .orElseThrow(() -> new TableNotFoundException(tableName));
+ Function<HiveColumnHandle, ColumnMetadata> metadataGetter = columnMetadataGetter(table);
+ ImmutableList.Builder<ColumnMetadata> columns = ImmutableList.builder();
+ for (HiveColumnHandle columnHandle : hiveColumnHandles(table, typeManager, NANOSECONDS)) {
+ columns.add(metadataGetter.apply(columnHandle));
+ }
+
+ // External location property
+ ImmutableMap.Builder<String, Object> properties = ImmutableMap.builder();
+ if (table.getTableType().equals(EXTERNAL_TABLE.name())) {
+ properties.put(EXTERNAL_LOCATION_PROPERTY, table.getStorage().getLocation());
+ }
+
+ // Partitioning property
+ List<String> partitionedBy = table.getPartitionColumns().stream()
+ .map(Column::getName)
+ .collect(toImmutableList());
+ if (!partitionedBy.isEmpty()) {
+ properties.put(PARTITIONED_BY_PROPERTY, partitionedBy);
+ }
+
+ Optional<String> comment = Optional.ofNullable(table.getParameters().get(TABLE_COMMENT));
+ return new ConnectorTableMetadata(tableName, columns.build(), properties.build(), comment);
+ }
+
+ @Override
+ public ConnectorTableProperties getTableProperties(ConnectorSession session, ConnectorTableHandle tableHandle)
+ {
+ return new ConnectorTableProperties();
+ }
+
+ @Override
+ public Map<String, ColumnHandle> getColumnHandles(ConnectorSession session, ConnectorTableHandle tableHandle)
+ {
+ SchemaTableName tableName = ((HudiTableHandle) tableHandle).getSchemaTableName();
+ Table table = metastore.getTable(new HiveIdentity(session), tableName.getSchemaName(), tableName.getTableName())
+ .orElseThrow(() -> new TableNotFoundException(tableName));
+ return hiveColumnHandles(table, typeManager, NANOSECONDS).stream()
+ .collect(toImmutableMap(HiveColumnHandle::getName, identity()));
+ }
+
+ @Override
+ public ColumnMetadata getColumnMetadata(ConnectorSession session, ConnectorTableHandle tableHandle, ColumnHandle columnHandle)
+ {
+ return ((HiveColumnHandle) columnHandle).getColumnMetadata();
+ }
+
+ @Override
+ public Optional