Skip to content

Commit

Permalink
Fix: Stream All Messages not correctly continuing flow (#123)
Browse files Browse the repository at this point in the history
* fix: stream all messages

* fix up the lint issues

* replaced with the new test

* move it out of the while for performance

* Update docker-compose.yml

* work around no longer needed
  • Loading branch information
nplasterer authored Sep 30, 2023
1 parent 5cd45da commit 9561552
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 35 deletions.
6 changes: 0 additions & 6 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,6 @@ jobs:
uses: gradle/gradle-build-action@v2
- name: Validate Gradle Wrapper
uses: gradle/wrapper-validation-action@v1
# Workaround for https://github.com/actions/runner-images/issues/8104
- name: Fix Qemu Error
run: |
brew remove --ignore-dependencies qemu
curl -o ./qemu.rb https://raw.githubusercontent.com/Homebrew/homebrew-core/f88e30b3a23ef3735580f9b05535ce5a0a03c9e3/Formula/qemu.rb
brew install ./qemu.rb
- name: Set up Docker
run: brew install docker docker-compose
- name: Start Colima
Expand Down
2 changes: 1 addition & 1 deletion dev/local/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,4 @@ services:
depends_on:
wakunode:
condition: service_healthy
build: ./test
build: ./test
Original file line number Diff line number Diff line change
Expand Up @@ -557,18 +557,6 @@ class ConversationTest {
}
}

@Test
fun testStreamAllMessagesGetsMessageFromKnownConversation() = kotlinx.coroutines.test.runTest {
val fixtures = fixtures()
val client = fixtures.aliceClient
val bobConversation = fixtures.bobClient.conversations.newConversation(client.address)
client.conversations.streamAllMessages().test {
bobConversation.send(text = "hi")
assertEquals("hi", awaitItem().encodedContent.content.toStringUtf8())
awaitComplete()
}
}

@Test
fun testV2RejectsSpoofedContactBundles() {
val topic = "/xmtp/0/m-Gdb7oj5nNdfZ3MJFLAcS4WTABgr6al1hePy6JV1-QUE/proto"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package org.xmtp.android.library

import androidx.test.ext.junit.runners.AndroidJUnit4
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import org.junit.Assert.assertEquals
import org.junit.Test
import org.junit.runner.RunWith
Expand All @@ -16,6 +20,7 @@ import org.xmtp.android.library.messages.createDeterministic
import org.xmtp.android.library.messages.getPublicKeyBundle
import org.xmtp.android.library.messages.toPublicKeyBundle
import org.xmtp.android.library.messages.walletAddress
import java.lang.Thread.sleep
import java.util.Date

@RunWith(AndroidJUnit4::class)
Expand Down Expand Up @@ -71,4 +76,65 @@ class ConversationsTest {
assertEquals(conversation.peerAddress, newWallet.address)
assertEquals(conversation.createdAt.time, created.time)
}

@Test
fun testStreamAllMessages() = runBlocking {
val bo = PrivateKeyBuilder()
val alix = PrivateKeyBuilder()
val clientOptions =
ClientOptions(api = ClientOptions.Api(env = XMTPEnvironment.LOCAL, isSecure = false))
val boClient = Client().create(bo, clientOptions)
val alixClient = Client().create(alix, clientOptions)
val boConversation = boClient.conversations.newConversation(alixClient.address)

// Record message stream across all conversations
val allMessages = mutableListOf<DecodedMessage>()

val job = CoroutineScope(Dispatchers.IO).launch {
try {
alixClient.conversations.streamAllMessages().collect { message ->
allMessages.add(message)
}
} catch (e: Exception) {}
}
sleep(2500)

for (i in 0 until 5) {
boConversation.send(text = "Message $i")
sleep(1000)
}
assertEquals(allMessages.size, 5)

val caro = PrivateKeyBuilder()
val caroClient = Client().create(caro, clientOptions)
val caroConversation = caroClient.conversations.newConversation(alixClient.address)

sleep(2500)

for (i in 0 until 5) {
caroConversation.send(text = "Message $i")
sleep(1000)
}

assertEquals(allMessages.size, 10)

job.cancel()

CoroutineScope(Dispatchers.IO).launch {
try {
alixClient.conversations.streamAllMessages().collect { message ->
allMessages.add(message)
}
} catch (e: Exception) {
}
}
sleep(2500)

for (i in 0 until 5) {
boConversation.send(text = "Message $i")
sleep(1000)
}

assertEquals(allMessages.size, 15)
}
}
35 changes: 19 additions & 16 deletions library/src/main/java/org/xmtp/android/library/Conversations.kt
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package org.xmtp.android.library

import android.util.Log
import kotlinx.coroutines.currentCoroutineContext
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.job
import kotlinx.coroutines.runBlocking
import org.xmtp.android.library.GRPCApiClient.Companion.makeQueryRequest
import org.xmtp.android.library.messages.Envelope
Expand Down Expand Up @@ -354,16 +356,15 @@ data class Conversations(
}

fun streamAllMessages(): Flow<DecodedMessage> = flow {
while (true) {
val topics = mutableListOf(
Topic.userInvite(client.address).description,
Topic.userIntro(client.address).description
)

for (conversation in list()) {
topics.add(conversation.topic)
}
val topics = mutableListOf(
Topic.userInvite(client.address).description,
Topic.userIntro(client.address).description
)

for (conversation in list()) {
topics.add(conversation.topic)
}
while (true) {
try {
client.subscribe(topics = topics).collect { envelope ->
when {
Expand All @@ -374,24 +375,26 @@ data class Conversations(
}

envelope.contentTopic.startsWith("/xmtp/0/invite-") -> {
val conversation = fromInvite(envelope)
val conversation = fromInvite(envelope = envelope)
conversationsByTopic[conversation.topic] = conversation
// Break so we can resubscribe with the new conversation
return@collect
topics.add(conversation.topic)
currentCoroutineContext().job.cancel()
}

envelope.contentTopic.startsWith("/xmtp/0/intro-") -> {
val conversation = fromIntro(envelope)
val conversation = fromIntro(envelope = envelope)
conversationsByTopic[conversation.topic] = conversation
val decoded = conversation.decode(envelope)
emit(decoded)
// Break so we can resubscribe with the new conversation
return@collect
topics.add(conversation.topic)
currentCoroutineContext().job.cancel()
}

else -> {}
}
}
} catch (error: Exception) {
throw error
continue
}
}
}
Expand Down

0 comments on commit 9561552

Please sign in to comment.