Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(core): support load vertex/edge snapshot #269

Merged
merged 6 commits into from
Oct 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,62 @@ public static synchronized ComputerOptions instance() {
""
);

public static final ConfigOption<Boolean> SNAPSHOT_WRITE =
new ConfigOption<>(
"snapshot.write",
"Whether to write snapshot of input vertex/edge partitions.",
allowValues(true, false),
false
);

public static final ConfigOption<Boolean> SNAPSHOT_LOAD =
new ConfigOption<>(
"snapshot.load",
"Whether to load from snapshot of vertex/edge partitions.",
allowValues(true, false),
false
);

public static final ConfigOption<String> SNAPSHOT_NAME =
new ConfigOption<>(
"snapshot.name",
"The user-defined snapshot name.",
null,
""
);

public static final ConfigOption<String> SNAPSHOT_MINIO_ENDPOINT =
new ConfigOption<>(
"snapshot.minio_endpoint",
"The endpoint of MinIO, MinIO can be used to store snapshots.",
null,
""
);

public static final ConfigOption<String> SNAPSHOT_MINIO_ACCESS_KEY =
new ConfigOption<>(
"snapshot.minio_access_key",
"The access key of MinIO.",
null,
""
);

public static final ConfigOption<String> SNAPSHOT_MINIO_SECRET_KEY =
new ConfigOption<>(
"snapshot.minio_secret_key",
"The secret key of MinIO.",
null,
""
);

public static final ConfigOption<String> SNAPSHOT_MINIO_BUCKET_NAME =
new ConfigOption<>(
"snapshot.minio_bucket_name",
"The bucket name of MinIO.",
null,
""
);

