Skip to content

Commit

Permalink
Hive: Add View support for HIVE catalog
Browse files Browse the repository at this point in the history
  • Loading branch information
nk1506 committed Mar 6, 2024
1 parent 08e31ce commit bb2efa4
Show file tree
Hide file tree
Showing 19 changed files with 1,116 additions and 400 deletions.
13 changes: 13 additions & 0 deletions .palantir/revapi.yml
Original file line number Diff line number Diff line change
Expand Up @@ -968,6 +968,9 @@ acceptedBreaks:
- code: "java.class.removed"
old: "class org.apache.iceberg.rest.requests.UpdateTableRequest.Builder"
justification: "Removing deprecated code"
- code: "java.class.removed"
old: "enum org.apache.iceberg.BaseMetastoreTableOperations.CommitStatus"
justification: "Moved CommitStatus to BaseMetastoreOperations for better reach"
- code: "java.class.removed"
old: "interface org.apache.iceberg.actions.RewriteStrategy"
justification: "Removing deprecated code"
Expand All @@ -977,6 +980,10 @@ acceptedBreaks:
- code: "java.field.serialVersionUIDChanged"
new: "field org.apache.iceberg.util.SerializableMap<K, V>.serialVersionUID"
justification: "Serialization is not being used"
- code: "java.method.removed"
old: "method java.lang.String org.apache.iceberg.BaseMetastoreCatalog::fullTableName(java.lang.String,\
\ org.apache.iceberg.catalog.TableIdentifier)"
justification: "Moved to CatalogUtil for better usability"
- code: "java.method.removed"
old: "method org.apache.iceberg.TableOperations org.apache.iceberg.BaseMetadataTable::operations()"
justification: "Removing deprecated code"
Expand Down Expand Up @@ -1018,6 +1025,12 @@ acceptedBreaks:
old: "method void org.apache.iceberg.PositionDeletesTable.PositionDeletesBatchScan::<init>(org.apache.iceberg.Table,\
\ org.apache.iceberg.Schema, org.apache.iceberg.TableScanContext)"
justification: "Removing deprecated code"
- code: "java.method.returnTypeChanged"
old: "method org.apache.iceberg.BaseMetastoreTableOperations.CommitStatus org.apache.iceberg.BaseMetastoreTableOperations::checkCommitStatus(java.lang.String,\
\ org.apache.iceberg.TableMetadata)"
new: "method org.apache.iceberg.BaseMetastoreOperations.CommitStatus org.apache.iceberg.BaseMetastoreTableOperations::checkCommitStatus(java.lang.String,\
\ org.apache.iceberg.TableMetadata)"
justification: "Moved CommitStatus to BaseMetastoreOperations for better reach"
apache-iceberg-0.14.0:
org.apache.iceberg:iceberg-api:
- code: "java.class.defaultSerializationChanged"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.exceptions;

import com.google.errorprone.annotations.FormatMethod;

/**
 * Exception raised when a view is found under the requested name, but it is not an Iceberg view.
 *
 * <p>Because this type extends {@link NoSuchViewException}, code that handles that exception also
 * handles this one.
 */
public class NoSuchIcebergViewException extends NoSuchViewException {

  /**
   * Creates the exception, handing the format string and its arguments to the superclass.
   *
   * @param message format string for the exception message
   * @param args arguments referenced by the format string
   */
  @FormatMethod
  public NoSuchIcebergViewException(String message, Object... args) {
    super(message, args);
  }

  /**
   * Precondition-style helper that throws a {@link NoSuchIcebergViewException} unless {@code test}
   * is {@code true}.
   *
   * @param test condition that must hold
   * @param message format string for the exception message used when the condition fails
   * @param args arguments referenced by the format string
   * @throws NoSuchIcebergViewException if {@code test} is {@code false}
   */
  @FormatMethod
  public static void check(boolean test, String message, Object... args) {
    if (test) {
      // condition satisfied, nothing to report
      return;
    }

    throw new NoSuchIcebergViewException(message, args);
  }
}
53 changes: 53 additions & 0 deletions core/src/main/java/org/apache/iceberg/BaseMetadata.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg;

import java.io.Serializable;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.iceberg.util.PropertyUtil;

/**
 * Common interface implemented by {@link TableMetadata} and {@link
 * org.apache.iceberg.view.ViewMetadata}.
 *
 * <p>Besides the accessors, it offers typed convenience lookups over {@link #properties()} that
 * fall back to a caller-supplied default.
 */
public interface BaseMetadata extends Serializable {

  /** Returns the location recorded in this metadata. */
  String location();

  /** Returns the property map that the typed lookup helpers below read from. */
  Map<String, String> properties();

  /** Returns the location of the metadata file; may be {@code null}. */
  @Nullable
  String metadataFileLocation();

  /** Returns the schema held by this metadata. */
  Schema schema();

