Skip to content

Commit

Permalink
Introduce Conversation.prepareMessage (#58)
Browse files Browse the repository at this point in the history
* add api client with grpc kotlin

* add base for prepared messages

* write tests for it

* small clean up

* fix up the tests

* fix up some linter issues

* fix up the tests
  • Loading branch information
nplasterer authored Mar 21, 2023
1 parent 035a94c commit 1e26193
Show file tree
Hide file tree
Showing 7 changed files with 240 additions and 173 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -120,14 +120,15 @@ class InstrumentedTest {
)
// Say this message is sent in the past
val date = Date()
date.time = date.time - 10000
date.time = date.time - 5000
convo.send(text = "10 seconds ago", sentAt = date)
Thread.sleep(5000)
convo.send(text = "now")
val messages = convo.messages(limit = 1)
assertEquals(1, messages.size)
val nowMessage = messages[0]
assertEquals("now", nowMessage.body)
val messages2 = convo.messages(limit = 1, before = date)
val messages2 = convo.messages(limit = 1, before = nowMessage.sent)
assertEquals(1, messages2.size)
val tenSecondsAgoMessage = messages2[0]
assertEquals("10 seconds ago", tenSecondsAgoMessage.body)
Expand All @@ -148,22 +149,14 @@ class InstrumentedTest {
Client().create(account = alice, clientOptions)
val convo = ConversationV1(client = bobClient, peerAddress = alice.address, sentAt = Date())
// Say this message is sent in the past
val date = Date()
date.time = date.time - 10000
convo.send(text = "10 seconds ago", sentAt = date)
convo.send(text = "10 seconds ago")
Thread.sleep(10000)
convo.send(text = "now")
val allMessages = convo.messages()
val messages = convo.messages(limit = 1)
assertEquals(1, messages.size)
val nowMessage = messages[0]
assertEquals("now", nowMessage.body)
val messages2 = convo.messages(limit = 1, before = date)
assertEquals(1, messages2.size)
val tenSecondsAgoMessage = messages2[0]
assertEquals("10 seconds ago", tenSecondsAgoMessage.body)
val messages3 = convo.messages(limit = 1, after = tenSecondsAgoMessage.sent)
assertEquals(1, messages3.size)
val nowMessage2 = messages3[0]
assertEquals("now", nowMessage2.body)
}

@OptIn(ExperimentalCoroutinesApi::class)
Expand Down
17 changes: 14 additions & 3 deletions library/src/main/java/org/xmtp/android/library/Conversation.kt
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,20 @@ sealed class Conversation {
}

fun decode(envelope: Envelope): DecodedMessage {
when (this) {
is V1 -> return conversationV1.decode(envelope)
is V2 -> return conversationV2.decodeEnvelope(envelope)
return when (this) {
is V1 -> conversationV1.decode(envelope)
is V2 -> conversationV2.decodeEnvelope(envelope)
}
}

fun <T> prepareMessage(content: T, options: SendOptions? = null): PreparedMessage {
return when (this) {
is V1 -> {
conversationV1.prepareMessage(content = content, options = options)
}
is V2 -> {
conversationV2.prepareMessage(content = content, options = options)
}
}
}

Expand Down
168 changes: 88 additions & 80 deletions library/src/main/java/org/xmtp/android/library/ConversationV1.kt
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ package org.xmtp.android.library
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking
import org.web3j.crypto.Hash
import org.xmtp.android.library.codecs.ContentCodec
import org.xmtp.android.library.codecs.EncodedContent
import org.xmtp.android.library.codecs.TextCodec
import org.xmtp.android.library.codecs.compress
import org.xmtp.android.library.messages.Envelope
import org.xmtp.android.library.messages.EnvelopeBuilder
Expand All @@ -30,17 +30,65 @@ data class ConversationV1(
val topic: Topic
get() = Topic.directMessageV1(client.address, peerAddress)

fun send(text: String, options: SendOptions? = null) {
send(text = text, sendOptions = options, sentAt = null)
fun streamMessages(): Flow<DecodedMessage> = flow {
client.subscribe(listOf(topic.description)).collect {
emit(decode(envelope = it))
}
}

fun messages(
limit: Int? = null,
before: Date? = null,
after: Date? = null,
): List<DecodedMessage> {
val pagination = Pagination(limit = limit, startTime = before, endTime = after)
val result = runBlocking {
client.apiClient.queryTopic(topic = topic, pagination = pagination)
}

return result.envelopesList.flatMap { envelope ->
listOf(decode(envelope = envelope))
}
}

fun decode(envelope: Envelope): DecodedMessage {
val message = Message.parseFrom(envelope.message)
val decrypted = message.v1.decrypt(client.privateKeyBundleV1)
val encodedMessage = EncodedContent.parseFrom(decrypted)
val header = message.v1.header
val decoded = DecodedMessage(
encodedContent = encodedMessage,
senderAddress = header.sender.walletAddress,
sent = message.v1.sentAt
)

decoded.id = generateId(envelope)

return decoded
}

fun send(text: String, options: SendOptions? = null): String {
return send(text = text, sendOptions = options, sentAt = null)
}

internal fun send(
text: String,
sendOptions: SendOptions? = null,
sentAt: Date? = null,
): String {
val preparedMessage = prepareMessage(content = text, options = sendOptions)
preparedMessage.send()
return preparedMessage.messageId
}

fun send(text: String, sendOptions: SendOptions? = null, sentAt: Date? = null) {
val encoder = TextCodec()
val encodedContent = encoder.encode(content = text)
send(encodedContent = encodedContent, sendOptions = sendOptions, sentAt = sentAt)
fun <T> send(content: T, options: SendOptions? = null): String {
val preparedMessage = prepareMessage(content = content, options = options)
preparedMessage.send()
return preparedMessage.messageId
}

fun <T> send(content: T, options: SendOptions? = null) {
fun <T> prepareMessage(content: T, options: SendOptions?): PreparedMessage {
val contact = client.contacts.find(peerAddress) ?: throw XMTPException("address not found")
val codec = Client.codecRegistry.find(options?.contentType)

fun <Codec : ContentCodec<T>> encode(codec: Codec, content: Any?): EncodedContent {
Expand All @@ -56,93 +104,53 @@ data class ConversationV1(
encoded = encoded.toBuilder().also {
it.fallback = options?.contentFallback ?: ""
}.build()
send(encodedContent = encoded, sendOptions = options)
}

private fun send(
encodedContent: EncodedContent,
sendOptions: SendOptions? = null,
sentAt: Date? = null,
) {
val contact = client.contacts.find(peerAddress) ?: throw XMTPException("Contact not found.")

var content = encodedContent

if (sendOptions?.compression != null) {
content = content.compress(sendOptions.compression!!)
val compression = options?.compression
if (compression != null) {
encoded = encoded.compress(compression)
}

val recipient = contact.toPublicKeyBundle()
if (!recipient.identityKey.hasSignature()) {
throw XMTPException("no signature for id key")
}
val date = sentAt ?: Date()
if (client.privateKeyBundleV1 == null) {
throw XMTPException("no private key bundle")
throw Exception("no signature for id key")
}
val date = Date()
val message = MessageV1Builder.buildEncode(
sender = client.privateKeyBundleV1!!,
sender = client.privateKeyBundleV1,
recipient = recipient,
message = content.toByteArray(),
message = encoded.toByteArray(),
timestamp = date
)
val envelopes = mutableListOf(
val messageEnvelope =
EnvelopeBuilder.buildFromTopic(
topic = topic,
topic = Topic.directMessageV1(client.address, peerAddress),
timestamp = date,
message = MessageBuilder.buildFromMessageV1(v1 = message).toByteArray()
)
)
if (client.contacts.needsIntroduction(peerAddress)) {
envelopes.addAll(
listOf(
EnvelopeBuilder.buildFromTopic(
topic = Topic.userIntro(peerAddress),
timestamp = date,
message = MessageBuilder.buildFromMessageV1(v1 = message).toByteArray()
),
EnvelopeBuilder.buildFromTopic(
topic = Topic.userIntro(client.address),
timestamp = date,
message = MessageBuilder.buildFromMessageV1(v1 = message).toByteArray()
return PreparedMessage(
messageEnvelope = messageEnvelope,
conversation = Conversation.V1(this)
) {
val envelopes = mutableListOf(messageEnvelope)
if (client.contacts.needsIntroduction(peerAddress)) {
envelopes.addAll(
listOf(
EnvelopeBuilder.buildFromTopic(
topic = Topic.userIntro(peerAddress),
timestamp = date,
message = MessageBuilder.buildFromMessageV1(v1 = message).toByteArray()
),
EnvelopeBuilder.buildFromTopic(
topic = Topic.userIntro(client.address),
timestamp = date,
message = MessageBuilder.buildFromMessageV1(v1 = message).toByteArray()
)
)
)
)
client.contacts.hasIntroduced[peerAddress] = true
}
client.publish(envelopes = envelopes)
}

fun messages(
limit: Int? = null,
before: Date? = null,
after: Date? = null,
): List<DecodedMessage> {
val pagination = Pagination(limit = limit, startTime = before, endTime = after)
val result = runBlocking {
client.apiClient.queryTopic(topic = topic, pagination = pagination)
}

return result.envelopesList.flatMap { envelope ->
listOf(decode(envelope = envelope))
client.contacts.hasIntroduced[peerAddress] = true
}
client.publish(envelopes = envelopes)
}
}

fun decode(envelope: Envelope): DecodedMessage {
val message = Message.parseFrom(envelope.message)
val decrypted = message.v1.decrypt(client.privateKeyBundleV1)
val encodedMessage = EncodedContent.parseFrom(decrypted)
val header = message.v1.header
return DecodedMessage(
encodedContent = encodedMessage,
senderAddress = header.sender.walletAddress,
sent = message.v1.sentAt
)
}

fun streamMessages(): Flow<DecodedMessage> = flow {
client.subscribe(listOf(topic.description)).collect {
emit(decode(envelope = it))
}
}
private fun generateId(envelope: Envelope): String =
Hash.sha256(envelope.message.toByteArray()).toHex()
}
Loading

0 comments on commit 1e26193

Please sign in to comment.