Skip to content

Commit

Permalink
fix: start tracking sessions at init for session replay, support addi…
Browse files Browse the repository at this point in the history
…ng plugins to initial config, centralize time to use SystemTime for easier testing
  • Loading branch information
justin-fiedler committed Mar 19, 2024
1 parent bb3003c commit 307e196
Show file tree
Hide file tree
Showing 18 changed files with 467 additions and 204 deletions.
31 changes: 28 additions & 3 deletions android/src/main/java/com/amplitude/android/Amplitude.kt
Original file line number Diff line number Diff line change
Expand Up @@ -8,32 +8,36 @@ import com.amplitude.android.plugins.AnalyticsConnectorPlugin
import com.amplitude.android.plugins.AndroidContextPlugin
import com.amplitude.android.plugins.AndroidLifecyclePlugin
import com.amplitude.android.plugins.AndroidNetworkConnectivityCheckerPlugin
import com.amplitude.android.utilities.Session
import com.amplitude.android.utilities.SystemTime
import com.amplitude.core.Amplitude
import com.amplitude.core.events.BaseEvent
import com.amplitude.core.platform.plugins.AmplitudeDestination
import com.amplitude.core.platform.plugins.GetAmpliExtrasPlugin
import com.amplitude.core.utilities.FileStorage
import com.amplitude.id.IdentityConfiguration
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

