Skip to content

Commit

Permalink
fix format
Browse files Browse the repository at this point in the history
  • Loading branch information
Radeity committed Sep 25, 2023
1 parent 9a2fdb1 commit 8b6000c
Showing 1 changed file with 40 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.slf4j.Logger;

import java.io.File;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.LinkedList;
import java.util.List;
Expand Down Expand Up @@ -75,8 +76,7 @@ public SnapshotManager(ComputerContext context, MessageSendManager sendManager,
this.recvManager.setSnapshotManager(this);

this.workerInfo = workerInfo;
this.partitioner = context.config().createObject(
ComputerOptions.WORKER_PARTITIONER);
this.partitioner = context.config().createObject(ComputerOptions.WORKER_PARTITIONER);
this.partitionCount = context.config().get(ComputerOptions.JOB_PARTITIONS_COUNT);
this.viewKey = context.config().get(ComputerOptions.SNAPSHOT_VIEW_KEY);
}
Expand All @@ -94,9 +94,9 @@ public void init(Config config) {
this.bucketName = config.get(ComputerOptions.SNAPSHOT_MINIO_BUCKET_NAME);
if (StringUtils.isNotEmpty(endpoint)) {
this.minioClient = MinioClient.builder()
.endpoint(endpoint)
.credentials(accessKey, secretKey)
.build();
.endpoint(endpoint)
.credentials(accessKey, secretKey)
.build();
}
}

Expand All @@ -116,7 +116,7 @@ public boolean writeSnapshot() {
public void upload(MessageType messageType, int partitionId, List<String> outputFiles) {
if (this.loadSnapshot()) {
LOG.info("No later {} snapshots have to be uploaded",
messageType.name().toLowerCase(Locale.ROOT));
messageType.name().toLowerCase(Locale.ROOT));
return;
}
this.uploadObjects(messageType, partitionId, outputFiles);
Expand Down Expand Up @@ -145,11 +145,11 @@ private void uploadObjects(MessageType messageType, int partitionId,
try {
this.clearObjectsIfExist(dirName);
} catch (Exception e) {
throw new ComputerException("Clear out-dated snapshots from %s failed", dirName, e);
throw new ComputerException("Failed to clear out-dated snapshots from %s", dirName, e);
}

LOG.info("Upload {} snapshots for partition {}",
messageType.name().toLowerCase(Locale.ROOT), partitionId);
messageType.name().toLowerCase(Locale.ROOT), partitionId);
for (String outputFile : outputFiles) {
String objectName = dirName + new File(outputFile).getName();
this.uploadObject(outputFile, objectName);
Expand All @@ -164,9 +164,9 @@ private void downloadObjects(MessageType messageType, int partitionId) {
try {
Iterable<Result<Item>> snapshotFiles = this.minioClient.listObjects(
ListObjectsArgs.builder()
.bucket(this.bucketName)
.prefix(dirName)
.build());
.bucket(this.bucketName)
.prefix(dirName)
.build());

if (!snapshotFiles.iterator().hasNext()) {
throw new ComputerException("Empty snapshot directory %s", dirName);
Expand All @@ -184,44 +184,43 @@ private void downloadObjects(MessageType messageType, int partitionId) {
this.recvManager.handle(messageType, partitionId, fileRegionBuffer);
}
} catch (Exception e) {
throw new ComputerException("Download snapshots from %s failed", dirName, e);
throw new ComputerException("Failed to download snapshots from %s", dirName, e);
}
}

private void uploadObject(String fileName, String objectName) {
try {
this.minioClient.uploadObject(UploadObjectArgs.builder()
.bucket(this.bucketName)
.object(objectName)
.filename(fileName)
.build());
.bucket(this.bucketName)
.object(objectName)
.filename(fileName)
.build());
} catch (Exception e) {
throw new ComputerException("Upload snapshot %s to %s failed",
fileName, objectName, e);
throw new ComputerException("Failed to upload snapshot %s to %s",
fileName, objectName, e);
}
}

private void downloadObject(String objectName, String outputPath) {
try {
this.minioClient.downloadObject(
DownloadObjectArgs.builder()
.bucket(this.bucketName)
.object(objectName)
.filename(outputPath)
.build());
this.minioClient.downloadObject(DownloadObjectArgs.builder()
.bucket(this.bucketName)
.object(objectName)
.filename(outputPath)
.build());
} catch (Exception e) {
throw new ComputerException("Download snapshot from %s to %s failed",
objectName, outputPath, e);
throw new ComputerException("Failed to download snapshot from %s to %s",
objectName, outputPath, e);
}
}

private void clearObjectsIfExist(String dirName) throws Exception {
List<DeleteObject> objects = new LinkedList<>();
Iterable<Result<Item>> snapshotFiles = this.minioClient.listObjects(
ListObjectsArgs.builder()
.bucket(this.bucketName)
.prefix(dirName)
.build());
.bucket(this.bucketName)
.prefix(dirName)
.build());
if (!snapshotFiles.iterator().hasNext()) {
return;
}
Expand All @@ -232,25 +231,24 @@ private void clearObjectsIfExist(String dirName) throws Exception {
objects.add(new DeleteObject(item.objectName()));
}
Iterable<Result<DeleteError>> results =
minioClient.removeObjects(
RemoveObjectsArgs.builder()
.bucket(this.bucketName)
.objects(objects)
.build());
minioClient.removeObjects(RemoveObjectsArgs.builder()
.bucket(this.bucketName)
.objects(objects)
.build());
for (Result<DeleteError> result : results) {
DeleteError error = result.get();
throw new ComputerException (
"Error in deleting snapshot " + error.objectName() + "; " + error.message());
throw new ComputerException("Failed to delete snapshot %s, error message: %s",
error.objectName(), error.message());
}
}

private String generateObjectDirName(MessageType messageType, int partitionId) {
// dir name: {VIEW_KEY}/{PARTITIONER}/{PARTITION_COUNT}/VERTEX/{PARTITION_ID}/
return Paths.get(
this.viewKey,
this.partitioner.getClass().getSimpleName(),
String.valueOf(this.partitionCount),
messageType.name(),
String.valueOf(partitionId)).toString() + "/";
Path path = Paths.get(this.viewKey,
this.partitioner.getClass().getSimpleName(),
String.valueOf(this.partitionCount),
messageType.name(),
String.valueOf(partitionId));
return path + "/";
}
}

0 comments on commit 8b6000c

Please sign in to comment.