Skip to content

Commit

Permalink
Merge branch 'release/candidate' into fix/persist-mls-conversation
Browse files Browse the repository at this point in the history
  • Loading branch information
Garzas committed Jan 2, 2025
2 parents 7d6bcd6 + 7f1da82 commit f9577a4
Show file tree
Hide file tree
Showing 15 changed files with 285 additions and 79 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import com.wire.kalium.logger.obfuscateDomain
import com.wire.kalium.logic.CoreFailure
import com.wire.kalium.logic.NetworkFailure
import com.wire.kalium.logic.StorageFailure
import com.wire.kalium.logic.data.conversation.ClientId
import com.wire.kalium.logic.data.conversation.MemberMapper
import com.wire.kalium.logic.data.conversation.Recipient
import com.wire.kalium.logic.data.conversation.mls.NameAndHandle
Expand Down Expand Up @@ -167,6 +168,7 @@ interface UserRepository {
suspend fun getNameAndHandle(userId: UserId): Either<StorageFailure, NameAndHandle>
suspend fun migrateUserToTeam(teamName: String): Either<CoreFailure, CreateUserTeam>
suspend fun updateTeamId(userId: UserId, teamId: TeamId): Either<StorageFailure, Unit>
suspend fun isClientMlsCapable(userId: UserId, clientId: ClientId): Either<StorageFailure, Boolean>
}

@Suppress("LongParameterList", "TooManyFunctions")
Expand Down Expand Up @@ -668,6 +670,10 @@ internal class UserDataSource internal constructor(
userDAO.updateTeamId(userId.toDao(), teamId.value)
}

override suspend fun isClientMlsCapable(userId: UserId, clientId: ClientId): Either<StorageFailure, Boolean> = wrapStorageRequest {
clientDAO.isMLSCapable(userId.toDao(), clientId.value)
}

