Bi-direction subscription #1124

Merged: 2 commits merged into dapr:master on Oct 8, 2024

artursouza
Member

@artursouza artursouza commented Sep 12, 2024

Description

Bi-directional subscription.

Issue reference

We strive to have every PR opened based on an issue, where the problem or feature has been discussed prior to implementation.

Please reference the issue this PR will close: #1072

Checklist

Please make sure you've completed the relevant tasks for this PR, out of the following list:

  • Code compiles correctly
  • Created/updated tests
  • Extended the documentation

@@ -145,6 +145,12 @@
<version>${dapr.sdk.version}</version>
<scope>test</scope>
</dependency>
<dependency>
Member Author

This is needed for the tests to work on IntelliJ. Not sure why it is like that in master branch.

@artursouza artursouza force-pushed the bidi-subscription branch 4 times, most recently from 095e952 to 763fd6e September 26, 2024 23:32
@artursouza artursouza marked this pull request as ready for review September 26, 2024 23:33
@artursouza artursouza requested review from a team as code owners September 26, 2024 23:33
@artur-ciocanu
Contributor

@artursouza, @salaboy and @cicoyle: my Dapr-fu is not that great yet, but after enabling detailed logs for the streaming subscription, I got this stack trace:

time="2024-09-27T17:17:38.233931+03:00" level=info msg="Subscribing to pubsub 'messagebus' topic 'stream-topic'" app_id=pubsubstreamit instance=ciocanu.corp.adobe.com scope=dapr.runtime.pubsub.streamer type=log ver=1.14.2
time="2024-09-27T17:17:38.237124+03:00" level=debug msg="Processing Redis message 1726305169401-0" app_id=pubsubstreamit component="messagebus (pubsub.redis/v1)" instance=ciocanu.corp.adobe.com scope=dapr.contrib type=log ver=1.14.2
time="2024-09-27T17:17:38.237173+03:00" level=debug msg="Processing Redis message 1726305169393-0" app_id=pubsubstreamit component="messagebus (pubsub.redis/v1)" instance=ciocanu.corp.adobe.com scope=dapr.contrib type=log ver=1.14.2
time="2024-09-27T17:17:38.237235+03:00" level=debug msg="Processing Redis message 1726305169396-0" app_id=pubsubstreamit component="messagebus (pubsub.redis/v1)" instance=ciocanu.corp.adobe.com scope=dapr.contrib type=log ver=1.14.2
time="2024-09-27T17:17:38.237322+03:00" level=debug msg="Processing Redis message 1726305169418-0" app_id=pubsubstreamit component="messagebus (pubsub.redis/v1)" instance=ciocanu.corp.adobe.com scope=dapr.contrib type=log ver=1.14.2
time="2024-09-27T17:17:38.237199+03:00" level=debug msg="Processing Redis message 1726305169412-0" app_id=pubsubstreamit component="messagebus (pubsub.redis/v1)" instance=ciocanu.corp.adobe.com scope=dapr.contrib type=log ver=1.14.2
time="2024-09-27T17:17:38.237446+03:00" level=debug msg="Processing Redis message 1726305169399-0" app_id=pubsubstreamit component="messagebus (pubsub.redis/v1)" instance=ciocanu.corp.adobe.com scope=dapr.contrib type=log ver=1.14.2
time="2024-09-27T17:17:38.237241+03:00" level=debug msg="Processing Redis message 1726305169415-0" app_id=pubsubstreamit component="messagebus (pubsub.redis/v1)" instance=ciocanu.corp.adobe.com scope=dapr.contrib type=log ver=1.14.2
time="2024-09-27T17:17:38.237151+03:00" level=debug msg="Processing Redis message 1726305169406-0" app_id=pubsubstreamit component="messagebus (pubsub.redis/v1)" instance=ciocanu.corp.adobe.com scope=dapr.contrib type=log ver=1.14.2
time="2024-09-27T17:17:38.237142+03:00" level=debug msg="Processing Redis message 1726305169409-0" app_id=pubsubstreamit component="messagebus (pubsub.redis/v1)" instance=ciocanu.corp.adobe.com scope=dapr.contrib type=log ver=1.14.2
time="2024-09-27T17:17:38.237216+03:00" level=debug msg="Processing Redis message 1726305169381-0" app_id=pubsubstreamit component="messagebus (pubsub.redis/v1)" instance=ciocanu.corp.adobe.com scope=dapr.contrib type=log ver=1.14.2
17:17:38.236 [grpc-default-executor-1] INFO  foo - Received: 
time="2024-09-27T17:17:38.250127+03:00" level=info msg="Unsubscribed from pubsub 'messagebus' topic 'stream-topic'" app_id=pubsubstreamit instance=ciocanu.corp.adobe.com scope=dapr.runtime.pubsub.streamer type=log ver=1.14.2
17:17:38.249 [grpc-default-executor-1] ERROR foo - Received error:
io.grpc.StatusRuntimeException: CANCELLED: Failed to read message.
	at io.grpc.Status.asRuntimeException(Status.java:533)
	at io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onClose(ClientCalls.java:481)
	at io.grpc.internal.DelayedClientCall$DelayedListener$3.run(DelayedClientCall.java:489)
	at io.grpc.internal.DelayedClientCall$DelayedListener.delayOrExecute(DelayedClientCall.java:453)
	at io.grpc.internal.DelayedClientCall$DelayedListener.onClose(DelayedClientCall.java:486)
	at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:564)
	at io.grpc.internal.ClientCallImpl.access$100(ClientCallImpl.java:72)
	at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:729)
	at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:710)
	at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
	at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: io.grpc.StatusRuntimeException: INTERNAL: Invalid protobuf byte sequence
	at io.grpc.Status.asRuntimeException(Status.java:525)
	at io.grpc.protobuf.lite.ProtoLiteUtils$MessageMarshaller.parse(ProtoLiteUtils.java:240)
	at io.grpc.protobuf.lite.ProtoLiteUtils$MessageMarshaller.parse(ProtoLiteUtils.java:134)
	at io.grpc.MethodDescriptor.parseResponse(MethodDescriptor.java:284)
	at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1MessagesAvailable.runInternal(ClientCallImpl.java:657)
	at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1MessagesAvailable.runInContext(ClientCallImpl.java:644)
	... 5 common frames omitted
Caused by: com.google.protobuf.InvalidProtocolBufferException: Protocol message had invalid UTF-8.
	at com.google.protobuf.InvalidProtocolBufferException.invalidUtf8(InvalidProtocolBufferException.java:149)
	at com.google.protobuf.Utf8$UnsafeProcessor.decodeUtf8(Utf8.java:1365)
	at com.google.protobuf.Utf8.decodeUtf8(Utf8.java:318)
	at com.google.protobuf.CodedInputStream$ArrayDecoder.readStringRequireUtf8(CodedInputStream.java:788)
	at io.dapr.v1.DaprAppCallbackProtos$TopicEventRequest.<init>(DaprAppCallbackProtos.java:2278)
	at io.dapr.v1.DaprAppCallbackProtos$TopicEventRequest$1.parsePartialFrom(DaprAppCallbackProtos.java:4315)
	at io.dapr.v1.DaprAppCallbackProtos$TopicEventRequest$1.parsePartialFrom(DaprAppCallbackProtos.java:4309)
	at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:63)
	at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:25)
	at io.grpc.protobuf.lite.ProtoLiteUtils$MessageMarshaller.parseFrom(ProtoLiteUtils.java:245)
	at io.grpc.protobuf.lite.ProtoLiteUtils$MessageMarshaller.parse(ProtoLiteUtils.java:237)
	... 9 common frames omitted
Stopping dapr application ...

NOTE: This output contains both Dapr sidecar logs and PubSubStreamIT test logs. I am not entirely sure why the ArrayDecoder fails, but it is a start, and it makes clear why the subscription is cancelled immediately.

@salaboy
Contributor

salaboy commented Sep 28, 2024

Hmm, interesting: "Protocol message had invalid UTF-8". So it expects UTF-8 but isn't getting it?

Also, 1.14.4 is out by now; it might be worth upgrading to it.

@artur-ciocanu
Contributor

@artursouza I have finally figured out what the issue is. Currently the Java SDK uses the following proto files: https://raw.githubusercontent.com/dapr/dapr/v1.14.0-rc.2/dapr/proto, while the Dapr 1.14.x runtime uses these: https://raw.githubusercontent.com/dapr/dapr/v1.14.{patch-version}/dapr/proto. Once I switched to the protos for the latest Dapr release, I was able to consume messages in a streaming fashion in the PubSubStreamIT integration test.

Here are the differences:

For v1.14.0-rc.2 we have:

rpc SubscribeTopicEventsAlpha1(stream SubscribeTopicEventsRequestAlpha1) returns (stream TopicEventRequest) {}

For v1.14.+ we have:

rpc SubscribeTopicEventsAlpha1(stream SubscribeTopicEventsRequestAlpha1) returns (stream SubscribeTopicEventsResponseAlpha1) {}

As we can see, the released version streams a completely different response type. This explains the deserialization exception I ran into: Dapr was returning SubscribeTopicEventsResponseAlpha1, but on the Java side we were deserializing it into TopicEventRequest.
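To illustrate why a type mismatch surfaces as "Protocol message had invalid UTF-8" rather than a clearer error, here is a dependency-free sketch. It hand-encodes a simplified wrapper message whose field 2 carries a nested event message, then reads field 2 the way a schema declaring it as a proto3 `string` would. The field numbers, the "event-1" id, and all helper names are illustrative assumptions, not the exact layout of the Dapr messages (see dapr.proto for the real definitions). Whether the bogus string decodes depends on the bytes: an all-ASCII payload happens to pass, while the multi-byte length prefix of a 200-byte payload breaks UTF-8 validation.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.CodingErrorAction;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

final class WrongSchemaDemo {

  // Protobuf base-128 varint encoding for non-negative ints.
  static void writeVarint(ByteArrayOutputStream out, int value) {
    while ((value & ~0x7F) != 0) {
      out.write((value & 0x7F) | 0x80);
      value >>>= 7;
    }
    out.write(value);
  }

  // Length-delimited field (wire type 2): tag, length, then the raw payload.
  static void writeField(ByteArrayOutputStream out, int fieldNumber, byte[] payload) {
    writeVarint(out, (fieldNumber << 3) | 2);
    writeVarint(out, payload.length);
    out.write(payload, 0, payload.length);
  }

  // Hypothetical wrapper: field 2 holds a nested event with an id (field 1) and dataSize bytes of data (field 7).
  static byte[] wrappedEvent(int dataSize) {
    ByteArrayOutputStream event = new ByteArrayOutputStream();
    writeField(event, 1, "event-1".getBytes(StandardCharsets.UTF_8));
    byte[] data = new byte[dataSize];
    Arrays.fill(data, (byte) 'x');
    writeField(event, 7, data);
    ByteArrayOutputStream wrapper = new ByteArrayOutputStream();
    writeField(wrapper, 2, event.toByteArray());
    return wrapper.toByteArray();
  }

  // Reads a varint starting at pos[0] and advances pos[0] past it.
  static int readVarint(byte[] buf, int[] pos) {
    int result = 0;
    int shift = 0;
    while (true) {
      byte b = buf[pos[0]++];
      result |= (b & 0x7F) << shift;
      if ((b & 0x80) == 0) {
        return result;
      }
      shift += 7;
    }
  }

  // Returns the payload of the first field, which the wrong schema would treat as a string.
  static byte[] firstFieldPayload(byte[] message) {
    int[] pos = {0};
    readVarint(message, pos); // skip the tag
    int length = readVarint(message, pos);
    return Arrays.copyOfRange(message, pos[0], pos[0] + length);
  }

  // Strict UTF-8 check, comparable in spirit to protobuf's validation of proto3 string fields.
  static boolean isValidUtf8(byte[] bytes) {
    try {
      StandardCharsets.UTF_8.newDecoder()
          .onMalformedInput(CodingErrorAction.REPORT)
          .onUnmappableCharacter(CodingErrorAction.REPORT)
          .decode(ByteBuffer.wrap(bytes));
      return true;
    } catch (CharacterCodingException e) {
      return false;
    }
  }

  public static void main(String[] args) {
    // Small payload: every byte is ASCII, so the bogus "string" happens to decode.
    System.out.println(isValidUtf8(firstFieldPayload(wrappedEvent(16))));  // true
    // 200-byte payload: its length varint (0xC8 0x01) is not valid UTF-8.
    System.out.println(isValidUtf8(firstFieldPayload(wrappedEvent(200)))); // false
  }
}
```

The takeaway is that protobuf has no type tags for nested messages versus strings on the wire; both are length-delimited, so a schema mismatch is only caught indirectly, here by UTF-8 validation.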

Given all of the above, the question is: what value should we use for <dapr.proto.baseurl> in the parent POM? Should it point to the latest Dapr release or to something else?

CC: @cicoyle @salaboy

@artursouza
Member Author

Correct. We should issue a patch release for the 1.12 Java SDK release. Let me cut a PR now.

Add bidi subscription to validate workflow.

Signed-off-by: Artur Souza <[email protected]>
@artur-ciocanu
Contributor

@artursouza I was reviewing your PR, and as far as I can see you adopted a callback approach via SubscriptionListener. I am wondering whether something like a Flux API would be a more suitable approach:

<T> Flux<T> subscribeToEvents(String pubsubName, String topic, TypeRef<T> type);

We already use Project Reactor, and most of the DaprClient methods return Mono<T>. I think using Flux<T> for asynchronous streams is more idiomatic and better aligned with Project Reactor. If necessary, the client can use subscribeOn on the Flux to process messages on a separate thread pool.

The implementation I am proposing would look something like this:

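public <T> Flux<T> subscribeToEvents(String pubsubName, String topic, TypeRef<T> type) {
  return Flux.create(emitter -> {
    // Receives the events the sidecar pushes over the bidirectional stream.
    StreamObserver<DaprAppCallbackProtos.TopicEventRequest> responseObserver = new StreamObserver<>() {
      @Override
      public void onNext(DaprAppCallbackProtos.TopicEventRequest event) {
        try {
          // Decode the payload into the caller's type and push it downstream.
          // Assumes the client's DaprObjectSerializer field (objectSerializer).
          emitter.next(objectSerializer.deserialize(event.getData().toByteArray(), type));
        } catch (IOException e) {
          emitter.error(e);
        }
      }

      @Override
      public void onError(Throwable throwable) {
        emitter.error(throwable);
      }

      @Override
      public void onCompleted() {
        emitter.complete();
      }
    };

    // The first message on the stream is the initial subscription request.
    DaprProtos.SubscribeTopicEventsInitialRequestAlpha1 initialRequest =
        DaprProtos.SubscribeTopicEventsInitialRequestAlpha1.newBuilder()
            .setTopic(topic)
            .setPubsubName(pubsubName)
            .build();
    DaprProtos.SubscribeTopicEventsRequestAlpha1 request =
        DaprProtos.SubscribeTopicEventsRequestAlpha1.newBuilder()
            .setInitialRequest(initialRequest)
            .build();

    var requestObserver = this.asyncStub.subscribeTopicEventsAlpha1(responseObserver);

    // Open the subscription by sending the locally built initial request.
    requestObserver.onNext(request);
  });
}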

Please take a look and let me know your thoughts.

CC: @cicoyle @salaboy
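The core of the proposal is bridging push-style gRPC callbacks into a reactive stream. As a dependency-free sketch of that bridging idea, the code below uses the JDK's java.util.concurrent.Flow types instead of Reactor (a swapped-in technique, not the SDK's code). The Observer interface is a hypothetical stand-in for gRPC's StreamObserver, SubmissionPublisher plays the role of the Flux.create emitter, and its Executor argument shows delivery moved onto a dedicated pool. In Reactor itself, handing downstream processing to another scheduler is usually done with publishOn, while subscribeOn mainly changes where the subscription starts.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.SubmissionPublisher;

final class CallbackToStreamDemo {

  // Hypothetical stand-in for gRPC's StreamObserver, so the sketch has no dependencies.
  interface Observer<T> {
    void onNext(T value);
    void onError(Throwable t);
    void onCompleted();
  }

  // Forwards each callback into the publisher, like emitter.next/error/complete in Flux.create.
  static <T> Observer<T> bridge(SubmissionPublisher<T> publisher) {
    return new Observer<>() {
      @Override public void onNext(T value) { publisher.submit(value); }
      @Override public void onError(Throwable t) { publisher.closeExceptionally(t); }
      @Override public void onCompleted() { publisher.close(); }
    };
  }

  // Simulates a transport pushing events, consumed on a dedicated single-thread pool.
  static List<String> collect(List<String> events) {
    ExecutorService consumerPool = Executors.newSingleThreadExecutor();
    try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>(consumerPool, 256)) {
      List<String> received = new CopyOnWriteArrayList<>();
      CompletableFuture<Void> done = publisher.consume(received::add);
      Observer<String> observer = bridge(publisher);
      events.forEach(observer::onNext);
      observer.onCompleted();
      done.join(); // completes once the publisher is closed and all items are consumed
      return received;
    } finally {
      consumerPool.shutdown();
    }
  }

  public static void main(String[] args) {
    System.out.println(collect(List.of("a", "b", "c"))); // prints [a, b, c]
  }
}
```

A stream-returning API like this leaves threading and backpressure decisions to the caller, which is the consistency argument made above for Flux<T> over a listener.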

@artursouza
Member Author

I agree 100%. Although I am not a fan of Project Reactor, I see the value of consistency. Let me give this a shot.

@artursouza
Member Author

var requestObserver = this.asyncStub.subscribeTopicEventsAlpha1(responseObserver);

  requestObserver.onNext(this.request);

I am trying this. The challenge is how to handle the ack/retry/drop status returned by the client code. Any suggestions?

@artursouza
Member Author

I made a small change so that the listener returns the status as a Mono, which lets the client side process events asynchronously. I am happy to try another API; remember that this is in the Preview interface, so we can still change it.
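The status-as-Mono idea can be sketched without Reactor by having the listener return a CompletionStage<Status>; the subscription waits for each status and replies with the event's id. Everything here is hypothetical: Event, Ack, process, and the choice to answer a failing listener with RETRY are illustration only, not the SDK's API or a documented Dapr default. The SUCCESS/RETRY/DROP values mirror the ack/retry/drop outcomes discussed above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;

final class AckLoopDemo {

  // Outcomes a listener can report for one event.
  enum Status { SUCCESS, RETRY, DROP }

  // Hypothetical event shape: the id assigned by the sidecar plus the decoded payload.
  record Event(String id, String payload) {}

  // Hypothetical "event processed" reply to send back on the request stream.
  record Ack(String id, Status status) {}

  // Asks the listener for each event's status asynchronously and pairs it with the event id.
  // A listener that throws or returns a failed stage is answered with RETRY (a sketch choice).
  static List<Ack> process(List<Event> events,
                           Function<Event, CompletionStage<Status>> listener) {
    List<CompletableFuture<Ack>> pending = new ArrayList<>();
    for (Event event : events) {
      CompletableFuture<Status> status;
      try {
        status = listener.apply(event).toCompletableFuture();
      } catch (RuntimeException e) {
        status = CompletableFuture.failedFuture(e);
      }
      pending.add(status
          .exceptionally(t -> Status.RETRY)
          .thenApply(s -> new Ack(event.id(), s)));
    }
    // A real subscription would write each Ack to the gRPC request observer as it completes;
    // here they are collected in event order so the result is easy to inspect.
    List<Ack> acks = new ArrayList<>();
    for (CompletableFuture<Ack> ack : pending) {
      acks.add(ack.join());
    }
    return acks;
  }

  public static void main(String[] args) {
    List<Event> events = List.of(new Event("1", "ok"), new Event("2", "bad"));
    System.out.println(process(events, e -> CompletableFuture.completedFuture(
        e.payload().equals("bad") ? Status.DROP : Status.SUCCESS)));
  }
}
```

Returning a future from the listener keeps slow handlers from blocking the gRPC callback thread, while the subscription stays responsible for sending exactly one reply per event.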

Contributor

@cicoyle cicoyle left a comment

Implementation LGTM. Glad to have consistency.

@artursouza artursouza merged commit cb552ba into dapr:master Oct 8, 2024
6 of 7 checks passed
Labels
None yet
Projects
Status: Done
Development

Successfully merging this pull request may close these issues.

Add Streaming Subscription Support
4 participants