[hive] Fix listTablesImpl possible timeout issue (apache#4800)

XiaoHongbo-Hope authored Dec 30, 2024
1 parent 8bad4e0 commit 71921c5

Showing 2 changed files with 105 additions and 2 deletions.
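The gist of the fix: listing a database with many tables previously issued a single getTableObjectsByName call covering the entire table list, which could run into client-side timeouts against a large Hive Metastore; the new code splits the list into bounded batches. A minimal sketch of the pattern against the plain Hive client API (the class and method names below are illustrative, not part of the commit):

    import org.apache.hadoop.hive.metastore.IMetaStoreClient;
    import org.apache.hadoop.hive.metastore.api.Table;
    import org.apache.thrift.TException;

    import com.google.common.collect.Lists;

    import java.util.ArrayList;
    import java.util.List;

    class BatchedTableFetchSketch {

        /** Fetches table metadata in bounded batches so no single RPC grows with the database size. */
        static List<Table> fetchInBatches(
                IMetaStoreClient client, String db, List<String> tableNames, int batchSize)
                throws TException {
            List<Table> result = new ArrayList<>(tableNames.size());
            for (List<String> batch : Lists.partition(tableNames, batchSize)) {
                // Each call transfers at most batchSize table objects.
                result.addAll(client.getTableObjectsByName(db, batch));
            }
            return result;
        }
    }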
HiveCatalog.java
@@ -54,6 +54,7 @@
 import org.apache.paimon.view.View;
 import org.apache.paimon.view.ViewImpl;
 
+import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
 import org.apache.paimon.shade.guava30.com.google.common.collect.Maps;
 
 import org.apache.flink.table.hive.LegacyHiveClasses;
@@ -143,7 +144,7 @@ public class HiveCatalog extends AbstractCatalog {
     private static final String HIVE_PREFIX = "hive.";
     public static final String HIVE_SITE_FILE = "hive-site.xml";
     private static final String HIVE_EXTERNAL_TABLE_PROP = "EXTERNAL";
-
+    private static final int DEFAULT_TABLE_BATCH_SIZE = 300;
     private final HiveConf hiveConf;
     private final String clientClassName;
     private final Options options;
@@ -442,8 +443,34 @@ protected void alterDatabaseImpl(String name, List<PropertyChange> changes) {
     protected List<String> listTablesImpl(String databaseName) {
         try {
             List<String> tableNames = clients.run(client -> client.getAllTables(databaseName));
+            int batchSize = getBatchGetTableSize();
             List<Table> hmsTables =
-                    clients.run(client -> client.getTableObjectsByName(databaseName, tableNames));
+                    Lists.partition(tableNames, batchSize).stream()
+                            .flatMap(
+                                    batchTableNames -> {
+                                        try {
+                                            return clients
+                                                    .run(
+                                                            client ->
+                                                                    client.getTableObjectsByName(
+                                                                            databaseName,
+                                                                            batchTableNames))
+                                                    .stream();
+                                        } catch (TException e) {
+                                            throw new RuntimeException(
+                                                    "Failed to getTableObjectsByName in database "
+                                                            + databaseName,
+                                                    e);
+                                        } catch (InterruptedException e) {
+                                            Thread.currentThread().interrupt();
+                                            throw new RuntimeException(
+                                                    "Interrupted in call to getTableObjectsByName "
+                                                            + databaseName,
+                                                    e);
+                                        }
+                                    })
+                            .collect(Collectors.toList());
+
             List<String> result = new ArrayList<>(hmsTables.size());
             for (Table table : hmsTables) {
                 if (isPaimonTable(table) || (!formatTableDisabled() && isFormatTable(table))) {
@@ -1414,4 +1441,27 @@ public static HiveConf createHiveConf(CatalogContext context) {
     public static String possibleHiveConfPath() {
         return System.getenv("HIVE_CONF_DIR");
     }
+
+    public int getBatchGetTableSize() {
+        try {
+            int size =
+                    Integer.parseInt(
+                            this.hiveConf.get(
+                                    HiveConf.ConfVars.METASTORE_BATCH_RETRIEVE_MAX.varname,
+                                    String.valueOf(
+                                            HiveConf.ConfVars.METASTORE_BATCH_RETRIEVE_MAX
+                                                    .getDefaultValue())));
+            if (size < 1) {
+                return DEFAULT_TABLE_BATCH_SIZE;
+            } else {
+                return size;
+            }
+        } catch (Exception e) {
+            LOG.warn(
+                    "parse batch size failed {}, use default batch size",
+                    this.hiveConf.get(HiveConf.ConfVars.METASTORE_BATCH_RETRIEVE_MAX.varname),
+                    e);
+            return DEFAULT_TABLE_BATCH_SIZE;
+        }
+    }
 }
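The batch size is read from the standard Hive option hive.metastore.batch.retrieve.max (the varname behind HiveConf.ConfVars.METASTORE_BATCH_RETRIEVE_MAX); an unparsable or non-positive value falls back to DEFAULT_TABLE_BATCH_SIZE (300). To tune it, set the property in the hive-site.xml the catalog loads, for example:

    <!-- hive-site.xml: cap each getTableObjectsByName call at 400 tables -->
    <property>
      <name>hive.metastore.batch.retrieve.max</name>
      <value>400</value>
    </property>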
HiveCatalogTest.java

@@ -59,6 +59,7 @@
 import static org.apache.paimon.hive.HiveCatalog.TABLE_TYPE_PROP;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.fail;
 
 /** Tests for {@link HiveCatalog}. */
@@ -354,6 +355,58 @@ public void testListTablesLock() {
         }
     }
 
+    @Test
+    public void testListTables() throws Exception {
+        String databaseName = "testListTables";
+        catalog.dropDatabase(databaseName, true, true);
+        catalog.createDatabase(databaseName, true);
+        for (int i = 0; i < 500; i++) {
+            catalog.createTable(
+                    Identifier.create(databaseName, "table" + i),
+                    Schema.newBuilder().column("col", DataTypes.INT()).build(),
+                    true);
+        }
+
+        // use default 300
+        List<String> defaultBatchTables = catalog.listTables(databaseName);
+
+        // use custom 400
+        HiveConf hiveConf = new HiveConf();
+        hiveConf.set(HiveConf.ConfVars.METASTORE_BATCH_RETRIEVE_MAX.varname, "400");
+        String metastoreClientClass = "org.apache.hadoop.hive.metastore.HiveMetaStoreClient";
+        List<String> customBatchTables;
+        try (HiveCatalog customCatalog =
+                new HiveCatalog(fileIO, hiveConf, metastoreClientClass, warehouse)) {
+            customBatchTables = customCatalog.listTables(databaseName);
+        } catch (Exception e) {
+            throw e;
+        }
+        assertEquals(defaultBatchTables.size(), customBatchTables.size());
+        defaultBatchTables.sort(String::compareTo);
+        customBatchTables.sort(String::compareTo);
+        for (int i = 0; i < defaultBatchTables.size(); i++) {
+            assertEquals(defaultBatchTables.get(i), customBatchTables.get(i));
+        }
+
+        // use invalid batch size
+        HiveConf invalidHiveConf = new HiveConf();
+        invalidHiveConf.set(HiveConf.ConfVars.METASTORE_BATCH_RETRIEVE_MAX.varname, "dummy");
+        List<String> invalidBatchSizeTables;
+        try (HiveCatalog invalidBatchSizeCatalog =
+                new HiveCatalog(fileIO, invalidHiveConf, metastoreClientClass, warehouse)) {
+            invalidBatchSizeTables = invalidBatchSizeCatalog.listTables(databaseName);
+        } catch (Exception e) {
+            throw e;
+        }
+        assertEquals(defaultBatchTables.size(), invalidBatchSizeTables.size());
+        invalidBatchSizeTables.sort(String::compareTo);
+        for (int i = 0; i < defaultBatchTables.size(); i++) {
+            assertEquals(defaultBatchTables.get(i), invalidBatchSizeTables.get(i));
+        }
+
+        catalog.dropDatabase(databaseName, true, true);
+    }
+
     @Override
     protected boolean supportsView() {
         return true;
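The same override works programmatically. A sketch mirroring the constructor the test exercises (the wrapper class, method name, and "my_database" are illustrative; fileIO and warehouse stand in for whatever the deployment provides):

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.paimon.fs.FileIO;
    import org.apache.paimon.hive.HiveCatalog;

    import java.util.List;

    class ListTablesWithBatchSizeSketch {

        static List<String> listTables(FileIO fileIO, String warehouse) throws Exception {
            HiveConf conf = new HiveConf();
            // Fetch table objects from the metastore 400 at a time.
            conf.set(HiveConf.ConfVars.METASTORE_BATCH_RETRIEVE_MAX.varname, "400");
            try (HiveCatalog catalog =
                    new HiveCatalog(
                            fileIO,
                            conf,
                            "org.apache.hadoop.hive.metastore.HiveMetaStoreClient",
                            warehouse)) {
                return catalog.listTables("my_database");
            }
        }
    }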