diff --git a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonInputFormat.java b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonInputFormat.java
index 314b50d7072b..a67e363f4a55 100644
--- a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonInputFormat.java
+++ b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonInputFormat.java
@@ -18,45 +18,20 @@
 
 package org.apache.paimon.hive.mapred;
 
-import org.apache.paimon.fs.Path;
 import org.apache.paimon.hive.RowDataContainer;
-import org.apache.paimon.predicate.Predicate;
-import org.apache.paimon.predicate.PredicateBuilder;
 import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.table.source.DataSplit;
-import org.apache.paimon.table.source.InnerTableScan;
-import org.apache.paimon.table.source.ReadBuilder;
-import org.apache.paimon.tag.TagPreview;
-import org.apache.paimon.types.RowType;
-import org.apache.paimon.utils.PartitionPathUtils;
 
-import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
-import org.apache.hadoop.hive.ql.io.IOConstants;
-import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
-import org.apache.hadoop.hive.serde2.SerDeUtils;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-
-import javax.annotation.Nullable;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
 
-import static java.util.Collections.singletonMap;
-import static org.apache.paimon.CoreOptions.SCAN_TAG_NAME;
+import static org.apache.paimon.hive.mapred.PaimonRecordReader.createRecordReader;
+import static org.apache.paimon.hive.utils.HiveSplitGenerator.generateSplits;
 import static org.apache.paimon.hive.utils.HiveUtils.createFileStoreTable;