  /**
   * Looks up a property as a string.
   *
   * @param property property key
   * @param defaultValue value returned when the key has no mapping in {@link #properties()}
   * @return the result of {@link Map#getOrDefault(Object, Object)} on {@link #properties()}
   */
  default String property(String property, String defaultValue) {
    Map<String, String> props = properties();
    return props.getOrDefault(property, defaultValue);
  }

  /**
   * Looks up a property as a boolean by delegating to {@link PropertyUtil#propertyAsBoolean}.
   *
   * @param property property key
   * @param defaultValue fallback passed through to {@link PropertyUtil}
   * @return the boolean value resolved by {@link PropertyUtil}
   */
  default boolean propertyAsBoolean(String property, boolean defaultValue) {
    Map<String, String> props = properties();
    return PropertyUtil.propertyAsBoolean(props, property, defaultValue);
  }

  /**
   * Looks up a property as an int by delegating to {@link PropertyUtil#propertyAsInt}.
   *
   * @param property property key
   * @param defaultValue fallback passed through to {@link PropertyUtil}
   * @return the int value resolved by {@link PropertyUtil}
   */
  default int propertyAsInt(String property, int defaultValue) {
    Map<String, String> props = properties();
    return PropertyUtil.propertyAsInt(props, property, defaultValue);
  }

  /**
   * Looks up a property as a long by delegating to {@link PropertyUtil#propertyAsLong}.
   *
   * @param property property key
   * @param defaultValue fallback passed through to {@link PropertyUtil}
   * @return the long value resolved by {@link PropertyUtil}
   */
  default long propertyAsLong(String property, long defaultValue) {
    Map<String, String> props = properties();
    return PropertyUtil.propertyAsLong(props, property, defaultValue);
  }
}
30 changes: 4 additions & 26 deletions core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ public Table loadTable(TableIdentifier identifier) {
}

} else {
result = new BaseTable(ops, fullTableName(name(), identifier), metricsReporter());
result =
new BaseTable(ops, CatalogUtil.fullTableName(name(), identifier), metricsReporter());
}

} else if (isValidMetadataIdentifier(identifier)) {
Expand Down Expand Up @@ -88,7 +89,7 @@ public Table registerTable(TableIdentifier identifier, String metadataFileLocati
TableMetadata metadata = TableMetadataParser.read(ops.io(), metadataFile);
ops.commit(null, metadata);

return new BaseTable(ops, fullTableName(name(), identifier), metricsReporter());
return new BaseTable(ops, CatalogUtil.fullTableName(name(), identifier), metricsReporter());
}

@Override
Expand Down Expand Up @@ -203,7 +204,7 @@ public Table create() {
throw new AlreadyExistsException("Table was created concurrently: %s", identifier);
}

return new BaseTable(ops, fullTableName(name(), identifier), metricsReporter());
return new BaseTable(ops, CatalogUtil.fullTableName(name(), identifier), metricsReporter());
}

@Override
Expand Down Expand Up @@ -284,29 +285,6 @@ private Map<String, String> tableOverrideProperties() {
}
}

/**
 * Builds the full name of a table by prefixing the identifier with the catalog name.
 *
 * <p>Catalog names containing {@code /} or {@code :} are treated as URI-like and joined to the
 * identifier with a single {@code /} (for example {@code thrift://host:port/db.table}); all other
 * catalog names are joined with {@code .} (for example {@code prod.db.table}). Namespace levels
 * and the table name are always separated by {@code .}.
 *
 * @param catalogName name of the catalog
 * @param identifier identifier supplying the namespace levels and the table name
 * @return the catalog-qualified table name
 */
protected static String fullTableName(String catalogName, TableIdentifier identifier) {
  boolean uriLike = catalogName.contains("/") || catalogName.contains(":");
  StringBuilder fullName = new StringBuilder(catalogName);

  if (!uriLike) {
    // non-URI named catalogs use a dot: prod.db.table
    fullName.append(".");
  } else if (!catalogName.endsWith("/")) {
    // URI-like names use a slash, added only when not already present
    fullName.append("/");
  }

  for (String namespaceLevel : identifier.namespace().levels()) {
    fullName.append(namespaceLevel).append(".");
  }

  return fullName.append(identifier.name()).toString();
}

