diff --git a/core/src/main/java/com/amplitude/core/utilities/EventsFileManager.kt b/core/src/main/java/com/amplitude/core/utilities/EventsFileManager.kt index 8ededf50..68c7dbe2 100644 --- a/core/src/main/java/com/amplitude/core/utilities/EventsFileManager.kt +++ b/core/src/main/java/com/amplitude/core/utilities/EventsFileManager.kt @@ -21,7 +21,7 @@ class EventsFileManager( private val directory: File, private val storageKey: String, private val kvs: KeyValueStore, - private val logger: Logger + private val logger: Logger, ) { private val fileIndexKey = "amplitude.events.file.index.$storageKey" private val storageVersionKey = "amplitude.events.file.version.$storageKey" @@ -38,7 +38,7 @@ class EventsFileManager( private val readMutex = readMutexMap.getOrPut(storageKey) { Mutex() } init { - createDirectory(directory) + guardDirectory() handleV1Files() } @@ -47,27 +47,41 @@ class EventsFileManager( * opens a new file, if current file is full or uncreated * stores the event */ - suspend fun storeEvent(event: String) = writeMutex.withLock { - var file = currentFile() - if (!file.exists()) { - // create it - file.createNewFile() - } - - // check if file is at capacity - while (file.length() > MAX_FILE_SIZE) { - finish(file) - // update index - file = currentFile() + suspend fun storeEvent(event: String) = + writeMutex.withLock { + if (!guardDirectory()) { + return@withLock + } + var file = currentFile() if (!file.exists()) { // create it - file.createNewFile() + try { + file.createNewFile() + } catch (e: IOException) { + logger.error("Failed to create new storage file: ${file.path}") + return@withLock + } } - } - val contents = "${event}\n" - writeToFile(contents.toByteArray(), file) - } + // check if file is at capacity + while (file.length() > MAX_FILE_SIZE) { + finish(file) + // update index + file = currentFile() + if (!file.exists()) { + // create it + try { + file.createNewFile() + } catch (e: IOException) { + logger.error("Failed to create new storage file: ${file.path}") + return@withLock + } + } + } + + val contents = "${event}\n" + writeToFile(contents.toByteArray(), file) + } private fun incrementFileIndex(): Boolean { val index = kvs.getLong(fileIndexKey, 0) @@ -79,9 +93,10 @@ class EventsFileManager( */ fun read(): List { // we need to filter out .temp file, since it's operating on the writing thread - val fileList = directory.listFiles { _, name -> - name.contains(storageKey) && !name.endsWith(".tmp") && !name.endsWith(".properties") - } ?: emptyArray() + val fileList = + directory.listFiles { _, name -> + name.contains(storageKey) && !name.endsWith(".tmp") && !name.endsWith(".properties") + } ?: emptyArray() return fileList.sortedBy { it -> getSortKeyForFile(it) }.map { @@ -107,18 +122,22 @@ class EventsFileManager( * closes current file, and increase the index * so next write go to a new file */ - suspend fun rollover() = writeMutex.withLock { - val file = currentFile() - if (file.exists() && file.length() > 0) { - finish(file) + suspend fun rollover() = + writeMutex.withLock { + val file = currentFile() + if (file.exists() && file.length() > 0) { + finish(file) + } } - } /** * Split one file to two smaller file * This is used to handle payload too large error response */ - fun splitFile(filePath: String, events: JSONArray) { + fun splitFile( + filePath: String, + events: JSONArray, + ) { val originalFile = File(filePath) if (!originalFile.exists()) { return @@ -127,52 +146,53 @@ class EventsFileManager( val firstHalfFile = File(directory, "$fileName-1.tmp") val secondHalfFile = File(directory, "$fileName-2.tmp") val splitStrings = events.split() - writeEventsToFile(splitStrings.first, firstHalfFile) - writeEventsToFile(splitStrings.second, secondHalfFile) + writeEventsToSplitFile(splitStrings.first, firstHalfFile) + writeEventsToSplitFile(splitStrings.second, secondHalfFile) this.remove(filePath) } - suspend fun getEventString(filePath: String): String = readMutex.withLock { - // Block one time of file reads if another task has read the content of this file - if (filePathSet.contains(filePath)) { - filePathSet.remove(filePath) - return "" - } - filePathSet.add(filePath) - File(filePath).bufferedReader().use { - val content = it.readText() - val isCurrentVersion = content.endsWith("\n") - if (isCurrentVersion) { - // handle current version - val events = JSONArray() - content.split("\n").forEach { - if (it.isNotEmpty()) { - try { - events.put(JSONObject(it)) - } catch (e: JSONException) { - logger.error("Failed to parse event: $it") + suspend fun getEventString(filePath: String): String = + readMutex.withLock { + // Block one time of file reads if another task has read the content of this file + if (filePathSet.contains(filePath)) { + filePathSet.remove(filePath) + return@withLock "" + } + filePathSet.add(filePath) + File(filePath).bufferedReader().use { + val content = it.readText() + val isCurrentVersion = content.endsWith("\n") + if (isCurrentVersion) { + // handle current version + val events = JSONArray() + content.split("\n").forEach { + if (it.isNotEmpty()) { + try { + events.put(JSONObject(it)) + } catch (e: JSONException) { + logger.error("Failed to parse event: $it") + } } } - } - return if (events.length() > 0) { - events.toString() + return@use if (events.length() > 0) { + events.toString() + } else { + "" + } } else { - "" - } - } else { - // handle earlier versions - val normalizedContent = "[${content.trimStart('[', ',').trimEnd(']', ',')}]" - try { - val jsonArray = JSONArray(normalizedContent) - return jsonArray.toString() - } catch (e: JSONException) { - logger.error("Failed to parse events: $normalizedContent, dropping file: $filePath") - this.remove(filePath) - return "" + // handle earlier versions + val normalizedContent = "[${content.trimStart('[', ',').trimEnd(']', ',')}]" + try { + val jsonArray = JSONArray(normalizedContent) + return@use jsonArray.toString() + } catch (e: JSONException) { + logger.error("Failed to parse events: $normalizedContent, dropping file: $filePath") + this.remove(filePath) + return@use "" + } } } } - } fun release(filePath: String) { filePathSet.remove(filePath) @@ -183,12 +203,12 @@ class EventsFileManager( // if tmp file doesn't exist or empty then we don't need to do anything return } - val fileWithoutExtension = file.nameWithoutExtension - val finishedFile = File(directory, "$fileWithoutExtension") + val fileNameWithoutExtension = file.nameWithoutExtension + val finishedFile = File(directory, fileNameWithoutExtension) if (finishedFile.exists()) { logger.debug("File already exists: $finishedFile, handle gracefully.") // if the file already exists, race condition detected and rename the current file to a new name to avoid collision - val newName = "$fileWithoutExtension-${System.currentTimeMillis()}-${Random().nextInt(1000)}" + val newName = "$fileNameWithoutExtension-${System.currentTimeMillis()}-${Random().nextInt(1000)}" file.renameTo(File(directory, newName)) return } else { @@ -200,14 +220,16 @@ class EventsFileManager( // return the current tmp file private fun currentFile(): File { - val file = curFile[storageKey] ?: run { - // check leftover tmp file - val fileList = directory.listFiles { _, name -> - name.contains(storageKey) && name.endsWith(".tmp") - } ?: emptyArray() + val file = + curFile[storageKey] ?: run { + // check leftover tmp file + val fileList = + directory.listFiles { _, name -> + name.contains(storageKey) && name.endsWith(".tmp") + } ?: emptyArray() - fileList.getOrNull(0) - } + fileList.getOrNull(0) + } val index = kvs.getLong(fileIndexKey, 0) curFile[storageKey] = file ?: File(directory, "$storageKey-$index.tmp") return curFile[storageKey]!! @@ -223,33 +245,38 @@ class EventsFileManager( } // write to underlying file - private fun writeToFile(content: ByteArray, file: File) { + private fun writeToFile( + content: ByteArray, + file: File, + ) { try { FileOutputStream(file, true).use { it.write(content) it.flush() } } catch (e: FileNotFoundException) { - logger.error("File not found: ${file.absolutePath}") + logger.error("File not found: ${file.path}") } catch (e: IOException) { - logger.error("Failed to write to file: ${file.absolutePath}") + logger.error("Failed to write to file: ${file.path}") } catch (e: SecurityException) { logger.error("Security exception when saving event: ${e.message}") + } catch (e: Exception) { + logger.error("Failed to write to file: ${file.path}") } } - private fun writeToFile(content: String, file: File) { - file.createNewFile() - FileOutputStream(file, true).use { - it.write(content.toByteArray()) - it.flush() - } - file.renameTo(File(directory, file.nameWithoutExtension)) - } - - private fun writeEventsToFile(events: List, file: File) { + private fun writeEventsToSplitFile( + events: List, + file: File, + ) { val contents = events.joinToString(separator = "\n", postfix = "\n") { it.toString() } - writeToFile(contents, file) + try { + file.createNewFile() + writeToFile(contents.toByteArray(), file) + file.renameTo(File(directory, file.nameWithoutExtension)) + } catch (e: IOException) { + logger.error("Failed to create or write to split file: ${file.path}") + } } private fun reset() { @@ -260,9 +287,10 @@ class EventsFileManager( if (kvs.getLong(storageVersionKey, 1L) > 1L) { return } - val unFinishedFiles = directory.listFiles { _, name -> - name.contains(storageKey) && name.endsWith(".tmp") && !name.endsWith(".properties") - } ?: emptyArray() + val unFinishedFiles = + directory.listFiles { _, name -> + name.contains(storageKey) && name.endsWith(".tmp") && !name.endsWith(".properties") + } ?: emptyArray() unFinishedFiles.forEach { val content = it.readText() if (!content.endsWith("\n")) { @@ -272,4 +300,14 @@ class EventsFileManager( } kvs.putLong(storageVersionKey, 2) } + + private fun guardDirectory(): Boolean { + try { + createDirectory(directory) + return true + } catch (e: IOException) { + logger.error("Failed to create directory for events storage: ${directory.path}") + return false + } + } }