Skip to content

Commit

Permalink
Ephemeral topics (#105)
Browse files Browse the repository at this point in the history
* add api client with grpc kotlin

* add Ephemeral topic code to match swift

* add tests for it

* remove the flaky part of the test
  • Loading branch information
nplasterer authored Aug 4, 2023
1 parent 5cfaa52 commit 1183b26
Show file tree
Hide file tree
Showing 5 changed files with 102 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -345,4 +345,48 @@ class LocalInstrumentedTest {
private fun delayToPropagate() {
Thread.sleep(500)
}

@Test
fun testStreamEphemeralInV1Conversation() {
val bob = PrivateKeyBuilder()
val alice = PrivateKeyBuilder()
val clientOptions =
ClientOptions(api = ClientOptions.Api(env = XMTPEnvironment.LOCAL, isSecure = false))
val bobClient = Client().create(bob, clientOptions)
val aliceClient = Client().create(account = alice, options = clientOptions)
aliceClient.publishUserContact(legacy = true)
bobClient.publishUserContact(legacy = true)
val convo = ConversationV1(client = bobClient, peerAddress = alice.address, sentAt = Date())
convo.streamEphemeral().mapLatest {
assertEquals("hi", it.message.toStringUtf8())
}
convo.send(content = "hi", options = SendOptions(ephemeral = true))
val messages = convo.messages()
assertEquals(0, messages.size)
}

@Test
fun testStreamEphemeralInV2Conversation() {
val bob = PrivateKeyBuilder()
val alice = PrivateKeyBuilder()
val clientOptions =
ClientOptions(api = ClientOptions.Api(env = XMTPEnvironment.LOCAL, isSecure = false))
val bobClient = Client().create(bob, clientOptions)
val aliceClient = Client().create(account = alice, options = clientOptions)
val aliceConversation = aliceClient.conversations.newConversation(
bob.address,
context = InvitationV1ContextBuilder.buildFromConversation("https://example.com/3")
)
val bobConversation = bobClient.conversations.newConversation(
alice.address,
context = InvitationV1ContextBuilder.buildFromConversation("https://example.com/3")
)

bobConversation.streamEphemeral().mapLatest {
assertEquals("hi", it.message.toStringUtf8())
}
aliceConversation.send(content = "hi", options = SendOptions(ephemeral = true))
val messages = aliceConversation.messages()
assertEquals(0, messages.size)
}
}
19 changes: 16 additions & 3 deletions library/src/main/java/org/xmtp/android/library/Conversation.kt
Original file line number Diff line number Diff line change
Expand Up @@ -127,12 +127,18 @@ sealed class Conversation {
}
}

