diff --git a/library/src/androidTest/java/org/xmtp/android/library/GroupTest.kt b/library/src/androidTest/java/org/xmtp/android/library/GroupTest.kt index 203ca2116..c3bfa78f3 100644 --- a/library/src/androidTest/java/org/xmtp/android/library/GroupTest.kt +++ b/library/src/androidTest/java/org/xmtp/android/library/GroupTest.kt @@ -871,14 +871,14 @@ class GroupTest { runBlocking { alixClient.conversations.syncGroups() } val alixGroup: Group = alixClient.findGroup(boGroup.id)!! assert(!alixClient.contacts.isGroupAllowed(boGroup.id)) - val preparedMessage = runBlocking { alixGroup.prepareMessage("Test text") } + val preparedMessageId = runBlocking { alixGroup.prepareMessage("Test text") } assert(alixClient.contacts.isGroupAllowed(boGroup.id)) assertEquals(alixGroup.messages().size, 1) assertEquals(alixGroup.messages(deliveryStatus = MessageDeliveryStatus.PUBLISHED).size, 0) assertEquals(alixGroup.messages(deliveryStatus = MessageDeliveryStatus.UNPUBLISHED).size, 1) runBlocking { - preparedMessage.publish() + alixGroup.publishMessages() alixGroup.sync() } @@ -888,6 +888,6 @@ class GroupTest { val message = alixGroup.messages().first() - assertEquals(preparedMessage.messageId, message.id) + assertEquals(preparedMessageId, message.id) } } diff --git a/library/src/main/java/libxmtp-version.txt b/library/src/main/java/libxmtp-version.txt index d0a93e3a3..7b5e857e0 100644 --- a/library/src/main/java/libxmtp-version.txt +++ b/library/src/main/java/libxmtp-version.txt @@ -1,3 +1,3 @@ -Version: d4d8134a +Version: ec1da4e6 Branch: main -Date: 2024-07-08 15:44:32 +0000 +Date: 2024-07-09 15:32:56 +0000 diff --git a/library/src/main/java/org/xmtp/android/library/Group.kt b/library/src/main/java/org/xmtp/android/library/Group.kt index d56d5ce65..4c72acb10 100644 --- a/library/src/main/java/org/xmtp/android/library/Group.kt +++ b/library/src/main/java/org/xmtp/android/library/Group.kt @@ -24,7 +24,6 @@ import uniffi.xmtpv3.FfiMetadataField import uniffi.xmtpv3.FfiPermissionUpdateType import uniffi.xmtpv3.org.xmtp.android.library.libxmtp.PermissionOption import uniffi.xmtpv3.org.xmtp.android.library.libxmtp.PermissionPolicySet -import uniffi.xmtpv3.org.xmtp.android.library.libxmtp.UnpublishedMessage import java.util.Date import kotlin.time.Duration.Companion.nanoseconds import kotlin.time.DurationUnit @@ -100,12 +99,16 @@ class Group(val client: Client, private val libXMTPGroup: FfiGroup) { return encoded } - suspend fun prepareMessage(content: T, options: SendOptions? = null): UnpublishedMessage { + suspend fun prepareMessage(content: T, options: SendOptions? = null): String { if (client.contacts.consentList.groupState(groupId = id) == ConsentState.UNKNOWN) { client.contacts.allowGroups(groupIds = listOf(id)) } val encodeContent = encodeContent(content = content, options = options) - return UnpublishedMessage(libXMTPGroup.sendOptimistic(encodeContent.toByteArray())) + return libXMTPGroup.sendOptimistic(encodeContent.toByteArray()).toHex() + } + + suspend fun publishMessages() { + libXMTPGroup.publishMessages() } suspend fun sync() { diff --git a/library/src/main/java/org/xmtp/android/library/libxmtp/UnpublishedMessage.kt b/library/src/main/java/org/xmtp/android/library/libxmtp/UnpublishedMessage.kt deleted file mode 100644 index 769e999ed..000000000 --- a/library/src/main/java/org/xmtp/android/library/libxmtp/UnpublishedMessage.kt +++ /dev/null @@ -1,14 +0,0 @@ -package uniffi.xmtpv3.org.xmtp.android.library.libxmtp - -import org.xmtp.android.library.toHex -import uniffi.xmtpv3.FfiUnpublishedMessage - -class UnpublishedMessage(private val libXMTPUnpublishedMessage: FfiUnpublishedMessage) { - val messageId: String - get() = libXMTPUnpublishedMessage.id().toHex() - - suspend fun publish(): String { - libXMTPUnpublishedMessage.publish() - return messageId - } -} diff --git a/library/src/main/java/xmtpv3.kt b/library/src/main/java/xmtpv3.kt index 381db9ecc..7f1ef7c1a 100644 --- a/library/src/main/java/xmtpv3.kt +++ b/library/src/main/java/xmtpv3.kt @@ -980,6 +980,10 @@ internal interface UniffiLib : Library { `ptr`: Pointer, `envelopeBytes`: RustBuffer.ByValue, ): Long + fun uniffi_xmtpv3_fn_method_ffigroup_publish_messages( + `ptr`: Pointer, + ): Long + fun uniffi_xmtpv3_fn_method_ffigroup_remove_admin( `ptr`: Pointer, `inboxId`: RustBuffer.ByValue, ): Long @@ -1002,7 +1006,7 @@ internal interface UniffiLib : Library { fun uniffi_xmtpv3_fn_method_ffigroup_send_optimistic( `ptr`: Pointer, `contentBytes`: RustBuffer.ByValue, uniffi_out_err: UniffiRustCallStatus, - ): Pointer + ): RustBuffer.ByValue fun uniffi_xmtpv3_fn_method_ffigroup_stream( `ptr`: Pointer, `messageCallback`: Long, @@ -1118,22 +1122,6 @@ internal interface UniffiLib : Library { `ptr`: Pointer, uniffi_out_err: UniffiRustCallStatus, ): Byte - fun uniffi_xmtpv3_fn_clone_ffiunpublishedmessage( - `ptr`: Pointer, uniffi_out_err: UniffiRustCallStatus, - ): Pointer - - fun uniffi_xmtpv3_fn_free_ffiunpublishedmessage( - `ptr`: Pointer, uniffi_out_err: UniffiRustCallStatus, - ): Unit - - fun uniffi_xmtpv3_fn_method_ffiunpublishedmessage_id( - `ptr`: Pointer, uniffi_out_err: UniffiRustCallStatus, - ): RustBuffer.ByValue - - fun uniffi_xmtpv3_fn_method_ffiunpublishedmessage_publish( - `ptr`: Pointer, - ): Long - fun uniffi_xmtpv3_fn_clone_ffiv2apiclient( `ptr`: Pointer, uniffi_out_err: UniffiRustCallStatus, ): Pointer @@ -1702,6 +1690,9 @@ internal interface UniffiLib : Library { fun uniffi_xmtpv3_checksum_method_ffigroup_process_streamed_group_message( ): Short + fun uniffi_xmtpv3_checksum_method_ffigroup_publish_messages( + ): Short + fun uniffi_xmtpv3_checksum_method_ffigroup_remove_admin( ): Short @@ -1777,12 +1768,6 @@ internal interface UniffiLib : Library { fun uniffi_xmtpv3_checksum_method_ffistreamcloser_is_closed( ): Short - fun uniffi_xmtpv3_checksum_method_ffiunpublishedmessage_id( - ): Short - - fun uniffi_xmtpv3_checksum_method_ffiunpublishedmessage_publish( - ): Short - fun uniffi_xmtpv3_checksum_method_ffiv2apiclient_batch_query( ): Short @@ -2004,6 +1989,9 @@ private fun uniffiCheckApiChecksums(lib: UniffiLib) { if (lib.uniffi_xmtpv3_checksum_method_ffigroup_process_streamed_group_message() != 19069.toShort()) { throw RuntimeException("UniFFI API checksum mismatch: try cleaning and rebuilding your project") } + if (lib.uniffi_xmtpv3_checksum_method_ffigroup_publish_messages() != 52808.toShort()) { + throw RuntimeException("UniFFI API checksum mismatch: try cleaning and rebuilding your project") + } if (lib.uniffi_xmtpv3_checksum_method_ffigroup_remove_admin() != 57094.toShort()) { throw RuntimeException("UniFFI API checksum mismatch: try cleaning and rebuilding your project") } @@ -2019,7 +2007,7 @@ private fun uniffiCheckApiChecksums(lib: UniffiLib) { if (lib.uniffi_xmtpv3_checksum_method_ffigroup_send() != 37701.toShort()) { throw RuntimeException("UniFFI API checksum mismatch: try cleaning and rebuilding your project") } - if (lib.uniffi_xmtpv3_checksum_method_ffigroup_send_optimistic() != 22919.toShort()) { + if (lib.uniffi_xmtpv3_checksum_method_ffigroup_send_optimistic() != 13872.toShort()) { throw RuntimeException("UniFFI API checksum mismatch: try cleaning and rebuilding your project") } if (lib.uniffi_xmtpv3_checksum_method_ffigroup_stream() != 45558.toShort()) { @@ -2079,12 +2067,6 @@ private fun uniffiCheckApiChecksums(lib: UniffiLib) { if (lib.uniffi_xmtpv3_checksum_method_ffistreamcloser_is_closed() != 62423.toShort()) { throw RuntimeException("UniFFI API checksum mismatch: try cleaning and rebuilding your project") } - if (lib.uniffi_xmtpv3_checksum_method_ffiunpublishedmessage_id() != 4148.toShort()) { - throw RuntimeException("UniFFI API checksum mismatch: try cleaning and rebuilding your project") - } - if (lib.uniffi_xmtpv3_checksum_method_ffiunpublishedmessage_publish() != 47708.toShort()) { - throw RuntimeException("UniFFI API checksum mismatch: try cleaning and rebuilding your project") - } if (lib.uniffi_xmtpv3_checksum_method_ffiv2apiclient_batch_query() != 26551.toShort()) { throw RuntimeException("UniFFI API checksum mismatch: try cleaning and rebuilding your project") } @@ -3044,6 +3026,11 @@ public interface FfiGroupInterface { suspend fun `processStreamedGroupMessage`(`envelopeBytes`: kotlin.ByteArray): FfiMessage + /** + * Publish all unpublished messages + */ + suspend fun `publishMessages`() + suspend fun `removeAdmin`(`inboxId`: kotlin.String) suspend fun `removeMembers`(`accountAddresses`: List) @@ -3057,7 +3044,7 @@ public interface FfiGroupInterface { /** * send a message without immediately publishing to the delivery service. */ - fun `sendOptimistic`(`contentBytes`: kotlin.ByteArray): FfiUnpublishedMessage + fun `sendOptimistic`(`contentBytes`: kotlin.ByteArray): kotlin.ByteArray suspend fun `stream`(`messageCallback`: FfiMessageCallback): FfiStreamCloser @@ -3536,6 +3523,42 @@ open class FfiGroup : Disposable, AutoCloseable, FfiGroupInterface { } + /** + * Publish all unpublished messages + */ + @Throws(GenericException::class) + @Suppress("ASSIGNED_BUT_NEVER_ACCESSED_VARIABLE") + override suspend fun `publishMessages`() { + return uniffiRustCallAsync( + callWithPointer { thisPtr -> + UniffiLib.INSTANCE.uniffi_xmtpv3_fn_method_ffigroup_publish_messages( + thisPtr, + + ) + }, + { future, callback, continuation -> + UniffiLib.INSTANCE.ffi_xmtpv3_rust_future_poll_void( + future, + callback, + continuation + ) + }, + { future, continuation -> + UniffiLib.INSTANCE.ffi_xmtpv3_rust_future_complete_void( + future, + continuation + ) + }, + { future -> UniffiLib.INSTANCE.ffi_xmtpv3_rust_future_free_void(future) }, + // lift function + { Unit }, + + // Error FFI converter + GenericException.ErrorHandler, + ) + } + + @Throws(GenericException::class) @Suppress("ASSIGNED_BUT_NEVER_ACCESSED_VARIABLE") override suspend fun `removeAdmin`(`inboxId`: kotlin.String) { @@ -3704,8 +3727,8 @@ open class FfiGroup : Disposable, AutoCloseable, FfiGroupInterface { * send a message without immediately publishing to the delivery service. */ @Throws(GenericException::class) - override fun `sendOptimistic`(`contentBytes`: kotlin.ByteArray): FfiUnpublishedMessage { - return FfiConverterTypeFfiUnpublishedMessage.lift( + override fun `sendOptimistic`(`contentBytes`: kotlin.ByteArray): kotlin.ByteArray { + return FfiConverterByteArray.lift( callWithPointer { uniffiRustCallWithError(GenericException) { _status -> UniffiLib.INSTANCE.uniffi_xmtpv3_fn_method_ffigroup_send_optimistic( @@ -5229,270 +5252,6 @@ public object FfiConverterTypeFfiStreamCloser : FfiConverter callWithPointer(block: (ptr: Pointer) -> R): R { - // Check and increment the call counter, to keep the object alive. - // This needs a compare-and-set retry loop in case of concurrent updates. - do { - val c = this.callCounter.get() - if (c == 0L) { - throw IllegalStateException("${this.javaClass.simpleName} object has already been destroyed") - } - if (c == Long.MAX_VALUE) { - throw IllegalStateException("${this.javaClass.simpleName} call counter would overflow") - } - } while (!this.callCounter.compareAndSet(c, c + 1L)) - // Now we can safely do the method call without the pointer being freed concurrently. - try { - return block(this.uniffiClonePointer()) - } finally { - // This decrement always matches the increment we performed above. - if (this.callCounter.decrementAndGet() == 0L) { - cleanable.clean() - } - } - } - - // Use a static inner class instead of a closure so as not to accidentally - // capture `this` as part of the cleanable's action. - private class UniffiCleanAction(private val pointer: Pointer?) : Runnable { - override fun run() { - pointer?.let { ptr -> - uniffiRustCall { status -> - UniffiLib.INSTANCE.uniffi_xmtpv3_fn_free_ffiunpublishedmessage(ptr, status) - } - } - } - } - - fun uniffiClonePointer(): Pointer { - return uniffiRustCall() { status -> - UniffiLib.INSTANCE.uniffi_xmtpv3_fn_clone_ffiunpublishedmessage(pointer!!, status) - } - } - - override fun `id`(): kotlin.ByteArray { - return FfiConverterByteArray.lift( - callWithPointer { - uniffiRustCall() { _status -> - UniffiLib.INSTANCE.uniffi_xmtpv3_fn_method_ffiunpublishedmessage_id( - it, _status - ) - } - } - ) - } - - - @Throws(GenericException::class) - @Suppress("ASSIGNED_BUT_NEVER_ACCESSED_VARIABLE") - override suspend fun `publish`() { - return uniffiRustCallAsync( - callWithPointer { thisPtr -> - UniffiLib.INSTANCE.uniffi_xmtpv3_fn_method_ffiunpublishedmessage_publish( - thisPtr, - - ) - }, - { future, callback, continuation -> - UniffiLib.INSTANCE.ffi_xmtpv3_rust_future_poll_void( - future, - callback, - continuation - ) - }, - { future, continuation -> - UniffiLib.INSTANCE.ffi_xmtpv3_rust_future_complete_void( - future, - continuation - ) - }, - { future -> UniffiLib.INSTANCE.ffi_xmtpv3_rust_future_free_void(future) }, - // lift function - { Unit }, - - // Error FFI converter - GenericException.ErrorHandler, - ) - } - - - companion object - -} - -public object FfiConverterTypeFfiUnpublishedMessage : FfiConverter { - - override fun lower(value: FfiUnpublishedMessage): Pointer { - return value.uniffiClonePointer() - } - - override fun lift(value: Pointer): FfiUnpublishedMessage { - return FfiUnpublishedMessage(value) - } - - override fun read(buf: ByteBuffer): FfiUnpublishedMessage { - // The Rust code always writes pointers as 8 bytes, and will - // fail to compile if they don't fit. - return lift(Pointer(buf.getLong())) - } - - override fun allocationSize(value: FfiUnpublishedMessage) = 8UL - - override fun write(value: FfiUnpublishedMessage, buf: ByteBuffer) { - // The Rust code always expects pointers written as 8 bytes, - // and will fail to compile if they don't fit. - buf.putLong(Pointer.nativeValue(lower(value))) - } -} - - -// This template implements a class for working with a Rust struct via a Pointer/Arc -// to the live Rust struct on the other side of the FFI. -// -// Each instance implements core operations for working with the Rust `Arc` and the -// Kotlin Pointer to work with the live Rust struct on the other side of the FFI. -// -// There's some subtlety here, because we have to be careful not to operate on a Rust -// struct after it has been dropped, and because we must expose a public API for freeing -// theq Kotlin wrapper object in lieu of reliable finalizers. The core requirements are: -// -// * Each instance holds an opaque pointer to the underlying Rust struct. -// Method calls need to read this pointer from the object's state and pass it in to -// the Rust FFI. -// -// * When an instance is no longer needed, its pointer should be passed to a -// special destructor function provided by the Rust FFI, which will drop the -// underlying Rust struct. -// -// * Given an instance, calling code is expected to call the special -// `destroy` method in order to free it after use, either by calling it explicitly -// or by using a higher-level helper like the `use` method. Failing to do so risks -// leaking the underlying Rust struct. -// -// * We can't assume that calling code will do the right thing, and must be prepared -// to handle Kotlin method calls executing concurrently with or even after a call to -// `destroy`, and to handle multiple (possibly concurrent!) calls to `destroy`. -// -// * We must never allow Rust code to operate on the underlying Rust struct after -// the destructor has been called, and must never call the destructor more than once. -// Doing so may trigger memory unsafety. -// -// * To mitigate many of the risks of leaking memory and use-after-free unsafety, a `Cleaner` -// is implemented to call the destructor when the Kotlin object becomes unreachable. -// This is done in a background thread. This is not a panacea, and client code should be aware that -// 1. the thread may starve if some there are objects that have poorly performing -// `drop` methods or do significant work in their `drop` methods. -// 2. the thread is shared across the whole library. This can be tuned by using `android_cleaner = true`, -// or `android = true` in the [`kotlin` section of the `uniffi.toml` file](https://mozilla.github.io/uniffi-rs/kotlin/configuration.html). -// -// If we try to implement this with mutual exclusion on access to the pointer, there is the -// possibility of a race between a method call and a concurrent call to `destroy`: -// -// * Thread A starts a method call, reads the value of the pointer, but is interrupted -// before it can pass the pointer over the FFI to Rust. -// * Thread B calls `destroy` and frees the underlying Rust struct. -// * Thread A resumes, passing the already-read pointer value to Rust and triggering -// a use-after-free. -// -// One possible solution would be to use a `ReadWriteLock`, with each method call taking -// a read lock (and thus allowed to run concurrently) and the special `destroy` method -// taking a write lock (and thus blocking on live method calls). However, we aim not to -// generate methods with any hidden blocking semantics, and a `destroy` method that might -// block if called incorrectly seems to meet that bar. -// -// So, we achieve our goals by giving each instance an associated `AtomicLong` counter to track -// the number of in-flight method calls, and an `AtomicBoolean` flag to indicate whether `destroy` -// has been called. These are updated according to the following rules: -// -// * The initial value of the counter is 1, indicating a live object with no in-flight calls. -// The initial value for the flag is false. -// -// * At the start of each method call, we atomically check the counter. -// If it is 0 then the underlying Rust struct has already been destroyed and the call is aborted. -// If it is nonzero them we atomically increment it by 1 and proceed with the method call. -// -// * At the end of each method call, we atomically decrement and check the counter. -// If it has reached zero then we destroy the underlying Rust struct. -// -// * When `destroy` is called, we atomically flip the flag from false to true. -// If the flag was already true we silently fail. -// Otherwise we atomically decrement and check the counter. -// If it has reached zero then we destroy the underlying Rust struct. -// -// Astute readers may observe that this all sounds very similar to the way that Rust's `Arc` works, -// and indeed it is, with the addition of a flag to guard against multiple calls to `destroy`. -// -// The overall effect is that the underlying Rust struct is destroyed only when `destroy` has been -// called *and* all in-flight method calls have completed, avoiding violating any of the expectations -// of the underlying Rust code. -// -// This makes a cleaner a better alternative to _not_ calling `destroy()` as -// and when the object is finished with, but the abstraction is not perfect: if the Rust object's `drop` -// method is slow, and/or there are many objects to cleanup, and it's on a low end Android device, then the cleaner -// thread may be starved, and the app will leak memory. -// -// In this case, `destroy`ing manually may be a better solution. -// -// The cleaner can live side by side with the manual calling of `destroy`. In the order of responsiveness, uniffi objects -// with Rust peers are reclaimed: -// -// 1. By calling the `destroy` method of the object, which calls `rustObject.free()`. If that doesn't happen: -// 2. When the object becomes unreachable, AND the Cleaner thread gets to call `rustObject.free()`. If the thread is starved then: -// 3. The memory is reclaimed when the process terminates. -// -// [1] https://stackoverflow.com/questions/24376768/can-java-finalize-an-object-when-it-is-still-in-scope/24380219 -// - - public interface FfiV2ApiClientInterface { suspend fun `batchQuery`(`req`: FfiV2BatchQueryRequest): FfiV2BatchQueryResponse diff --git a/library/src/main/jniLibs/arm64-v8a/libuniffi_xmtpv3.so b/library/src/main/jniLibs/arm64-v8a/libuniffi_xmtpv3.so index bfa22e6ed..4c61f55f0 100755 Binary files a/library/src/main/jniLibs/arm64-v8a/libuniffi_xmtpv3.so and b/library/src/main/jniLibs/arm64-v8a/libuniffi_xmtpv3.so differ diff --git a/library/src/main/jniLibs/armeabi-v7a/libuniffi_xmtpv3.so b/library/src/main/jniLibs/armeabi-v7a/libuniffi_xmtpv3.so index 1eda384bf..7d1cfc519 100755 Binary files a/library/src/main/jniLibs/armeabi-v7a/libuniffi_xmtpv3.so and b/library/src/main/jniLibs/armeabi-v7a/libuniffi_xmtpv3.so differ diff --git a/library/src/main/jniLibs/x86/libuniffi_xmtpv3.so b/library/src/main/jniLibs/x86/libuniffi_xmtpv3.so index e93b8a59c..ccefda85a 100755 Binary files a/library/src/main/jniLibs/x86/libuniffi_xmtpv3.so and b/library/src/main/jniLibs/x86/libuniffi_xmtpv3.so differ diff --git a/library/src/main/jniLibs/x86_64/libuniffi_xmtpv3.so b/library/src/main/jniLibs/x86_64/libuniffi_xmtpv3.so index 31cfedb86..662f4f3f8 100755 Binary files a/library/src/main/jniLibs/x86_64/libuniffi_xmtpv3.so and b/library/src/main/jniLibs/x86_64/libuniffi_xmtpv3.so differ