From 2e45cd2983436172589802b35a6c2715a1af90bc Mon Sep 17 00:00:00 2001 From: Aaron Wang Date: Sun, 22 Oct 2023 11:54:56 +0800 Subject: [PATCH] feat(core): support load vertex/edge snapshot (#269) --- .../computer/core/config/ComputerOptions.java | 56 ++++ computer-core/pom.xml | 5 + .../core/input/WorkerInputManager.java | 21 +- .../core/network/buffer/FileRegionBuffer.java | 5 + .../core/receiver/MessageRecvManager.java | 24 +- .../core/receiver/MessageRecvPartition.java | 4 + .../core/receiver/MessageRecvPartitions.java | 19 +- .../edge/EdgeMessageRecvPartitions.java | 16 +- .../message/ComputeMessageRecvPartitions.java | 13 +- .../vertex/VertexMessageRecvPartitions.java | 16 +- .../core/snapshot/SnapshotManager.java | 254 ++++++++++++++++++ .../computer/core/worker/WorkerService.java | 10 +- computer-dist/release-docs/LICENSE | 1 + .../release-docs/licenses/LICENSE-minio.txt | 202 ++++++++++++++ computer-test/pom.xml | 4 + .../core/compute/ComputeManagerTest.java | 8 +- .../core/compute/input/EdgesInputTest.java | 6 + .../core/compute/input/MessageInputTest.java | 6 + .../core/network/DataServerManagerTest.java | 5 + .../core/receiver/MessageRecvManagerTest.java | 6 + pom.xml | 1 + 21 files changed, 657 insertions(+), 25 deletions(-) create mode 100644 computer-core/src/main/java/org/apache/hugegraph/computer/core/snapshot/SnapshotManager.java create mode 100644 computer-dist/release-docs/licenses/LICENSE-minio.txt diff --git a/computer-api/src/main/java/org/apache/hugegraph/computer/core/config/ComputerOptions.java b/computer-api/src/main/java/org/apache/hugegraph/computer/core/config/ComputerOptions.java index cce7447dc..43f3b6ca6 100644 --- a/computer-api/src/main/java/org/apache/hugegraph/computer/core/config/ComputerOptions.java +++ b/computer-api/src/main/java/org/apache/hugegraph/computer/core/config/ComputerOptions.java @@ -198,6 +198,62 @@ public static synchronized ComputerOptions instance() { "" ); + public static final ConfigOption SNAPSHOT_WRITE = + new ConfigOption<>( + "snapshot.write", + "Whether to write snapshot of input vertex/edge partitions.", + allowValues(true, false), + false + ); + + public static final ConfigOption SNAPSHOT_LOAD = + new ConfigOption<>( + "snapshot.load", + "Whether to load from snapshot of vertex/edge partitions.", + allowValues(true, false), + false + ); + + public static final ConfigOption SNAPSHOT_NAME = + new ConfigOption<>( + "snapshot.name", + "The user-defined snapshot name.", + null, + "" + ); + + public static final ConfigOption SNAPSHOT_MINIO_ENDPOINT = + new ConfigOption<>( + "snapshot.minio_endpoint", + "The endpoint of MinIO, MinIO can be used to store snapshots.", + null, + "" + ); + + public static final ConfigOption SNAPSHOT_MINIO_ACCESS_KEY = + new ConfigOption<>( + "snapshot.minio_access_key", + "The access key of MinIO.", + null, + "" + ); + + public static final ConfigOption SNAPSHOT_MINIO_SECRET_KEY = + new ConfigOption<>( + "snapshot.minio_secret_key", + "The secret key of MinIO.", + null, + "" + ); + + public static final ConfigOption SNAPSHOT_MINIO_BUCKET_NAME = + new ConfigOption<>( + "snapshot.minio_bucket_name", + "The bucket name of MinIO.", + null, + "" + ); + public static final ConfigOption INPUT_SEND_THREAD_NUMS = new ConfigOption<>( "input.send_thread_nums", diff --git a/computer-core/pom.xml b/computer-core/pom.xml index 6344acbb2..ee53458e6 100644 --- a/computer-core/pom.xml +++ b/computer-core/pom.xml @@ -60,6 +60,11 @@ org.apache.hadoop hadoop-common + + io.minio + minio + ${minio-version} + org.apache.hadoop hadoop-hdfs-client diff --git a/computer-core/src/main/java/org/apache/hugegraph/computer/core/input/WorkerInputManager.java b/computer-core/src/main/java/org/apache/hugegraph/computer/core/input/WorkerInputManager.java index c5e4a766c..1a3cd2c86 100644 --- a/computer-core/src/main/java/org/apache/hugegraph/computer/core/input/WorkerInputManager.java +++ b/computer-core/src/main/java/org/apache/hugegraph/computer/core/input/WorkerInputManager.java @@ -34,6 +34,7 @@ import org.apache.hugegraph.computer.core.network.message.MessageType; import org.apache.hugegraph.computer.core.rpc.InputSplitRpcService; import org.apache.hugegraph.computer.core.sender.MessageSendManager; +import org.apache.hugegraph.computer.core.snapshot.SnapshotManager; import org.apache.hugegraph.computer.core.worker.load.LoadService; import org.apache.hugegraph.util.ExecutorUtil; import org.apache.hugegraph.util.Log; @@ -60,9 +61,13 @@ public class WorkerInputManager implements Manager { */ private final MessageSendManager sendManager; + private final SnapshotManager snapshotManager; + public WorkerInputManager(ComputerContext context, - MessageSendManager sendManager) { + MessageSendManager sendManager, + SnapshotManager snapshotManager) { this.sendManager = sendManager; + this.snapshotManager = snapshotManager; this.sendThreadNum = this.inputSendThreadNum(context.config()); this.sendExecutor = ExecutorUtil.newFixedThreadPool(this.sendThreadNum, PREFIX); @@ -103,11 +108,17 @@ public void service(InputSplitRpcService rpcService) { * but there is no guarantee that all of them has been received. */ public void loadGraph() { + if (this.snapshotManager.loadSnapshot()) { + this.snapshotManager.load(); + return; + } + List> futures = new ArrayList<>(); CompletableFuture future; this.sendManager.startSend(MessageType.VERTEX); for (int i = 0; i < this.sendThreadNum; i++) { - future = send(this.sendManager::sendVertex, this.loadService::createIteratorFromVertex); + future = this.send(this.sendManager::sendVertex, + this.loadService::createIteratorFromVertex); futures.add(future); } CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).exceptionally(e -> { @@ -116,11 +127,11 @@ public void loadGraph() { }).join(); this.sendManager.finishSend(MessageType.VERTEX); - futures.clear(); - this.sendManager.startSend(MessageType.EDGE); + futures.clear(); for (int i = 0; i < this.sendThreadNum; i++) { - future = send(this.sendManager::sendEdge, this.loadService::createIteratorFromEdge); + future = this.send(this.sendManager::sendEdge, + this.loadService::createIteratorFromEdge); futures.add(future); } CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).exceptionally(e -> { diff --git a/computer-core/src/main/java/org/apache/hugegraph/computer/core/network/buffer/FileRegionBuffer.java b/computer-core/src/main/java/org/apache/hugegraph/computer/core/network/buffer/FileRegionBuffer.java index ce1961467..8b597de42 100644 --- a/computer-core/src/main/java/org/apache/hugegraph/computer/core/network/buffer/FileRegionBuffer.java +++ b/computer-core/src/main/java/org/apache/hugegraph/computer/core/network/buffer/FileRegionBuffer.java @@ -43,6 +43,11 @@ public FileRegionBuffer(int length) { this.length = length; } + public FileRegionBuffer(int length, String path) { + this.length = length; + this.path = path; + } + /** * Use zero-copy transform from socket channel to file * @param channel diff --git a/computer-core/src/main/java/org/apache/hugegraph/computer/core/receiver/MessageRecvManager.java b/computer-core/src/main/java/org/apache/hugegraph/computer/core/receiver/MessageRecvManager.java index b77ffa807..54237d4f3 100644 --- a/computer-core/src/main/java/org/apache/hugegraph/computer/core/receiver/MessageRecvManager.java +++ b/computer-core/src/main/java/org/apache/hugegraph/computer/core/receiver/MessageRecvManager.java @@ -38,6 +38,7 @@ import org.apache.hugegraph.computer.core.receiver.edge.EdgeMessageRecvPartitions; import org.apache.hugegraph.computer.core.receiver.message.ComputeMessageRecvPartitions; import org.apache.hugegraph.computer.core.receiver.vertex.VertexMessageRecvPartitions; +import org.apache.hugegraph.computer.core.snapshot.SnapshotManager; import org.apache.hugegraph.computer.core.sort.flusher.PeekableIterator; import org.apache.hugegraph.computer.core.sort.sorting.SortManager; import org.apache.hugegraph.computer.core.store.FileManager; @@ -65,6 +66,7 @@ public class MessageRecvManager implements Manager, MessageHandler { private int expectedFinishMessages; private CompletableFuture finishMessagesFuture; private AtomicInteger finishMessagesCount; + private SnapshotManager snapshotManager; private long waitFinishMessagesTimeout; private long superstep; @@ -89,10 +91,14 @@ public void init(Config config) { SuperstepFileGenerator fileGenerator = new SuperstepFileGenerator( this.fileManager, Constants.INPUT_SUPERSTEP); - this.vertexPartitions = new VertexMessageRecvPartitions( - this.context, fileGenerator, this.sortManager); - this.edgePartitions = new EdgeMessageRecvPartitions( - this.context, fileGenerator, this.sortManager); + this.vertexPartitions = new VertexMessageRecvPartitions(this.context, + fileGenerator, + this.sortManager, + this.snapshotManager); + this.edgePartitions = new EdgeMessageRecvPartitions(this.context, + fileGenerator, + this.sortManager, + this.snapshotManager); this.workerCount = config.get(ComputerOptions.JOB_WORKERS_COUNT); // One for vertex and one for edge. this.expectedFinishMessages = this.workerCount * 2; @@ -107,8 +113,10 @@ public void init(Config config) { public void beforeSuperstep(Config config, int superstep) { SuperstepFileGenerator fileGenerator = new SuperstepFileGenerator( this.fileManager, superstep); - this.messagePartitions = new ComputeMessageRecvPartitions( - this.context, fileGenerator, this.sortManager); + this.messagePartitions = new ComputeMessageRecvPartitions(this.context, + fileGenerator, + this.sortManager, + this.snapshotManager); this.expectedFinishMessages = this.workerCount; this.finishMessagesFuture = new CompletableFuture<>(); this.finishMessagesCount.set(this.expectedFinishMessages); @@ -245,4 +253,8 @@ public Map messageStats() { "The messagePartitions can't be null"); return this.messagePartitions.messageStats(); } + + public void setSnapshotManager(SnapshotManager snapshotManager) { + this.snapshotManager = snapshotManager; + } } diff --git a/computer-core/src/main/java/org/apache/hugegraph/computer/core/receiver/MessageRecvPartition.java b/computer-core/src/main/java/org/apache/hugegraph/computer/core/receiver/MessageRecvPartition.java index d63d5dab3..ce0ada3e3 100644 --- a/computer-core/src/main/java/org/apache/hugegraph/computer/core/receiver/MessageRecvPartition.java +++ b/computer-core/src/main/java/org/apache/hugegraph/computer/core/receiver/MessageRecvPartition.java @@ -127,6 +127,10 @@ public synchronized long totalBytes() { return this.totalBytes; } + public synchronized List outputFiles() { + return this.outputFiles; + } + public synchronized MessageStat messageStat() { // TODO: count the message received return new MessageStat(0L, this.totalBytes); diff --git a/computer-core/src/main/java/org/apache/hugegraph/computer/core/receiver/MessageRecvPartitions.java b/computer-core/src/main/java/org/apache/hugegraph/computer/core/receiver/MessageRecvPartitions.java index 362b1425e..222fff2c7 100644 --- a/computer-core/src/main/java/org/apache/hugegraph/computer/core/receiver/MessageRecvPartitions.java +++ b/computer-core/src/main/java/org/apache/hugegraph/computer/core/receiver/MessageRecvPartitions.java @@ -17,42 +17,48 @@ package org.apache.hugegraph.computer.core.receiver; -import java.io.File; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - import org.apache.commons.io.FileUtils; import org.apache.hugegraph.computer.core.common.ComputerContext; import org.apache.hugegraph.computer.core.config.Config; import org.apache.hugegraph.computer.core.network.buffer.NetworkBuffer; +import org.apache.hugegraph.computer.core.snapshot.SnapshotManager; import org.apache.hugegraph.computer.core.sort.flusher.PeekableIterator; import org.apache.hugegraph.computer.core.sort.sorting.SortManager; import org.apache.hugegraph.computer.core.store.SuperstepFileGenerator; import org.apache.hugegraph.computer.core.store.entry.KvEntry; +import java.io.File; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + public abstract class MessageRecvPartitions

{ protected final ComputerContext context; protected final Config config; protected final SuperstepFileGenerator fileGenerator; protected final SortManager sortManager; + protected final SnapshotManager snapshotManager; // The map of partition-id and the messages for the partition. private final Map partitions; public MessageRecvPartitions(ComputerContext context, SuperstepFileGenerator fileGenerator, - SortManager sortManager) { + SortManager sortManager, + SnapshotManager snapshotManager) { this.context = context; this.config = context.config(); this.fileGenerator = fileGenerator; this.sortManager = sortManager; + this.snapshotManager = snapshotManager; this.partitions = new HashMap<>(); } protected abstract P createPartition(); + protected abstract void writePartitionSnapshot(int partitionId, List outputFiles); + public void addBuffer(int partitionId, NetworkBuffer buffer) { P partition = this.partition(partitionId); partition.addBuffer(buffer); @@ -87,6 +93,7 @@ public Map> iterators() { Map> entries = new HashMap<>(); for (Map.Entry entry : this.partitions.entrySet()) { entries.put(entry.getKey(), entry.getValue().iterator()); + this.writePartitionSnapshot(entry.getKey(), entry.getValue().outputFiles()); } return entries; } diff --git a/computer-core/src/main/java/org/apache/hugegraph/computer/core/receiver/edge/EdgeMessageRecvPartitions.java b/computer-core/src/main/java/org/apache/hugegraph/computer/core/receiver/edge/EdgeMessageRecvPartitions.java index 705a1a204..558bfaa2b 100644 --- a/computer-core/src/main/java/org/apache/hugegraph/computer/core/receiver/edge/EdgeMessageRecvPartitions.java +++ b/computer-core/src/main/java/org/apache/hugegraph/computer/core/receiver/edge/EdgeMessageRecvPartitions.java @@ -18,17 +18,22 @@ package org.apache.hugegraph.computer.core.receiver.edge; import org.apache.hugegraph.computer.core.common.ComputerContext; +import org.apache.hugegraph.computer.core.network.message.MessageType; import org.apache.hugegraph.computer.core.receiver.MessageRecvPartitions; +import org.apache.hugegraph.computer.core.snapshot.SnapshotManager; import org.apache.hugegraph.computer.core.sort.sorting.SortManager; import org.apache.hugegraph.computer.core.store.SuperstepFileGenerator; +import java.util.List; + public class EdgeMessageRecvPartitions extends MessageRecvPartitions { public EdgeMessageRecvPartitions(ComputerContext context, SuperstepFileGenerator fileGenerator, - SortManager sortManager) { - super(context, fileGenerator, sortManager); + SortManager sortManager, + SnapshotManager snapshotManager) { + super(context, fileGenerator, sortManager, snapshotManager); } @Override @@ -36,4 +41,11 @@ public EdgeMessageRecvPartition createPartition() { return new EdgeMessageRecvPartition(this.context, this.fileGenerator, this.sortManager); } + + @Override + public void writePartitionSnapshot(int partitionId, List outputFiles) { + if (this.snapshotManager.writeSnapshot()) { + this.snapshotManager.upload(MessageType.EDGE, partitionId, outputFiles); + } + } } diff --git a/computer-core/src/main/java/org/apache/hugegraph/computer/core/receiver/message/ComputeMessageRecvPartitions.java b/computer-core/src/main/java/org/apache/hugegraph/computer/core/receiver/message/ComputeMessageRecvPartitions.java index 618f4e439..a3e3e3584 100644 --- a/computer-core/src/main/java/org/apache/hugegraph/computer/core/receiver/message/ComputeMessageRecvPartitions.java +++ b/computer-core/src/main/java/org/apache/hugegraph/computer/core/receiver/message/ComputeMessageRecvPartitions.java @@ -19,16 +19,20 @@ import org.apache.hugegraph.computer.core.common.ComputerContext; import org.apache.hugegraph.computer.core.receiver.MessageRecvPartitions; +import org.apache.hugegraph.computer.core.snapshot.SnapshotManager; import org.apache.hugegraph.computer.core.sort.sorting.SortManager; import org.apache.hugegraph.computer.core.store.SuperstepFileGenerator; +import java.util.List; + public class ComputeMessageRecvPartitions extends MessageRecvPartitions { public ComputeMessageRecvPartitions(ComputerContext context, SuperstepFileGenerator fileGenerator, - SortManager sortManager) { - super(context, fileGenerator, sortManager); + SortManager sortManager, + SnapshotManager snapshotManager) { + super(context, fileGenerator, sortManager, snapshotManager); } @Override @@ -36,4 +40,9 @@ public ComputeMessageRecvPartition createPartition() { return new ComputeMessageRecvPartition(this.context, this.fileGenerator, this.sortManager); } + + @Override + protected void writePartitionSnapshot(int partitionId, List outputFiles) { + // pass + } } diff --git a/computer-core/src/main/java/org/apache/hugegraph/computer/core/receiver/vertex/VertexMessageRecvPartitions.java b/computer-core/src/main/java/org/apache/hugegraph/computer/core/receiver/vertex/VertexMessageRecvPartitions.java index 7046f3fa7..982ae7409 100644 --- a/computer-core/src/main/java/org/apache/hugegraph/computer/core/receiver/vertex/VertexMessageRecvPartitions.java +++ b/computer-core/src/main/java/org/apache/hugegraph/computer/core/receiver/vertex/VertexMessageRecvPartitions.java @@ -18,18 +18,23 @@ package org.apache.hugegraph.computer.core.receiver.vertex; import org.apache.hugegraph.computer.core.common.ComputerContext; +import org.apache.hugegraph.computer.core.network.message.MessageType; import org.apache.hugegraph.computer.core.receiver.MessageRecvPartitions; +import org.apache.hugegraph.computer.core.snapshot.SnapshotManager; import org.apache.hugegraph.computer.core.sort.sorting.SortManager; import org.apache.hugegraph.computer.core.store.SuperstepFileGenerator; +import java.util.List; + public class VertexMessageRecvPartitions extends MessageRecvPartitions { public VertexMessageRecvPartitions(ComputerContext context, SuperstepFileGenerator fileGenerator, - SortManager sortManager) { - super(context, fileGenerator, sortManager); + SortManager sortManager, + SnapshotManager snapshotManager) { + super(context, fileGenerator, sortManager, snapshotManager); } @Override @@ -37,4 +42,11 @@ public VertexMessageRecvPartition createPartition() { return new VertexMessageRecvPartition(this.context, this.fileGenerator, this.sortManager); } + + @Override + public void writePartitionSnapshot(int partitionId, List outputFiles) { + if (this.snapshotManager.writeSnapshot()) { + this.snapshotManager.upload(MessageType.VERTEX, partitionId, outputFiles); + } + } } diff --git a/computer-core/src/main/java/org/apache/hugegraph/computer/core/snapshot/SnapshotManager.java b/computer-core/src/main/java/org/apache/hugegraph/computer/core/snapshot/SnapshotManager.java new file mode 100644 index 000000000..4734ee5aa --- /dev/null +++ b/computer-core/src/main/java/org/apache/hugegraph/computer/core/snapshot/SnapshotManager.java @@ -0,0 +1,254 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hugegraph.computer.core.snapshot; + +import io.minio.DownloadObjectArgs; +import io.minio.ListObjectsArgs; +import io.minio.MinioClient; +import io.minio.RemoveObjectsArgs; +import io.minio.Result; +import io.minio.UploadObjectArgs; +import io.minio.messages.DeleteError; +import io.minio.messages.DeleteObject; +import io.minio.messages.Item; +import org.apache.commons.lang.StringUtils; +import org.apache.hugegraph.computer.core.common.ComputerContext; +import org.apache.hugegraph.computer.core.common.ContainerInfo; +import org.apache.hugegraph.computer.core.common.exception.ComputerException; +import org.apache.hugegraph.computer.core.config.ComputerOptions; +import org.apache.hugegraph.computer.core.config.Config; +import org.apache.hugegraph.computer.core.graph.partition.Partitioner; +import org.apache.hugegraph.computer.core.manager.Manager; +import org.apache.hugegraph.computer.core.network.buffer.FileRegionBuffer; +import org.apache.hugegraph.computer.core.network.message.MessageType; +import org.apache.hugegraph.computer.core.receiver.MessageRecvManager; +import org.apache.hugegraph.computer.core.sender.MessageSendManager; +import org.apache.hugegraph.util.Log; +import org.slf4j.Logger; + +import java.io.File; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.LinkedList; +import java.util.List; +import java.util.Locale; + +public class SnapshotManager implements Manager { + + private static final Logger LOG = Log.logger(SnapshotManager.class); + public static final String NAME = "worker_snapshot"; + + private final MessageSendManager sendManager; + private final MessageRecvManager recvManager; + + private final ContainerInfo workerInfo; + private final Partitioner partitioner; + private final int partitionCount; + private final boolean loadSnapshot; + private final boolean writeSnapshot; + private final String snapshotName; + + private MinioClient minioClient; + private String bucketName; + + public SnapshotManager(ComputerContext context, MessageSendManager sendManager, + MessageRecvManager recvManager, ContainerInfo workerInfo) { + this.loadSnapshot = context.config().get(ComputerOptions.SNAPSHOT_LOAD); + this.writeSnapshot = context.config().get(ComputerOptions.SNAPSHOT_WRITE); + + this.sendManager = sendManager; + this.recvManager = recvManager; + this.recvManager.setSnapshotManager(this); + + this.workerInfo = workerInfo; + this.partitioner = context.config().createObject(ComputerOptions.WORKER_PARTITIONER); + this.partitionCount = context.config().get(ComputerOptions.JOB_PARTITIONS_COUNT); + this.snapshotName = context.config().get(ComputerOptions.SNAPSHOT_NAME); + } + + @Override + public String name() { + return NAME; + } + + @Override + public void init(Config config) { + String endpoint = config.get(ComputerOptions.SNAPSHOT_MINIO_ENDPOINT); + String accessKey = config.get(ComputerOptions.SNAPSHOT_MINIO_ACCESS_KEY); + String secretKey = config.get(ComputerOptions.SNAPSHOT_MINIO_SECRET_KEY); + this.bucketName = config.get(ComputerOptions.SNAPSHOT_MINIO_BUCKET_NAME); + if (StringUtils.isNotEmpty(endpoint)) { + this.minioClient = MinioClient.builder() + .endpoint(endpoint) + .credentials(accessKey, secretKey) + .build(); + } + } + + @Override + public void close(Config config) { + // pass + } + + public boolean loadSnapshot() { + return this.loadSnapshot; + } + + public boolean writeSnapshot() { + return this.writeSnapshot; + } + + public void upload(MessageType messageType, int partitionId, List outputFiles) { + if (this.loadSnapshot()) { + LOG.info("No later {} snapshots have to be uploaded", + messageType.name().toLowerCase(Locale.ROOT)); + return; + } + this.uploadObjects(messageType, partitionId, outputFiles); + } + + public void load() { + int id = this.workerInfo.id(); + for (int partitionId = 0; partitionId < this.partitionCount; partitionId++) { + if (this.partitioner.workerId(partitionId) == id) { + // TODO: Do not need to send control message to all workers + this.sendManager.startSend(MessageType.VERTEX); + this.downloadObjects(MessageType.VERTEX, partitionId); + this.sendManager.finishSend(MessageType.VERTEX); + + this.sendManager.startSend(MessageType.EDGE); + this.downloadObjects(MessageType.EDGE, partitionId); + this.sendManager.finishSend(MessageType.EDGE); + } + } + } + + private void uploadObjects(MessageType messageType, int partitionId, + List outputFiles) { + String dirName = this.generateObjectDirName(messageType, partitionId); + + try { + this.clearObjectsIfExist(dirName); + } catch (Exception e) { + throw new ComputerException("Failed to clear out-dated snapshots from %s", dirName, e); + } + + LOG.info("Upload {} snapshots for partition {}", + messageType.name().toLowerCase(Locale.ROOT), partitionId); + for (String outputFile : outputFiles) { + String objectName = dirName + new File(outputFile).getName(); + this.uploadObject(outputFile, objectName); + } + } + + private void downloadObjects(MessageType messageType, int partitionId) { + LOG.info("Load {} snapshots for partition {}", + messageType.name().toLowerCase(Locale.ROOT), partitionId); + String dirName = this.generateObjectDirName(messageType, partitionId); + + try { + Iterable> snapshotFiles = this.minioClient.listObjects( + ListObjectsArgs.builder() + .bucket(this.bucketName) + .prefix(dirName) + .build()); + + if (!snapshotFiles.iterator().hasNext()) { + throw new ComputerException("Empty snapshot directory %s", dirName); + } + + for (Result result : snapshotFiles) { + Item item = result.get(); + int size = (int) item.size(); + String objectName = item.objectName(); + + String outputPath = this.recvManager.genOutputPath(messageType, partitionId); + this.downloadObject(objectName, outputPath); + + FileRegionBuffer fileRegionBuffer = new FileRegionBuffer(size, outputPath); + this.recvManager.handle(messageType, partitionId, fileRegionBuffer); + } + } catch (Exception e) { + throw new ComputerException("Failed to download snapshots from %s", dirName, e); + } + } + + private void uploadObject(String fileName, String objectName) { + try { + this.minioClient.uploadObject(UploadObjectArgs.builder() + .bucket(this.bucketName) + .object(objectName) + .filename(fileName) + .build()); + } catch (Exception e) { + throw new ComputerException("Failed to upload snapshot %s to %s", + fileName, objectName, e); + } + } + + private void downloadObject(String objectName, String outputPath) { + try { + this.minioClient.downloadObject(DownloadObjectArgs.builder() + .bucket(this.bucketName) + .object(objectName) + .filename(outputPath) + .build()); + } catch (Exception e) { + throw new ComputerException("Failed to download snapshot from %s to %s", + objectName, outputPath, e); + } + } + + private void clearObjectsIfExist(String dirName) throws Exception { + List objects = new LinkedList<>(); + Iterable> snapshotFiles = this.minioClient.listObjects( + ListObjectsArgs.builder() + .bucket(this.bucketName) + .prefix(dirName) + .build()); + if (!snapshotFiles.iterator().hasNext()) { + return; + } + + LOG.info("Clear out-dated snapshots from {} first", dirName); + for (Result result : snapshotFiles) { + Item item = result.get(); + objects.add(new DeleteObject(item.objectName())); + } + Iterable> results = + minioClient.removeObjects(RemoveObjectsArgs.builder() + .bucket(this.bucketName) + .objects(objects) + .build()); + for (Result result : results) { + DeleteError error = result.get(); + throw new ComputerException("Failed to delete snapshot %s, error message: %s", + error.objectName(), error.message()); + } + } + + private String generateObjectDirName(MessageType messageType, int partitionId) { + // dir name: {SNAPSHOT_NAME}/{PARTITIONER}/{PARTITION_COUNT}/VERTEX/{PARTITION_ID}/ + Path path = Paths.get(this.snapshotName, + this.partitioner.getClass().getSimpleName(), + String.valueOf(this.partitionCount), + messageType.name(), + String.valueOf(partitionId)); + return path + "/"; + } +} diff --git a/computer-core/src/main/java/org/apache/hugegraph/computer/core/worker/WorkerService.java b/computer-core/src/main/java/org/apache/hugegraph/computer/core/worker/WorkerService.java index 901d9ad2e..48af87c66 100644 --- a/computer-core/src/main/java/org/apache/hugegraph/computer/core/worker/WorkerService.java +++ b/computer-core/src/main/java/org/apache/hugegraph/computer/core/worker/WorkerService.java @@ -47,6 +47,7 @@ import org.apache.hugegraph.computer.core.receiver.MessageRecvManager; import org.apache.hugegraph.computer.core.rpc.WorkerRpcManager; import org.apache.hugegraph.computer.core.sender.MessageSendManager; +import org.apache.hugegraph.computer.core.snapshot.SnapshotManager; import org.apache.hugegraph.computer.core.sort.sorting.RecvSortManager; import org.apache.hugegraph.computer.core.sort.sorting.SendSortManager; import org.apache.hugegraph.computer.core.sort.sorting.SortManager; @@ -322,8 +323,15 @@ private InetSocketAddress initManagers(ContainerInfo masterInfo) { clientManager.sender()); this.managers.add(sendManager); + SnapshotManager snapshotManager = new SnapshotManager(this.context, + sendManager, + recvManager, + this.workerInfo); + this.managers.add(snapshotManager); + WorkerInputManager inputManager = new WorkerInputManager(this.context, - sendManager); + sendManager, + snapshotManager); inputManager.service(rpcManager.inputSplitService()); this.managers.add(inputManager); diff --git a/computer-dist/release-docs/LICENSE b/computer-dist/release-docs/LICENSE index 251a0797f..3bd66078f 100644 --- a/computer-dist/release-docs/LICENSE +++ b/computer-dist/release-docs/LICENSE @@ -335,6 +335,7 @@ The following components are provided under the Apache 2.0 License. (Apache License, Version 2.0) * Guava: Google Core Libraries for Java(com.google.guava:guava:25.1-jre-none ) (Apache License, Version 2.0) * LZ4 and xxHash(org.lz4:lz4-java:1.4.0-none ) (Apache License, Version 2.0) * Metrics Core(io.dropwizard.metrics:metrics-core:3.2.6-none ) + (Apache License, Version 2.0) * Minio(io.minio:minio:8.5.6-https://min.io ) (Apache License, Version 2.0) * Netty/All-in-One(io.netty:netty-all:4.1.42.Final-none ) (Apache License, Version 2.0) * OkHttp(com.squareup.okhttp:okhttp:2.7.5-none ) (Apache License, Version 2.0) * hugegraph-rpc(com.baidu.hugegraph:hugegraph-rpc:2.0.1-none ) diff --git a/computer-dist/release-docs/licenses/LICENSE-minio.txt b/computer-dist/release-docs/licenses/LICENSE-minio.txt new file mode 100644 index 000000000..d64569567 --- /dev/null +++ b/computer-dist/release-docs/licenses/LICENSE-minio.txt @@ -0,0 +1,202 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/computer-test/pom.xml b/computer-test/pom.xml index 76f992b02..4b55da32f 100644 --- a/computer-test/pom.xml +++ b/computer-test/pom.xml @@ -53,6 +53,10 @@ okhttp com.squareup.okhttp + + okhttp + com.squareup.okhttp3 + diff --git a/computer-test/src/main/java/org/apache/hugegraph/computer/core/compute/ComputeManagerTest.java b/computer-test/src/main/java/org/apache/hugegraph/computer/core/compute/ComputeManagerTest.java index c4052497b..63f0c5b2d 100644 --- a/computer-test/src/main/java/org/apache/hugegraph/computer/core/compute/ComputeManagerTest.java +++ b/computer-test/src/main/java/org/apache/hugegraph/computer/core/compute/ComputeManagerTest.java @@ -47,6 +47,7 @@ import org.apache.hugegraph.computer.core.receiver.MessageRecvManager; import org.apache.hugegraph.computer.core.receiver.ReceiverUtil; import org.apache.hugegraph.computer.core.sender.MessageSendManager; +import org.apache.hugegraph.computer.core.snapshot.SnapshotManager; import org.apache.hugegraph.computer.core.sort.sorting.SendSortManager; import org.apache.hugegraph.computer.core.sort.sorting.SortManager; import org.apache.hugegraph.computer.core.store.FileManager; @@ -104,6 +105,11 @@ public void setup() { fileManager, sortManager); this.managers.add(receiveManager); + SnapshotManager snapshotManager = new SnapshotManager(context(), + null, + receiveManager, + null); + this.managers.add(snapshotManager); this.managers.initAll(this.config); this.connectionId = new ConnectionId(new InetSocketAddress("localhost", 8081), @@ -225,7 +231,7 @@ private static void addMessages(Consumer consumer) message.add(id); ReceiverUtil.consumeBuffer(ReceiverUtil.writeMessage(id, message), - consumer); + consumer); } } } diff --git a/computer-test/src/main/java/org/apache/hugegraph/computer/core/compute/input/EdgesInputTest.java b/computer-test/src/main/java/org/apache/hugegraph/computer/core/compute/input/EdgesInputTest.java index c2126d1bd..f164d868b 100644 --- a/computer-test/src/main/java/org/apache/hugegraph/computer/core/compute/input/EdgesInputTest.java +++ b/computer-test/src/main/java/org/apache/hugegraph/computer/core/compute/input/EdgesInputTest.java @@ -51,6 +51,7 @@ import org.apache.hugegraph.computer.core.receiver.MessageRecvManager; import org.apache.hugegraph.computer.core.receiver.ReceiverUtil; import org.apache.hugegraph.computer.core.sender.MessageSendManager; +import org.apache.hugegraph.computer.core.snapshot.SnapshotManager; import org.apache.hugegraph.computer.core.sort.flusher.PeekableIterator; import org.apache.hugegraph.computer.core.sort.sorting.SendSortManager; import org.apache.hugegraph.computer.core.sort.sorting.SortManager; @@ -137,6 +138,11 @@ private void testEdgeFreq(EdgeFrequency freq) fileManager, sortManager); this.managers.add(receiveManager); + SnapshotManager snapshotManager = new SnapshotManager(context(), + null, + receiveManager, + null); + this.managers.add(snapshotManager); this.managers.initAll(this.config); ConnectionId connectionId = new ConnectionId(new InetSocketAddress( "localhost", 8081), diff --git a/computer-test/src/main/java/org/apache/hugegraph/computer/core/compute/input/MessageInputTest.java b/computer-test/src/main/java/org/apache/hugegraph/computer/core/compute/input/MessageInputTest.java index 36361380f..e2ad3de58 100644 --- a/computer-test/src/main/java/org/apache/hugegraph/computer/core/compute/input/MessageInputTest.java +++ b/computer-test/src/main/java/org/apache/hugegraph/computer/core/compute/input/MessageInputTest.java @@ -41,6 +41,7 @@ import org.apache.hugegraph.computer.core.network.message.MessageType; import org.apache.hugegraph.computer.core.receiver.MessageRecvManager; import org.apache.hugegraph.computer.core.receiver.ReceiverUtil; +import org.apache.hugegraph.computer.core.snapshot.SnapshotManager; import org.apache.hugegraph.computer.core.sort.flusher.PeekableIterator; import org.apache.hugegraph.computer.core.sort.sorting.RecvSortManager; import org.apache.hugegraph.computer.core.sort.sorting.SortManager; @@ -90,6 +91,11 @@ public void setup() { fileManager, sortManager); this.managers.add(receiveManager); + SnapshotManager snapshotManager = new SnapshotManager(context(), + null, + receiveManager, + null); + this.managers.add(snapshotManager); this.managers.initAll(this.config); this.connectionId = new ConnectionId(new InetSocketAddress("localhost", 8081), diff --git a/computer-test/src/main/java/org/apache/hugegraph/computer/core/network/DataServerManagerTest.java b/computer-test/src/main/java/org/apache/hugegraph/computer/core/network/DataServerManagerTest.java index 89029ec14..cf8e3e611 100644 --- a/computer-test/src/main/java/org/apache/hugegraph/computer/core/network/DataServerManagerTest.java +++ b/computer-test/src/main/java/org/apache/hugegraph/computer/core/network/DataServerManagerTest.java @@ -25,6 +25,7 @@ import org.apache.hugegraph.computer.core.network.connection.ConnectionManager; import org.apache.hugegraph.computer.core.network.connection.TransportConnectionManager; import org.apache.hugegraph.computer.core.receiver.MessageRecvManager; +import org.apache.hugegraph.computer.core.snapshot.SnapshotManager; import org.apache.hugegraph.computer.core.sort.sorting.RecvSortManager; import org.apache.hugegraph.computer.core.sort.sorting.SortManager; import org.apache.hugegraph.computer.core.store.FileManager; @@ -55,6 +56,10 @@ public void test() { MessageRecvManager recvManager = new MessageRecvManager(context(), fileManager, sortManager); + SnapshotManager snapshotManager = new SnapshotManager(context(), + null, + recvManager, + null); recvManager.init(config); ConnectionManager connManager = new TransportConnectionManager(); DataServerManager serverManager = new DataServerManager(connManager, diff --git a/computer-test/src/main/java/org/apache/hugegraph/computer/core/receiver/MessageRecvManagerTest.java b/computer-test/src/main/java/org/apache/hugegraph/computer/core/receiver/MessageRecvManagerTest.java index 3942b4eff..b423d3861 100644 --- a/computer-test/src/main/java/org/apache/hugegraph/computer/core/receiver/MessageRecvManagerTest.java +++ b/computer-test/src/main/java/org/apache/hugegraph/computer/core/receiver/MessageRecvManagerTest.java @@ -32,6 +32,7 @@ import org.apache.hugegraph.computer.core.receiver.edge.EdgeMessageRecvPartitionTest; import org.apache.hugegraph.computer.core.receiver.message.ComputeMessageRecvPartitionTest; import org.apache.hugegraph.computer.core.receiver.vertex.VertexMessageRecvPartitionTest; +import org.apache.hugegraph.computer.core.snapshot.SnapshotManager; import org.apache.hugegraph.computer.core.sort.flusher.PeekableIterator; import org.apache.hugegraph.computer.core.sort.sorting.RecvSortManager; import org.apache.hugegraph.computer.core.sort.sorting.SortManager; @@ -49,6 +50,7 @@ public class MessageRecvManagerTest extends UnitTestBase { private FileManager fileManager; private SortManager sortManager; private MessageRecvManager receiveManager; + private SnapshotManager snapshotManager; private ConnectionId connectionId; @Before @@ -73,6 +75,10 @@ public void setup() { this.receiveManager = new MessageRecvManager(context(), this.fileManager, this.sortManager); + this.snapshotManager = new SnapshotManager(context(), + null, + receiveManager, + null); this.receiveManager.init(this.config); this.connectionId = new ConnectionId( new InetSocketAddress("localhost",8081), diff --git a/pom.xml b/pom.xml index 028af9b3a..2784670be 100644 --- a/pom.xml +++ b/pom.xml @@ -104,6 +104,7 @@ 1.0.0 1.0.0 1.0.0 + 8.5.6