Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bmoric/test file #48556

Closed
wants to merge 39 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
301793c
[WIP] Prerelease S3V2 Connector
johnny-schmidt Nov 15, 2024
410876b
Merge branch 'master' into jschmidt/s3v2/pre-release-2
benmoriceau Nov 18, 2024
fe99b0a
Merge branch 'master' into jschmidt/s3v2/pre-release-2
benmoriceau Nov 19, 2024
179579e
Merge branch 'jschmidt/s3v2/pre-release-2' of github.com:airbytehq/ai…
benmoriceau Nov 19, 2024
2abbb58
Logs
benmoriceau Nov 19, 2024
d07b139
force file transfer
benmoriceau Nov 19, 2024
8693f7f
Force stagin
benmoriceau Nov 19, 2024
9fa72fd
add log
benmoriceau Nov 19, 2024
0b6468b
More logs
benmoriceau Nov 19, 2024
5233138
Log everything
benmoriceau Nov 19, 2024
7f4b454
Try a fix
benmoriceau Nov 19, 2024
028dada
Fix deser
benmoriceau Nov 19, 2024
9b2b1ce
Fix build
benmoriceau Nov 19, 2024
44971da
Add part
benmoriceau Nov 19, 2024
9a3d406
Complete
benmoriceau Nov 19, 2024
16b51b3
Some fix
benmoriceau Nov 19, 2024
03b6e69
More lofs
benmoriceau Nov 19, 2024
05a32ff
test
benmoriceau Nov 19, 2024
003eb80
More test
benmoriceau Nov 19, 2024
5981728
Manager
benmoriceau Nov 19, 2024
fad72e2
Force close stream task
benmoriceau Nov 19, 2024
dbdb318
Fix build
benmoriceau Nov 19, 2024
6a90da4
Fix staging stuff
benmoriceau Nov 19, 2024
00a01ca
Fix state
benmoriceau Nov 20, 2024
c6c6b3b
Try add index
benmoriceau Nov 20, 2024
b20016d
fix build
benmoriceau Nov 20, 2024
f605a27
Try app yaml
benmoriceau Nov 20, 2024
4e672a5
Revert "Try app yaml"
benmoriceau Nov 20, 2024
0f88710
Revert "Revert "Try app yaml""
benmoriceau Nov 20, 2024
d4854f6
app yaml test
benmoriceau Nov 20, 2024
7bcea30
Delete s3 app
benmoriceau Nov 20, 2024
7249b52
Working integration test
benmoriceau Nov 20, 2024
72defa8
rm unsused
benmoriceau Nov 20, 2024
0ce13d6
Test
benmoriceau Nov 20, 2024
f79eeac
Rm double count
benmoriceau Nov 20, 2024
1f2dc1e
Send batch
benmoriceau Nov 20, 2024
77ad32d
Build
benmoriceau Nov 20, 2024
e8921b0
remove unnecessary call
benmoriceau Nov 20, 2024
2d6c4a6
test shave some coede
benmoriceau Nov 20, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import io.airbyte.protocol.models.v0.AirbyteStreamState
import io.airbyte.protocol.models.v0.AirbyteStreamStatusTraceMessage
import io.airbyte.protocol.models.v0.AirbyteStreamStatusTraceMessage.AirbyteStreamStatus
import io.airbyte.protocol.models.v0.AirbyteTraceMessage
import io.github.oshai.kotlinlogging.KotlinLogging
import io.micronaut.context.annotation.Value
import jakarta.inject.Singleton

Expand Down Expand Up @@ -171,22 +172,32 @@ data class DestinationFile(
@set:JsonProperty("source_file_url")
@JsonProperty("source_file_url")
var sourceFileUrl: String? = null
override fun toString(): String {
return "AirbyteRecordMessageFile(fileUrl=$fileUrl, bytes=$bytes, fileRelativePath=$fileRelativePath, modified=$modified, sourceFileUrl=$sourceFileUrl)"
}


}

