Skip to content

Commit

Permalink
Streaming API sketch
Browse files Browse the repository at this point in the history
  • Loading branch information
Bret Ambrose committed Dec 10, 2024
1 parent 75abbab commit cbc71d3
Show file tree
Hide file tree
Showing 7 changed files with 300 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/**
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0.
*/

package software.amazon.awssdk.crt.iot;

public class IncomingPublishEvent {

private byte[] payload;

private IncomingPublishEvent() {
}

/**
* Gets the payload of the IncomingPublishEvent.
*
* @return Payload of the IncomingPublishEvent.
*/
public byte[] getPayload() {
return payload;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,19 @@ public CompletableFuture<MqttRequestResponse> submitRequest(RequestResponseOpera

return future;
}


/**
* Creates a new streaming operation from a set of configuration options. A streaming operation provides a
* mechanism for listening to a specific event stream from an AWS MQTT-based service.
*
* @param options configuration options for the streaming operation
*
* @return a new streaming operation instance
*/
public StreamingOperationBase createStream(StreamingOperationOptions options) {
return new StreamingOperationBase(this, options);
}

/**
* Cleans up the native resources associated with this client. The client is unusable after this call
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/**
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0.
*/

package software.amazon.awssdk.crt.iot;

import software.amazon.awssdk.crt.CrtResource;

/**
* An AWS MQTT service streaming operation. A streaming operation listens to messages on
* a particular topic, deserializes them using a service model, and emits the modeled data by invoking a callback.
*/
public class StreamingOperationBase extends CrtResource {

StreamingOperationBase(MqttRequestResponseClient rrClient, StreamingOperationOptions options) {
acquireNativeHandle(streamingOperationNew(
this,
rrClient.getNativeHandle(),
options
));
}

/**
* Triggers the streaming operation to start listening to the configured stream of events. Has no effect on an
* already-open operation. It is an error to attempt to re-open a closed streaming operation.
*/
public void open() {
streamingOperationOpen(getNativeHandle());
}

/**
* Cleans up the native resources associated with this client. The client is unusable after this call
*/
@Override
protected void releaseNativeHandle() {
if (!isNull()) {
streamingOperationDestroy(getNativeHandle());
}
}

/**
* Determines whether a resource releases its dependencies at the same time the native handle is released or if it waits.
* Resources that wait are responsible for calling releaseReferences() manually.
*/
@Override
protected boolean canReleaseReferencesImmediately() { return true; }

/*******************************************************************************
* native methods
******************************************************************************/

private static native long streamingOperationNew(StreamingOperationBase streamingOperation, long rrClientHandle, StreamingOperationOptions options);

private static native void streamingOperationOpen(long streamingOperationHandle);

private static native void streamingOperationDestroy(long streamingOperationHandle);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/**
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0.
*/

package software.amazon.awssdk.crt.iot;

import java.util.function.Consumer;

/**
* Configuration options for an MQTT-based streaming operation.
*/
public class StreamingOperationOptions {

String topic;
Consumer<SubscriptionStatusEvent> subscriptionStatusEventCallback;
Consumer<IncomingPublishEvent> incomingPublishEventCallback;

public static class StreamingOperationOptionsBuilder {

private final StreamingOperationOptions options = new StreamingOperationOptions();

StreamingOperationOptionsBuilder() {}

/**
* Sets the MQTT topic that a streaming operation will listen for messages on
*
* @param topic MQTT topic to listen to
*
* @return this builder object
*/
public StreamingOperationOptionsBuilder withTopic(String topic) {
this.options.topic = topic;
return this;
}

/**
* Sets the callback function a streaming operation should invoke whenever the underlying subscription changes
* status.
*
* @param callback function to invoke on streaming subscription status changes
*
* @return this builder object
*/
public StreamingOperationOptionsBuilder withSubscriptionStatusEventCallback(Consumer<SubscriptionStatusEvent> callback) {
this.options.subscriptionStatusEventCallback = callback;
return this;
}

/**
* Sets the callback function a streaming operation should invoke every time a publish message arrives on
* the operation's topic.
*
* @param callback function to invoke whenever a publish messages arrives that matches the operation's topic
*
* @return this builder object
*/
public StreamingOperationOptionsBuilder withIncomingPublishEventCallback(Consumer<IncomingPublishEvent> callback) {
this.options.incomingPublishEventCallback = callback;
return this;
}

/**
* Creates a StreamingOperationOptions instance from the builder's configuration.
*
* @return a new StreamingOperationOptions instance
*/
public StreamingOperationOptions build() {
return new StreamingOperationOptions(this.options);
}
}

/**
* Creates a new StreamingOperationOptionsBuilder instance
*
* @return a new StreamingOperationOptionsBuilder instance
*/
public static StreamingOperationOptionsBuilder builder() {
return new StreamingOperationOptionsBuilder();
}

private StreamingOperationOptions() {
}

private StreamingOperationOptions(StreamingOperationOptions options) {
this.topic = options.topic;
this.subscriptionStatusEventCallback = options.subscriptionStatusEventCallback;
this.incomingPublishEventCallback = options.incomingPublishEventCallback;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/**
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0.
*/

package software.amazon.awssdk.crt.iot;

import java.util.Optional;

/**
* An event that describes a change in subscription status for a streaming operation.
*/
public class SubscriptionStatusEvent {
private SubscriptionStatusEventType type;
private Optional<Integer> error;

private SubscriptionStatusEvent(SubscriptionStatusEventType type) {
this.type = type;
this.error = Optional.empty();
}

private SubscriptionStatusEvent(SubscriptionStatusEventType type, int errorCode) {
this.type = type;
this.error = Optional.of(errorCode);
}

/**
* Gets the type of status change represented by the event.
*
* @return The type of status change represented by the event
*/
public SubscriptionStatusEventType getType() {
return this.type;
}

/**
* Gets the underlying reason for the event. Only set for SubscriptionLost and SubscriptionHalted. Use
* CRT.awsErrorString() to convert the integer error code into an error description.
*
* @return underlying reason for the event
*/
public Optional<Integer> getError() {
return this.error;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/**
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0.
*/

package software.amazon.awssdk.crt.iot;

/**
* The type of change to the state of a streaming operation subscription
*/
public enum SubscriptionStatusEventType {

/**
* The streaming operation is successfully subscribed to its topic (filter)
*/
SUBSCRIPTION_ESTABLISHED(0),

/**
* The streaming operation has temporarily lost its subscription to its topic (filter)
*/
SUBSCRIPTION_LOST(1),

/**
* The streaming operation has entered a terminal state where it has given up trying to subscribe
* to its topic (filter). This is always due to user error (bad topic filter or IoT Core permission policy).
*/
SUBSCRIPTION_HALTED(2);

private final int type;

private SubscriptionStatusEventType(int value) {
type = value;
}
}
37 changes: 37 additions & 0 deletions src/native/mqtt_request_response.c
Original file line number Diff line number Diff line change
Expand Up @@ -558,6 +558,43 @@ JNIEXPORT void JNICALL
s_aws_request_response_operation_jni_owned_parameters_clean_up(&request_params, env);
}

JNIEXPORT void JNICALL
Java_software_amazon_awssdk_crt_iot_StreamingOperationBase_streamingOperationNew(
JNIEnv *env,
jclass jni_class,
jobject java_streaming_operation,
jlong jni_mqtt_request_response_client_handle,
jobject java_options) {

(void)env;
(void)jni_class;
(void)java_streaming_operation;
(void)jni_mqtt_request_response_client_handle;
(void)java_options;
}

JNIEXPORT void JNICALL
Java_software_amazon_awssdk_crt_iot_StreamingOperationBase_streamingOperationOpen(
JNIEnv *env,
jclass jni_class,
jlong jni_streaming_operation_handle) {

(void)env;
(void)jni_class;
(void)jni_streaming_operation_handle;
}

JNIEXPORT void JNICALL
Java_software_amazon_awssdk_crt_iot_StreamingOperationBase_streamingOperationDestroy(
JNIEnv *env,
jclass jni_class,
jlong jni_streaming_operation_handle) {

(void)env;
(void)jni_class;
(void)jni_streaming_operation_handle;
}

#if UINTPTR_MAX == 0xffffffff
# if defined(_MSC_VER)
# pragma warning(pop)
Expand Down

0 comments on commit cbc71d3

Please sign in to comment.