Skip to content

Commit

Permalink
Hive: Add View support for HIVE catalog
Browse files Browse the repository at this point in the history
  • Loading branch information
nk1506 committed Mar 15, 2024
1 parent 5c27843 commit a9c09c9
Show file tree
Hide file tree
Showing 17 changed files with 1,174 additions and 375 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.exceptions;

import com.google.errorprone.annotations.FormatMethod;

/** Exception raised when a view is found, but that view is not an Iceberg view. */
public class NoSuchIcebergViewException extends NoSuchViewException {

  /**
   * Creates the exception from a format-style message.
   *
   * @param message message template, forwarded with {@code args} to the superclass constructor
   * @param args arguments for the message template
   */
  @FormatMethod
  public NoSuchIcebergViewException(String message, Object... args) {
    super(message, args);
  }

  /**
   * Throws a {@link NoSuchIcebergViewException} unless the given condition holds.
   *
   * @param test condition that must be true for the call to return normally
   * @param message message template used when the condition is false
   * @param args arguments for the message template
   */
  @FormatMethod
  public static void check(boolean test, String message, Object... args) {
    if (test) {
      return;
    }

    throw new NoSuchIcebergViewException(message, args);
  }
}
53 changes: 53 additions & 0 deletions core/src/main/java/org/apache/iceberg/BaseMetadata.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg;

import java.io.Serializable;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.iceberg.util.PropertyUtil;

/**
 * Common interface for {@link TableMetadata} and {@link org.apache.iceberg.view.ViewMetadata}.
 *
 * <p>Besides the accessors, it offers typed convenience lookups on top of {@link #properties()}.
 */
public interface BaseMetadata extends Serializable {

  /** Returns the location of the object described by this metadata. */
  String location();

  /** Returns the properties map that backs all of the {@code property*} lookups. */
  Map<String, String> properties();

  /** Returns the location of the metadata file; may be {@code null}. */
  @Nullable
  String metadataFileLocation();

  /** Returns the schema. */
  Schema schema();

  /**
   * Looks up a property as a string.
   *
   * @param property property key
   * @param defaultValue value returned when {@link #properties()} has no mapping for the key
   * @return the mapped value, or {@code defaultValue}
   */
  default String property(String property, String defaultValue) {
    Map<String, String> props = properties();
    return props.getOrDefault(property, defaultValue);
  }

  /**
   * Looks up a property as a boolean via {@link PropertyUtil#propertyAsBoolean}.
   *
   * @param property property key
   * @param defaultValue fallback handed to {@link PropertyUtil}
   */
  default boolean propertyAsBoolean(String property, boolean defaultValue) {
    Map<String, String> props = properties();
    return PropertyUtil.propertyAsBoolean(props, property, defaultValue);
  }

  /**
   * Looks up a property as an int via {@link PropertyUtil#propertyAsInt}.
   *
   * @param property property key
   * @param defaultValue fallback handed to {@link PropertyUtil}
   */
  default int propertyAsInt(String property, int defaultValue) {
    Map<String, String> props = properties();
    return PropertyUtil.propertyAsInt(props, property, defaultValue);
  }

  /**
   * Looks up a property as a long via {@link PropertyUtil#propertyAsLong}.
   *
   * @param property property key
   * @param defaultValue fallback handed to {@link PropertyUtil}
   */
  default long propertyAsLong(String property, long defaultValue) {
    Map<String, String> props = properties();
    return PropertyUtil.propertyAsLong(props, property, defaultValue);
  }
}
21 changes: 1 addition & 20 deletions core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -285,26 +285,7 @@ private Map<String, String> tableOverrideProperties() {
}

/**
 * Returns the full table name for the given catalog name and table identifier.
 *
 * <p>The name is produced by {@link CatalogUtil#fullTableName}; this method only forwards to it so
 * that there is a single implementation of the naming logic.
 *
 * @param catalogName name of the catalog
 * @param identifier identifier of the table within the catalog
 * @return the full table name as built by {@link CatalogUtil}
 */
protected static String fullTableName(String catalogName, TableIdentifier identifier) {
  // NOTE(review): the previously inlined StringBuilder logic (URI-like catalog names joined with
  // "/", others with ".") is presumably what CatalogUtil.fullTableName implements -- confirm.
  return CatalogUtil.fullTableName(catalogName, identifier);
}

