Skip to content

Commit

Permalink
fix? status type can be null too?
Browse files Browse the repository at this point in the history
  • Loading branch information
johnny-schmidt committed Nov 25, 2024
1 parent 59807a1 commit 114a17d
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package io.airbyte.cdk.load.command

import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog
import io.github.oshai.kotlinlogging.KotlinLogging
import io.micronaut.context.annotation.Factory
import io.micronaut.context.annotation.Secondary
import jakarta.inject.Singleton
Expand All @@ -14,6 +15,8 @@ import jakarta.inject.Singleton
* for usability.
*/
data class DestinationCatalog(val streams: List<DestinationStream> = emptyList()) {
private val log = KotlinLogging.logger {}

private val byDescriptor: Map<DestinationStream.Descriptor, DestinationStream> =
streams.associateBy { it.descriptor }

Expand All @@ -23,6 +26,7 @@ data class DestinationCatalog(val streams: List<DestinationStream> = emptyList()
"Catalog must have at least one stream: check that files are in the correct location."
)
}
log.info { "Destination catalog initialized: $streams"}
}

fun getStream(name: String, namespace: String?): DestinationStream {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -435,7 +435,7 @@ class DestinationMessageFactory(
namespace = status.streamDescriptor.namespace,
name = status.streamDescriptor.name,
)
if (message.trace.type == AirbyteTraceMessage.Type.STREAM_STATUS) {
if ((message.trace.type ?: AirbyteTraceMessage.Type.STREAM_STATUS) == AirbyteTraceMessage.Type.STREAM_STATUS) {
when (status.status) {
AirbyteStreamStatus.COMPLETE ->
if (fileTransferEnabled) {
Expand Down

0 comments on commit 114a17d

Please sign in to comment.