Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
wuwenchi committed Dec 4, 2024
1 parent 04e58bc commit 201ca19
Show file tree
Hide file tree
Showing 8 changed files with 137 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public LocationPath(String location, Map<String, String> props) {
private LocationPath(String originLocation, Map<String, String> props, boolean convertPath) {
isBindBroker = props.containsKey(HMSExternalCatalog.BIND_BROKER_NAME);
String tmpLocation = originLocation;
if (!originLocation.contains(SCHEME_DELIM)) {
if (!(originLocation.contains(SCHEME_DELIM) || originLocation.contains(NONSTANDARD_SCHEME_DELIM))) {
// Sometimes the file path does not contain scheme, need to add default fs
// eg, /path/to/file.parquet -> hdfs://nn/path/to/file.parquet
// the default fs is from the catalog properties
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.doris.common.DdlException;
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.SchemaCacheValue;
import org.apache.doris.datasource.hudi.HudiSchemaCacheValue;
import org.apache.doris.datasource.hudi.HudiUtils;
import org.apache.doris.datasource.iceberg.IcebergUtils;
import org.apache.doris.datasource.mvcc.MvccSnapshot;
Expand Down Expand Up @@ -70,6 +71,7 @@
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.StringColumnStatsData;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand Down Expand Up @@ -114,6 +116,10 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI

private static final String USE_HIVE_SYNC_PARTITION = "use_hive_sync_partition";

// Lazily built Hudi meta client for this table; populated on first call to getHudiClient().
// Declared volatile because getHudiClient() reads it outside the lock: without volatile the
// unlocked read is a data race and another thread could observe a non-null reference to a
// client whose construction is not yet visible.
private volatile HoodieTableMetaClient hudiClient = null;
// Dedicated monitor object guarding the one-time construction of hudiClient.
private final byte[] hudiClientLock = new byte[0];


static {
SUPPORTED_HIVE_FILE_FORMATS = Sets.newHashSet();
SUPPORTED_HIVE_FILE_FORMATS.add("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat");
Expand Down Expand Up @@ -501,34 +507,38 @@ public long getLastDdlTime() {
/**
 * Builds the schema cache entry for this table. After makeSureInitialized(), it dispatches
 * on dlaType: ICEBERG uses getIcebergSchema(), HUDI uses getHudiSchema(), and everything
 * else uses getHiveSchema().
 *
 * NOTE(review): this span comes from a diff rendered without +/- markers, so two versions
 * of the method are interleaved. The `columns = ...` assignments together with the trailing
 * initPartitionColumns / HMSSchemaCacheValue lines look like the pre-change version; the
 * `return get...Schema()` lines look like the post-change version, in which each helper
 * returns the Optional itself. Confirm against the repository before editing.
 */
@Override
public Optional<SchemaCacheValue> initSchema() {
makeSureInitialized();
List<Column> columns;
if (dlaType.equals(DLAType.ICEBERG)) {
columns = getIcebergSchema();
return getIcebergSchema();
} else if (dlaType.equals(DLAType.HUDI)) {
columns = getHudiSchema();
return getHudiSchema();
} else {
columns = getHiveSchema();
return getHiveSchema();
}
List<Column> partitionColumns = initPartitionColumns(columns);
return Optional.of(new HMSSchemaCacheValue(columns, partitionColumns));
}

/**
 * Loads the Iceberg column list through IcebergUtils.getSchema(catalog, dbName, name).
 *
 * NOTE(review): two versions of this method are interleaved here (diff without +/- markers).
 * The first signature returning List<Column> with its single return line looks like the
 * pre-change version; the second signature derives partition columns via
 * initPartitionColumns(columns) and wraps both lists in an HMSSchemaCacheValue. Confirm
 * which lines are live before editing.
 */
private List<Column> getIcebergSchema() {
return IcebergUtils.getSchema(catalog, dbName, name);
private Optional<SchemaCacheValue> getIcebergSchema() {
List<Column> columns = IcebergUtils.getSchema(catalog, dbName, name);
List<Column> partitionColumns = initPartitionColumns(columns);
return Optional.of(new HMSSchemaCacheValue(columns, partitionColumns));
}

/**
 * Builds the Doris schema for a Hudi table from the Avro schema returned by
 * HiveMetaStoreClientHelper.getHudiTableSchema(this).
 *
 * NOTE(review): two versions of this method are interleaved here (diff without +/- markers).
 * The List<Column> signature and the bare `return tmpSchema;` look like the pre-change
 * version; the Optional<SchemaCacheValue> signature, the colTypes list and the
 * HudiSchemaCacheValue construction look like the post-change version. Confirm before editing.
 */
private List<Column> getHudiSchema() {
private Optional<SchemaCacheValue> getHudiSchema() {
org.apache.avro.Schema hudiSchema = HiveMetaStoreClientHelper.getHudiTableSchema(this);
List<Column> tmpSchema = Lists.newArrayListWithCapacity(hudiSchema.getFields().size());
// One entry per Avro field, appended in the same order as tmpSchema.
List<String> colTypes = Lists.newArrayList();
for (org.apache.avro.Schema.Field hudiField : hudiSchema.getFields()) {
// Column names are lower-cased with Locale.ROOT so the result is locale-independent.
String columnName = hudiField.name().toLowerCase(Locale.ROOT);
tmpSchema.add(new Column(columnName, HudiUtils.fromAvroHudiTypeToDorisType(hudiField.schema()),
true, null, true, null, "", true, null, -1, null));
colTypes.add(HudiUtils.convertAvroToHiveType(hudiField.schema()));
}
return tmpSchema;
List<Column> partitionColumns = initPartitionColumns(tmpSchema);
HudiSchemaCacheValue hudiSchemaCacheValue = new HudiSchemaCacheValue(tmpSchema, partitionColumns);
hudiSchemaCacheValue.setColTypes(colTypes);
return Optional.of(hudiSchemaCacheValue);
}

private List<Column> getHiveSchema() {
private Optional<SchemaCacheValue> getHiveSchema() {
HMSCachedClient client = ((HMSExternalCatalog) catalog).getClient();
List<FieldSchema> schema = client.getSchema(dbName, name);
Map<String, String> colDefaultValues = client.getDefaultColumnValues(dbName, name);
Expand All @@ -540,7 +550,8 @@ private List<Column> getHiveSchema() {
HiveMetaStoreClientHelper.hiveTypeToDorisType(field.getType()), true, null,
true, defaultValue, field.getComment(), true, -1));
}
return columns;
List<Column> partitionColumns = initPartitionColumns(columns);
return Optional.of(new HMSSchemaCacheValue(columns, partitionColumns));
}

@Override
Expand Down Expand Up @@ -983,4 +994,18 @@ public void beforeMTMVRefresh(MTMV mtmv) throws DdlException {
Env.getCurrentEnv().getRefreshManager()
.refreshTable(getCatalog().getName(), getDbName(), getName(), true);
}

/**
 * Returns the Hudi meta client for this table, building it on first use.
 *
 * The cached instance is returned without locking when it is already set. Otherwise the
 * client is built under hudiClientLock, after re-checking the field, from the remote
 * table's storage-descriptor location and the catalog configuration.
 *
 * NOTE(review): the unlocked first read is only safe if hudiClient is safely published
 * (for example, declared volatile) -- confirm at the field declaration.
 *
 * @return the cached or newly built HoodieTableMetaClient
 */
public HoodieTableMetaClient getHudiClient() {
    HoodieTableMetaClient cached = hudiClient;
    if (cached == null) {
        synchronized (hudiClientLock) {
            // Re-check under the lock: another thread may have built the client meanwhile.
            cached = hudiClient;
            if (cached == null) {
                cached = HudiUtils.buildHudiTableMetaClient(
                        getRemoteTable().getSd().getLocation(), catalog.getConfiguration());
                hudiClient = cached;
            }
        }
    }
    return cached;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand Down Expand Up @@ -830,12 +829,7 @@ public static <T> T ugiDoAs(Configuration conf, PrivilegedExceptionAction<T> act
}

/**
 * Returns the HoodieTableMetaClient for the given HMS external table.
 *
 * NOTE(review): two versions of the body are interleaved here (diff without +/- markers).
 * The lines that read the table location, wrap the Configuration in a
 * HadoopStorageConfiguration and build the client inside HadoopUGI.ugiDoAs look like the
 * pre-change version; the final `return table.getHudiClient();` looks like the post-change
 * version, which delegates to the table. Confirm before editing.
 */
public static HoodieTableMetaClient getHudiClient(HMSExternalTable table) {
String hudiBasePath = table.getRemoteTable().getSd().getLocation();
Configuration conf = getConfiguration(table);
HadoopStorageConfiguration hadoopStorageConfiguration = new HadoopStorageConfiguration(conf);
return HadoopUGI.ugiDoAs(AuthenticationConfig.getKerberosConfig(conf),
() -> HoodieTableMetaClient.builder().setConf(hadoopStorageConfiguration).setBasePath(hudiBasePath)
.build());
return table.getHudiClient();
}

public static Configuration getConfiguration(HMSExternalTable table) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.datasource.hudi;

import org.apache.doris.catalog.Column;
import org.apache.doris.datasource.hive.HMSSchemaCacheValue;

import java.util.List;

/**
 * Schema cache entry for Hudi tables: an {@link HMSSchemaCacheValue} that additionally
 * carries a list of per-column type strings.
 */
public class HudiSchemaCacheValue extends HMSSchemaCacheValue {

    // Per-column type strings; remains null until setColTypes is called.
    // Presumably Hive type names in schema order -- verify against the code that populates it.
    private List<String> colTypes;

    /**
     * Creates a cache value from the full column list and the partition columns; both are
     * passed straight to the HMSSchemaCacheValue constructor.
     */
    public HudiSchemaCacheValue(List<Column> schema, List<Column> partitionColumns) {
        super(schema, partitionColumns);
    }

    /** Stores the given list reference as-is (no copy is made). */
    public void setColTypes(List<String> colTypes) {
        this.colTypes = colTypes;
    }

    /** Returns the stored list reference, or null if setColTypes has not been called. */
    public List<String> getColTypes() {
        return this.colTypes;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,18 @@
import org.apache.doris.catalog.StructField;
import org.apache.doris.catalog.StructType;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.security.authentication.AuthenticationConfig;
import org.apache.doris.common.security.authentication.HadoopUGI;

import org.apache.avro.LogicalType;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator;
import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;

import java.text.ParseException;
import java.text.SimpleDateFormat;
Expand Down Expand Up @@ -231,4 +236,11 @@ private static Type handleUnionType(Schema avroSchema) {
}
return Type.UNSUPPORTED;
}

/**
 * Builds a HoodieTableMetaClient for the table rooted at the given base path.
 *
 * The Hadoop configuration is wrapped in a HadoopStorageConfiguration, and the builder
 * runs inside HadoopUGI.ugiDoAs using the Kerberos config derived from the same
 * configuration.
 *
 * @param hudiBasePath base path handed to the meta client builder
 * @param conf Hadoop configuration used for both storage access and authentication
 * @return the newly built meta client
 */
public static HoodieTableMetaClient buildHudiTableMetaClient(String hudiBasePath, Configuration conf) {
    HadoopStorageConfiguration storageConf = new HadoopStorageConfiguration(conf);
    return HadoopUGI.ugiDoAs(
            AuthenticationConfig.getKerberosConfig(conf),
            () -> {
                return HoodieTableMetaClient.builder()
                        .setConf(storageConf)
                        .setBasePath(hudiBasePath)
                        .build();
            });
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -83,7 +84,10 @@ public void cleanTablePartitions(String dbName, String tblName) {
}

public TablePartitionValues getSnapshotPartitionValues(HMSExternalTable table,
HoodieTableMetaClient tableMetaClient, String timestamp, boolean useHiveSyncPartition) {
HoodieTableMetaClient tableMetaClient,
String timestamp,
boolean useHiveSyncPartition,
HoodieTableMetadata hoodieTableMetadata) {
Preconditions.checkState(catalogId == table.getCatalog().getId());
Option<String[]> partitionColumns = tableMetaClient.getTableConfig().getPartitionFields();
if (!partitionColumns.isPresent()) {
Expand All @@ -96,7 +100,7 @@ public TablePartitionValues getSnapshotPartitionValues(HMSExternalTable table,
}
long lastTimestamp = Long.parseLong(lastInstant.get().getTimestamp());
if (Long.parseLong(timestamp) == lastTimestamp) {
return getPartitionValues(table, tableMetaClient, useHiveSyncPartition);
return getPartitionValues(table, tableMetaClient, useHiveSyncPartition, hoodieTableMetadata);
}
List<String> partitionNameAndValues = getPartitionNamesBeforeOrEquals(timeline, timestamp);
List<String> partitionNames = Arrays.asList(partitionColumns.get());
Expand All @@ -108,7 +112,8 @@ public TablePartitionValues getSnapshotPartitionValues(HMSExternalTable table,
}

public TablePartitionValues getPartitionValues(HMSExternalTable table, HoodieTableMetaClient tableMetaClient,
boolean useHiveSyncPartition)
boolean useHiveSyncPartition,
HoodieTableMetadata hoodieTableMetadata)
throws CacheException {
Preconditions.checkState(catalogId == table.getCatalog().getId());
Option<String[]> partitionColumns = tableMetaClient.getTableConfig().getPartitionFields();
Expand Down Expand Up @@ -150,10 +155,10 @@ public TablePartitionValues getPartitionValues(HMSExternalTable table, HoodieTab
partitionNames = catalog.getClient().listPartitionNames(table.getDbName(), table.getName());
if (partitionNames.size() == 0) {
LOG.warn("Failed to get partitions from hms api, switch it from hudi api.");
partitionNames = getAllPartitionNames(tableMetaClient);
partitionNames = getAllPartitionNames(hoodieTableMetadata);
}
} else {
partitionNames = getAllPartitionNames(tableMetaClient);
partitionNames = getAllPartitionNames(hoodieTableMetadata);
}
List<String> partitionColumnsList = Arrays.asList(partitionColumns.get());
partitionValues.cleanPartitions();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.doris.datasource.hudi.source;

import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineUtils;
Expand All @@ -42,17 +41,8 @@ public String[] getPartitionColumns(HoodieTableMetaClient tableMetaClient) {
return tableMetaClient.getTableConfig().getPartitionFields().get();
}

/**
 * Lists all partition paths of a Hudi table via HoodieTableMetadata.getAllPartitionPaths().
 *
 * NOTE(review): two versions of this method are interleaved here (diff without +/- markers).
 * The signature taking a HoodieTableMetaClient, which builds a HoodieMetadataConfig and a new
 * HoodieTableMetadata on each call, looks like the pre-change version; the signature taking
 * a ready-made HoodieTableMetadata looks like the post-change version. Confirm before editing.
 */
public List<String> getAllPartitionNames(HoodieTableMetaClient tableMetaClient) throws IOException {
HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder()
.enable(HoodieTableMetadataUtil.isFilesPartitionAvailable(tableMetaClient))
.build();

HoodieTableMetadata newTableMetadata = HoodieTableMetadata.create(
new HudiLocalEngineContext(tableMetaClient.getStorageConf()), tableMetaClient.getStorage(),
metadataConfig,
tableMetaClient.getBasePathV2().toString(), true);

return newTableMetadata.getAllPartitionPaths();
public List<String> getAllPartitionNames(HoodieTableMetadata hoodieTableMetadata) throws IOException {
return hoodieTableMetadata.getAllPartitionPaths();
}

public List<String> getPartitionNamesBeforeOrEquals(HoodieTimeline timeline, String timestamp) {
Expand Down
Loading

0 comments on commit 201ca19

Please sign in to comment.