Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(core): support load vertex/edge snapshot #269

Merged
merged 6 commits into from
Oct 22, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,62 @@ public static synchronized ComputerOptions instance() {
""
);

public static final ConfigOption<Boolean> SNAPSHOT_WRITE =
new ConfigOption<>(
"snapshot.write",
"Whether write snapshot of input vertex and edge partitions",
Radeity marked this conversation as resolved.
Show resolved Hide resolved
allowValues(true, false),
false
);

public static final ConfigOption<Boolean> SNAPSHOT_LOAD =
new ConfigOption<>(
"snapshot.load",
"Whether use snapshot of input vertex and edge partitions",
Radeity marked this conversation as resolved.
Show resolved Hide resolved
allowValues(true, false),
false
);

public static final ConfigOption<String> SNAPSHOT_VIEW_KEY =
new ConfigOption<>(
"snapshot.view_key",
"View key of target snapshot",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what does the view key mean? do we have a concept that is easier to understand?

Copy link
Member Author

@Radeity Radeity Oct 2, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we can use SNAPSHOT_NAME instead, just a user-defined unique identifier of the snapshot.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @javeme, I want to make sure there is no other confusion about it before I make the changes.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it. SNAPSHOT_NAME seems easy to understand.

null,
""
);

public static final ConfigOption<String> SNAPSHOT_MINIO_ENDPOINT =
new ConfigOption<>(
"snapshot.minio_endpoint",
"MinIO endpoint",
Radeity marked this conversation as resolved.
Show resolved Hide resolved
null,
""
);

/**
 * Option {@code snapshot.minio_access_key}: the MinIO access key.
 * The third constructor argument (where the boolean snapshot options pass
 * {@code allowValues(...)}) is {@code null} here, and the default value is
 * the empty string.
 */
public static final ConfigOption<String> SNAPSHOT_MINIO_ACCESS_KEY =
new ConfigOption<>(
"snapshot.minio_access_key",
"MinIO access key",
null,
""
);

/**
 * Option {@code snapshot.minio_secret_key}: the MinIO secret key.
 * The third constructor argument (where the boolean snapshot options pass
 * {@code allowValues(...)}) is {@code null} here, and the default value is
 * the empty string.
 * NOTE(review): this is a credential by name; presumably it should not be
 * logged or echoed back, confirm how config values are printed.
 */
public static final ConfigOption<String> SNAPSHOT_MINIO_SECRET_KEY =
new ConfigOption<>(
"snapshot.minio_secret_key",
"MinIO secret key",
null,
""
);

/**
 * Option {@code snapshot.minio_bucket_name}: the MinIO bucket name.
 * The third constructor argument (where the boolean snapshot options pass
 * {@code allowValues(...)}) is {@code null} here, and the default value is
 * the empty string.
 */
public static final ConfigOption<String> SNAPSHOT_MINIO_BUCKET_NAME =
new ConfigOption<>(
"snapshot.minio_bucket_name",
"MinIO bucket name",
null,
""
);

