Skip to content

Commit

Permalink
Smarter wait for state.
Browse files Browse the repository at this point in the history
  • Loading branch information
janvanmansum committed Dec 15, 2024
1 parent 117e2f9 commit 3c96551
Show file tree
Hide file tree
Showing 10 changed files with 63 additions and 52 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@
<dependency>
<groupId>nl.knaw.dans</groupId>
<artifactId>dans-dataverse-client-lib</artifactId>
<version>1.1.0</version>
<version>1.2.0</version>
</dependency>
<dependency>
<groupId>nl.knaw.dans</groupId>
Expand Down
18 changes: 11 additions & 7 deletions src/main/assembly/dist/cfg/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,18 @@ ingest:
#
maxNumberOfFilesPerUploadBatch: 1000
maxByteSizePerUploadBatch: 500MiB

#
# The service waits for the dataset to reach the released state before it continues processing the next deposit. These settings control how long the service waits,
# before giving up and marking the deposit as failed.
#
waitForReleasedState:
# maxWaitTime = 5h
# # It is not useful to check directly after sending the publication request. The publication will verify all checksums.
# initialWaitTimePerMegaByte = 10s (or initialWaitTimePerFile = 10s)
# timeBetweenChecks = 10s
# 10s * 720 = 2 hours
maxNumberOfRetries: 720
timeBetweenChecks: 10s
# Lead time per file. Polling for the dataset state starts only after (leadTimePerFile × number of files in the dataset) has elapsed, because releasing a dataset is expected to take at least that long.
leadTimePerFile: 200ms
# Give up waiting for the dataset to be in the expected state after this time. The lead time is *not* included in this timeout.
timeout: 3h
# The interval between polling the dataset state.
pollingInterval: 5s

#
# Settings related to the conversion of deposits from the legacy format to the format used by the ingest service. Set to null to disable.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,9 @@ public void run(final DdDataverseIngestConfiguration configuration, final Enviro
var dataverseService = DataverseServiceImpl.builder()
.dataverseClient(dataverseClient)
.metadataKeys(configuration.getIngest().getMetadataKeys())
.millisecondsBetweenChecks(configuration.getIngest().getWaitForReleasedState().getTimeBetweenChecks().toMilliseconds())
.maxNumberOfRetries(configuration.getIngest().getWaitForReleasedState().getMaxNumberOfRetries())
.timeout(configuration.getIngest().getWaitForReleasedState().getTimeout().toMilliseconds())
.leadTimePerFile(configuration.getIngest().getWaitForReleasedState().getLeadTimePerFile().toMilliseconds())
.pollingInterval(configuration.getIngest().getWaitForReleasedState().getPollingInterval().toMilliseconds())
.build();
var utilityServices = UtilityServicesImpl.builder()
.tempDir(configuration.getIngest().getTempDir())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,11 @@

/**
 * Configuration for waiting until a dataset has reached the released state after a publication request.
 * The values are Dropwizard-style durations; the application converts them to milliseconds when it builds
 * the Dataverse service.
 */
