Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Updates to Async and Side Output based on Eng Feedback #82

Merged
merged 5 commits into from
Dec 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,4 @@ env/
venv/
.java-version
/pyflink/
/.run/

Original file line number Diff line number Diff line change
Expand Up @@ -3,33 +3,20 @@
public class ProcessedEvent {

final String message;
public String processed;

@Override
public String toString() {
return "ProcessedEvent{" +
", message='" + message + '\'' +
", processed='" + processed + '\'' +
'}';
}

public ProcessedEvent(String message, String processed) {
public ProcessedEvent(String message) {
this.message = message;
this.processed = processed;
}

// Getter methods
public String getMessage() {
return message;
}

public String getProcessed() {
return processed;
}

// Setter method for processed field
public void setProcessed(String processed) {
this.processed = processed;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,15 @@

import java.time.Duration;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.*;

public class ProcessingFunction extends RichAsyncFunction<IncomingEvent, ProcessedEvent> {
private static final Logger LOG = LoggerFactory.getLogger(ProcessingFunction.class);

private final String apiUrl;
private final String apiKey;
private static ExecutorService executorService;


private transient AsyncHttpClient client;

Expand All @@ -43,41 +43,52 @@ public ProcessingFunction(String apiUrl, String apiKey) {
public void open(Configuration parameters) throws Exception {
DefaultAsyncHttpClientConfig.Builder clientBuilder = Dsl.config().setConnectTimeout(Duration.ofSeconds(10));
client = Dsl.asyncHttpClient(clientBuilder);

int numCores = Runtime.getRuntime().availableProcessors(); // get num cores on node for thread count
executorService = Executors.newFixedThreadPool(numCores);

}

@Override
public void close() throws Exception
{
client.close();
executorService.shutdown();
}

@Override
public void asyncInvoke(IncomingEvent incomingEvent, ResultFuture<ProcessedEvent> resultFuture) {

// Create a new ProcessedEvent instance
ProcessedEvent processedEvent = new ProcessedEvent(incomingEvent.getMessage(), null);
ProcessedEvent processedEvent = new ProcessedEvent(incomingEvent.getMessage());
LOG.debug("New request: {}", incomingEvent);

// Note: The Async Client used must return a Future object or equivalent
Future<Response> future = client.prepareGet(apiUrl)
.setHeader("x-api-key", apiKey)
.execute();

// Asynchronously calling API and handling response via Completable Future
CompletableFuture.supplyAsync(() -> {
try {
LOG.debug("Trying to get response for {}", incomingEvent.getId());
Response response = future.get();
return response.getStatusCode();
} catch (InterruptedException | ExecutionException e) {
LOG.error("Error during async HTTP call: {}", e.getMessage());
return -1;
}
}).thenAccept(statusCode -> {
// Process the request via a Completable Future, in order to not block request synchronously
// Notice we are passing executor service for thread management
CompletableFuture.supplyAsync(() ->
{
try {
LOG.debug("Trying to get response for {}", incomingEvent.getId());
Response response = future.get();
return response.getStatusCode();
} catch (InterruptedException | ExecutionException e) {
LOG.error("Error during async HTTP call: {}", e.getMessage());
return -1;
}
}, executorService).thenAccept(statusCode -> {
if (statusCode == 200) {
processedEvent.setProcessed("SUCCESS");
LOG.debug("Success! {}", incomingEvent.getId());
resultFuture.complete(Collections.singleton(processedEvent));
} else if (statusCode == 500) { // Retryable error
LOG.error("Status code 500, retrying shortly...");
processedEvent.setProcessed("FAIL");
resultFuture.completeExceptionally(new Throwable(statusCode.toString()));
} else {
LOG.error("Unexpected status code: {}", statusCode);
processedEvent.setProcessed("FAIL");
resultFuture.completeExceptionally(new Throwable(statusCode.toString()));
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public static void main(String[] args) throws Exception {
Preconditions.checkArgument(!outputStreamArn.isEmpty(), "Output stream ARN must not be empty");

processedStream
.map(value -> String.format("%s,%s", value.message, value.processed))
.map(value -> String.format("%s", value.message))
.sinkTo(createSink(outputProperties));

LOGGER.debug("Starting flink job: {}", "Async I/O Retries");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,6 @@ public IncomingEvent(String message) {





}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package com.amazonaws.services.msf;

public enum ProcessingOutcome {
SUCCESS("SUCCESS"),
ERROR("ERROR");

private final String text;

ProcessingOutcome(final String text) {
this.text = text;
}

@Override
public String toString() {
return text;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,20 +68,20 @@ public static void main(String[] args) throws Exception {
.setParallelism(1);

// Validate stream for invalid messages
SingleOutputStreamOperator<Tuple2<IncomingEvent, Boolean>> validatedStream = source
SingleOutputStreamOperator<Tuple2<IncomingEvent, ProcessingOutcome>> validatedStream = source
.map(incomingEvent -> {
boolean isPoisoned = "Poison".equals(incomingEvent.message);
return Tuple2.of(incomingEvent, isPoisoned);
}, TypeInformation.of(new TypeHint<Tuple2<IncomingEvent, Boolean>>() {
ProcessingOutcome result = "Poison".equals(incomingEvent.message)?ProcessingOutcome.ERROR: ProcessingOutcome.SUCCESS;
return Tuple2.of(incomingEvent, result);
}, TypeInformation.of(new TypeHint<Tuple2<IncomingEvent, ProcessingOutcome>>() {
}));

// Split the stream based on validation
SingleOutputStreamOperator<IncomingEvent> mainStream = validatedStream
.process(new ProcessFunction<Tuple2<IncomingEvent, Boolean>, IncomingEvent>() {
.process(new ProcessFunction<Tuple2<IncomingEvent, ProcessingOutcome>, IncomingEvent>() {
@Override
public void processElement(Tuple2<IncomingEvent, Boolean> value, Context ctx,
public void processElement(Tuple2<IncomingEvent, ProcessingOutcome> value, Context ctx,
Collector<IncomingEvent> out) throws Exception {
if (value.f1) {
if (value.f1.equals(ProcessingOutcome.ERROR)) {
// Invalid event (true), send to DLQ sink
ctx.output(invalidEventsTag, value.f0);
} else {
Expand Down