protected MetricsReporter metricsReporter() {
Expand Down
118 changes: 118 additions & 0 deletions core/src/main/java/org/apache/iceberg/BaseMetastoreOperations.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg;

import static org.apache.iceberg.TableProperties.COMMIT_NUM_STATUS_CHECKS;
import static org.apache.iceberg.TableProperties.COMMIT_NUM_STATUS_CHECKS_DEFAULT;
import static org.apache.iceberg.TableProperties.COMMIT_STATUS_CHECKS_MAX_WAIT_MS;
import static org.apache.iceberg.TableProperties.COMMIT_STATUS_CHECKS_MAX_WAIT_MS_DEFAULT;
import static org.apache.iceberg.TableProperties.COMMIT_STATUS_CHECKS_MIN_WAIT_MS;
import static org.apache.iceberg.TableProperties.COMMIT_STATUS_CHECKS_MIN_WAIT_MS_DEFAULT;
import static org.apache.iceberg.TableProperties.COMMIT_STATUS_CHECKS_TOTAL_WAIT_MS;
import static org.apache.iceberg.TableProperties.COMMIT_STATUS_CHECKS_TOTAL_WAIT_MS_DEFAULT;

import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.Tasks;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Holds the commit status check shared by metastore-backed operations. */
public class BaseMetastoreOperations {
  private static final Logger LOG = LoggerFactory.getLogger(BaseMetastoreOperations.class);

  /** Outcome of a commit status check. */
  public enum CommitStatus {
    FAILURE,
    SUCCESS,
    UNKNOWN
  }

  /**
   * Attempt to load the content and see if any current or past metadata location matches the one
   * we were attempting to set. This is a last resort for exceptions that may indicate the commit
   * has failed without proving it. All previous locations must be searched as well, because a
   * second committer may have successfully committed on top of our commit.
   *
   * <p>The number of checks and the backoff timings are read from {@code properties}. As written,
   * this method only ever reports {@link CommitStatus#SUCCESS} or {@link CommitStatus#UNKNOWN}: a
   * negative check is logged and leaves the status unknown.
   *
   * @param contentName full name of the content
   * @param newMetadataLocation the path of the new commit file
   * @param properties properties holding the retry settings
   * @param newMetadataCheckSupplier returns true when the new metadata is present, using the
   *     metadata location for a table or the version id for a view
   * @return the commit status determined by the check
   */
  protected CommitStatus checkCommitStatus(
      String contentName,
      String newMetadataLocation,
      Map<String, String> properties,
      Supplier<Boolean> newMetadataCheckSupplier) {
    int numStatusChecks =
        PropertyUtil.propertyAsInt(
            properties, COMMIT_NUM_STATUS_CHECKS, COMMIT_NUM_STATUS_CHECKS_DEFAULT);
    long minBackoffMs =
        PropertyUtil.propertyAsLong(
            properties, COMMIT_STATUS_CHECKS_MIN_WAIT_MS, COMMIT_STATUS_CHECKS_MIN_WAIT_MS_DEFAULT);
    long maxBackoffMs =
        PropertyUtil.propertyAsLong(
            properties, COMMIT_STATUS_CHECKS_MAX_WAIT_MS, COMMIT_STATUS_CHECKS_MAX_WAIT_MS_DEFAULT);
    long totalBackoffMs =
        PropertyUtil.propertyAsLong(
            properties,
            COMMIT_STATUS_CHECKS_TOTAL_WAIT_MS,
            COMMIT_STATUS_CHECKS_TOTAL_WAIT_MS_DEFAULT);

    // Starts out unknown; the check below can only upgrade it to SUCCESS.
    AtomicReference<CommitStatus> outcome = new AtomicReference<>(CommitStatus.UNKNOWN);

    Tasks.foreach(newMetadataLocation)
        .retry(numStatusChecks)
        .suppressFailureWhenFinished()
        .exponentialBackoff(minBackoffMs, maxBackoffMs, totalBackoffMs, 2.0)
        .onFailure(
            (location, checkException) ->
                LOG.error("Cannot check if commit to {} exists.", contentName, checkException))
        .run(
            location -> {
              if (!newMetadataCheckSupplier.get()) {
                LOG.warn(
                    "Commit status check: Commit to {} of {} unknown, new metadata location is not current "
                        + "or in history",
                    contentName,
                    newMetadataLocation);
                return;
              }

              LOG.info(
                  "Commit status check: Commit to {} of {} succeeded",
                  contentName,
                  newMetadataLocation);
              outcome.set(CommitStatus.SUCCESS);
            });

    CommitStatus result = outcome.get();
    if (result == CommitStatus.UNKNOWN) {
      LOG.error(
          "Cannot determine commit state to {}. Failed during checking {} times. "
              + "Treating commit state as unknown.",
          contentName,
          numStatusChecks);
    }

    return result;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,12 @@
*/
package org.apache.iceberg;

import static org.apache.iceberg.TableProperties.COMMIT_NUM_STATUS_CHECKS;
import static org.apache.iceberg.TableProperties.COMMIT_NUM_STATUS_CHECKS_DEFAULT;
import static org.apache.iceberg.TableProperties.COMMIT_STATUS_CHECKS_MAX_WAIT_MS;
import static org.apache.iceberg.TableProperties.COMMIT_STATUS_CHECKS_MAX_WAIT_MS_DEFAULT;
import static org.apache.iceberg.TableProperties.COMMIT_STATUS_CHECKS_MIN_WAIT_MS;
import static org.apache.iceberg.TableProperties.COMMIT_STATUS_CHECKS_MIN_WAIT_MS_DEFAULT;
import static org.apache.iceberg.TableProperties.COMMIT_STATUS_CHECKS_TOTAL_WAIT_MS;
import static org.apache.iceberg.TableProperties.COMMIT_STATUS_CHECKS_TOTAL_WAIT_MS_DEFAULT;

import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.CommitFailedException;
Expand All @@ -42,14 +34,15 @@
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.relocated.com.google.common.base.Objects;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.util.LocationUtil;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.Tasks;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class BaseMetastoreTableOperations implements TableOperations {
public abstract class BaseMetastoreTableOperations extends BaseMetastoreOperations
implements TableOperations {
private static final Logger LOG = LoggerFactory.getLogger(BaseMetastoreTableOperations.class);

public static final String TABLE_TYPE_PROP = "table_type";
Expand Down Expand Up @@ -291,6 +284,8 @@ public long newSnapshotId() {
};
}

/** @deprecated Use {@link BaseMetastoreOperations.CommitStatus} */
@Deprecated
protected enum CommitStatus {
FAILURE,
SUCCESS,
Expand All @@ -309,65 +304,39 @@ protected enum CommitStatus {
* @return Commit Status of Success, Failure or Unknown
*/
protected CommitStatus checkCommitStatus(String newMetadataLocation, TableMetadata config) {
int maxAttempts =
PropertyUtil.propertyAsInt(
config.properties(), COMMIT_NUM_STATUS_CHECKS, COMMIT_NUM_STATUS_CHECKS_DEFAULT);
long minWaitMs =
PropertyUtil.propertyAsLong(
config.properties(),
COMMIT_STATUS_CHECKS_MIN_WAIT_MS,
COMMIT_STATUS_CHECKS_MIN_WAIT_MS_DEFAULT);
long maxWaitMs =
PropertyUtil.propertyAsLong(
config.properties(),
COMMIT_STATUS_CHECKS_MAX_WAIT_MS,
COMMIT_STATUS_CHECKS_MAX_WAIT_MS_DEFAULT);
long totalRetryMs =
PropertyUtil.propertyAsLong(
config.properties(),
COMMIT_STATUS_CHECKS_TOTAL_WAIT_MS,
COMMIT_STATUS_CHECKS_TOTAL_WAIT_MS_DEFAULT);

AtomicReference<CommitStatus> status = new AtomicReference<>(CommitStatus.UNKNOWN);

Tasks.foreach(newMetadataLocation)
.retry(maxAttempts)
.suppressFailureWhenFinished()
.exponentialBackoff(minWaitMs, maxWaitMs, totalRetryMs, 2.0)
.onFailure(
(location, checkException) ->
LOG.error("Cannot check if commit to {} exists.", tableName(), checkException))
.run(
location -> {
TableMetadata metadata = refresh();
String currentMetadataFileLocation = metadata.metadataFileLocation();
boolean commitSuccess =
currentMetadataFileLocation.equals(newMetadataLocation)
|| metadata.previousFiles().stream()
.anyMatch(log -> log.file().equals(newMetadataLocation));
if (commitSuccess) {
LOG.info(
"Commit status check: Commit to {} of {} succeeded",
tableName(),
newMetadataLocation);
status.set(CommitStatus.SUCCESS);
} else {
LOG.warn(
"Commit status check: Commit to {} of {} unknown, new metadata location is not current "
+ "or in history",
tableName(),
newMetadataLocation);
}
});

if (status.get() == CommitStatus.UNKNOWN) {
LOG.error(
"Cannot determine commit state to {}. Failed during checking {} times. "
+ "Treating commit state as unknown.",
tableName(),
maxAttempts);
return CommitStatus.valueOf(
checkCommitStatus(
tableName(),
newMetadataLocation,
config.properties(),
() -> checkCurrentMetadataLocation(newMetadataLocation))
.name());
}

/**
 * Checks whether the new metadata location is present after refreshing the table.
 *
 * @param newMetadataLocation newly written metadata location
 * @return true if the new metadata location is either the current metadata file location or one
 *     of the previous metadata files of the refreshed table
 */
protected boolean checkCurrentMetadataLocation(String newMetadataLocation) {
  TableMetadata metadata = refresh();
  Preconditions.checkNotNull(metadata, "Unexpected null table metadata");

  ImmutableList.Builder<String> builder = ImmutableList.builder();
  // The current metadata file location may be null, so it is only added when present.
  String latestMetadataLocation = metadata.metadataFileLocation();
  if (latestMetadataLocation != null) {
    builder.add(latestMetadataLocation);
  }

  // Previous metadata files are included too, so a commit that has since been built upon by
  // another committer is still found.
  ImmutableList<String> allMetadataLocations =
      builder
          .addAll(
              metadata.previousFiles().stream()
                  .map(TableMetadata.MetadataLogEntry::file)
                  .collect(Collectors.toList()))
          .build();

  return allMetadataLocations.contains(newMetadataLocation);
}

private String newTableMetadataFilePath(TableMetadata meta, int newVersion) {
Expand Down
Loading

0 comments on commit a9c09c9

Please sign in to comment.