override fun asProtocolMessage(): AirbyteMessage =
AirbyteMessage()
override fun asProtocolMessage(): AirbyteMessage {
val file = mapOf(
"file_url" to fileMessage.fileUrl,
"file_relative_path" to fileMessage.fileRelativePath,
"source_file_url" to fileMessage.sourceFileUrl,
"modified" to fileMessage.modified,
"bytes" to fileMessage.bytes,
)

return AirbyteMessage()
.withType(AirbyteMessage.Type.RECORD)
.withRecord(
AirbyteRecordMessage()
.withStream(stream.name)
.withNamespace(stream.namespace)
.withEmittedAt(emittedAtMs)
.withAdditionalProperty("file_url", fileMessage.fileUrl)
.withAdditionalProperty("file_relative_path", fileMessage.fileRelativePath)
.withAdditionalProperty("source_file_url", fileMessage.sourceFileUrl)
.withAdditionalProperty("modified", fileMessage.modified)
.withAdditionalProperty("bytes", fileMessage.bytes)
.withAdditionalProperty("file", file)
)
}
}

private fun statusToProtocolMessage(
Expand Down Expand Up @@ -352,9 +363,13 @@ data object Undefined : DestinationMessage {
@Singleton
class DestinationMessageFactory(
private val catalog: DestinationCatalog,
@Value("airbyte.file-transfer.enabled") private val fileTransferEnabled: Boolean,
@Value("\${airbyte.file-transfer.enabled}") private val fileTransferEnabled: Boolean,
) {
private val log = KotlinLogging.logger {}
fun fromAirbyteMessage(message: AirbyteMessage, serialized: String): DestinationMessage {
if (fileTransferEnabled) {
log.info { "File transfer is enabled" }
}
fun toLong(value: Any?, name: String): Long? {
return value?.let {
when (it) {
Expand All @@ -378,29 +393,31 @@ class DestinationMessageFactory(
name = message.record.stream,
)
if (fileTransferEnabled) {
@Suppress("UNCHECKED_CAST")
val fileMessage = message.record.additionalProperties["file"] as Map<String, Any>
DestinationFile(
stream = stream.descriptor,
emittedAtMs = message.record.emittedAt,
serialized = serialized,
fileMessage =
DestinationFile.AirbyteRecordMessageFile(
fileUrl =
message.record.additionalProperties["file_url"] as String?,
fileMessage["file_url"] as String?,
bytes =
toLong(
message.record.additionalProperties["bytes"],
fileMessage["bytes"],
"message.record.bytes"
),
fileRelativePath =
message.record.additionalProperties["file_relative_path"]
fileMessage["file_relative_path"]
as String?,
modified =
toLong(
message.record.additionalProperties["modified"],
fileMessage["modified"],
"message.record.modified"
),
sourceFileUrl =
message.record.additionalProperties["source_file_url"]
fileMessage["source_file_url"]
as String?
)
)
Expand Down Expand Up @@ -439,6 +456,7 @@ class DestinationMessageFactory(
when (status.status) {
AirbyteStreamStatus.COMPLETE ->
if (fileTransferEnabled) {
log.info { "received complete status for $stream" }
DestinationFileStreamComplete(
stream.descriptor,
message.trace.emittedAt.toLong()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package io.airbyte.cdk.load.message

import io.airbyte.cdk.util.Jsons
import io.airbyte.protocol.models.v0.AirbyteMessage
import io.github.oshai.kotlinlogging.KotlinLogging
import jakarta.inject.Singleton

interface Deserializer<T : Any> {
Expand All @@ -19,13 +20,15 @@ interface Deserializer<T : Any> {
@Singleton
class DefaultDestinationMessageDeserializer(private val messageFactory: DestinationMessageFactory) :
Deserializer<DestinationMessage> {
val log = KotlinLogging.logger {}

override fun deserialize(serialized: String): DestinationMessage {
try {
log.error { "Deserializing : $serialized" }
val airbyteMessage = Jsons.readValue(serialized, AirbyteMessage::class.java)
return messageFactory.fromAirbyteMessage(airbyteMessage, serialized)
} catch (t: Throwable) {
throw RuntimeException("Failed to deserialize AirbyteMessage")
throw RuntimeException("Failed to deserialize AirbyteMessage $serialized")
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ interface DestinationTaskLauncher : TaskLauncher {
suspend fun handleNewBatch(stream: DestinationStream.Descriptor, wrapped: BatchEnvelope<*>)
suspend fun handleStreamClosed(stream: DestinationStream.Descriptor)
suspend fun handleTeardownComplete()
suspend fun handleFile(stream: DestinationStream.Descriptor, file: DestinationFile)
suspend fun handleFile(stream: DestinationStream.Descriptor, file: DestinationFile, index: Long)
}

/**
Expand Down Expand Up @@ -115,7 +115,7 @@ class DefaultDestinationTaskLauncher(

// Exception handling
private val exceptionHandler: TaskExceptionHandler<LeveledTask, WrappedTask<ScopedTask>>,
@Value("airbyte.file-transfer.enabled") private val fileTransferEnabled: Boolean,
@Value("\${airbyte.file-transfer.enabled}") private val fileTransferEnabled: Boolean,

// Input Comsumer requirements
private val inputFlow: SizedInputFlow<Reserved<DestinationMessage>>,
Expand All @@ -135,6 +135,11 @@ class DefaultDestinationTaskLauncher(
}

override suspend fun run() {
if (fileTransferEnabled) {
log.info { "File transfer is enabled" }
} else {
log.info { "File transfer is disabled" }
}
exceptionHandler.setCallback { succeeded.send(false) }

// Start the input consumer ASAP
Expand All @@ -146,6 +151,7 @@ class DefaultDestinationTaskLauncher(
recordQueueSupplier = recordQueueSupplier,
checkpointQueue = checkpointQueue,
this,
closeStreamTaskFactory
)
enqueue(inputConsumerTask)

Expand Down Expand Up @@ -260,7 +266,7 @@ class DefaultDestinationTaskLauncher(
succeeded.send(true)
}

override suspend fun handleFile(stream: DestinationStream.Descriptor, file: DestinationFile) {
enqueue(processFileTaskFactory.make(this, stream, file))
override suspend fun handleFile(stream: DestinationStream.Descriptor, file: DestinationFile, index: Long) {
enqueue(processFileTaskFactory.make(this, stream, file, index))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package io.airbyte.cdk.load.task.implementor

import com.google.common.collect.Range
import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.message.BatchEnvelope
import io.airbyte.cdk.load.message.DestinationFile
Expand All @@ -21,7 +22,8 @@ class DefaultProcessFileTask(
override val streamDescriptor: DestinationStream.Descriptor,
private val taskLauncher: DestinationTaskLauncher,
private val syncManager: SyncManager,
private val file: DestinationFile
private val file: DestinationFile,
private val index: Long,
) : ProcessFileTask {
val log = KotlinLogging.logger {}

Expand All @@ -30,7 +32,7 @@ class DefaultProcessFileTask(

val batch = streamLoader.processFile(file)

val wrapped = BatchEnvelope(batch)
val wrapped = BatchEnvelope(batch, Range.singleton(index))
taskLauncher.handleNewBatch(streamDescriptor, wrapped)
}
}
Expand All @@ -40,6 +42,7 @@ interface ProcessFileTaskFactory {
taskLauncher: DestinationTaskLauncher,
stream: DestinationStream.Descriptor,
file: DestinationFile,
index: Long,
): ProcessFileTask
}

Expand All @@ -52,7 +55,8 @@ class DefaultFileRecordsTaskFactory(
taskLauncher: DestinationTaskLauncher,
stream: DestinationStream.Descriptor,
file: DestinationFile,
index: Long,
): ProcessFileTask {
return DefaultProcessFileTask(stream, taskLauncher, syncManager, file)
return DefaultProcessFileTask(stream, taskLauncher, syncManager, file, index)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,12 @@

package io.airbyte.cdk.load.task.internal

import com.google.common.collect.Range
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings
import io.airbyte.cdk.load.command.DestinationCatalog
import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.message.Batch
import io.airbyte.cdk.load.message.BatchEnvelope
import io.airbyte.cdk.load.message.CheckpointMessage
import io.airbyte.cdk.load.message.CheckpointMessageWrapped
import io.airbyte.cdk.load.message.DestinationFile
Expand All @@ -22,6 +25,7 @@ import io.airbyte.cdk.load.message.GlobalCheckpoint
import io.airbyte.cdk.load.message.GlobalCheckpointWrapped
import io.airbyte.cdk.load.message.MessageQueueSupplier
import io.airbyte.cdk.load.message.QueueWriter
import io.airbyte.cdk.load.message.SimpleBatch
import io.airbyte.cdk.load.message.StreamCheckpoint
import io.airbyte.cdk.load.message.StreamCheckpointWrapped
import io.airbyte.cdk.load.message.StreamRecordCompleteWrapped
Expand All @@ -32,7 +36,9 @@ import io.airbyte.cdk.load.state.SyncManager
import io.airbyte.cdk.load.task.DestinationTaskLauncher
import io.airbyte.cdk.load.task.KillableScope
import io.airbyte.cdk.load.task.SyncLevel
import io.airbyte.cdk.load.task.implementor.CloseStreamTaskFactory
import io.airbyte.cdk.load.util.use
import io.airbyte.protocol.models.Jsons
import io.github.oshai.kotlinlogging.KotlinLogging
import io.micronaut.context.annotation.Secondary
import jakarta.inject.Singleton
Expand All @@ -59,6 +65,7 @@ class DefaultInputConsumerTask(
private val checkpointQueue: QueueWriter<Reserved<CheckpointMessageWrapped>>,
private val syncManager: SyncManager,
private val destinationTaskLauncher: DestinationTaskLauncher,
private val closeStreamTaskFactory: CloseStreamTaskFactory,
) : InputConsumerTask {
private val log = KotlinLogging.logger {}

Expand Down Expand Up @@ -88,10 +95,24 @@ class DefaultInputConsumerTask(
is DestinationRecordStreamIncomplete ->
throw IllegalStateException("Stream $stream failed upstream, cannot continue.")
is DestinationFile -> {
destinationTaskLauncher.handleFile(stream, message)
val index = manager.countRecordIn()
destinationTaskLauncher.handleFile(stream, message, index)
}
is DestinationFileStreamComplete -> {
// manager.countRecordIn()
// val index = manager.countRecordIn()
reserved.release() // safe because multiple calls conflate
log.info { "marking EOS" }
manager.markEndOfStream()
val envelope = BatchEnvelope(SimpleBatch(Batch.State.COMPLETE))
// handleCheckpoint(reserved.replace(StreamCheckpoint(
// CheckpointMessage.Checkpoint(stream, Jsons.emptyObject()),
// null
// )), sizeBytes)
// val task = closeStreamTaskFactory.make(destinationTaskLauncher, stream)
// task.execute()
destinationTaskLauncher.handleNewBatch(stream, envelope)
log.info { "marking EOS" }
}
is DestinationFileStreamIncomplete ->
throw IllegalStateException("File stream $stream failed upstream, cannot continue.")
Expand Down Expand Up @@ -182,6 +203,7 @@ interface InputConsumerTaskFactory {
MessageQueueSupplier<DestinationStream.Descriptor, Reserved<DestinationRecordWrapped>>,
checkpointQueue: QueueWriter<Reserved<CheckpointMessageWrapped>>,
destinationTaskLauncher: DestinationTaskLauncher,
closeStreamTaskFactory: CloseStreamTaskFactory,
): InputConsumerTask
}

Expand All @@ -196,14 +218,16 @@ class DefaultInputConsumerTaskFactory(private val syncManager: SyncManager) :
MessageQueueSupplier<DestinationStream.Descriptor, Reserved<DestinationRecordWrapped>>,
checkpointQueue: QueueWriter<Reserved<CheckpointMessageWrapped>>,
destinationTaskLauncher: DestinationTaskLauncher,
closeStreamTaskFactory: CloseStreamTaskFactory,
): InputConsumerTask {
return DefaultInputConsumerTask(
catalog,
inputFlow,
recordQueueSupplier,
checkpointQueue,
syncManager,
destinationTaskLauncher
destinationTaskLauncher,
closeStreamTaskFactory,
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ abstract class IntegrationTest(
@BeforeAll
fun setEnvVars() {
nonDockerMockEnvVars.set("WORKER_JOB_ID", "0")
nonDockerMockEnvVars.set("USE_FILE_TRANSFER", "true")
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import io.airbyte.cdk.load.data.TimestampTypeWithoutTimezone
import io.airbyte.cdk.load.data.TimestampValue
import io.airbyte.cdk.load.data.UnionType
import io.airbyte.cdk.load.data.UnknownType
import io.airbyte.cdk.load.message.DestinationFile
import io.airbyte.cdk.load.message.DestinationRecord
import io.airbyte.cdk.load.message.DestinationRecord.Change
import io.airbyte.cdk.load.message.StreamCheckpoint
Expand All @@ -51,6 +52,7 @@ import io.airbyte.cdk.util.Jsons
import io.airbyte.protocol.models.v0.AirbyteMessage
import io.airbyte.protocol.models.v0.AirbyteRecordMessageMetaChange
import io.airbyte.protocol.models.v0.AirbyteStateMessage
import java.io.File
import java.math.BigDecimal
import java.time.LocalDate
import java.time.LocalDateTime
Expand Down Expand Up @@ -229,6 +231,46 @@ abstract class BasicFunctionalityIntegrationTest(
)
}

@Test
open fun testBasicWriteFile() {
File("/tmp/test_file").writeText("")
val stream =
DestinationStream(
DestinationStream.Descriptor(randomizedNamespace, "test_stream"),
Append,
ObjectType(linkedMapOf("id" to intType)),
generationId = 0,
minimumGenerationId = 0,
syncId = 42,
)
val fileMessage = DestinationFile.AirbyteRecordMessageFile(
fileUrl = "/tmp/test_file",
bytes = 1234L,
fileRelativePath = "path/to/file",
modified = 4321L,
sourceFileUrl = "file://path/to/source",
)

runSync(
configContents,
stream,
listOf(
DestinationFile(
stream = stream.descriptor,
emittedAtMs = 1234,
serialized = "",
fileMessage = fileMessage,
),
StreamCheckpoint(
streamName = "test_stream",
streamNamespace = randomizedNamespace,
blob = """{"foo": "bar"}""",
sourceRecordCount = 1,
)
)
)
}

@Disabled("https://github.com/airbytehq/airbyte-internal-issues/issues/10413")
@Test
open fun testMidSyncCheckpointingStreamState(): Unit =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ class ObjectStoragePathFactory(
formatConfigProvider: ObjectStorageFormatConfigurationProvider? = null,
compressionConfigProvider: ObjectStorageCompressionConfigurationProvider<*>? = null,
timeProvider: TimeProvider,
// TODO: Inject value
) : PathFactory {
private val loadedAt = timeProvider.let { Instant.ofEpochMilli(it.currentTimeMillis()) }
private val pathConfig = pathConfigProvider.objectStoragePathConfiguration
Expand Down
Loading
Loading