Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: apply latest reducer proto to the reducer SDK #90

Merged
merged 9 commits into from
Jan 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions src/main/java/io/numaproj/numaflow/reducer/ActorRequest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package io.numaproj.numaflow.reducer;

import io.numaproj.numaflow.reduce.v1.ReduceOuterClass;
import lombok.AllArgsConstructor;
import lombok.Getter;

/**
* ActorRequest is to store the request sent to ReduceActors.
*/
@Getter
@AllArgsConstructor
class ActorRequest {
ReduceOuterClass.ReduceRequest request;

// TODO - do we need to include window information in the id?
// for aligned reducer, there is always single window.
// but at the same time, would like to be consistent with GO SDK implementation.
// we will revisit this one later.
public String getUniqueIdentifier() {
return String.join(
Constants.DELIMITER,
this.getRequest().getPayload().getKeysList().toArray(new String[0]));
}

public String[] getKeySet() {
return this.getRequest().getPayload().getKeysList().toArray(new String[0]);
}
}
11 changes: 10 additions & 1 deletion src/main/java/io/numaproj/numaflow/reducer/ActorResponse.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,15 @@
@Getter
@AllArgsConstructor
class ActorResponse {
String[] keys;
ReduceOuterClass.ReduceResponse response;

// TODO - do we need to include window information in the id?
// for aligned reducer, there is always single window.
// but at the same time, would like to be consistent with GO SDK implementation.
// we will revisit this one later.
public String getUniqueIdentifier() {
return String.join(
Constants.DELIMITER,
this.getResponse().getResult().getKeysList().toArray(new String[0]));
}
}
3 changes: 0 additions & 3 deletions src/main/java/io/numaproj/numaflow/reducer/HandlerDatum.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,10 @@

