Skip to content

Commit

Permalink
feat: update retry handling (#113)
Browse files Browse the repository at this point in the history
  • Loading branch information
qingzhuozhen authored Apr 10, 2023
1 parent 3689fc1 commit 1020acd
Show file tree
Hide file tree
Showing 15 changed files with 454 additions and 87 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/pull-request-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@ on: [pull_request]

jobs:
test:
runs-on: ubuntu-latest
runs-on: ubuntu-20.04
steps:
- name: Set up JDK 11
uses: actions/setup-java@v3
with:
java-version: '11'
distribution: 'zulu'
distribution: 'temurin'
- uses: actions/checkout@v3
- name: Build
run: ./gradlew build
Expand Down
1 change: 1 addition & 0 deletions android/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ dependencies {
testImplementation 'androidx.test.ext:junit:1.1.3'
androidTestImplementation 'androidx.test.ext:junit:1.1.3'
androidTestImplementation 'androidx.test.espresso:espresso-core:3.4.0'
testImplementation 'org.json:json:20211205'
}

tasks.dokkaHtmlPartial.configure {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ open class Configuration @JvmOverloads constructor(
var flushEventsOnClose: Boolean = true,
var minTimeBetweenSessionsMillis: Long = MIN_TIME_BETWEEN_SESSIONS_MILLIS,
var trackingSessionEvents: Boolean = true,
override var identifyBatchIntervalMillis: Long = IDENTIFY_BATCH_INVERVAL_MILLIS,
override var identifyBatchIntervalMillis: Long = IDENTIFY_BATCH_INTERVAL_MILLIS,
override var identifyInterceptStorageProvider: StorageProvider = AndroidStorageProvider()
) : Configuration(apiKey, flushQueueSize, flushIntervalMillis, instanceName, optOut, storageProvider, loggerProvider, minIdLength, partnerId, callback, flushMaxRetries, useBatch, serverZone, serverUrl, plan, ingestionMetadata, identifyBatchIntervalMillis, identifyInterceptStorageProvider) {
companion object {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ class AndroidStorage(
return eventsFile.read()
}

override fun releaseFile(filePath: String) {
eventsFile.release(filePath)
}

override suspend fun getEventsString(content: Any): String {
return eventsFile.getEventString(content as String)
}
Expand All @@ -72,17 +76,13 @@ class AndroidStorage(
configuration: Configuration,
scope: CoroutineScope,
dispatcher: CoroutineDispatcher,
events: Any,
eventsString: String
): ResponseHandler {
return FileResponseHandler(
this,
eventPipeline,
configuration,
scope,
dispatcher,
events as String,
eventsString,
logger
)
}
Expand Down
2 changes: 2 additions & 0 deletions android/src/test/java/com/amplitude/android/AmplitudeTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import io.mockk.mockkStatic
import io.mockk.slot
import io.mockk.spyk
import io.mockk.verify
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.test.StandardTestDispatcher
import kotlinx.coroutines.test.TestCoroutineScheduler
import kotlinx.coroutines.test.advanceUntilIdle
Expand All @@ -37,6 +38,7 @@ open class StubPlugin : EventPlugin {
override lateinit var amplitude: com.amplitude.core.Amplitude
}

@ExperimentalCoroutinesApi
class AmplitudeTest {
private var context: Context? = null
private var amplitude: Amplitude? = null
Expand Down
284 changes: 284 additions & 0 deletions android/src/test/java/com/amplitude/android/ResponseHandlerTest.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,284 @@
package com.amplitude.android

import android.content.Context
import androidx.test.core.app.ApplicationProvider
import com.amplitude.android.events.EventOptions
import com.amplitude.android.plugins.AndroidLifecyclePlugin
import com.amplitude.common.android.AndroidContextProvider
import com.amplitude.core.events.BaseEvent
import com.amplitude.core.utilities.toEvents
import io.mockk.every
import io.mockk.mockkConstructor
import io.mockk.mockkStatic
import kotlinx.coroutines.ExperimentalCoroutinesApi
import okhttp3.mockwebserver.MockResponse
import okhttp3.mockwebserver.MockWebServer
import okhttp3.mockwebserver.RecordedRequest
import org.json.JSONObject
import org.junit.After
import org.junit.Assert.assertEquals
import org.junit.Assert.assertNotNull
import org.junit.Assert.assertTrue
import org.junit.Before
import org.junit.Test
import org.junit.runner.RunWith
import org.robolectric.RobolectricTestRunner
import java.util.concurrent.TimeUnit

@RunWith(RobolectricTestRunner::class)
class ResponseHandlerTest {
private lateinit var server: MockWebServer
private lateinit var amplitude: Amplitude

@ExperimentalCoroutinesApi
@Before
fun setup() {
server = MockWebServer()
server.start()
val context = ApplicationProvider.getApplicationContext<Context>()
mockContextProvider()

val apiKey = "test-api-key"
amplitude = Amplitude(apiKey, context) {
this.serverUrl = server.url("/").toString()
this.trackingSessionEvents = false
this.flushIntervalMillis = 150
this.identifyBatchIntervalMillis = 1000
this.flushMaxRetries = 3
}
}

@After
fun shutdown() {
server.shutdown()
}

@Test
fun `test handle on success`() {
server.enqueue(MockResponse().setBody("{\"code\": \"200\"}").setResponseCode(200))
var eventCompleteCount = 0
val statusMap = mutableMapOf<Int, Int>()
val options = EventOptions()
options.callback = { _: BaseEvent, status: Int, _: String ->
eventCompleteCount++
statusMap.put(status, statusMap.getOrDefault(status, 0) + 1)
}
amplitude.track("test event 1", options = options)
amplitude.track("test event 2", options = options)
val request = runRequest()
// verify request
assertNotNull(request)
val events = getEventsFromRequest(request!!)
assertEquals(2, events.size)
assertEquals(1, server.requestCount)
Thread.sleep(100)
// verify events covered with correct response
assertEquals(2, statusMap.get(200))
assertEquals(2, eventCompleteCount)
}

@Test
fun `test handle on rate limit`() {
val rateLimitBody = """
{
"code": 429,
"error": "Too many requests for some devices and users",
"eps_threshold": 30
}
""".trimIndent()
for (i in 1..6) {
server.enqueue(MockResponse().setBody(rateLimitBody).setResponseCode(429))
}
for (k in 1..4) {
amplitude.track("test event $k")
runRequest()
}
Thread.sleep(100)
// verify the total request count when reaching max retries
assertEquals(6, server.requestCount)
}

@Test
fun `test handle payload too large with only one event`() {
server.enqueue(MockResponse().setBody("{\"code\": \"413\", \"error\": \"payload too large\"}").setResponseCode(413))
val options = EventOptions()
var statusCode = 0
var callFinished = false
options.callback = { _: BaseEvent, status: Int, _: String ->
statusCode = status
callFinished = true
}
amplitude.track("test event 1", options = options)
val request = runRequest()
// verify we send only one request
assertNotNull(request)
val events = getEventsFromRequest(request!!)
assertEquals(1, events.size)
assertEquals(1, server.requestCount)
Thread.sleep(100)
// verify we remove the large event
assertEquals(413, statusCode)
assertTrue(callFinished)
}

@Test
fun `test handle payload too large`() {
server.enqueue(MockResponse().setBody("{\"code\": \"413\", \"error\": \"payload too large\"}").setResponseCode(413))
server.enqueue(MockResponse().setBody("{\"code\": \"200\"}").setResponseCode(200))
server.enqueue(MockResponse().setBody("{\"code\": \"200\"}").setResponseCode(200))
server.enqueue(MockResponse().setBody("{\"code\": \"200\"}").setResponseCode(200))
var eventCompleteCount = 0
val statusMap = mutableMapOf<Int, Int>()
val options = EventOptions()
options.callback = { _: BaseEvent, status: Int, _: String ->
eventCompleteCount++
statusMap.put(status, statusMap.getOrDefault(status, 0) + 1)
}
amplitude.track("test event 1", options = options)
amplitude.track("test event 2", options = options)
amplitude.track("test event 3", options = options)
amplitude.track("test event 4", options = options)
val request = runRequest()
// verify the first request hit 413
assertNotNull(request)
val events = getEventsFromRequest(request!!)
assertEquals(4, events.size)
amplitude.track("test event 5", options = options)
runRequest()
runRequest()
runRequest()
Thread.sleep(150)
// verify we completed processing for the events after file split
assertEquals(4, server.requestCount)
assertEquals(5, statusMap.get(200))
assertEquals(5, eventCompleteCount)
}

@Test
fun `test handle bad request response`() {
val badRequestResponseBody = """
{
"code": 400,
"error": "Request missing required field",
"events_with_invalid_fields": {
"time": [
3
]
},
"events_with_missing_fields": {
"event_type": [
2
]
}
}
""".trimIndent()
server.enqueue(MockResponse().setBody(badRequestResponseBody).setResponseCode(400))
server.enqueue(MockResponse().setBody("{\"code\": \"200\"}").setResponseCode(200))
var eventCompleteCount = 0
val statusMap = mutableMapOf<Int, Int>()
val options = EventOptions()
options.callback = { _: BaseEvent, status: Int, _: String ->
eventCompleteCount++
statusMap.put(status, statusMap.getOrDefault(status, 0) + 1)
}
amplitude.track("test event 1", options = options)
amplitude.track("test event 2", options = options)
amplitude.track("test event 3", options = options)
amplitude.track("test event 4", options = options)
// verify first request take 4 events hit 400
val request = runRequest()
assertNotNull(request)
val events = getEventsFromRequest(request!!)
assertEquals(4, events.size)
// verify second request take 2 events after removing 2 bad events
val request2 = runRequest()
assertNotNull(request2)
val events2 = getEventsFromRequest(request2!!)
assertEquals(2, events2.size)
assertEquals(2, server.requestCount)
Thread.sleep(10)
// verify the processed status
assertEquals(2, statusMap.get(400))
assertEquals(2, statusMap.get(200))
assertEquals(4, eventCompleteCount)
}

@Test
fun `test handle timeout response`() {
server.enqueue(MockResponse().setBody("{\"code\": \"408\"}").setResponseCode(408))
server.enqueue(MockResponse().setBody("{\"code\": \"200\"}").setResponseCode(200))
server.enqueue(MockResponse().setBody("{\"code\": \"200\"}").setResponseCode(200))
var eventCompleteCount = 0
val statusMap = mutableMapOf<Int, Int>()
val options = EventOptions()
options.callback = { _: BaseEvent, status: Int, _: String ->
eventCompleteCount++
statusMap.put(status, statusMap.getOrDefault(status, 0) + 1)
}
amplitude.track("test event 1", options = options)
runRequest()
amplitude.track("test event 2", options = options)
runRequest()
runRequest()
Thread.sleep(100)
// verify retry events success
assertEquals(3, server.requestCount)
assertEquals(2, statusMap.get(200))
assertEquals(2, eventCompleteCount)
}

@Test
fun `test handle failed response`() {
server.enqueue(MockResponse().setBody("{\"code\": \"500\"}").setResponseCode(500))
server.enqueue(MockResponse().setBody("{\"code\": \"200\"}").setResponseCode(200))
server.enqueue(MockResponse().setBody("{\"code\": \"200\"}").setResponseCode(200))
var eventCompleteCount = 0
val statusMap = mutableMapOf<Int, Int>()
val options = EventOptions()
options.callback = { _: BaseEvent, status: Int, _: String ->
eventCompleteCount++
statusMap.put(status, statusMap.getOrDefault(status, 0) + 1)
}
amplitude.track("test event 1", options = options)
runRequest()
amplitude.track("test event 2", options = options)
runRequest()
runRequest()
Thread.sleep(100)
// verify retry events success
assertEquals(3, server.requestCount)
assertEquals(2, statusMap.get(200))
assertEquals(2, eventCompleteCount)
}

private fun getEventsFromRequest(request: RecordedRequest): List<BaseEvent> {
val body = request.body.readUtf8()
return JSONObject(body).getJSONArray("events").toEvents()
}

private fun runRequest(): RecordedRequest? {
return try {
server.takeRequest(5, TimeUnit.SECONDS)
} catch (e: InterruptedException) {
null
}
}

private fun mockContextProvider() {
mockkStatic(AndroidLifecyclePlugin::class)

mockkConstructor(AndroidContextProvider::class)
every { anyConstructed<AndroidContextProvider>().osName } returns "android"
every { anyConstructed<AndroidContextProvider>().osVersion } returns "10"
every { anyConstructed<AndroidContextProvider>().brand } returns "google"
every { anyConstructed<AndroidContextProvider>().manufacturer } returns "Android"
every { anyConstructed<AndroidContextProvider>().model } returns "Android SDK built for x86"
every { anyConstructed<AndroidContextProvider>().language } returns "English"
every { anyConstructed<AndroidContextProvider>().advertisingId } returns ""
every { anyConstructed<AndroidContextProvider>().versionName } returns "1.0"
every { anyConstructed<AndroidContextProvider>().carrier } returns "Android"
every { anyConstructed<AndroidContextProvider>().country } returns "US"
every { anyConstructed<AndroidContextProvider>().mostRecentLocation } returns null
every { anyConstructed<AndroidContextProvider>().appSetId } returns ""
}
}
4 changes: 2 additions & 2 deletions core/src/main/java/com/amplitude/core/Configuration.kt
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ open class Configuration @JvmOverloads constructor(
open var serverUrl: String? = null,
open var plan: Plan? = null,
open var ingestionMetadata: IngestionMetadata? = null,
open var identifyBatchIntervalMillis: Long = IDENTIFY_BATCH_INVERVAL_MILLIS,
open var identifyBatchIntervalMillis: Long = IDENTIFY_BATCH_INTERVAL_MILLIS,
open var identifyInterceptStorageProvider: StorageProvider = InMemoryStorageProvider()
) {

Expand All @@ -34,7 +34,7 @@ open class Configuration @JvmOverloads constructor(
const val FLUSH_INTERVAL_MILLIS = 30 * 1000 // 30s
const val FLUSH_MAX_RETRIES = 5
const val DEFAULT_INSTANCE = "\$default_instance"
const val IDENTIFY_BATCH_INVERVAL_MILLIS = 30 * 1000L // 30s
const val IDENTIFY_BATCH_INTERVAL_MILLIS = 30 * 1000L // 30s
}

fun isValid(): Boolean {
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/java/com/amplitude/core/Storage.kt
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ interface Storage {

suspend fun getEventsString(content: Any): String

fun getResponseHandler(eventPipeline: EventPipeline, configuration: Configuration, scope: CoroutineScope, dispatcher: CoroutineDispatcher, events: Any, eventsString: String): ResponseHandler
fun getResponseHandler(eventPipeline: EventPipeline, configuration: Configuration, scope: CoroutineScope, dispatcher: CoroutineDispatcher): ResponseHandler
}

interface StorageProvider {
Expand Down
Loading

0 comments on commit 1020acd

Please sign in to comment.