Skip to content

Commit

Permalink
Add functionality to get individual message sent callbacks via courie…
Browse files Browse the repository at this point in the history
…r service interface (#68)

* Add functionality to get individual message sent callbacks via courier service interface
  • Loading branch information
deepanshu42 authored Jun 14, 2023
1 parent 1ae8021 commit 1c9ce1c
Show file tree
Hide file tree
Showing 18 changed files with 159 additions and 20 deletions.
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
package com.gojek.courier.app.data.network

import com.gojek.courier.QoS
import com.gojek.courier.annotation.Callback
import com.gojek.courier.annotation.Data
import com.gojek.courier.annotation.Path
import com.gojek.courier.annotation.Send
import com.gojek.courier.annotation.Subscribe
import com.gojek.courier.annotation.Unsubscribe
import com.gojek.courier.app.data.network.model.Message
import com.gojek.courier.callback.SendMessageCallback
import io.reactivex.Observable

interface CourierService {
@Send(topic = "{topic}", qos = QoS.ONE)
fun publish(@Path("topic") topic: String, @Data message: Message)
fun publish(@Path("topic") topic: String, @Data message: Message, @Callback callback: SendMessageCallback)

@Subscribe(topic = "{topic}")
fun subscribe(@Path("topic") topic: String): Observable<Message>
Expand Down
21 changes: 20 additions & 1 deletion app/src/main/java/com/gojek/courier/app/ui/MainActivity.kt
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.gojek.courier.app.ui

import android.os.Bundle
import android.util.Log
import androidx.appcompat.app.AppCompatActivity
import com.gojek.chuckmqtt.external.MqttChuckConfig
import com.gojek.chuckmqtt.external.MqttChuckInterceptor
Expand All @@ -9,6 +10,7 @@ import com.gojek.courier.Courier
import com.gojek.courier.app.R
import com.gojek.courier.app.data.network.CourierService
import com.gojek.courier.app.data.network.model.Message
import com.gojek.courier.callback.SendMessageCallback
import com.gojek.courier.logging.ILogger
import com.gojek.courier.messageadapter.gson.GsonMessageAdapterFactory
import com.gojek.courier.streamadapter.rxjava2.RxJava2StreamAdapterFactory
Expand Down Expand Up @@ -78,7 +80,24 @@ class MainActivity : AppCompatActivity() {
send.setOnClickListener {
courierService.publish(
topic = topic.text.toString(),
message = Message(123, message.text.toString())
message = Message(123, message.text.toString()),
callback = object : SendMessageCallback {
override fun onMessageSendTrigger() {
Log.d("Courier", "onMessageSendTrigger")
}

override fun onMessageWrittenOnSocket() {
Log.d("Courier", "onMessageWrittenOnSocket")
}

override fun onMessageSendSuccess() {
Log.d("Courier", "onMessageSendSuccess")
}

override fun onMessageSendFailure(error: Throwable) {
Log.d("Courier", "onMessageSendFailure")
}
}
)
}

Expand Down
15 changes: 15 additions & 0 deletions courier-core/api/courier-core.api
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,21 @@ public abstract interface class com/gojek/courier/StreamAdapter$Factory {
public abstract fun create (Ljava/lang/reflect/Type;)Lcom/gojek/courier/StreamAdapter;
}

public final class com/gojek/courier/callback/NoOpSendMessageCallback : com/gojek/courier/callback/SendMessageCallback {
public static final field INSTANCE Lcom/gojek/courier/callback/NoOpSendMessageCallback;
public fun onMessageSendFailure (Ljava/lang/Throwable;)V
public fun onMessageSendSuccess ()V
public fun onMessageSendTrigger ()V
public fun onMessageWrittenOnSocket ()V
}

public abstract interface class com/gojek/courier/callback/SendMessageCallback {
public abstract fun onMessageSendFailure (Ljava/lang/Throwable;)V
public abstract fun onMessageSendSuccess ()V
public abstract fun onMessageSendTrigger ()V
public abstract fun onMessageWrittenOnSocket ()V
}

public final class com/gojek/courier/extensions/CollectionExtensionsKt {
public static final fun toImmutableMap (Ljava/util/Map;)Ljava/util/Map;
public static final fun toImmutableSet (Ljava/util/Set;)Ljava/util/Set;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package com.gojek.courier.callback

object NoOpSendMessageCallback : SendMessageCallback {
override fun onMessageSendTrigger() {
// no-op
}

override fun onMessageWrittenOnSocket() {
// no-op
}

override fun onMessageSendSuccess() {
// no-op
}

override fun onMessageSendFailure(error: Throwable) {
// no-op
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package com.gojek.courier.callback

interface SendMessageCallback {
fun onMessageSendTrigger()
fun onMessageWrittenOnSocket()
fun onMessageSendSuccess()
fun onMessageSendFailure(error: Throwable)
}
3 changes: 3 additions & 0 deletions courier/api/courier.api
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ public final class com/gojek/courier/Courier$Configuration {
public fun toString ()Ljava/lang/String;
}

public abstract interface annotation class com/gojek/courier/annotation/Callback : java/lang/annotation/Annotation {
}

public abstract interface annotation class com/gojek/courier/annotation/Data : java/lang/annotation/Annotation {
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package com.gojek.courier.annotation

@MustBeDocumented
@Target(AnnotationTarget.VALUE_PARAMETER)
@kotlin.annotation.Retention(AnnotationRetention.RUNTIME)
annotation class Callback
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.gojek.courier.annotation.parser

import com.gojek.courier.QoS
import com.gojek.courier.annotation.Callback
import com.gojek.courier.annotation.Data
import com.gojek.courier.annotation.Path
import com.gojek.courier.annotation.Receive
Expand All @@ -13,6 +14,7 @@ import com.gojek.courier.argument.processor.ReceiveArgumentProcessor
import com.gojek.courier.argument.processor.SendArgumentProcessor
import com.gojek.courier.argument.processor.SubscriptionArgumentProcessor
import com.gojek.courier.argument.processor.UnsubscriptionArgumentProcessor
import com.gojek.courier.callback.SendMessageCallback
import com.gojek.courier.stub.StubMethod
import com.gojek.courier.utils.MessageAdapterResolver
import com.gojek.courier.utils.StreamAdapterResolver
Expand Down Expand Up @@ -87,7 +89,8 @@ internal class MethodAnnotationsParser(
val messageType = method.getDataParameterType(dataParameterIndex)
val annotations = method.getDataParameterAnnotations(dataParameterIndex)
val adapter = messageAdapterResolver.resolve(messageType, annotations)
val argumentProcessor = SendArgumentProcessor(pathMap, topic, dataParameterIndex)
val callbackIndex = method.getCallbackParameterIndex()
val argumentProcessor = SendArgumentProcessor(pathMap, topic, dataParameterIndex, callbackIndex)
stubMethod = StubMethod.Send(adapter, qos, argumentProcessor)
}

Expand Down Expand Up @@ -226,7 +229,7 @@ internal class MethodAnnotationsParser(
private val PARAM_NAME_REGEX = Pattern.compile(PARAM)

private fun Annotation.isParameterAnnotation(): Boolean {
return this is Path || this is Data || this is TopicMap
return this is Path || this is Data || this is TopicMap || this is Callback
}

private fun Annotation.isStubMethodAnnotation(): Boolean {
Expand Down Expand Up @@ -268,6 +271,29 @@ internal class MethodAnnotationsParser(
return index
}

private fun Method.getCallbackParameterIndex(): Int {
var index = -1
for (parameterIndex in parameterAnnotations.indices) {
val parameterAnnotations = parameterAnnotations[parameterIndex]
val annotations = parameterAnnotations.filter { it.isParameterAnnotation() }
require(annotations.size == 1) {
"A parameter must have one and only one parameter annotation: $parameterIndex"
}
if (annotations.first() is Callback) {
if (index == -1) {
index = parameterIndex
break
} else {
throw IllegalArgumentException("Multiple parameters found with @Callback annotation")
}
}
}
if (index != -1 && parameterTypes[index] != SendMessageCallback::class.java) {
throw IllegalArgumentException("Parameter annotated with @Callback should be of type SendMessageCallback: ${parameterTypes[index]}")
}
return index
}

private fun Method.getDataParameterType(index: Int): Type {
return parameterTypes[index]
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
package com.gojek.courier.argument.processor

import com.gojek.courier.callback.NoOpSendMessageCallback
import com.gojek.courier.callback.SendMessageCallback

internal class SendArgumentProcessor(
private val pathMap: Map<String, Int>,
private val topic: String,
private val dataParameterIndex: Int
private val dataParameterIndex: Int,
private val callbackIndex: Int
) : ArgumentProcessor() {
private var parsedTopic = topic

Expand All @@ -21,4 +25,12 @@ internal class SendArgumentProcessor(
fun getDataArgument(args: Array<Any>): Any {
return args[dataParameterIndex]
}

fun getCallbackArgument(args: Array<Any>): SendMessageCallback {
return if (callbackIndex == -1) {
NoOpSendMessageCallback
} else {
args[callbackIndex] as SendMessageCallback
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,10 @@ internal class Coordinator(
val data = stubMethod.argumentProcessor.getDataArgument(args)
stubMethod.argumentProcessor.inject(args)
val topic = stubMethod.argumentProcessor.getTopic()
val callback = stubMethod.argumentProcessor.getCallbackArgument(args)
val message = stubMethod.messageAdapter.toMessage(topic, data)
val qos = stubMethod.qos
val sent = client.send(message, topic, qos)
val sent = client.send(message, topic, qos, callback)
logger.d("Coordinator", "Sending message on topic: $topic, qos: $qos, message: $data")
return sent
}
Expand Down
3 changes: 2 additions & 1 deletion mqtt-client/api/mqtt-client.api
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,14 @@ public abstract interface class com/gojek/mqtt/client/MqttClient {
public abstract fun reconnect ()V
public abstract fun removeEventHandler (Lcom/gojek/mqtt/event/EventHandler;)V
public abstract fun removeMessageListener (Ljava/lang/String;Lcom/gojek/mqtt/client/listener/MessageListener;)V
public abstract fun send (Lcom/gojek/courier/Message;Ljava/lang/String;Lcom/gojek/courier/QoS;)Z
public abstract fun send (Lcom/gojek/courier/Message;Ljava/lang/String;Lcom/gojek/courier/QoS;Lcom/gojek/courier/callback/SendMessageCallback;)Z
public abstract fun subscribe (Lkotlin/Pair;[Lkotlin/Pair;)V
public abstract fun unsubscribe (Ljava/lang/String;[Ljava/lang/String;)V
}

public final class com/gojek/mqtt/client/MqttClient$DefaultImpls {
public static synthetic fun disconnect$default (Lcom/gojek/mqtt/client/MqttClient;ZILjava/lang/Object;)V
public static synthetic fun send$default (Lcom/gojek/mqtt/client/MqttClient;Lcom/gojek/courier/Message;Ljava/lang/String;Lcom/gojek/courier/QoS;Lcom/gojek/courier/callback/SendMessageCallback;ILjava/lang/Object;)Z
}

public abstract interface class com/gojek/mqtt/client/MqttInterceptor {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package com.gojek.mqtt.client

import com.gojek.courier.Message
import com.gojek.courier.QoS
import com.gojek.courier.callback.NoOpSendMessageCallback
import com.gojek.courier.callback.SendMessageCallback
import com.gojek.mqtt.client.listener.MessageListener
import com.gojek.mqtt.client.model.ConnectionState
import com.gojek.mqtt.event.EventHandler
Expand All @@ -14,7 +16,12 @@ interface MqttClient {
fun reconnect()
fun subscribe(topic: Pair<String, QoS>, vararg topics: Pair<String, QoS>)
fun unsubscribe(topic: String, vararg topics: String)
fun send(message: Message, topic: String, qos: QoS): Boolean
fun send(
message: Message,
topic: String,
qos: QoS,
sendMessageCallback: SendMessageCallback = NoOpSendMessageCallback
): Boolean
fun addMessageListener(topic: String, listener: MessageListener)
fun removeMessageListener(topic: String, listener: MessageListener)
fun addGlobalMessageListener(listener: MessageListener)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package com.gojek.mqtt.client

import com.gojek.courier.Message
import com.gojek.courier.QoS
import com.gojek.courier.callback.SendMessageCallback
import com.gojek.mqtt.client.internal.MqttClientInternal
import com.gojek.mqtt.client.listener.MessageListener
import com.gojek.mqtt.client.model.ConnectionState
Expand Down Expand Up @@ -36,8 +37,8 @@ internal class MqttCourierClient(
mqttClient.unsubscribe(topic, *topics)
}

override fun send(message: Message, topic: String, qos: QoS): Boolean {
return mqttClient.send(MqttPacket((message as Message.Bytes).value, topic, qos))
override fun send(message: Message, topic: String, qos: QoS, sendMessageCallback: SendMessageCallback): Boolean {
return mqttClient.send(MqttPacket((message as Message.Bytes).value, topic, qos), sendMessageCallback)
}

override fun addMessageListener(topic: String, listener: MessageListener) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package com.gojek.mqtt.client.internal

import android.content.Context
import com.gojek.courier.QoS
import com.gojek.courier.callback.SendMessageCallback
import com.gojek.keepalive.KeepAliveFailureHandler
import com.gojek.keepalive.NoOpKeepAliveFailureHandler
import com.gojek.keepalive.OptimalKeepAliveFailureHandler
Expand Down Expand Up @@ -97,8 +98,8 @@ internal class MqttClientInternal(
androidMqttClient.unsubscribe(listOf(*topics))
}

fun send(mqttPacket: MqttPacket): Boolean {
return androidMqttClient.send(mqttPacket)
fun send(mqttPacket: MqttPacket, sendMessageCallback: SendMessageCallback): Boolean {
return androidMqttClient.send(mqttPacket, sendMessageCallback)
}

fun addMessageListener(topic: String, listener: MessageListener) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,26 @@ package com.gojek.mqtt.client.model

import android.os.Parcel
import android.os.Parcelable
import com.gojek.courier.callback.NoOpSendMessageCallback
import com.gojek.courier.callback.SendMessageCallback

internal data class MqttSendPacket(
var message: ByteArray,
var messageId: Long,
var timestamp: Long,
var qos: Int,
var topic: String,
var type: Int
var type: Int,
var sendMessageCallback: SendMessageCallback
) : Parcelable {
constructor(parcel: Parcel) : this(
parcel.createByteArray()!!,
parcel.readLong(),
parcel.readLong(),
parcel.readInt(),
parcel.readString()!!,
parcel.readInt()
parcel.readInt(),
NoOpSendMessageCallback
)

override fun writeToParcel(parcel: Parcel, flags: Int) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.gojek.mqtt.client.v3

import com.gojek.courier.QoS
import com.gojek.courier.callback.SendMessageCallback
import com.gojek.mqtt.client.listener.MessageListener
import com.gojek.mqtt.client.model.ConnectionState
import com.gojek.mqtt.constants.MQTT_WAIT_BEFORE_RECONNECT_TIME_MS
Expand All @@ -12,7 +13,7 @@ internal interface IAndroidMqttClient {
fun connect(timeMillis: Long = MQTT_WAIT_BEFORE_RECONNECT_TIME_MS)
fun reconnect()
fun disconnect(clearState: Boolean = false)
fun send(mqttPacket: MqttPacket): Boolean
fun send(mqttPacket: MqttPacket, sendMessageCallback: SendMessageCallback): Boolean
fun addMessageListener(topic: String, listener: MessageListener)
fun removeMessageListener(topic: String, listener: MessageListener)
fun addGlobalMessageListener(listener: MessageListener)
Expand Down
Loading

0 comments on commit 1c9ce1c

Please sign in to comment.