-import static org.apache.paimon.hive.utils.HiveUtils.createPredicate;
 
 /**
  * {@link InputFormat} for paimon. It divides all files into {@link InputSplit}s (one split per
@@ -67,79 +42,7 @@ public class PaimonInputFormat implements InputFormat<Void, RowDataContainer> {
     @Override
     public InputSplit[] getSplits(JobConf jobConf, int numSplits) {
         FileStoreTable table = createFileStoreTable(jobConf);
-        List<Predicate> predicates = new ArrayList<>();
-        createPredicate(table.schema(), jobConf, false).ifPresent(predicates::add);
-
-        // If the path of the Paimon table is moved from the location of the Hive table to
-        // properties (see HiveCatalogOptions#LOCATION_IN_PROPERTIES), Hive will add a location for
-        // this table based on the warehouse, database, and table automatically. When querying by
-        // Hive, an exception may occur because the specified path for split for Paimon may not
-        // match the location of Hive. To work around this problem, we specify the path for split as
-        // the location of Hive.
-        String locations = jobConf.get(FileInputFormat.INPUT_DIR);
-
-        @Nullable String tagToPartField = table.coreOptions().tagToPartitionField();
-        @Nullable TagPreview tagPreview = TagPreview.create(table.coreOptions());
-
-        List<PaimonInputSplit> splits = new ArrayList<>();
-        // locations may contain multiple partitions
-        for (String location : locations.split(",")) {
-            InnerTableScan scan;
-            if (tagToPartField != null) {
-                // the location should be pruned by partition predicate
-                // we can extract tag name from location, and use time travel to scan
-                String tag = extractTagName(location, tagToPartField);
-                Map<String, String> dynamicOptions =
-                        tagPreview == null
-                                ? singletonMap(SCAN_TAG_NAME.key(), tag)
-                                : tagPreview.timeTravel(table, tag);
-                scan = table.copy(dynamicOptions).newScan();
-                if (predicates.size() > 0) {
-                    scan.withFilter(PredicateBuilder.and(predicates));
-                }
-            } else {
-                List<Predicate> predicatePerPartition = new ArrayList<>(predicates);
-                createPartitionPredicate(
-                                table.schema().logicalRowType(),
-                                table.schema().partitionKeys(),
-                                location)
-                        .ifPresent(predicatePerPartition::add);
-
-                scan = table.newScan();
-                if (predicatePerPartition.size() > 0) {
-                    scan.withFilter(PredicateBuilder.and(predicatePerPartition));
-                }
-            }
-            scan.plan()
-                    .splits()
-                    .forEach(
-                            split -> splits.add(new PaimonInputSplit(location, (DataSplit) split)));
-        }
-        return splits.toArray(new InputSplit[0]);
-    }
-
-    private Optional<Predicate> createPartitionPredicate(
-            RowType rowType, List<String> partitionKeys, String partitionDir) {
-        Set<String> partitionKeySet = new HashSet<>(partitionKeys);
-        LinkedHashMap<String, String> partition = new LinkedHashMap<>();
-        for (String s : partitionDir.split("/")) {
-            s = s.trim();
-            if (s.isEmpty()) {
-                continue;
-            }
-            String[] kv = s.split("=");
-            if (kv.length != 2) {
-                continue;
-            }
-            if (partitionKeySet.contains(kv[0])) {
-                partition.put(kv[0], kv[1]);
-            }
-        }
-        if (partition.isEmpty() || partition.size() != partitionKeys.size()) {
-            return Optional.empty();
-        } else {
-            return Optional.ofNullable(PredicateBuilder.partition(partition, rowType));
-        }
+        return generateSplits(table, jobConf);
     }
 
     @Override
@@ -147,47 +50,6 @@ public RecordReader<Void, RowDataContainer> getRecordReader(
             InputSplit inputSplit, JobConf jobConf, Reporter reporter) throws IOException {
         FileStoreTable table = createFileStoreTable(jobConf);
         PaimonInputSplit split = (PaimonInputSplit) inputSplit;
-        ReadBuilder readBuilder = table.newReadBuilder();
-        createPredicate(table.schema(), jobConf, true).ifPresent(readBuilder::withFilter);
-        List<String> paimonColumns = table.schema().fieldNames();
-        return new PaimonRecordReader(
-                readBuilder,
-                split,
-                paimonColumns,
-                getHiveColumns(jobConf).orElse(paimonColumns),
-                Arrays.asList(getSelectedColumns(jobConf)),
-                table.coreOptions().tagToPartitionField());
-    }
-
-    private Optional<List<String>> getHiveColumns(JobConf jobConf) {
-        String columns = jobConf.get(IOConstants.SCHEMA_EVOLUTION_COLUMNS);
-        if (columns == null) {
-            columns = jobConf.get(hive_metastoreConstants.META_TABLE_COLUMNS);
-        }
-        String delimiter =
-                jobConf.get(
-                        // serdeConstants.COLUMN_NAME_DELIMITER is not defined in earlier Hive
-                        // versions, so we use a constant string instead
-                        "column.name.delimite", String.valueOf(SerDeUtils.COMMA));
-        if (columns == null || delimiter == null) {
-            return Optional.empty();
-        } else {
-            return Optional.of(Arrays.asList(columns.split(delimiter)));
-        }
-    }
-
-    private String[] getSelectedColumns(JobConf jobConf) {
-        // when using tez engine or when same table is joined multiple times,
-        // it is possible that some selected columns are duplicated
-        return Arrays.stream(ColumnProjectionUtils.getReadColumnNames(jobConf))
-                .distinct()
-                .toArray(String[]::new);
-    }
-
-    /** Extract tag name from location, partition field should be tag name. */
-    public static String extractTagName(String location, String tagToPartField) {
-        LinkedHashMap<String, String> partSpec =
-                PartitionPathUtils.extractPartitionSpecFromPath(new Path(location));
-        return partSpec.get(tagToPartField);
+        return createRecordReader(table, split, jobConf);
     }
 }
diff --git a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonRecordReader.java b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonRecordReader.java
index 64de2c048048..6e1660d0ca7e 100644
--- a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonRecordReader.java
+++ b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonRecordReader.java
@@ -25,20 +25,31 @@
 import org.apache.paimon.data.JoinedRow;
 import org.apache.paimon.hive.RowDataContainer;
 import org.apache.paimon.reader.RecordReaderIterator;
+import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.source.ReadBuilder;
 import org.apache.paimon.utils.ProjectedRow;
 
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.ql.io.IOConstants;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
 
 import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Optional;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
