Skip to content

Commit

Permalink
[test] Add ut case to check hive clients pool deadlock (#4493)
Browse files Browse the repository at this point in the history
  • Loading branch information
mxdzs0612 authored Nov 11, 2024
1 parent c25eeef commit 800e26d
Showing 1 changed file with 81 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.paimon.hive;

import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogTestBase;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.client.ClientPool;
Expand All @@ -39,13 +40,15 @@
import org.junit.jupiter.api.Test;

import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTORECONNECTURLKEY;
import static org.apache.paimon.hive.HiveCatalog.PAIMON_TABLE_TYPE_VALUE;
Expand Down Expand Up @@ -269,6 +272,84 @@ public void testAlterHiveTableParameters() {
}
}

@Test
public void testListTablesLock() {
try {
String databaseName = "test_db";
catalog.createDatabase(databaseName, false);

Map<String, String> options = new HashMap<>();
Schema addHiveTableParametersSchema =
new Schema(
Lists.newArrayList(
new DataField(0, "pk", DataTypes.INT()),
new DataField(1, "col1", DataTypes.STRING()),
new DataField(2, "col2", DataTypes.STRING())),
Collections.emptyList(),
Collections.emptyList(),
options,
"this is a hive table");

for (int i = 0; i < 100; i++) {
String tableName = "new_table" + i;
catalog.createTable(
Identifier.create(databaseName, tableName),
addHiveTableParametersSchema,
false);
}
List<String> tables1 = new ArrayList<>();
List<String> tables2 = new ArrayList<>();

Thread thread1 =
new Thread(
() -> {
System.out.println(
"First thread started at " + System.currentTimeMillis());
try {
tables1.addAll(catalog.listTables(databaseName));
} catch (Catalog.DatabaseNotExistException e) {
throw new RuntimeException(e);
}
});
Thread thread2 =
new Thread(
() -> {
System.out.println(
"Second thread started at " + System.currentTimeMillis());
try {
tables2.addAll(catalog.listTables(databaseName));
} catch (Catalog.DatabaseNotExistException e) {
throw new RuntimeException(e);
}
});

thread1.start();
thread2.start();

long timeout = 5000;
long startTime = System.currentTimeMillis();

AtomicBoolean deadlockDetected = new AtomicBoolean(false);
while (thread1.isAlive() || thread2.isAlive()) {
if (System.currentTimeMillis() - startTime > timeout) {
deadlockDetected.set(true);
thread1.interrupt();
thread2.interrupt();
break;
}

Thread.sleep(100);
}

assertThat(deadlockDetected).isFalse();
assertThat(tables1).size().isEqualTo(100);
assertThat(tables1).containsAll(tables2);
assertThat(tables2).containsAll(tables1);
} catch (Exception e) {
throw new RuntimeException(e);
}
}

@Override
protected boolean supportsView() {
return true;
Expand Down

0 comments on commit 800e26d

Please sign in to comment.