@Data
public class WaitForReleasedStateConfig {
// Maximum number of state checks before giving up.
// NOTE(review): appears to be superseded by timeout/pollingInterval - confirm before relying on it.
private int maxNumberOfRetries = 10;

// Pause between two consecutive state checks.
// NOTE(review): appears to be superseded by pollingInterval - confirm before relying on it.
private Duration timeBetweenChecks = Duration.seconds(10);
// Give up waiting for the released state after this time; the lead time is not included.
private Duration timeout = Duration.minutes(30);

// Lead time per file: the first state check happens after leadTimePerFile x number of files in the dataset.
// NOTE(review): this default (5 s) differs from the 200 ms used as builder default in DataverseServiceImpl
// and in the example config.yml - confirm which value is intended.
private Duration leadTimePerFile = Duration.seconds(5);

// Interval between two polls of the dataset state.
private Duration pollingInterval = Duration.seconds(10);

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,13 @@
import nl.knaw.dans.dvingest.core.DataverseIngestBag;
import nl.knaw.dans.dvingest.core.service.DataverseService;
import nl.knaw.dans.dvingest.core.service.UtilityServices;
import nl.knaw.dans.dvingest.core.yaml.UpdateState;
import nl.knaw.dans.dvingest.core.yaml.UpdateStateRoot;
import nl.knaw.dans.lib.dataverse.DataverseException;

import java.io.IOException;
import java.util.UUID;

/**
* Processes a bag, creating and/or editing a dataset version in Dataverse.
* Processes a bag, creating and/or editing a dataset version in Dataverse. A BagProcessor is created for each deposit.
*/
@Slf4j
public class BagProcessor {
Expand All @@ -53,7 +51,7 @@ public String run(String targetPid) throws IOException, DataverseException {
filesEditor.editFiles(targetPid);
metadataEditor.editMetadata(targetPid);
permissionsEditor.editPermissions(targetPid);
stateUpdater.updateState(targetPid);
stateUpdater.updateState(targetPid, filesEditor.getFilesInDatasetCache().getNumberOfFilesInDataset());
return targetPid;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,15 @@ public void downloadFromDataset(@NonNull String pid) throws IOException, Dataver
initialized = true;
}

/**
 * Reports how many entries are currently held in {@code filesInDataset}.
 * <p>
 * NOTE(review): presumably only meaningful after the cache has been filled from the dataset - confirm
 * against the callers.
 *
 * @return the size of {@code filesInDataset}
 */
public int getNumberOfFilesInDataset() {
    final int fileCount = filesInDataset.size();
    return fileCount;
}

private String getPath(@NonNull FileMeta file) {
var dataversePath = new DataversePath(file.getDirectoryLabel(), file.getLabel());
return dataversePath.toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,16 @@
public class StateUpdater {
private final UUID depositId;
private final UpdateAction updateAction;

private final DataverseService dataverseService;

private String pid;
private int numberOfFilesInDataset;

public void updateState(String pid) throws DataverseException, IOException {
public void updateState(String pid, int numberOfFilesInDataset) throws DataverseException, IOException {
this.pid = pid;
this.numberOfFilesInDataset = numberOfFilesInDataset;

if (updateAction instanceof PublishAction) {
publishVersion(((PublishAction) updateAction).getUpdateType());
}
Expand All @@ -49,14 +53,14 @@ else if (updateAction instanceof ReleaseMigratedAction) {
/**
 * Publishes the dataset version and blocks until the dataset is reported as released.
 *
 * @param updateType the update type that is passed on to the publish request
 * @throws DataverseException propagated from the Dataverse service calls
 * @throws IOException        propagated from the Dataverse service calls
 */
private void publishVersion(UpdateType updateType) throws DataverseException, IOException {
    log.debug("Start publishing version for deposit {}", depositId);
    dataverseService.publishDataset(pid, updateType);
    // Single wait only: the file count lets the service scale its lead time before the first state check.
    dataverseService.waitForReleasedState(pid, numberOfFilesInDataset);
    log.debug("End publishing version for deposit {}", depositId);
}

/**
 * Releases a migrated dataset version with the given release date and blocks until the dataset is
 * reported as released. Operates on the {@code pid} field of this updater.
 *
 * @param date the release date that is passed on to the release-migrated request
 * @throws DataverseException propagated from the Dataverse service calls
 * @throws IOException        propagated from the Dataverse service calls
 */
public void releaseMigrated(String date) throws DataverseException, IOException {
    log.debug("Start releasing migrated version for deposit {}", depositId);
    dataverseService.releaseMigratedDataset(pid, date);
    // Single wait only: the file count lets the service scale its lead time before the first state check.
    dataverseService.waitForReleasedState(pid, numberOfFilesInDataset);
    log.debug("End releasing migrated version for deposit {}", depositId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,6 @@ public interface DataverseService {

String getDatasetUrnNbn(String datasetId) throws IOException, DataverseException;

void waitForState(String persistentId, String state) throws DataverseException;

void updateMetadata(String targetDatasetPid, DatasetVersion datasetMetadata) throws DataverseException, IOException;

void updateFileMetadata(int id, FileMeta newMeta) throws DataverseException, IOException;
Expand Down Expand Up @@ -80,4 +78,6 @@ public interface DataverseService {
void importDataset(String pid, Dataset dataset) throws IOException, DataverseException;

void releaseMigratedDataset(String pid, String date) throws DataverseException, IOException;

void waitForReleasedState(String persistentId, int numberOfFilesInDataset) throws DataverseException, IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,13 @@ public class DataverseServiceImpl implements DataverseService {
private final DataverseClient dataverseClient;

@Builder.Default
private int maxNumberOfRetries = 10;
private long pollingInterval = 3000; // 3 seconds

@Builder.Default
private long millisecondsBetweenChecks = 3000;
private long leadTimePerFile = 200;

@Builder.Default
private long timeout = 1800000; // 30 minutes

@Builder.Default
private Map<String, String> metadataKeys = new HashMap<>();
Expand Down Expand Up @@ -231,35 +234,15 @@ public void releaseMigratedDataset(String pid, String date) throws DataverseExce
log.debug(result.getEnvelopeAsString());
}

// TODO: move this to dans-dataverse-client-lib; it is similar to awaitLockState.
public void waitForState(String datasetId, String expectedState) {
var numberOfTimesTried = 0;
var state = "";

public void waitForReleasedState(String pid, int numberOfFilesInDataset) throws DataverseException, IOException {
long leadTime = numberOfFilesInDataset * leadTimePerFile;
log.debug("Waiting {} ms before first check", leadTime);
try {
state = getDatasetState(datasetId);
log.debug("Initial state for dataset {} is {}", datasetId, state);
while (!expectedState.equals(state) && numberOfTimesTried < maxNumberOfRetries) {
log.debug("Sleeping for {} milliseconds before checking again", millisecondsBetweenChecks);
Thread.sleep(millisecondsBetweenChecks);

state = getDatasetState(datasetId);
numberOfTimesTried += 1;
log.debug("Current state for dataset {} is {}, tried {} of {} times", datasetId, state, numberOfTimesTried, maxNumberOfRetries);
}

if (!expectedState.equals(state)) {
throw new IllegalStateException(String.format(
"Dataset did not become %s within the wait period (%d seconds); current state is %s",
expectedState, (maxNumberOfRetries * millisecondsBetweenChecks), state
));
}
Thread.sleep(leadTime);
}
catch (InterruptedException e) {
throw new RuntimeException("Dataset state check was interrupted; last know state is " + state);
}
catch (IOException | DataverseException e) {
throw new RuntimeException(e);
log.error("Interrupted during lead time. Continuing", e);
}
dataverseClient.dataset(pid).awaitState("RELEASED", timeout, pollingInterval);
}
}
14 changes: 11 additions & 3 deletions src/test/resources/debug-etc/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,18 @@ ingest:
# See: https://guides.dataverse.org/en/latest/installation/config.html#zipuploadfileslimit
#
maxNumberOfFilesPerUploadBatch: 1000

#
# The service waits for the dataset to reach the released state before it continues processing the next deposit. These settings control how long the service waits,
# before giving up and marking the deposit as failed.
#
waitForReleasedState:
# 10s * 360 = 1 hour
maxNumberOfRetries: 360
timeBetweenChecks: 2s
# Lead time per file. Polling for the dataset state starts only after (leadTimePerFile × number of files in the dataset) has elapsed, because releasing a dataset is expected to take at least that long.
leadTimePerFile: 200ms
# Give up waiting for the dataset to be in the expected state after this time. The lead time is *not* included in this timeout.
timeout: 5m
# The interval between polling the dataset state.
pollingInterval: 1s

#
# Settings related to the conversion of deposits from the legacy format to the format used by the ingest service. Set to null to disable.
Expand Down

0 comments on commit 3c96551

Please sign in to comment.