Skip to content

Commit

Permalink
[No-Merge] Test Hive_view with Spark
Browse files: browse the repository at this point in the history
  • Loading branch information
nk1506 committed Apr 2, 2024
1 parent a8a0ba4 commit 21b4255
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public void testSuppressUnlockExceptions() {

AtomicReference<HiveLock> lockRef = new AtomicReference<>();

when(spyOps.lockObject(metadataV1, catalog.getConf(), catalog.name()))
when(spyOps.lockObject(metadataV1))
.thenAnswer(
i -> {
HiveLock lock = (HiveLock) i.callRealMethod();
Expand Down Expand Up @@ -273,11 +273,11 @@ public void testThriftExceptionConcurrentCommit() throws TException, Interrupted
AtomicReference<HiveLock> lock = new AtomicReference<>();
doAnswer(
l -> {
lock.set(ops.lockObject(metadataV1, catalog.getConf(), catalog.name()));
lock.set(ops.lockObject(metadataV1));
return lock.get();
})
.when(spyOps)
.lockObject(metadataV1, catalog.getConf(), catalog.name());
.lockObject(metadataV1);

concurrentCommitAndThrowException(ops, spyOps, table, lock);

Expand Down Expand Up @@ -332,7 +332,7 @@ public void testNoLockThriftExceptionConcurrentCommit() throws TException, Inter
HiveTableOperations spyOps = spy(ops);

// Sets NoLock
doReturn(new NoLock()).when(spyOps).lockObject(any(), any(), any());
doReturn(new NoLock()).when(spyOps).lockObject(any());

// Simulate a concurrent table modification error
doThrow(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public class TestHiveMetastore {

// It's tricky to clear all static fields in a Hive Metastore (HMS) instance in order to switch
// the Derby root directory. Therefore, we reuse the same Derby root between tests and remove it
// after the JVM exits.
private static final File HIVE_LOCAL_DIR;
public static final File HIVE_LOCAL_DIR;
private static final String DERBY_PATH;

static {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.catalog.ViewCatalog;
import org.apache.iceberg.hive.TestHiveMetastore;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.Spark3Util;
import org.apache.iceberg.spark.SparkCatalogConfig;
Expand All @@ -46,6 +47,7 @@
import org.apache.iceberg.view.ViewHistoryEntry;
import org.apache.iceberg.view.ViewProperties;
import org.apache.iceberg.view.ViewVersion;
import org.apache.spark.SparkException;
import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
Expand Down Expand Up @@ -81,9 +83,9 @@ public void removeTable() {
public static Object[][] parameters() {
return new Object[][] {
{
SparkCatalogConfig.SPARK_WITH_VIEWS.catalogName(),
SparkCatalogConfig.SPARK_WITH_VIEWS.implementation(),
SparkCatalogConfig.SPARK_WITH_VIEWS.properties()
SparkCatalogConfig.SPARK_WITH_HIVE_VIEWS.catalogName(),
SparkCatalogConfig.SPARK_WITH_HIVE_VIEWS.implementation(),
SparkCatalogConfig.SPARK_WITH_HIVE_VIEWS.properties()
}
};
}
Expand Down Expand Up @@ -756,7 +758,7 @@ public void renameViewToDifferentTargetCatalog() {
assertThatThrownBy(() -> sql("ALTER VIEW %s RENAME TO spark_catalog.%s", viewName, renamedView))
.isInstanceOf(AnalysisException.class)
.hasMessageContaining(
"Cannot move view between catalogs: from=spark_with_views and to=spark_catalog");
"Cannot move view between catalogs: from=spark_with_hive_views and to=spark_catalog");
}

@TestTemplate
Expand Down Expand Up @@ -900,8 +902,8 @@ public void dropV1View() {
String v1View = "v1ViewToBeDropped";
sql("USE spark_catalog");
sql("CREATE NAMESPACE IF NOT EXISTS %s", NAMESPACE);
sql("CREATE TABLE %s (id INT, data STRING)", tableName);
sql("CREATE VIEW %s AS SELECT id FROM %s", v1View, tableName);
sql("CREATE TABLE IF NOT EXISTS %s (id INT, data STRING)", tableName);
sql("CREATE VIEW IF NOT EXISTS %s AS SELECT id FROM %s", v1View, tableName);
sql("USE %s", catalogName);
assertThat(
v1SessionCatalog()
Expand Down Expand Up @@ -1318,8 +1320,8 @@ public void createViewWithSubqueryExpressionInFilterThatIsRewritten()
sql("USE spark_catalog");

assertThatThrownBy(() -> sql(sql))
.isInstanceOf(AnalysisException.class)
.hasMessageContaining(String.format("The table or view `%s` cannot be found", tableName));
.isInstanceOf(SparkException.class)
.hasMessageContaining("Exception thrown in awaitResult");

// the underlying SQL in the View should be rewritten to have catalog & namespace
assertThat(sql("SELECT * FROM %s.%s.%s", catalogName, NAMESPACE, viewName))
Expand All @@ -1343,8 +1345,8 @@ public void createViewWithSubqueryExpressionInQueryThatIsRewritten() throws NoSu
sql("USE spark_catalog");

assertThatThrownBy(() -> sql(sql))
.isInstanceOf(AnalysisException.class)
.hasMessageContaining(String.format("The table or view `%s` cannot be found", tableName));
.isInstanceOf(SparkException.class)
.hasMessageContaining("Exception thrown in awaitResult");

// the underlying SQL in the View should be rewritten to have catalog & namespace
assertThat(sql("SELECT * FROM %s.%s.%s", catalogName, NAMESPACE, viewName))
Expand Down Expand Up @@ -1381,8 +1383,8 @@ public void describeExtendedView() {
row(
"View Properties",
String.format(
"['format-version' = '1', 'location' = '/%s/%s', 'provider' = 'iceberg']",
NAMESPACE, viewName),
"['format-version' = '1', 'location' = 'file:%s/%s', 'provider' = 'iceberg']",
TestHiveMetastore.HIVE_LOCAL_DIR, viewName),
""));
}

Expand Down Expand Up @@ -1430,29 +1432,29 @@ public void showViews() throws NoSuchTableException {
Object[] tempView = row("", "tempviewforlisting", true);
assertThat(sql("SHOW VIEWS"))
.contains(
row(NAMESPACE.toString(), "prefixV2", false),
row(NAMESPACE.toString(), "prefixV3", false),
row(NAMESPACE.toString(), "prefixv2", false),
row(NAMESPACE.toString(), "prefixv3", false),
row(NAMESPACE.toString(), "v1", false),
tempView);

assertThat(sql("SHOW VIEWS IN %s", catalogName))
assertThat(sql("SHOW VIEWS IN %s.%s", catalogName, NAMESPACE))
.contains(
row(NAMESPACE.toString(), "prefixV2", false),
row(NAMESPACE.toString(), "prefixV3", false),
row(NAMESPACE.toString(), "prefixv2", false),
row(NAMESPACE.toString(), "prefixv3", false),
row(NAMESPACE.toString(), "v1", false),
tempView);

assertThat(sql("SHOW VIEWS IN %s.%s", catalogName, NAMESPACE))
.contains(
row(NAMESPACE.toString(), "prefixV2", false),
row(NAMESPACE.toString(), "prefixV3", false),
row(NAMESPACE.toString(), "prefixv2", false),
row(NAMESPACE.toString(), "prefixv3", false),
row(NAMESPACE.toString(), "v1", false),
tempView);

assertThat(sql("SHOW VIEWS LIKE 'pref*'"))
.contains(
row(NAMESPACE.toString(), "prefixV2", false),
row(NAMESPACE.toString(), "prefixV3", false));
row(NAMESPACE.toString(), "prefixv2", false),
row(NAMESPACE.toString(), "prefixv3", false));

assertThat(sql("SHOW VIEWS LIKE 'non-existing'")).isEmpty();

Expand All @@ -1477,8 +1479,8 @@ public void showViewsWithCurrentNamespace() {
sql("CREATE VIEW %s.%s AS SELECT * FROM %s.%s", namespaceOne, viewOne, NAMESPACE, tableName);
sql("CREATE VIEW %s.%s AS SELECT * FROM %s.%s", namespaceTwo, viewTwo, NAMESPACE, tableName);

Object[] v1 = row(namespaceOne, viewOne, false);
Object[] v2 = row(namespaceTwo, viewTwo, false);
Object[] v1 = row(namespaceOne, viewOne.toLowerCase(), false);
Object[] v2 = row(namespaceTwo, viewTwo.toLowerCase(), false);

assertThat(sql("SHOW VIEWS IN %s.%s", catalogName, namespaceOne))
.contains(v1)
Expand Down Expand Up @@ -1509,10 +1511,10 @@ public void showCreateSimpleView() {
+ " data)\n"
+ "TBLPROPERTIES (\n"
+ " 'format-version' = '1',\n"
+ " 'location' = '/%s/%s',\n"
+ " 'location' = 'file:%s/%s',\n"
+ " 'provider' = 'iceberg')\n"
+ "AS\n%s\n",
catalogName, NAMESPACE, viewName, NAMESPACE, viewName, sql);
catalogName, NAMESPACE, viewName, TestHiveMetastore.HIVE_LOCAL_DIR, viewName, sql);
assertThat(sql("SHOW CREATE TABLE %s", viewName)).containsExactly(row(expected));
}

Expand All @@ -1536,10 +1538,10 @@ public void showCreateComplexView() {
+ " 'format-version' = '1',\n"
+ " 'key1' = 'val1',\n"
+ " 'key2' = 'val2',\n"
+ " 'location' = '/%s/%s',\n"
+ " 'location' = 'file:%s/%s',\n"
+ " 'provider' = 'iceberg')\n"
+ "AS\n%s\n",
catalogName, NAMESPACE, viewName, NAMESPACE, viewName, sql);
catalogName, NAMESPACE, viewName, TestHiveMetastore.HIVE_LOCAL_DIR, viewName, sql);
assertThat(sql("SHOW CREATE TABLE %s", viewName)).containsExactly(row(expected));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.util.Map;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.hive.HiveCatalog;
import org.apache.iceberg.inmemory.InMemoryCatalog;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;

Expand All @@ -44,6 +45,16 @@ public enum SparkCatalogConfig {
"cache-enabled",
"false" // Spark will delete tables using v1, leaving the cache out of sync
)),
// Catalog config named "spark_with_hive_views": a SparkCatalog whose catalog implementation
// (CatalogProperties.CATALOG_IMPL) is HiveCatalog, with "default" as the default namespace
// and the catalog cache turned off ("cache-enabled" = "false").
// NOTE(review): caching is presumably disabled for the same reason as the neighbouring
// entries (Spark deleting tables through v1 leaves the cache out of sync) — confirm.
SPARK_WITH_HIVE_VIEWS(
"spark_with_hive_views",
SparkCatalog.class.getName(),
ImmutableMap.of(
CatalogProperties.CATALOG_IMPL,
HiveCatalog.class.getName(),
"default-namespace",
"default",
"cache-enabled",
"false")),
SPARK_WITH_VIEWS(
"spark_with_views",
SparkCatalog.class.getName(),
Expand Down

0 comments on commit 21b4255

Please sign in to comment.