Skip to content

Commit

Permalink
fix: app registering many config sync jobs [WPB-10234] (#2929) (#2946)
Browse files Browse the repository at this point in the history
* fix: queue only one sync jobs

* fix tests

* detekt

Co-authored-by: Mohamad Jaara <[email protected]>
Co-authored-by: Yamil Medina <[email protected]>
  • Loading branch information
3 people authored Aug 23, 2024
1 parent 49582cd commit 1588f3d
Show file tree
Hide file tree
Showing 7 changed files with 21 additions and 116 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import com.wire.kalium.logic.kaliumLogger
/**
* Use case to check if the CRL is expired and if so, register CRL and update conversation statuses if there is a change.
*/
interface RevocationListChecker {
internal interface RevocationListChecker {
suspend fun check(url: String): Either<CoreFailure, ULong?>
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,38 +23,32 @@ import com.wire.kalium.logic.data.e2ei.RevocationListChecker
import com.wire.kalium.logic.data.sync.IncrementalSyncRepository
import com.wire.kalium.logic.data.sync.IncrementalSyncStatus
import com.wire.kalium.logic.functional.map
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.first
import kotlinx.datetime.Clock

/**
* This worker will wait until the sync is done and then check the CRLs if needed.
* This use case will wait until the sync is done and then check the CRLs if needed.
*
*/
interface CertificateRevocationListCheckWorker {
suspend fun execute()
}

/**
* Base implementation of [CertificateRevocationListCheckWorker].
* Base implementation of [SyncCertificateRevocationListUseCase].
* @param certificateRevocationListRepository The CRL repository.
* @param incrementalSyncRepository The incremental sync repository.
* @param revocationListChecker The check revocation list use case.
*
*/
internal class CertificateRevocationListCheckWorkerImpl(
class SyncCertificateRevocationListUseCase internal constructor(
private val certificateRevocationListRepository: CertificateRevocationListRepository,
private val incrementalSyncRepository: IncrementalSyncRepository,
private val revocationListChecker: RevocationListChecker,
kaliumLogger: KaliumLogger
) : CertificateRevocationListCheckWorker {
) {

private val logger = kaliumLogger.withTextTag("CertificateRevocationListCheckWorker")

override suspend fun execute() {
suspend operator fun invoke() {
logger.d("Starting to monitor")
incrementalSyncRepository.incrementalSyncState
.filter { it is IncrementalSyncStatus.Live }
.collect {
.first { it is IncrementalSyncStatus.Live }
.let {
logger.i("Checking certificate revocation list (CRL)..")
certificateRevocationListRepository.getCRLs()?.cRLWithExpirationList?.forEach { crl ->
if (crl.expiration < Clock.System.now().epochSeconds.toULong()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,7 @@ import com.wire.kalium.logger.KaliumLogger
import com.wire.kalium.logic.configuration.UserConfigRepository
import com.wire.kalium.logic.data.id.CurrentClientIdProvider
import com.wire.kalium.logic.feature.e2ei.MLSClientE2EIStatus
import com.wire.kalium.logic.feature.e2ei.MLSClientIdentity
import com.wire.kalium.logic.functional.isRight
import com.wire.kalium.logic.functional.map
import com.wire.kalium.logic.functional.onSuccess

/**
* Use case to observe certificate revocation for self client.
Expand All @@ -44,9 +42,9 @@ internal class ObserveCertificateRevocationForSelfClientUseCaseImpl(

override suspend fun invoke() {
logger.d("Checking if should notify certificate revocation")
currentClientIdProvider().map { clientId ->
getE2eiCertificate(clientId).run {
if (isRight() && (value as MLSClientIdentity).e2eiStatus == MLSClientE2EIStatus.REVOKED) {
currentClientIdProvider().onSuccess { clientId ->
getE2eiCertificate(clientId).onSuccess {
if (it.e2eiStatus == MLSClientE2EIStatus.REVOKED) {
logger.i("Setting that should notify certificate revocation")
userConfigRepository.setShouldNotifyForRevokedCertificate(true)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,10 @@
*/
package com.wire.kalium.logic.feature.featureConfig

import com.wire.kalium.logger.KaliumLogLevel
import com.wire.kalium.logger.KaliumLogger
import com.wire.kalium.logic.data.sync.IncrementalSyncRepository
import com.wire.kalium.logic.data.sync.IncrementalSyncStatus
import com.wire.kalium.logic.logStructuredJson
import kotlinx.coroutines.flow.filter
import kotlinx.datetime.Clock
import kotlinx.datetime.Instant
import kotlin.time.Duration
import kotlin.time.Duration.Companion.minutes
import kotlinx.coroutines.flow.first

/**
* Worker that periodically syncs feature flags.
Expand All @@ -39,43 +33,16 @@ internal class FeatureFlagSyncWorkerImpl(
private val incrementalSyncRepository: IncrementalSyncRepository,
private val syncFeatureConfigs: SyncFeatureConfigsUseCase,
kaliumLogger: KaliumLogger,
private val minIntervalBetweenPulls: Duration = MIN_INTERVAL_BETWEEN_PULLS,
private val clock: Clock = Clock.System
) : FeatureFlagsSyncWorker {

private var lastPullInstant: Instant? = null
private val logger = kaliumLogger.withTextTag("FeatureFlagSyncWorker")

override suspend fun execute() {
logger.d("Starting to monitor")
incrementalSyncRepository.incrementalSyncState.filter {
incrementalSyncRepository.incrementalSyncState.first {
it is IncrementalSyncStatus.Live
}.collect {
syncFeatureFlagsIfNeeded()
}
}

private suspend fun FeatureFlagSyncWorkerImpl.syncFeatureFlagsIfNeeded() {
val now = clock.now()
val wasLastPullRecent = lastPullInstant?.let { lastPull ->
lastPull + minIntervalBetweenPulls > now
} ?: false
logger.logStructuredJson(
level = KaliumLogLevel.INFO,
leadingMessage = "syncFeatureFlagsIfNeeded",
jsonStringKeyValues = mapOf(
"lastPullInstant" to lastPullInstant,
"wasLastPullRecent" to wasLastPullRecent
)
)
if (!wasLastPullRecent) {
logger.i("Synching feature configs and updating lastPullInstant")
}.let {
syncFeatureConfigs()
lastPullInstant = now
}
}

private companion object {
val MIN_INTERVAL_BETWEEN_PULLS = 60.minutes
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,7 @@ import com.wire.kalium.logic.feature.auth.ValidateUserHandleUseCaseImpl
import com.wire.kalium.logic.feature.client.FinalizeMLSClientAfterE2EIEnrollment
import com.wire.kalium.logic.feature.client.FinalizeMLSClientAfterE2EIEnrollmentImpl
import com.wire.kalium.logic.feature.conversation.GetAllContactsNotInConversationUseCase
import com.wire.kalium.logic.feature.e2ei.CertificateRevocationListCheckWorker
import com.wire.kalium.logic.feature.e2ei.CertificateRevocationListCheckWorkerImpl
import com.wire.kalium.logic.feature.e2ei.SyncCertificateRevocationListUseCase
import com.wire.kalium.logic.feature.e2ei.usecase.EnrollE2EIUseCase
import com.wire.kalium.logic.feature.e2ei.usecase.EnrollE2EIUseCaseImpl
import com.wire.kalium.logic.feature.e2ei.usecase.GetMLSClientIdentityUseCase
Expand Down Expand Up @@ -209,14 +208,13 @@ class UserScope internal constructor(
kaliumLogger = userScopedLogger,
)

val certificateRevocationListCheckWorker: CertificateRevocationListCheckWorker by lazy {
CertificateRevocationListCheckWorkerImpl(
val syncCertificateRevocationListUseCase: SyncCertificateRevocationListUseCase get() =
SyncCertificateRevocationListUseCase(
certificateRevocationListRepository = certificateRevocationListRepository,
incrementalSyncRepository = incrementalSyncRepository,
revocationListChecker = checkRevocationList,
kaliumLogger = userScopedLogger,
)
}

val featureFlagsSyncWorker: FeatureFlagsSyncWorker by lazy {
FeatureFlagSyncWorkerImpl(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class CertificateRevocationListCheckWorkerTest {
.withCheckRevocationListResult()
.arrange()

checkCrlWorker.execute()
checkCrlWorker()

coVerify {
arrangement.certificateRevocationListRepository.getCRLs()
Expand All @@ -75,7 +75,7 @@ class CertificateRevocationListCheckWorkerTest {
@Mock
val checkRevocationList = mock(RevocationListChecker::class)

fun arrange() = this to CertificateRevocationListCheckWorkerImpl(
fun arrange() = this to SyncCertificateRevocationListUseCase(
certificateRevocationListRepository, incrementalSyncRepository, checkRevocationList, kaliumLogger
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,19 +60,15 @@ class FeatureFlagSyncWorkerTest {

@Test
fun givenSyncIsLiveTwiceInAShortInterval_thenShouldCallFeatureConfigsUseCaseOnlyOnce() = runTest {
val minimumInterval = 5.minutes
val stateChannel = Channel<IncrementalSyncStatus>(capacity = Channel.UNLIMITED)

val (arrangement, featureFlagSyncWorker) = arrange {
minimumIntervalBetweenPulls = minimumInterval
withIncrementalSyncState(stateChannel.consumeAsFlow())
}
val job = launch {
featureFlagSyncWorker.execute()
}
stateChannel.send(IncrementalSyncStatus.Live)
stateChannel.send(IncrementalSyncStatus.Pending)
advanceUntilIdle()
stateChannel.send(IncrementalSyncStatus.Live)
advanceUntilIdle() // Not enough to run twice
coVerify {
Expand All @@ -82,59 +78,13 @@ class FeatureFlagSyncWorkerTest {
job.cancel()
}

@Test
fun givenSyncIsLiveAgainAfterMinInterval_thenShouldCallFeatureConfigsUseCaseTwice() = runTest {
val minInterval = 5.minutes
val now = Clock.System.now()
val stateTimes = mapOf(
now to IncrementalSyncStatus.Live,
now + minInterval + 1.milliseconds to IncrementalSyncStatus.Pending,
now + minInterval + 2.milliseconds to IncrementalSyncStatus.Live
)
val fakeClock = object : Clock {
var callCount = 0
override fun now(): Instant {
return stateTimes.keys.toList()[callCount].also { callCount++ }
}
}
val stateChannel = Channel<IncrementalSyncStatus>(capacity = Channel.UNLIMITED)
val (arrangement, featureFlagSyncWorker) = arrange {
minimumIntervalBetweenPulls = minInterval
withIncrementalSyncState(stateChannel.consumeAsFlow())
clock = fakeClock
}
stateChannel.send(stateTimes.values.toList()[0])
val job = launch {
featureFlagSyncWorker.execute()
}
advanceUntilIdle()

coVerify {
arrangement.syncFeatureConfigsUseCase.invoke()
}.wasInvoked(exactly = once)
stateChannel.send(stateTimes.values.toList()[1])
advanceUntilIdle()

stateChannel.send(stateTimes.values.toList()[2])
advanceUntilIdle()

coVerify {
arrangement.syncFeatureConfigsUseCase.invoke()
}.wasInvoked(exactly = once)
job.cancel()
}

private class Arrangement(
private val configure: Arrangement.() -> Unit
) : IncrementalSyncRepositoryArrangement by IncrementalSyncRepositoryArrangementImpl() {

@Mock
val syncFeatureConfigsUseCase: SyncFeatureConfigsUseCase = mock(SyncFeatureConfigsUseCase::class)

var minimumIntervalBetweenPulls: Duration = 1.minutes

var clock: Clock = Clock.System

suspend fun arrange(): Pair<Arrangement, FeatureFlagSyncWorkerImpl> = run {
coEvery {
syncFeatureConfigsUseCase.invoke()
Expand All @@ -143,8 +93,6 @@ class FeatureFlagSyncWorkerTest {
this@Arrangement to FeatureFlagSyncWorkerImpl(
incrementalSyncRepository = incrementalSyncRepository,
syncFeatureConfigs = syncFeatureConfigsUseCase,
minIntervalBetweenPulls = minimumIntervalBetweenPulls,
clock = clock,
kaliumLogger = kaliumLogger
)
}
Expand Down

0 comments on commit 1588f3d

Please sign in to comment.