@AllArgsConstructor
class HandlerDatum implements Datum {

private byte[] value;
private Instant watermark;
private Instant eventTime;


@Override
public Instant getWatermark() {
return this.watermark;
Expand All @@ -27,5 +25,4 @@ public byte[] getValue() {
public Instant getEventTime() {
return this.eventTime;
}

}
60 changes: 45 additions & 15 deletions src/main/java/io/numaproj/numaflow/reducer/ReduceActor.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import akka.actor.Props;
import akka.japi.pf.ReceiveBuilder;
import com.google.protobuf.ByteString;
import com.google.protobuf.Timestamp;
import io.numaproj.numaflow.reduce.v1.ReduceOuterClass;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -19,7 +20,6 @@
@Slf4j
@AllArgsConstructor
class ReduceActor extends AbstractActor {

private String[] keys;
private Metadata md;
private Reducer groupBy;
Expand All @@ -44,23 +44,53 @@ private void invokeHandler(HandlerDatum handlerDatum) {
private void getResult(String eof) {
MessageList resultMessages = this.groupBy.getOutput(keys, md);
// send the result back to sender(parent actor)
getSender().tell(buildDatumListResponse(resultMessages), getSelf());
resultMessages.getMessages().forEach(message -> {
getSender().tell(buildActorResponse(message), getSelf());
});
// send a response back with EOF set to true, indicating the reducer has finished the data aggregation.
getSender().tell(buildEOFActorResponse(), getSelf());
}

private ActorResponse buildDatumListResponse(MessageList messageList) {
private ActorResponse buildActorResponse(Message message) {
ReduceOuterClass.ReduceResponse.Builder responseBuilder = ReduceOuterClass.ReduceResponse.newBuilder();
messageList.getMessages().forEach(message -> {
responseBuilder.addResults(ReduceOuterClass.ReduceResponse.Result.newBuilder()
.setValue(ByteString.copyFrom(message.getValue()))
.addAllKeys(message.getKeys() == null ? new ArrayList<>() : Arrays.asList(
message.getKeys()))
.addAllTags(message.getTags() == null ? new ArrayList<>() : List.of(
message.getTags()))
.build());

});
return new ActorResponse(this.keys, responseBuilder.build());
// set the window using the actor metadata.
responseBuilder.setWindow(ReduceOuterClass.Window.newBuilder()
.setStart(Timestamp.newBuilder()
.setSeconds(this.md.getIntervalWindow().getStartTime().getEpochSecond())
.setNanos(this.md.getIntervalWindow().getStartTime().getNano()))
.setEnd(Timestamp.newBuilder()
.setSeconds(this.md.getIntervalWindow().getEndTime().getEpochSecond())
.setNanos(this.md.getIntervalWindow().getEndTime().getNano()))
.setSlot("slot-0").build());
responseBuilder.setEOF(false);
// set the result.
responseBuilder.setResult(ReduceOuterClass.ReduceResponse.Result
.newBuilder()
.setValue(ByteString.copyFrom(message.getValue()))
.addAllKeys(message.getKeys()
== null ? new ArrayList<>():Arrays.asList(message.getKeys()))
.addAllTags(
message.getTags() == null ? new ArrayList<>():List.of(message.getTags()))
.build());
return new ActorResponse(responseBuilder.build());
}

private ActorResponse buildEOFActorResponse() {
ReduceOuterClass.ReduceResponse.Builder responseBuilder = ReduceOuterClass.ReduceResponse.newBuilder();
responseBuilder.setWindow(ReduceOuterClass.Window.newBuilder()
.setStart(Timestamp.newBuilder()
.setSeconds(this.md.getIntervalWindow().getStartTime().getEpochSecond())
.setNanos(this.md.getIntervalWindow().getStartTime().getNano()))
.setEnd(Timestamp.newBuilder()
.setSeconds(this.md.getIntervalWindow().getEndTime().getEpochSecond())
.setNanos(this.md.getIntervalWindow().getEndTime().getNano()))
.setSlot("slot-0").build());
responseBuilder.setEOF(true);
// set a dummy result with the keys.
responseBuilder.setResult(ReduceOuterClass.ReduceResponse.Result
.newBuilder()
.addAllKeys(List.of(this.keys))
.build());
return new ActorResponse(responseBuilder.build());
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
/**
* ReduceSupervisorActor actor distributes the messages to actors and handles failure.
*/

@Slf4j
class ReduceSupervisorActor extends AbstractActor {
private final ReducerFactory<? extends Reducer> reducerFactory;
Expand Down Expand Up @@ -79,7 +78,7 @@ public void postStop() {
public Receive createReceive() {
return ReceiveBuilder
.create()
.match(ReduceOuterClass.ReduceRequest.class, this::invokeActors)
.match(ActorRequest.class, this::invokeActors)
.match(String.class, this::sendEOF)
.match(ActorResponse.class, this::responseListener)
.build();
Expand All @@ -90,19 +89,18 @@ public Receive createReceive() {
if there is no actor for an incoming set of keys, create a new actor
track all the child actors using actors map
*/
private void invokeActors(ReduceOuterClass.ReduceRequest datumRequest) {
String[] keys = datumRequest.getKeysList().toArray(new String[0]);
String keyStr = String.join(Constants.DELIMITER, keys);
if (!actorsMap.containsKey(keyStr)) {
private void invokeActors(ActorRequest actorRequest) {
String[] keys = actorRequest.getKeySet();
String uniqueId = actorRequest.getUniqueIdentifier();
if (!actorsMap.containsKey(uniqueId)) {
Reducer reduceHandler = reducerFactory.createReducer();
ActorRef actorRef = getContext()
.actorOf(ReduceActor.props(keys, md, reduceHandler));

actorsMap.put(keyStr, actorRef);
actorsMap.put(uniqueId, actorRef);
}

HandlerDatum handlerDatum = constructHandlerDatum(datumRequest);
actorsMap.get(keyStr).tell(handlerDatum, getSelf());
HandlerDatum handlerDatum = constructHandlerDatum(actorRequest.getRequest().getPayload());
actorsMap.get(uniqueId).tell(handlerDatum, getSelf());
}

private void sendEOF(String EOF) {
Expand All @@ -119,24 +117,25 @@ private void responseListener(ActorResponse actorResponse) {
if there are no entries in the map, that means processing is
done we can close the stream.
*/

responseObserver.onNext(actorResponse.getResponse());
actorsMap.remove(String.join(Constants.DELIMITER, actorResponse.getKeys()));
if (actorsMap.isEmpty()) {
responseObserver.onCompleted();
getContext().getSystem().stop(getSelf());
if (actorResponse.getResponse().getEOF()) {
actorsMap.remove(actorResponse.getUniqueIdentifier());
if (actorsMap.isEmpty()) {
responseObserver.onCompleted();
getContext().getSystem().stop(getSelf());
}
}
}

private HandlerDatum constructHandlerDatum(ReduceOuterClass.ReduceRequest datumRequest) {
private HandlerDatum constructHandlerDatum(ReduceOuterClass.ReduceRequest.Payload payload) {
return new HandlerDatum(
datumRequest.getValue().toByteArray(),
payload.getValue().toByteArray(),
Instant.ofEpochSecond(
datumRequest.getWatermark().getSeconds(),
datumRequest.getWatermark().getNanos()),
payload.getWatermark().getSeconds(),
payload.getWatermark().getNanos()),
Instant.ofEpochSecond(
datumRequest.getEventTime().getSeconds(),
datumRequest.getEventTime().getNanos())
payload.getEventTime().getSeconds(),
payload.getEventTime().getNanos())
);
}

Expand Down
3 changes: 2 additions & 1 deletion src/main/java/io/numaproj/numaflow/reducer/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ public void start() throws Exception {
server.start();

log.info(
"Server started, listening on socket path: " + grpcConfig.getSocketPath());
"Server started, listening on socket path: "
+ grpcConfig.getSocketPath());

// register shutdown hook
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
Expand Down
5 changes: 2 additions & 3 deletions src/main/java/io/numaproj/numaflow/reducer/Service.java
Original file line number Diff line number Diff line change
Expand Up @@ -89,12 +89,12 @@ public StreamObserver<ReduceOuterClass.ReduceRequest> reduceFn(final StreamObser
responseObserver));


return new StreamObserver<ReduceOuterClass.ReduceRequest>() {
return new StreamObserver<>() {
@Override
public void onNext(ReduceOuterClass.ReduceRequest datum) {
// send the message to parent actor, which takes care of distribution.
if (!supervisorActor.isTerminated()) {
supervisorActor.tell(datum, ActorRef.noSender());
supervisorActor.tell(new ActorRequest(datum), ActorRef.noSender());
} else {
responseObserver.onError(new Throwable("Supervisor actor was terminated"));
}
Expand All @@ -110,7 +110,6 @@ public void onError(Throwable throwable) {
public void onCompleted() {
// indicate the end of input to the supervisor
supervisorActor.tell(Constants.EOF, ActorRef.noSender());

}
};
}
Expand Down
49 changes: 43 additions & 6 deletions src/main/proto/reduce/v1/reduce.proto
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import "google/protobuf/empty.proto";
package reduce.v1;

service Reduce {
// ReduceFn applies a reduce function to a request stream.
// ReduceFn applies a reduce function to a stream of reduce requests and sends reduce response back in a streaming fashion.
rpc ReduceFn(stream ReduceRequest) returns (stream ReduceResponse);

// IsReady is the heartbeat endpoint for gRPC.
Expand All @@ -19,22 +19,59 @@ service Reduce {
* ReduceRequest represents a request element.
*/
message ReduceRequest {
repeated string keys = 1;
bytes value = 2;
google.protobuf.Timestamp event_time = 3;
google.protobuf.Timestamp watermark = 4;
// WindowOperation represents a window operation.
// For Aligned windows, OPEN, APPEND and CLOSE events are sent.
message WindowOperation {
enum Event {
OPEN = 0;
CLOSE = 1;
APPEND = 4;
}

Event event = 1;
repeated Window windows = 2;
}

// Payload represents a payload element.
message Payload {
repeated string keys = 1;
bytes value = 2;
google.protobuf.Timestamp event_time = 3;
google.protobuf.Timestamp watermark = 4;
}

Payload payload = 1;
WindowOperation operation = 2;
}

// Window represents a window.
// Since the client doesn't track keys, window doesn't have a keys field.
message Window {
google.protobuf.Timestamp start = 1;
google.protobuf.Timestamp end = 2;
string slot = 3;
}

/**
* ReduceResponse represents a response element.
*/
message ReduceResponse {
// FIXME: put all fields(window, EOF) inside of Result. Reference: https://protobuf.dev/programming-guides/api/#dont-include-primitive-types
KeranYang marked this conversation as resolved.
Show resolved Hide resolved
// Result represents a result element. It contains the result of the reduce function.
message Result {
repeated string keys = 1;
bytes value = 2;
repeated string tags = 3;
}
repeated Result results = 1;

Result result = 1;

// window represents a window to which the result belongs.
Window window = 2;

// EOF represents the end of the response for a window.
// When it's set to true, the platform considers the response as an indicator which doesn't contain real data.
bool EOF = 3;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,25 @@
import io.numaproj.numaflow.reduce.v1.ReduceOuterClass;
import lombok.extern.slf4j.Slf4j;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

/**
* This is a dummy implementation of reduce output stream observer for testing purpose.
*/
@Slf4j
public class ReduceOutputStreamObserver implements StreamObserver<ReduceOuterClass.ReduceResponse> {
public AtomicReference<Boolean> completed = new AtomicReference<>(false);
public AtomicReference<ReduceOuterClass.ReduceResponse> resultDatum = new AtomicReference<>(
ReduceOuterClass.ReduceResponse.newBuilder().build());
public AtomicReference<List<ReduceOuterClass.ReduceResponse>> resultDatum = new AtomicReference<>(
new ArrayList<>());
public Throwable t;

@Override
public void onNext(ReduceOuterClass.ReduceResponse datum) {
resultDatum.set(resultDatum
.get()
.toBuilder()
.addAllResults(datum.getResultsList())
.build());
public void onNext(ReduceOuterClass.ReduceResponse response) {
List<ReduceOuterClass.ReduceResponse> receivedResponses = resultDatum.get();
receivedResponses.add(response);
resultDatum.set(receivedResponses);
}

@Override
Expand Down
Loading
Loading