diff --git a/library/src/androidTest/java/org/xmtp/android/library/ConversationsTest.kt b/library/src/androidTest/java/org/xmtp/android/library/ConversationsTest.kt index 45c283132..e516281d2 100644 --- a/library/src/androidTest/java/org/xmtp/android/library/ConversationsTest.kt +++ b/library/src/androidTest/java/org/xmtp/android/library/ConversationsTest.kt @@ -4,7 +4,6 @@ import androidx.test.ext.junit.runners.AndroidJUnit4 import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.launch -import kotlinx.coroutines.runBlocking import org.junit.Assert.assertEquals import org.junit.Ignore import org.junit.Test @@ -79,8 +78,8 @@ class ConversationsTest { } @Test - @Ignore("Flaky Test") - fun testStreamAllMessages() = runBlocking { + @Ignore("CI Issues") + fun testStreamAllMessages() { val bo = PrivateKeyBuilder() val alix = PrivateKeyBuilder() val clientOptions = @@ -110,7 +109,6 @@ class ConversationsTest { val caro = PrivateKeyBuilder() val caroClient = Client().create(caro, clientOptions) val caroConversation = caroClient.conversations.newConversation(alixClient.address) - sleep(2500) for (i in 0 until 5) { diff --git a/library/src/androidTest/java/org/xmtp/android/library/LocalInstrumentedTest.kt b/library/src/androidTest/java/org/xmtp/android/library/LocalInstrumentedTest.kt index 3ed9beb54..c1836b4f0 100644 --- a/library/src/androidTest/java/org/xmtp/android/library/LocalInstrumentedTest.kt +++ b/library/src/androidTest/java/org/xmtp/android/library/LocalInstrumentedTest.kt @@ -30,7 +30,7 @@ import org.xmtp.proto.message.contents.PrivateKeyOuterClass.PrivateKeyBundle import java.util.Date @RunWith(AndroidJUnit4::class) -@Ignore("All Flaky") +@Ignore("CI Issues") class LocalInstrumentedTest { @Test fun testPublishingAndFetchingContactBundlesWithWhileGeneratingKeys() { diff --git a/library/src/androidTest/java/org/xmtp/android/library/TestHelpers.kt b/library/src/androidTest/java/org/xmtp/android/library/TestHelpers.kt index 3f7803f92..5421677e7 100644 --- a/library/src/androidTest/java/org/xmtp/android/library/TestHelpers.kt +++ b/library/src/androidTest/java/org/xmtp/android/library/TestHelpers.kt @@ -199,6 +199,15 @@ class FakeApiClient : ApiClient { } return flowOf() } + + override suspend fun subscribe2(request: Flow): Flow { + val env = stream.counts().first() + + if (request.first().contentTopicsList.contains(env.contentTopic)) { + return flowOf(env) + } + return flowOf() + } } data class Fixtures( diff --git a/library/src/main/java/org/xmtp/android/library/ApiClient.kt b/library/src/main/java/org/xmtp/android/library/ApiClient.kt index 6df15b33a..a8a19fe48 100644 --- a/library/src/main/java/org/xmtp/android/library/ApiClient.kt +++ b/library/src/main/java/org/xmtp/android/library/ApiClient.kt @@ -9,7 +9,6 @@ import kotlinx.coroutines.flow.Flow import org.xmtp.android.library.messages.Pagination import org.xmtp.android.library.messages.Topic import org.xmtp.proto.message.api.v1.MessageApiGrpcKt -import org.xmtp.proto.message.api.v1.MessageApiOuterClass import org.xmtp.proto.message.api.v1.MessageApiOuterClass.BatchQueryRequest import org.xmtp.proto.message.api.v1.MessageApiOuterClass.BatchQueryResponse import org.xmtp.proto.message.api.v1.MessageApiOuterClass.Cursor @@ -18,6 +17,7 @@ import org.xmtp.proto.message.api.v1.MessageApiOuterClass.PublishRequest import org.xmtp.proto.message.api.v1.MessageApiOuterClass.PublishResponse import org.xmtp.proto.message.api.v1.MessageApiOuterClass.QueryRequest import org.xmtp.proto.message.api.v1.MessageApiOuterClass.QueryResponse +import org.xmtp.proto.message.api.v1.MessageApiOuterClass.SubscribeRequest import java.io.Closeable import java.util.concurrent.TimeUnit @@ -35,6 +35,7 @@ interface ApiClient { suspend fun envelopes(topic: String, pagination: Pagination? = null): List suspend fun publish(envelopes: List): PublishResponse suspend fun subscribe(topics: List): Flow + suspend fun subscribe2(request: Flow): Flow } data class GRPCApiClient( @@ -81,6 +82,10 @@ data class GRPCApiClient( }.build() } }.build() + + fun makeSubscribeRequest( + topics: List, + ): SubscribeRequest = SubscribeRequest.newBuilder().addAllContentTopics(topics).build() } private val channel: ManagedChannel = @@ -174,8 +179,7 @@ data class GRPCApiClient( } override suspend fun subscribe(topics: List): Flow { - val request = - MessageApiOuterClass.SubscribeRequest.newBuilder().addAllContentTopics(topics).build() + val request = makeSubscribeRequest(topics) val headers = Metadata() headers.put(CLIENT_VERSION_HEADER_KEY, Constants.VERSION) @@ -185,6 +189,16 @@ data class GRPCApiClient( return client.subscribe(request, headers) } + override suspend fun subscribe2(request: Flow): Flow { + val headers = Metadata() + + headers.put(CLIENT_VERSION_HEADER_KEY, Constants.VERSION) + if (appVersion != null) { + headers.put(APP_VERSION_HEADER_KEY, appVersion) + } + + return client.subscribe2(request, headers) + } override fun close() { channel.shutdown().awaitTermination(5, TimeUnit.SECONDS) diff --git a/library/src/main/java/org/xmtp/android/library/Client.kt b/library/src/main/java/org/xmtp/android/library/Client.kt index 637767855..9fa5f7fff 100644 --- a/library/src/main/java/org/xmtp/android/library/Client.kt +++ b/library/src/main/java/org/xmtp/android/library/Client.kt @@ -255,6 +255,10 @@ class Client() { return apiClient.subscribe(topics = topics) } + suspend fun subscribe2(request: Flow): Flow { + return apiClient.subscribe2(request = request) + } + suspend fun subscribeTopic(topics: List): Flow { return subscribe(topics.map { it.description }) } diff --git a/library/src/main/java/org/xmtp/android/library/Conversations.kt b/library/src/main/java/org/xmtp/android/library/Conversations.kt index 45f244fcc..154b8424c 100644 --- a/library/src/main/java/org/xmtp/android/library/Conversations.kt +++ b/library/src/main/java/org/xmtp/android/library/Conversations.kt @@ -1,12 +1,14 @@ package org.xmtp.android.library import android.util.Log -import kotlinx.coroutines.currentCoroutineContext +import io.grpc.StatusException +import kotlinx.coroutines.CancellationException import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.flow -import kotlinx.coroutines.job import kotlinx.coroutines.runBlocking import org.xmtp.android.library.GRPCApiClient.Companion.makeQueryRequest +import org.xmtp.android.library.GRPCApiClient.Companion.makeSubscribeRequest import org.xmtp.android.library.messages.Envelope import org.xmtp.android.library.messages.EnvelopeBuilder import org.xmtp.android.library.messages.InvitationV1 @@ -365,9 +367,12 @@ data class Conversations( for (conversation in list()) { topics.add(conversation.topic) } + + val subscribeFlow = MutableStateFlow(makeSubscribeRequest(topics)) + while (true) { try { - client.subscribe(topics = topics).collect { envelope -> + client.subscribe2(request = subscribeFlow).collect { envelope -> when { conversationsByTopic.containsKey(envelope.contentTopic) -> { val conversation = conversationsByTopic[envelope.contentTopic] @@ -379,7 +384,7 @@ data class Conversations( val conversation = fromInvite(envelope = envelope) conversationsByTopic[conversation.topic] = conversation topics.add(conversation.topic) - currentCoroutineContext().job.cancel() + subscribeFlow.value = makeSubscribeRequest(topics) } envelope.contentTopic.startsWith("/xmtp/0/intro-") -> { @@ -388,12 +393,20 @@ data class Conversations( val decoded = conversation.decode(envelope) emit(decoded) topics.add(conversation.topic) - currentCoroutineContext().job.cancel() + subscribeFlow.value = makeSubscribeRequest(topics) } else -> {} } } + } catch (error: CancellationException) { + break + } catch (error: StatusException) { + if (error.status.code == io.grpc.Status.Code.UNAVAILABLE) { + continue + } else { + break + } } catch (error: Exception) { continue } diff --git a/library/src/test/java/org/xmtp/android/library/TestHelpers.kt b/library/src/test/java/org/xmtp/android/library/TestHelpers.kt index 4a85773d7..f9c0415f5 100644 --- a/library/src/test/java/org/xmtp/android/library/TestHelpers.kt +++ b/library/src/test/java/org/xmtp/android/library/TestHelpers.kt @@ -195,6 +195,15 @@ class FakeApiClient : ApiClient { } return flowOf() } + + override suspend fun subscribe2(request: Flow): Flow { + val env = stream.counts().first() + + if (request.first().contentTopicsList.contains(env.contentTopic)) { + return flowOf(env) + } + return flowOf() + } } data class Fixtures(val aliceAccount: PrivateKeyBuilder, val bobAccount: PrivateKeyBuilder) {