+import static org.apache.paimon.hive.utils.HiveUtils.createPredicate;
+import static org.apache.paimon.hive.utils.HiveUtils.extractTagName;
+
 /**
  * Base {@link RecordReader} for paimon. Reads {@link KeyValue}s from data files and picks out
  * {@link InternalRow} for Hive to consume.
@@ -101,8 +112,7 @@ public PaimonRecordReader(
         if (tagToPartField != null) {
             // in case of reading partition field
             // add last field (partition field from tag name) to row
-            String tag =
-                    PaimonInputFormat.extractTagName(split.getPath().getName(), tagToPartField);
+            String tag = extractTagName(split.getPath().getName(), tagToPartField);
             addTagToPartFieldRow = new JoinedRow();
             addTagToPartFieldRow.replace(null, GenericRow.of(BinaryString.fromString(tag)));
         } else {
@@ -162,4 +172,43 @@ public float getProgress() throws IOException {
         // TODO make this more precise
         return progress;
     }
+
+    public static RecordReader<Void, RowDataContainer> createRecordReader(
+            FileStoreTable table, PaimonInputSplit split, JobConf jobConf) throws IOException {
+        ReadBuilder readBuilder = table.newReadBuilder();
+        createPredicate(table.schema(), jobConf, true).ifPresent(readBuilder::withFilter);
+        List<String> paimonColumns = table.schema().fieldNames();
+        return new PaimonRecordReader(
+                readBuilder,
+                split,
+                paimonColumns,
+                getHiveColumns(jobConf).orElse(paimonColumns),
+                Arrays.asList(getSelectedColumns(jobConf)),
+                table.coreOptions().tagToPartitionField());
+    }
+
+    private static Optional<List<String>> getHiveColumns(JobConf jobConf) {
+        String columns = jobConf.get(IOConstants.SCHEMA_EVOLUTION_COLUMNS);
+        if (columns == null) {
+            columns = jobConf.get(hive_metastoreConstants.META_TABLE_COLUMNS);
+        }
+        String delimiter =
+                jobConf.get(
+                        // serdeConstants.COLUMN_NAME_DELIMITER is not defined in earlier Hive
+                        // versions, so we use a constant string instead
+                        "column.name.delimite", String.valueOf(SerDeUtils.COMMA));
+        if (columns == null || delimiter == null) {
+            return Optional.empty();
+        } else {
+            return Optional.of(Arrays.asList(columns.split(delimiter)));
+        }
+    }
+
+    private static String[] getSelectedColumns(JobConf jobConf) {
+        // when using tez engine or when same table is joined multiple times,
+        // it is possible that some selected columns are duplicated
+        return Arrays.stream(ColumnProjectionUtils.getReadColumnNames(jobConf))
+                .distinct()
+                .toArray(String[]::new);
+    }
 }
diff --git a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/utils/HiveSplitGenerator.java b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/utils/HiveSplitGenerator.java
new file mode 100644
index 000000000000..32b1e4745d41
--- /dev/null
+++ b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/utils/HiveSplitGenerator.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.hive.utils;
+
+import org.apache.paimon.hive.mapred.PaimonInputSplit;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.InnerTableScan;
+import org.apache.paimon.tag.TagPreview;
+import org.apache.paimon.types.RowType;
+
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static java.util.Collections.singletonMap;
+import static org.apache.paimon.CoreOptions.SCAN_TAG_NAME;
+import static org.apache.paimon.hive.utils.HiveUtils.createPredicate;
+import static org.apache.paimon.hive.utils.HiveUtils.extractTagName;
+
+/** Generator to generate hive input splits. */
+public class HiveSplitGenerator {
+
+    public static InputSplit[] generateSplits(FileStoreTable table, JobConf jobConf) {
+        List<Predicate> predicates = new ArrayList<>();
+        createPredicate(table.schema(), jobConf, false).ifPresent(predicates::add);
+
+        // If the path of the Paimon table is moved from the location of the Hive table to
+        // properties (see HiveCatalogOptions#LOCATION_IN_PROPERTIES), Hive will add a location for
+        // this table based on the warehouse, database, and table automatically. When querying by
+        // Hive, an exception may occur because the specified path for split for Paimon may not
+        // match the location of Hive. To work around this problem, we specify the path for split as
+        // the location of Hive.
+        String locations = jobConf.get(FileInputFormat.INPUT_DIR);
+
+        @Nullable String tagToPartField = table.coreOptions().tagToPartitionField();
+        @Nullable TagPreview tagPreview = TagPreview.create(table.coreOptions());
+
+        List<PaimonInputSplit> splits = new ArrayList<>();
+        // locations may contain multiple partitions
+        for (String location : locations.split(",")) {
+            InnerTableScan scan;
+            if (tagToPartField != null) {
+                // the location should be pruned by partition predicate
+                // we can extract tag name from location, and use time travel to scan
+                String tag = extractTagName(location, tagToPartField);
+                Map<String, String> dynamicOptions =
+                        tagPreview == null
+                                ? singletonMap(SCAN_TAG_NAME.key(), tag)
+                                : tagPreview.timeTravel(table, tag);
+                scan = table.copy(dynamicOptions).newScan();
+                if (predicates.size() > 0) {
+                    scan.withFilter(PredicateBuilder.and(predicates));
+                }
+            } else {
+                List<Predicate> predicatePerPartition = new ArrayList<>(predicates);
+                createPartitionPredicate(
+                                table.schema().logicalRowType(),
+                                table.schema().partitionKeys(),
+                                location)
+                        .ifPresent(predicatePerPartition::add);
+
+                scan = table.newScan();
+                if (predicatePerPartition.size() > 0) {
+                    scan.withFilter(PredicateBuilder.and(predicatePerPartition));
+                }
+            }
+            scan.plan()
+                    .splits()
+                    .forEach(
+                            split -> splits.add(new PaimonInputSplit(location, (DataSplit) split)));
+        }
+        return splits.toArray(new InputSplit[0]);
+    }
+
+    private static Optional<Predicate> createPartitionPredicate(
+            RowType rowType, List<String> partitionKeys, String partitionDir) {
+        Set<String> partitionKeySet = new HashSet<>(partitionKeys);
+        LinkedHashMap<String, String> partition = new LinkedHashMap<>();
+        for (String s : partitionDir.split("/")) {
+            s = s.trim();
+            if (s.isEmpty()) {
+                continue;
+            }
+            String[] kv = s.split("=");
+            if (kv.length != 2) {
+                continue;
+            }
+            if (partitionKeySet.contains(kv[0])) {
+                partition.put(kv[0], kv[1]);
+            }
+        }
+        if (partition.isEmpty() || partition.size() != partitionKeys.size()) {
+            return Optional.empty();
+        } else {
+            return Optional.ofNullable(PredicateBuilder.partition(partition, rowType));
+        }
+    }
+}
diff --git a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/utils/HiveUtils.java b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/utils/HiveUtils.java
index e2959ea6a1a0..2b843fee824b 100644
--- a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/utils/HiveUtils.java
+++ b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/utils/HiveUtils.java
@@ -20,6 +20,7 @@
 
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.fs.Path;
 import org.apache.paimon.hive.LocationKeyExtractor;
 import org.apache.paimon.hive.SearchArgumentToPredicateConverter;
 import org.apache.paimon.options.Options;
@@ -27,6 +28,7 @@
 import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.FileStoreTableFactory;
+import org.apache.paimon.utils.PartitionPathUtils;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg;
@@ -37,6 +39,7 @@
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
@@ -95,4 +98,11 @@ public static Options extractCatalogConfig(Configuration hiveConf) {
                         hiveConf, PAIMON_PREFIX, v -> !"NULL".equalsIgnoreCase(v));
         return Options.fromMap(configMap);
     }
+
+    /** Extract tag name from location, partition field should be tag name. */
+    public static String extractTagName(String location, String tagToPartField) {
+        LinkedHashMap<String, String> partSpec =
+                PartitionPathUtils.extractPartitionSpecFromPath(new Path(location));
+        return partSpec.get(tagToPartField);
+    }
 }