Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: apply latest reducer proto to the reducer SDK #90

Merged
merged 9 commits into from
Jan 12, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
@Getter
@AllArgsConstructor
class ActorResponse {
// FIXME - with the latest proto update, each response has a single set of keys, hence we can remove the keys field here.
// Keys of the reduce actor that produced this response. The supervisor joins them with
// Constants.DELIMITER to locate and remove that actor's entry from its actors map.
String[] keys;
// The ReduceResponse that the supervisor forwards to the response stream observer via onNext.
ReduceOuterClass.ReduceResponse response;
}
3 changes: 0 additions & 3 deletions src/main/java/io/numaproj/numaflow/reducer/HandlerDatum.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,10 @@

@AllArgsConstructor
class HandlerDatum implements Datum {

private byte[] value;
private Instant watermark;
private Instant eventTime;


@Override
public Instant getWatermark() {
return this.watermark;
Expand All @@ -27,5 +25,4 @@ public byte[] getValue() {
public Instant getEventTime() {
return this.eventTime;
}

}
39 changes: 25 additions & 14 deletions src/main/java/io/numaproj/numaflow/reducer/ReduceActor.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import akka.actor.Props;
import akka.japi.pf.ReceiveBuilder;
import com.google.protobuf.ByteString;
import com.google.protobuf.Timestamp;
import io.numaproj.numaflow.reduce.v1.ReduceOuterClass;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -19,7 +20,6 @@
@Slf4j
@AllArgsConstructor
class ReduceActor extends AbstractActor {

private String[] keys;
private Metadata md;
private Reducer groupBy;
Expand All @@ -44,23 +44,34 @@ private void invokeHandler(HandlerDatum handlerDatum) {
private void getResult(String eof) {
MessageList resultMessages = this.groupBy.getOutput(keys, md);
// send the result back to sender(parent actor)
getSender().tell(buildDatumListResponse(resultMessages), getSelf());
resultMessages.getMessages().forEach(message -> {
getSender().tell(buildActorResponse(message), getSelf());
});
}

private ActorResponse buildDatumListResponse(MessageList messageList) {
private ActorResponse buildActorResponse(Message message) {
ReduceOuterClass.ReduceResponse.Builder responseBuilder = ReduceOuterClass.ReduceResponse.newBuilder();
messageList.getMessages().forEach(message -> {
responseBuilder.addResults(ReduceOuterClass.ReduceResponse.Result.newBuilder()
.setValue(ByteString.copyFrom(message.getValue()))
.addAllKeys(message.getKeys() == null ? new ArrayList<>() : Arrays.asList(
message.getKeys()))
.addAllTags(message.getTags() == null ? new ArrayList<>() : List.of(
message.getTags()))
.build());

});
// set the window using the actor metadata.
responseBuilder.setWindow(ReduceOuterClass.Window.newBuilder()
.setStart(Timestamp.newBuilder()
.setSeconds(this.md.getIntervalWindow().getStartTime().getEpochSecond())
.setNanos(this.md.getIntervalWindow().getStartTime().getNano()))
.setEnd(Timestamp.newBuilder()
.setSeconds(this.md.getIntervalWindow().getEndTime().getEpochSecond())
.setNanos(this.md.getIntervalWindow().getEndTime().getNano()))
.setSlot("slot-0").build());
// if we start building the response, it means we already reached EOF.
responseBuilder.setEOF(true);
KeranYang marked this conversation as resolved.
Show resolved Hide resolved
// set the result.
responseBuilder.setResult(ReduceOuterClass.ReduceResponse.Result
.newBuilder()
.setValue(ByteString.copyFrom(message.getValue()))
.addAllKeys(message.getKeys()
== null ? new ArrayList<>():Arrays.asList(message.getKeys()))
.addAllTags(
message.getTags() == null ? new ArrayList<>():List.of(message.getTags()))
.build());
return new ActorResponse(this.keys, responseBuilder.build());
}

}

Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
/**
* ReduceSupervisorActor actor distributes the messages to actors and handles failure.
*/

@Slf4j
class ReduceSupervisorActor extends AbstractActor {
private final ReducerFactory<? extends Reducer> reducerFactory;
Expand Down Expand Up @@ -90,18 +89,21 @@ public Receive createReceive() {
if there is no actor for an incoming set of keys, create a new actor
track all the child actors using actors map
*/
private void invokeActors(ReduceOuterClass.ReduceRequest datumRequest) {
String[] keys = datumRequest.getKeysList().toArray(new String[0]);
private void invokeActors(ReduceOuterClass.ReduceRequest reduceRequest) {
ReduceOuterClass.ReduceRequest.Payload payload = reduceRequest.getPayload();
String[] keys = payload.getKeysList().toArray(new String[0]);
// TODO - do we need to include window information in the keyStr?
// for aligned reducer, there is always single window.
// but at the same time, would like to be consistent with GO SDK implementation.
String keyStr = String.join(Constants.DELIMITER, keys);
if (!actorsMap.containsKey(keyStr)) {
Reducer reduceHandler = reducerFactory.createReducer();
ActorRef actorRef = getContext()
.actorOf(ReduceActor.props(keys, md, reduceHandler));

actorsMap.put(keyStr, actorRef);
}

HandlerDatum handlerDatum = constructHandlerDatum(datumRequest);
HandlerDatum handlerDatum = constructHandlerDatum(payload);
actorsMap.get(keyStr).tell(handlerDatum, getSelf());
}

Expand All @@ -121,22 +123,25 @@ private void responseListener(ActorResponse actorResponse) {
*/

responseObserver.onNext(actorResponse.getResponse());
// TODO - do we need to include window information for aligned windows?
// for aligned reducer, there is always single window.
// but at the same time, would like to be consistent with GO SDK implementation.
actorsMap.remove(String.join(Constants.DELIMITER, actorResponse.getKeys()));
if (actorsMap.isEmpty()) {
responseObserver.onCompleted();
getContext().getSystem().stop(getSelf());
}
}

private HandlerDatum constructHandlerDatum(ReduceOuterClass.ReduceRequest datumRequest) {
private HandlerDatum constructHandlerDatum(ReduceOuterClass.ReduceRequest.Payload payload) {
return new HandlerDatum(
datumRequest.getValue().toByteArray(),
payload.getValue().toByteArray(),
Instant.ofEpochSecond(
datumRequest.getWatermark().getSeconds(),
datumRequest.getWatermark().getNanos()),
payload.getWatermark().getSeconds(),
payload.getWatermark().getNanos()),
Instant.ofEpochSecond(
datumRequest.getEventTime().getSeconds(),
datumRequest.getEventTime().getNanos())
payload.getEventTime().getSeconds(),
payload.getEventTime().getNanos())
);
}

Expand Down
3 changes: 1 addition & 2 deletions src/main/java/io/numaproj/numaflow/reducer/Service.java
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public StreamObserver<ReduceOuterClass.ReduceRequest> reduceFn(final StreamObser
responseObserver));


return new StreamObserver<ReduceOuterClass.ReduceRequest>() {
return new StreamObserver<>() {
@Override
public void onNext(ReduceOuterClass.ReduceRequest datum) {
// send the message to parent actor, which takes care of distribution.
Expand All @@ -110,7 +110,6 @@ public void onError(Throwable throwable) {
public void onCompleted() {
// indicate the end of input to the supervisor
supervisorActor.tell(Constants.EOF, ActorRef.noSender());

}
};
}
Expand Down
48 changes: 42 additions & 6 deletions src/main/proto/reduce/v1/reduce.proto
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import "google/protobuf/empty.proto";
package reduce.v1;

service Reduce {
// ReduceFn applies a reduce function to a request stream.
// ReduceFn applies a reduce function to a stream of reduce requests and sends reduce responses back in a streaming fashion.
rpc ReduceFn(stream ReduceRequest) returns (stream ReduceResponse);

// IsReady is the heartbeat endpoint for gRPC.
Expand All @@ -19,22 +19,58 @@ service Reduce {
* ReduceRequest represents a request element.
*/
message ReduceRequest {
repeated string keys = 1;
bytes value = 2;
google.protobuf.Timestamp event_time = 3;
google.protobuf.Timestamp watermark = 4;
// WindowOperation represents a window operation.
// For Aligned windows, OPEN, APPEND and CLOSE events are sent.
message WindowOperation {
enum Event {
OPEN = 0;
CLOSE = 1;
APPEND = 4;
}

Event event = 1;
repeated Window windows = 2;
}

// Payload represents a payload element.
message Payload {
repeated string keys = 1;
bytes value = 2;
google.protobuf.Timestamp event_time = 3;
google.protobuf.Timestamp watermark = 4;
}

Payload payload = 1;
WindowOperation operation = 2;
}

// Window represents a window.
// Since the client doesn't track keys, window doesn't have a keys field.
message Window {
// Start time of the window.
// NOTE(review): inclusive/exclusive boundary semantics are not stated here — confirm.
google.protobuf.Timestamp start = 1;
// End time of the window.
google.protobuf.Timestamp end = 2;
// Slot identifier of the window.
// NOTE(review): the Java reducer actor currently always sets "slot-0"; confirm intended meaning.
string slot = 3;
}

/**
* ReduceResponse represents a response element.
*/
message ReduceResponse {
// FIXME: put all fields (window, EOF) inside of Result. Reference: https://protobuf.dev/programming-guides/api/#dont-include-primitive-types
KeranYang marked this conversation as resolved.
Show resolved Hide resolved
// Result represents a result element. It contains the result of the reduce function.
message Result {
// Keys attached to the result message; empty when the reducer output carries no keys.
repeated string keys = 1;
// Value of the result message, as raw bytes.
bytes value = 2;
// Tags attached to the result message; empty when the reducer output carries no tags.
repeated string tags = 3;
}
repeated Result results = 1;

Result result = 1;

// window represents a window to which the result belongs.
Window window = 2;

// EOF represents the end of the response for a window.
bool EOF = 3;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,25 @@
import io.numaproj.numaflow.reduce.v1.ReduceOuterClass;
import lombok.extern.slf4j.Slf4j;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

/**
* This is a dummy implementation of reduce output stream observer for testing purpose.
*/
@Slf4j
public class ReduceOutputStreamObserver implements StreamObserver<ReduceOuterClass.ReduceResponse> {
public AtomicReference<Boolean> completed = new AtomicReference<>(false);
public AtomicReference<ReduceOuterClass.ReduceResponse> resultDatum = new AtomicReference<>(
ReduceOuterClass.ReduceResponse.newBuilder().build());
public AtomicReference<List<ReduceOuterClass.ReduceResponse>> resultDatum = new AtomicReference<>(
new ArrayList<>());
public Throwable t;

@Override
public void onNext(ReduceOuterClass.ReduceResponse datum) {
resultDatum.set(resultDatum
.get()
.toBuilder()
.addAllResults(datum.getResultsList())
.build());
public void onNext(ReduceOuterClass.ReduceResponse response) {
List<ReduceOuterClass.ReduceResponse> receivedResponses = resultDatum.get();
receivedResponses.add(response);
resultDatum.set(receivedResponses);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,22 +9,20 @@
import org.junit.Test;

import java.time.Instant;
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;

public class ReduceSupervisorActorTest {

@Test
public void invokeSingleActor() throws RuntimeException {
public void given_inputRequestsShareSameKeys_when_supervisorActorBroadcasts_then_onlyOneReducerActorGetsCreatedAndAggregatesAllRequests() throws RuntimeException {
final ActorSystem actorSystem = ActorSystem.create("test-system-1");
CompletableFuture<Void> completableFuture = new CompletableFuture<Void>();

String reduceKey = "reduce-key";
ReduceOuterClass.ReduceRequest.Builder requestBuilder = ReduceOuterClass.ReduceRequest.
newBuilder().addKeys(reduceKey);

ActorRef shutdownActor = actorSystem
.actorOf(ReduceShutdownActor
.props(completableFuture));
Expand All @@ -39,24 +37,35 @@ public void invokeSingleActor() throws RuntimeException {
.props(new TestReducerFactory(), md, shutdownActor, outputStreamObserver));

for (int i = 1; i <= 10; i++) {
ReduceOuterClass.ReduceRequest reduceRequest = requestBuilder
.addKeys("reduce-test")
.setValue(ByteString.copyFromUtf8(String.valueOf(i)))
ReduceOuterClass.ReduceRequest reduceRequest = ReduceOuterClass.ReduceRequest
.newBuilder()
.setPayload(ReduceOuterClass.ReduceRequest.Payload
.newBuilder()
// all reduce requests share the same set of keys.
.addAllKeys(Arrays.asList("key-1", "key-2"))
.setValue(ByteString.copyFromUtf8(String.valueOf(i)))
.build())
.build();
supervisor.tell(reduceRequest, ActorRef.noSender());
}

supervisor.tell(Constants.EOF, ActorRef.noSender());

try {
completableFuture.get();
assertEquals(1, outputStreamObserver.resultDatum.get().size());
assertEquals("10", outputStreamObserver.resultDatum
.get()
.get(0)
.getResult()
.getValue()
.toStringUtf8());
} catch (InterruptedException | ExecutionException e) {
fail("Expected the future to complete without exception");
}
}

@Test
public void invokeMultipleActors() throws RuntimeException {
public void given_inputRequestsHaveDifferentKeySets_when_supervisorActorBroadcasts_then_multipleReducerActorsHandleKeySetsSeparately() throws RuntimeException {
final ActorSystem actorSystem = ActorSystem.create("test-system-2");
CompletableFuture<Void> completableFuture = new CompletableFuture<Void>();

Expand All @@ -67,26 +76,42 @@ public void invokeMultipleActors() throws RuntimeException {
Metadata md = new MetadataImpl(
new IntervalWindowImpl(Instant.now(), Instant.now()));

ReduceOutputStreamObserver outputStreamObserver = new ReduceOutputStreamObserver();
ActorRef supervisor = actorSystem
.actorOf(ReduceSupervisorActor
.props(
new TestReducerFactory(),
md,
shutdownActor,
new ReduceOutputStreamObserver()));
outputStreamObserver)
);

for (int i = 1; i <= 10; i++) {
ReduceOuterClass.ReduceRequest reduceRequest = ReduceOuterClass.ReduceRequest
.newBuilder()
.addKeys("reduce-test" + i)
.setValue(ByteString.copyFromUtf8(String.valueOf(i)))
.setPayload(ReduceOuterClass.ReduceRequest.Payload
.newBuilder()
// each request contains a unique set of keys.
.addAllKeys(Arrays.asList("shared-key", "unique-key-" + i))
.setValue(ByteString.copyFromUtf8(String.valueOf(i)))
.build())
.build();
supervisor.tell(reduceRequest, ActorRef.noSender());
}

supervisor.tell(Constants.EOF, ActorRef.noSender());
try {
completableFuture.get();
// the outputStreamObserver should get updated 10 times, each time with value 1.
assertEquals(10, outputStreamObserver.resultDatum.get().size());
for (int i = 0; i < 10; i++) {
assertEquals("1", outputStreamObserver.resultDatum
.get()
.get(i)
.getResult()
.getValue()
.toStringUtf8());
}
} catch (InterruptedException | ExecutionException e) {
fail("Expected the future to complete without exception");
}
Expand Down
Loading
Loading