diff --git a/computer-api/src/main/java/org/apache/hugegraph/computer/core/config/ComputerOptions.java b/computer-api/src/main/java/org/apache/hugegraph/computer/core/config/ComputerOptions.java index 7fa42c266..f548aa135 100644 --- a/computer-api/src/main/java/org/apache/hugegraph/computer/core/config/ComputerOptions.java +++ b/computer-api/src/main/java/org/apache/hugegraph/computer/core/config/ComputerOptions.java @@ -201,7 +201,7 @@ public static synchronized ComputerOptions instance() { public static final ConfigOption SNAPSHOT_WRITE = new ConfigOption<>( "snapshot.write", - "Whether write snapshot of input vertex and edge partitions", + "Whether to write snapshot of input vertex/edge partitions.", allowValues(true, false), false ); @@ -209,15 +209,15 @@ public static synchronized ComputerOptions instance() { public static final ConfigOption SNAPSHOT_LOAD = new ConfigOption<>( "snapshot.load", - "Whether use snapshot of input vertex and edge partitions", + "Whether to load from snapshot of vertex/edge partitions.", allowValues(true, false), false ); - public static final ConfigOption SNAPSHOT_VIEW_KEY = + public static final ConfigOption SNAPSHOT_NAME = new ConfigOption<>( - "snapshot.view_key", - "View key of target snapshot", + "snapshot.name", + "The user-defined snapshot name.", null, "" ); @@ -225,7 +225,7 @@ public static synchronized ComputerOptions instance() { public static final ConfigOption SNAPSHOT_MINIO_ENDPOINT = new ConfigOption<>( "snapshot.minio_endpoint", - "MinIO endpoint", + "The endpoint of MinIO, MinIO can be used to store snapshots.", null, "" ); @@ -233,7 +233,7 @@ public static synchronized ComputerOptions instance() { public static final ConfigOption SNAPSHOT_MINIO_ACCESS_KEY = new ConfigOption<>( "snapshot.minio_access_key", - "MinIO access key", + "The access key of MinIO.", null, "" ); @@ -241,7 +241,7 @@ public static synchronized ComputerOptions instance() { public static final ConfigOption SNAPSHOT_MINIO_SECRET_KEY = new ConfigOption<>( "snapshot.minio_secret_key", - "MinIO secret key", + "The secret key of MinIO.", null, "" ); @@ -249,7 +249,7 @@ public static synchronized ComputerOptions instance() { public static final ConfigOption SNAPSHOT_MINIO_BUCKET_NAME = new ConfigOption<>( "snapshot.minio_bucket_name", - "MinIO bucket name", + "The bucket name of MinIO.", null, "" ); diff --git a/computer-core/src/main/java/org/apache/hugegraph/computer/core/receiver/MessageRecvManager.java b/computer-core/src/main/java/org/apache/hugegraph/computer/core/receiver/MessageRecvManager.java index a2f37c259..54237d4f3 100644 --- a/computer-core/src/main/java/org/apache/hugegraph/computer/core/receiver/MessageRecvManager.java +++ b/computer-core/src/main/java/org/apache/hugegraph/computer/core/receiver/MessageRecvManager.java @@ -91,10 +91,14 @@ public void init(Config config) { SuperstepFileGenerator fileGenerator = new SuperstepFileGenerator( this.fileManager, Constants.INPUT_SUPERSTEP); - this.vertexPartitions = new VertexMessageRecvPartitions( - this.context, fileGenerator, this.sortManager, this.snapshotManager); - this.edgePartitions = new EdgeMessageRecvPartitions( - this.context, fileGenerator, this.sortManager, this.snapshotManager); + this.vertexPartitions = new VertexMessageRecvPartitions(this.context, + fileGenerator, + this.sortManager, + this.snapshotManager); + this.edgePartitions = new EdgeMessageRecvPartitions(this.context, + fileGenerator, + this.sortManager, + this.snapshotManager); this.workerCount = config.get(ComputerOptions.JOB_WORKERS_COUNT); // One for vertex and one for edge. this.expectedFinishMessages = this.workerCount * 2; @@ -109,8 +113,10 @@ public void init(Config config) { public void beforeSuperstep(Config config, int superstep) { SuperstepFileGenerator fileGenerator = new SuperstepFileGenerator( this.fileManager, superstep); - this.messagePartitions = new ComputeMessageRecvPartitions( - this.context, fileGenerator, this.sortManager, this.snapshotManager); + this.messagePartitions = new ComputeMessageRecvPartitions(this.context, + fileGenerator, + this.sortManager, + this.snapshotManager); this.expectedFinishMessages = this.workerCount; this.finishMessagesFuture = new CompletableFuture<>(); this.finishMessagesCount.set(this.expectedFinishMessages); diff --git a/computer-core/src/main/java/org/apache/hugegraph/computer/core/snapshot/SnapshotManager.java b/computer-core/src/main/java/org/apache/hugegraph/computer/core/snapshot/SnapshotManager.java index b63935828..4734ee5aa 100644 --- a/computer-core/src/main/java/org/apache/hugegraph/computer/core/snapshot/SnapshotManager.java +++ b/computer-core/src/main/java/org/apache/hugegraph/computer/core/snapshot/SnapshotManager.java @@ -61,7 +61,7 @@ public class SnapshotManager implements Manager { private final int partitionCount; private final boolean loadSnapshot; private final boolean writeSnapshot; - private final String viewKey; + private final String snapshotName; private MinioClient minioClient; private String bucketName; @@ -78,7 +78,7 @@ public SnapshotManager(ComputerContext context, MessageSendManager sendManager, this.workerInfo = workerInfo; this.partitioner = context.config().createObject(ComputerOptions.WORKER_PARTITIONER); this.partitionCount = context.config().get(ComputerOptions.JOB_PARTITIONS_COUNT); - this.viewKey = context.config().get(ComputerOptions.SNAPSHOT_VIEW_KEY); + this.snapshotName = context.config().get(ComputerOptions.SNAPSHOT_NAME); } @Override @@ -243,8 +243,8 @@ private void clearObjectsIfExist(String dirName) throws Exception { } private String generateObjectDirName(MessageType messageType, int partitionId) { - // dir name: {VIEW_KEY}/{PARTITIONER}/{PARTITION_COUNT}/VERTEX/{PARTITION_ID}/ - Path path = Paths.get(this.viewKey, + // dir name: {SNAPSHOT_NAME}/{PARTITIONER}/{PARTITION_COUNT}/VERTEX/{PARTITION_ID}/ + Path path = Paths.get(this.snapshotName, this.partitioner.getClass().getSimpleName(), String.valueOf(this.partitionCount), messageType.name(), diff --git a/computer-test/src/main/java/org/apache/hugegraph/computer/core/compute/ComputeManagerTest.java b/computer-test/src/main/java/org/apache/hugegraph/computer/core/compute/ComputeManagerTest.java index a43baef84..63f0c5b2d 100644 --- a/computer-test/src/main/java/org/apache/hugegraph/computer/core/compute/ComputeManagerTest.java +++ b/computer-test/src/main/java/org/apache/hugegraph/computer/core/compute/ComputeManagerTest.java @@ -106,9 +106,9 @@ public void setup() { sortManager); this.managers.add(receiveManager); SnapshotManager snapshotManager = new SnapshotManager(context(), - null, + null, receiveManager, - null); + null); this.managers.add(snapshotManager); this.managers.initAll(this.config); this.connectionId = new ConnectionId(new InetSocketAddress("localhost", @@ -231,7 +231,7 @@ private static void addMessages(Consumer consumer) message.add(id); ReceiverUtil.consumeBuffer(ReceiverUtil.writeMessage(id, message), - consumer); + consumer); } } } diff --git a/computer-test/src/main/java/org/apache/hugegraph/computer/core/compute/input/EdgesInputTest.java b/computer-test/src/main/java/org/apache/hugegraph/computer/core/compute/input/EdgesInputTest.java index b6c427596..f164d868b 100644 --- a/computer-test/src/main/java/org/apache/hugegraph/computer/core/compute/input/EdgesInputTest.java +++ b/computer-test/src/main/java/org/apache/hugegraph/computer/core/compute/input/EdgesInputTest.java @@ -139,9 +139,9 @@ private void testEdgeFreq(EdgeFrequency freq) sortManager); this.managers.add(receiveManager); SnapshotManager snapshotManager = new SnapshotManager(context(), - null, + null, receiveManager, - null); + null); this.managers.add(snapshotManager); this.managers.initAll(this.config); ConnectionId connectionId = new ConnectionId(new InetSocketAddress( diff --git a/computer-test/src/main/java/org/apache/hugegraph/computer/core/compute/input/MessageInputTest.java b/computer-test/src/main/java/org/apache/hugegraph/computer/core/compute/input/MessageInputTest.java index 3889f80f7..e2ad3de58 100644 --- a/computer-test/src/main/java/org/apache/hugegraph/computer/core/compute/input/MessageInputTest.java +++ b/computer-test/src/main/java/org/apache/hugegraph/computer/core/compute/input/MessageInputTest.java @@ -92,9 +92,9 @@ public void setup() { sortManager); this.managers.add(receiveManager); SnapshotManager snapshotManager = new SnapshotManager(context(), - null, + null, receiveManager, - null); + null); this.managers.add(snapshotManager); this.managers.initAll(this.config); this.connectionId = new ConnectionId(new InetSocketAddress("localhost", diff --git a/computer-test/src/main/java/org/apache/hugegraph/computer/core/network/DataServerManagerTest.java b/computer-test/src/main/java/org/apache/hugegraph/computer/core/network/DataServerManagerTest.java index 6c8673c54..cf8e3e611 100644 --- a/computer-test/src/main/java/org/apache/hugegraph/computer/core/network/DataServerManagerTest.java +++ b/computer-test/src/main/java/org/apache/hugegraph/computer/core/network/DataServerManagerTest.java @@ -57,9 +57,9 @@ public void test() { fileManager, sortManager); SnapshotManager snapshotManager = new SnapshotManager(context(), - null, + null, recvManager, - null); + null); recvManager.init(config); ConnectionManager connManager = new TransportConnectionManager(); DataServerManager serverManager = new DataServerManager(connManager, diff --git a/computer-test/src/main/java/org/apache/hugegraph/computer/core/receiver/MessageRecvManagerTest.java b/computer-test/src/main/java/org/apache/hugegraph/computer/core/receiver/MessageRecvManagerTest.java index 8887b7395..b423d3861 100644 --- a/computer-test/src/main/java/org/apache/hugegraph/computer/core/receiver/MessageRecvManagerTest.java +++ b/computer-test/src/main/java/org/apache/hugegraph/computer/core/receiver/MessageRecvManagerTest.java @@ -76,9 +76,9 @@ public void setup() { this.fileManager, this.sortManager); this.snapshotManager = new SnapshotManager(context(), - null, + null, receiveManager, - null); + null); this.receiveManager.init(this.config); this.connectionId = new ConnectionId( new InetSocketAddress("localhost",8081),