Skip to content

Commit

Permalink
Hive: Arrange common part of the code for Iceberg View.
Browse files Browse the repository at this point in the history
  • Loading branch information
nk1506 committed Mar 19, 2024
1 parent 5c27843 commit 438c413
Show file tree
Hide file tree
Showing 12 changed files with 692 additions and 346 deletions.
21 changes: 1 addition & 20 deletions core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -285,26 +285,7 @@ private Map<String, String> tableOverrideProperties() {
}

protected static String fullTableName(String catalogName, TableIdentifier identifier) {
StringBuilder sb = new StringBuilder();

if (catalogName.contains("/") || catalogName.contains(":")) {
// use / for URI-like names: thrift://host:port/db.table
sb.append(catalogName);
if (!catalogName.endsWith("/")) {
sb.append("/");
}
} else {
// use . for non-URI named catalogs: prod.db.table
sb.append(catalogName).append(".");
}

for (String level : identifier.namespace().levels()) {
sb.append(level).append(".");
}

sb.append(identifier.name());

return sb.toString();
return CatalogUtil.fullTableName(catalogName, identifier);
}

protected MetricsReporter metricsReporter() {
Expand Down
118 changes: 118 additions & 0 deletions core/src/main/java/org/apache/iceberg/BaseMetastoreOperations.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg;

import static org.apache.iceberg.TableProperties.COMMIT_NUM_STATUS_CHECKS;
import static org.apache.iceberg.TableProperties.COMMIT_NUM_STATUS_CHECKS_DEFAULT;
import static org.apache.iceberg.TableProperties.COMMIT_STATUS_CHECKS_MAX_WAIT_MS;
import static org.apache.iceberg.TableProperties.COMMIT_STATUS_CHECKS_MAX_WAIT_MS_DEFAULT;
import static org.apache.iceberg.TableProperties.COMMIT_STATUS_CHECKS_MIN_WAIT_MS;
import static org.apache.iceberg.TableProperties.COMMIT_STATUS_CHECKS_MIN_WAIT_MS_DEFAULT;
import static org.apache.iceberg.TableProperties.COMMIT_STATUS_CHECKS_TOTAL_WAIT_MS;
import static org.apache.iceberg.TableProperties.COMMIT_STATUS_CHECKS_TOTAL_WAIT_MS_DEFAULT;

import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.Tasks;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BaseMetastoreOperations {
  private static final Logger LOG = LoggerFactory.getLogger(BaseMetastoreOperations.class);

  /** Outcome of a commit attempt whose result could not be observed directly. */
  public enum CommitStatus {
    FAILURE,
    SUCCESS,
    UNKNOWN
  }

  /**
   * Determines the outcome of a commit whose result is ambiguous, for example after an exception
   * that may indicate failure without proving it. The supplied check is retried with exponential
   * backoff until it reports that the newly written metadata is visible, or the configured
   * attempts are exhausted.
   *
   * <p>The supplied check must also consider all previous metadata locations, because a second
   * committer may have already committed on top of this commit.
   *
   * @param contentName full name of the content (table or view) being committed
   * @param newMetadataLocation the path of the metadata file written by the commit being checked
   * @param properties properties used to configure the retry behavior
   * @param newMetadataCheckSupplier returns true once the latest metadata is visible (metadata
   *     location for tables, version id for views)
   * @return {@link CommitStatus#SUCCESS}, {@link CommitStatus#FAILURE} or {@link
   *     CommitStatus#UNKNOWN}
   */
  protected CommitStatus checkCommitStatus(
      String contentName,
      String newMetadataLocation,
      Map<String, String> properties,
      Supplier<Boolean> newMetadataCheckSupplier) {
    // Retry/backoff settings are read from the content's own properties.
    int maxAttempts =
        PropertyUtil.propertyAsInt(
            properties, COMMIT_NUM_STATUS_CHECKS, COMMIT_NUM_STATUS_CHECKS_DEFAULT);
    long minWaitMs =
        PropertyUtil.propertyAsLong(
            properties, COMMIT_STATUS_CHECKS_MIN_WAIT_MS, COMMIT_STATUS_CHECKS_MIN_WAIT_MS_DEFAULT);
    long maxWaitMs =
        PropertyUtil.propertyAsLong(
            properties, COMMIT_STATUS_CHECKS_MAX_WAIT_MS, COMMIT_STATUS_CHECKS_MAX_WAIT_MS_DEFAULT);
    long totalRetryMs =
        PropertyUtil.propertyAsLong(
            properties,
            COMMIT_STATUS_CHECKS_TOTAL_WAIT_MS,
            COMMIT_STATUS_CHECKS_TOTAL_WAIT_MS_DEFAULT);

    AtomicReference<CommitStatus> commitStatus = new AtomicReference<>(CommitStatus.UNKNOWN);

    Tasks.foreach(newMetadataLocation)
        .retry(maxAttempts)
        .suppressFailureWhenFinished()
        .exponentialBackoff(minWaitMs, maxWaitMs, totalRetryMs, 2.0)
        .onFailure(
            (location, checkException) ->
                LOG.error("Cannot check if commit to {} exists.", contentName, checkException))
        .run(
            location -> {
              // A check that returns false leaves the status at UNKNOWN; only SUCCESS is
              // ever recorded here.
              if (newMetadataCheckSupplier.get()) {
                LOG.info(
                    "Commit status check: Commit to {} of {} succeeded",
                    contentName,
                    newMetadataLocation);
                commitStatus.set(CommitStatus.SUCCESS);
              } else {
                LOG.warn(
                    "Commit status check: Commit to {} of {} unknown, new metadata location is not current "
                        + "or in history",
                    contentName,
                    newMetadataLocation);
              }
            });

    if (commitStatus.get() == CommitStatus.UNKNOWN) {
      LOG.error(
          "Cannot determine commit state to {}. Failed during checking {} times. "
              + "Treating commit state as unknown.",
          contentName,
          maxAttempts);
    }

    return commitStatus.get();
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,12 @@
*/
package org.apache.iceberg;

import static org.apache.iceberg.TableProperties.COMMIT_NUM_STATUS_CHECKS;
import static org.apache.iceberg.TableProperties.COMMIT_NUM_STATUS_CHECKS_DEFAULT;
import static org.apache.iceberg.TableProperties.COMMIT_STATUS_CHECKS_MAX_WAIT_MS;
import static org.apache.iceberg.TableProperties.COMMIT_STATUS_CHECKS_MAX_WAIT_MS_DEFAULT;
import static org.apache.iceberg.TableProperties.COMMIT_STATUS_CHECKS_MIN_WAIT_MS;
import static org.apache.iceberg.TableProperties.COMMIT_STATUS_CHECKS_MIN_WAIT_MS_DEFAULT;
import static org.apache.iceberg.TableProperties.COMMIT_STATUS_CHECKS_TOTAL_WAIT_MS;
import static org.apache.iceberg.TableProperties.COMMIT_STATUS_CHECKS_TOTAL_WAIT_MS_DEFAULT;

import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.CommitFailedException;
Expand All @@ -42,14 +34,15 @@
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.relocated.com.google.common.base.Objects;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.util.LocationUtil;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.Tasks;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class BaseMetastoreTableOperations implements TableOperations {
public abstract class BaseMetastoreTableOperations extends BaseMetastoreOperations
implements TableOperations {
private static final Logger LOG = LoggerFactory.getLogger(BaseMetastoreTableOperations.class);

public static final String TABLE_TYPE_PROP = "table_type";
Expand Down Expand Up @@ -291,6 +284,8 @@ public long newSnapshotId() {
};
}

/** @deprecated Use {@link BaseMetastoreOperations.CommitStatus} */
@Deprecated
protected enum CommitStatus {
FAILURE,
SUCCESS,
Expand All @@ -309,65 +304,39 @@ protected enum CommitStatus {
* @return Commit Status of Success, Failure or Unknown
*/
protected CommitStatus checkCommitStatus(String newMetadataLocation, TableMetadata config) {
int maxAttempts =
PropertyUtil.propertyAsInt(
config.properties(), COMMIT_NUM_STATUS_CHECKS, COMMIT_NUM_STATUS_CHECKS_DEFAULT);
long minWaitMs =
PropertyUtil.propertyAsLong(
config.properties(),
COMMIT_STATUS_CHECKS_MIN_WAIT_MS,
COMMIT_STATUS_CHECKS_MIN_WAIT_MS_DEFAULT);
long maxWaitMs =
PropertyUtil.propertyAsLong(
config.properties(),
COMMIT_STATUS_CHECKS_MAX_WAIT_MS,
COMMIT_STATUS_CHECKS_MAX_WAIT_MS_DEFAULT);
long totalRetryMs =
PropertyUtil.propertyAsLong(
config.properties(),
COMMIT_STATUS_CHECKS_TOTAL_WAIT_MS,
COMMIT_STATUS_CHECKS_TOTAL_WAIT_MS_DEFAULT);

AtomicReference<CommitStatus> status = new AtomicReference<>(CommitStatus.UNKNOWN);

Tasks.foreach(newMetadataLocation)
.retry(maxAttempts)
.suppressFailureWhenFinished()
.exponentialBackoff(minWaitMs, maxWaitMs, totalRetryMs, 2.0)
.onFailure(
(location, checkException) ->
LOG.error("Cannot check if commit to {} exists.", tableName(), checkException))
.run(
location -> {
TableMetadata metadata = refresh();
String currentMetadataFileLocation = metadata.metadataFileLocation();
boolean commitSuccess =
currentMetadataFileLocation.equals(newMetadataLocation)
|| metadata.previousFiles().stream()
.anyMatch(log -> log.file().equals(newMetadataLocation));
if (commitSuccess) {
LOG.info(
"Commit status check: Commit to {} of {} succeeded",
tableName(),
newMetadataLocation);
status.set(CommitStatus.SUCCESS);
} else {
LOG.warn(
"Commit status check: Commit to {} of {} unknown, new metadata location is not current "
+ "or in history",
tableName(),
newMetadataLocation);
}
});

if (status.get() == CommitStatus.UNKNOWN) {
LOG.error(
"Cannot determine commit state to {}. Failed during checking {} times. "
+ "Treating commit state as unknown.",
tableName(),
maxAttempts);
return CommitStatus.valueOf(
checkCommitStatus(
tableName(),
newMetadataLocation,
config.properties(),
() -> checkCurrentMetadataLocation(newMetadataLocation))
.name());
}

/**
 * Checks whether the new metadata location is present after refreshing the table.
 *
 * @param newMetadataLocation newly written metadata location
 * @return true if the new metadata location matches the current or any previous metadata file.
 */
protected boolean checkCurrentMetadataLocation(String newMetadataLocation) {
TableMetadata metadata = refresh();
Preconditions.checkNotNull(metadata, "Unexpected null table metadata");
ImmutableList.Builder<String> builder = ImmutableList.builder();
String latestMetadataLocation = metadata.metadataFileLocation();
if (latestMetadataLocation != null) {
builder.add(latestMetadataLocation);
}
return status.get();

ImmutableList<String> allMetadataLocations =
builder
.addAll(
metadata.previousFiles().stream()
.map(TableMetadata.MetadataLogEntry::file)
.collect(Collectors.toList()))
.build();

return allMetadataLocations.contains(newMetadataLocation);
}

private String newTableMetadataFilePath(TableMetadata meta, int newVersion) {
Expand Down
37 changes: 37 additions & 0 deletions core/src/main/java/org/apache/iceberg/CatalogUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.Map;
import java.util.Set;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.common.DynClasses;
import org.apache.iceberg.common.DynConstructors;
import org.apache.iceberg.common.DynMethods;
Expand All @@ -46,6 +47,7 @@
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.Tasks;
import org.apache.iceberg.util.ThreadPools;
import org.apache.iceberg.view.ViewMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -136,6 +138,18 @@ public static void dropTableData(FileIO io, TableMetadata metadata) {
deleteFile(io, metadata.metadataFileLocation(), "metadata");
}

/**
 * Drops the metadata file referenced by the given ViewMetadata.
 *
 * <p>This should be called by dropView implementations to clean up the view's metadata file.
 *
 * @param io a FileIO to use for deletes
 * @param metadata the last valid ViewMetadata instance for a dropped view.
 */
public static void dropViewMetadata(FileIO io, ViewMetadata metadata) {
  String metadataLocation = metadata.metadataFileLocation();
  deleteFile(io, metadataLocation, "metadata");
}

@SuppressWarnings("DangerousStringInternUsage")
private static void deleteFiles(FileIO io, Set<ManifestFile> allManifests) {
// keep track of deleted files in a map that can be cleaned up when memory runs low
Expand Down Expand Up @@ -473,4 +487,27 @@ public static MetricsReporter loadMetricsReporter(Map<String, String> properties

return reporter;
}

/**
 * Builds the fully-qualified name of a table from a catalog name and a table identifier.
 *
 * <p>URI-like catalog names (containing {@code /} or {@code :}) are joined to the rest of the
 * name with {@code /}, e.g. {@code thrift://host:port/db.table}; all other catalog names are
 * joined with {@code .}, e.g. {@code prod.db.table}.
 *
 * @param catalogName the catalog name
 * @param identifier the table identifier, including its namespace levels
 * @return the full table name
 */
public static String fullTableName(String catalogName, TableIdentifier identifier) {
  boolean uriLikeName = catalogName.contains("/") || catalogName.contains(":");

  // use / for URI-like names: thrift://host:port/db.table; . otherwise: prod.db.table
  String prefix;
  if (uriLikeName) {
    prefix = catalogName.endsWith("/") ? catalogName : catalogName + "/";
  } else {
    prefix = catalogName + ".";
  }

  StringBuilder fullName = new StringBuilder(prefix);
  for (String level : identifier.namespace().levels()) {
    fullName.append(level).append(".");
  }

  return fullName.append(identifier.name()).toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Predicate;
import org.apache.iceberg.BaseMetastoreOperations;
import org.apache.iceberg.TableMetadataParser;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.CommitFailedException;
Expand All @@ -35,7 +36,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class BaseViewOperations implements ViewOperations {
public abstract class BaseViewOperations extends BaseMetastoreOperations implements ViewOperations {
private static final Logger LOG = LoggerFactory.getLogger(BaseViewOperations.class);

private static final String METADATA_FOLDER_NAME = "metadata";
Expand Down
Loading

0 comments on commit 438c413

Please sign in to comment.