Skip to content

Commit

Permalink
fix: move session-specific logic to Timeline (#89)
Browse files Browse the repository at this point in the history
  • Loading branch information
falconandy authored Oct 28, 2022
1 parent 84bce16 commit b353b8c
Show file tree
Hide file tree
Showing 5 changed files with 234 additions and 164 deletions.
140 changes: 21 additions & 119 deletions android/src/main/java/com/amplitude/android/Amplitude.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import android.content.Context
import com.amplitude.android.plugins.AndroidContextPlugin
import com.amplitude.android.plugins.AndroidLifecyclePlugin
import com.amplitude.core.Amplitude
import com.amplitude.core.Storage
import com.amplitude.core.events.BaseEvent
import com.amplitude.core.platform.plugins.AmplitudeDestination
import com.amplitude.core.platform.plugins.GetAmpliExtrasPlugin
Expand All @@ -23,22 +22,23 @@ open class Amplitude(
) : Amplitude(configuration) {

internal var inForeground = false
var sessionId: Long
private set
internal var lastEventId: Long
var lastEventTime: Long
private lateinit var androidContextPlugin: AndroidContextPlugin

init {
storage = configuration.storageProvider.getStorage(this)
(timeline as Timeline).start()
registerShutdownHook()
}

this.sessionId = storage.read(Storage.Constants.PREVIOUS_SESSION_ID)?.toLong() ?: -1
this.lastEventId = storage.read(Storage.Constants.LAST_EVENT_ID)?.toLong() ?: 0
this.lastEventTime = storage.read(Storage.Constants.LAST_EVENT_TIME)?.toLong() ?: -1
override fun createTimeline(): Timeline {
return Timeline().also { it.amplitude = this }
}

override fun build(): Deferred<Boolean> {
val client = this

val built = amplitudeScope.async(amplitudeDispatcher) {
storage = configuration.storageProvider.getStorage(client)

val storageDirectory = (configuration as Configuration).context.getDir("${FileStorage.STORAGE_PREFIX}-${configuration.instanceName}", Context.MODE_PRIVATE)
idContainer = IdentityContainer.getInstance(
IdentityConfiguration(
Expand Down Expand Up @@ -79,59 +79,19 @@ open class Amplitude(
return this
}

override fun processEvent(event: BaseEvent): Iterable<BaseEvent>? {
val eventTimestamp = event.timestamp ?: System.currentTimeMillis()
event.timestamp = eventTimestamp
var sessionEvents: Iterable<BaseEvent>? = null

if (!(event.eventType == START_SESSION_EVENT || event.eventType == END_SESSION_EVENT)) {
if (!inForeground) {
sessionEvents = startNewSessionIfNeeded(eventTimestamp)
} else {
refreshSessionTime(eventTimestamp)
}
}

if (event.sessionId < 0) {
event.sessionId = sessionId
}

val savedLastEventId = lastEventId

sessionEvents ?. let {
it.forEach { e ->
e.eventId ?: let {
val newEventId = lastEventId + 1
e.eventId = newEventId
lastEventId = newEventId
}
}
}

event.eventId ?: let {
val newEventId = lastEventId + 1
event.eventId = newEventId
lastEventId = newEventId
}

if (lastEventId > savedLastEventId) {
amplitudeScope.launch(amplitudeDispatcher) {
storage.write(Storage.Constants.LAST_EVENT_ID, lastEventId.toString())
}
}

return sessionEvents
}

fun onEnterForeground(timestamp: Long) {
startNewSessionIfNeeded(timestamp) ?. let {
it.forEach { event -> process(event) }
}
inForeground = true

val dummySessionStartEvent = BaseEvent()
dummySessionStartEvent.eventType = START_SESSION_EVENT
dummySessionStartEvent.timestamp = timestamp
dummySessionStartEvent.sessionId = -1
timeline.process(dummySessionStartEvent)
}

fun onExitForeground() {
inForeground = false

amplitudeScope.launch(amplitudeDispatcher) {
isBuilt.await()
if ((configuration as Configuration).flushEventsOnClose) {
Expand All @@ -140,70 +100,12 @@ open class Amplitude(
}
}

fun startNewSessionIfNeeded(timestamp: Long): Iterable<BaseEvent>? {
if (inSession()) {

if (isWithinMinTimeBetweenSessions(timestamp)) {
refreshSessionTime(timestamp)
return null
private fun registerShutdownHook() {
Runtime.getRuntime().addShutdownHook(object : Thread() {
override fun run() {
(this@Amplitude.timeline as Timeline).stop()
}

return startNewSession(timestamp)
}

return startNewSession(timestamp)
}

private fun setSessionId(timestamp: Long) {
sessionId = timestamp
amplitudeScope.launch(amplitudeDispatcher) {
storage.write(Storage.Constants.PREVIOUS_SESSION_ID, sessionId.toString())
}
}

private fun startNewSession(timestamp: Long): Iterable<BaseEvent> {
val sessionEvents = mutableListOf<BaseEvent>()

// end previous session
if ((configuration as Configuration).trackingSessionEvents && inSession()) {
val sessionEndEvent = BaseEvent()
sessionEndEvent.eventType = END_SESSION_EVENT
sessionEndEvent.timestamp = if (lastEventTime > 0) lastEventTime else null
sessionEndEvent.sessionId = sessionId
sessionEvents.add(sessionEndEvent)
}

// start new session
setSessionId(timestamp)
refreshSessionTime(timestamp)
if (configuration.trackingSessionEvents) {
val sessionStartEvent = BaseEvent()
sessionStartEvent.eventType = START_SESSION_EVENT
sessionStartEvent.timestamp = timestamp
sessionStartEvent.sessionId = sessionId
sessionEvents.add(sessionStartEvent)
}

return sessionEvents
}

fun refreshSessionTime(timestamp: Long) {
if (!inSession()) {
return
}
lastEventTime = timestamp
amplitudeScope.launch(amplitudeDispatcher) {
storage.write(Storage.Constants.LAST_EVENT_TIME, lastEventTime.toString())
}
}

private fun isWithinMinTimeBetweenSessions(timestamp: Long): Boolean {
val sessionLimit: Long = (configuration as Configuration).minTimeBetweenSessionsMillis
return timestamp - lastEventTime < sessionLimit
}

private fun inSession(): Boolean {
return sessionId >= 0
})
}

companion object {
Expand Down
166 changes: 166 additions & 0 deletions android/src/main/java/com/amplitude/android/Timeline.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
package com.amplitude.android

import com.amplitude.core.Storage
import com.amplitude.core.events.BaseEvent
import com.amplitude.core.platform.Timeline
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch

class Timeline : Timeline() {
private val eventMessageChannel: Channel<EventQueueMessage> = Channel(Channel.UNLIMITED)
var sessionId: Long = -1
private set
internal var lastEventId: Long = 0
var lastEventTime: Long = -1

internal fun start() {
amplitude.amplitudeScope.launch(amplitude.storageIODispatcher) {
amplitude.isBuilt.await()

sessionId = amplitude.storage.read(Storage.Constants.PREVIOUS_SESSION_ID)?.toLong() ?: -1
lastEventId = amplitude.storage.read(Storage.Constants.LAST_EVENT_ID)?.toLong() ?: 0
lastEventTime = amplitude.storage.read(Storage.Constants.LAST_EVENT_TIME)?.toLong() ?: -1

for (message in eventMessageChannel) {
processEventMessage(message)
}
}
}

internal fun stop() {
this.eventMessageChannel.cancel()
}

override fun process(incomingEvent: BaseEvent) {
if (incomingEvent.timestamp == null) {
incomingEvent.timestamp = System.currentTimeMillis()
}

eventMessageChannel.trySend(EventQueueMessage(incomingEvent, (amplitude as Amplitude).inForeground))
}

private suspend fun processEventMessage(message: EventQueueMessage) {
val event = message.event
var sessionEvents: Iterable<BaseEvent>? = null
val eventTimestamp = event.timestamp!!
var skipEvent = false

if (event.eventType == Amplitude.START_SESSION_EVENT) {
if (event.sessionId < 0) { // dummy start_session event
skipEvent = true
sessionEvents = startNewSessionIfNeeded(eventTimestamp)
} else {
setSessionId(event.sessionId)
refreshSessionTime(eventTimestamp)
}
} else if (event.eventType == Amplitude.END_SESSION_EVENT) {
// do nothing
} else {
if (!message.inForeground) {
sessionEvents = startNewSessionIfNeeded(eventTimestamp)
} else {
refreshSessionTime(eventTimestamp)
}
}

if (!skipEvent && event.sessionId < 0) {
event.sessionId = sessionId
}

val savedLastEventId = lastEventId

sessionEvents ?. let {
it.forEach { e ->
e.eventId ?: let {
val newEventId = lastEventId + 1
e.eventId = newEventId
lastEventId = newEventId
}
}
}

if (!skipEvent) {
event.eventId ?: let {
val newEventId = lastEventId + 1
event.eventId = newEventId
lastEventId = newEventId
}
}

if (lastEventId > savedLastEventId) {
amplitude.storage.write(Storage.Constants.LAST_EVENT_ID, lastEventId.toString())
}

sessionEvents ?. let {
it.forEach { e ->
super.process(e)
}
}

if (!skipEvent) {
super.process(event)
}
}

private suspend fun startNewSessionIfNeeded(timestamp: Long): Iterable<BaseEvent>? {
if (inSession() && isWithinMinTimeBetweenSessions(timestamp)) {
refreshSessionTime(timestamp)
return null
}
return startNewSession(timestamp)
}

private suspend fun setSessionId(timestamp: Long) {
sessionId = timestamp
amplitude.storage.write(Storage.Constants.PREVIOUS_SESSION_ID, sessionId.toString())
}

private suspend fun startNewSession(timestamp: Long): Iterable<BaseEvent> {
val sessionEvents = mutableListOf<BaseEvent>()
val trackingSessionEvents = (amplitude.configuration as Configuration).trackingSessionEvents

// end previous session
if (trackingSessionEvents && inSession()) {
val sessionEndEvent = BaseEvent()
sessionEndEvent.eventType = Amplitude.END_SESSION_EVENT
sessionEndEvent.timestamp = if (lastEventTime > 0) lastEventTime else null
sessionEndEvent.sessionId = sessionId
sessionEvents.add(sessionEndEvent)
}

// start new session
setSessionId(timestamp)
refreshSessionTime(timestamp)
if (trackingSessionEvents) {
val sessionStartEvent = BaseEvent()
sessionStartEvent.eventType = Amplitude.START_SESSION_EVENT
sessionStartEvent.timestamp = timestamp
sessionStartEvent.sessionId = sessionId
sessionEvents.add(sessionStartEvent)
}

return sessionEvents
}

private suspend fun refreshSessionTime(timestamp: Long) {
if (!inSession()) {
return
}
lastEventTime = timestamp
amplitude.storage.write(Storage.Constants.LAST_EVENT_TIME, lastEventTime.toString())
}

private fun isWithinMinTimeBetweenSessions(timestamp: Long): Boolean {
val sessionLimit: Long = (amplitude.configuration as Configuration).minTimeBetweenSessionsMillis
return timestamp - lastEventTime < sessionLimit
}

private fun inSession(): Boolean {
return sessionId >= 0
}
}

data class EventQueueMessage(
val event: BaseEvent,
val inForeground: Boolean
)
Loading

0 comments on commit b353b8c

Please sign in to comment.