fun send(encodedContent: EncodedContent): String {
fun send(encodedContent: EncodedContent, options: SendOptions? = null): String {
return when (this) {
is V1 -> conversationV1.send(encodedContent = encodedContent)
is V2 -> conversationV2.send(encodedContent = encodedContent)
is V1 -> conversationV1.send(encodedContent = encodedContent, options = options)
is V2 -> conversationV2.send(encodedContent = encodedContent, options = options)
}
}

val clientAddress: String
get() {
return client.address
}

val topic: String
get() {
return when (this) {
Expand Down Expand Up @@ -176,4 +182,11 @@ sealed class Conversation {
is V2 -> conversationV2.streamMessages()
}
}

fun streamEphemeral(): Flow<Envelope> {
return when (this) {
is V1 -> return conversationV1.streamEphemeral()
is V2 -> return conversationV2.streamEphemeral()
}
}
}
30 changes: 23 additions & 7 deletions library/src/main/java/org/xmtp/android/library/ConversationV1.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package org.xmtp.android.library

import android.util.Log
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking
import org.web3j.crypto.Hash
Expand Down Expand Up @@ -97,8 +98,8 @@ data class ConversationV1(
return preparedMessage.messageId
}

fun send(encodedContent: EncodedContent): String {
val preparedMessage = prepareMessage(encodedContent = encodedContent)
fun send(encodedContent: EncodedContent, options: SendOptions? = null): String {
val preparedMessage = prepareMessage(encodedContent = encodedContent, options = options)
preparedMessage.send()
return preparedMessage.messageId
}
Expand All @@ -123,10 +124,13 @@ data class ConversationV1(
if (compression != null) {
encoded = encoded.compress(compression)
}
return prepareMessage(encodedContent = encoded)
return prepareMessage(encodedContent = encoded, options = options)
}

fun prepareMessage(encodedContent: EncodedContent): PreparedMessage {
fun prepareMessage(
encodedContent: EncodedContent,
options: SendOptions? = null,
): PreparedMessage {
val contact = client.contacts.find(peerAddress) ?: throw XMTPException("address not found")
val recipient = contact.toPublicKeyBundle()
if (!recipient.identityKey.hasSignature()) {
Expand All @@ -139,9 +143,12 @@ data class ConversationV1(
message = encodedContent.toByteArray(),
timestamp = date
)

val isEphemeral: Boolean = options != null && options.ephemeral

val messageEnvelope =
EnvelopeBuilder.buildFromTopic(
topic = Topic.directMessageV1(client.address, peerAddress),
EnvelopeBuilder.buildFromString(
topic = if (isEphemeral) ephemeralTopic else topic.description,
timestamp = date,
message = MessageBuilder.buildFromMessageV1(v1 = message).toByteArray()
)
Expand All @@ -150,7 +157,7 @@ data class ConversationV1(
conversation = Conversation.V1(this)
) {
val envelopes = mutableListOf(messageEnvelope)
if (client.contacts.needsIntroduction(peerAddress)) {
if (client.contacts.needsIntroduction(peerAddress) && !isEphemeral) {
envelopes.addAll(
listOf(
EnvelopeBuilder.buildFromTopic(
Expand All @@ -173,4 +180,13 @@ data class ConversationV1(

private fun generateId(envelope: Envelope): String =
Hash.sha256(envelope.message.toByteArray()).toHex()

val ephemeralTopic: String
get() = topic.description.replace("/xmtp/0/dm-", "/xmtp/0/dmE-")

fun streamEphemeral(): Flow<Envelope> = flow {
client.subscribe(topics = listOf(ephemeralTopic)).collect {
emit(it)
}
}
}
22 changes: 17 additions & 5 deletions library/src/main/java/org/xmtp/android/library/ConversationV2.kt
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,8 @@ data class ConversationV2(
return preparedMessage.messageId
}

fun send(encodedContent: EncodedContent): String {
val preparedMessage = prepareMessage(encodedContent = encodedContent)
fun send(encodedContent: EncodedContent, options: SendOptions?): String {
val preparedMessage = prepareMessage(encodedContent = encodedContent, options = options)
preparedMessage.send()
return preparedMessage.messageId
}
Expand Down Expand Up @@ -155,18 +155,21 @@ data class ConversationV2(
if (compression != null) {
encoded = encoded.compress(compression)
}
return prepareMessage(encoded)
return prepareMessage(encoded, options = options)
}

fun prepareMessage(encodedContent: EncodedContent): PreparedMessage {
fun prepareMessage(encodedContent: EncodedContent, options: SendOptions?): PreparedMessage {
val message = MessageV2Builder.buildEncode(
client = client,
encodedContent = encodedContent,
topic = topic,
keyMaterial = keyMaterial
)

val newTopic = if (options?.ephemeral == true) ephemeralTopic else topic

val envelope = EnvelopeBuilder.buildFromString(
topic = topic,
topic = newTopic,
timestamp = Date(),
message = MessageBuilder.buildFromMessageV2(v2 = message).toByteArray()
)
Expand All @@ -177,4 +180,13 @@ data class ConversationV2(

private fun generateId(envelope: Envelope): String =
Hash.sha256(envelope.message.toByteArray()).toHex()

val ephemeralTopic: String
get() = topic.replace("/xmtp/0/m", "/xmtp/0/mE")

fun streamEphemeral(): Flow<Envelope> = flow {
client.subscribe(topics = listOf(ephemeralTopic)).collect {
emit(it)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,6 @@ import org.xmtp.proto.message.contents.Content
data class SendOptions(
var compression: EncodedContentCompression? = null,
var contentType: Content.ContentTypeId? = null,
var contentFallback: String? = null
var contentFallback: String? = null,
var ephemeral: Boolean = false
)

0 comments on commit 1183b26

Please sign in to comment.