diff --git a/android/src/main/java/com/amplitude/android/Amplitude.kt b/android/src/main/java/com/amplitude/android/Amplitude.kt index fdddba9c..63ccc3b8 100644 --- a/android/src/main/java/com/amplitude/android/Amplitude.kt +++ b/android/src/main/java/com/amplitude/android/Amplitude.kt @@ -4,7 +4,6 @@ import android.content.Context import com.amplitude.android.plugins.AndroidContextPlugin import com.amplitude.android.plugins.AndroidLifecyclePlugin import com.amplitude.core.Amplitude -import com.amplitude.core.Storage import com.amplitude.core.events.BaseEvent import com.amplitude.core.platform.plugins.AmplitudeDestination import com.amplitude.core.platform.plugins.GetAmpliExtrasPlugin @@ -23,22 +22,23 @@ open class Amplitude( ) : Amplitude(configuration) { internal var inForeground = false - var sessionId: Long - private set - internal var lastEventId: Long - var lastEventTime: Long private lateinit var androidContextPlugin: AndroidContextPlugin init { - storage = configuration.storageProvider.getStorage(this) + (timeline as Timeline).start() + registerShutdownHook() + } - this.sessionId = storage.read(Storage.Constants.PREVIOUS_SESSION_ID)?.toLong() ?: -1 - this.lastEventId = storage.read(Storage.Constants.LAST_EVENT_ID)?.toLong() ?: 0 - this.lastEventTime = storage.read(Storage.Constants.LAST_EVENT_TIME)?.toLong() ?: -1 + override fun createTimeline(): Timeline { + return Timeline().also { it.amplitude = this } } override fun build(): Deferred { + val client = this + val built = amplitudeScope.async(amplitudeDispatcher) { + storage = configuration.storageProvider.getStorage(client) + val storageDirectory = (configuration as Configuration).context.getDir("${FileStorage.STORAGE_PREFIX}-${configuration.instanceName}", Context.MODE_PRIVATE) idContainer = IdentityContainer.getInstance( IdentityConfiguration( @@ -79,59 +79,19 @@ open class Amplitude( return this } - override fun processEvent(event: BaseEvent): Iterable? { - val eventTimestamp = event.timestamp ?: System.currentTimeMillis() - event.timestamp = eventTimestamp - var sessionEvents: Iterable? = null - - if (!(event.eventType == START_SESSION_EVENT || event.eventType == END_SESSION_EVENT)) { - if (!inForeground) { - sessionEvents = startNewSessionIfNeeded(eventTimestamp) - } else { - refreshSessionTime(eventTimestamp) - } - } - - if (event.sessionId < 0) { - event.sessionId = sessionId - } - - val savedLastEventId = lastEventId - - sessionEvents ?. let { - it.forEach { e -> - e.eventId ?: let { - val newEventId = lastEventId + 1 - e.eventId = newEventId - lastEventId = newEventId - } - } - } - - event.eventId ?: let { - val newEventId = lastEventId + 1 - event.eventId = newEventId - lastEventId = newEventId - } - - if (lastEventId > savedLastEventId) { - amplitudeScope.launch(amplitudeDispatcher) { - storage.write(Storage.Constants.LAST_EVENT_ID, lastEventId.toString()) - } - } - - return sessionEvents - } - fun onEnterForeground(timestamp: Long) { - startNewSessionIfNeeded(timestamp) ?. let { - it.forEach { event -> process(event) } - } inForeground = true + + val dummySessionStartEvent = BaseEvent() + dummySessionStartEvent.eventType = START_SESSION_EVENT + dummySessionStartEvent.timestamp = timestamp + dummySessionStartEvent.sessionId = -1 + timeline.process(dummySessionStartEvent) } fun onExitForeground() { inForeground = false + amplitudeScope.launch(amplitudeDispatcher) { isBuilt.await() if ((configuration as Configuration).flushEventsOnClose) { @@ -140,70 +100,12 @@ open class Amplitude( } } - fun startNewSessionIfNeeded(timestamp: Long): Iterable? { - if (inSession()) { - - if (isWithinMinTimeBetweenSessions(timestamp)) { - refreshSessionTime(timestamp) - return null + private fun registerShutdownHook() { + Runtime.getRuntime().addShutdownHook(object : Thread() { + override fun run() { + (this@Amplitude.timeline as Timeline).stop() } - - return startNewSession(timestamp) - } - - return startNewSession(timestamp) - } - - private fun setSessionId(timestamp: Long) { - sessionId = timestamp - amplitudeScope.launch(amplitudeDispatcher) { - storage.write(Storage.Constants.PREVIOUS_SESSION_ID, sessionId.toString()) - } - } - - private fun startNewSession(timestamp: Long): Iterable { - val sessionEvents = mutableListOf() - - // end previous session - if ((configuration as Configuration).trackingSessionEvents && inSession()) { - val sessionEndEvent = BaseEvent() - sessionEndEvent.eventType = END_SESSION_EVENT - sessionEndEvent.timestamp = if (lastEventTime > 0) lastEventTime else null - sessionEndEvent.sessionId = sessionId - sessionEvents.add(sessionEndEvent) - } - - // start new session - setSessionId(timestamp) - refreshSessionTime(timestamp) - if (configuration.trackingSessionEvents) { - val sessionStartEvent = BaseEvent() - sessionStartEvent.eventType = START_SESSION_EVENT - sessionStartEvent.timestamp = timestamp - sessionStartEvent.sessionId = sessionId - sessionEvents.add(sessionStartEvent) - } - - return sessionEvents - } - - fun refreshSessionTime(timestamp: Long) { - if (!inSession()) { - return - } - lastEventTime = timestamp - amplitudeScope.launch(amplitudeDispatcher) { - storage.write(Storage.Constants.LAST_EVENT_TIME, lastEventTime.toString()) - } - } - - private fun isWithinMinTimeBetweenSessions(timestamp: Long): Boolean { - val sessionLimit: Long = (configuration as Configuration).minTimeBetweenSessionsMillis - return timestamp - lastEventTime < sessionLimit - } - - private fun inSession(): Boolean { - return sessionId >= 0 + }) } companion object { diff --git a/android/src/main/java/com/amplitude/android/Timeline.kt b/android/src/main/java/com/amplitude/android/Timeline.kt new file mode 100644 index 00000000..4c44d142 --- /dev/null +++ b/android/src/main/java/com/amplitude/android/Timeline.kt @@ -0,0 +1,166 @@ +package com.amplitude.android + +import com.amplitude.core.Storage +import com.amplitude.core.events.BaseEvent +import com.amplitude.core.platform.Timeline +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.launch + +class Timeline : Timeline() { + private val eventMessageChannel: Channel = Channel(Channel.UNLIMITED) + var sessionId: Long = -1 + private set + internal var lastEventId: Long = 0 + var lastEventTime: Long = -1 + + internal fun start() { + amplitude.amplitudeScope.launch(amplitude.storageIODispatcher) { + amplitude.isBuilt.await() + + sessionId = amplitude.storage.read(Storage.Constants.PREVIOUS_SESSION_ID)?.toLong() ?: -1 + lastEventId = amplitude.storage.read(Storage.Constants.LAST_EVENT_ID)?.toLong() ?: 0 + lastEventTime = amplitude.storage.read(Storage.Constants.LAST_EVENT_TIME)?.toLong() ?: -1 + + for (message in eventMessageChannel) { + processEventMessage(message) + } + } + } + + internal fun stop() { + this.eventMessageChannel.cancel() + } + + override fun process(incomingEvent: BaseEvent) { + if (incomingEvent.timestamp == null) { + incomingEvent.timestamp = System.currentTimeMillis() + } + + eventMessageChannel.trySend(EventQueueMessage(incomingEvent, (amplitude as Amplitude).inForeground)) + } + + private suspend fun processEventMessage(message: EventQueueMessage) { + val event = message.event + var sessionEvents: Iterable? = null + val eventTimestamp = event.timestamp!! + var skipEvent = false + + if (event.eventType == Amplitude.START_SESSION_EVENT) { + if (event.sessionId < 0) { // dummy start_session event + skipEvent = true + sessionEvents = startNewSessionIfNeeded(eventTimestamp) + } else { + setSessionId(event.sessionId) + refreshSessionTime(eventTimestamp) + } + } else if (event.eventType == Amplitude.END_SESSION_EVENT) { + // do nothing + } else { + if (!message.inForeground) { + sessionEvents = startNewSessionIfNeeded(eventTimestamp) + } else { + refreshSessionTime(eventTimestamp) + } + } + + if (!skipEvent && event.sessionId < 0) { + event.sessionId = sessionId + } + + val savedLastEventId = lastEventId + + sessionEvents ?. let { + it.forEach { e -> + e.eventId ?: let { + val newEventId = lastEventId + 1 + e.eventId = newEventId + lastEventId = newEventId + } + } + } + + if (!skipEvent) { + event.eventId ?: let { + val newEventId = lastEventId + 1 + event.eventId = newEventId + lastEventId = newEventId + } + } + + if (lastEventId > savedLastEventId) { + amplitude.storage.write(Storage.Constants.LAST_EVENT_ID, lastEventId.toString()) + } + + sessionEvents ?. let { + it.forEach { e -> + super.process(e) + } + } + + if (!skipEvent) { + super.process(event) + } + } + + private suspend fun startNewSessionIfNeeded(timestamp: Long): Iterable? { + if (inSession() && isWithinMinTimeBetweenSessions(timestamp)) { + refreshSessionTime(timestamp) + return null + } + return startNewSession(timestamp) + } + + private suspend fun setSessionId(timestamp: Long) { + sessionId = timestamp + amplitude.storage.write(Storage.Constants.PREVIOUS_SESSION_ID, sessionId.toString()) + } + + private suspend fun startNewSession(timestamp: Long): Iterable { + val sessionEvents = mutableListOf() + val trackingSessionEvents = (amplitude.configuration as Configuration).trackingSessionEvents + + // end previous session + if (trackingSessionEvents && inSession()) { + val sessionEndEvent = BaseEvent() + sessionEndEvent.eventType = Amplitude.END_SESSION_EVENT + sessionEndEvent.timestamp = if (lastEventTime > 0) lastEventTime else null + sessionEndEvent.sessionId = sessionId + sessionEvents.add(sessionEndEvent) + } + + // start new session + setSessionId(timestamp) + refreshSessionTime(timestamp) + if (trackingSessionEvents) { + val sessionStartEvent = BaseEvent() + sessionStartEvent.eventType = Amplitude.START_SESSION_EVENT + sessionStartEvent.timestamp = timestamp + sessionStartEvent.sessionId = sessionId + sessionEvents.add(sessionStartEvent) + } + + return sessionEvents + } + + private suspend fun refreshSessionTime(timestamp: Long) { + if (!inSession()) { + return + } + lastEventTime = timestamp + amplitude.storage.write(Storage.Constants.LAST_EVENT_TIME, lastEventTime.toString()) + } + + private fun isWithinMinTimeBetweenSessions(timestamp: Long): Boolean { + val sessionLimit: Long = (amplitude.configuration as Configuration).minTimeBetweenSessionsMillis + return timestamp - lastEventTime < sessionLimit + } + + private fun inSession(): Boolean { + return sessionId >= 0 + } +} + +data class EventQueueMessage( + val event: BaseEvent, + val inForeground: Boolean +) diff --git a/android/src/test/java/com/amplitude/android/AmplitudeTest.kt b/android/src/test/java/com/amplitude/android/AmplitudeTest.kt index 8e6acadd..986c08e7 100644 --- a/android/src/test/java/com/amplitude/android/AmplitudeTest.kt +++ b/android/src/test/java/com/amplitude/android/AmplitudeTest.kt @@ -119,6 +119,8 @@ class AmplitudeTest { event.eventType = "test event" amplitude?.track(event) advanceUntilIdle() + Thread.sleep(100) + val track = slot() verify { mockedPlugin.track(capture(track)) } track.captured.let { @@ -140,6 +142,8 @@ class AmplitudeTest { event.ip = "127.0.0.1" amplitude?.track(event) advanceUntilIdle() + Thread.sleep(100) + val track = slot() verify { mockedPlugin.track(capture(track)) } track.captured.let { @@ -158,7 +162,7 @@ class AmplitudeTest { val mockedPlugin = spyk(StubPlugin()) amplitude.add(mockedPlugin) - amplitude.isBuilt!!.await() + amplitude.isBuilt.await() val event1 = BaseEvent() event1.eventType = "test event 1" @@ -317,45 +321,54 @@ class AmplitudeTest { val amplitude1 = Amplitude(createConfiguration(100, InstanceStorageProvider(storage))) amplitude1.isBuilt.await() + val timeline1 = amplitude1.timeline as Timeline amplitude1.onEnterForeground(1000) + advanceUntilIdle() + Thread.sleep(100) - Assertions.assertEquals(1000L, amplitude1.sessionId) - Assertions.assertEquals(1L, amplitude1.lastEventId) - Assertions.assertEquals(1000L, amplitude1.lastEventTime) + Assertions.assertEquals(1000L, timeline1.sessionId) + Assertions.assertEquals(1L, timeline1.lastEventId) + Assertions.assertEquals(1000L, timeline1.lastEventTime) val event1 = BaseEvent() event1.eventType = "test event 1" event1.timestamp = 1200 amplitude1.track(event1) - - Assertions.assertEquals(1000L, amplitude1.sessionId) - Assertions.assertEquals(2L, amplitude1.lastEventId) - Assertions.assertEquals(1200L, amplitude1.lastEventTime) - advanceUntilIdle() + Thread.sleep(100) + + Assertions.assertEquals(1000L, timeline1.sessionId) + Assertions.assertEquals(2L, timeline1.lastEventId) + Assertions.assertEquals(1200L, timeline1.lastEventTime) val amplitude2 = Amplitude(createConfiguration(100, InstanceStorageProvider(storage))) amplitude2.isBuilt.await() - - Assertions.assertEquals(1000L, amplitude2.sessionId) - Assertions.assertEquals(2L, amplitude2.lastEventId) - Assertions.assertEquals(1200L, amplitude2.lastEventTime) - + val timeline2 = amplitude2.timeline as Timeline advanceUntilIdle() + Thread.sleep(100) + + Assertions.assertEquals(1000L, timeline2.sessionId) + Assertions.assertEquals(2L, timeline2.lastEventId) + Assertions.assertEquals(1200L, timeline2.lastEventTime) val amplitude3 = Amplitude(createConfiguration(100, InstanceStorageProvider(storage))) amplitude3.isBuilt.await() + val timeline3 = amplitude3.timeline as Timeline + advanceUntilIdle() + Thread.sleep(100) - Assertions.assertEquals(1000L, amplitude3.sessionId) - Assertions.assertEquals(2L, amplitude3.lastEventId) - Assertions.assertEquals(1200L, amplitude3.lastEventTime) + Assertions.assertEquals(1000L, timeline3.sessionId) + Assertions.assertEquals(2L, timeline3.lastEventId) + Assertions.assertEquals(1200L, timeline3.lastEventTime) amplitude3.onEnterForeground(1400) + advanceUntilIdle() + Thread.sleep(100) - Assertions.assertEquals(1400L, amplitude3.sessionId) - Assertions.assertEquals(4L, amplitude3.lastEventId) - Assertions.assertEquals(1400L, amplitude3.lastEventTime) + Assertions.assertEquals(1400L, timeline3.sessionId) + Assertions.assertEquals(4L, timeline3.lastEventId) + Assertions.assertEquals(1400L, timeline3.lastEventTime) } } diff --git a/core/src/main/java/com/amplitude/core/Amplitude.kt b/core/src/main/java/com/amplitude/core/Amplitude.kt index ca042825..e7e16f0c 100644 --- a/core/src/main/java/com/amplitude/core/Amplitude.kt +++ b/core/src/main/java/com/amplitude/core/Amplitude.kt @@ -44,10 +44,10 @@ open class Amplitude internal constructor( val amplitudeScope: CoroutineScope = CoroutineScope(SupervisorJob()), val amplitudeDispatcher: CoroutineDispatcher = Executors.newCachedThreadPool().asCoroutineDispatcher(), val networkIODispatcher: CoroutineDispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher(), - val storageIODispatcher: CoroutineDispatcher = Executors.newFixedThreadPool(2).asCoroutineDispatcher(), + val storageIODispatcher: CoroutineDispatcher = Executors.newFixedThreadPool(3).asCoroutineDispatcher(), val retryDispatcher: CoroutineDispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher() ) { - internal val timeline: Timeline + val timeline: Timeline lateinit var storage: Storage val logger: Logger protected lateinit var idContainer: IdentityContainer @@ -55,7 +55,7 @@ open class Amplitude internal constructor( init { require(configuration.isValid()) { "invalid configuration" } - timeline = Timeline().also { it.amplitude = this } + timeline = this.createTimeline() logger = configuration.loggerProvider.getLogger(this) isBuilt = build() } @@ -65,6 +65,10 @@ open class Amplitude internal constructor( */ constructor(configuration: Configuration) : this(configuration, State()) + open fun createTimeline(): Timeline { + return Timeline().also { it.amplitude = this } + } + open fun build(): Deferred { storage = configuration.storageProvider.getStorage(this) idContainer = IdentityContainer.getInstance(IdentityConfiguration(instanceName = configuration.instanceName, apiKey = configuration.apiKey, identityStorageProvider = IMIdentityStorageProvider())) @@ -315,30 +319,17 @@ open class Amplitude internal constructor( return this } - protected fun process(event: BaseEvent) { + private fun process(event: BaseEvent) { if (configuration.optOut) { logger.info("Skip event for opt out config.") return } - val beforeEvents = processEvent(event) - beforeEvents ?. let { - it.forEach { e -> - amplitudeScope.launch(amplitudeDispatcher) { - isBuilt.await() - timeline.process(e) - } - } - } - - amplitudeScope.launch(amplitudeDispatcher) { - isBuilt.await() - timeline.process(event) + if (event.timestamp == null) { + event.timestamp = System.currentTimeMillis() } - } - protected open fun processEvent(event: BaseEvent): Iterable? { - return null + timeline.process(event) } /** diff --git a/core/src/main/java/com/amplitude/core/platform/Timeline.kt b/core/src/main/java/com/amplitude/core/platform/Timeline.kt index fdb5368f..7b913a74 100644 --- a/core/src/main/java/com/amplitude/core/platform/Timeline.kt +++ b/core/src/main/java/com/amplitude/core/platform/Timeline.kt @@ -3,7 +3,7 @@ package com.amplitude.core.platform import com.amplitude.core.Amplitude import com.amplitude.core.events.BaseEvent -internal class Timeline { +open class Timeline { internal val plugins: Map = mapOf( Plugin.Type.Before to Mediator(mutableListOf()), Plugin.Type.Enrichment to Mediator(mutableListOf()), @@ -12,13 +12,11 @@ internal class Timeline { ) lateinit var amplitude: Amplitude - fun process(incomingEvent: BaseEvent): BaseEvent? { + open fun process(incomingEvent: BaseEvent) { val beforeResult = applyPlugins(Plugin.Type.Before, incomingEvent) val enrichmentResult = applyPlugins(Plugin.Type.Enrichment, beforeResult) applyPlugins(Plugin.Type.Destination, enrichmentResult) - - return enrichmentResult } fun add(plugin: Plugin) { @@ -33,7 +31,7 @@ internal class Timeline { return result } - fun applyPlugins(mediator: Mediator?, event: BaseEvent?): BaseEvent? { + private fun applyPlugins(mediator: Mediator?, event: BaseEvent?): BaseEvent? { var result: BaseEvent? = event result?.let { e -> result = mediator?.execute(e)