diff --git a/library/src/androidTest/java/org/xmtp/android/library/DmTest.kt b/library/src/androidTest/java/org/xmtp/android/library/DmTest.kt new file mode 100644 index 000000000..9a5a9db20 --- /dev/null +++ b/library/src/androidTest/java/org/xmtp/android/library/DmTest.kt @@ -0,0 +1,359 @@ +package org.xmtp.android.library + +import androidx.test.ext.junit.runners.AndroidJUnit4 +import androidx.test.platform.app.InstrumentationRegistry +import app.cash.turbine.test +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.launch +import kotlinx.coroutines.runBlocking +import org.junit.Assert.assertEquals +import org.junit.Assert.assertThrows +import org.junit.Before +import org.junit.Test +import org.junit.runner.RunWith +import org.xmtp.android.library.codecs.ContentTypeReaction +import org.xmtp.android.library.codecs.Reaction +import org.xmtp.android.library.codecs.ReactionAction +import org.xmtp.android.library.codecs.ReactionCodec +import org.xmtp.android.library.codecs.ReactionSchema +import org.xmtp.android.library.messages.DecryptedMessage +import org.xmtp.android.library.messages.MessageDeliveryStatus +import org.xmtp.android.library.messages.PrivateKey +import org.xmtp.android.library.messages.PrivateKeyBuilder +import org.xmtp.android.library.messages.walletAddress +import java.security.SecureRandom + +@RunWith(AndroidJUnit4::class) +class DmTest { + private lateinit var alixWallet: PrivateKeyBuilder + private lateinit var boWallet: PrivateKeyBuilder + private lateinit var caroWallet: PrivateKeyBuilder + private lateinit var alix: PrivateKey + private lateinit var alixClient: Client + private lateinit var bo: PrivateKey + private lateinit var boClient: Client + private lateinit var caro: PrivateKey + private lateinit var caroClient: Client + + @Before + fun setUp() { + val key = SecureRandom().generateSeed(32) + val context = InstrumentationRegistry.getInstrumentation().targetContext + alixWallet = PrivateKeyBuilder() + alix = alixWallet.getPrivateKey() + alixClient = runBlocking { + Client().createV3( + account = alixWallet, + options = ClientOptions( + ClientOptions.Api(XMTPEnvironment.LOCAL, false), + enableV3 = true, + appContext = context, + dbEncryptionKey = key + ) + ) + } + boWallet = PrivateKeyBuilder() + bo = boWallet.getPrivateKey() + boClient = runBlocking { + Client().createV3( + account = boWallet, + options = ClientOptions( + ClientOptions.Api(XMTPEnvironment.LOCAL, false), + enableV3 = true, + appContext = context, + dbEncryptionKey = key + ) + ) + } + + caroWallet = PrivateKeyBuilder() + caro = caroWallet.getPrivateKey() + caroClient = runBlocking { + Client().createV3( + account = caroWallet, + options = ClientOptions( + ClientOptions.Api(XMTPEnvironment.LOCAL, false), + enableV3 = true, + appContext = context, + dbEncryptionKey = key + ) + ) + } + } + + @Test + fun testCanCreateADm() { + runBlocking { + val convo1 = boClient.conversations.findOrCreateDm(alix.walletAddress) + alixClient.conversations.syncConversations() + val sameConvo1 = alixClient.conversations.findOrCreateDm(bo.walletAddress) + assertEquals(convo1.id, sameConvo1.id) + } + } + + @Test + fun testCanListDmMembers() { + val dm = runBlocking { + boClient.conversations.findOrCreateDm( + alix.walletAddress, + ) + } + assertEquals( + runBlocking { dm.members().map { it.inboxId }.sorted() }, + listOf( + alixClient.inboxId, + boClient.inboxId + ).sorted() + ) + + assertEquals( + runBlocking { + Conversation.Dm(dm).members().map { it.inboxId }.sorted() + }, + listOf( + alixClient.inboxId, + boClient.inboxId + ).sorted() + ) + + assertEquals( + runBlocking + { dm.peerInboxId() }, + alixClient.inboxId, + ) + } + + @Test + fun testCannotCreateDmWithMemberNotOnV3() { + val chuxAccount = PrivateKeyBuilder() + val chux: PrivateKey = chuxAccount.getPrivateKey() + runBlocking { Client().create(account = chuxAccount) } + + assertThrows("Recipient not on network", XMTPException::class.java) { + runBlocking { boClient.conversations.findOrCreateDm(chux.walletAddress) } + } + } + + @Test + fun testCannotStartDmWithSelf() { + assertThrows("Recipient is sender", XMTPException::class.java) { + runBlocking { boClient.conversations.findOrCreateDm(bo.walletAddress) } + } + } + + @Test + fun testDmStartsWithAllowedState() { + runBlocking { + val dm = boClient.conversations.findOrCreateDm(alix.walletAddress) + dm.send("howdy") + dm.send("gm") + dm.sync() + assert(boClient.contacts.isGroupAllowed(dm.id)) + assertEquals(boClient.contacts.consentList.groupState(dm.id), ConsentState.ALLOWED) + assertEquals(dm.consentState(), ConsentState.ALLOWED) + } + } + + @Test + fun testCanSendMessageToDm() { + val dm = runBlocking { boClient.conversations.findOrCreateDm(alix.walletAddress) } + runBlocking { dm.send("howdy") } + val messageId = runBlocking { dm.send("gm") } + runBlocking { dm.sync() } + assertEquals(dm.messages().first().body, "gm") + assertEquals(dm.messages().first().id, messageId) + assertEquals(dm.messages().first().deliveryStatus, MessageDeliveryStatus.PUBLISHED) + assertEquals(dm.messages().size, 3) + + runBlocking { alixClient.conversations.syncConversations() } + val sameDm = runBlocking { alixClient.conversations.listDms().last() } + runBlocking { sameDm.sync() } + assertEquals(sameDm.messages().size, 2) + assertEquals(sameDm.messages().first().body, "gm") + } + + @Test + fun testCanListDmMessages() { + val dm = runBlocking { boClient.conversations.findOrCreateDm(alix.walletAddress) } + runBlocking { + dm.send("howdy") + dm.send("gm") + } + + assertEquals(dm.messages().size, 3) + assertEquals(dm.messages(deliveryStatus = MessageDeliveryStatus.PUBLISHED).size, 3) + runBlocking { dm.sync() } + assertEquals(dm.messages().size, 3) + assertEquals(dm.messages(deliveryStatus = MessageDeliveryStatus.UNPUBLISHED).size, 0) + assertEquals(dm.messages(deliveryStatus = MessageDeliveryStatus.PUBLISHED).size, 3) + + runBlocking { alixClient.conversations.syncConversations() } + val sameDm = runBlocking { alixClient.conversations.listDms().last() } + runBlocking { sameDm.sync() } + assertEquals(sameDm.messages(deliveryStatus = MessageDeliveryStatus.PUBLISHED).size, 2) + } + + @Test + fun testCanSendContentTypesToDm() { + Client.register(codec = ReactionCodec()) + + val dm = runBlocking { boClient.conversations.findOrCreateDm(alix.walletAddress) } + runBlocking { dm.send("gm") } + runBlocking { dm.sync() } + val messageToReact = dm.messages()[0] + + val reaction = Reaction( + reference = messageToReact.id, + action = ReactionAction.Added, + content = "U+1F603", + schema = ReactionSchema.Unicode + ) + + runBlocking { + dm.send( + content = reaction, + options = SendOptions(contentType = ContentTypeReaction) + ) + } + runBlocking { dm.sync() } + + val messages = dm.messages() + assertEquals(messages.size, 3) + val content: Reaction? = messages.first().content() + assertEquals("U+1F603", content?.content) + assertEquals(messageToReact.id, content?.reference) + assertEquals(ReactionAction.Added, content?.action) + assertEquals(ReactionSchema.Unicode, content?.schema) + } + + @Test + fun testCanStreamDmMessages() = kotlinx.coroutines.test.runTest { + val group = boClient.conversations.findOrCreateDm(alix.walletAddress.lowercase()) + alixClient.conversations.syncConversations() + val alixDm = alixClient.findDm(bo.walletAddress) + group.streamMessages().test { + alixDm?.send("hi") + assertEquals("hi", awaitItem().body) + alixDm?.send("hi again") + assertEquals("hi again", awaitItem().body) + } + } + + @Test + fun testCanStreamAllMessages() { + val boDm = runBlocking { boClient.conversations.findOrCreateDm(alix.walletAddress) } + runBlocking { alixClient.conversations.syncConversations() } + + val allMessages = mutableListOf() + + val job = CoroutineScope(Dispatchers.IO).launch { + try { + alixClient.conversations.streamAllConversationMessages().collect { message -> + allMessages.add(message) + } + } catch (e: Exception) { + } + } + Thread.sleep(2500) + + for (i in 0 until 2) { + runBlocking { boDm.send(text = "Message $i") } + Thread.sleep(100) + } + assertEquals(2, allMessages.size) + + val caroDm = + runBlocking { caroClient.conversations.findOrCreateDm(alixClient.address) } + Thread.sleep(2500) + + for (i in 0 until 2) { + runBlocking { caroDm.send(text = "Message $i") } + Thread.sleep(100) + } + + assertEquals(4, allMessages.size) + + job.cancel() + } + + @Test + fun testCanStreamDecryptedDmMessages() = kotlinx.coroutines.test.runTest { + val dm = boClient.conversations.findOrCreateDm(alix.walletAddress) + alixClient.conversations.syncConversations() + val alixDm = alixClient.findDm(bo.walletAddress) + dm.streamDecryptedMessages().test { + alixDm?.send("hi") + assertEquals("hi", awaitItem().encodedContent.content.toStringUtf8()) + alixDm?.send("hi again") + assertEquals("hi again", awaitItem().encodedContent.content.toStringUtf8()) + } + } + + @Test + fun testCanStreamAllDecryptedDmMessages() { + val dm = runBlocking { boClient.conversations.findOrCreateDm(alix.walletAddress) } + runBlocking { alixClient.conversations.syncConversations() } + + val allMessages = mutableListOf() + + val job = CoroutineScope(Dispatchers.IO).launch { + try { + alixClient.conversations.streamAllConversationDecryptedMessages().collect { message -> + allMessages.add(message) + } + } catch (e: Exception) { + } + } + Thread.sleep(2500) + + for (i in 0 until 2) { + runBlocking { dm.send(text = "Message $i") } + Thread.sleep(100) + } + assertEquals(2, allMessages.size) + + val caroDm = + runBlocking { caroClient.conversations.findOrCreateDm(alixClient.address) } + Thread.sleep(2500) + + for (i in 0 until 2) { + runBlocking { caroDm.send(text = "Message $i") } + Thread.sleep(100) + } + + assertEquals(4, allMessages.size) + + job.cancel() + } + + @Test + fun testCanStreamConversations() = kotlinx.coroutines.test.runTest { + boClient.conversations.streamConversations().test { + val dm = + alixClient.conversations.findOrCreateDm(bo.walletAddress) + assertEquals(dm.id, awaitItem().id) + val dm2 = + caroClient.conversations.findOrCreateDm(bo.walletAddress) + assertEquals(dm2.id, awaitItem().id) + } + } + + @Test + fun testDmConsent() { + runBlocking { + val dm = + boClient.conversations.findOrCreateDm(alix.walletAddress) + assert(boClient.contacts.isGroupAllowed(dm.id)) + assertEquals(dm.consentState(), ConsentState.ALLOWED) + + boClient.contacts.denyGroups(listOf(dm.id)) + assert(boClient.contacts.isGroupDenied(dm.id)) + assertEquals(dm.consentState(), ConsentState.DENIED) + + dm.updateConsentState(ConsentState.ALLOWED) + assert(boClient.contacts.isGroupAllowed(dm.id)) + assertEquals(dm.consentState(), ConsentState.ALLOWED) + } + } +} diff --git a/library/src/androidTest/java/org/xmtp/android/library/GroupTest.kt b/library/src/androidTest/java/org/xmtp/android/library/GroupTest.kt index 58b6c5423..09679804b 100644 --- a/library/src/androidTest/java/org/xmtp/android/library/GroupTest.kt +++ b/library/src/androidTest/java/org/xmtp/android/library/GroupTest.kt @@ -40,20 +40,24 @@ class GroupTest { private lateinit var caroWallet: PrivateKeyBuilder private lateinit var caro: PrivateKey private lateinit var caroClient: Client + private lateinit var davonV3Wallet: PrivateKeyBuilder + private lateinit var davonV3: PrivateKey + private lateinit var davonV3Client: Client private lateinit var fixtures: Fixtures @Before fun setUp() { val key = SecureRandom().generateSeed(32) val context = InstrumentationRegistry.getInstrumentation().targetContext + val options = ClientOptions( + ClientOptions.Api(XMTPEnvironment.LOCAL, false), + enableV3 = true, + appContext = context, + dbEncryptionKey = key + ) fixtures = fixtures( - clientOptions = ClientOptions( - ClientOptions.Api(XMTPEnvironment.LOCAL, false), - enableV3 = true, - appContext = context, - dbEncryptionKey = key - ) + clientOptions = options ) alixWallet = fixtures.aliceAccount alix = fixtures.alice @@ -61,10 +65,14 @@ class GroupTest { bo = fixtures.bob caroWallet = fixtures.caroAccount caro = fixtures.caro + davonV3Wallet = PrivateKeyBuilder() + davonV3 = davonV3Wallet.getPrivateKey() alixClient = fixtures.aliceClient boClient = fixtures.bobClient caroClient = fixtures.caroClient + davonV3Client = + runBlocking { Client().createV3(account = davonV3Wallet, options = options) } } @Test @@ -387,6 +395,8 @@ class GroupTest { runBlocking { boClient.conversations.newGroup(listOf(alix.walletAddress)) boClient.conversations.newGroup(listOf(caro.walletAddress)) + davonV3Client.conversations.findOrCreateDm(bo.walletAddress) + boClient.conversations.syncConversations() } val groups = runBlocking { boClient.conversations.listGroups() } assertEquals(groups.size, 2) @@ -398,6 +408,8 @@ class GroupTest { boClient.conversations.newGroup(listOf(alix.walletAddress)) boClient.conversations.newGroup(listOf(caro.walletAddress)) boClient.conversations.newConversation(alix.walletAddress) + davonV3Client.conversations.findOrCreateDm(bo.walletAddress) + boClient.conversations.syncConversations() } val convos = runBlocking { boClient.conversations.list(includeGroups = true) } assertEquals(convos.size, 3) @@ -589,6 +601,7 @@ class GroupTest { @Test fun testCanStreamAllGroupMessages() { val group = runBlocking { caroClient.conversations.newGroup(listOf(alix.walletAddress)) } + val dm = runBlocking { davonV3Client.conversations.findOrCreateDm(alix.walletAddress) } runBlocking { alixClient.conversations.syncGroups() } val allMessages = mutableListOf() @@ -603,8 +616,11 @@ class GroupTest { } Thread.sleep(2500) + runBlocking { dm.send("should not stream") } for (i in 0 until 2) { - runBlocking { group.send(text = "Message $i") } + runBlocking { + group.send(text = "Message $i") + } Thread.sleep(100) } assertEquals(2, allMessages.size) @@ -626,9 +642,10 @@ class GroupTest { @Test fun testCanStreamAllMessages() { val group = runBlocking { caroClient.conversations.newGroup(listOf(alix.walletAddress)) } + val dm = runBlocking { davonV3Client.conversations.findOrCreateDm(alix.walletAddress) } val conversation = runBlocking { boClient.conversations.newConversation(alix.walletAddress) } - runBlocking { alixClient.conversations.syncGroups() } + runBlocking { alixClient.conversations.syncConversations() } val allMessages = mutableListOf() @@ -646,6 +663,7 @@ class GroupTest { runBlocking { group.send("hi") conversation.send("hi") + dm.send("should not stream") } Thread.sleep(1000) @@ -671,7 +689,8 @@ class GroupTest { @Test fun testCanStreamAllDecryptedGroupMessages() { val group = runBlocking { caroClient.conversations.newGroup(listOf(alix.walletAddress)) } - runBlocking { alixClient.conversations.syncGroups() } + val dm = runBlocking { davonV3Client.conversations.findOrCreateDm(alix.walletAddress) } + runBlocking { alixClient.conversations.syncConversations() } val allMessages = mutableListOf() @@ -685,6 +704,7 @@ class GroupTest { } Thread.sleep(2500) + runBlocking { dm.send("Should not stream") } for (i in 0 until 2) { runBlocking { group.send(text = "Message $i") } Thread.sleep(100) @@ -708,6 +728,7 @@ class GroupTest { @Test fun testCanStreamAllDecryptedMessages() { val group = runBlocking { caroClient.conversations.newGroup(listOf(alix.walletAddress)) } + val dm = runBlocking { davonV3Client.conversations.findOrCreateDm(alix.walletAddress) } val conversation = runBlocking { boClient.conversations.newConversation(alix.walletAddress) } runBlocking { alixClient.conversations.syncGroups() } @@ -726,6 +747,7 @@ class GroupTest { Thread.sleep(2500) runBlocking { + dm.send("should not stream") group.send("hi") conversation.send("hi") } @@ -746,6 +768,9 @@ class GroupTest { val group2 = caroClient.conversations.newGroup(listOf(bo.walletAddress)) assertEquals(group2.id, awaitItem().id) + davonV3Client.conversations.findOrCreateDm(bo.walletAddress) + expectNoEvents() + cancelAndConsumeRemainingEvents() } } @@ -765,6 +790,7 @@ class GroupTest { Thread.sleep(2500) runBlocking { + davonV3Client.conversations.findOrCreateDm(alix.walletAddress) alixClient.conversations.newConversation(bo.walletAddress) Thread.sleep(2500) caroClient.conversations.newGroup(listOf(alix.walletAddress)) diff --git a/library/src/androidTest/java/org/xmtp/android/library/SmartContractWalletTest.kt b/library/src/androidTest/java/org/xmtp/android/library/SmartContractWalletTest.kt index 543ce54b1..18f757ac9 100644 --- a/library/src/androidTest/java/org/xmtp/android/library/SmartContractWalletTest.kt +++ b/library/src/androidTest/java/org/xmtp/android/library/SmartContractWalletTest.kt @@ -212,7 +212,7 @@ class SmartContractWalletTest { @Test fun testCanStreamAllMessages() { - val group = runBlocking { + val group1 = runBlocking { davonSCWClient.conversations.newGroup( listOf( boV3.walletAddress, @@ -228,13 +228,15 @@ class SmartContractWalletTest { ) ) } - runBlocking { davonSCWClient.conversations.syncGroups() } + val dm1 = runBlocking { davonSCWClient.conversations.findOrCreateDm(eriSCW.walletAddress) } + val dm2 = runBlocking { boV3Client.conversations.findOrCreateDm(davonSCW.walletAddress) } + runBlocking { davonSCWClient.conversations.syncConversations() } val allMessages = mutableListOf() val job = CoroutineScope(Dispatchers.IO).launch { try { - davonSCWClient.conversations.streamAllGroupMessages() + davonSCWClient.conversations.streamAllConversationMessages() .collect { message -> allMessages.add(message) } @@ -243,21 +245,23 @@ class SmartContractWalletTest { } Thread.sleep(1000) runBlocking { - group.send("hi") + group1.send("hi") group2.send("hi") + dm1.send("hi") + dm2.send("hi") } Thread.sleep(1000) - assertEquals(2, allMessages.size) + assertEquals(4, allMessages.size) job.cancel() } @Test - fun testCanStreamGroups() { + fun testCanStreamConversations() { val allMessages = mutableListOf() val job = CoroutineScope(Dispatchers.IO).launch { try { - davonSCWClient.conversations.streamGroups() + davonSCWClient.conversations.streamConversations() .collect { message -> allMessages.add(message.topic) } @@ -269,10 +273,12 @@ class SmartContractWalletTest { runBlocking { eriSCWClient.conversations.newGroup(listOf(boV3.walletAddress, davonSCW.walletAddress)) boV3Client.conversations.newGroup(listOf(eriSCW.walletAddress, davonSCW.walletAddress)) + eriSCWClient.conversations.findOrCreateDm(davonSCW.walletAddress) + boV3Client.conversations.findOrCreateDm(davonSCW.walletAddress) } Thread.sleep(1000) - assertEquals(2, allMessages.size) + assertEquals(4, allMessages.size) job.cancel() } } diff --git a/library/src/androidTest/java/org/xmtp/android/library/TestHelpers.kt b/library/src/androidTest/java/org/xmtp/android/library/TestHelpers.kt index 4d2453fe3..2b68048b0 100644 --- a/library/src/androidTest/java/org/xmtp/android/library/TestHelpers.kt +++ b/library/src/androidTest/java/org/xmtp/android/library/TestHelpers.kt @@ -151,6 +151,7 @@ data class Fixtures( val aliceAccount = PrivateKeyBuilder() val bobAccount = PrivateKeyBuilder() val caroAccount = PrivateKeyBuilder() + val davonV3Account = PrivateKeyBuilder() var alice: PrivateKey = aliceAccount.getPrivateKey() var aliceClient: Client = diff --git a/library/src/androidTest/java/org/xmtp/android/library/V3ClientTest.kt b/library/src/androidTest/java/org/xmtp/android/library/V3ClientTest.kt index a30d9751e..5ae75845d 100644 --- a/library/src/androidTest/java/org/xmtp/android/library/V3ClientTest.kt +++ b/library/src/androidTest/java/org/xmtp/android/library/V3ClientTest.kt @@ -11,6 +11,7 @@ import org.junit.Assert.assertEquals import org.junit.Before import org.junit.Test import org.junit.runner.RunWith +import org.xmtp.android.library.messages.DecryptedMessage import org.xmtp.android.library.messages.MessageDeliveryStatus import org.xmtp.android.library.messages.PrivateKey import org.xmtp.android.library.messages.PrivateKeyBuilder @@ -80,7 +81,8 @@ class V3ClientTest { @Test fun testsCanCreateGroup() { - val group = runBlocking { boV3Client.conversations.newGroup(listOf(caroV2V3.walletAddress)) } + val group = + runBlocking { boV3Client.conversations.newGroup(listOf(caroV2V3.walletAddress)) } assertEquals( runBlocking { group.members().map { it.inboxId }.sorted() }, listOf(caroV2V3Client.inboxId, boV3Client.inboxId).sorted() @@ -92,8 +94,99 @@ class V3ClientTest { } @Test - fun testsCanSendMessages() { - val group = runBlocking { boV3Client.conversations.newGroup(listOf(caroV2V3.walletAddress)) } + fun testsCanCreateDm() { + val dm = runBlocking { boV3Client.conversations.findOrCreateDm(caroV2V3.walletAddress) } + assertEquals( + runBlocking { dm.members().map { it.inboxId }.sorted() }, + listOf(caroV2V3Client.inboxId, boV3Client.inboxId).sorted() + ) + + val sameDm = runBlocking { boV3Client.findDm(caroV2V3.walletAddress) } + assertEquals(sameDm?.id, dm.id) + + runBlocking { caroV2V3Client.conversations.syncConversations() } + val caroDm = runBlocking { caroV2V3Client.findDm(boV3Client.address) } + assertEquals(caroDm?.id, dm.id) + + Assert.assertThrows("Recipient not on network", XMTPException::class.java) { + runBlocking { boV3Client.conversations.findOrCreateDm(alixV2.walletAddress) } + } + } + + @Test + fun testsCanFindConversationByTopic() { + val group = + runBlocking { boV3Client.conversations.newGroup(listOf(caroV2V3.walletAddress)) } + val dm = runBlocking { boV3Client.conversations.findOrCreateDm(caroV2V3.walletAddress) } + + val sameDm = boV3Client.findConversationByTopic(dm.topic) + val sameGroup = boV3Client.findConversationByTopic(group.topic) + assertEquals(group.id, sameGroup?.id) + assertEquals(dm.id, sameDm?.id) + } + + @Test + fun testsCanListConversations() { + val dm = runBlocking { boV3Client.conversations.findOrCreateDm(caroV2V3.walletAddress) } + val group = + runBlocking { boV3Client.conversations.newGroup(listOf(caroV2V3.walletAddress)) } + assertEquals(runBlocking { boV3Client.conversations.listConversations().size }, 2) + assertEquals(runBlocking { boV3Client.conversations.listDms().size }, 1) + assertEquals(runBlocking { boV3Client.conversations.listGroups().size }, 1) + + runBlocking { caroV2V3Client.conversations.syncConversations() } + assertEquals( + runBlocking { caroV2V3Client.conversations.list(includeGroups = true).size }, + 1 + ) + assertEquals(runBlocking { caroV2V3Client.conversations.listGroups().size }, 1) + } + + @Test + fun testsCanListConversationsFiltered() { + val dm = runBlocking { boV3Client.conversations.findOrCreateDm(caroV2V3.walletAddress) } + val group = + runBlocking { boV3Client.conversations.newGroup(listOf(caroV2V3.walletAddress)) } + assertEquals(runBlocking { boV3Client.conversations.listConversations().size }, 2) + assertEquals( + runBlocking { boV3Client.conversations.listConversations(consentState = ConsentState.ALLOWED).size }, + 2 + ) + runBlocking { group.updateConsentState(ConsentState.DENIED) } + assertEquals( + runBlocking { boV3Client.conversations.listConversations(consentState = ConsentState.ALLOWED).size }, + 1 + ) + assertEquals( + runBlocking { boV3Client.conversations.listConversations(consentState = ConsentState.DENIED).size }, + 1 + ) + assertEquals(runBlocking { boV3Client.conversations.listConversations().size }, 2) + } + + @Test + fun testCanListConversationsOrder() { + val dm = runBlocking { boV3Client.conversations.findOrCreateDm(caroV2V3.walletAddress) } + val group1 = + runBlocking { boV3Client.conversations.newGroup(listOf(caroV2V3.walletAddress)) } + val group2 = + runBlocking { boV3Client.conversations.newGroup(listOf(caroV2V3.walletAddress)) } + runBlocking { dm.send("Howdy") } + runBlocking { group2.send("Howdy") } + runBlocking { boV3Client.conversations.syncAllConversations() } + val conversations = runBlocking { boV3Client.conversations.listConversations() } + val conversationsOrdered = + runBlocking { boV3Client.conversations.listConversations(order = Conversations.ConversationOrder.LAST_MESSAGE) } + assertEquals(conversations.size, 3) + assertEquals(conversationsOrdered.size, 3) + assertEquals(conversations.map { it.id }, listOf(dm.id, group1.id, group2.id)) + assertEquals(conversationsOrdered.map { it.id }, listOf(group2.id, dm.id, group1.id)) + } + + @Test + fun testsCanSendMessagesToGroup() { + val group = + runBlocking { boV3Client.conversations.newGroup(listOf(caroV2V3.walletAddress)) } runBlocking { group.send("howdy") } val messageId = runBlocking { group.send("gm") } runBlocking { group.sync() } @@ -102,13 +195,48 @@ class V3ClientTest { assertEquals(group.messages().first().deliveryStatus, MessageDeliveryStatus.PUBLISHED) assertEquals(group.messages().size, 3) - runBlocking { caroV2V3Client.conversations.syncGroups() } + runBlocking { caroV2V3Client.conversations.syncConversations() } val sameGroup = runBlocking { caroV2V3Client.conversations.listGroups().last() } runBlocking { sameGroup.sync() } assertEquals(sameGroup.messages().size, 2) assertEquals(sameGroup.messages().first().body, "gm") } + @Test + fun testsCanSendMessagesToDm() { + var boDm = + runBlocking { boV3Client.conversations.findOrCreateDm(caroV2V3.walletAddress) } + runBlocking { boDm.send("howdy") } + var messageId = runBlocking { boDm.send("gm") } + var boDmMessage = runBlocking { boDm.messages() } + assertEquals(boDmMessage.first().body, "gm") + assertEquals(boDmMessage.first().id, messageId) + assertEquals(boDmMessage.first().deliveryStatus, MessageDeliveryStatus.PUBLISHED) + assertEquals(boDmMessage.size, 3) + + runBlocking { caroV2V3Client.conversations.syncConversations() } + val caroDm = runBlocking { caroV2V3Client.findDm(boV3.walletAddress) } + runBlocking { caroDm!!.sync() } + var caroDmMessage = runBlocking { caroDm!!.messages() } + assertEquals(caroDmMessage.size, 2) + assertEquals(caroDmMessage.first().body, "gm") + + runBlocking { caroDm!!.send("howdy") } + messageId = runBlocking { caroDm!!.send("gm") } + caroDmMessage = runBlocking { caroDm!!.messages() } + assertEquals(caroDmMessage.first().body, "gm") + assertEquals(caroDmMessage.first().id, messageId) + assertEquals(caroDmMessage.first().deliveryStatus, MessageDeliveryStatus.PUBLISHED) + assertEquals(caroDmMessage.size, 4) + + runBlocking { boV3Client.conversations.syncConversations() } + boDm = runBlocking { boV3Client.findDm(caroV2V3.walletAddress)!! } + runBlocking { boDm.sync() } + boDmMessage = runBlocking { boDm.messages() } + assertEquals(boDmMessage.size, 5) + assertEquals(boDmMessage.first().body, "gm") + } + @Test fun testGroupConsent() { runBlocking { @@ -156,12 +284,97 @@ class V3ClientTest { } } + @Test + fun testCanStreamAllMessagesFromV3Users() { + val group = + runBlocking { caroV2V3Client.conversations.newGroup(listOf(boV3.walletAddress)) } + val conversation = + runBlocking { boV3Client.conversations.findOrCreateDm(caroV2V3.walletAddress) } + runBlocking { boV3Client.conversations.syncConversations() } + + val allMessages = mutableListOf() + + val job = CoroutineScope(Dispatchers.IO).launch { + try { + boV3Client.conversations.streamAllConversationMessages() + .collect { message -> + allMessages.add(message) + } + } catch (e: Exception) { + } + } + Thread.sleep(1000) + runBlocking { + group.send("hi") + conversation.send("hi") + } + Thread.sleep(1000) + assertEquals(2, allMessages.size) + job.cancel() + } + + @Test + fun testCanStreamAllDecryptedMessagesFromV3Users() { + val group = + runBlocking { caroV2V3Client.conversations.newGroup(listOf(boV3.walletAddress)) } + val conversation = + runBlocking { boV3Client.conversations.findOrCreateDm(caroV2V3.walletAddress) } + runBlocking { boV3Client.conversations.syncConversations() } + + val allMessages = mutableListOf() + + val job = CoroutineScope(Dispatchers.IO).launch { + try { + boV3Client.conversations.streamAllConversationDecryptedMessages() + .collect { message -> + allMessages.add(message) + } + } catch (e: Exception) { + } + } + Thread.sleep(1000) + runBlocking { + group.send("hi") + conversation.send("hi") + } + Thread.sleep(1000) + assertEquals(2, allMessages.size) + job.cancel() + } + + @Test + fun testCanStreamGroupsAndConversationsFromV3Users() { + val allMessages = mutableListOf() + + val job = CoroutineScope(Dispatchers.IO).launch { + try { + boV3Client.conversations.streamConversations() + .collect { message -> + allMessages.add(message.topic) + } + } catch (e: Exception) { + } + } + Thread.sleep(1000) + + runBlocking { + caroV2V3Client.conversations.newGroup(listOf(boV3.walletAddress)) + Thread.sleep(1000) + boV3Client.conversations.findOrCreateDm(caroV2V3.walletAddress) + } + + Thread.sleep(2000) + assertEquals(2, allMessages.size) + job.cancel() + } + @Test fun testCanStreamAllMessagesFromV2andV3Users() { - val group = runBlocking { boV3Client.conversations.newGroup(listOf(caroV2V3.walletAddress)) } + val group = + runBlocking { boV3Client.conversations.newGroup(listOf(caroV2V3.walletAddress)) } val conversation = runBlocking { alixV2Client.conversations.newConversation(caroV2V3.walletAddress) } - runBlocking { caroV2V3Client.conversations.syncGroups() } + runBlocking { caroV2V3Client.conversations.syncConversations() } val allMessages = mutableListOf() diff --git a/library/src/main/java/org/xmtp/android/library/Client.kt b/library/src/main/java/org/xmtp/android/library/Client.kt index 46f1cb51c..ed72e2609 100644 --- a/library/src/main/java/org/xmtp/android/library/Client.kt +++ b/library/src/main/java/org/xmtp/android/library/Client.kt @@ -603,6 +603,7 @@ class Client() { } } + @Deprecated("Find now includes DMs and Groups", replaceWith = ReplaceWith("findConversation")) fun findGroup(groupId: String): Group? { val client = v3Client ?: throw XMTPException("Error no V3 client initialized") try { @@ -612,6 +613,44 @@ class Client() { } } + fun findConversation(conversationId: String): Conversation? { + val client = v3Client ?: throw XMTPException("Error no V3 client initialized") + val conversation = client.conversation(conversationId.hexToByteArray()) + return if (conversation.groupMetadata().conversationType() == "dm") { + Conversation.Dm(Dm(this, conversation)) + } else if (conversation.groupMetadata().conversationType() == "group") { + Conversation.Group(Group(this, conversation)) + } else { + null + } + } + + fun findConversationByTopic(topic: String): Conversation? { + val client = v3Client ?: throw XMTPException("Error no V3 client initialized") + val regex = """/xmtp/mls/1/g-(.*?)/proto""".toRegex() + val matchResult = regex.find(topic) + val conversationId = matchResult?.groupValues?.get(1) ?: "" + val conversation = client.conversation(conversationId.hexToByteArray()) + return if (conversation.groupMetadata().conversationType() == "dm") { + Conversation.Dm(Dm(this, conversation)) + } else if (conversation.groupMetadata().conversationType() == "group") { + Conversation.Group(Group(this, conversation)) + } else { + null + } + } + + suspend fun findDm(address: String): Dm? { + val client = v3Client ?: throw XMTPException("Error no V3 client initialized") + val inboxId = + inboxIdFromAddress(address.lowercase()) ?: throw XMTPException("No inboxId present") + try { + return Dm(this, client.dmConversation(inboxId)) + } catch (e: Exception) { + return null + } + } + fun findMessage(messageId: String): MessageV3? { val client = v3Client ?: throw XMTPException("Error no V3 client initialized") return try { diff --git a/library/src/main/java/org/xmtp/android/library/Conversation.kt b/library/src/main/java/org/xmtp/android/library/Conversation.kt index b8db34d09..631502261 100644 --- a/library/src/main/java/org/xmtp/android/library/Conversation.kt +++ b/library/src/main/java/org/xmtp/android/library/Conversation.kt @@ -5,6 +5,7 @@ import com.google.protobuf.kotlin.toByteString import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.runBlocking import org.xmtp.android.library.codecs.EncodedContent +import org.xmtp.android.library.libxmtp.Member import org.xmtp.android.library.libxmtp.MessageV3 import org.xmtp.android.library.messages.DecryptedMessage import org.xmtp.android.library.messages.Envelope @@ -28,154 +29,92 @@ sealed class Conversation { data class V2(val conversationV2: ConversationV2) : Conversation() data class Group(val group: org.xmtp.android.library.Group) : Conversation() + data class Dm(val dm: org.xmtp.android.library.Dm) : Conversation() - enum class Version { V1, V2, GROUP } + enum class Version { V1, V2, GROUP, DM } - // This indicates whether this a v1 or v2 conversation. val version: Version get() { return when (this) { is V1 -> Version.V1 is V2 -> Version.V2 is Group -> Version.GROUP + is Dm -> Version.DM } } - // When the conversation was first created. - val createdAt: Date - get() { - return when (this) { - is V1 -> conversationV1.sentAt - is V2 -> conversationV2.createdAt - is Group -> group.createdAt - } - } - - // This is the address of the peer that I am talking to. - val peerAddress: String + val id: String get() { return when (this) { - is V1 -> conversationV1.peerAddress - is V2 -> conversationV2.peerAddress - is Group -> runBlocking { group.peerInboxIds().joinToString(",") } + is V1 -> throw XMTPException("Only supported for V3") + is V2 -> throw XMTPException("Only supported for V3") + is Group -> group.id + is Dm -> dm.id } } - val peerAddresses: List - get() { - return when (this) { - is V1 -> listOf(conversationV1.peerAddress) - is V2 -> listOf(conversationV2.peerAddress) - is Group -> runBlocking { group.peerInboxIds() } - } - } - - // This distinctly identifies between two addresses. - // Note: this will be empty for older v1 conversations. - val conversationId: String? + val topic: String get() { return when (this) { - is V1 -> null - is V2 -> conversationV2.context.conversationId - is Group -> null + is V1 -> conversationV1.topic.description + is V2 -> conversationV2.topic + is Group -> group.topic + is Dm -> dm.topic } } - val keyMaterial: ByteArray? + val createdAt: Date get() { return when (this) { - is V1 -> null - is V2 -> conversationV2.keyMaterial - is Group -> null + is V1 -> conversationV1.sentAt + is V2 -> conversationV2.createdAt + is Group -> group.createdAt + is Dm -> dm.createdAt } } - suspend fun consentState(): ConsentState { + fun isCreator(): Boolean { return when (this) { - is V1 -> conversationV1.client.contacts.consentList.state(address = peerAddress) - is V2 -> conversationV2.client.contacts.consentList.state(address = peerAddress) - is Group -> group.consentState() - } - } - - /** - * This method is to create a TopicData object - * @return [TopicData] that contains all the information about the Topic, the conversation - * context and the necessary encryption data for it. - */ - fun toTopicData(): TopicData { - val data = TopicData.newBuilder() - .setCreatedNs(createdAt.time * 1_000_000) - .setPeerAddress(peerAddress) - return when (this) { - is V1 -> data.build() - is V2 -> data.setInvitation( - Invitation.InvitationV1.newBuilder() - .setTopic(topic) - .setContext(conversationV2.context) - .setAes256GcmHkdfSha256( - Aes256gcmHkdfsha256.newBuilder() - .setKeyMaterial(conversationV2.keyMaterial.toByteString()), - ), - ).build() - - is Group -> throw XMTPException("Groups do not support topics") + is V1 -> throw XMTPException("Only supported for V3") + is V2 -> throw XMTPException("Only supported for V3") + is Group -> group.isCreator() + is Dm -> dm.isCreator() } } - fun decode(envelope: Envelope, message: MessageV3? = null): DecodedMessage { + suspend fun members(): List { return when (this) { - is V1 -> conversationV1.decode(envelope) - is V2 -> conversationV2.decodeEnvelope(envelope) - is Group -> message?.decode() ?: throw XMTPException("Groups require message be passed") + is V1 -> throw XMTPException("Only supported for V3") + is V2 -> throw XMTPException("Only supported for V3") + is Group -> group.members() + is Dm -> dm.members() } } - fun decodeOrNull(envelope: Envelope): DecodedMessage? { - return try { - decode(envelope) - } catch (e: Exception) { - Log.d("CONVERSATION", "discarding message that failed to decode", e) - null - } - } - - fun prepareMessage(content: T, options: SendOptions? = null): PreparedMessage { + suspend fun updateConsentState(state: ConsentState) { return when (this) { - is V1 -> { - conversationV1.prepareMessage(content = content, options = options) - } - - is V2 -> { - conversationV2.prepareMessage(content = content, options = options) - } - - is Group -> throw XMTPException("Groups do not support prepared messages") // We return a encoded content not a preparedmessage which requires a envelope + is V1 -> throw XMTPException("Only supported for V3") + is V2 -> throw XMTPException("Only supported for V3") + is Group -> group.updateConsentState(state) + is Dm -> dm.updateConsentState(state) } } - fun prepareMessage( - encodedContent: EncodedContent, - options: SendOptions? = null, - ): PreparedMessage { + suspend fun consentState(): ConsentState { return when (this) { - is V1 -> { - conversationV1.prepareMessage(encodedContent = encodedContent, options = options) - } - - is V2 -> { - conversationV2.prepareMessage(encodedContent = encodedContent, options = options) - } - - is Group -> throw XMTPException("Groups do not support prepared messages") // We return a encoded content not a preparedmessage which requires a envelope + is V1 -> conversationV1.client.contacts.consentList.state(address = peerAddress) + is V2 -> conversationV2.client.contacts.consentList.state(address = peerAddress) + is Group -> group.consentState() + is Dm -> dm.consentState() } } - suspend fun send(prepared: PreparedMessage): String { + suspend fun prepareMessageV3(content: T, options: SendOptions? = null): String { return when (this) { - is V1 -> conversationV1.send(prepared = prepared) - is V2 -> conversationV2.send(prepared = prepared) - is Group -> throw XMTPException("Groups do not support prepared messages") // We return a encoded content not a prepared Message which requires a envelope + is V1 -> throw XMTPException("Only supported for V3") + is V2 -> throw XMTPException("Only supported for V3") + is Group -> group.prepareMessage(content, options) + is Dm -> dm.prepareMessage(content, options) } } @@ -184,6 +123,7 @@ sealed class Conversation { is V1 -> conversationV1.send(content = content, options = options) is V2 -> conversationV2.send(content = content, options = options) is Group -> group.send(content = content, options = options) + is Dm -> dm.send(content = content, options = options) } } @@ -192,6 +132,7 @@ sealed class Conversation { is V1 -> conversationV1.send(text = text, sendOptions, sentAt) is V2 -> conversationV2.send(text = text, sendOptions, sentAt) is Group -> group.send(text) + is Dm -> dm.send(text) } } @@ -200,23 +141,18 @@ sealed class Conversation { is V1 -> conversationV1.send(encodedContent = encodedContent, options = options) is V2 -> conversationV2.send(encodedContent = encodedContent, options = options) is Group -> group.send(encodedContent = encodedContent) + is Dm -> dm.send(encodedContent = encodedContent) } } - val clientAddress: String - get() { - return client.address - } - - // Is the topic of the conversation depending on the version - val topic: String - get() { - return when (this) { - is V1 -> conversationV1.topic.description - is V2 -> conversationV2.topic - is Group -> group.topic - } + suspend fun sync() { + return when (this) { + is V1 -> throw XMTPException("Only supported for V3") + is V2 -> throw XMTPException("Only supported for V3") + is Group -> group.sync() + is Dm -> dm.sync() } + } /** * This lists messages sent to the [Conversation]. @@ -261,6 +197,8 @@ sealed class Conversation { direction = direction, ) } + + is Dm -> dm.messages(limit, before, after, direction) } } @@ -274,19 +212,36 @@ sealed class Conversation { is V1 -> conversationV1.decryptedMessages(limit, before, after, direction) is V2 -> conversationV2.decryptedMessages(limit, before, after, direction) is Group -> group.decryptedMessages(limit, before, after, direction) + is Dm -> dm.decryptedMessages(limit, before, after, direction) } } - fun decrypt( - envelope: Envelope, - message: MessageV3? = null, + fun decryptV3( + message: MessageV3, ): DecryptedMessage { return when (this) { - is V1 -> conversationV1.decrypt(envelope) - is V2 -> conversationV2.decrypt(envelope) - is Group -> { - message?.decrypt() ?: throw XMTPException("Groups require message be passed") - } + is V1 -> throw XMTPException("Only supported for V3") + is V2 -> throw XMTPException("Only supported for V3") + is Group -> message.decrypt() + is Dm -> message.decrypt() + } + } + + fun decodeV3(message: MessageV3): DecodedMessage { + return when (this) { + is V1 -> throw XMTPException("Only supported for V3") + is V2 -> throw XMTPException("Only supported for V3") + is Group -> message.decode() + is Dm -> message.decode() + } + } + + suspend fun processMessage(envelopeBytes: ByteArray): MessageV3 { + return when (this) { + is V1 -> throw XMTPException("Only supported for V3") + is V2 -> throw XMTPException("Only supported for V3") + is Group -> group.processMessage(envelopeBytes) + is Dm -> dm.processMessage(envelopeBytes) } } @@ -296,6 +251,7 @@ sealed class Conversation { is V1 -> return null is V2 -> conversationV2.consentProof is Group -> return null + is Dm -> return null } } @@ -306,6 +262,7 @@ sealed class Conversation { is V1 -> conversationV1.client is V2 -> conversationV2.client is Group -> group.client + is Dm -> dm.client } } @@ -318,6 +275,7 @@ sealed class Conversation { is V1 -> conversationV1.streamMessages() is V2 -> conversationV2.streamMessages() is Group -> group.streamMessages() + is Dm -> dm.streamMessages() } } @@ -326,14 +284,159 @@ sealed class Conversation { is V1 -> conversationV1.streamDecryptedMessages() is V2 -> conversationV2.streamDecryptedMessages() is Group -> group.streamDecryptedMessages() + is Dm -> dm.streamDecryptedMessages() + } + } + + // ------- V1 V2 to be deprecated ------ + + fun decrypt( + envelope: Envelope, + ): DecryptedMessage { + return when (this) { + is V1 -> conversationV1.decrypt(envelope) + is V2 -> conversationV2.decrypt(envelope) + is Group -> throw XMTPException("Use decryptV3 instead") + is Dm -> throw XMTPException("Use decryptV3 instead") + } + } + + fun decode(envelope: Envelope): DecodedMessage { + return when (this) { + is V1 -> conversationV1.decode(envelope) + is V2 -> conversationV2.decodeEnvelope(envelope) + is Group -> throw XMTPException("Use decodeV3 instead") + is Dm -> throw XMTPException("Use decodeV3 instead") + } + } + + // This is the address of the peer that I am talking to. + val peerAddress: String + get() { + return when (this) { + is V1 -> conversationV1.peerAddress + is V2 -> conversationV2.peerAddress + is Group -> runBlocking { group.peerInboxIds().joinToString(",") } + is Dm -> runBlocking { dm.peerInboxId() } + } + } + + val peerAddresses: List + get() { + return when (this) { + is V1 -> listOf(conversationV1.peerAddress) + is V2 -> listOf(conversationV2.peerAddress) + is Group -> runBlocking { group.peerInboxIds() } + is Dm -> runBlocking { listOf(dm.peerInboxId()) } + } + } + + // This distinctly identifies between two addresses. + // Note: this will be empty for older v1 conversations. + val conversationId: String? + get() { + return when (this) { + is V1 -> null + is V2 -> conversationV2.context.conversationId + is Group -> null + is Dm -> null + } + } + + val keyMaterial: ByteArray? + get() { + return when (this) { + is V1 -> null + is V2 -> conversationV2.keyMaterial + is Group -> null + is Dm -> null + } + } + + /** + * This method is to create a TopicData object + * @return [TopicData] that contains all the information about the Topic, the conversation + * context and the necessary encryption data for it. + */ + fun toTopicData(): TopicData { + val data = TopicData.newBuilder() + .setCreatedNs(createdAt.time * 1_000_000) + .setPeerAddress(peerAddress) + return when (this) { + is V1 -> data.build() + is V2 -> data.setInvitation( + Invitation.InvitationV1.newBuilder() + .setTopic(topic) + .setContext(conversationV2.context) + .setAes256GcmHkdfSha256( + Aes256gcmHkdfsha256.newBuilder() + .setKeyMaterial(conversationV2.keyMaterial.toByteString()), + ), + ).build() + + is Group -> throw XMTPException("Groups do not support topics") + is Dm -> throw XMTPException("DMs do not support topics") + } + } + + fun decodeOrNull(envelope: Envelope): DecodedMessage? { + return try { + decode(envelope) + } catch (e: Exception) { + Log.d("CONVERSATION", "discarding message that failed to decode", e) + null + } + } + + fun prepareMessage(content: T, options: SendOptions? = null): PreparedMessage { + return when (this) { + is V1 -> conversationV1.prepareMessage(content = content, options = options) + is V2 -> conversationV2.prepareMessage(content = content, options = options) + is Group -> throw XMTPException("Use prepareMessageV3 instead") + is Dm -> throw XMTPException("Use prepareMessageV3 instead") } } + fun prepareMessage( + encodedContent: EncodedContent, + options: SendOptions? = null, + ): PreparedMessage { + return when (this) { + is V1 -> conversationV1.prepareMessage( + encodedContent = encodedContent, + options = options + ) + + is V2 -> conversationV2.prepareMessage( + encodedContent = encodedContent, + options = options + ) + + is Group -> throw XMTPException("Use prepareMessageV3 instead") + is Dm -> throw XMTPException("Use prepareMessageV3 instead") + } + } + + suspend fun send(prepared: PreparedMessage): String { + return when (this) { + is V1 -> conversationV1.send(prepared = prepared) + is V2 -> conversationV2.send(prepared = prepared) + is Group -> throw XMTPException("Groups do not support sending prepared messages call sync instead") + is Dm -> throw XMTPException("DMs do not support sending prepared messages call sync instead") + } + } + + val clientAddress: String + get() { + return client.address + } + fun streamEphemeral(): Flow { return when (this) { is V1 -> return conversationV1.streamEphemeral() is V2 -> return conversationV2.streamEphemeral() is Group -> throw XMTPException("Groups do not support ephemeral messages") + is Dm -> throw XMTPException("DMs do not support ephemeral messages") } } } diff --git a/library/src/main/java/org/xmtp/android/library/Conversations.kt b/library/src/main/java/org/xmtp/android/library/Conversations.kt index ea7038309..0e5be3509 100644 --- a/library/src/main/java/org/xmtp/android/library/Conversations.kt +++ b/library/src/main/java/org/xmtp/android/library/Conversations.kt @@ -7,6 +7,7 @@ import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.callbackFlow import kotlinx.coroutines.flow.merge import kotlinx.coroutines.launch +import org.xmtp.android.library.ConsentState.Companion.toFfiConsentState import org.xmtp.android.library.GRPCApiClient.Companion.makeQueryRequest import org.xmtp.android.library.Util.Companion.envelopeFromFFi import org.xmtp.android.library.libxmtp.MessageV3 @@ -36,22 +37,24 @@ import org.xmtp.proto.keystore.api.v1.Keystore.GetConversationHmacKeysResponse.H import org.xmtp.proto.keystore.api.v1.Keystore.TopicMap.TopicData import org.xmtp.proto.message.contents.Contact import org.xmtp.proto.message.contents.Invitation +import uniffi.xmtpv3.FfiConversation import uniffi.xmtpv3.FfiConversationCallback import uniffi.xmtpv3.FfiConversations import uniffi.xmtpv3.FfiCreateGroupOptions +import uniffi.xmtpv3.FfiDirection import uniffi.xmtpv3.FfiEnvelope -import uniffi.xmtpv3.FfiConversation +import uniffi.xmtpv3.FfiGroupPermissionsOptions import uniffi.xmtpv3.FfiListConversationsOptions +import uniffi.xmtpv3.FfiListMessagesOptions import uniffi.xmtpv3.FfiMessage import uniffi.xmtpv3.FfiMessageCallback +import uniffi.xmtpv3.FfiPermissionPolicySet import uniffi.xmtpv3.FfiV2SubscribeRequest import uniffi.xmtpv3.FfiV2Subscription import uniffi.xmtpv3.FfiV2SubscriptionCallback import uniffi.xmtpv3.NoPointer import uniffi.xmtpv3.org.xmtp.android.library.libxmtp.GroupPermissionPreconfiguration import uniffi.xmtpv3.org.xmtp.android.library.libxmtp.PermissionPolicySet -import uniffi.xmtpv3.FfiGroupPermissionsOptions -import uniffi.xmtpv3.FfiPermissionPolicySet import java.util.Date import kotlin.time.Duration.Companion.nanoseconds import kotlin.time.DurationUnit @@ -66,42 +69,19 @@ data class Conversations( private const val TAG = "CONVERSATIONS" } - /** - * This method creates a new conversation from an invitation. - * @param envelope Object that contains the information of the current [Client] such as topic - * and timestamp. - * @return [Conversation] from an invitation suing the current [Client]. - */ - fun fromInvite(envelope: Envelope): Conversation { - val sealedInvitation = Invitation.SealedInvitation.parseFrom(envelope.message) - val unsealed = sealedInvitation.v1.getInvitation(viewer = client.keys) - return Conversation.V2( - ConversationV2.create( - client = client, - invitation = unsealed, - header = sealedInvitation.v1.header, - ), - ) + enum class ConversationOrder { + CREATED_AT, + LAST_MESSAGE; } - /** - * This method creates a new conversation from an Intro. - * @param envelope Object that contains the information of the current [Client] such as topic - * and timestamp. - * @return [Conversation] from an Intro suing the current [Client]. - */ - fun fromIntro(envelope: Envelope): Conversation { - val messageV1 = MessageV1Builder.buildFromBytes(envelope.message.toByteArray()) - val senderAddress = messageV1.header.sender.walletAddress - val recipientAddress = messageV1.header.recipient.walletAddress - val peerAddress = if (client.address == senderAddress) recipientAddress else senderAddress - return Conversation.V1( - ConversationV1( - client = client, - peerAddress = peerAddress, - sentAt = messageV1.sentAt, - ), - ) + suspend fun conversationFromWelcome(envelopeBytes: ByteArray): Conversation { + val conversation = libXMTPConversations?.processStreamedWelcomeMessage(envelopeBytes) + ?: throw XMTPException("Client does not support Groups") + if (conversation.groupMetadata().conversationType() == "dm") { + return Conversation.Dm(Dm(client, conversation)) + } else { + return Conversation.Group(Group(client, conversation)) + } } suspend fun fromWelcome(envelopeBytes: ByteArray): Group { @@ -187,46 +167,48 @@ data class Conversations( } // Sync from the network the latest list of groups + @Deprecated("Sync now includes DMs and Groups", replaceWith = ReplaceWith("syncConversations")) suspend fun syncGroups() { libXMTPConversations?.sync() } - // Sync all existing local groups data from the network (Note: call syncGroups() first to get the latest list of groups) - suspend fun syncAllGroups(): UInt? { - return libXMTPConversations?.syncAllConversations() + // Sync from the network the latest list of conversations + suspend fun syncConversations() { + libXMTPConversations?.sync() } - suspend fun listGroups( - after: Date? = null, - before: Date? = null, - limit: Int? = null, - ): List { - return libXMTPConversations?.listGroups( - opts = FfiListConversationsOptions( - after?.time?.nanoseconds?.toLong(DurationUnit.NANOSECONDS), - before?.time?.nanoseconds?.toLong(DurationUnit.NANOSECONDS), - limit?.toLong() - ) - )?.map { - Group(client, it) - } ?: emptyList() + // Sync all existing local conversation data from the network (Note: call syncConversations() first to get the latest list of conversations) + suspend fun syncAllConversations(): UInt? { + return libXMTPConversations?.syncAllConversations() } - private suspend fun handleConsentProof( - consentProof: Invitation.ConsentProofPayload, - peerAddress: String, - ) { - val signature = consentProof.signature - val timestamp = consentProof.timestamp + // Sync all existing local groups data from the network (Note: call syncGroups() first to get the latest list of groups) + @Deprecated( + "Sync now includes DMs and Groups", + replaceWith = ReplaceWith("syncAllConversations") + ) + suspend fun syncAllGroups(): UInt? { + return libXMTPConversations?.syncAllConversations() + } - if (!KeyUtil.validateConsentSignature(signature, client.address, peerAddress, timestamp)) { - return + suspend fun findOrCreateDm(peerAddress: String): Dm { + if (client.hasV2Client) throw XMTPException("Only supported for V3 only clients.") + if (peerAddress.lowercase() == client.address.lowercase()) { + throw XMTPException("Recipient is sender") } - val contacts = client.contacts - contacts.refreshConsentList() - if (contacts.consentList.state(peerAddress) == ConsentState.UNKNOWN) { - contacts.allow(listOf(peerAddress)) + val falseAddresses = + client.canMessageV3(listOf(peerAddress)).filter { !it.value }.map { it.key } + if (falseAddresses.isNotEmpty()) { + throw XMTPException("${falseAddresses.joinToString()} not on network") } + var dm = client.findDm(peerAddress) + if (dm == null) { + val dmConversation = libXMTPConversations?.createDm(peerAddress.lowercase()) + ?: throw XMTPException("Client does not support V3 Dms") + dm = Dm(client, dmConversation) + client.contacts.allowGroups(groupIds = listOf(dm.id)) + } + return dm } /** @@ -320,6 +302,114 @@ data class Conversations( return conversation } + suspend fun listGroups( + after: Date? = null, + before: Date? = null, + limit: Int? = null, + ): List { + val ffiGroups = libXMTPConversations?.listGroups( + opts = FfiListConversationsOptions( + after?.time?.nanoseconds?.toLong(DurationUnit.NANOSECONDS), + before?.time?.nanoseconds?.toLong(DurationUnit.NANOSECONDS), + limit?.toLong() + ) + ) ?: throw XMTPException("Client does not support V3 dms") + + return ffiGroups.map { + Group(client, it) + } + } + + suspend fun listDms( + after: Date? = null, + before: Date? = null, + limit: Int? = null, + ): List { + if (client.hasV2Client) throw XMTPException("Only supported for V3 only clients.") + val ffiDms = libXMTPConversations?.listDms( + opts = FfiListConversationsOptions( + after?.time?.nanoseconds?.toLong(DurationUnit.NANOSECONDS), + before?.time?.nanoseconds?.toLong(DurationUnit.NANOSECONDS), + limit?.toLong() + ) + ) ?: throw XMTPException("Client does not support V3 dms") + + return ffiDms.map { + Dm(client, it) + } + } + + suspend fun listConversations( + after: Date? = null, + before: Date? = null, + limit: Int? = null, + order: ConversationOrder = ConversationOrder.CREATED_AT, + consentState: ConsentState? = null, + ): List { + if (client.hasV2Client) + throw XMTPException("Only supported for V3 only clients.") + + val ffiConversations = libXMTPConversations?.list( + FfiListConversationsOptions( + after?.time?.nanoseconds?.toLong(DurationUnit.NANOSECONDS), + before?.time?.nanoseconds?.toLong(DurationUnit.NANOSECONDS), + limit?.toLong() + ) + ) ?: throw XMTPException("Client does not support V3 dms") + + val filteredConversations = filterByConsentState(ffiConversations, consentState) + val sortedConversations = sortConversations(filteredConversations, order) + + return sortedConversations.map { it.toConversation() } + } + + private fun sortConversations( + conversations: List, + order: ConversationOrder, + ): List { + return when (order) { + ConversationOrder.LAST_MESSAGE -> { + conversations.map { conversation -> + val message = + conversation.findMessages( + FfiListMessagesOptions( + null, + null, + 1, + null, + FfiDirection.DESCENDING + ) + ) + .firstOrNull() + conversation to message?.sentAtNs + }.sortedByDescending { + it.second ?: 0L + }.map { + it.first + } + } + + ConversationOrder.CREATED_AT -> conversations + } + } + + private fun filterByConsentState( + conversations: List, + consentState: ConsentState?, + ): List { + return consentState?.let { state -> + conversations.filter { it.consentState() == toFfiConsentState(state) } + } ?: conversations + } + + private fun FfiConversation.toConversation(): Conversation { + return if (groupMetadata().conversationType() == "dm") { + Conversation.Dm(Dm(client, this)) + } else { + Conversation.Group(Group(client, this)) + } + } + /** * Get the list of conversations that current user has * @return The list of [Conversation] that the current [Client] has. @@ -359,42 +449,13 @@ data class Conversations( }.map { Pair(it.topic, it) } if (includeGroups) { - syncGroups() + syncConversations() val groups = listGroups() - conversationsByTopic += groups.map { Pair(it.id.toString(), Conversation.Group(it)) } + conversationsByTopic += groups.map { Pair(it.topic, Conversation.Group(it)) } } return conversationsByTopic.values.sortedByDescending { it.createdAt } } - fun importTopicData(data: TopicData): Conversation { - val conversation: Conversation - if (!data.hasInvitation()) { - val sentAt = Date(data.createdNs / 1_000_000) - conversation = Conversation.V1( - ConversationV1( - client, - data.peerAddress, - sentAt, - ), - ) - } else { - conversation = Conversation.V2( - ConversationV2( - topic = data.invitation.topic, - keyMaterial = data.invitation.aes256GcmHkdfSha256.keyMaterial.toByteArray(), - context = data.invitation.context, - peerAddress = data.peerAddress, - client = client, - createdAtNs = data.createdNs, - header = Invitation.SealedInvitationHeaderV1.getDefaultInstance(), - consentProof = if (data.invitation.hasConsentProof()) data.invitation.consentProof else null - ), - ) - } - conversationsByTopic[conversation.topic] = conversation - return conversation - } - fun getHmacKeys( request: Keystore.GetConversationHmacKeysRequest? = null, ): Keystore.GetConversationHmacKeysResponse { @@ -433,66 +494,198 @@ data class Conversations( return hmacKeysResponse.build() } - private suspend fun listIntroductionPeers(pagination: Pagination? = null): Map { - val apiClient = client.apiClient ?: throw XMTPException("V2 only function") - val envelopes = apiClient.queryTopic( - topic = Topic.userIntro(client.address), - pagination = pagination, - ).envelopesList - val messages = envelopes.mapNotNull { envelope -> - try { - val message = MessageV1Builder.buildFromBytes(envelope.message.toByteArray()) - // Attempt to decrypt, just to make sure we can - message.decrypt(client.privateKeyBundleV1) - message - } catch (e: Exception) { - Log.d(TAG, e.message.toString()) - null - } + private suspend fun handleConsentProof( + consentProof: Invitation.ConsentProofPayload, + peerAddress: String, + ) { + val signature = consentProof.signature + val timestamp = consentProof.timestamp + + if (!KeyUtil.validateConsentSignature(signature, client.address, peerAddress, timestamp)) { + return } - val seenPeers: MutableMap = mutableMapOf() - for (message in messages) { - val recipientAddress = message.recipientAddress - val senderAddress = message.senderAddress - val sentAt = message.sentAt - val peerAddress = - if (recipientAddress == client.address) senderAddress else recipientAddress - val existing = seenPeers[peerAddress] - if (existing == null) { - seenPeers[peerAddress] = sentAt - continue - } - if (existing > sentAt) { - seenPeers[peerAddress] = sentAt - } + val contacts = client.contacts + contacts.refreshConsentList() + if (contacts.consentList.state(peerAddress) == ConsentState.UNKNOWN) { + contacts.allow(listOf(peerAddress)) } - return seenPeers } - /** - * Get the list of invitations using the data sent [pagination] - * @param pagination Information of the topics, ranges (dates), etc. - * @return List of [SealedInvitation] that are inside of the range specified by [pagination] - */ - private suspend fun listInvitations(pagination: Pagination? = null): List { - val apiClient = client.apiClient ?: throw XMTPException("V2 only function") - val envelopes = - apiClient.envelopes(Topic.userInvite(client.address).description, pagination) - return envelopes.map { envelope -> - SealedInvitation.parseFrom(envelope.message) + fun stream(): Flow = callbackFlow { + val streamedConversationTopics: MutableSet = mutableSetOf() + val subscriptionCallback = object : FfiV2SubscriptionCallback { + override fun onMessage(message: FfiEnvelope) { + val envelope = envelopeFromFFi(message) + if (envelope.contentTopic == Topic.userIntro(client.address).description) { + val conversationV1 = fromIntro(envelope = envelope) + if (!streamedConversationTopics.contains(conversationV1.topic)) { + streamedConversationTopics.add(conversationV1.topic) + trySend(conversationV1) + } + } + + if (envelope.contentTopic == Topic.userInvite(client.address).description) { + val conversationV2 = fromInvite(envelope = envelope) + if (!streamedConversationTopics.contains(conversationV2.topic)) { + streamedConversationTopics.add(conversationV2.topic) + trySend(conversationV2) + } + } + } } - } - fun conversation(sealedInvitation: SealedInvitation): ConversationV2 { - val unsealed = sealedInvitation.v1.getInvitation(viewer = client.keys) - return ConversationV2.create( - client = client, - invitation = unsealed, - header = sealedInvitation.v1.header, + val stream = client.subscribe2( + FfiV2SubscribeRequest( + listOf( + Topic.userIntro(client.address).description, + Topic.userInvite(client.address).description + ) + ), + subscriptionCallback ) + + awaitClose { launch { stream.end() } } } - /** + fun streamAll(): Flow { + return merge(streamGroupConversations(), stream()) + } + + fun streamConversations(): Flow = callbackFlow { + if (client.hasV2Client) throw XMTPException("Only supported for V3 only clients.") + val conversationCallback = object : FfiConversationCallback { + override fun onConversation(conversation: FfiConversation) { + if (conversation.groupMetadata().conversationType() == "dm") { + trySend(Conversation.Dm(Dm(client, conversation))) + } else { + trySend(Conversation.Group(Group(client, conversation))) + } + } + } + val stream = libXMTPConversations?.stream(conversationCallback) + ?: throw XMTPException("Client does not support Groups") + awaitClose { stream.end() } + } + + private fun streamGroupConversations(): Flow = callbackFlow { + val groupCallback = object : FfiConversationCallback { + override fun onConversation(conversation: FfiConversation) { + trySend(Conversation.Group(Group(client, conversation))) + } + } + + val stream = libXMTPConversations?.streamGroups(groupCallback) + ?: throw XMTPException("Client does not support Groups") + awaitClose { stream.end() } + } + + fun streamGroups(): Flow = callbackFlow { + val groupCallback = object : FfiConversationCallback { + override fun onConversation(conversation: FfiConversation) { + trySend(Group(client, conversation)) + } + } + val stream = libXMTPConversations?.streamGroups(groupCallback) + ?: throw XMTPException("Client does not support Groups") + awaitClose { stream.end() } + } + + fun streamAllMessages(includeGroups: Boolean = false): Flow { + return if (includeGroups) { + merge(streamAllV2Messages(), streamAllGroupMessages()) + } else { + streamAllV2Messages() + } + } + + fun streamAllDecryptedMessages(includeGroups: Boolean = false): Flow { + return if (includeGroups) { + merge(streamAllV2DecryptedMessages(), streamAllGroupDecryptedMessages()) + } else { + streamAllV2DecryptedMessages() + } + } + + fun streamAllGroupMessages(): Flow = callbackFlow { + val messageCallback = object : FfiMessageCallback { + override fun onMessage(message: FfiMessage) { + val decodedMessage = MessageV3(client, message).decodeOrNull() + decodedMessage?.let { + trySend(it) + } + } + } + val stream = libXMTPConversations?.streamAllGroupMessages(messageCallback) + ?: throw XMTPException("Client does not support Groups") + awaitClose { stream.end() } + } + + fun streamAllGroupDecryptedMessages(): Flow = callbackFlow { + val messageCallback = object : FfiMessageCallback { + override fun onMessage(message: FfiMessage) { + val decryptedMessage = MessageV3(client, message).decryptOrNull() + decryptedMessage?.let { + trySend(it) + } + } + } + val stream = libXMTPConversations?.streamAllGroupMessages(messageCallback) + ?: throw XMTPException("Client does not support Groups") + awaitClose { stream.end() } + } + + fun streamAllConversationMessages(): Flow = callbackFlow { + if (client.hasV2Client) throw XMTPException("Only supported for V3 only clients.") + val messageCallback = object : FfiMessageCallback { + override fun onMessage(message: FfiMessage) { + val conversation = client.findConversation(message.convoId.toHex()) + val decodedMessage = MessageV3(client, message).decodeOrNull() + when (conversation?.version) { + Conversation.Version.DM -> { + decodedMessage?.let { trySend(it) } + } + + else -> { + decodedMessage?.let { trySend(it) } + } + } + } + } + + val stream = libXMTPConversations?.streamAllMessages(messageCallback) + ?: throw XMTPException("Client does not support Groups") + + awaitClose { stream.end() } + } + + fun streamAllConversationDecryptedMessages(): Flow = callbackFlow { + if (client.hasV2Client) throw XMTPException("Only supported for V3 only clients.") + val messageCallback = object : FfiMessageCallback { + override fun onMessage(message: FfiMessage) { + val conversation = client.findConversation(message.convoId.toHex()) + val decryptedMessage = MessageV3(client, message).decryptOrNull() + + when (conversation?.version) { + Conversation.Version.DM -> { + decryptedMessage?.let { trySend(it) } + } + + else -> { + decryptedMessage?.let { trySend(it) } + } + } + } + } + + val stream = libXMTPConversations?.streamAllMessages(messageCallback) + ?: throw XMTPException("Client does not support Groups") + + awaitClose { stream.end() } + } + + // ------- V1 V2 to be deprecated ------ + + /** * @return This lists messages sent to the [Conversation]. * This pulls messages from multiple conversations in a single call. * @see Conversation.messages @@ -500,6 +693,8 @@ data class Conversations( suspend fun listBatchMessages( topics: List>, ): List { + if (!client.hasV2Client) throw XMTPException("Not supported for V3. The local database handles persistence of messages. Use listConversations order lastMessage") + val requests = topics.map { (topic, page) -> makeQueryRequest(topic = topic, pagination = page) } @@ -534,6 +729,8 @@ data class Conversations( suspend fun listBatchDecryptedMessages( topics: List>, ): List { + if (!client.hasV2Client) throw XMTPException("Not supported for V3. The local database handles persistence of messages. Use listConversations order lastMessage") + val requests = topics.map { (topic, page) -> makeQueryRequest(topic = topic, pagination = page) } @@ -565,6 +762,86 @@ data class Conversations( return messages } + fun importTopicData(data: TopicData): Conversation { + if (!client.hasV2Client) throw XMTPException("Not supported for V3. The local database handles persistence.") + val conversation: Conversation + if (!data.hasInvitation()) { + val sentAt = Date(data.createdNs / 1_000_000) + conversation = Conversation.V1( + ConversationV1( + client, + data.peerAddress, + sentAt, + ), + ) + } else { + conversation = Conversation.V2( + ConversationV2( + topic = data.invitation.topic, + keyMaterial = data.invitation.aes256GcmHkdfSha256.keyMaterial.toByteArray(), + context = data.invitation.context, + peerAddress = data.peerAddress, + client = client, + createdAtNs = data.createdNs, + header = Invitation.SealedInvitationHeaderV1.getDefaultInstance(), + consentProof = if (data.invitation.hasConsentProof()) data.invitation.consentProof else null + ), + ) + } + conversationsByTopic[conversation.topic] = conversation + return conversation + } + + /** + * This method creates a new conversation from an invitation. + * @param envelope Object that contains the information of the current [Client] such as topic + * and timestamp. + * @return [Conversation] from an invitation suing the current [Client]. + */ + fun fromInvite(envelope: Envelope): Conversation { + if (!client.hasV2Client) throw XMTPException("Not supported for V3. Use conversationFromWelcome.") + val sealedInvitation = Invitation.SealedInvitation.parseFrom(envelope.message) + val unsealed = sealedInvitation.v1.getInvitation(viewer = client.keys) + return Conversation.V2( + ConversationV2.create( + client = client, + invitation = unsealed, + header = sealedInvitation.v1.header, + ), + ) + } + + /** + * This method creates a new conversation from an Intro. + * @param envelope Object that contains the information of the current [Client] such as topic + * and timestamp. + * @return [Conversation] from an Intro suing the current [Client]. + */ + fun fromIntro(envelope: Envelope): Conversation { + if (!client.hasV2Client) throw XMTPException("Not supported for V3. Use conversationFromWelcome.") + val messageV1 = MessageV1Builder.buildFromBytes(envelope.message.toByteArray()) + val senderAddress = messageV1.header.sender.walletAddress + val recipientAddress = messageV1.header.recipient.walletAddress + val peerAddress = if (client.address == senderAddress) recipientAddress else senderAddress + return Conversation.V1( + ConversationV1( + client = client, + peerAddress = peerAddress, + sentAt = messageV1.sentAt, + ), + ) + } + + fun conversation(sealedInvitation: SealedInvitation): ConversationV2 { + if (!client.hasV2Client) throw XMTPException("Not supported for V3. Use client.findDm to find the dm.") + val unsealed = sealedInvitation.v1.getInvitation(viewer = client.keys) + return ConversationV2.create( + client = client, + invitation = unsealed, + header = sealedInvitation.v1.header, + ) + } + /** * Send an invitation from the current [Client] to the specified recipient (Client) * @param recipient The public key of the client that you want to send the invitation @@ -577,6 +854,7 @@ data class Conversations( invitation: InvitationV1, created: Date, ): SealedInvitation { + if (!client.hasV2Client) throw XMTPException("Not supported for V3. Use newConversation to create welcome.") client.keys.let { val sealed = SealedInvitationBuilder.buildFromV1( sender = it, @@ -608,101 +886,6 @@ data class Conversations( } } - /** - * This subscribes the current [Client] to a topic as userIntro and userInvite and returns a flow - * of the information of those conversations according to the topics - * @return Stream of data information for the conversations - */ - fun stream(): Flow = callbackFlow { - val streamedConversationTopics: MutableSet = mutableSetOf() - val subscriptionCallback = object : FfiV2SubscriptionCallback { - override fun onMessage(message: FfiEnvelope) { - val envelope = envelopeFromFFi(message) - if (envelope.contentTopic == Topic.userIntro(client.address).description) { - val conversationV1 = fromIntro(envelope = envelope) - if (!streamedConversationTopics.contains(conversationV1.topic)) { - streamedConversationTopics.add(conversationV1.topic) - trySend(conversationV1) - } - } - - if (envelope.contentTopic == Topic.userInvite(client.address).description) { - val conversationV2 = fromInvite(envelope = envelope) - if (!streamedConversationTopics.contains(conversationV2.topic)) { - streamedConversationTopics.add(conversationV2.topic) - trySend(conversationV2) - } - } - } - } - - val stream = client.subscribe2( - FfiV2SubscribeRequest( - listOf( - Topic.userIntro(client.address).description, - Topic.userInvite(client.address).description - ) - ), - subscriptionCallback - ) - - awaitClose { launch { stream.end() } } - } - - fun streamAll(): Flow { - return merge(streamGroupConversations(), stream()) - } - - private fun streamGroupConversations(): Flow = callbackFlow { - val groupCallback = object : FfiConversationCallback { - override fun onConversation(conversation: FfiConversation) { - trySend(Conversation.Group(Group(client, conversation))) - } - } - val stream = libXMTPConversations?.streamGroups(groupCallback) - ?: throw XMTPException("Client does not support Groups") - awaitClose { stream.end() } - } - - fun streamGroups(): Flow = callbackFlow { - val groupCallback = object : FfiConversationCallback { - override fun onConversation(conversation: FfiConversation) { - trySend(Group(client, conversation)) - } - } - val stream = libXMTPConversations?.streamGroups(groupCallback) - ?: throw XMTPException("Client does not support Groups") - awaitClose { stream.end() } - } - - fun streamAllGroupMessages(): Flow = callbackFlow { - val messageCallback = object : FfiMessageCallback { - override fun onMessage(message: FfiMessage) { - val decodedMessage = MessageV3(client, message).decodeOrNull() - decodedMessage?.let { - trySend(it) - } - } - } - val stream = libXMTPConversations?.streamAllGroupMessages(messageCallback) - ?: throw XMTPException("Client does not support Groups") - awaitClose { stream.end() } - } - - fun streamAllGroupDecryptedMessages(): Flow = callbackFlow { - val messageCallback = object : FfiMessageCallback { - override fun onMessage(message: FfiMessage) { - val decryptedMessage = MessageV3(client, message).decryptOrNull() - decryptedMessage?.let { - trySend(it) - } - } - } - val stream = libXMTPConversations?.streamAllGroupMessages(messageCallback) - ?: throw XMTPException("Client does not support Groups") - awaitClose { stream.end() } - } - /** * Get the stream of all messages of the current [Client] * @return Flow object of [DecodedMessage] that represents all the messages of the @@ -758,22 +941,6 @@ data class Conversations( awaitClose { launch { stream.end() } } } - fun streamAllMessages(includeGroups: Boolean = false): Flow { - return if (includeGroups) { - merge(streamAllV2Messages(), streamAllGroupMessages()) - } else { - streamAllV2Messages() - } - } - - fun streamAllDecryptedMessages(includeGroups: Boolean = false): Flow { - return if (includeGroups) { - merge(streamAllV2DecryptedMessages(), streamAllGroupDecryptedMessages()) - } else { - streamAllV2DecryptedMessages() - } - } - private fun streamAllV2DecryptedMessages(): Flow = callbackFlow { val topics = mutableListOf( Topic.userInvite(client.address).description, @@ -823,4 +990,56 @@ data class Conversations( awaitClose { launch { stream.end() } } } + + /** + * Get the list of invitations using the data sent [pagination] + * @param pagination Information of the topics, ranges (dates), etc. + * @return List of [SealedInvitation] that are inside of the range specified by [pagination] + */ + private suspend fun listInvitations(pagination: Pagination? = null): List { + if (!client.hasV2Client) throw XMTPException("Not supported for V3. Use conversationFromWelcome.") + val apiClient = client.apiClient ?: throw XMTPException("V2 only function") + val envelopes = + apiClient.envelopes(Topic.userInvite(client.address).description, pagination) + return envelopes.map { envelope -> + SealedInvitation.parseFrom(envelope.message) + } + } + + private suspend fun listIntroductionPeers(pagination: Pagination? = null): Map { + if (!client.hasV2Client) throw XMTPException("Not supported for V3. Use conversationFromWelcome.") + val apiClient = client.apiClient ?: throw XMTPException("V2 only function") + val envelopes = apiClient.queryTopic( + topic = Topic.userIntro(client.address), + pagination = pagination, + ).envelopesList + val messages = envelopes.mapNotNull { envelope -> + try { + val message = MessageV1Builder.buildFromBytes(envelope.message.toByteArray()) + // Attempt to decrypt, just to make sure we can + message.decrypt(client.privateKeyBundleV1) + message + } catch (e: Exception) { + Log.d(TAG, e.message.toString()) + null + } + } + val seenPeers: MutableMap = mutableMapOf() + for (message in messages) { + val recipientAddress = message.recipientAddress + val senderAddress = message.senderAddress + val sentAt = message.sentAt + val peerAddress = + if (recipientAddress == client.address) senderAddress else recipientAddress + val existing = seenPeers[peerAddress] + if (existing == null) { + seenPeers[peerAddress] = sentAt + continue + } + if (existing > sentAt) { + seenPeers[peerAddress] = sentAt + } + } + return seenPeers + } } diff --git a/library/src/main/java/org/xmtp/android/library/Dm.kt b/library/src/main/java/org/xmtp/android/library/Dm.kt new file mode 100644 index 000000000..ec53ce397 --- /dev/null +++ b/library/src/main/java/org/xmtp/android/library/Dm.kt @@ -0,0 +1,222 @@ +package org.xmtp.android.library + +import kotlinx.coroutines.channels.awaitClose +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.callbackFlow +import org.xmtp.android.library.codecs.ContentCodec +import org.xmtp.android.library.codecs.EncodedContent +import org.xmtp.android.library.codecs.compress +import org.xmtp.android.library.libxmtp.Member +import org.xmtp.android.library.libxmtp.MessageV3 +import org.xmtp.android.library.messages.DecryptedMessage +import org.xmtp.android.library.messages.MessageDeliveryStatus +import org.xmtp.android.library.messages.PagingInfoSortDirection +import org.xmtp.android.library.messages.Topic +import org.xmtp.proto.message.api.v1.MessageApiOuterClass.SortDirection +import uniffi.xmtpv3.FfiConversation +import uniffi.xmtpv3.FfiConversationMetadata +import uniffi.xmtpv3.FfiDeliveryStatus +import uniffi.xmtpv3.FfiDirection +import uniffi.xmtpv3.FfiListMessagesOptions +import uniffi.xmtpv3.FfiMessage +import uniffi.xmtpv3.FfiMessageCallback +import java.util.Date +import kotlin.time.Duration.Companion.nanoseconds +import kotlin.time.DurationUnit + +class Dm(val client: Client, private val libXMTPGroup: FfiConversation) { + val id: String + get() = libXMTPGroup.id().toHex() + + val topic: String + get() = Topic.groupMessage(id).description + + val createdAt: Date + get() = Date(libXMTPGroup.createdAtNs() / 1_000_000) + + private val metadata: FfiConversationMetadata + get() = libXMTPGroup.groupMetadata() + + suspend fun send(text: String): String { + return send(encodeContent(content = text, options = null)) + } + + suspend fun send(content: T, options: SendOptions? = null): String { + val preparedMessage = encodeContent(content = content, options = options) + return send(preparedMessage) + } + + suspend fun send(encodedContent: EncodedContent): String { + if (consentState() == ConsentState.UNKNOWN) { + updateConsentState(ConsentState.ALLOWED) + } + val messageId = libXMTPGroup.send(contentBytes = encodedContent.toByteArray()) + return messageId.toHex() + } + + fun encodeContent(content: T, options: SendOptions?): EncodedContent { + val codec = Client.codecRegistry.find(options?.contentType) + + fun > encode(codec: Codec, content: Any?): EncodedContent { + val contentType = content as? T + if (contentType != null) { + return codec.encode(contentType) + } else { + throw XMTPException("Codec type is not registered") + } + } + + var encoded = encode(codec = codec as ContentCodec, content = content) + val fallback = codec.fallback(content) + if (!fallback.isNullOrBlank()) { + encoded = encoded.toBuilder().also { + it.fallback = fallback + }.build() + } + val compression = options?.compression + if (compression != null) { + encoded = encoded.compress(compression) + } + return encoded + } + + suspend fun prepareMessage(content: T, options: SendOptions? = null): String { + if (consentState() == ConsentState.UNKNOWN) { + updateConsentState(ConsentState.ALLOWED) + } + val encodeContent = encodeContent(content = content, options = options) + return libXMTPGroup.sendOptimistic(encodeContent.toByteArray()).toHex() + } + + suspend fun publishMessages() { + libXMTPGroup.publishMessages() + } + + suspend fun sync() { + libXMTPGroup.sync() + } + + fun messages( + limit: Int? = null, + before: Date? = null, + after: Date? = null, + direction: PagingInfoSortDirection = SortDirection.SORT_DIRECTION_DESCENDING, + deliveryStatus: MessageDeliveryStatus = MessageDeliveryStatus.ALL, + ): List { + return libXMTPGroup.findMessages( + opts = FfiListMessagesOptions( + sentBeforeNs = before?.time?.nanoseconds?.toLong(DurationUnit.NANOSECONDS), + sentAfterNs = after?.time?.nanoseconds?.toLong(DurationUnit.NANOSECONDS), + limit = limit?.toLong(), + deliveryStatus = when (deliveryStatus) { + MessageDeliveryStatus.PUBLISHED -> FfiDeliveryStatus.PUBLISHED + MessageDeliveryStatus.UNPUBLISHED -> FfiDeliveryStatus.UNPUBLISHED + MessageDeliveryStatus.FAILED -> FfiDeliveryStatus.FAILED + else -> null + }, + direction = when (direction) { + SortDirection.SORT_DIRECTION_ASCENDING -> FfiDirection.ASCENDING + else -> FfiDirection.DESCENDING + } + ) + ).mapNotNull { + MessageV3(client, it).decodeOrNull() + } + } + + fun decryptedMessages( + limit: Int? = null, + before: Date? = null, + after: Date? = null, + direction: PagingInfoSortDirection = SortDirection.SORT_DIRECTION_DESCENDING, + deliveryStatus: MessageDeliveryStatus = MessageDeliveryStatus.ALL, + ): List { + return libXMTPGroup.findMessages( + opts = FfiListMessagesOptions( + sentBeforeNs = before?.time?.nanoseconds?.toLong(DurationUnit.NANOSECONDS), + sentAfterNs = after?.time?.nanoseconds?.toLong(DurationUnit.NANOSECONDS), + limit = limit?.toLong(), + deliveryStatus = when (deliveryStatus) { + MessageDeliveryStatus.PUBLISHED -> FfiDeliveryStatus.PUBLISHED + MessageDeliveryStatus.UNPUBLISHED -> FfiDeliveryStatus.UNPUBLISHED + MessageDeliveryStatus.FAILED -> FfiDeliveryStatus.FAILED + else -> null + }, + direction = when (direction) { + SortDirection.SORT_DIRECTION_ASCENDING -> FfiDirection.ASCENDING + else -> FfiDirection.DESCENDING + } + ) + ).mapNotNull { + MessageV3(client, it).decryptOrNull() + } + } + + suspend fun processMessage(envelopeBytes: ByteArray): MessageV3 { + val message = libXMTPGroup.processStreamedConversationMessage(envelopeBytes) + return MessageV3(client, message) + } + + fun creatorInboxId(): String { + return metadata.creatorInboxId() + } + + fun isCreator(): Boolean { + return metadata.creatorInboxId() == client.inboxId + } + + suspend fun members(): List { + return libXMTPGroup.listMembers().map { Member(it) } + } + + suspend fun peerInboxId(): String { + val ids = members().map { it.inboxId }.toMutableList() + ids.remove(client.inboxId) + return ids.first() + } + + fun streamMessages(): Flow = callbackFlow { + val messageCallback = object : FfiMessageCallback { + override fun onMessage(message: FfiMessage) { + val decodedMessage = MessageV3(client, message).decodeOrNull() + decodedMessage?.let { + trySend(it) + } + } + } + + val stream = libXMTPGroup.stream(messageCallback) + awaitClose { stream.end() } + } + + fun streamDecryptedMessages(): Flow = callbackFlow { + val messageCallback = object : FfiMessageCallback { + override fun onMessage(message: FfiMessage) { + val decryptedMessage = MessageV3(client, message).decryptOrNull() + decryptedMessage?.let { + trySend(it) + } + } + } + + val stream = libXMTPGroup.stream(messageCallback) + awaitClose { stream.end() } + } + + suspend fun updateConsentState(state: ConsentState) { + if (client.hasV2Client) { + when (state) { + ConsentState.ALLOWED -> client.contacts.allowGroups(groupIds = listOf(id)) + ConsentState.DENIED -> client.contacts.denyGroups(groupIds = listOf(id)) + ConsentState.UNKNOWN -> Unit + } + } + + val consentState = ConsentState.toFfiConsentState(state) + libXMTPGroup.updateConsentState(consentState) + } + + fun consentState(): ConsentState { + return ConsentState.fromFfiConsentState(libXMTPGroup.consentState()) + } +}