Skip to content

Commit

Permalink
[hive] Fix cache bug in hive client pool (apache#4010)
Browse files Browse the repository at this point in the history
  • Loading branch information
mircodee authored Aug 26, 2024
1 parent 97d343f commit 7ef5091
Show file tree
Hide file tree
Showing 3 changed files with 545 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,13 @@ public void setUp() throws Exception {
@AfterEach
void tearDown() throws Exception {
if (catalog != null) {
List<String> dbs = catalog.listDatabases();
for (String db : dbs) {
try {
catalog.dropDatabase(db, true, true);
} catch (Exception ignored) {
}
}
catalog.close();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -77,9 +78,18 @@ public CachedClientPool(Configuration conf, Options options, String clientClassN
this.conf = conf;
this.clientPoolSize = options.get(CLIENT_POOL_SIZE);
this.evictionInterval = options.get(CLIENT_POOL_CACHE_EVICTION_INTERVAL_MS);
this.key = extractKey(options.get(CLIENT_POOL_CACHE_KEYS), conf);
this.key = extractKey(clientClassName, options.get(CLIENT_POOL_CACHE_KEYS), conf);
this.clientClassName = clientClassName;
init();
// set ugi information to hms client
try {
run(client -> null);
} catch (TException e) {
throw new RuntimeException(e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
}

@VisibleForTesting
Expand Down Expand Up @@ -142,9 +152,10 @@ public void execute(ExecuteAction<IMetaStoreClient, TException> action, boolean
}

@VisibleForTesting
static Key extractKey(String cacheKeys, Configuration conf) {
static Key extractKey(String clientClassName, String cacheKeys, Configuration conf) {
// generate key elements in a certain order, so that the Key instances are comparable
List<Object> elements = Lists.newArrayList();
elements.add(clientClassName);
elements.add(conf.get(HiveConf.ConfVars.METASTOREURIS.varname, ""));
elements.add(HiveCatalogOptions.IDENTIFIER);
if (cacheKeys == null || cacheKeys.isEmpty()) {
Expand Down Expand Up @@ -219,6 +230,38 @@ public List<Object> elements() {
public static Key of(List<Object> elements) {
return new Key(elements);
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}

final Key that = (Key) o;
if (this.elements.size() != that.elements.size()) {
return false;
}
for (int i = 0; i < elements.size(); i++) {
if (!Objects.equals(this.elements.get(i), that.elements.get(i))) {
return false;
}
}
return true;
}

@Override
public int hashCode() {
int hashCode = 0;
synchronized (elements) {
for (Object p : elements) {
hashCode ^= p.hashCode();
}
}
return hashCode;
}
}

static class ConfElement {
Expand All @@ -239,6 +282,23 @@ public String value() {
return value;
}

@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null || getClass() != obj.getClass()) {
return false;
}
ConfElement other = (ConfElement) obj;
return Objects.equals(key, other.key) && Objects.equals(value, other.value);
}

@Override
public int hashCode() {
return Objects.hash(key, value);
}

public static ConfElement of(String key, String value) {
return new ConfElement(key, value);
}
Expand Down
Loading

0 comments on commit 7ef5091

Please sign in to comment.