companion object {
internal const val SELF_USER_ID_KEY = "selfUserID"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,12 @@ class ConversationScope internal constructor(
get() = ObserveIsSelfUserMemberUseCaseImpl(conversationRepository, selfUserId)

val observeConversationInteractionAvailabilityUseCase: ObserveConversationInteractionAvailabilityUseCase
get() = ObserveConversationInteractionAvailabilityUseCase(conversationRepository, userRepository)
get() = ObserveConversationInteractionAvailabilityUseCase(
conversationRepository,
selfUserId = selfUserId,
selfClientIdProvider = currentClientIdProvider,
userRepository = userRepository
)

val deleteTeamConversation: DeleteTeamConversationUseCase
get() = DeleteTeamConversationUseCaseImpl(selfTeamIdProvider, teamRepository, conversationRepository)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,6 @@ import kotlinx.coroutines.flow.first
/**
* Operation that creates one-to-one Conversation with specific [UserId] (only if it is absent in local DB)
* and returns [Conversation] data.
*
* @param otherUserId [UserId] private conversation with which we are interested in.
* @return Result with [Conversation] in case of success, or [CoreFailure] if something went wrong:
* can't get data from local DB, or can't create a conversation.
*/
interface GetOrCreateOneToOneConversationUseCase {
suspend operator fun invoke(otherUserId: UserId): CreateConversationResult
Expand All @@ -47,6 +43,14 @@ internal class GetOrCreateOneToOneConversationUseCaseImpl(
private val userRepository: UserRepository,
private val oneOnOneResolver: OneOnOneResolver
) : GetOrCreateOneToOneConversationUseCase {

/**
* The use case operation operation params and return type.
*
* @param otherUserId [UserId] private conversation with which we are interested in.
* @return Result with [Conversation] in case of success, or [CoreFailure] if something went wrong:
* can't get data from local DB, or can't create a conversation.
*/
override suspend operator fun invoke(otherUserId: UserId): CreateConversationResult {
// TODO periodically re-resolve one-on-one
return conversationRepository.observeOneToOneConversationWithOtherUser(otherUserId)
Expand All @@ -66,6 +70,18 @@ internal class GetOrCreateOneToOneConversationUseCaseImpl(
})
}

/**
* Resolves one-on-one conversation with the user.
* Resolving conversations is the process of:
*
* - Intersecting the supported protocols of the self user and the other user.
* - Selecting the common protocol, based on the team settings with the highest priority.
* - Get or create a conversation with the other user.
* - If the protocol now is MLS, migrate the existing Proteus conversation to MLS.
* - Mark the conversation as active.
*
* If no common protocol is found, and we have existing Proteus conversations, we do best effort to use them as fallback.
*/
private suspend fun resolveOneOnOneConversationWithUser(otherUserId: UserId): Either<CoreFailure, Conversation> =
userRepository.userById(otherUserId).flatMap { otherUser ->
// TODO support lazily establishing mls group for team 1-1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,18 @@ import com.wire.kalium.logic.data.conversation.ConversationRepository
import com.wire.kalium.logic.data.conversation.InteractionAvailability
import com.wire.kalium.logic.data.conversation.interactionAvailability
import com.wire.kalium.logic.data.id.ConversationId
import com.wire.kalium.logic.data.id.CurrentClientIdProvider
import com.wire.kalium.logic.data.message.MessageContent
import com.wire.kalium.logic.data.user.SelfUser
import com.wire.kalium.logic.data.user.SupportedProtocol
import com.wire.kalium.logic.data.user.UserId
import com.wire.kalium.logic.data.user.UserRepository
import com.wire.kalium.logic.functional.flatMap
import com.wire.kalium.logic.functional.fold
import com.wire.kalium.logic.functional.getOrElse
import com.wire.kalium.logic.kaliumLogger
import com.wire.kalium.util.KaliumDispatcher
import com.wire.kalium.util.KaliumDispatcherImpl
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.combine
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.withContext

Expand All @@ -48,6 +51,8 @@ import kotlinx.coroutines.withContext
class ObserveConversationInteractionAvailabilityUseCase internal constructor(
private val conversationRepository: ConversationRepository,
private val userRepository: UserRepository,
private val selfUserId: UserId,
private val selfClientIdProvider: CurrentClientIdProvider,
private val dispatcher: KaliumDispatcher = KaliumDispatcherImpl,
) {

Expand All @@ -56,13 +61,21 @@ class ObserveConversationInteractionAvailabilityUseCase internal constructor(
* @return an [IsInteractionAvailableResult] containing Success or Failure cases
*/
suspend operator fun invoke(conversationId: ConversationId): Flow<IsInteractionAvailableResult> = withContext(dispatcher.io) {
conversationRepository.observeConversationDetailsById(conversationId).combine(
userRepository.observeSelfUser()
) { conversation, selfUser ->
conversation to selfUser
}.map { (eitherConversation, selfUser) ->

val isSelfClientMlsCapable = selfClientIdProvider().flatMap {
userRepository.isClientMlsCapable(selfUserId, it)
}.getOrElse {
return@withContext flow { IsInteractionAvailableResult.Failure(it) }
}

kaliumLogger.withTextTag("ObserveConversationInteractionAvailabilityUseCase").d("isSelfClientMlsCapable $isSelfClientMlsCapable")

conversationRepository.observeConversationDetailsById(conversationId).map { eitherConversation ->
eitherConversation.fold({ failure -> IsInteractionAvailableResult.Failure(failure) }, { conversationDetails ->
val isProtocolSupported = doesUserSupportConversationProtocol(conversationDetails, selfUser)
val isProtocolSupported = doesUserSupportConversationProtocol(
conversationDetails = conversationDetails,
isSelfClientMlsCapable = isSelfClientMlsCapable
)
if (!isProtocolSupported) { // short-circuit to Unsupported Protocol if it's the case
return@fold IsInteractionAvailableResult.Success(InteractionAvailability.UNSUPPORTED_PROTOCOL)
}
Expand All @@ -74,19 +87,12 @@ class ObserveConversationInteractionAvailabilityUseCase internal constructor(

private fun doesUserSupportConversationProtocol(
conversationDetails: ConversationDetails,
selfUser: SelfUser
): Boolean {
val protocolInfo = conversationDetails.conversation.protocol
val acceptableProtocols = when (protocolInfo) {
is Conversation.ProtocolInfo.MLS -> setOf(SupportedProtocol.MLS)
// Messages in mixed conversations are sent through Proteus
is Conversation.ProtocolInfo.Mixed -> setOf(SupportedProtocol.PROTEUS)
Conversation.ProtocolInfo.Proteus -> setOf(SupportedProtocol.PROTEUS)
}
val isProtocolSupported = selfUser.supportedProtocols?.any { supported ->
acceptableProtocols.contains(supported)
} ?: false
return isProtocolSupported
isSelfClientMlsCapable: Boolean
): Boolean = when (conversationDetails.conversation.protocol) {
is Conversation.ProtocolInfo.MLS -> isSelfClientMlsCapable
// Messages in mixed conversations are sent through Proteus
is Conversation.ProtocolInfo.Mixed,
Conversation.ProtocolInfo.Proteus -> true
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,19 @@ import com.wire.kalium.logic.functional.map
import com.wire.kalium.logic.kaliumLogger

interface OneOnOneMigrator {
/**
* Migrates the user's one-on-one Proteus. Without creating a new one since MLS is the default, marking it as active.
*/
suspend fun migrateExistingProteus(user: OtherUser): Either<CoreFailure, ConversationId>

/**
* Get one-on-one conversation with the user, if not found, create a new one (Proteus still default) and mark it as active.
*/
suspend fun migrateToProteus(user: OtherUser): Either<CoreFailure, ConversationId>

/**
* Perform migration of Proteus to MLS keeping history and marking the new conversation as active.
*/
suspend fun migrateToMLS(user: OtherUser): Either<CoreFailure, ConversationId>
}

Expand Down Expand Up @@ -100,19 +112,35 @@ internal class OneOnOneMigratorImpl(
}
}

override suspend fun migrateExistingProteus(user: OtherUser): Either<CoreFailure, ConversationId> =
conversationRepository.getOneOnOneConversationsWithOtherUser(user.id, Conversation.Protocol.PROTEUS).flatMap { conversationIds ->
if (conversationIds.isNotEmpty()) {
val conversationId = conversationIds.first()
Either.Right(conversationId)
} else {
Either.Left(StorageFailure.DataNotFound)
}
}.flatMap { conversationId ->
if (user.activeOneOnOneConversationId != conversationId) {
kaliumLogger.d("resolved existing one-on-one to proteus, user = ${user.id.toLogString()}")
userRepository.updateActiveOneOnOneConversation(user.id, conversationId)
}
Either.Right(conversationId)
}

private suspend fun migrateOneOnOneHistory(user: OtherUser, targetConversation: ConversationId): Either<CoreFailure, Unit> {
return conversationRepository.getOneOnOneConversationsWithOtherUser(
otherUserId = user.id,
protocol = Conversation.Protocol.PROTEUS
).flatMap { proteusOneOnOneConversations ->
// We can theoretically have more than one proteus 1-1 conversation with
// team members since there was no backend safeguards against this
proteusOneOnOneConversations.foldToEitherWhileRight(Unit) { proteusOneOnOneConversation, _ ->
messageRepository.moveMessagesToAnotherConversation(
originalConversation = proteusOneOnOneConversation,
targetConversation = targetConversation
)
}
return conversationRepository.getOneOnOneConversationsWithOtherUser(
otherUserId = user.id,
protocol = Conversation.Protocol.PROTEUS
).flatMap { proteusOneOnOneConversations ->
// We can theoretically have more than one proteus 1-1 conversation with
// team members since there was no backend safeguards against this
proteusOneOnOneConversations.foldToEitherWhileRight(Unit) { proteusOneOnOneConversation, _ ->
messageRepository.moveMessagesToAnotherConversation(
originalConversation = proteusOneOnOneConversation,
targetConversation = targetConversation
)
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ import com.wire.kalium.logic.feature.protocol.OneOnOneProtocolSelector
import com.wire.kalium.logic.functional.Either
import com.wire.kalium.logic.functional.flatMap
import com.wire.kalium.logic.functional.flatMapLeft
import com.wire.kalium.logic.functional.fold
import com.wire.kalium.logic.functional.foldToEitherWhileRight
import com.wire.kalium.logic.functional.left
import com.wire.kalium.logic.functional.map
import com.wire.kalium.logic.kaliumLogger
import com.wire.kalium.util.KaliumDispatcher
Expand Down Expand Up @@ -154,11 +156,18 @@ internal class OneOnOneResolverImpl(
if (invalidateCurrentKnownProtocols) {
userRepository.fetchUsersByIds(setOf(user.id))
}
return oneOnOneProtocolSelector.getProtocolForUser(user.id).flatMap { supportedProtocol ->
return oneOnOneProtocolSelector.getProtocolForUser(user.id).fold({ coreFailure ->
if (coreFailure is CoreFailure.NoCommonProtocolFound.OtherNeedToUpdate) {
kaliumLogger.i("Resolving existing proteus 1:1 as not matching protocol found with: ${user.id.toLogString()}")
oneOnOneMigrator.migrateExistingProteus(user)
} else {
coreFailure.left()
}
}, { supportedProtocol ->
when (supportedProtocol) {
SupportedProtocol.PROTEUS -> oneOnOneMigrator.migrateToProteus(user)
SupportedProtocol.MLS -> oneOnOneMigrator.migrateToMLS(user)
}
}
})
}
}
Loading

0 comments on commit f9577a4

Please sign in to comment.