From a5719017541be05066360d56a84cc551eb81c379 Mon Sep 17 00:00:00 2001 From: Guowei Zhang Date: Tue, 27 Sep 2022 22:41:38 +0800 Subject: [PATCH 1/3] Support Hudi MOR snapshot --- .../io/trino/plugin/hive/util/HiveUtil.java | 6 +- plugin/trino-hudi/pom.xml | 117 ++++++++++--- .../io/trino/plugin/hudi/HudiErrorCode.java | 4 +- .../io/trino/plugin/hudi/HudiMetadata.java | 11 +- .../java/io/trino/plugin/hudi/HudiModule.java | 2 + .../plugin/hudi/HudiPageSourceProvider.java | 100 +++++++---- .../trino/plugin/hudi/HudiRecordCursor.java | 158 ++++++++++++++++++ .../java/io/trino/plugin/hudi/HudiSplit.java | 92 ++++------ .../io/trino/plugin/hudi/HudiSplitSource.java | 10 +- .../io/trino/plugin/hudi/HudiTableHandle.java | 16 +- .../java/io/trino/plugin/hudi/HudiUtil.java | 10 ++ .../io/trino/plugin/hudi/files/FileSlice.java | 5 + .../trino/plugin/hudi/files/HudiBaseFile.java | 31 +++- .../io/trino/plugin/hudi/files/HudiFile.java | 27 +++ .../trino/plugin/hudi/files/HudiLogFile.java | 38 ++++- .../plugin/hudi/model/HudiTableType.java | 24 ++- .../partition/HudiPartitionInfoLoader.java | 13 +- .../hudi/query/HiveHudiRecordCursor.java | 47 ++++++ .../hudi/query/HudiDirectoryLister.java | 3 + .../HudiReadOptimizedDirectoryLister.java | 7 + .../hudi/split/HudiBackgroundSplitLoader.java | 7 +- .../plugin/hudi/split/HudiSplitFactory.java | 106 +++++++++--- .../hudi/table/HudiTableFileSystemView.java | 25 ++- .../plugin/hudi/TestHudiPartitionManager.java | 4 +- 24 files changed, 699 insertions(+), 164 deletions(-) create mode 100644 plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiRecordCursor.java create mode 100644 plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/files/HudiFile.java create mode 100644 plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/query/HiveHudiRecordCursor.java diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/util/HiveUtil.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/util/HiveUtil.java index 34286446862..9566c25a1f4 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/util/HiveUtil.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/util/HiveUtil.java @@ -188,9 +188,9 @@ public final class HiveUtil // Input formats class names are listed below as String due to hudi-hadoop-mr dependency is not in the context of trino-hive plugin public static final String HUDI_PARQUET_INPUT_FORMAT = "org.apache.hudi.hadoop.HoodieParquetInputFormat"; - private static final String HUDI_PARQUET_REALTIME_INPUT_FORMAT = "org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat"; - private static final String HUDI_INPUT_FORMAT = "com.uber.hoodie.hadoop.HoodieInputFormat"; - private static final String HUDI_REALTIME_INPUT_FORMAT = "com.uber.hoodie.hadoop.realtime.HoodieRealtimeInputFormat"; + public static final String HUDI_PARQUET_REALTIME_INPUT_FORMAT = "org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat"; + public static final String HUDI_INPUT_FORMAT = "com.uber.hoodie.hadoop.HoodieInputFormat"; + public static final String HUDI_REALTIME_INPUT_FORMAT = "com.uber.hoodie.hadoop.realtime.HoodieRealtimeInputFormat"; private static final HexFormat HEX_UPPER_FORMAT = HexFormat.of().withUpperCase(); diff --git a/plugin/trino-hudi/pom.xml b/plugin/trino-hudi/pom.xml index dcd48f5dad5..5f5e0b2ba6a 100644 --- a/plugin/trino-hudi/pom.xml +++ b/plugin/trino-hudi/pom.xml @@ -54,11 +54,21 @@ trino-plugin-toolkit + + io.trino.hadoop + hadoop-apache + + io.trino.hive hive-apache + + io.airlift + aircompressor + + io.airlift 
bootstrap @@ -135,6 +145,94 @@ avro + + org.apache.hudi + hudi-common + ${dep.hudi.version} + + + org.apache.hbase + hbase-server + + + org.apache.hbase + hbase-client + + + org.osgi + org.osgi.core + + + org.apache.orc + orc-core + + + com.fasterxml.jackson.core + jackson-annotations + + + com.fasterxml.jackson.core + jackson-databind + + + org.apache.httpcomponents + httpclient + + + org.apache.httpcomponents + fluent-hc + + + org.rocksdb + rocksdbjni + + + com.esotericsoftware + kryo-shaded + + + org.apache.hadoop + hadoop-client + + + org.apache.hadoop + hadoop-hdfs + + + org.apache.httpcomponents + httpcore + + + org.apache.hive + hive-exec + + + org.apache.hive + hive-jdbc + + + com.github.ben-manes.caffeine + caffeine + + + org.lz4 + lz4-java + + + + + + org.apache.hudi + hudi-hadoop-mr + ${dep.hudi.version} + + + * + * + + + + org.weakref jmxutils @@ -147,12 +245,6 @@ runtime - - io.trino.hadoop - hadoop-apache - runtime - - io.airlift log-manager @@ -335,19 +427,6 @@ - - org.apache.hudi - hudi-common - ${dep.hudi.version} - test - - - * - * - - - - org.apache.hudi hudi-java-client diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiErrorCode.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiErrorCode.java index 40501723650..94da2327cbc 100644 --- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiErrorCode.java +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiErrorCode.java @@ -31,8 +31,8 @@ public enum HudiErrorCode HUDI_CURSOR_ERROR(6, EXTERNAL), HUDI_FILESYSTEM_ERROR(7, EXTERNAL), HUDI_PARTITION_NOT_FOUND(8, EXTERNAL), - // HUDI_UNSUPPORTED_TABLE_TYPE(9, EXTERNAL), // Unused. Could be mistaken with HUDI_UNKNOWN_TABLE_TYPE. - + HUDI_UNSUPPORTED_TABLE_TYPE(9, EXTERNAL), + HUDI_NO_VALID_COMMIT(10, EXTERNAL) /**/; private final ErrorCode errorCode; diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiMetadata.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiMetadata.java index 709e7439862..1cd04362ade 100644 --- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiMetadata.java +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiMetadata.java @@ -22,6 +22,7 @@ import io.trino.plugin.hive.metastore.Column; import io.trino.plugin.hive.metastore.HiveMetastore; import io.trino.plugin.hive.metastore.Table; +import io.trino.plugin.hudi.model.HudiTableType; import io.trino.spi.TrinoException; import io.trino.spi.connector.ColumnHandle; import io.trino.spi.connector.ColumnMetadata; @@ -52,6 +53,7 @@ import static com.google.common.collect.ImmutableMap.toImmutableMap; import static io.trino.plugin.hive.HiveMetadata.TABLE_COMMENT; import static io.trino.plugin.hive.HiveTimestampPrecision.NANOSECONDS; +import static io.trino.plugin.hive.metastore.MetastoreUtil.getHiveSchema; import static io.trino.plugin.hive.util.HiveUtil.columnMetadataGetter; import static io.trino.plugin.hive.util.HiveUtil.hiveColumnHandles; import static io.trino.plugin.hive.util.HiveUtil.isHiveSystemSchema; @@ -61,7 +63,6 @@ import static io.trino.plugin.hudi.HudiTableProperties.LOCATION_PROPERTY; import static io.trino.plugin.hudi.HudiTableProperties.PARTITIONED_BY_PROPERTY; import static io.trino.plugin.hudi.HudiUtil.hudiMetadataExists; -import static io.trino.plugin.hudi.model.HudiTableType.COPY_ON_WRITE; import static io.trino.spi.StandardErrorCode.UNSUPPORTED_TABLE_TYPE; import static io.trino.spi.connector.SchemaTableName.schemaTableName; import static java.lang.String.format; @@ -109,13 +110,17 @@ public 
HudiTableHandle getTableHandle(ConnectorSession session, SchemaTableName throw new TrinoException(HUDI_BAD_DATA, "Location of table %s does not contain Hudi table metadata: %s".formatted(tableName, location)); } + String inputFormat = table.get().getStorage().getStorageFormat().getInputFormat(); + HudiTableType hudiTableType = HudiTableType.fromInputFormat(inputFormat); + return new HudiTableHandle( tableName.getSchemaName(), tableName.getTableName(), table.get().getStorage().getLocation(), - COPY_ON_WRITE, + hudiTableType, + TupleDomain.all(), TupleDomain.all(), - TupleDomain.all()); + getHiveSchema(table.get())); } @Override diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiModule.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiModule.java index 279e395a986..d6f4d43f1e1 100644 --- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiModule.java +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiModule.java @@ -19,6 +19,7 @@ import com.google.inject.Provides; import com.google.inject.Scopes; import com.google.inject.Singleton; +import io.trino.hdfs.HdfsEnvironment; import io.trino.plugin.base.session.SessionPropertiesProvider; import io.trino.plugin.hive.FileFormatDataSourceStats; import io.trino.plugin.hive.HiveNodePartitioningProvider; @@ -59,6 +60,7 @@ public void configure(Binder binder) newSetBinder(binder, SessionPropertiesProvider.class).addBinding().to(HudiSessionProperties.class).in(Scopes.SINGLETON); binder.bind(HudiTableProperties.class).in(Scopes.SINGLETON); + binder.bind(HdfsEnvironment.class).in(Scopes.SINGLETON); binder.bind(ConnectorSplitManager.class).to(HudiSplitManager.class).in(Scopes.SINGLETON); binder.bind(ConnectorPageSourceProvider.class).to(HudiPageSourceProvider.class).in(Scopes.SINGLETON); binder.bind(ConnectorNodePartitioningProvider.class).to(HiveNodePartitioningProvider.class).in(Scopes.SINGLETON); diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiPageSourceProvider.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiPageSourceProvider.java index 9e888c6a6a9..bc22457c649 100644 --- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiPageSourceProvider.java +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiPageSourceProvider.java @@ -19,6 +19,7 @@ import io.trino.filesystem.TrinoFileSystem; import io.trino.filesystem.TrinoFileSystemFactory; import io.trino.filesystem.TrinoInputFile; +import io.trino.hdfs.HdfsEnvironment; import io.trino.parquet.ParquetCorruptionException; import io.trino.parquet.ParquetDataSource; import io.trino.parquet.ParquetDataSourceId; @@ -32,6 +33,8 @@ import io.trino.plugin.hive.ReaderColumns; import io.trino.plugin.hive.parquet.ParquetReaderConfig; import io.trino.plugin.hive.parquet.TrinoParquetDataSource; +import io.trino.plugin.hudi.files.HudiBaseFile; +import io.trino.plugin.hudi.files.HudiFile; import io.trino.plugin.hudi.model.HudiFileFormat; import io.trino.spi.TrinoException; import io.trino.spi.block.Block; @@ -43,8 +46,12 @@ import io.trino.spi.connector.ConnectorTableHandle; import io.trino.spi.connector.ConnectorTransactionHandle; import io.trino.spi.connector.DynamicFilter; +import io.trino.spi.connector.RecordCursor; +import io.trino.spi.connector.RecordPageSource; import io.trino.spi.predicate.TupleDomain; import io.trino.spi.type.Decimals; +import io.trino.spi.type.Type; +import io.trino.spi.type.TypeManager; import io.trino.spi.type.TypeSignature; import org.apache.parquet.column.ColumnDescriptor; import 
org.apache.parquet.hadoop.metadata.BlockMetaData; @@ -68,6 +75,7 @@ import java.util.stream.Collectors; import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.collect.ImmutableList.toImmutableList; import static io.airlift.slice.Slices.utf8Slice; import static io.trino.memory.context.AggregatedMemoryContext.newSimpleAggregatedMemoryContext; import static io.trino.parquet.ParquetTypeUtils.getColumnIO; @@ -86,10 +94,14 @@ import static io.trino.plugin.hudi.HudiErrorCode.HUDI_CURSOR_ERROR; import static io.trino.plugin.hudi.HudiErrorCode.HUDI_INVALID_PARTITION_VALUE; import static io.trino.plugin.hudi.HudiErrorCode.HUDI_UNSUPPORTED_FILE_FORMAT; +import static io.trino.plugin.hudi.HudiErrorCode.HUDI_UNSUPPORTED_TABLE_TYPE; +import static io.trino.plugin.hudi.HudiRecordCursor.createRealtimeRecordCursor; import static io.trino.plugin.hudi.HudiSessionProperties.isParquetOptimizedNestedReaderEnabled; import static io.trino.plugin.hudi.HudiSessionProperties.isParquetOptimizedReaderEnabled; import static io.trino.plugin.hudi.HudiSessionProperties.shouldUseParquetColumnNames; import static io.trino.plugin.hudi.HudiUtil.getHudiFileFormat; +import static io.trino.plugin.hudi.model.HudiTableType.COPY_ON_WRITE; +import static io.trino.plugin.hudi.model.HudiTableType.MERGE_ON_READ; import static io.trino.spi.predicate.Utils.nativeValueToBlock; import static io.trino.spi.type.StandardTypes.BIGINT; import static io.trino.spi.type.StandardTypes.BOOLEAN; @@ -121,18 +133,24 @@ public class HudiPageSourceProvider private final FileFormatDataSourceStats dataSourceStats; private final ParquetReaderOptions options; private final DateTimeZone timeZone; + private final HdfsEnvironment hdfsEnvironment; + private final TypeManager typeManager; private static final int DOMAIN_COMPACTION_THRESHOLD = 1000; @Inject public HudiPageSourceProvider( TrinoFileSystemFactory fileSystemFactory, FileFormatDataSourceStats dataSourceStats, - ParquetReaderConfig parquetReaderConfig) + ParquetReaderConfig parquetReaderConfig, + HdfsEnvironment hdfsEnvironment, + TypeManager typeManager) { this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null"); this.dataSourceStats = requireNonNull(dataSourceStats, "dataSourceStats is null"); this.options = requireNonNull(parquetReaderConfig, "parquetReaderConfig is null").toParquetReaderOptions(); this.timeZone = DateTimeZone.forID(TimeZone.getDefault().getID()); + this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null"); + this.typeManager = requireNonNull(typeManager, "typeManager is null"); } @Override @@ -145,11 +163,7 @@ public ConnectorPageSource createPageSource( DynamicFilter dynamicFilter) { HudiSplit split = (HudiSplit) connectorSplit; - String path = split.getLocation(); - HudiFileFormat hudiFileFormat = getHudiFileFormat(path); - if (!HudiFileFormat.PARQUET.equals(hudiFileFormat)) { - throw new TrinoException(HUDI_UNSUPPORTED_FILE_FORMAT, format("File format %s not supported", hudiFileFormat)); - } + HudiTableHandle tableHandle = (HudiTableHandle) connectorTable; List hiveColumns = columns.stream() .map(HiveColumnHandle.class::cast) @@ -159,26 +173,55 @@ public ConnectorPageSource createPageSource( List regularColumns = hiveColumns.stream() .filter(columnHandle -> !columnHandle.isPartitionKey() && !columnHandle.isHidden()) .collect(Collectors.toList()); - TrinoFileSystem fileSystem = fileSystemFactory.create(session); - TrinoInputFile inputFile = fileSystem.newInputFile(Location.of(path), 
split.getFileSize()); - ConnectorPageSource dataPageSource = createPageSource( - session, - regularColumns, - split, - inputFile, - dataSourceStats, - options.withBatchColumnReaders(isParquetOptimizedReaderEnabled(session)) - .withBatchNestedColumnReaders(isParquetOptimizedNestedReaderEnabled(session)), - timeZone); - return new HudiPageSource( - toPartitionName(split.getPartitionKeys()), - hiveColumns, - convertPartitionValues(hiveColumns, split.getPartitionKeys()), // create blocks for partition values - dataPageSource, - path, - split.getFileSize(), - split.getFileModifiedTime()); + if (tableHandle.getTableType().equals(COPY_ON_WRITE)) { + HudiBaseFile baseFile = split.getBaseFile().orElseThrow(() -> new TrinoException(HUDI_CANNOT_OPEN_SPLIT, "Split without base file is invalid")); + String path = baseFile.getLocation().toString(); + HudiFileFormat hudiFileFormat = getHudiFileFormat(path); + if (!HudiFileFormat.PARQUET.equals(hudiFileFormat)) { + throw new TrinoException(HUDI_UNSUPPORTED_FILE_FORMAT, format("File format %s not supported", hudiFileFormat)); + } + + TrinoFileSystem fileSystem = fileSystemFactory.create(session); + TrinoInputFile inputFile = fileSystem.newInputFile(Location.of(path), baseFile.getFileSize()); + ConnectorPageSource dataPageSource = createPageSource( + session, + regularColumns, + split, + inputFile, + dataSourceStats, + options.withBatchColumnReaders(isParquetOptimizedReaderEnabled(session)) + .withBatchNestedColumnReaders(isParquetOptimizedNestedReaderEnabled(session)), + timeZone); + + return new HudiPageSource( + toPartitionName(split.getPartitionKeys()), + hiveColumns, + convertPartitionValues(hiveColumns, split.getPartitionKeys()), // create blocks for partition values + dataPageSource, + path, + baseFile.getFileSize(), + baseFile.getFileModifiedTime()); + } + else if (tableHandle.getTableType().equals(MERGE_ON_READ)) { + RecordCursor recordCursor = createRealtimeRecordCursor(hdfsEnvironment, session, split, tableHandle, regularColumns); + List types = regularColumns.stream() + .map(column -> column.getHiveType().getType(typeManager)) + .collect(toImmutableList()); + HudiFile hudiFile = HudiUtil.getHudiBaseFile(split); + + return new HudiPageSource( + toPartitionName(split.getPartitionKeys()), + hiveColumns, + convertPartitionValues(hiveColumns, split.getPartitionKeys()), // create blocks for partition values + new RecordPageSource(types, recordCursor), + hudiFile.getLocation().toString(), + hudiFile.getFileSize(), + hudiFile.getFileModifiedTime()); + } + else { + throw new TrinoException(HUDI_UNSUPPORTED_TABLE_TYPE, "Could not create page source for table type " + tableHandle.getTableType()); + } } private static ConnectorPageSource createPageSource( @@ -192,9 +235,10 @@ private static ConnectorPageSource createPageSource( { ParquetDataSource dataSource = null; boolean useColumnNames = shouldUseParquetColumnNames(session); - String path = hudiSplit.getLocation(); - long start = hudiSplit.getStart(); - long length = hudiSplit.getLength(); + HudiBaseFile baseFile = hudiSplit.getBaseFile().get(); + String path = baseFile.getLocation().toString(); + long start = baseFile.getOffset(); + long length = baseFile.getFileSize(); try { dataSource = new TrinoParquetDataSource(inputFile, options, dataSourceStats); ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty()); diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiRecordCursor.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiRecordCursor.java 
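// ---------------------------------------------------------------------------
// [Editor's note - illustrative sketch, not part of the patch] The MERGE_ON_READ
// branch above is row-oriented: merged records come back through a RecordCursor,
// which RecordPageSource adapts to Trino's page-oriented SPI. A minimal sketch of
// that adaptation, assuming the `regularColumns`, `typeManager` and `recordCursor`
// variables from the surrounding code and a hypothetical `process(Page)` consumer:
private static void drainMergeOnReadSource(List<HiveColumnHandle> regularColumns, TypeManager typeManager, RecordCursor recordCursor)
        throws IOException
{
    List<Type> types = regularColumns.stream()
            .map(column -> column.getHiveType().getType(typeManager))
            .collect(toImmutableList());
    try (ConnectorPageSource pageSource = new RecordPageSource(types, recordCursor)) {
        while (!pageSource.isFinished()) {
            Page page = pageSource.getNextPage(); // may be null, per the ConnectorPageSource contract
            if (page != null) {
                process(page); // one Block per projected column, built from cursor rows
            }
        }
    }
}
// ---------------------------------------------------------------------------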
new file mode 100644 index 00000000000..3bb61163739 --- /dev/null +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiRecordCursor.java @@ -0,0 +1,158 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.hudi; + +import com.google.common.base.Joiner; +import com.google.common.base.Splitter; +import io.airlift.compress.lzo.LzoCodec; +import io.airlift.compress.lzo.LzopCodec; +import io.trino.hdfs.HdfsContext; +import io.trino.hdfs.HdfsEnvironment; +import io.trino.plugin.hive.HiveColumnHandle; +import io.trino.plugin.hive.util.HiveUtil; +import io.trino.plugin.hudi.files.HudiFile; +import io.trino.spi.TrinoException; +import io.trino.spi.connector.ConnectorSession; +import io.trino.spi.connector.RecordCursor; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.FileSplit; +import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hudi.common.model.HoodieLogFile; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.hadoop.realtime.HoodieRealtimeFileSplit; + +import java.io.IOException; +import java.util.List; +import java.util.Properties; +import java.util.function.Function; + +import static com.google.common.base.MoreObjects.firstNonNull; +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.collect.Lists.newArrayList; +import static io.trino.plugin.hudi.HudiErrorCode.HUDI_CANNOT_OPEN_SPLIT; +import static io.trino.plugin.hudi.HudiUtil.getHudiBaseFile; +import static io.trino.plugin.hudi.query.HiveHudiRecordCursor.createRecordCursor; +import static java.lang.String.format; +import static java.util.Objects.requireNonNull; +import static java.util.stream.Collectors.toList; +import static org.apache.hadoop.hive.serde2.ColumnProjectionUtils.READ_ALL_COLUMNS; +import static org.apache.hadoop.hive.serde2.ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR; +import static org.apache.hadoop.hive.serde2.ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR; + +public class HudiRecordCursor +{ + private HudiRecordCursor() {} + + public static RecordCursor createRealtimeRecordCursor( + HdfsEnvironment hdfsEnvironment, + ConnectorSession session, + HudiSplit split, + HudiTableHandle tableHandle, + List dataColumns) + { + requireNonNull(session, "session is null"); + checkArgument(dataColumns.stream().allMatch(HudiRecordCursor::isRegularColumn), "dataColumns contains non regular column"); + HudiFile baseFile = getHudiBaseFile(split); + Path path = new Path(baseFile.getLocation().toString()); + Configuration configuration = hdfsEnvironment.getConfiguration(new HdfsContext(session), path); + + return hdfsEnvironment.doAs(session.getIdentity(), () -> { + RecordReader recordReader = createRecordReader(configuration, 
tableHandle.getSchema(), split, dataColumns, tableHandle.getBasePath()); + @SuppressWarnings("unchecked") RecordReader reader = (RecordReader) recordReader; + return createRecordCursor(configuration, path, reader, baseFile.getFileSize(), tableHandle.getSchema(), dataColumns); + }); + } + + private static RecordReader createRecordReader( + Configuration configuration, + Properties schema, + HudiSplit split, + List dataColumns, + String basePath) + { + // update configuration + JobConf jobConf = new JobConf(configuration); + jobConf.setBoolean(READ_ALL_COLUMNS, false); + jobConf.set(READ_COLUMN_IDS_CONF_STR, join(dataColumns, HiveColumnHandle::getBaseHiveColumnIndex)); + jobConf.set(READ_COLUMN_NAMES_CONF_STR, join(dataColumns, HiveColumnHandle::getName)); + schema.stringPropertyNames() + .forEach(name -> jobConf.set(name, schema.getProperty(name))); + refineCompressionCodecs(jobConf); + + // create input format + String inputFormatName = HiveUtil.getInputFormatName(schema); + InputFormat inputFormat = createInputFormat(jobConf, inputFormatName); + + // create record reader for split + try { + HudiFile baseFile = getHudiBaseFile(split); + Path path = new Path(baseFile.getLocation().toString()); + FileSplit fileSplit = new FileSplit(path, baseFile.getOffset(), baseFile.getFileSize(), (String[]) null); + List logFiles = split.getLogFiles() + .stream() + .map(file -> new HoodieLogFile(file.getLocation().toString())).collect(toList()); + FileSplit hudiSplit = new HoodieRealtimeFileSplit(fileSplit, basePath, logFiles, split.getCommitTime(), false, Option.empty()); + return inputFormat.getRecordReader(hudiSplit, jobConf, Reporter.NULL); + } + catch (IOException e) { + String msg = format("Error opening Hive split %s using %s: %s", + split, + inputFormatName, + firstNonNull(e.getMessage(), e.getClass().getName())); + throw new TrinoException(HUDI_CANNOT_OPEN_SPLIT, msg, e); + } + } + + private static InputFormat createInputFormat(Configuration conf, String inputFormat) + { + try { + Class clazz = conf.getClassByName(inputFormat); + @SuppressWarnings("unchecked") Class> cls = + (Class>) clazz.asSubclass(InputFormat.class); + return ReflectionUtils.newInstance(cls, conf); + } + catch (ClassNotFoundException | RuntimeException e) { + throw new TrinoException(HUDI_CANNOT_OPEN_SPLIT, "Unable to create input format " + inputFormat, e); + } + } + + private static void refineCompressionCodecs(Configuration conf) + { + List codecs = newArrayList(Splitter.on(",").trimResults().omitEmptyStrings() + .split(conf.get("io.compression.codecs", ""))); + if (!codecs.contains(LzoCodec.class.getName())) { + codecs.add(0, LzoCodec.class.getName()); + } + if (!codecs.contains(LzopCodec.class.getName())) { + codecs.add(0, LzopCodec.class.getName()); + } + conf.set("io.compression.codecs", String.join(",", codecs)); + } + + private static String join(List list, Function extractor) + { + return Joiner.on(',').join(list.stream().map(extractor).iterator()); + } + + private static boolean isRegularColumn(HiveColumnHandle column) + { + return column.getColumnType() == HiveColumnHandle.ColumnType.REGULAR; + } +} diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiSplit.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiSplit.java index da201180e3d..b32fd44dd47 100644 --- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiSplit.java +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiSplit.java @@ -19,15 +19,17 @@ import com.google.common.collect.ImmutableMap; import 
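// ---------------------------------------------------------------------------
// [Editor's note - illustrative sketch, not part of the patch] What createRecordReader
// above hands to hudi-hadoop-mr: a HoodieRealtimeFileSplit that pairs the base parquet
// file with its log files, so HoodieParquetRealtimeInputFormat can merge log records on
// top of the base file up to the given commit. The jobConf and paths are assumed to be
// prepared as in the surrounding code (imports as in HudiRecordCursor, plus
// NullWritable/ArrayWritable and HoodieParquetRealtimeInputFormat).
private static RecordReader<NullWritable, ArrayWritable> openRealtimeReader(
        JobConf jobConf, Path baseFilePath, long baseFileLength, String tableBasePath,
        List<String> logFilePaths, String latestCommitTime)
        throws IOException
{
    FileSplit baseFileSplit = new FileSplit(baseFilePath, 0, baseFileLength, (String[]) null);
    List<HoodieLogFile> logFiles = logFilePaths.stream().map(HoodieLogFile::new).collect(toList());
    // snapshot semantics: only log records from instants completed at latestCommitTime are merged
    FileSplit realtimeSplit = new HoodieRealtimeFileSplit(
            baseFileSplit, tableBasePath, logFiles, latestCommitTime, false, Option.empty());
    InputFormat<NullWritable, ArrayWritable> inputFormat =
            ReflectionUtils.newInstance(HoodieParquetRealtimeInputFormat.class, jobConf);
    return inputFormat.getRecordReader(realtimeSplit, jobConf, Reporter.NULL);
}
// ---------------------------------------------------------------------------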
io.trino.plugin.hive.HiveColumnHandle; import io.trino.plugin.hive.HivePartitionKey; +import io.trino.plugin.hudi.files.HudiBaseFile; +import io.trino.plugin.hudi.files.HudiLogFile; import io.trino.spi.HostAddress; import io.trino.spi.SplitWeight; import io.trino.spi.connector.ConnectorSplit; import io.trino.spi.predicate.TupleDomain; import java.util.List; +import java.util.Optional; import static com.google.common.base.MoreObjects.toStringHelper; -import static com.google.common.base.Preconditions.checkArgument; import static io.airlift.slice.SizeOf.estimatedSizeOf; import static io.airlift.slice.SizeOf.instanceSize; import static java.lang.Math.toIntExact; @@ -38,41 +40,31 @@ public class HudiSplit { private static final int INSTANCE_SIZE = toIntExact(instanceSize(HudiSplit.class)); - private final String location; - private final long start; - private final long length; - private final long fileSize; - private final long fileModifiedTime; private final List addresses; private final TupleDomain predicate; private final List partitionKeys; private final SplitWeight splitWeight; + private final Optional baseFile; + private final List logFiles; + private final String commitTime; @JsonCreator public HudiSplit( - @JsonProperty("location") String location, - @JsonProperty("start") long start, - @JsonProperty("length") long length, - @JsonProperty("fileSize") long fileSize, - @JsonProperty("fileModifiedTime") long fileModifiedTime, @JsonProperty("addresses") List addresses, @JsonProperty("predicate") TupleDomain predicate, @JsonProperty("partitionKeys") List partitionKeys, - @JsonProperty("splitWeight") SplitWeight splitWeight) + @JsonProperty("splitWeight") SplitWeight splitWeight, + @JsonProperty("baseFile") Optional baseFile, + @JsonProperty("logFiles") List logFiles, + @JsonProperty("commitTime") String commitTime) { - checkArgument(start >= 0, "start must be positive"); - checkArgument(length >= 0, "length must be positive"); - checkArgument(start + length <= fileSize, "fileSize must be at least start + length"); - - this.location = requireNonNull(location, "location is null"); - this.start = start; - this.length = length; - this.fileSize = fileSize; - this.fileModifiedTime = fileModifiedTime; this.addresses = ImmutableList.copyOf(requireNonNull(addresses, "addresses is null")); this.predicate = requireNonNull(predicate, "predicate is null"); this.partitionKeys = ImmutableList.copyOf(requireNonNull(partitionKeys, "partitionKeys is null")); this.splitWeight = requireNonNull(splitWeight, "splitWeight is null"); + this.baseFile = requireNonNull(baseFile, "baseFile is null"); + this.logFiles = requireNonNull(logFiles, "logFiles is null"); + this.commitTime = requireNonNull(commitTime, "commitTime is null"); } @Override @@ -91,13 +83,15 @@ public List getAddresses() @Override public Object getInfo() { - return ImmutableMap.builder() - .put("location", location) - .put("start", start) - .put("length", length) - .put("fileSize", fileSize) - .put("fileModifiedTime", fileModifiedTime) - .buildOrThrow(); + ImmutableMap.Builder infoMap = ImmutableMap.builder().put("commitTime", commitTime); + baseFile.ifPresent(file -> infoMap + .put("location", file.getLocation().toString()) + .put("start", file.getOffset()) + .put("length", file.getFileSize()) + .put("fileSize", file.getFileSize()) + .put("fileModifiedTime", file.getFileEntry().lastModified().toEpochMilli())); + + return infoMap.buildOrThrow(); } @JsonProperty @@ -108,52 +102,40 @@ public SplitWeight getSplitWeight() } @JsonProperty - public 
String getLocation() - { - return location; - } - - @JsonProperty - public long getStart() - { - return start; - } - - @JsonProperty - public long getLength() + public TupleDomain getPredicate() { - return length; + return predicate; } @JsonProperty - public long getFileSize() + public List getPartitionKeys() { - return fileSize; + return partitionKeys; } @JsonProperty - public long getFileModifiedTime() + public Optional getBaseFile() { - return fileModifiedTime; + return baseFile; } @JsonProperty - public TupleDomain getPredicate() + public List getLogFiles() { - return predicate; + return logFiles; } @JsonProperty - public List getPartitionKeys() + public String getCommitTime() { - return partitionKeys; + return commitTime; } @Override public long getRetainedSizeInBytes() { return INSTANCE_SIZE - + estimatedSizeOf(location) + + estimatedSizeOf(baseFile.map(file -> file.getLocation().toString()).orElse("")) + estimatedSizeOf(addresses, HostAddress::getRetainedSizeInBytes) + splitWeight.getRetainedSizeInBytes() + predicate.getRetainedSizeInBytes(HiveColumnHandle::getRetainedSizeInBytes) @@ -164,11 +146,9 @@ public long getRetainedSizeInBytes() public String toString() { return toStringHelper(this) - .addValue(location) - .addValue(start) - .addValue(length) - .addValue(fileSize) - .addValue(fileModifiedTime) + .add("baseFile", baseFile) + .add("logFiles", logFiles) + .addValue(commitTime) .toString(); } } diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiSplitSource.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiSplitSource.java index ad3aae7e671..19ef96670a0 100644 --- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiSplitSource.java +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiSplitSource.java @@ -22,6 +22,7 @@ import io.trino.plugin.hive.metastore.Table; import io.trino.plugin.hive.util.AsyncQueue; import io.trino.plugin.hive.util.ThrottledAsyncQueue; +import io.trino.plugin.hudi.model.HudiInstant; import io.trino.plugin.hudi.query.HudiDirectoryLister; import io.trino.plugin.hudi.query.HudiReadOptimizedDirectoryLister; import io.trino.plugin.hudi.split.HudiBackgroundSplitLoader; @@ -74,6 +75,12 @@ public HudiSplitSource( HudiTableMetaClient metaClient = buildTableMetaClient(fileSystemFactory.create(session), tableHandle.getBasePath()); List partitionColumnHandles = table.getPartitionColumns().stream() .map(column -> partitionColumnHandleMap.get(column.getName())).collect(toList()); + String latestCommitTime = metaClient.getActiveTimeline() + .getCommitsTimeline() + .filterCompletedInstants() + .lastInstant() + .map(HudiInstant::getTimestamp) + .orElseThrow(() -> new TrinoException(HudiErrorCode.HUDI_NO_VALID_COMMIT, "Table has no valid commits")); HudiDirectoryLister hudiDirectoryLister = new HudiReadOptimizedDirectoryLister( tableHandle, @@ -91,7 +98,8 @@ public HudiSplitSource( queue, new BoundedExecutor(executor, getSplitGeneratorParallelism(session)), createSplitWeightProvider(session), - partitions); + partitions, + latestCommitTime); this.splitLoaderFuture = splitLoaderExecutorService.schedule(splitLoader, 0, TimeUnit.MILLISECONDS); } diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiTableHandle.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiTableHandle.java index 0da9f2d897a..2c7d275a214 100644 --- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiTableHandle.java +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiTableHandle.java @@ -21,6 +21,8 @@ import 
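// ---------------------------------------------------------------------------
// [Editor's note - illustrative sketch, not part of the patch] The snapshot read is
// pinned to the newest completed instant on the table's timeline; HudiSplitSource
// resolves it through the connector's ported timeline classes. The same lookup
// expressed against the hudi-common API this patch adds as a dependency
// (HoodieTableMetaClient, HoodieInstant, Option from org.apache.hudi:hudi-common;
// `hadoopConf` is assumed to be available):
private static String latestCompletedCommit(Configuration hadoopConf, String tableBasePath)
{
    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
            .setConf(hadoopConf)
            .setBasePath(tableBasePath)
            .build();
    Option<HoodieInstant> lastCompleted = metaClient.getActiveTimeline()
            .getCommitsTimeline()
            .filterCompletedInstants()
            .lastInstant();                    // newest instant that finished successfully
    if (!lastCompleted.isPresent()) {
        throw new IllegalStateException("Table has no completed commits: " + tableBasePath);
    }
    return lastCompleted.get().getTimestamp(); // e.g. "20230624170247000" (example value)
}
// ---------------------------------------------------------------------------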
io.trino.spi.connector.SchemaTableName; import io.trino.spi.predicate.TupleDomain; +import java.util.Properties; + import static io.trino.spi.connector.SchemaTableName.schemaTableName; import static java.util.Objects.requireNonNull; @@ -33,6 +35,7 @@ public class HudiTableHandle private final HudiTableType tableType; private final TupleDomain partitionPredicates; private final TupleDomain regularPredicates; + private final Properties schema; @JsonCreator public HudiTableHandle( @@ -41,7 +44,8 @@ public HudiTableHandle( @JsonProperty("basePath") String basePath, @JsonProperty("tableType") HudiTableType tableType, @JsonProperty("partitionPredicates") TupleDomain partitionPredicates, - @JsonProperty("regularPredicates") TupleDomain regularPredicates) + @JsonProperty("regularPredicates") TupleDomain regularPredicates, + @JsonProperty("properties") Properties schema) { this.schemaName = requireNonNull(schemaName, "schemaName is null"); this.tableName = requireNonNull(tableName, "tableName is null"); @@ -49,6 +53,7 @@ public HudiTableHandle( this.tableType = requireNonNull(tableType, "tableType is null"); this.partitionPredicates = requireNonNull(partitionPredicates, "partitionPredicates is null"); this.regularPredicates = requireNonNull(regularPredicates, "regularPredicates is null"); + this.schema = requireNonNull(schema, "schema is null"); } @JsonProperty @@ -87,6 +92,12 @@ public TupleDomain getRegularPredicates() return regularPredicates; } + @JsonProperty + public Properties getSchema() + { + return schema; + } + public SchemaTableName getSchemaTableName() { return schemaTableName(schemaName, tableName); @@ -102,7 +113,8 @@ HudiTableHandle applyPredicates( basePath, tableType, partitionPredicates.intersect(partitionTupleDomain), - regularPredicates.intersect(regularTupleDomain)); + regularPredicates.intersect(regularTupleDomain), + schema); } @Override diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiUtil.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiUtil.java index 4f115f00e65..4ecb7f01ebb 100644 --- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiUtil.java +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiUtil.java @@ -23,6 +23,7 @@ import io.trino.plugin.hive.HivePartitionKey; import io.trino.plugin.hive.HivePartitionManager; import io.trino.plugin.hive.metastore.Column; +import io.trino.plugin.hudi.files.HudiFile; import io.trino.plugin.hudi.model.HudiFileFormat; import io.trino.plugin.hudi.table.HudiTableMetaClient; import io.trino.spi.TrinoException; @@ -172,4 +173,13 @@ public static HudiTableMetaClient buildTableMetaClient( .setBasePath(Location.of(basePath)) .build(); } + + public static HudiFile getHudiBaseFile(HudiSplit hudiSplit) + { + if (hudiSplit.getBaseFile().isPresent()) { + return hudiSplit.getBaseFile().get(); + } + // use first log file as base file for MOR table if it hasn't base file + return hudiSplit.getLogFiles().get(0); + } } diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/files/FileSlice.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/files/FileSlice.java index a9d93624465..5ec9475cb69 100644 --- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/files/FileSlice.java +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/files/FileSlice.java @@ -54,6 +54,11 @@ public Optional getBaseFile() return baseFile; } + public TreeSet getLogFiles() + { + return logFiles; + } + public boolean isEmpty() { return (baseFile == null) && (logFiles.isEmpty()); diff --git 
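// ---------------------------------------------------------------------------
// [Editor's note - illustrative sketch, not part of the patch] The new `schema` field on
// HudiTableHandle is the Hive table schema from MetastoreUtil.getHiveSchema(table);
// createRecordReader in HudiRecordCursor copies every property into the JobConf so
// hudi-hadoop-mr and the parquet serde see the same table definition Hive would.
// Typical keys, with example values (not taken from this patch):
Properties schema = new Properties();
schema.setProperty("serialization.lib", "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe");
schema.setProperty("columns", "_hoodie_commit_time,_hoodie_record_key,id,ts,price");
schema.setProperty("columns.types", "string,string,bigint,bigint,double");

JobConf jobConf = new JobConf(hadoopConf); // `hadoopConf` assumed from the environment
schema.stringPropertyNames().forEach(name -> jobConf.set(name, schema.getProperty(name)));
// ---------------------------------------------------------------------------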
a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/files/HudiBaseFile.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/files/HudiBaseFile.java index c9c651d2a5b..2cec446fe64 100644 --- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/files/HudiBaseFile.java +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/files/HudiBaseFile.java @@ -22,11 +22,14 @@ import static java.util.Objects.requireNonNull; public class HudiBaseFile + implements HudiFile { private transient FileEntry fileEntry; private final String fullPath; private final String fileName; private long fileLen; + private long offset; + private long fileModifiedTime; public HudiBaseFile(FileEntry fileEntry) { @@ -42,14 +45,12 @@ private HudiBaseFile(FileEntry fileEntry, String fullPath, String fileName, long this.fullPath = requireNonNull(fullPath, "fullPath is null"); this.fileLen = fileLen; this.fileName = requireNonNull(fileName, "fileName is null"); + this.offset = fileEntry.blocks().map(listOfBlocks -> (!listOfBlocks.isEmpty()) ? listOfBlocks.get(0).offset() : 0).orElse(0L); + this.fileModifiedTime = fileEntry.lastModified().toEpochMilli(); } - public String getPath() - { - return fullPath; - } - - public Location getFullPath() + @Override + public Location getLocation() { if (fileEntry != null) { return fileEntry.location(); @@ -68,6 +69,24 @@ public FileEntry getFileEntry() return fileEntry; } + @Override + public long getFileSize() + { + return fileLen; + } + + @Override + public long getOffset() + { + return offset; + } + + @Override + public long getFileModifiedTime() + { + return fileModifiedTime; + } + public String getFileId() { return getFileName().split("_")[0]; diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/files/HudiFile.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/files/HudiFile.java new file mode 100644 index 00000000000..f24e5f7962b --- /dev/null +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/files/HudiFile.java @@ -0,0 +1,27 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.plugin.hudi.files; + +import io.trino.filesystem.Location; + +public interface HudiFile +{ + Location getLocation(); + + long getOffset(); + + long getFileSize(); + + long getFileModifiedTime(); +} diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/files/HudiLogFile.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/files/HudiLogFile.java index dde08427b23..5125e4b091d 100644 --- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/files/HudiLogFile.java +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/files/HudiLogFile.java @@ -25,43 +25,65 @@ import static io.trino.plugin.hudi.files.FSUtils.getWriteTokenFromLogPath; public class HudiLogFile + implements HudiFile { private static final Comparator LOG_FILE_COMPARATOR_REVERSED = new HudiLogFile.LogFileComparator().reversed(); private final String pathStr; private final long fileLen; + private final long modificationTime; - public HudiLogFile(FileEntry fileStatus) + public HudiLogFile(FileEntry fileEntry) { - this.pathStr = fileStatus.location().toString(); - this.fileLen = fileStatus.length(); + this.pathStr = fileEntry.location().toString(); + this.fileLen = fileEntry.length(); + this.modificationTime = fileEntry.lastModified().toEpochMilli(); } public String getFileId() { - return getFileIdFromLogPath(getPath()); + return getFileIdFromLogPath(getLocation()); } public String getBaseCommitTime() { - return getBaseCommitTimeFromLogPath(getPath()); + return getBaseCommitTimeFromLogPath(getLocation()); } public int getLogVersion() { - return getFileVersionFromLog(getPath()); + return getFileVersionFromLog(getLocation()); } public String getLogWriteToken() { - return getWriteTokenFromLogPath(getPath()); + return getWriteTokenFromLogPath(getLocation()); } - public Location getPath() + @Override + public Location getLocation() { return Location.of(pathStr); } + @Override + public long getOffset() + { + return 0L; + } + + @Override + public long getFileSize() + { + return fileLen; + } + + @Override + public long getFileModifiedTime() + { + return modificationTime; + } + public static Comparator getReverseLogFileComparator() { return LOG_FILE_COMPARATOR_REVERSED; diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/model/HudiTableType.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/model/HudiTableType.java index da93f80d95b..a76892449d5 100644 --- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/model/HudiTableType.java +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/model/HudiTableType.java @@ -13,6 +13,14 @@ */ package io.trino.plugin.hudi.model; +import io.trino.spi.TrinoException; + +import static io.trino.plugin.hive.util.HiveUtil.HUDI_INPUT_FORMAT; +import static io.trino.plugin.hive.util.HiveUtil.HUDI_PARQUET_INPUT_FORMAT; +import static io.trino.plugin.hive.util.HiveUtil.HUDI_PARQUET_REALTIME_INPUT_FORMAT; +import static io.trino.plugin.hive.util.HiveUtil.HUDI_REALTIME_INPUT_FORMAT; +import static io.trino.plugin.hudi.HudiErrorCode.HUDI_UNSUPPORTED_TABLE_TYPE; + /** * Type of the Hoodie Table. *

@@ -25,5 +33,19 @@ public enum HudiTableType { COPY_ON_WRITE, - MERGE_ON_READ + MERGE_ON_READ; + + public static HudiTableType fromInputFormat(String inputFormat) + { + switch (inputFormat) { + case HUDI_PARQUET_INPUT_FORMAT: + case HUDI_INPUT_FORMAT: + return COPY_ON_WRITE; + case HUDI_PARQUET_REALTIME_INPUT_FORMAT: + case HUDI_REALTIME_INPUT_FORMAT: + return MERGE_ON_READ; + default: + throw new TrinoException(HUDI_UNSUPPORTED_TABLE_TYPE, "Unknown hudi table inputFormat: " + inputFormat); + } + } } diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/partition/HudiPartitionInfoLoader.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/partition/HudiPartitionInfoLoader.java index 82a0b421610..2d0239bc12b 100644 --- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/partition/HudiPartitionInfoLoader.java +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/partition/HudiPartitionInfoLoader.java @@ -16,7 +16,7 @@ import io.airlift.concurrent.MoreFutures; import io.trino.plugin.hive.HivePartitionKey; import io.trino.plugin.hive.util.AsyncQueue; -import io.trino.plugin.hudi.HudiFileStatus; +import io.trino.plugin.hudi.files.FileSlice; import io.trino.plugin.hudi.query.HudiDirectoryLister; import io.trino.plugin.hudi.split.HudiSplitFactory; import io.trino.spi.connector.ConnectorSplit; @@ -32,6 +32,7 @@ public class HudiPartitionInfoLoader private final HudiSplitFactory hudiSplitFactory; private final AsyncQueue asyncQueue; private final Deque partitionQueue; + private final String commitTime; private boolean isRunning; @@ -39,12 +40,14 @@ public HudiPartitionInfoLoader( HudiDirectoryLister hudiDirectoryLister, HudiSplitFactory hudiSplitFactory, AsyncQueue asyncQueue, - Deque partitionQueue) + Deque partitionQueue, + String commitTime) { this.hudiDirectoryLister = hudiDirectoryLister; this.hudiSplitFactory = hudiSplitFactory; this.asyncQueue = asyncQueue; this.partitionQueue = partitionQueue; + this.commitTime = commitTime; this.isRunning = true; } @@ -66,9 +69,9 @@ private void generateSplitsFromPartition(String partitionName) partitionInfo.ifPresent(hudiPartitionInfo -> { if (hudiPartitionInfo.doesMatchPredicates() || partitionName.equals("")) { List partitionKeys = hudiPartitionInfo.getHivePartitionKeys(); - List partitionFiles = hudiDirectoryLister.listStatus(hudiPartitionInfo); - partitionFiles.stream() - .flatMap(fileStatus -> hudiSplitFactory.createSplits(partitionKeys, fileStatus).stream()) + List fileSlices = hudiDirectoryLister.listFileSlicesBeforeOn(hudiPartitionInfo, commitTime); + fileSlices.stream() + .flatMap(fileSlice -> hudiSplitFactory.createSplits(partitionKeys, fileSlice, commitTime).stream()) .map(asyncQueue::offer) .forEachOrdered(MoreFutures::getFutureValue); } diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/query/HiveHudiRecordCursor.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/query/HiveHudiRecordCursor.java new file mode 100644 index 00000000000..fdc2aecdc48 --- /dev/null +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/query/HiveHudiRecordCursor.java @@ -0,0 +1,47 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
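// ---------------------------------------------------------------------------
// [Editor's note - illustrative sketch, not part of the patch] How fromInputFormat above
// plays out for the tables Hudi's hive-sync registers (table names are examples): a COW
// table and the read-optimized ("_ro") view of a MOR table expose HoodieParquetInputFormat
// and keep the existing parquet-only read path, while the realtime ("_rt") view exposes the
// realtime input format and takes the new merge-on-read snapshot path.
assert HudiTableType.fromInputFormat("org.apache.hudi.hadoop.HoodieParquetInputFormat") == HudiTableType.COPY_ON_WRITE;                   // e.g. hudi_trips_cow, hudi_trips_mor_ro
assert HudiTableType.fromInputFormat("org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat") == HudiTableType.MERGE_ON_READ;  // e.g. hudi_trips_mor_rt
// ---------------------------------------------------------------------------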
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.hudi.query; + +import io.trino.plugin.hive.GenericHiveRecordCursor; +import io.trino.plugin.hive.HiveColumnHandle; +import io.trino.spi.connector.RecordCursor; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.RecordReader; + +import java.util.List; +import java.util.Properties; + +public final class HiveHudiRecordCursor +{ + private HiveHudiRecordCursor() {} + + public static RecordCursor createRecordCursor( + Configuration configuration, + Path path, + RecordReader recordReader, + long totalBytes, + Properties hiveSchema, + List hiveColumnHandles) + { + return new GenericHiveRecordCursor<>( + configuration, + path, + recordReader, + totalBytes, + hiveSchema, + hiveColumnHandles); + } +} diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/query/HudiDirectoryLister.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/query/HudiDirectoryLister.java index 710dfc44916..2528bc3acfc 100644 --- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/query/HudiDirectoryLister.java +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/query/HudiDirectoryLister.java @@ -14,6 +14,7 @@ package io.trino.plugin.hudi.query; import io.trino.plugin.hudi.HudiFileStatus; +import io.trino.plugin.hudi.files.FileSlice; import io.trino.plugin.hudi.partition.HudiPartitionInfo; import java.io.Closeable; @@ -25,5 +26,7 @@ public interface HudiDirectoryLister { List listStatus(HudiPartitionInfo partitionInfo); + List listFileSlicesBeforeOn(HudiPartitionInfo partitionInfo, String commitTime); + Optional getPartitionInfo(String partition); } diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/query/HudiReadOptimizedDirectoryLister.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/query/HudiReadOptimizedDirectoryLister.java index 869806461c7..9bb978312c5 100644 --- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/query/HudiReadOptimizedDirectoryLister.java +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/query/HudiReadOptimizedDirectoryLister.java @@ -19,6 +19,7 @@ import io.trino.plugin.hive.metastore.Table; import io.trino.plugin.hudi.HudiFileStatus; import io.trino.plugin.hudi.HudiTableHandle; +import io.trino.plugin.hudi.files.FileSlice; import io.trino.plugin.hudi.files.HudiBaseFile; import io.trino.plugin.hudi.partition.HiveHudiPartitionInfo; import io.trino.plugin.hudi.partition.HudiPartitionInfo; @@ -76,6 +77,12 @@ public List listStatus(HudiPartitionInfo partitionInfo) .collect(toImmutableList()); } + @Override + public List listFileSlicesBeforeOn(HudiPartitionInfo partitionInfo, String commitTime) + { + return fileSystemView.getLatestFileSlicesBeforeOn(partitionInfo.getRelativePartitionPath(), commitTime).toList(); + } + @Override public Optional getPartitionInfo(String partition) { diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/split/HudiBackgroundSplitLoader.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/split/HudiBackgroundSplitLoader.java index 
31447f74d08..9b659d4ad5e 100644 --- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/split/HudiBackgroundSplitLoader.java +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/split/HudiBackgroundSplitLoader.java @@ -43,6 +43,7 @@ public class HudiBackgroundSplitLoader private final int splitGeneratorNumThreads; private final HudiSplitFactory hudiSplitFactory; private final List partitions; + private final String commitTime; public HudiBackgroundSplitLoader( ConnectorSession session, @@ -51,7 +52,8 @@ public HudiBackgroundSplitLoader( AsyncQueue asyncQueue, Executor splitGeneratorExecutor, HudiSplitWeightProvider hudiSplitWeightProvider, - List partitions) + List partitions, + String commitTime) { this.hudiDirectoryLister = requireNonNull(hudiDirectoryLister, "hudiDirectoryLister is null"); this.asyncQueue = requireNonNull(asyncQueue, "asyncQueue is null"); @@ -59,6 +61,7 @@ public HudiBackgroundSplitLoader( this.splitGeneratorNumThreads = getSplitGeneratorParallelism(session); this.hudiSplitFactory = new HudiSplitFactory(tableHandle, hudiSplitWeightProvider); this.partitions = requireNonNull(partitions, "partitions is null"); + this.commitTime = requireNonNull(commitTime, "commitTime is null"); } @Override @@ -70,7 +73,7 @@ public void run() // Start a number of partition split generators to generate the splits in parallel for (int i = 0; i < splitGeneratorNumThreads; i++) { - HudiPartitionInfoLoader generator = new HudiPartitionInfoLoader(hudiDirectoryLister, hudiSplitFactory, asyncQueue, partitionQueue); + HudiPartitionInfoLoader generator = new HudiPartitionInfoLoader(hudiDirectoryLister, hudiSplitFactory, asyncQueue, partitionQueue, commitTime); splitGeneratorList.add(generator); splitGeneratorFutures.add(Futures.submit(generator, splitGeneratorExecutor)); } diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/split/HudiSplitFactory.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/split/HudiSplitFactory.java index 4cd6d54c468..c1addbafac3 100644 --- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/split/HudiSplitFactory.java +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/split/HudiSplitFactory.java @@ -14,15 +14,23 @@ package io.trino.plugin.hudi.split; import com.google.common.collect.ImmutableList; +import io.trino.filesystem.FileEntry; import io.trino.plugin.hive.HivePartitionKey; -import io.trino.plugin.hudi.HudiFileStatus; import io.trino.plugin.hudi.HudiSplit; import io.trino.plugin.hudi.HudiTableHandle; +import io.trino.plugin.hudi.files.FileSlice; +import io.trino.plugin.hudi.files.HudiBaseFile; +import io.trino.plugin.hudi.files.HudiLogFile; +import io.trino.plugin.hudi.model.HudiTableType; import io.trino.spi.TrinoException; import java.util.List; +import java.util.Optional; -import static io.trino.plugin.hudi.HudiErrorCode.HUDI_FILESYSTEM_ERROR; +import static com.google.common.collect.ImmutableList.toImmutableList; +import static io.trino.plugin.hudi.HudiErrorCode.HUDI_UNSUPPORTED_TABLE_TYPE; +import static io.trino.plugin.hudi.model.HudiTableType.COPY_ON_WRITE; +import static io.trino.plugin.hudi.model.HudiTableType.MERGE_ON_READ; import static java.lang.String.format; import static java.util.Objects.requireNonNull; @@ -41,56 +49,104 @@ public HudiSplitFactory( this.hudiSplitWeightProvider = requireNonNull(hudiSplitWeightProvider, "hudiSplitWeightProvider is null"); } - public List createSplits(List partitionKeys, HudiFileStatus fileStatus) + public List createSplits(List partitionKeys, FileSlice fileSlice, String 
commitTime) { - if (fileStatus.isDirectory()) { - throw new TrinoException(HUDI_FILESYSTEM_ERROR, format("Not a valid location: %s", fileStatus.location())); + HudiTableType tableType = hudiTableHandle.getTableType(); + switch (tableType) { + case COPY_ON_WRITE: + return createSplitsForCOW(partitionKeys, fileSlice, commitTime); + case MERGE_ON_READ: + return createSplitsForMOR(partitionKeys, fileSlice, commitTime); + default: + throw new TrinoException(HUDI_UNSUPPORTED_TABLE_TYPE, format("Unsupported table type: %s", tableType)); + } + } + + private List createSplitsForCOW(List partitionKeys, FileSlice fileSlice, String commitTime) + { + if (!fileSlice.getBaseFile().isPresent()) { + return ImmutableList.of(); } - long fileSize = fileStatus.length(); + HudiBaseFile baseFile = fileSlice.getBaseFile().get(); + long fileSize = baseFile.getFileSize(); if (fileSize == 0) { return ImmutableList.of(new HudiSplit( - fileStatus.location().toString(), - 0, - fileSize, - fileSize, - fileStatus.modificationTime(), ImmutableList.of(), hudiTableHandle.getRegularPredicates(), partitionKeys, - hudiSplitWeightProvider.calculateSplitWeight(fileSize))); + hudiSplitWeightProvider.calculateSplitWeight(fileSize), + fileSlice.getBaseFile(), + ImmutableList.of(), + commitTime)); } ImmutableList.Builder splits = ImmutableList.builder(); - long splitSize = fileStatus.blockSize(); + FileEntry baseFileEntry = baseFile.getFileEntry(); + long splitSize = baseFileEntry.blocks() + .map(listOfBlocks -> (!listOfBlocks.isEmpty()) ? listOfBlocks.get(0).length() : 0).orElse(0L); long bytesRemaining = fileSize; while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) { - splits.add(new HudiSplit( - fileStatus.location().toString(), - fileSize - bytesRemaining, - splitSize, + FileEntry fileEntry = new FileEntry( + baseFileEntry.location(), fileSize, - fileStatus.modificationTime(), + baseFileEntry.lastModified(), + Optional.of(ImmutableList.of(new FileEntry.Block( + ImmutableList.of(), + fileSize - bytesRemaining, + splitSize)))); + + splits.add(new HudiSplit( ImmutableList.of(), hudiTableHandle.getRegularPredicates(), partitionKeys, - hudiSplitWeightProvider.calculateSplitWeight(splitSize))); + hudiSplitWeightProvider.calculateSplitWeight(splitSize), + Optional.of(new HudiBaseFile(fileEntry)), + ImmutableList.of(), + commitTime)); bytesRemaining -= splitSize; } if (bytesRemaining > 0) { - splits.add(new HudiSplit( - fileStatus.location().toString(), - fileSize - bytesRemaining, - bytesRemaining, + FileEntry fileEntry = new FileEntry( + baseFileEntry.location(), fileSize, - fileStatus.modificationTime(), + baseFileEntry.lastModified(), + Optional.of(ImmutableList.of(new FileEntry.Block( + ImmutableList.of(), + fileSize - bytesRemaining, + bytesRemaining)))); + + splits.add(new HudiSplit( ImmutableList.of(), hudiTableHandle.getRegularPredicates(), partitionKeys, - hudiSplitWeightProvider.calculateSplitWeight(bytesRemaining))); + hudiSplitWeightProvider.calculateSplitWeight(bytesRemaining), + Optional.of(new HudiBaseFile(fileEntry)), + ImmutableList.of(), + commitTime)); } return splits.build(); } + + private List createSplitsForMOR(List partitionKeys, FileSlice fileSlice, String commitTime) + { + Optional baseFile = fileSlice.getBaseFile(); + List logFiles = fileSlice.getLogFiles().stream().collect(toImmutableList()); + + long logFilesSize = logFiles.size() > 0 ? logFiles.stream().map(HudiLogFile::getFileSize).reduce(0L, Long::sum) : 0L; + long fileSize = baseFile.isPresent() ? 
baseFile.get().getFileEntry().length() + logFilesSize : logFilesSize; + + HudiSplit split = new HudiSplit( + ImmutableList.of(), + hudiTableHandle.getRegularPredicates(), + partitionKeys, + hudiSplitWeightProvider.calculateSplitWeight(fileSize), + baseFile, + logFiles, + commitTime); + + return ImmutableList.of(split); + } } diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/table/HudiTableFileSystemView.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/table/HudiTableFileSystemView.java index c354aa9b14f..ba26b305b0a 100644 --- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/table/HudiTableFileSystemView.java +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/table/HudiTableFileSystemView.java @@ -23,6 +23,7 @@ import io.trino.plugin.hudi.compaction.CompactionOperation; import io.trino.plugin.hudi.compaction.HudiCompactionOperation; import io.trino.plugin.hudi.compaction.HudiCompactionPlan; +import io.trino.plugin.hudi.files.FileSlice; import io.trino.plugin.hudi.files.HudiBaseFile; import io.trino.plugin.hudi.files.HudiFileGroup; import io.trino.plugin.hudi.files.HudiFileGroupId; @@ -243,6 +244,19 @@ public final Stream getLatestBaseFiles(String partitionStr) } } + public final Stream getLatestFileSlicesBeforeOn(String partitionStr, String commitTime) + { + try { + readLock.lock(); + String partitionPath = formatPartitionKey(partitionStr); + ensurePartitionLoadedCorrectly(partitionPath); + return fetchLatestFileSlicesBeforeOn(partitionPath, commitTime); + } + finally { + readLock.unlock(); + } + } + private boolean isFileGroupReplaced(String partitionPath, String fileId) { return isFileGroupReplaced(new HudiFileGroupId(partitionPath, fileId)); @@ -353,7 +367,7 @@ private List buildFileGroups( Map, List> logFiles = logFileStream .collect(groupingBy((logFile) -> { - String partitionPathStr = getRelativePartitionPath(metaClient.getBasePath(), logFile.getPath().parentDirectory()); + String partitionPathStr = getRelativePartitionPath(metaClient.getBasePath(), logFile.getLocation().parentDirectory()); return Map.entry(partitionPathStr, logFile.getFileId()); })); @@ -388,7 +402,7 @@ private List buildFileGroups( private String getPartitionPathFor(HudiBaseFile baseFile) { - return getRelativePartitionPath(metaClient.getBasePath(), baseFile.getFullPath().parentDirectory()); + return getRelativePartitionPath(metaClient.getBasePath(), baseFile.getLocation().parentDirectory()); } private String getRelativePartitionPath(Location basePath, Location fullPartitionPath) @@ -428,6 +442,13 @@ private Stream fetchLatestBaseFiles(final String partitionPath) .map(pair -> pair.getValue().get()); } + private Stream fetchLatestFileSlicesBeforeOn(final String partitionPath, String commitTime) + { + return fetchAllStoredFileGroups(partitionPath) + .filter(filGroup -> !isFileGroupReplaced(filGroup.getFileGroupId())) + .flatMap(fileGroup -> fileGroup.getAllFileSlicesBeforeOn(commitTime)); // TODO: filter file slice which pending Compaction / Clustering + } + private Stream fetchAllStoredFileGroups(String partition) { final List fileGroups = ImmutableList.copyOf(partitionToFileGroupsMap.get(partition)); diff --git a/plugin/trino-hudi/src/test/java/io/trino/plugin/hudi/TestHudiPartitionManager.java b/plugin/trino-hudi/src/test/java/io/trino/plugin/hudi/TestHudiPartitionManager.java index f2e8c1cca02..12816ac3454 100644 --- a/plugin/trino-hudi/src/test/java/io/trino/plugin/hudi/TestHudiPartitionManager.java +++ 
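// ---------------------------------------------------------------------------
// [Editor's note - illustrative sketch, not part of the patch] createSplitsForCOW above keeps
// the pre-existing size-based slicing: the base file is cut into block-sized ranges, and the
// loop stops once the remainder is no more than SPLIT_SLOP (1.1 in the existing factory) times
// a block, so a short tail is folded into the final split instead of becoming a tiny one.
// createSplitsForMOR, by contrast, emits a single split per file slice, since log records have
// to be merged against the whole base file. Self-contained sketch of the slicing loop:
static List<long[]> sliceRanges(long fileSize, long blockSize)
{
    double splitSlop = 1.1;                  // mirrors SPLIT_SLOP
    List<long[]> ranges = new ArrayList<>(); // each entry: {offset, length}
    long bytesRemaining = fileSize;
    while (((double) bytesRemaining) / blockSize > splitSlop) {
        ranges.add(new long[] {fileSize - bytesRemaining, blockSize});
        bytesRemaining -= blockSize;
    }
    if (bytesRemaining > 0) {
        ranges.add(new long[] {fileSize - bytesRemaining, bytesRemaining});
    }
    return ranges;
}
// sliceRanges(260 MB, 128 MB) -> {0, 128 MB}, {128 MB, 132 MB}: the 4 MB tail rides along with the last block
// ---------------------------------------------------------------------------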
b/plugin/trino-hudi/src/test/java/io/trino/plugin/hudi/TestHudiPartitionManager.java @@ -31,6 +31,7 @@ import static io.trino.plugin.hive.HiveType.HIVE_INT; import static io.trino.plugin.hive.HiveType.HIVE_STRING; import static io.trino.plugin.hive.TableType.MANAGED_TABLE; +import static io.trino.plugin.hive.metastore.MetastoreUtil.getHiveSchema; import static io.trino.plugin.hive.metastore.StorageFormat.fromHiveStorageFormat; import static io.trino.plugin.hive.util.HiveBucketing.BucketingVersion.BUCKETING_V1; import static io.trino.plugin.hudi.model.HudiTableType.COPY_ON_WRITE; @@ -78,7 +79,8 @@ public void testParseValuesAndFilterPartition() TABLE.getStorage().getLocation(), COPY_ON_WRITE, TupleDomain.all(), - TupleDomain.all()); + TupleDomain.all(), + getHiveSchema(TABLE)); List actualPartitions = hudiPartitionManager.getEffectivePartitions( tableHandle, metastore); From c744601d7b5ea08812ba174051312b7b894f14f5 Mon Sep 17 00:00:00 2001 From: Sagar Sumit Date: Sat, 24 Jun 2023 17:02:47 +0530 Subject: [PATCH 2/3] Add MOR snapshot record reader Add PageSource for HoodieRecord in Avro --- .../io/trino/plugin/hive/util/HiveUtil.java | 2 +- plugin/trino-hudi/pom.xml | 7 +- .../trino/plugin/hudi/HudiAvroPageSource.java | 276 ++++++++++++++++++ .../plugin/hudi/HudiRealtimeRecordCursor.java | 107 +++++++ .../trino/plugin/hudi/HudiRecordCursor.java | 56 +--- .../io/trino/plugin/hudi/HudiRecordSet.java | 46 +++ .../plugin/hudi/HudiRecordSetProvider.java | 39 +++ .../io/trino/plugin/hudi/HudiTypeUtil.java | 48 +++ .../hudi/query/HiveHudiRecordCursor.java | 47 --- 9 files changed, 534 insertions(+), 94 deletions(-) create mode 100644 plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiAvroPageSource.java create mode 100644 plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiRealtimeRecordCursor.java create mode 100644 plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiRecordSet.java create mode 100644 plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiRecordSetProvider.java create mode 100644 plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiTypeUtil.java delete mode 100644 plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/query/HiveHudiRecordCursor.java diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/util/HiveUtil.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/util/HiveUtil.java index 9566c25a1f4..ffd766fa1d3 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/util/HiveUtil.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/util/HiveUtil.java @@ -322,7 +322,7 @@ private static void setReadColumns(Configuration configuration, List re configuration.setBoolean(READ_ALL_COLUMNS, false); } - private static void configureCompressionCodecs(JobConf jobConf) + public static void configureCompressionCodecs(JobConf jobConf) { // add Airlift LZO and LZOP to head of codecs list to not override existing entries List codecs = newArrayList(Splitter.on(",").trimResults().omitEmptyStrings().split(jobConf.get("io.compression.codecs", ""))); diff --git a/plugin/trino-hudi/pom.xml b/plugin/trino-hudi/pom.xml index 5f5e0b2ba6a..fea3e072430 100644 --- a/plugin/trino-hudi/pom.xml +++ b/plugin/trino-hudi/pom.xml @@ -15,7 +15,7 @@ ${project.parent.basedir} - 0.12.3 + 0.14.0-SNAPSHOT @@ -64,11 +64,6 @@ hive-apache - - io.airlift - aircompressor - - io.airlift bootstrap diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiAvroPageSource.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiAvroPageSource.java 
new file mode 100644
index 00000000000..bd24949d7e3
--- /dev/null
+++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiAvroPageSource.java
@@ -0,0 +1,276 @@
+package io.trino.plugin.hudi;
+
+import io.airlift.log.Logger;
+import io.airlift.slice.Slice;
+import io.airlift.slice.Slices;
+import io.trino.spi.Page;
+import io.trino.spi.PageBuilder;
+import io.trino.spi.TrinoException;
+import io.trino.spi.block.Block;
+import io.trino.spi.block.BlockBuilder;
+import io.trino.spi.connector.ConnectorPageSource;
+import io.trino.spi.type.ArrayType;
+import io.trino.spi.type.DecimalType;
+import io.trino.spi.type.Decimals;
+import io.trino.spi.type.Int128;
+import io.trino.spi.type.LongTimestampWithTimeZone;
+import io.trino.spi.type.RowType;
+import io.trino.spi.type.Type;
+import io.trino.spi.type.TypeSignatureParameter;
+import io.trino.spi.type.VarbinaryType;
+import io.trino.spi.type.VarcharType;
+import org.apache.avro.Conversions;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.util.Utf8;
+import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.hadoop.realtime.HoodieMergeOnReadSnapshotReader;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.base.Verify.verify;
+import static io.airlift.slice.Slices.utf8Slice;
+import static io.trino.plugin.hudi.HudiTypeUtil.toTrinoTimestamp;
+import static io.trino.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR;
+import static io.trino.spi.type.BigintType.BIGINT;
+import static io.trino.spi.type.DateType.DATE;
+import static io.trino.spi.type.Decimals.encodeShortScaledValue;
+import static io.trino.spi.type.IntegerType.INTEGER;
+import static io.trino.spi.type.LongTimestampWithTimeZone.fromEpochMillisAndFraction;
+import static io.trino.spi.type.TimeType.TIME_MICROS;
+import static io.trino.spi.type.TimeZoneKey.UTC_KEY;
+import static io.trino.spi.type.TimestampType.TIMESTAMP_MICROS;
+import static io.trino.spi.type.TimestampWithTimeZoneType.TIMESTAMP_TZ_MICROS;
+import static io.trino.spi.type.Timestamps.MICROSECONDS_PER_MILLISECOND;
+import static io.trino.spi.type.Timestamps.PICOSECONDS_PER_MICROSECOND;
+import static java.lang.Math.floorDiv;
+import static java.lang.Math.floorMod;
+import static java.lang.Math.toIntExact;
+import static java.lang.String.format;
+
+public class HudiAvroPageSource
+        implements ConnectorPageSource
+{
+    private static final Logger log = Logger.get(HudiAvroPageSource.class);
+    private static final AvroDecimalConverter DECIMAL_CONVERTER = new AvroDecimalConverter();
+
+    private final HudiSplit hudiSplit;
+    private final Schema schema;
+    private final Iterator<HoodieRecord> records;
+    private final HoodieMergeOnReadSnapshotReader snapshotReader;
+    private final PageBuilder pageBuilder;
+    private final List<String> columnNames;
+    private final List<Type> columnTypes;
+    private final AtomicLong readBytes;
+
+    public HudiAvroPageSource(
+            HudiSplit hudiSplit,
+            Schema schema,
+            Iterator<HoodieRecord> records,
+            HoodieMergeOnReadSnapshotReader snapshotReader,
+            List<String> columnNames,
+            List<Type> columnTypes)
+    {
+        this.hudiSplit = hudiSplit;
+        this.schema = schema;
+        this.records = records;
+        this.snapshotReader = snapshotReader;
+        this.columnNames = columnNames;
+        this.columnTypes = columnTypes;
+        this.pageBuilder = new PageBuilder(columnTypes);
+        this.readBytes = new AtomicLong();
+    }
+
+    @Override
+    public long getCompletedBytes()
+    {
+        return readBytes.get();
+    }
+
+    @Override
+    public long getReadTimeNanos()
+    {
+        return 0;
+    }
+
+    @Override
+    public boolean isFinished()
+    {
+        return !records.hasNext();
+    }
+
+    @Override
+    public Page getNextPage()
+    {
+        checkState(pageBuilder.isEmpty(), "PageBuilder is not empty at the beginning of a new page");
+        if (!records.hasNext()) {
+            return null;
+        }
+        while (records.hasNext()) {
+            HoodieRecord record = records.next();
+            pageBuilder.declarePosition();
+            for (int column = 0; column < columnTypes.size(); column++) {
+                BlockBuilder output = pageBuilder.getBlockBuilder(column);
+                appendTo(columnTypes.get(column), record.getData(), output);
+            }
+        }
+        return null;
+    }
+
+    private void appendTo(Type type, Object value, BlockBuilder output)
+    {
+        if (value == null) {
+            output.appendNull();
+            return;
+        }
+
+        Class<?> javaType = type.getJavaType();
+        try {
+            if (javaType == boolean.class) {
+                type.writeBoolean(output, (Boolean) value);
+            }
+            else if (javaType == long.class) {
+                if (type.equals(BIGINT)) {
+                    type.writeLong(output, ((Number) value).longValue());
+                }
+                else if (type.equals(INTEGER)) {
+                    type.writeLong(output, ((Number) value).intValue());
+                }
+                else if (type instanceof DecimalType decimalType) {
+                    verify(decimalType.isShort(), "The type should be short decimal");
+                    BigDecimal decimal = DECIMAL_CONVERTER.convert(decimalType.getPrecision(), decimalType.getScale(), value);
+                    type.writeLong(output, encodeShortScaledValue(decimal, decimalType.getScale()));
+                }
+                else if (type.equals(DATE)) {
+                    type.writeLong(output, ((Number) value).intValue());
+                }
+                else if (type.equals(TIMESTAMP_MICROS)) {
+                    type.writeLong(output, toTrinoTimestamp(((Utf8) value).toString()));
+                }
+                else if (type.equals(TIME_MICROS)) {
+                    type.writeLong(output, (long) value * PICOSECONDS_PER_MICROSECOND);
+                }
+                else {
+                    throw new TrinoException(GENERIC_INTERNAL_ERROR, format("Unhandled type for %s: %s", javaType.getSimpleName(), type));
+                }
+            }
+            else if (javaType == double.class) {
+                type.writeDouble(output, ((Number) value).doubleValue());
+            }
+            else if (type.getJavaType() == Int128.class) {
+                writeObject(output, type, value);
+            }
+            else if (javaType == Slice.class) {
+                writeSlice(output, type, value);
+            }
+            else if (javaType == LongTimestampWithTimeZone.class) {
+                verify(type.equals(TIMESTAMP_TZ_MICROS));
+                long epochMicros = (long) value;
+                int picosOfMillis = toIntExact(floorMod(epochMicros, MICROSECONDS_PER_MILLISECOND)) * PICOSECONDS_PER_MICROSECOND;
+                type.writeObject(output, fromEpochMillisAndFraction(floorDiv(epochMicros, MICROSECONDS_PER_MILLISECOND), picosOfMillis, UTC_KEY));
+            }
+            else if (javaType == Block.class) {
+                writeBlock(output, type, value);
+            }
+            else {
+                throw new TrinoException(GENERIC_INTERNAL_ERROR, format("Unhandled type for %s: %s", javaType.getSimpleName(), type));
+            }
+        }
+        catch (ClassCastException ignore) {
+            // append a null value instead of propagating the cast failure
+            output.appendNull();
+        }
+    }
+
+    private static void writeSlice(BlockBuilder output, Type type, Object value)
+    {
+        if (type instanceof VarcharType) {
+            type.writeSlice(output, utf8Slice(((Utf8) value).toString()));
+        }
+        else if (type instanceof VarbinaryType) {
+            if (value instanceof ByteBuffer) {
+                type.writeSlice(output, Slices.wrappedBuffer((ByteBuffer) value));
+            }
+            else {
+                output.appendNull();
+            }
+        }
+        else {
+            throw new TrinoException(GENERIC_INTERNAL_ERROR, "Unhandled type for Slice: " + type.getTypeSignature());
+        }
+    }
+
+    private static void writeObject(BlockBuilder output, Type type, Object value)
+    {
+        if (type instanceof DecimalType decimalType) {
+            verify(!decimalType.isShort(), "The type should be long decimal");
+            BigDecimal decimal = DECIMAL_CONVERTER.convert(decimalType.getPrecision(), decimalType.getScale(), value);
+            type.writeObject(output, Decimals.encodeScaledValue(decimal, decimalType.getScale()));
+        }
+        else {
+            throw new TrinoException(GENERIC_INTERNAL_ERROR, "Unhandled type for Object: " + type.getTypeSignature());
+        }
+    }
+
+    private void writeBlock(BlockBuilder output, Type type, Object value)
+    {
+        if (type instanceof ArrayType && value instanceof List) {
+            BlockBuilder builder = output.beginBlockEntry();
+
+            for (Object element : (List<?>) value) {
+                appendTo(type.getTypeParameters().get(0), element, builder);
+            }
+
+            output.closeEntry();
+            return;
+        }
+        if (type instanceof RowType && value instanceof GenericRecord record) {
+            BlockBuilder builder = output.beginBlockEntry();
+
+            List<String> fieldNames = new ArrayList<>();
+            for (int i = 0; i < type.getTypeSignature().getParameters().size(); i++) {
+                TypeSignatureParameter parameter = type.getTypeSignature().getParameters().get(i);
+                fieldNames.add(parameter.getNamedTypeSignature().getName().orElse("field" + i));
+            }
+            checkState(fieldNames.size() == type.getTypeParameters().size(), "fieldName doesn't match with type size : %s", type);
+            for (int index = 0; index < type.getTypeParameters().size(); index++) {
+                appendTo(type.getTypeParameters().get(index), record.get(fieldNames.get(index)), builder);
+            }
+            output.closeEntry();
+            return;
+        }
+        throw new TrinoException(GENERIC_INTERNAL_ERROR, "Unhandled type for Block: " + type.getTypeSignature());
+    }
+
+    @Override
+    public long getMemoryUsage()
+    {
+        return pageBuilder.getSizeInBytes();
+    }
+
+    @Override
+    public void close()
+            throws IOException
+    {
+    }
+
+    static class AvroDecimalConverter
+    {
+        private static final Conversions.DecimalConversion AVRO_DECIMAL_CONVERSION = new Conversions.DecimalConversion();
+
+        BigDecimal convert(int precision, int scale, Object value)
+        {
+            Schema schema = new Schema.Parser().parse(format("{\"type\":\"bytes\",\"logicalType\":\"decimal\",\"precision\":%d,\"scale\":%d}", precision, scale));
+            return AVRO_DECIMAL_CONVERSION.fromBytes((ByteBuffer) value, schema, schema.getLogicalType());
+        }
+    }
+}
diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiRealtimeRecordCursor.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiRealtimeRecordCursor.java
new file mode 100644
index 00000000000..31a0fc190e4
--- /dev/null
+++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiRealtimeRecordCursor.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package io.trino.plugin.hudi; + +import io.airlift.slice.Slice; +import io.trino.spi.connector.RecordCursor; +import io.trino.spi.type.Type; +import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner; +import org.apache.hudi.hadoop.realtime.HoodieMergeOnReadSnapshotReader; +import org.apache.hudi.io.storage.HoodieFileReader; + +public class HudiRealtimeRecordCursor + implements RecordCursor +{ + private final HoodieMergedLogRecordScanner logRecordScanner; + private final HoodieFileReader baseFileReader; + private final HoodieMergeOnReadSnapshotReader snapshotReader; + + public HudiRealtimeRecordCursor( + HoodieMergedLogRecordScanner logRecordScanner, + HoodieFileReader baseFileReader, + HoodieMergeOnReadSnapshotReader snapshotReader) + { + this.logRecordScanner = logRecordScanner; + this.baseFileReader = baseFileReader; + this.snapshotReader = snapshotReader; + } + + @Override + public long getCompletedBytes() + { + return 0; + } + + @Override + public long getReadTimeNanos() + { + return 0; + } + + @Override + public Type getType(int field) + { + return null; + } + + @Override + public boolean advanceNextPosition() + { + return false; + } + + @Override + public boolean getBoolean(int field) + { + return false; + } + + @Override + public long getLong(int field) + { + return 0; + } + + @Override + public double getDouble(int field) + { + return 0; + } + + @Override + public Slice getSlice(int field) + { + return null; + } + + @Override + public Object getObject(int field) + { + return null; + } + + @Override + public boolean isNull(int field) + { + return false; + } + + @Override + public void close() + { + if (logRecordScanner != null) { + logRecordScanner.close(); + } + } +} diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiRecordCursor.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiRecordCursor.java index 3bb61163739..1184647a6f9 100644 --- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiRecordCursor.java +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiRecordCursor.java @@ -14,13 +14,10 @@ package io.trino.plugin.hudi; import com.google.common.base.Joiner; -import com.google.common.base.Splitter; -import io.airlift.compress.lzo.LzoCodec; -import io.airlift.compress.lzo.LzopCodec; import io.trino.hdfs.HdfsContext; import io.trino.hdfs.HdfsEnvironment; +import io.trino.plugin.hive.GenericHiveRecordCursor; import io.trino.plugin.hive.HiveColumnHandle; -import io.trino.plugin.hive.util.HiveUtil; import io.trino.plugin.hudi.files.HudiFile; import io.trino.spi.TrinoException; import io.trino.spi.connector.ConnectorSession; @@ -45,10 +42,10 @@ import static com.google.common.base.MoreObjects.firstNonNull; import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.collect.Lists.newArrayList; +import static io.trino.plugin.hive.util.HiveUtil.configureCompressionCodecs; +import static io.trino.plugin.hive.util.HiveUtil.getInputFormatName; import static io.trino.plugin.hudi.HudiErrorCode.HUDI_CANNOT_OPEN_SPLIT; import static io.trino.plugin.hudi.HudiUtil.getHudiBaseFile; -import static io.trino.plugin.hudi.query.HiveHudiRecordCursor.createRecordCursor; import static java.lang.String.format; import static java.util.Objects.requireNonNull; import static java.util.stream.Collectors.toList; @@ -76,7 +73,13 @@ public static RecordCursor createRealtimeRecordCursor( return hdfsEnvironment.doAs(session.getIdentity(), () -> { RecordReader recordReader = createRecordReader(configuration, 
tableHandle.getSchema(), split, dataColumns, tableHandle.getBasePath()); @SuppressWarnings("unchecked") RecordReader reader = (RecordReader) recordReader; - return createRecordCursor(configuration, path, reader, baseFile.getFileSize(), tableHandle.getSchema(), dataColumns); + return new GenericHiveRecordCursor<>( + configuration, + path, + reader, + baseFile.getFileSize(), + tableHandle.getSchema(), + dataColumns); }); } @@ -94,13 +97,9 @@ public static RecordCursor createRealtimeRecordCursor( jobConf.set(READ_COLUMN_NAMES_CONF_STR, join(dataColumns, HiveColumnHandle::getName)); schema.stringPropertyNames() .forEach(name -> jobConf.set(name, schema.getProperty(name))); - refineCompressionCodecs(jobConf); - - // create input format - String inputFormatName = HiveUtil.getInputFormatName(schema); - InputFormat inputFormat = createInputFormat(jobConf, inputFormatName); - + configureCompressionCodecs(jobConf); // create record reader for split + String inputFormatName = getInputFormatName(schema); try { HudiFile baseFile = getHudiBaseFile(split); Path path = new Path(baseFile.getLocation().toString()); @@ -109,9 +108,12 @@ public static RecordCursor createRealtimeRecordCursor( .stream() .map(file -> new HoodieLogFile(file.getLocation().toString())).collect(toList()); FileSplit hudiSplit = new HoodieRealtimeFileSplit(fileSplit, basePath, logFiles, split.getCommitTime(), false, Option.empty()); + Class clazz = jobConf.getClassByName(inputFormatName); + @SuppressWarnings("unchecked") Class> cls = (Class>) clazz.asSubclass(InputFormat.class); + InputFormat inputFormat = ReflectionUtils.newInstance(cls, jobConf); return inputFormat.getRecordReader(hudiSplit, jobConf, Reporter.NULL); } - catch (IOException e) { + catch (IOException | ClassNotFoundException | RuntimeException e) { String msg = format("Error opening Hive split %s using %s: %s", split, inputFormatName, @@ -120,32 +122,6 @@ public static RecordCursor createRealtimeRecordCursor( } } - private static InputFormat createInputFormat(Configuration conf, String inputFormat) - { - try { - Class clazz = conf.getClassByName(inputFormat); - @SuppressWarnings("unchecked") Class> cls = - (Class>) clazz.asSubclass(InputFormat.class); - return ReflectionUtils.newInstance(cls, conf); - } - catch (ClassNotFoundException | RuntimeException e) { - throw new TrinoException(HUDI_CANNOT_OPEN_SPLIT, "Unable to create input format " + inputFormat, e); - } - } - - private static void refineCompressionCodecs(Configuration conf) - { - List codecs = newArrayList(Splitter.on(",").trimResults().omitEmptyStrings() - .split(conf.get("io.compression.codecs", ""))); - if (!codecs.contains(LzoCodec.class.getName())) { - codecs.add(0, LzoCodec.class.getName()); - } - if (!codecs.contains(LzopCodec.class.getName())) { - codecs.add(0, LzopCodec.class.getName()); - } - conf.set("io.compression.codecs", String.join(",", codecs)); - } - private static String join(List list, Function extractor) { return Joiner.on(',').join(list.stream().map(extractor).iterator()); diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiRecordSet.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiRecordSet.java new file mode 100644 index 00000000000..2d53401d071 --- /dev/null +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiRecordSet.java @@ -0,0 +1,46 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.hudi; + +import io.trino.plugin.hive.HiveColumnHandle; +import io.trino.spi.connector.RecordCursor; +import io.trino.spi.connector.RecordSet; +import io.trino.spi.type.Type; + +import java.util.List; + +import static com.google.common.collect.ImmutableList.toImmutableList; + +public class HudiRecordSet + implements RecordSet +{ + private final List columnHandles; + + public HudiRecordSet(HudiSplit split, List columnHandles) + { + this.columnHandles = columnHandles; + } + + @Override + public List getColumnTypes() + { + return columnHandles.stream().map(HiveColumnHandle::getType).collect(toImmutableList()); + } + + @Override + public RecordCursor cursor() + { + return null; + } +} diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiRecordSetProvider.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiRecordSetProvider.java new file mode 100644 index 00000000000..75d7e2873d1 --- /dev/null +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiRecordSetProvider.java @@ -0,0 +1,39 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.plugin.hudi; + +import io.trino.spi.connector.ColumnHandle; +import io.trino.spi.connector.ConnectorRecordSetProvider; +import io.trino.spi.connector.ConnectorSession; +import io.trino.spi.connector.ConnectorSplit; +import io.trino.spi.connector.ConnectorTableHandle; +import io.trino.spi.connector.ConnectorTransactionHandle; +import io.trino.spi.connector.RecordSet; + +import java.util.List; + +public class HudiRecordSetProvider + implements ConnectorRecordSetProvider +{ + @Override + public RecordSet getRecordSet( + ConnectorTransactionHandle transaction, + ConnectorSession session, + ConnectorSplit split, + ConnectorTableHandle table, + List columns) + { + return ConnectorRecordSetProvider.super.getRecordSet(transaction, session, split, table, columns); + } +} diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiTypeUtil.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiTypeUtil.java new file mode 100644 index 00000000000..e0b5aa28ab9 --- /dev/null +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiTypeUtil.java @@ -0,0 +1,48 @@ +package io.trino.plugin.hudi; + +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; + +import static io.trino.spi.type.Timestamps.MICROSECONDS_PER_SECOND; +import static io.trino.spi.type.Timestamps.NANOSECONDS_PER_MICROSECOND; +import static java.lang.Integer.parseInt; +import static java.time.ZoneOffset.UTC; + +public final class HudiTypeUtil +{ + private HudiTypeUtil() {} + + private static final int[] NANO_FACTOR = { + -1, // 0, no need to multiply + 100_000_000, // 1 digit after the dot + 10_000_000, // 2 digits after the dot + 1_000_000, // 3 digits after the dot + 100_000, // 4 digits after the dot + 10_000, // 5 digits after the dot + 1000, // 6 digits after the dot + 100, // 7 digits after the dot + 10, // 8 digits after the dot + 1, // 9 digits after the dot + }; + + public static long toTrinoTimestamp(String datetime) + { + Instant instant = toLocalDateTime(datetime).toInstant(UTC); + return (instant.getEpochSecond() * MICROSECONDS_PER_SECOND) + (instant.getNano() / NANOSECONDS_PER_MICROSECOND); + } + + private static LocalDateTime toLocalDateTime(String datetime) + { + int dotPosition = datetime.indexOf('.'); + if (dotPosition == -1) { + // no sub-second element + return LocalDateTime.from(DateTimeFormatter.ISO_LOCAL_DATE_TIME.parse(datetime)); + } + LocalDateTime result = LocalDateTime.from(DateTimeFormatter.ISO_LOCAL_DATE_TIME.parse(datetime.substring(0, dotPosition))); + // has sub-second element, so convert to nanosecond + String nanosStr = datetime.substring(dotPosition + 1); + int nanoOfSecond = parseInt(nanosStr) * NANO_FACTOR[nanosStr.length()]; + return result.withNano(nanoOfSecond); + } +} diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/query/HiveHudiRecordCursor.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/query/HiveHudiRecordCursor.java deleted file mode 100644 index fdc2aecdc48..00000000000 --- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/query/HiveHudiRecordCursor.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.trino.plugin.hudi.query; - -import io.trino.plugin.hive.GenericHiveRecordCursor; -import io.trino.plugin.hive.HiveColumnHandle; -import io.trino.spi.connector.RecordCursor; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.mapred.RecordReader; - -import java.util.List; -import java.util.Properties; - -public final class HiveHudiRecordCursor -{ - private HiveHudiRecordCursor() {} - - public static RecordCursor createRecordCursor( - Configuration configuration, - Path path, - RecordReader recordReader, - long totalBytes, - Properties hiveSchema, - List hiveColumnHandles) - { - return new GenericHiveRecordCursor<>( - configuration, - path, - recordReader, - totalBytes, - hiveSchema, - hiveColumnHandles); - } -} From d0ff3cac51acecadae2ce1fd6fe31a5586df3748 Mon Sep 17 00:00:00 2001 From: Sagar Sumit Date: Tue, 27 Jun 2023 22:16:28 +0530 Subject: [PATCH 3/3] Add missing license header Fix NPE --- .../io/trino/plugin/hudi/HudiAvroPageSource.java | 15 ++++++++++++++- .../java/io/trino/plugin/hudi/HudiTypeUtil.java | 13 +++++++++++++ 2 files changed, 27 insertions(+), 1 deletion(-) diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiAvroPageSource.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiAvroPageSource.java index bd24949d7e3..a023fc4c0ec 100644 --- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiAvroPageSource.java +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiAvroPageSource.java @@ -1,3 +1,16 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package io.trino.plugin.hudi; import io.airlift.log.Logger; @@ -122,7 +135,7 @@ public Page getNextPage() appendTo(columnTypes.get(column), record.getData(), output); } } - return null; + return pageBuilder.build(); } private void appendTo(Type type, Object value, BlockBuilder output) diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiTypeUtil.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiTypeUtil.java index e0b5aa28ab9..af7b68883bf 100644 --- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiTypeUtil.java +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiTypeUtil.java @@ -1,3 +1,16 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package io.trino.plugin.hudi;
 
 import java.time.Instant;