Skip to content

Commit

Permalink
Updates to Stream All Messages (#131)
Browse files Browse the repository at this point in the history
* first pass on upgrading to the subscribe2 functions

* update to use the new subscibe2 and make sure all the tests pass

* fix up linter issue

* try to fix the grpc error

* fix up the tests

* do todos for tests
  • Loading branch information
nplasterer authored Nov 7, 2023
1 parent 3ac6990 commit 6f1d807
Show file tree
Hide file tree
Showing 7 changed files with 60 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import androidx.test.ext.junit.runners.AndroidJUnit4
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import org.junit.Assert.assertEquals
import org.junit.Ignore
import org.junit.Test
Expand Down Expand Up @@ -79,8 +78,8 @@ class ConversationsTest {
}

@Test
@Ignore("Flaky Test")
fun testStreamAllMessages() = runBlocking {
@Ignore("CI Issues")
fun testStreamAllMessages() {
val bo = PrivateKeyBuilder()
val alix = PrivateKeyBuilder()
val clientOptions =
Expand Down Expand Up @@ -110,7 +109,6 @@ class ConversationsTest {
val caro = PrivateKeyBuilder()
val caroClient = Client().create(caro, clientOptions)
val caroConversation = caroClient.conversations.newConversation(alixClient.address)

sleep(2500)

for (i in 0 until 5) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import org.xmtp.proto.message.contents.PrivateKeyOuterClass.PrivateKeyBundle
import java.util.Date

@RunWith(AndroidJUnit4::class)
@Ignore("All Flaky")
@Ignore("CI Issues")
class LocalInstrumentedTest {
@Test
fun testPublishingAndFetchingContactBundlesWithWhileGeneratingKeys() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,15 @@ class FakeApiClient : ApiClient {
}
return flowOf()
}

override suspend fun subscribe2(request: Flow<MessageApiOuterClass.SubscribeRequest>): Flow<MessageApiOuterClass.Envelope> {
val env = stream.counts().first()

if (request.first().contentTopicsList.contains(env.contentTopic)) {
return flowOf(env)
}
return flowOf()
}
}

data class Fixtures(
Expand Down
20 changes: 17 additions & 3 deletions library/src/main/java/org/xmtp/android/library/ApiClient.kt
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import kotlinx.coroutines.flow.Flow
import org.xmtp.android.library.messages.Pagination
import org.xmtp.android.library.messages.Topic
import org.xmtp.proto.message.api.v1.MessageApiGrpcKt
import org.xmtp.proto.message.api.v1.MessageApiOuterClass
import org.xmtp.proto.message.api.v1.MessageApiOuterClass.BatchQueryRequest
import org.xmtp.proto.message.api.v1.MessageApiOuterClass.BatchQueryResponse
import org.xmtp.proto.message.api.v1.MessageApiOuterClass.Cursor
Expand All @@ -18,6 +17,7 @@ import org.xmtp.proto.message.api.v1.MessageApiOuterClass.PublishRequest
import org.xmtp.proto.message.api.v1.MessageApiOuterClass.PublishResponse
import org.xmtp.proto.message.api.v1.MessageApiOuterClass.QueryRequest
import org.xmtp.proto.message.api.v1.MessageApiOuterClass.QueryResponse
import org.xmtp.proto.message.api.v1.MessageApiOuterClass.SubscribeRequest
import java.io.Closeable
import java.util.concurrent.TimeUnit

Expand All @@ -35,6 +35,7 @@ interface ApiClient {
suspend fun envelopes(topic: String, pagination: Pagination? = null): List<Envelope>
suspend fun publish(envelopes: List<Envelope>): PublishResponse
suspend fun subscribe(topics: List<String>): Flow<Envelope>
suspend fun subscribe2(request: Flow<SubscribeRequest>): Flow<Envelope>
}

data class GRPCApiClient(
Expand Down Expand Up @@ -81,6 +82,10 @@ data class GRPCApiClient(
}.build()
}
}.build()

fun makeSubscribeRequest(
topics: List<String>,
): SubscribeRequest = SubscribeRequest.newBuilder().addAllContentTopics(topics).build()
}

private val channel: ManagedChannel =
Expand Down Expand Up @@ -174,8 +179,7 @@ data class GRPCApiClient(
}

override suspend fun subscribe(topics: List<String>): Flow<Envelope> {
val request =
MessageApiOuterClass.SubscribeRequest.newBuilder().addAllContentTopics(topics).build()
val request = makeSubscribeRequest(topics)
val headers = Metadata()

headers.put(CLIENT_VERSION_HEADER_KEY, Constants.VERSION)
Expand All @@ -185,6 +189,16 @@ data class GRPCApiClient(

return client.subscribe(request, headers)
}
override suspend fun subscribe2(request: Flow<SubscribeRequest>): Flow<Envelope> {
val headers = Metadata()

headers.put(CLIENT_VERSION_HEADER_KEY, Constants.VERSION)
if (appVersion != null) {
headers.put(APP_VERSION_HEADER_KEY, appVersion)
}

return client.subscribe2(request, headers)
}

override fun close() {
channel.shutdown().awaitTermination(5, TimeUnit.SECONDS)
Expand Down
4 changes: 4 additions & 0 deletions library/src/main/java/org/xmtp/android/library/Client.kt
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,10 @@ class Client() {
return apiClient.subscribe(topics = topics)
}

suspend fun subscribe2(request: Flow<MessageApiOuterClass.SubscribeRequest>): Flow<Envelope> {
return apiClient.subscribe2(request = request)
}

suspend fun subscribeTopic(topics: List<Topic>): Flow<Envelope> {
return subscribe(topics.map { it.description })
}
Expand Down
23 changes: 18 additions & 5 deletions library/src/main/java/org/xmtp/android/library/Conversations.kt
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package org.xmtp.android.library

import android.util.Log
import kotlinx.coroutines.currentCoroutineContext
import io.grpc.StatusException
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.job
import kotlinx.coroutines.runBlocking
import org.xmtp.android.library.GRPCApiClient.Companion.makeQueryRequest
import org.xmtp.android.library.GRPCApiClient.Companion.makeSubscribeRequest
import org.xmtp.android.library.messages.Envelope
import org.xmtp.android.library.messages.EnvelopeBuilder
import org.xmtp.android.library.messages.InvitationV1
Expand Down Expand Up @@ -365,9 +367,12 @@ data class Conversations(
for (conversation in list()) {
topics.add(conversation.topic)
}

val subscribeFlow = MutableStateFlow(makeSubscribeRequest(topics))

while (true) {
try {
client.subscribe(topics = topics).collect { envelope ->
client.subscribe2(request = subscribeFlow).collect { envelope ->
when {
conversationsByTopic.containsKey(envelope.contentTopic) -> {
val conversation = conversationsByTopic[envelope.contentTopic]
Expand All @@ -379,7 +384,7 @@ data class Conversations(
val conversation = fromInvite(envelope = envelope)
conversationsByTopic[conversation.topic] = conversation
topics.add(conversation.topic)
currentCoroutineContext().job.cancel()
subscribeFlow.value = makeSubscribeRequest(topics)
}

envelope.contentTopic.startsWith("/xmtp/0/intro-") -> {
Expand All @@ -388,12 +393,20 @@ data class Conversations(
val decoded = conversation.decode(envelope)
emit(decoded)
topics.add(conversation.topic)
currentCoroutineContext().job.cancel()
subscribeFlow.value = makeSubscribeRequest(topics)
}

else -> {}
}
}
} catch (error: CancellationException) {
break
} catch (error: StatusException) {
if (error.status.code == io.grpc.Status.Code.UNAVAILABLE) {
continue
} else {
break
}
} catch (error: Exception) {
continue
}
Expand Down
9 changes: 9 additions & 0 deletions library/src/test/java/org/xmtp/android/library/TestHelpers.kt
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,15 @@ class FakeApiClient : ApiClient {
}
return flowOf()
}

override suspend fun subscribe2(request: Flow<MessageApiOuterClass.SubscribeRequest>): Flow<MessageApiOuterClass.Envelope> {
val env = stream.counts().first()

if (request.first().contentTopicsList.contains(env.contentTopic)) {
return flowOf(env)
}
return flowOf()
}
}

data class Fixtures(val aliceAccount: PrivateKeyBuilder, val bobAccount: PrivateKeyBuilder) {
Expand Down

0 comments on commit 6f1d807

Please sign in to comment.