Skip to content

Commit

Permalink
AMP-94015 add exception handling
Browse files Browse the repository at this point in the history
  • Loading branch information
qingzhuozhen committed Feb 23, 2024
1 parent c323a4a commit 8776d17
Showing 1 changed file with 130 additions and 92 deletions.
222 changes: 130 additions & 92 deletions core/src/main/java/com/amplitude/core/utilities/EventsFileManager.kt
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class EventsFileManager(
private val directory: File,
private val storageKey: String,
private val kvs: KeyValueStore,
private val logger: Logger
private val logger: Logger,
) {
private val fileIndexKey = "amplitude.events.file.index.$storageKey"
private val storageVersionKey = "amplitude.events.file.version.$storageKey"
Expand All @@ -38,7 +38,7 @@ class EventsFileManager(
private val readMutex = readMutexMap.getOrPut(storageKey) { Mutex() }

init {
createDirectory(directory)
guardDirectory()
handleV1Files()
}

Expand All @@ -47,27 +47,41 @@ class EventsFileManager(
* opens a new file, if current file is full or uncreated
* stores the event
*/
suspend fun storeEvent(event: String) = writeMutex.withLock {
var file = currentFile()
if (!file.exists()) {
// create it
file.createNewFile()
}

// check if file is at capacity
while (file.length() > MAX_FILE_SIZE) {
finish(file)
// update index
file = currentFile()
suspend fun storeEvent(event: String) =
writeMutex.withLock {
if (!guardDirectory()) {
return@withLock
}
var file = currentFile()
if (!file.exists()) {
// create it
file.createNewFile()
try {
file.createNewFile()
} catch (e: IOException) {
logger.error("Failed to create new storage file: ${file.path}")
return@withLock
}
}
}

val contents = "${event}\n"
writeToFile(contents.toByteArray(), file)
}
// check if file is at capacity
while (file.length() > MAX_FILE_SIZE) {
finish(file)
// update index
file = currentFile()
if (!file.exists()) {
// create it
try {
file.createNewFile()
} catch (e: IOException) {
logger.error("Failed to create new storage file: ${file.path}")
return@withLock
}
}
}

val contents = "${event}\n"
writeToFile(contents.toByteArray(), file)
}

private fun incrementFileIndex(): Boolean {
val index = kvs.getLong(fileIndexKey, 0)
Expand All @@ -79,9 +93,10 @@ class EventsFileManager(
*/
fun read(): List<String> {
// we need to filter out .temp file, since it's operating on the writing thread
val fileList = directory.listFiles { _, name ->
name.contains(storageKey) && !name.endsWith(".tmp") && !name.endsWith(".properties")
} ?: emptyArray()
val fileList =
directory.listFiles { _, name ->
name.contains(storageKey) && !name.endsWith(".tmp") && !name.endsWith(".properties")
} ?: emptyArray()
return fileList.sortedBy { it ->
getSortKeyForFile(it)
}.map {
Expand All @@ -107,18 +122,22 @@ class EventsFileManager(
* closes current file, and increase the index
* so next write go to a new file
*/
suspend fun rollover() = writeMutex.withLock {
val file = currentFile()
if (file.exists() && file.length() > 0) {
finish(file)
suspend fun rollover() =
writeMutex.withLock {
val file = currentFile()
if (file.exists() && file.length() > 0) {
finish(file)
}
}
}

/**
* Split one file to two smaller file
* This is used to handle payload too large error response
*/
fun splitFile(filePath: String, events: JSONArray) {
fun splitFile(
filePath: String,
events: JSONArray,
) {
val originalFile = File(filePath)
if (!originalFile.exists()) {
return
Expand All @@ -127,52 +146,53 @@ class EventsFileManager(
val firstHalfFile = File(directory, "$fileName-1.tmp")
val secondHalfFile = File(directory, "$fileName-2.tmp")
val splitStrings = events.split()
writeEventsToFile(splitStrings.first, firstHalfFile)
writeEventsToFile(splitStrings.second, secondHalfFile)
writeEventsToSplitFile(splitStrings.first, firstHalfFile)
writeEventsToSplitFile(splitStrings.second, secondHalfFile)
this.remove(filePath)
}

suspend fun getEventString(filePath: String): String = readMutex.withLock {
// Block one time of file reads if another task has read the content of this file
if (filePathSet.contains(filePath)) {
filePathSet.remove(filePath)
return ""
}
filePathSet.add(filePath)
File(filePath).bufferedReader().use<BufferedReader, String> {
val content = it.readText()
val isCurrentVersion = content.endsWith("\n")
if (isCurrentVersion) {
// handle current version
val events = JSONArray()
content.split("\n").forEach {
if (it.isNotEmpty()) {
try {
events.put(JSONObject(it))
} catch (e: JSONException) {
logger.error("Failed to parse event: $it")
suspend fun getEventString(filePath: String): String =
readMutex.withLock {
// Block one time of file reads if another task has read the content of this file
if (filePathSet.contains(filePath)) {
filePathSet.remove(filePath)
return@withLock ""
}
filePathSet.add(filePath)
File(filePath).bufferedReader().use<BufferedReader, String> {
val content = it.readText()
val isCurrentVersion = content.endsWith("\n")
if (isCurrentVersion) {
// handle current version
val events = JSONArray()
content.split("\n").forEach {
if (it.isNotEmpty()) {
try {
events.put(JSONObject(it))
} catch (e: JSONException) {
logger.error("Failed to parse event: $it")
}
}
}
}
return if (events.length() > 0) {
events.toString()
return@use if (events.length() > 0) {
events.toString()
} else {
""
}
} else {
""
}
} else {
// handle earlier versions
val normalizedContent = "[${content.trimStart('[', ',').trimEnd(']', ',')}]"
try {
val jsonArray = JSONArray(normalizedContent)
return jsonArray.toString()
} catch (e: JSONException) {
logger.error("Failed to parse events: $normalizedContent, dropping file: $filePath")
this.remove(filePath)
return ""
// handle earlier versions
val normalizedContent = "[${content.trimStart('[', ',').trimEnd(']', ',')}]"
try {
val jsonArray = JSONArray(normalizedContent)
return@use jsonArray.toString()
} catch (e: JSONException) {
logger.error("Failed to parse events: $normalizedContent, dropping file: $filePath")
this.remove(filePath)
return@use ""
}
}
}
}
}

fun release(filePath: String) {
filePathSet.remove(filePath)
Expand All @@ -183,12 +203,12 @@ class EventsFileManager(
// if tmp file doesn't exist or empty then we don't need to do anything
return
}
val fileWithoutExtension = file.nameWithoutExtension
val finishedFile = File(directory, "$fileWithoutExtension")
val fileNameWithoutExtension = file.nameWithoutExtension
val finishedFile = File(directory, fileNameWithoutExtension)
if (finishedFile.exists()) {
logger.debug("File already exists: $finishedFile, handle gracefully.")
// if the file already exists, race condition detected and rename the current file to a new name to avoid collision
val newName = "$fileWithoutExtension-${System.currentTimeMillis()}-${Random().nextInt(1000)}"
val newName = "$fileNameWithoutExtension-${System.currentTimeMillis()}-${Random().nextInt(1000)}"
file.renameTo(File(directory, newName))
return
} else {
Expand All @@ -200,14 +220,16 @@ class EventsFileManager(

// return the current tmp file
private fun currentFile(): File {
val file = curFile[storageKey] ?: run {
// check leftover tmp file
val fileList = directory.listFiles { _, name ->
name.contains(storageKey) && name.endsWith(".tmp")
} ?: emptyArray()
val file =
curFile[storageKey] ?: run {
// check leftover tmp file
val fileList =
directory.listFiles { _, name ->
name.contains(storageKey) && name.endsWith(".tmp")
} ?: emptyArray()

fileList.getOrNull(0)
}
fileList.getOrNull(0)
}
val index = kvs.getLong(fileIndexKey, 0)
curFile[storageKey] = file ?: File(directory, "$storageKey-$index.tmp")
return curFile[storageKey]!!
Expand All @@ -223,33 +245,38 @@ class EventsFileManager(
}

// write to underlying file
private fun writeToFile(content: ByteArray, file: File) {
private fun writeToFile(
content: ByteArray,
file: File,
) {
try {
FileOutputStream(file, true).use {
it.write(content)
it.flush()
}
} catch (e: FileNotFoundException) {
logger.error("File not found: ${file.absolutePath}")
logger.error("File not found: ${file.path}")
} catch (e: IOException) {
logger.error("Failed to write to file: ${file.absolutePath}")
logger.error("Failed to write to file: ${file.path}")
} catch (e: SecurityException) {
logger.error("Security exception when saving event: ${e.message}")
} catch (e: Exception) {
logger.error("Failed to write to file: ${file.path}")
}
}

private fun writeToFile(content: String, file: File) {
file.createNewFile()
FileOutputStream(file, true).use {
it.write(content.toByteArray())
it.flush()
}
file.renameTo(File(directory, file.nameWithoutExtension))
}

private fun writeEventsToFile(events: List<JSONObject>, file: File) {
private fun writeEventsToSplitFile(
events: List<JSONObject>,
file: File,
) {
val contents = events.joinToString(separator = "\n", postfix = "\n") { it.toString() }
writeToFile(contents, file)
try {
file.createNewFile()
writeToFile(contents.toByteArray(), file)
file.renameTo(File(directory, file.nameWithoutExtension))
} catch (e: IOException) {
logger.error("Failed to create or write to split file: ${file.path}")
}
}

private fun reset() {
Expand All @@ -260,9 +287,10 @@ class EventsFileManager(
if (kvs.getLong(storageVersionKey, 1L) > 1L) {
return
}
val unFinishedFiles = directory.listFiles { _, name ->
name.contains(storageKey) && name.endsWith(".tmp") && !name.endsWith(".properties")
} ?: emptyArray()
val unFinishedFiles =
directory.listFiles { _, name ->
name.contains(storageKey) && name.endsWith(".tmp") && !name.endsWith(".properties")
} ?: emptyArray()
unFinishedFiles.forEach {
val content = it.readText()
if (!content.endsWith("\n")) {
Expand All @@ -272,4 +300,14 @@ class EventsFileManager(
}
kvs.putLong(storageVersionKey, 2)
}

private fun guardDirectory(): Boolean {
try {
createDirectory(directory)
return true
} catch (e: IOException) {
logger.error("Failed to create directory for events storage: ${directory.path}")
return false
}
}
}

0 comments on commit 8776d17

Please sign in to comment.