protected MetricsReporter metricsReporter() {
if (metricsReporter == null) {
metricsReporter = CatalogUtil.loadMetricsReporter(properties());
Expand Down
119 changes: 119 additions & 0 deletions core/src/main/java/org/apache/iceberg/BaseMetastoreOperations.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg;

import static org.apache.iceberg.TableProperties.COMMIT_NUM_STATUS_CHECKS;
import static org.apache.iceberg.TableProperties.COMMIT_NUM_STATUS_CHECKS_DEFAULT;
import static org.apache.iceberg.TableProperties.COMMIT_STATUS_CHECKS_MAX_WAIT_MS;
import static org.apache.iceberg.TableProperties.COMMIT_STATUS_CHECKS_MAX_WAIT_MS_DEFAULT;
import static org.apache.iceberg.TableProperties.COMMIT_STATUS_CHECKS_MIN_WAIT_MS;
import static org.apache.iceberg.TableProperties.COMMIT_STATUS_CHECKS_MIN_WAIT_MS_DEFAULT;
import static org.apache.iceberg.TableProperties.COMMIT_STATUS_CHECKS_TOTAL_WAIT_MS;
import static org.apache.iceberg.TableProperties.COMMIT_STATUS_CHECKS_TOTAL_WAIT_MS_DEFAULT;

import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.Tasks;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Helper base class for metastore-backed operations. It provides the commit-status check used
 * after a commit attempt whose outcome is uncertain.
 */
public class BaseMetastoreOperations {
  private static final Logger LOG = LoggerFactory.getLogger(BaseMetastoreOperations.class);

  /** Possible outcomes of a commit-status check. */
  public enum CommitStatus {
    FAILURE,
    SUCCESS,
    UNKNOWN
  }

  /**
   * Attempt to load the entity and see if any current or past metadata location matches the one we
   * were attempting to set. This is used as a last resort when we are dealing with exceptions that
   * may indicate the commit has failed but don't have proof that this is the case. Note that all
   * the previous locations must also be searched on the chance that a second committer was able to
   * successfully commit on top of our commit.
   *
   * <p>The status starts as {@link CommitStatus#UNKNOWN} and is only ever changed to {@link
   * CommitStatus#SUCCESS}; this method never sets {@link CommitStatus#FAILURE}. Failures raised
   * while checking are logged and suppressed rather than propagated.
   *
   * @param entityName full name of the entity
   * @param newMetadataLocation the path of the new commit file
   * @param properties properties supplying the number of checks and the backoff wait times
   * @param loadMetadataLocations supply all the metadata locations
   * @return Commit Status of Success, Failure or Unknown
   */
  protected CommitStatus checkCommitStatus(
      String entityName,
      String newMetadataLocation,
      Map<String, String> properties,
      Supplier<List<String>> loadMetadataLocations) {
    // Retry/backoff settings, each falling back to its table-property default.
    int numStatusChecks =
        PropertyUtil.propertyAsInt(
            properties, COMMIT_NUM_STATUS_CHECKS, COMMIT_NUM_STATUS_CHECKS_DEFAULT);
    long backoffMinMs =
        PropertyUtil.propertyAsLong(
            properties, COMMIT_STATUS_CHECKS_MIN_WAIT_MS, COMMIT_STATUS_CHECKS_MIN_WAIT_MS_DEFAULT);
    long backoffMaxMs =
        PropertyUtil.propertyAsLong(
            properties, COMMIT_STATUS_CHECKS_MAX_WAIT_MS, COMMIT_STATUS_CHECKS_MAX_WAIT_MS_DEFAULT);
    long backoffTotalMs =
        PropertyUtil.propertyAsLong(
            properties,
            COMMIT_STATUS_CHECKS_TOTAL_WAIT_MS,
            COMMIT_STATUS_CHECKS_TOTAL_WAIT_MS_DEFAULT);

    // Holder so the task lambda can publish its result.
    AtomicReference<CommitStatus> observedStatus = new AtomicReference<>(CommitStatus.UNKNOWN);

    Tasks.foreach(newMetadataLocation)
        .retry(numStatusChecks)
        .suppressFailureWhenFinished()
        .exponentialBackoff(backoffMinMs, backoffMaxMs, backoffTotalMs, 2.0)
        .onFailure(
            (failedLocation, checkException) ->
                LOG.error("Cannot check if commit to {} exists.", entityName, checkException))
        .run(
            candidateLocation -> {
              List<String> knownLocations = loadMetadataLocations.get();

              if (!knownLocations.contains(newMetadataLocation)) {
                // Not found: leave the status as UNKNOWN.
                LOG.warn(
                    "Commit status check: Commit to {} of {} unknown, new metadata location is not current "
                        + "or in history",
                    entityName,
                    newMetadataLocation);
                return;
              }

              LOG.info(
                  "Commit status check: Commit to {} of {} succeeded",
                  entityName,
                  newMetadataLocation);
              observedStatus.set(CommitStatus.SUCCESS);
            });

    CommitStatus finalStatus = observedStatus.get();
    if (finalStatus == CommitStatus.UNKNOWN) {
      LOG.error(
          "Cannot determine commit state to {}. Failed during checking {} times. "
              + "Treating commit state as unknown.",
          entityName,
          numStatusChecks);
    }

    return finalStatus;
  }
}
Loading

0 comments on commit bb2efa4

Please sign in to comment.