public static final ConfigOption<Integer> INPUT_SEND_THREAD_NUMS =
new ConfigOption<>(
"input.send_thread_nums",
Expand Down
5 changes: 5 additions & 0 deletions computer-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
</dependency>
<dependency>
<groupId>io.minio</groupId>
<artifactId>minio</artifactId>
<version>${minio-version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs-client</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.hugegraph.computer.core.network.message.MessageType;
import org.apache.hugegraph.computer.core.rpc.InputSplitRpcService;
import org.apache.hugegraph.computer.core.sender.MessageSendManager;
import org.apache.hugegraph.computer.core.snapshot.SnapshotManager;
import org.apache.hugegraph.computer.core.worker.load.LoadService;
import org.apache.hugegraph.util.ExecutorUtil;
import org.apache.hugegraph.util.Log;
Expand All @@ -60,9 +61,13 @@
*/
private final MessageSendManager sendManager;

private final SnapshotManager snapshotManager;

public WorkerInputManager(ComputerContext context,
MessageSendManager sendManager) {
MessageSendManager sendManager,
SnapshotManager snapshotManager) {
this.sendManager = sendManager;
this.snapshotManager = snapshotManager;

this.sendThreadNum = this.inputSendThreadNum(context.config());
this.sendExecutor = ExecutorUtil.newFixedThreadPool(this.sendThreadNum, PREFIX);
Expand Down Expand Up @@ -103,11 +108,17 @@
* but there is no guarantee that all of them has been received.
*/
public void loadGraph() {
if (this.snapshotManager.loadSnapshot()) {
this.snapshotManager.load();
return;

Check warning on line 113 in computer-core/src/main/java/org/apache/hugegraph/computer/core/input/WorkerInputManager.java

View check run for this annotation

Codecov / codecov/patch

computer-core/src/main/java/org/apache/hugegraph/computer/core/input/WorkerInputManager.java#L112-L113

Added lines #L112 - L113 were not covered by tests
}

List<CompletableFuture<?>> futures = new ArrayList<>();
CompletableFuture<?> future;
this.sendManager.startSend(MessageType.VERTEX);
for (int i = 0; i < this.sendThreadNum; i++) {
future = send(this.sendManager::sendVertex, this.loadService::createIteratorFromVertex);
future = this.send(this.sendManager::sendVertex,
this.loadService::createIteratorFromVertex);
futures.add(future);
}
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).exceptionally(e -> {
Expand All @@ -116,11 +127,11 @@
}).join();
this.sendManager.finishSend(MessageType.VERTEX);

futures.clear();

this.sendManager.startSend(MessageType.EDGE);
futures.clear();
for (int i = 0; i < this.sendThreadNum; i++) {
future = send(this.sendManager::sendEdge, this.loadService::createIteratorFromEdge);
future = this.send(this.sendManager::sendEdge,
this.loadService::createIteratorFromEdge);
futures.add(future);
}
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).exceptionally(e -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@
this.length = length;
}

public FileRegionBuffer(int length, String path) {
this.length = length;
this.path = path;
}

Check warning on line 49 in computer-core/src/main/java/org/apache/hugegraph/computer/core/network/buffer/FileRegionBuffer.java

View check run for this annotation

Codecov / codecov/patch

computer-core/src/main/java/org/apache/hugegraph/computer/core/network/buffer/FileRegionBuffer.java#L46-L49

Added lines #L46 - L49 were not covered by tests

/**
* Use zero-copy transform from socket channel to file
* @param channel
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.hugegraph.computer.core.receiver.edge.EdgeMessageRecvPartitions;
import org.apache.hugegraph.computer.core.receiver.message.ComputeMessageRecvPartitions;
import org.apache.hugegraph.computer.core.receiver.vertex.VertexMessageRecvPartitions;
import org.apache.hugegraph.computer.core.snapshot.SnapshotManager;
import org.apache.hugegraph.computer.core.sort.flusher.PeekableIterator;
import org.apache.hugegraph.computer.core.sort.sorting.SortManager;
import org.apache.hugegraph.computer.core.store.FileManager;
Expand Down Expand Up @@ -65,6 +66,7 @@ public class MessageRecvManager implements Manager, MessageHandler {
private int expectedFinishMessages;
private CompletableFuture<Void> finishMessagesFuture;
private AtomicInteger finishMessagesCount;
private SnapshotManager snapshotManager;

private long waitFinishMessagesTimeout;
private long superstep;
Expand All @@ -89,10 +91,14 @@ public void init(Config config) {
SuperstepFileGenerator fileGenerator = new SuperstepFileGenerator(
this.fileManager,
Constants.INPUT_SUPERSTEP);
this.vertexPartitions = new VertexMessageRecvPartitions(
this.context, fileGenerator, this.sortManager);
this.edgePartitions = new EdgeMessageRecvPartitions(
this.context, fileGenerator, this.sortManager);
this.vertexPartitions = new VertexMessageRecvPartitions(this.context,
fileGenerator,
this.sortManager,
this.snapshotManager);
this.edgePartitions = new EdgeMessageRecvPartitions(this.context,
fileGenerator,
this.sortManager,
this.snapshotManager);
this.workerCount = config.get(ComputerOptions.JOB_WORKERS_COUNT);
// One for vertex and one for edge.
this.expectedFinishMessages = this.workerCount * 2;
Expand All @@ -107,8 +113,10 @@ public void init(Config config) {
public void beforeSuperstep(Config config, int superstep) {
SuperstepFileGenerator fileGenerator = new SuperstepFileGenerator(
this.fileManager, superstep);
this.messagePartitions = new ComputeMessageRecvPartitions(
this.context, fileGenerator, this.sortManager);
this.messagePartitions = new ComputeMessageRecvPartitions(this.context,
fileGenerator,
this.sortManager,
this.snapshotManager);
this.expectedFinishMessages = this.workerCount;
this.finishMessagesFuture = new CompletableFuture<>();
this.finishMessagesCount.set(this.expectedFinishMessages);
Expand Down Expand Up @@ -245,4 +253,8 @@ public Map<Integer, MessageStat> messageStats() {
"The messagePartitions can't be null");
return this.messagePartitions.messageStats();
}

public void setSnapshotManager(SnapshotManager snapshotManager) {
this.snapshotManager = snapshotManager;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,10 @@ public synchronized long totalBytes() {
return this.totalBytes;
}

public synchronized List<String> outputFiles() {
return this.outputFiles;
}

public synchronized MessageStat messageStat() {
// TODO: count the message received
return new MessageStat(0L, this.totalBytes);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,42 +17,48 @@

package org.apache.hugegraph.computer.core.receiver;

import java.io.File;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.commons.io.FileUtils;
import org.apache.hugegraph.computer.core.common.ComputerContext;
import org.apache.hugegraph.computer.core.config.Config;
import org.apache.hugegraph.computer.core.network.buffer.NetworkBuffer;
import org.apache.hugegraph.computer.core.snapshot.SnapshotManager;
import org.apache.hugegraph.computer.core.sort.flusher.PeekableIterator;
import org.apache.hugegraph.computer.core.sort.sorting.SortManager;
import org.apache.hugegraph.computer.core.store.SuperstepFileGenerator;
import org.apache.hugegraph.computer.core.store.entry.KvEntry;

import java.io.File;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public abstract class MessageRecvPartitions<P extends MessageRecvPartition> {

protected final ComputerContext context;
protected final Config config;
protected final SuperstepFileGenerator fileGenerator;
protected final SortManager sortManager;
protected final SnapshotManager snapshotManager;

// The map of partition-id and the messages for the partition.
private final Map<Integer, P> partitions;

public MessageRecvPartitions(ComputerContext context,
SuperstepFileGenerator fileGenerator,
SortManager sortManager) {
SortManager sortManager,
SnapshotManager snapshotManager) {
this.context = context;
this.config = context.config();
this.fileGenerator = fileGenerator;
this.sortManager = sortManager;
this.snapshotManager = snapshotManager;
this.partitions = new HashMap<>();
}

protected abstract P createPartition();

protected abstract void writePartitionSnapshot(int partitionId, List<String> outputFiles);

public void addBuffer(int partitionId, NetworkBuffer buffer) {
P partition = this.partition(partitionId);
partition.addBuffer(buffer);
Expand Down Expand Up @@ -87,6 +93,7 @@ public Map<Integer, PeekableIterator<KvEntry>> iterators() {
Map<Integer, PeekableIterator<KvEntry>> entries = new HashMap<>();
for (Map.Entry<Integer, P> entry : this.partitions.entrySet()) {
entries.put(entry.getKey(), entry.getValue().iterator());
this.writePartitionSnapshot(entry.getKey(), entry.getValue().outputFiles());
}
return entries;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,34 @@
package org.apache.hugegraph.computer.core.receiver.edge;

import org.apache.hugegraph.computer.core.common.ComputerContext;
import org.apache.hugegraph.computer.core.network.message.MessageType;
import org.apache.hugegraph.computer.core.receiver.MessageRecvPartitions;
import org.apache.hugegraph.computer.core.snapshot.SnapshotManager;
import org.apache.hugegraph.computer.core.sort.sorting.SortManager;
import org.apache.hugegraph.computer.core.store.SuperstepFileGenerator;

import java.util.List;

public class EdgeMessageRecvPartitions
extends MessageRecvPartitions<EdgeMessageRecvPartition> {

public EdgeMessageRecvPartitions(ComputerContext context,
SuperstepFileGenerator fileGenerator,
SortManager sortManager) {
super(context, fileGenerator, sortManager);
SortManager sortManager,
SnapshotManager snapshotManager) {
super(context, fileGenerator, sortManager, snapshotManager);
}

@Override
public EdgeMessageRecvPartition createPartition() {
return new EdgeMessageRecvPartition(this.context, this.fileGenerator,
this.sortManager);
}

@Override
public void writePartitionSnapshot(int partitionId, List<String> outputFiles) {
if (this.snapshotManager.writeSnapshot()) {
this.snapshotManager.upload(MessageType.EDGE, partitionId, outputFiles);
}

Check warning on line 49 in computer-core/src/main/java/org/apache/hugegraph/computer/core/receiver/edge/EdgeMessageRecvPartitions.java

View check run for this annotation

Codecov / codecov/patch

computer-core/src/main/java/org/apache/hugegraph/computer/core/receiver/edge/EdgeMessageRecvPartitions.java#L48-L49

Added lines #L48 - L49 were not covered by tests
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,30 @@

import org.apache.hugegraph.computer.core.common.ComputerContext;
import org.apache.hugegraph.computer.core.receiver.MessageRecvPartitions;
import org.apache.hugegraph.computer.core.snapshot.SnapshotManager;
import org.apache.hugegraph.computer.core.sort.sorting.SortManager;
import org.apache.hugegraph.computer.core.store.SuperstepFileGenerator;

import java.util.List;

public class ComputeMessageRecvPartitions
extends MessageRecvPartitions<ComputeMessageRecvPartition> {

public ComputeMessageRecvPartitions(ComputerContext context,
SuperstepFileGenerator fileGenerator,
SortManager sortManager) {
super(context, fileGenerator, sortManager);
SortManager sortManager,
SnapshotManager snapshotManager) {
super(context, fileGenerator, sortManager, snapshotManager);
}

@Override
public ComputeMessageRecvPartition createPartition() {
return new ComputeMessageRecvPartition(this.context, this.fileGenerator,
this.sortManager);
}

@Override
protected void writePartitionSnapshot(int partitionId, List<String> outputFiles) {
// pass
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,35 @@
package org.apache.hugegraph.computer.core.receiver.vertex;

import org.apache.hugegraph.computer.core.common.ComputerContext;
import org.apache.hugegraph.computer.core.network.message.MessageType;
import org.apache.hugegraph.computer.core.receiver.MessageRecvPartitions;
import org.apache.hugegraph.computer.core.snapshot.SnapshotManager;
import org.apache.hugegraph.computer.core.sort.sorting.SortManager;
import org.apache.hugegraph.computer.core.store.SuperstepFileGenerator;

import java.util.List;

public class VertexMessageRecvPartitions
extends MessageRecvPartitions<VertexMessageRecvPartition> {


public VertexMessageRecvPartitions(ComputerContext context,
SuperstepFileGenerator fileGenerator,
SortManager sortManager) {
super(context, fileGenerator, sortManager);
SortManager sortManager,
SnapshotManager snapshotManager) {
super(context, fileGenerator, sortManager, snapshotManager);
}

@Override
public VertexMessageRecvPartition createPartition() {
return new VertexMessageRecvPartition(this.context, this.fileGenerator,
this.sortManager);
}

@Override
public void writePartitionSnapshot(int partitionId, List<String> outputFiles) {
if (this.snapshotManager.writeSnapshot()) {
this.snapshotManager.upload(MessageType.VERTEX, partitionId, outputFiles);
}
}
}
Loading
Loading