Skip to content

Commit

Permalink
Fix batch deletion
Browse files Browse the repository at this point in the history
  • Loading branch information
tobias-hotz committed Nov 11, 2024
1 parent 2c72226 commit fb3d860
Show file tree
Hide file tree
Showing 11 changed files with 37 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -252,11 +252,9 @@ public void synchronizeDbWithIndex(ServiceContext context) throws Exception {
}

// remove from index metadata not in DBMS
try (BatchingDeletionSubmittor submittor = new BatchingDeletionSubmittor(docs.keySet().size())) {
for (String id : docs.keySet()) {
getSearchManager().deleteById(id, submittor);
LOGGER_DATA_MANAGER.debug("- removed record ({}) from index", id);
}
for (String id : docs.keySet()) {
getSearchManager().deleteByQuery(String.format("+id:%s", id));
LOGGER_DATA_MANAGER.debug("- removed record ({}) from index", id);
}
}

Expand Down Expand Up @@ -341,12 +339,11 @@ public void deleteMetadata(ServiceContext context, String metadataId, IDeletionS
boolean isMetadata = findOne.getDataInfo().getType() == MetadataType.METADATA;

deleteMetadataFromDB(context, metadataId);
// --- update search criteria
getSearchManager().deleteByUuid(findOne.getUuid(), submittor);
} else {
getSearchManager().deleteByQuery(String.format("+id:%s", metadataId));
}

// --- update search criteria
getSearchManager().deleteById(metadataId, submittor);
// _entityManager.flush();
// _entityManager.clear();
}

/**
Expand Down Expand Up @@ -387,7 +384,7 @@ public void purgeMetadata(ServiceContext context, String metadataId, boolean wit
@Override
public void deleteMetadataGroup(ServiceContext context, String metadataId) throws Exception {
deleteMetadataFromDB(context, metadataId);
getSearchManager().deleteById(metadataId, DirectDeletionSubmittor.INSTANCE);
getSearchManager().deleteByQuery(String.format("+id:%s", metadataId));
}

/**
Expand Down Expand Up @@ -787,7 +784,7 @@ public synchronized AbstractMetadata updateMetadata(final ServiceContext context
if (indexingMode != IndexingMode.none) {
// Delete old record if UUID changed
if (uuidBeforeUfo != null && !uuidBeforeUfo.equals(uuid)) {
getSearchManager().deleteByQuery(String.format("+uuid:\"%s\"", uuidBeforeUfo));
getSearchManager().deleteByUuid(uuidBeforeUfo, DirectDeletionSubmittor.INSTANCE);
}
metadataIndexer.indexMetadata(metadataId, DirectIndexSubmittor.INSTANCE, indexingMode);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,12 +87,10 @@ public void deleteMetadata(ServiceContext context, String metadataId, IDeletionS
}

deleteMetadataFromDB(context, metadataId);
getSearchManager().deleteByUuid(findOne.getUuid(), submittor);
} else {
getSearchManager().deleteByQuery(String.format("+id:%s", metadataId));
}

// --- update search criteria
getSearchManager().deleteById(metadataId, submittor);
// _entityManager.flush();
// _entityManager.clear();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -653,7 +653,7 @@ public void cancelEditingSession(ServiceContext context, String id) throws Excep

// --- remove metadata
xmlSerializer.delete(id, ServiceContext.get());
searchManager.deleteById(id, DirectDeletionSubmittor.INSTANCE);
searchManager.deleteByQuery(String.format("+id:%s", id));

// Unset METADATA_EDITING_CREATED_DRAFT flag
context.getUserSession().removeProperty(Geonet.Session.METADATA_EDITING_CREATED_DRAFT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@
import org.fao.geonet.kernel.datamanager.IMetadataUtils;
import org.fao.geonet.kernel.search.index.OverviewIndexFieldUpdater;
import org.fao.geonet.kernel.search.submission.IDeletionSubmittor;
import org.fao.geonet.kernel.search.submission.batch.BatchingDeletionSubmittor;
import org.fao.geonet.kernel.search.submission.batch.BatchingIndexSubmittor;
import org.fao.geonet.kernel.search.submission.IIndexSubmittor;
import org.fao.geonet.kernel.setting.SettingInfo;
Expand Down Expand Up @@ -874,19 +873,8 @@ public void deleteByQuery(String txt) throws Exception {
}

@Override
public void deleteById(String txt, IDeletionSubmittor submittor) throws Exception {
submittor.submitToIndex(txt, this);
}

@Override
public void delete(List<Integer> metadataIds) throws Exception {
try (BatchingDeletionSubmittor submittor = new BatchingDeletionSubmittor(metadataIds.size())) {
for (Integer id : metadataIds) {
deleteById(String.valueOf(id), submittor);
}
} catch (Exception e) {
LOGGER.error("Error while deleting metadata: {}", e.getMessage(), e);
}
public void deleteByUuid(String uuid, IDeletionSubmittor submittor) throws Exception {
submittor.submitToIndex(uuid, this);
}

public long getNumDocs(String query) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,14 +76,9 @@ boolean rebuildIndex(ServiceContext context,
void deleteByQuery(String query) throws Exception;

/**
* deletes a document by its id.
* deletes a document by its uuid.
*/
void deleteById(String id, IDeletionSubmittor submittor) throws Exception;

