Skip to content

Commit

Permalink
HDFS-17529. RBF: Improve router state store cache entry deletion (#6833)
Browse files Browse the repository at this point in the history
  • Loading branch information
Felix Nguyen authored May 24, 2024
1 parent d168d3f commit f5c5d35
Show file tree
Hide file tree
Showing 5 changed files with 138 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
Expand Down Expand Up @@ -172,7 +173,7 @@ private boolean isUpdateTime() {
*/
public void overrideExpiredRecords(QueryResult<R> query) throws IOException {
List<R> commitRecords = new ArrayList<>();
List<R> deleteRecords = new ArrayList<>();
List<R> toDeleteRecords = new ArrayList<>();
List<R> newRecords = query.getRecords();
long currentDriverTime = query.getTimestamp();
if (newRecords == null || currentDriverTime <= 0) {
Expand All @@ -182,13 +183,8 @@ public void overrideExpiredRecords(QueryResult<R> query) throws IOException {
for (R record : newRecords) {
if (record.shouldBeDeleted(currentDriverTime)) {
String recordName = StateStoreUtils.getRecordName(record.getClass());
if (getDriver().remove(record)) {
deleteRecords.add(record);
LOG.info("Deleted State Store record {}: {}", recordName, record);
} else {
LOG.warn("Couldn't delete State Store record {}: {}", recordName,
record);
}
LOG.info("State Store record to delete {}: {}", recordName, record);
toDeleteRecords.add(record);
} else if (!record.isExpired() && record.checkExpired(currentDriverTime)) {
String recordName = StateStoreUtils.getRecordName(record.getClass());
LOG.info("Override State Store record {}: {}", recordName, record);
Expand All @@ -198,8 +194,12 @@ public void overrideExpiredRecords(QueryResult<R> query) throws IOException {
if (commitRecords.size() > 0) {
getDriver().putAll(commitRecords, true, false);
}
if (deleteRecords.size() > 0) {
newRecords.removeAll(deleteRecords);
if (!toDeleteRecords.isEmpty()) {
for (Map.Entry<R, Boolean> entry : getDriver().removeMultiple(toDeleteRecords).entrySet()) {
if (entry.getValue()) {
newRecords.remove(entry.getKey());
}
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.io.IOException;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
Expand Down Expand Up @@ -127,6 +128,17 @@ <T extends BaseRecord> StateStoreOperationResult putAll(
@AtMostOnce
<T extends BaseRecord> boolean remove(T record) throws IOException;

/**
* Remove multiple records.
*
* @param <T> Record class of the records.
* @param records Records to be removed.
* @return Map of record to a boolean indicating if the record has being removed successfully.
* @throws IOException Throws exception if unable to query the data store.
*/
@AtMostOnce
<T extends BaseRecord> Map<T, Boolean> removeMultiple(List<T> records) throws IOException;

/**
* Remove all records of this class from the store.
*
Expand All @@ -152,4 +164,17 @@ <T extends BaseRecord> StateStoreOperationResult putAll(
<T extends BaseRecord> int remove(Class<T> clazz, Query<T> query)
throws IOException;

/**
* Remove all records of a specific class that match any query in a list of queries.
* Requires the getAll implementation to fetch fresh records on each call.
*
* @param clazz The class to match the records with.
* @param queries Queries (logical OR) to filter what to remove.
* @param <T> Record class of the records.
* @return Map of query to number of records removed by that query.
* @throws IOException Throws exception if unable to query the data store.
*/
@AtMostOnce
<T extends BaseRecord> Map<Query<T>, Integer> remove(Class<T> clazz, List<Query<T>> queries)
throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
Expand Down Expand Up @@ -86,4 +89,37 @@ public <T extends BaseRecord> boolean remove(T record) throws IOException {
Class<T> recordClass = (Class<T>)StateStoreUtils.getRecordClass(clazz);
return remove(recordClass, query) == 1;
}

@Override
public <T extends BaseRecord> Map<T, Boolean> removeMultiple(List<T> records) throws IOException {
assert !records.isEmpty();
// Fall back to iterative remove() calls if all records don't share 1 class
Class<? extends BaseRecord> expectedClazz = records.get(0).getClass();
if (!records.stream().allMatch(x -> x.getClass() == expectedClazz)) {
Map<T, Boolean> result = new HashMap<>();
for (T record : records) {
result.put(record, remove(record));
}
return result;
}

final List<Query<T>> queries = new ArrayList<>();
for (T record : records) {
queries.add(new Query<>(record));
}
@SuppressWarnings("unchecked")
Class<T> recordClass = (Class<T>) StateStoreUtils.getRecordClass(expectedClazz);
Map<Query<T>, Integer> result = remove(recordClass, queries);
return result.entrySet().stream()
.collect(Collectors.toMap(e -> e.getKey().getPartial(), e -> e.getValue() == 1));
}

public <T extends BaseRecord> Map<Query<T>, Integer> remove(Class<T> clazz,
List<Query<T>> queries) throws IOException {
Map<Query<T>, Integer> result = new HashMap<>();
for (Query<T> query : queries) {
result.put(query, remove(clazz, query));
}
return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,11 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
Expand Down Expand Up @@ -284,51 +288,86 @@ public <T extends BaseRecord> StateStoreOperationResult putAll(
}

@Override
public <T extends BaseRecord> int remove(
Class<T> clazz, Query<T> query) throws IOException {
public <T extends BaseRecord> Map<Query<T>, Integer> remove(Class<T> clazz,
List<Query<T>> queries) throws IOException {
verifyDriverReady();
if (query == null) {
return 0;
// Track how many entries are deleted by each query
Map<Query<T>, Integer> ret = new HashMap<>();
final List<T> trueRemoved = Collections.synchronizedList(new ArrayList<>());
if (queries.isEmpty()) {
return ret;
}

// Read the current data
long start = monotonicNow();
List<T> records = null;
List<T> records;
try {
QueryResult<T> result = get(clazz);
records = result.getRecords();
} catch (IOException ex) {
LOG.error("Cannot get existing records", ex);
getMetrics().addFailure(monotonicNow() - start);
return 0;
return ret;
}

// Check the records to remove
String znode = getZNodeForClass(clazz);
List<T> recordsToRemove = filterMultiple(query, records);
Set<T> recordsToRemove = new HashSet<>();
Map<Query<T>, List<T>> queryToRecords = new HashMap<>();
for (Query<T> query : queries) {
List<T> filtered = filterMultiple(query, records);
queryToRecords.put(query, filtered);
recordsToRemove.addAll(filtered);
}

// Remove the records
int removed = 0;
for (T existingRecord : recordsToRemove) {
List<Callable<Void>> callables = new ArrayList<>();
recordsToRemove.forEach(existingRecord -> callables.add(() -> {
LOG.info("Removing \"{}\"", existingRecord);
try {
String primaryKey = getPrimaryKey(existingRecord);
String path = getNodePath(znode, primaryKey);
if (zkManager.delete(path)) {
removed++;
trueRemoved.add(existingRecord);
} else {
LOG.error("Did not remove \"{}\"", existingRecord);
}
} catch (Exception e) {
LOG.error("Cannot remove \"{}\"", existingRecord, e);
getMetrics().addFailure(monotonicNow() - start);
}
return null;
}));
try {
if (enableConcurrent) {
executorService.invokeAll(callables);
} else {
for (Callable<Void> callable : callables) {
callable.call();
}
}
} catch (Exception e) {
LOG.error("Record removal failed : {}", e.getMessage(), e);
}
long end = monotonicNow();
if (removed > 0) {
if (!trueRemoved.isEmpty()) {
getMetrics().addRemove(end - start);
}
return removed;
// Generate return map
for (Map.Entry<Query<T>, List<T>> entry : queryToRecords.entrySet()) {
for (T record : entry.getValue()) {
if (trueRemoved.contains(record)) {
ret.compute(entry.getKey(), (k, v) -> (v == null) ? 1 : v + 1);
}
}
}
return ret;
}

@Override
public <T extends BaseRecord> int remove(Class<T> clazz, Query<T> query)
throws IOException {
return remove(clazz, Collections.singletonList(query)).get(query);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,17 +140,28 @@ public void testAsyncPerformance() throws Exception {
insertList.add(newRecord);
}
// Insert Multiple on sync mode
long startSync = Time.now();
long startSyncPut = Time.now();
stateStoreDriver.putAll(insertList, true, false);
long endSync = Time.now();
long endSyncPut = Time.now();
// Removing 1000 records synchronously is painfully slow so test with only 5 records
// Then remove the rest with removeAll()
long startSyncRemove = Time.now();
for (MountTable entry : insertList.subList(0, 5)) {
stateStoreDriver.remove(entry);
}
long endSyncRemove = Time.now();
stateStoreDriver.removeAll(MembershipState.class);

stateStoreDriver.setEnableConcurrent(true);
// Insert Multiple on async mode
long startAsync = Time.now();
long startAsyncPut = Time.now();
stateStoreDriver.putAll(insertList, true, false);
long endAsync = Time.now();
assertTrue((endSync - startSync) > (endAsync - startAsync));
long endAsyncPut = Time.now();
long startAsyncRemove = Time.now();
stateStoreDriver.removeMultiple(insertList.subList(0, 5));
long endAsyncRemove = Time.now();
assertTrue((endSyncPut - startSyncPut) > (endAsyncPut - startAsyncPut));
assertTrue((endSyncRemove - startSyncRemove) > (endAsyncRemove - startAsyncRemove));
}

@Test
Expand Down

0 comments on commit f5c5d35

Please sign in to comment.