Skip to content

Commit

Permalink
feat(core): support load vertex/edge snapshot (#269)
Browse files Browse the repository at this point in the history
  • Loading branch information
Radeity authored Oct 22, 2023
1 parent 479e1a4 commit 2e45cd2
Show file tree
Hide file tree
Showing 21 changed files with 657 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,62 @@ public static synchronized ComputerOptions instance() {
""
);

public static final ConfigOption<Boolean> SNAPSHOT_WRITE =
new ConfigOption<>(
"snapshot.write",
"Whether to write snapshot of input vertex/edge partitions.",
allowValues(true, false),
false
);

public static final ConfigOption<Boolean> SNAPSHOT_LOAD =
new ConfigOption<>(
"snapshot.load",
"Whether to load from snapshot of vertex/edge partitions.",
allowValues(true, false),
false
);

public static final ConfigOption<String> SNAPSHOT_NAME =
new ConfigOption<>(
"snapshot.name",
"The user-defined snapshot name.",
null,
""
);

public static final ConfigOption<String> SNAPSHOT_MINIO_ENDPOINT =
new ConfigOption<>(
"snapshot.minio_endpoint",
"The endpoint of MinIO, MinIO can be used to store snapshots.",
null,
""
);

public static final ConfigOption<String> SNAPSHOT_MINIO_ACCESS_KEY =
new ConfigOption<>(
"snapshot.minio_access_key",
"The access key of MinIO.",
null,
""
);

public static final ConfigOption<String> SNAPSHOT_MINIO_SECRET_KEY =
new ConfigOption<>(
"snapshot.minio_secret_key",
"The secret key of MinIO.",
null,
""
);

public static final ConfigOption<String> SNAPSHOT_MINIO_BUCKET_NAME =
new ConfigOption<>(
"snapshot.minio_bucket_name",
"The bucket name of MinIO.",
null,
""
);

public static final ConfigOption<Integer> INPUT_SEND_THREAD_NUMS =
new ConfigOption<>(
"input.send_thread_nums",
Expand Down
5 changes: 5 additions & 0 deletions computer-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
</dependency>
<dependency>
<groupId>io.minio</groupId>
<artifactId>minio</artifactId>
<version>${minio-version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs-client</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.hugegraph.computer.core.network.message.MessageType;
import org.apache.hugegraph.computer.core.rpc.InputSplitRpcService;
import org.apache.hugegraph.computer.core.sender.MessageSendManager;
import org.apache.hugegraph.computer.core.snapshot.SnapshotManager;
import org.apache.hugegraph.computer.core.worker.load.LoadService;
import org.apache.hugegraph.util.ExecutorUtil;
import org.apache.hugegraph.util.Log;
Expand All @@ -60,9 +61,13 @@ public class WorkerInputManager implements Manager {
*/
private final MessageSendManager sendManager;

private final SnapshotManager snapshotManager;

public WorkerInputManager(ComputerContext context,
MessageSendManager sendManager) {
MessageSendManager sendManager,
SnapshotManager snapshotManager) {
this.sendManager = sendManager;
this.snapshotManager = snapshotManager;

this.sendThreadNum = this.inputSendThreadNum(context.config());
this.sendExecutor = ExecutorUtil.newFixedThreadPool(this.sendThreadNum, PREFIX);
Expand Down Expand Up @@ -103,11 +108,17 @@ public void service(InputSplitRpcService rpcService) {
* but there is no guarantee that all of them has been received.
*/
public void loadGraph() {
if (this.snapshotManager.loadSnapshot()) {
this.snapshotManager.load();
return;
}

List<CompletableFuture<?>> futures = new ArrayList<>();
CompletableFuture<?> future;
this.sendManager.startSend(MessageType.VERTEX);
for (int i = 0; i < this.sendThreadNum; i++) {
future = send(this.sendManager::sendVertex, this.loadService::createIteratorFromVertex);
future = this.send(this.sendManager::sendVertex,
this.loadService::createIteratorFromVertex);
futures.add(future);
}
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).exceptionally(e -> {
Expand All @@ -116,11 +127,11 @@ public void loadGraph() {
}).join();
this.sendManager.finishSend(MessageType.VERTEX);

futures.clear();

this.sendManager.startSend(MessageType.EDGE);
futures.clear();
for (int i = 0; i < this.sendThreadNum; i++) {
future = send(this.sendManager::sendEdge, this.loadService::createIteratorFromEdge);
future = this.send(this.sendManager::sendEdge,
this.loadService::createIteratorFromEdge);
futures.add(future);
}
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).exceptionally(e -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ public FileRegionBuffer(int length) {
this.length = length;
}

public FileRegionBuffer(int length, String path) {
this.length = length;
this.path = path;
}

/**
* Use zero-copy transform from socket channel to file
* @param channel
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.hugegraph.computer.core.receiver.edge.EdgeMessageRecvPartitions;
import org.apache.hugegraph.computer.core.receiver.message.ComputeMessageRecvPartitions;
import org.apache.hugegraph.computer.core.receiver.vertex.VertexMessageRecvPartitions;
import org.apache.hugegraph.computer.core.snapshot.SnapshotManager;
import org.apache.hugegraph.computer.core.sort.flusher.PeekableIterator;
import org.apache.hugegraph.computer.core.sort.sorting.SortManager;
import org.apache.hugegraph.computer.core.store.FileManager;
Expand Down Expand Up @@ -65,6 +66,7 @@ public class MessageRecvManager implements Manager, MessageHandler {
private int expectedFinishMessages;
private CompletableFuture<Void> finishMessagesFuture;
private AtomicInteger finishMessagesCount;
private SnapshotManager snapshotManager;

private long waitFinishMessagesTimeout;
private long superstep;
Expand All @@ -89,10 +91,14 @@ public void init(Config config) {
SuperstepFileGenerator fileGenerator = new SuperstepFileGenerator(
this.fileManager,
Constants.INPUT_SUPERSTEP);
this.vertexPartitions = new VertexMessageRecvPartitions(
this.context, fileGenerator, this.sortManager);
this.edgePartitions = new EdgeMessageRecvPartitions(
this.context, fileGenerator, this.sortManager);
this.vertexPartitions = new VertexMessageRecvPartitions(this.context,
fileGenerator,
this.sortManager,
this.snapshotManager);
this.edgePartitions = new EdgeMessageRecvPartitions(this.context,
fileGenerator,
this.sortManager,
this.snapshotManager);
this.workerCount = config.get(ComputerOptions.JOB_WORKERS_COUNT);
// One for vertex and one for edge.
this.expectedFinishMessages = this.workerCount * 2;
Expand All @@ -107,8 +113,10 @@ public void init(Config config) {
public void beforeSuperstep(Config config, int superstep) {
SuperstepFileGenerator fileGenerator = new SuperstepFileGenerator(
this.fileManager, superstep);
this.messagePartitions = new ComputeMessageRecvPartitions(
this.context, fileGenerator, this.sortManager);
this.messagePartitions = new ComputeMessageRecvPartitions(this.context,
fileGenerator,
this.sortManager,
this.snapshotManager);
this.expectedFinishMessages = this.workerCount;
this.finishMessagesFuture = new CompletableFuture<>();
this.finishMessagesCount.set(this.expectedFinishMessages);
Expand Down Expand Up @@ -245,4 +253,8 @@ public Map<Integer, MessageStat> messageStats() {
"The messagePartitions can't be null");
return this.messagePartitions.messageStats();
}

public void setSnapshotManager(SnapshotManager snapshotManager) {
this.snapshotManager = snapshotManager;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,10 @@ public synchronized long totalBytes() {
return this.totalBytes;
}

public synchronized List<String> outputFiles() {
return this.outputFiles;
}

public synchronized MessageStat messageStat() {
// TODO: count the message received
return new MessageStat(0L, this.totalBytes);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,42 +17,48 @@

package org.apache.hugegraph.computer.core.receiver;

import java.io.File;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.commons.io.FileUtils;
import org.apache.hugegraph.computer.core.common.ComputerContext;
import org.apache.hugegraph.computer.core.config.Config;
import org.apache.hugegraph.computer.core.network.buffer.NetworkBuffer;
import org.apache.hugegraph.computer.core.snapshot.SnapshotManager;
import org.apache.hugegraph.computer.core.sort.flusher.PeekableIterator;
import org.apache.hugegraph.computer.core.sort.sorting.SortManager;
import org.apache.hugegraph.computer.core.store.SuperstepFileGenerator;
import org.apache.hugegraph.computer.core.store.entry.KvEntry;

import java.io.File;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public abstract class MessageRecvPartitions<P extends MessageRecvPartition> {

protected final ComputerContext context;
protected final Config config;
protected final SuperstepFileGenerator fileGenerator;
protected final SortManager sortManager;
protected final SnapshotManager snapshotManager;

// The map of partition-id and the messages for the partition.
private final Map<Integer, P> partitions;

public MessageRecvPartitions(ComputerContext context,
SuperstepFileGenerator fileGenerator,
SortManager sortManager) {
SortManager sortManager,
SnapshotManager snapshotManager) {
this.context = context;
this.config = context.config();
this.fileGenerator = fileGenerator;
this.sortManager = sortManager;
this.snapshotManager = snapshotManager;
this.partitions = new HashMap<>();
}

protected abstract P createPartition();

protected abstract void writePartitionSnapshot(int partitionId, List<String> outputFiles);

public void addBuffer(int partitionId, NetworkBuffer buffer) {
P partition = this.partition(partitionId);
partition.addBuffer(buffer);
Expand Down Expand Up @@ -87,6 +93,7 @@ public Map<Integer, PeekableIterator<KvEntry>> iterators() {
Map<Integer, PeekableIterator<KvEntry>> entries = new HashMap<>();
for (Map.Entry<Integer, P> entry : this.partitions.entrySet()) {
entries.put(entry.getKey(), entry.getValue().iterator());
this.writePartitionSnapshot(entry.getKey(), entry.getValue().outputFiles());
}
return entries;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,34 @@
package org.apache.hugegraph.computer.core.receiver.edge;

import org.apache.hugegraph.computer.core.common.ComputerContext;
import org.apache.hugegraph.computer.core.network.message.MessageType;
import org.apache.hugegraph.computer.core.receiver.MessageRecvPartitions;
import org.apache.hugegraph.computer.core.snapshot.SnapshotManager;
import org.apache.hugegraph.computer.core.sort.sorting.SortManager;
import org.apache.hugegraph.computer.core.store.SuperstepFileGenerator;

import java.util.List;

public class EdgeMessageRecvPartitions
extends MessageRecvPartitions<EdgeMessageRecvPartition> {

public EdgeMessageRecvPartitions(ComputerContext context,
SuperstepFileGenerator fileGenerator,
SortManager sortManager) {
super(context, fileGenerator, sortManager);
SortManager sortManager,
SnapshotManager snapshotManager) {
super(context, fileGenerator, sortManager, snapshotManager);
}

@Override
public EdgeMessageRecvPartition createPartition() {
return new EdgeMessageRecvPartition(this.context, this.fileGenerator,
this.sortManager);
}

@Override
public void writePartitionSnapshot(int partitionId, List<String> outputFiles) {
if (this.snapshotManager.writeSnapshot()) {
this.snapshotManager.upload(MessageType.EDGE, partitionId, outputFiles);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,30 @@

import org.apache.hugegraph.computer.core.common.ComputerContext;
import org.apache.hugegraph.computer.core.receiver.MessageRecvPartitions;
import org.apache.hugegraph.computer.core.snapshot.SnapshotManager;
import org.apache.hugegraph.computer.core.sort.sorting.SortManager;
import org.apache.hugegraph.computer.core.store.SuperstepFileGenerator;

import java.util.List;

public class ComputeMessageRecvPartitions
extends MessageRecvPartitions<ComputeMessageRecvPartition> {

public ComputeMessageRecvPartitions(ComputerContext context,
SuperstepFileGenerator fileGenerator,
SortManager sortManager) {
super(context, fileGenerator, sortManager);
SortManager sortManager,
SnapshotManager snapshotManager) {
super(context, fileGenerator, sortManager, snapshotManager);
}

@Override
public ComputeMessageRecvPartition createPartition() {
return new ComputeMessageRecvPartition(this.context, this.fileGenerator,
this.sortManager);
}

@Override
protected void writePartitionSnapshot(int partitionId, List<String> outputFiles) {
// pass
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,35 @@
package org.apache.hugegraph.computer.core.receiver.vertex;

import org.apache.hugegraph.computer.core.common.ComputerContext;
import org.apache.hugegraph.computer.core.network.message.MessageType;
import org.apache.hugegraph.computer.core.receiver.MessageRecvPartitions;
import org.apache.hugegraph.computer.core.snapshot.SnapshotManager;
import org.apache.hugegraph.computer.core.sort.sorting.SortManager;
import org.apache.hugegraph.computer.core.store.SuperstepFileGenerator;

import java.util.List;

public class VertexMessageRecvPartitions
extends MessageRecvPartitions<VertexMessageRecvPartition> {


public VertexMessageRecvPartitions(ComputerContext context,
SuperstepFileGenerator fileGenerator,
SortManager sortManager) {
super(context, fileGenerator, sortManager);
SortManager sortManager,
SnapshotManager snapshotManager) {
super(context, fileGenerator, sortManager, snapshotManager);
}

@Override
public VertexMessageRecvPartition createPartition() {
return new VertexMessageRecvPartition(this.context, this.fileGenerator,
this.sortManager);
}

@Override
public void writePartitionSnapshot(int partitionId, List<String> outputFiles) {
if (this.snapshotManager.writeSnapshot()) {
this.snapshotManager.upload(MessageType.VERTEX, partitionId, outputFiles);
}
}
}
Loading

0 comments on commit 2e45cd2

Please sign in to comment.