diff --git a/computer-core/src/main/java/org/apache/hugegraph/computer/core/snapshot/SnapshotManager.java b/computer-core/src/main/java/org/apache/hugegraph/computer/core/snapshot/SnapshotManager.java index 8956ded90..da5b49841 100644 --- a/computer-core/src/main/java/org/apache/hugegraph/computer/core/snapshot/SnapshotManager.java +++ b/computer-core/src/main/java/org/apache/hugegraph/computer/core/snapshot/SnapshotManager.java @@ -42,6 +42,7 @@ import org.slf4j.Logger; import java.io.File; +import java.nio.file.Path; import java.nio.file.Paths; import java.util.LinkedList; import java.util.List; @@ -75,8 +76,7 @@ public SnapshotManager(ComputerContext context, MessageSendManager sendManager, this.recvManager.setSnapshotManager(this); this.workerInfo = workerInfo; - this.partitioner = context.config().createObject( - ComputerOptions.WORKER_PARTITIONER); + this.partitioner = context.config().createObject(ComputerOptions.WORKER_PARTITIONER); this.partitionCount = context.config().get(ComputerOptions.JOB_PARTITIONS_COUNT); this.viewKey = context.config().get(ComputerOptions.SNAPSHOT_VIEW_KEY); } @@ -94,9 +94,9 @@ public void init(Config config) { this.bucketName = config.get(ComputerOptions.SNAPSHOT_MINIO_BUCKET_NAME); if (StringUtils.isNotEmpty(endpoint)) { this.minioClient = MinioClient.builder() - .endpoint(endpoint) - .credentials(accessKey, secretKey) - .build(); + .endpoint(endpoint) + .credentials(accessKey, secretKey) + .build(); } } @@ -116,7 +116,7 @@ public boolean writeSnapshot() { public void upload(MessageType messageType, int partitionId, List outputFiles) { if (this.loadSnapshot()) { LOG.info("No later {} snapshots have to be uploaded", - messageType.name().toLowerCase(Locale.ROOT)); + messageType.name().toLowerCase(Locale.ROOT)); return; } this.uploadObjects(messageType, partitionId, outputFiles); @@ -149,7 +149,7 @@ private void uploadObjects(MessageType messageType, int partitionId, } LOG.info("Upload {} snapshots for partition {}", - messageType.name().toLowerCase(Locale.ROOT), partitionId); + messageType.name().toLowerCase(Locale.ROOT), partitionId); for (String outputFile : outputFiles) { String objectName = dirName + new File(outputFile).getName(); this.uploadObject(outputFile, objectName); @@ -164,9 +164,9 @@ private void downloadObjects(MessageType messageType, int partitionId) { try { Iterable> snapshotFiles = this.minioClient.listObjects( ListObjectsArgs.builder() - .bucket(this.bucketName) - .prefix(dirName) - .build()); + .bucket(this.bucketName) + .prefix(dirName) + .build()); if (!snapshotFiles.iterator().hasNext()) { throw new ComputerException("Empty snapshot directory %s", dirName); @@ -184,34 +184,33 @@ private void downloadObjects(MessageType messageType, int partitionId) { this.recvManager.handle(messageType, partitionId, fileRegionBuffer); } } catch (Exception e) { - throw new ComputerException("Download snapshots from %s failed", dirName, e); + throw new ComputerException("Failed to download snapshots from %s", dirName, e); } } private void uploadObject(String fileName, String objectName) { try { this.minioClient.uploadObject(UploadObjectArgs.builder() - .bucket(this.bucketName) - .object(objectName) - .filename(fileName) - .build()); + .bucket(this.bucketName) + .object(objectName) + .filename(fileName) + .build()); } catch (Exception e) { - throw new ComputerException("Upload snapshot %s to %s failed", - fileName, objectName, e); + throw new ComputerException("Failed to upload snapshot %s to %s", + fileName, objectName, e); } } private void downloadObject(String objectName, String outputPath) { try { - this.minioClient.downloadObject( - DownloadObjectArgs.builder() - .bucket(this.bucketName) - .object(objectName) - .filename(outputPath) - .build()); + this.minioClient.downloadObject(DownloadObjectArgs.builder() + .bucket(this.bucketName) + .object(objectName) + .filename(outputPath) + .build()); } catch (Exception e) { - throw new ComputerException("Download snapshot from %s to %s failed", - objectName, outputPath, e); + throw new ComputerException("Failed to download snapshot from %s to %s", + objectName, outputPath, e); } } @@ -219,9 +218,9 @@ private void clearObjectsIfExist(String dirName) throws Exception { List objects = new LinkedList<>(); Iterable> snapshotFiles = this.minioClient.listObjects( ListObjectsArgs.builder() - .bucket(this.bucketName) - .prefix(dirName) - .build()); + .bucket(this.bucketName) + .prefix(dirName) + .build()); if (!snapshotFiles.iterator().hasNext()) { return; } @@ -232,25 +231,24 @@ private void clearObjectsIfExist(String dirName) throws Exception { objects.add(new DeleteObject(item.objectName())); } Iterable> results = - minioClient.removeObjects( - RemoveObjectsArgs.builder() - .bucket(this.bucketName) - .objects(objects) - .build()); + minioClient.removeObjects(RemoveObjectsArgs.builder() + .bucket(this.bucketName) + .objects(objects) + .build()); for (Result result : results) { DeleteError error = result.get(); - throw new ComputerException ( - "Error in deleting snapshot " + error.objectName() + "; " + error.message()); + throw new ComputerException("Failed to delete snapshot %s, error message: %s", + error.objectName(), error.message()); } } private String generateObjectDirName(MessageType messageType, int partitionId) { // dir name: {VIEW_KEY}/{PARTITIONER}/{PARTITION_COUNT}/VERTEX/{PARTITION_ID}/ - return Paths.get( - this.viewKey, - this.partitioner.getClass().getSimpleName(), - String.valueOf(this.partitionCount), - messageType.name(), - String.valueOf(partitionId)).toString() + "/"; + Path path = Paths.get(this.viewKey, + this.partitioner.getClass().getSimpleName(), + String.valueOf(this.partitionCount), + messageType.name(), + String.valueOf(partitionId)); + return path + "/"; } }