public static final ConfigOption<Integer> INPUT_SEND_THREAD_NUMS =
new ConfigOption<>(
"input.send_thread_nums",
Expand Down
5 changes: 5 additions & 0 deletions computer-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
</dependency>
<dependency>
<groupId>io.minio</groupId>
<artifactId>minio</artifactId>
<version>${minio-version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs-client</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.hugegraph.computer.core.network.message.MessageType;
import org.apache.hugegraph.computer.core.rpc.InputSplitRpcService;
import org.apache.hugegraph.computer.core.sender.MessageSendManager;
import org.apache.hugegraph.computer.core.snapshot.SnapshotManager;
import org.apache.hugegraph.computer.core.worker.load.LoadService;
import org.apache.hugegraph.util.ExecutorUtil;
import org.apache.hugegraph.util.Log;
Expand All @@ -60,9 +61,13 @@ public class WorkerInputManager implements Manager {
*/
private final MessageSendManager sendManager;

private final SnapshotManager snapshotManager;

public WorkerInputManager(ComputerContext context,
MessageSendManager sendManager) {
MessageSendManager sendManager,
SnapshotManager snapshotManager) {
this.sendManager = sendManager;
this.snapshotManager = snapshotManager;

this.sendThreadNum = this.inputSendThreadNum(context.config());
this.sendExecutor = ExecutorUtil.newFixedThreadPool(this.sendThreadNum, PREFIX);
Expand Down Expand Up @@ -103,11 +108,17 @@ public void service(InputSplitRpcService rpcService) {
* but there is no guarantee that all of them has been received.
*/
public void loadGraph() {
if (this.snapshotManager.loadSnapshot()) {
this.snapshotManager.load();
return;
}

List<CompletableFuture<?>> futures = new ArrayList<>();
CompletableFuture<?> future;
this.sendManager.startSend(MessageType.VERTEX);
for (int i = 0; i < this.sendThreadNum; i++) {
future = send(this.sendManager::sendVertex, this.loadService::createIteratorFromVertex);
future = this.send(this.sendManager::sendVertex,
this.loadService::createIteratorFromVertex);
futures.add(future);
}
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).exceptionally(e -> {
Expand All @@ -116,11 +127,11 @@ public void loadGraph() {
}).join();
this.sendManager.finishSend(MessageType.VERTEX);

futures.clear();

this.sendManager.startSend(MessageType.EDGE);
futures.clear();
for (int i = 0; i < this.sendThreadNum; i++) {
future = send(this.sendManager::sendEdge, this.loadService::createIteratorFromEdge);
future = this.send(this.sendManager::sendEdge,
this.loadService::createIteratorFromEdge);
futures.add(future);
}
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).exceptionally(e -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ public FileRegionBuffer(int length) {
this.length = length;
}

/**
 * Creates a buffer whose length and file path are both supplied by the
 * caller.
 *
 * @param length value stored in the {@code length} field
 * @param path   value stored in the {@code path} field
 */
public FileRegionBuffer(int length, String path) {
this.length = length;
this.path = path;
}

/**
* Use zero-copy transform from socket channel to file
* @param channel
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.hugegraph.computer.core.receiver.edge.EdgeMessageRecvPartitions;
import org.apache.hugegraph.computer.core.receiver.message.ComputeMessageRecvPartitions;
import org.apache.hugegraph.computer.core.receiver.vertex.VertexMessageRecvPartitions;
import org.apache.hugegraph.computer.core.snapshot.SnapshotManager;
import org.apache.hugegraph.computer.core.sort.flusher.PeekableIterator;
import org.apache.hugegraph.computer.core.sort.sorting.SortManager;
import org.apache.hugegraph.computer.core.store.FileManager;
Expand Down Expand Up @@ -65,6 +66,7 @@ public class MessageRecvManager implements Manager, MessageHandler {
private int expectedFinishMessages;
private CompletableFuture<Void> finishMessagesFuture;
private AtomicInteger finishMessagesCount;
private SnapshotManager snapshotManager;

private long waitFinishMessagesTimeout;
private long superstep;
Expand All @@ -90,9 +92,9 @@ public void init(Config config) {
this.fileManager,
Constants.INPUT_SUPERSTEP);
this.vertexPartitions = new VertexMessageRecvPartitions(
this.context, fileGenerator, this.sortManager);
this.context, fileGenerator, this.sortManager, this.snapshotManager);
this.edgePartitions = new EdgeMessageRecvPartitions(
this.context, fileGenerator, this.sortManager);
this.context, fileGenerator, this.sortManager, this.snapshotManager);
this.workerCount = config.get(ComputerOptions.JOB_WORKERS_COUNT);
// One for vertex and one for edge.
this.expectedFinishMessages = this.workerCount * 2;
Expand All @@ -108,7 +110,7 @@ public void beforeSuperstep(Config config, int superstep) {
SuperstepFileGenerator fileGenerator = new SuperstepFileGenerator(
this.fileManager, superstep);
this.messagePartitions = new ComputeMessageRecvPartitions(
this.context, fileGenerator, this.sortManager);
this.context, fileGenerator, this.sortManager, this.snapshotManager);
this.expectedFinishMessages = this.workerCount;
this.finishMessagesFuture = new CompletableFuture<>();
this.finishMessagesCount.set(this.expectedFinishMessages);
Expand Down Expand Up @@ -245,4 +247,8 @@ public Map<Integer, MessageStat> messageStats() {
"The messagePartitions can't be null");
return this.messagePartitions.messageStats();
}

/**
 * Sets the snapshot manager that this manager passes to the vertex, edge
 * and compute-message partitions it creates.
 * {@code init(Config)} and {@code beforeSuperstep(Config, int)} read the
 * field at the moment they construct those partitions, so it has to be set
 * before they run for the partitions to receive it.
 *
 * @param snapshotManager the snapshot manager to store
 */
public void setSnapshotManager(SnapshotManager snapshotManager) {
this.snapshotManager = snapshotManager;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,10 @@ public synchronized long totalBytes() {
return this.totalBytes;
}

/**
 * Returns the output file paths currently held by this partition.
 * <p>
 * The result is a copy taken while holding this object's monitor. Handing
 * out the internal list itself would let callers read or modify it outside
 * the lock that the other synchronized methods rely on.
 * Assumes the {@code outputFiles} field is never null.
 *
 * @return a new list containing the current output file paths
 */
public synchronized List<String> outputFiles() {
    // Fully-qualified name so no extra import is needed in this file.
    return new java.util.ArrayList<>(this.outputFiles);
}

public synchronized MessageStat messageStat() {
// TODO: count the message received
return new MessageStat(0L, this.totalBytes);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,42 +17,48 @@

package org.apache.hugegraph.computer.core.receiver;

import java.io.File;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.commons.io.FileUtils;
import org.apache.hugegraph.computer.core.common.ComputerContext;
import org.apache.hugegraph.computer.core.config.Config;
import org.apache.hugegraph.computer.core.network.buffer.NetworkBuffer;
import org.apache.hugegraph.computer.core.snapshot.SnapshotManager;
import org.apache.hugegraph.computer.core.sort.flusher.PeekableIterator;
import org.apache.hugegraph.computer.core.sort.sorting.SortManager;
import org.apache.hugegraph.computer.core.store.SuperstepFileGenerator;
import org.apache.hugegraph.computer.core.store.entry.KvEntry;

import java.io.File;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public abstract class MessageRecvPartitions<P extends MessageRecvPartition> {

protected final ComputerContext context;
protected final Config config;
protected final SuperstepFileGenerator fileGenerator;
protected final SortManager sortManager;
protected final SnapshotManager snapshotManager;

// The map of partition-id and the messages for the partition.
private final Map<Integer, P> partitions;

public MessageRecvPartitions(ComputerContext context,
SuperstepFileGenerator fileGenerator,
SortManager sortManager) {
SortManager sortManager,
SnapshotManager snapshotManager) {
this.context = context;
this.config = context.config();
this.fileGenerator = fileGenerator;
this.sortManager = sortManager;
this.snapshotManager = snapshotManager;
this.partitions = new HashMap<>();
}

protected abstract P createPartition();

/**
 * Hook invoked for each partition's output files so a subclass can write
 * a snapshot of them.
 * {@code iterators()} calls it once per partition, right after that
 * partition's iterator has been obtained.
 *
 * @param partitionId key of the partition in the partitions map
 * @param outputFiles value returned by the partition's {@code outputFiles()}
 */
protected abstract void writePartitionSnapshot(int partitionId, List<String> outputFiles);

public void addBuffer(int partitionId, NetworkBuffer buffer) {
P partition = this.partition(partitionId);
partition.addBuffer(buffer);
Expand Down Expand Up @@ -87,6 +93,7 @@ public Map<Integer, PeekableIterator<KvEntry>> iterators() {
Map<Integer, PeekableIterator<KvEntry>> entries = new HashMap<>();
for (Map.Entry<Integer, P> entry : this.partitions.entrySet()) {
entries.put(entry.getKey(), entry.getValue().iterator());
this.writePartitionSnapshot(entry.getKey(), entry.getValue().outputFiles());
}
return entries;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,34 @@
package org.apache.hugegraph.computer.core.receiver.edge;

import org.apache.hugegraph.computer.core.common.ComputerContext;
import org.apache.hugegraph.computer.core.network.message.MessageType;
import org.apache.hugegraph.computer.core.receiver.MessageRecvPartitions;
import org.apache.hugegraph.computer.core.snapshot.SnapshotManager;
import org.apache.hugegraph.computer.core.sort.sorting.SortManager;
import org.apache.hugegraph.computer.core.store.SuperstepFileGenerator;

import java.util.List;

public class EdgeMessageRecvPartitions
extends MessageRecvPartitions<EdgeMessageRecvPartition> {

public EdgeMessageRecvPartitions(ComputerContext context,
SuperstepFileGenerator fileGenerator,
SortManager sortManager) {
super(context, fileGenerator, sortManager);
SortManager sortManager,
SnapshotManager snapshotManager) {
super(context, fileGenerator, sortManager, snapshotManager);
}

@Override
public EdgeMessageRecvPartition createPartition() {
return new EdgeMessageRecvPartition(this.context, this.fileGenerator,
this.sortManager);
}

/**
 * Uploads the given output files as the {@code MessageType.EDGE} snapshot
 * of one partition, but only when {@code snapshotManager.writeSnapshot()}
 * returns true; otherwise does nothing.
 *
 * @param partitionId id of the partition the files belong to
 * @param outputFiles files handed to {@code snapshotManager.upload(...)}
 */
@Override
public void writePartitionSnapshot(int partitionId, List<String> outputFiles) {
    // Guard clause: nothing to do unless the manager asks for a snapshot.
    if (!this.snapshotManager.writeSnapshot()) {
        return;
    }
    this.snapshotManager.upload(MessageType.EDGE, partitionId, outputFiles);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,30 @@

import org.apache.hugegraph.computer.core.common.ComputerContext;
import org.apache.hugegraph.computer.core.receiver.MessageRecvPartitions;
import org.apache.hugegraph.computer.core.snapshot.SnapshotManager;
import org.apache.hugegraph.computer.core.sort.sorting.SortManager;
import org.apache.hugegraph.computer.core.store.SuperstepFileGenerator;

import java.util.List;

public class ComputeMessageRecvPartitions
extends MessageRecvPartitions<ComputeMessageRecvPartition> {

public ComputeMessageRecvPartitions(ComputerContext context,
SuperstepFileGenerator fileGenerator,
SortManager sortManager) {
super(context, fileGenerator, sortManager);
SortManager sortManager,
SnapshotManager snapshotManager) {
super(context, fileGenerator, sortManager, snapshotManager);
}

@Override
public ComputeMessageRecvPartition createPartition() {
return new ComputeMessageRecvPartition(this.context, this.fileGenerator,
this.sortManager);
}

/**
 * Intentionally does nothing: this implementation uploads no snapshot for
 * compute-message partitions and ignores both arguments.
 *
 * @param partitionId unused
 * @param outputFiles unused
 */
@Override
protected void writePartitionSnapshot(int partitionId, List<String> outputFiles) {
// No-op by design; see the method comment.
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,35 @@
package org.apache.hugegraph.computer.core.receiver.vertex;

import org.apache.hugegraph.computer.core.common.ComputerContext;
import org.apache.hugegraph.computer.core.network.message.MessageType;
import org.apache.hugegraph.computer.core.receiver.MessageRecvPartitions;
import org.apache.hugegraph.computer.core.snapshot.SnapshotManager;
import org.apache.hugegraph.computer.core.sort.sorting.SortManager;
import org.apache.hugegraph.computer.core.store.SuperstepFileGenerator;

import java.util.List;

public class VertexMessageRecvPartitions
extends MessageRecvPartitions<VertexMessageRecvPartition> {


public VertexMessageRecvPartitions(ComputerContext context,
SuperstepFileGenerator fileGenerator,
SortManager sortManager) {
super(context, fileGenerator, sortManager);
SortManager sortManager,
SnapshotManager snapshotManager) {
super(context, fileGenerator, sortManager, snapshotManager);
}

@Override
public VertexMessageRecvPartition createPartition() {
return new VertexMessageRecvPartition(this.context, this.fileGenerator,
this.sortManager);
}

/**
 * Uploads the given output files as the {@code MessageType.VERTEX} snapshot
 * of one partition, but only when {@code snapshotManager.writeSnapshot()}
 * returns true; otherwise does nothing.
 *
 * @param partitionId id of the partition the files belong to
 * @param outputFiles files handed to {@code snapshotManager.upload(...)}
 */
@Override
public void writePartitionSnapshot(int partitionId, List<String> outputFiles) {
    // Guard clause: nothing to do unless the manager asks for a snapshot.
    if (!this.snapshotManager.writeSnapshot()) {
        return;
    }
    this.snapshotManager.upload(MessageType.VERTEX, partitionId, outputFiles);
}
}
Loading
Loading