open class Amplitude(
configuration: Configuration
) : Amplitude(configuration) {

internal var inForeground = false
private lateinit var androidContextPlugin: AndroidContextPlugin
internal val session: Session = Session(configuration)

val sessionId: Long
get() {
return (timeline as Timeline).sessionId
return session.sessionId
}

init {
registerShutdownHook()
}

override fun createTimeline(): Timeline {
return Timeline(configuration.sessionId).also { it.amplitude = this }
return Timeline(logger = logger).also { it.amplitude = this }
}

override fun createIdentityConfiguration(): IdentityConfiguration {
Expand All @@ -50,11 +54,19 @@ open class Amplitude(
}

override suspend fun buildInternal(identityConfiguration: IdentityConfiguration) {
session.configure(configuration as Configuration, storage, store, logger)
logger.debug("Configured session. Session=$session")

ApiKeyStorageMigration(this).execute()

if ((this.configuration as Configuration).migrateLegacyData) {
RemnantDataMigration(this).execute()
}

// WARNING: Session events need to run after migrations as not to modify `lastEventTime`
// Check if we need to start a new session
val sessionEvents = session.startNewSessionIfNeeded(SystemTime.getCurrentTimeMillis())

this.createIdentityContainer(identityConfiguration)

if (this.configuration.offline != AndroidNetworkConnectivityCheckerPlugin.Disabled) {
Expand All @@ -72,8 +84,21 @@ open class Amplitude(
add(AnalyticsConnectorIdentityPlugin())
add(AnalyticsConnectorPlugin())
add(AmplitudeDestination())
val plugins = configuration.plugins
if (plugins != null) {
for (plugin in plugins) {
add(plugin)
}
}

val androidTimeline = timeline as Timeline
androidTimeline.start(session)

(timeline as Timeline).start()
runBlocking {
sessionEvents?.forEach {
androidTimeline.processImmediately(it)
}
}
}

/**
Expand Down
3 changes: 3 additions & 0 deletions android/src/main/java/com/amplitude/android/Configuration.kt
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import com.amplitude.core.ServerZone
import com.amplitude.core.StorageProvider
import com.amplitude.core.events.IngestionMetadata
import com.amplitude.core.events.Plan
import com.amplitude.core.platform.Plugin
import com.amplitude.id.FileIdentityStorageProvider
import com.amplitude.id.IdentityStorageProvider

Expand Down Expand Up @@ -49,6 +50,7 @@ open class Configuration @JvmOverloads constructor(
override var offline: Boolean? = false,
override var deviceId: String? = null,
override var sessionId: Long? = null,
override var plugins: List<Plugin>? = null,
) : Configuration(
apiKey,
flushQueueSize,
Expand All @@ -72,6 +74,7 @@ open class Configuration @JvmOverloads constructor(
offline,
deviceId,
sessionId,
plugins
) {
companion object {
const val MIN_TIME_BETWEEN_SESSIONS_MILLIS: Long = 300000
Expand Down
126 changes: 30 additions & 96 deletions android/src/main/java/com/amplitude/android/Timeline.kt
Original file line number Diff line number Diff line change
@@ -1,40 +1,26 @@
package com.amplitude.android

import com.amplitude.android.utilities.Session
import com.amplitude.android.utilities.SystemTime
import com.amplitude.common.Logger
import com.amplitude.core.Storage
import com.amplitude.core.events.BaseEvent
import com.amplitude.core.platform.Timeline
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import java.util.concurrent.atomic.AtomicLong

class Timeline(
private val initialSessionId: Long? = null,
private val logger: Logger? = null,
) : Timeline() {
private val eventMessageChannel: Channel<EventQueueMessage> = Channel(Channel.UNLIMITED)
private lateinit var session: Session

private val _sessionId = AtomicLong(initialSessionId ?: -1L)
val sessionId: Long
get() {
return _sessionId.get()
}

internal var lastEventId: Long = 0
var lastEventTime: Long = -1L

internal fun start() {
internal fun start(session: Session) {
this.session = session
amplitude.amplitudeScope.launch(amplitude.storageIODispatcher) {
// Wait until build (including possible legacy data migration) is finished.
amplitude.isBuilt.await()

if (initialSessionId == null) {
_sessionId.set(
amplitude.storage.read(Storage.Constants.PREVIOUS_SESSION_ID)?.toLongOrNull()
?: -1
)
}
lastEventId = amplitude.storage.read(Storage.Constants.LAST_EVENT_ID)?.toLongOrNull() ?: 0
lastEventTime = amplitude.storage.read(Storage.Constants.LAST_EVENT_TIME)?.toLongOrNull() ?: -1

for (message in eventMessageChannel) {
processEventMessage(message)
}
Expand All @@ -47,12 +33,20 @@ class Timeline(

override fun process(incomingEvent: BaseEvent) {
if (incomingEvent.timestamp == null) {
incomingEvent.timestamp = System.currentTimeMillis()
incomingEvent.timestamp = SystemTime.getCurrentTimeMillis()
}

eventMessageChannel.trySend(EventQueueMessage(incomingEvent, (amplitude as Amplitude).inForeground))
}

internal suspend fun processImmediately(incomingEvent: BaseEvent) {
if (incomingEvent.timestamp == null) {
incomingEvent.timestamp = SystemTime.getCurrentTimeMillis()
}

processEventMessage(EventQueueMessage(incomingEvent, (amplitude as Amplitude).inForeground))
}

private suspend fun processEventMessage(message: EventQueueMessage) {
val event = message.event
var sessionEvents: Iterable<BaseEvent>? = null
Expand All @@ -61,50 +55,50 @@ class Timeline(
var skipEvent = false

if (event.eventType == Amplitude.START_SESSION_EVENT) {
setSessionId(eventSessionId ?: eventTimestamp)
refreshSessionTime(eventTimestamp)
session.setSessionId(eventSessionId ?: eventTimestamp)
session.refreshSessionTime(eventTimestamp)
} else if (event.eventType == Amplitude.END_SESSION_EVENT) {
// do nothing
} else if (event.eventType == Amplitude.DUMMY_ENTER_FOREGROUND_EVENT) {
skipEvent = true
sessionEvents = startNewSessionIfNeeded(eventTimestamp)
sessionEvents = session.startNewSessionIfNeeded(eventTimestamp)
} else if (event.eventType == Amplitude.DUMMY_EXIT_FOREGROUND_EVENT) {
skipEvent = true
refreshSessionTime(eventTimestamp)
session.refreshSessionTime(eventTimestamp)
} else {
if (!message.inForeground) {
sessionEvents = startNewSessionIfNeeded(eventTimestamp)
sessionEvents = session.startNewSessionIfNeeded(eventTimestamp)
} else {
refreshSessionTime(eventTimestamp)
session.refreshSessionTime(eventTimestamp)
}
}

if (!skipEvent && event.sessionId == null) {
event.sessionId = sessionId
event.sessionId = session.sessionId
}

val savedLastEventId = lastEventId
val savedLastEventId = session.lastEventId

sessionEvents?.let {
it.forEach { e ->
e.eventId ?: let {
val newEventId = lastEventId + 1
val newEventId = session.lastEventId + 1
e.eventId = newEventId
lastEventId = newEventId
session.lastEventId = newEventId
}
}
}

if (!skipEvent) {
event.eventId ?: let {
val newEventId = lastEventId + 1
val newEventId = session.lastEventId + 1
event.eventId = newEventId
lastEventId = newEventId
session?.lastEventId = newEventId
}
}

if (lastEventId > savedLastEventId) {
amplitude.storage.write(Storage.Constants.LAST_EVENT_ID, lastEventId.toString())
if (session.lastEventId > savedLastEventId) {
amplitude.storage.write(Storage.Constants.LAST_EVENT_ID, session.lastEventId.toString())
}

sessionEvents?.let {
Expand All @@ -117,66 +111,6 @@ class Timeline(
super.process(event)
}
}

private suspend fun startNewSessionIfNeeded(timestamp: Long): Iterable<BaseEvent>? {
if (inSession() && isWithinMinTimeBetweenSessions(timestamp)) {
refreshSessionTime(timestamp)
return null
}
return startNewSession(timestamp)
}

private suspend fun setSessionId(timestamp: Long) {
_sessionId.set(timestamp)
amplitude.storage.write(Storage.Constants.PREVIOUS_SESSION_ID, sessionId.toString())
}

private suspend fun startNewSession(timestamp: Long): Iterable<BaseEvent> {
val sessionEvents = mutableListOf<BaseEvent>()
val configuration = amplitude.configuration as Configuration
// If any trackingSessionEvents is false (default value is true), means it is manually set
@Suppress("DEPRECATION")
val trackingSessionEvents = configuration.trackingSessionEvents && configuration.defaultTracking.sessions

// end previous session
if (trackingSessionEvents && inSession()) {
val sessionEndEvent = BaseEvent()
sessionEndEvent.eventType = Amplitude.END_SESSION_EVENT
sessionEndEvent.timestamp = if (lastEventTime > 0) lastEventTime else null
sessionEndEvent.sessionId = sessionId
sessionEvents.add(sessionEndEvent)
}

// start new session
setSessionId(timestamp)
refreshSessionTime(timestamp)
if (trackingSessionEvents) {
val sessionStartEvent = BaseEvent()
sessionStartEvent.eventType = Amplitude.START_SESSION_EVENT
sessionStartEvent.timestamp = timestamp
sessionStartEvent.sessionId = sessionId
sessionEvents.add(sessionStartEvent)
}

return sessionEvents
}

private suspend fun refreshSessionTime(timestamp: Long) {
if (!inSession()) {
return
}
lastEventTime = timestamp
amplitude.storage.write(Storage.Constants.LAST_EVENT_TIME, lastEventTime.toString())
}

private fun isWithinMinTimeBetweenSessions(timestamp: Long): Boolean {
val sessionLimit: Long = (amplitude.configuration as Configuration).minTimeBetweenSessionsMillis
return timestamp - lastEventTime < sessionLimit
}

private fun inSession(): Boolean {
return sessionId >= 0
}
}

data class EventQueueMessage(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ class RemnantDataMigration(
companion object {
const val DEVICE_ID_KEY = "device_id"
const val USER_ID_KEY = "user_id"
const val LAST_EVENT_TIME_KEY = "last_event_time"
const val LAST_EVENT_ID_KEY = "last_event_id"
const val PREVIOUS_SESSION_ID_KEY = "previous_session_id"
// const val LAST_EVENT_TIME_KEY = "last_event_time"
// const val LAST_EVENT_ID_KEY = "last_event_id"
// const val PREVIOUS_SESSION_ID_KEY = "previous_session_id"
}

lateinit var databaseStorage: DatabaseStorage
Expand All @@ -34,7 +34,8 @@ class RemnantDataMigration(
val firstRunSinceUpgrade = amplitude.storage.read(Storage.Constants.LAST_EVENT_TIME)?.toLongOrNull() == null

moveDeviceAndUserId()
moveSessionData()
// We don't migrate session data as we want to reset on a new app install
// moveSessionData()

if (firstRunSinceUpgrade) {
moveInterceptedIdentifies()
Expand Down Expand Up @@ -67,36 +68,36 @@ class RemnantDataMigration(
}
}

private suspend fun moveSessionData() {
try {
val currentSessionId = amplitude.storage.read(Storage.Constants.PREVIOUS_SESSION_ID)?.toLongOrNull()
val currentLastEventTime = amplitude.storage.read(Storage.Constants.LAST_EVENT_TIME)?.toLongOrNull()
val currentLastEventId = amplitude.storage.read(Storage.Constants.LAST_EVENT_ID)?.toLongOrNull()

val previousSessionId = databaseStorage.getLongValue(PREVIOUS_SESSION_ID_KEY)
val lastEventTime = databaseStorage.getLongValue(LAST_EVENT_TIME_KEY)
val lastEventId = databaseStorage.getLongValue(LAST_EVENT_ID_KEY)

if (currentSessionId == null && previousSessionId != null) {
amplitude.storage.write(Storage.Constants.PREVIOUS_SESSION_ID, previousSessionId.toString())
databaseStorage.removeLongValue(PREVIOUS_SESSION_ID_KEY)
}

if (currentLastEventTime == null && lastEventTime != null) {
amplitude.storage.write(Storage.Constants.LAST_EVENT_TIME, lastEventTime.toString())
databaseStorage.removeLongValue(LAST_EVENT_TIME_KEY)
}

if (currentLastEventId == null && lastEventId != null) {
amplitude.storage.write(Storage.Constants.LAST_EVENT_ID, lastEventId.toString())
databaseStorage.removeLongValue(LAST_EVENT_ID_KEY)
}
} catch (e: Exception) {
LogcatLogger.logger.error(
"session data migration failed: ${e.message}"
)
}
}
// private suspend fun moveSessionData() {
// try {
// val currentSessionId = amplitude.storage.read(Storage.Constants.PREVIOUS_SESSION_ID)?.toLongOrNull()
// val currentLastEventTime = amplitude.storage.read(Storage.Constants.LAST_EVENT_TIME)?.toLongOrNull()
// val currentLastEventId = amplitude.storage.read(Storage.Constants.LAST_EVENT_ID)?.toLongOrNull()
//
// val previousSessionId = databaseStorage.getLongValue(PREVIOUS_SESSION_ID_KEY)
// val lastEventTime = databaseStorage.getLongValue(LAST_EVENT_TIME_KEY)
// val lastEventId = databaseStorage.getLongValue(LAST_EVENT_ID_KEY)
//
// if (currentSessionId == null && previousSessionId != null) {
// amplitude.storage.write(Storage.Constants.PREVIOUS_SESSION_ID, previousSessionId.toString())
// databaseStorage.removeLongValue(PREVIOUS_SESSION_ID_KEY)
// }
//
// if (currentLastEventTime == null && lastEventTime != null) {
// amplitude.storage.write(Storage.Constants.LAST_EVENT_TIME, lastEventTime.toString())
// databaseStorage.removeLongValue(LAST_EVENT_TIME_KEY)
// }
//
// if (currentLastEventId == null && lastEventId != null) {
// amplitude.storage.write(Storage.Constants.LAST_EVENT_ID, lastEventId.toString())
// databaseStorage.removeLongValue(LAST_EVENT_ID_KEY)
// }
// } catch (e: Exception) {
// LogcatLogger.logger.error(
// "session data migration failed: ${e.message}"
// )
// }
// }

private suspend fun moveEvents() {
try {
Expand Down
Loading

0 comments on commit 307e196

Please sign in to comment.