/**
* deletes a list of documents.
*/
void delete(List<Integer> metadataIds) throws Exception;
void deleteByUuid(String uuid, IDeletionSubmittor submittor) throws Exception;

boolean isIndexWritable(String indexName) throws IOException, ElasticsearchException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ public class DirectDeletionSubmittor implements IDeletionSubmittor {
private DirectDeletionSubmittor() {}

@Override
public void submitToIndex(String id, EsSearchManager searchManager) throws IOException {
public void submitToIndex(String uuid, EsSearchManager searchManager) throws IOException {
EsRestClient restClient = searchManager.getClient();
List<String> documents = Collections.singletonList(id);
List<String> documents = Collections.singletonList(uuid);

BulkRequest bulkRequest = restClient.buildDeleteBulkRequest(searchManager.getDefaultIndex(), documents);
final BulkResponse bulkItemResponses = restClient.getClient().bulk(bulkRequest);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,5 @@

public interface IDeletionSubmittor {

void submitToIndex(String id, EsSearchManager searchManager) throws IOException;
void submitToIndex(String uuid, EsSearchManager searchManager) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,23 +13,21 @@
public class BatchingDeletionSubmittor extends BatchingSubmittorBase<BatchingDeletionSubmittor.State> implements IDeletionSubmittor {

protected static class State extends StateBase {
private final List<String> listOfIDsToDelete = new ArrayList<>();
private final List<String> listOfUUIDsToDelete = new ArrayList<>();

@Override
protected void cleanUp() {
// Send any remaining pending documents
if (!listOfIDsToDelete.isEmpty()) {
deleteDocumentsFromIndex(listOfIDsToDelete);
if (!listOfUUIDsToDelete.isEmpty()) {
deleteDocumentsFromIndex(listOfUUIDsToDelete);
}
}

private void deleteDocumentsFromIndex(List<String> toDelete) {
EsRestClient restClient = searchManager.getClient();
BulkRequest bulkRequest = restClient.buildDeleteBulkRequest(searchManager.getDefaultIndex(), toDelete);
CompletableFuture<Void> currentIndexFuture = restClient.getAsyncClient().bulk(bulkRequest)
.thenAcceptAsync(bulkItemResponses -> {
searchManager.handleDeletionResponse(bulkItemResponses, toDelete);
});
.thenAcceptAsync(bulkItemResponses -> searchManager.handleDeletionResponse(bulkItemResponses, toDelete));
queueFuture(currentIndexFuture);
}
}
Expand All @@ -43,17 +41,17 @@ public BatchingDeletionSubmittor(int estimatedTotalSize) {
}

@Override
public void submitToIndex(String id, EsSearchManager searchManager) throws IOException {
public void submitToIndex(String uuid, EsSearchManager searchManager) {
if (state.closed) {
throw new IllegalStateException("Attempting to use a closed " + this.getClass().getSimpleName());
}

state.searchManager = searchManager;
List<String> listOfIDsToDelete = state.listOfIDsToDelete;
listOfIDsToDelete.add(id);
if (listOfIDsToDelete.size() >= commitInterval) {
List<String> toDelete = new ArrayList<>(listOfIDsToDelete);
listOfIDsToDelete.clear();
List<String> listOfUUIDsToDelete = state.listOfUUIDsToDelete;
listOfUUIDsToDelete.add(uuid);
if (listOfUUIDsToDelete.size() >= commitInterval) {
List<String> toDelete = new ArrayList<>(listOfUUIDsToDelete);
listOfUUIDsToDelete.clear();
state.deleteDocumentsFromIndex(toDelete);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,8 @@ public void testDeleteBatchMetadata() throws Exception {

dataManager.batchDeleteMetadataAndUpdateIndex(spec);

Thread.sleep(500);

assertEquals(0, metadataRepository.findAll(spec).size());
assertEquals(0, searchManager.query(String.format("uuid:(%s OR %s)", uuid1, uuid2), null, 0, 10).hits().hits().size());
}
Expand Down
7 changes: 4 additions & 3 deletions index/src/main/java/org/fao/geonet/index/es/EsRestClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -235,15 +235,16 @@ public BulkRequest buildIndexBulkRequest(String index, Map<String, String> docs)
return requestBuilder.build();
}

public BulkRequest buildDeleteBulkRequest(String index, List<String> deletionIds) {
public BulkRequest buildDeleteBulkRequest(String index, List<String> deletionUUIDs) {
checkActivated();

BulkRequest.Builder requestBuilder = new BulkRequest.Builder()
.index(index)
.refresh(Refresh.True);

for (String deletionQuery : deletionIds) {
for (String uuid : deletionUUIDs) {
requestBuilder.operations(op -> op.delete(del -> del.index(index)
.id(deletionQuery)));
.id(uuid)));
}

return requestBuilder.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ public void removeDraft(MetadataDraft draft) {

// --- remove metadata
xmlSerializer.delete(String.valueOf(id), ServiceContext.get());
searchManager.deleteById(String.valueOf(id), DirectDeletionSubmittor.INSTANCE);
searchManager.deleteByUuid(draft.getUuid(), DirectDeletionSubmittor.INSTANCE);
} catch (Exception e) {
Log.error(Geonet.DATA_MANAGER, "Couldn't cleanup draft " + draft, e);
}
Expand Down

0 comments on commit fb3d860

Please sign in to comment.