diff --git a/airbyte-integrations/connectors/source-mysql/metadata.yaml b/airbyte-integrations/connectors/source-mysql/metadata.yaml index 9ccd737be23c..5eb91b53f81c 100644 --- a/airbyte-integrations/connectors/source-mysql/metadata.yaml +++ b/airbyte-integrations/connectors/source-mysql/metadata.yaml @@ -9,7 +9,7 @@ data: connectorSubtype: database connectorType: source definitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad - dockerImageTag: 3.9.0-rc.19 + dockerImageTag: 3.9.0-rc.20 dockerRepository: airbyte/source-mysql documentationUrl: https://docs.airbyte.com/integrations/sources/mysql githubIssueLabel: source-mysql diff --git a/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MysqlJdbcPartitionFactory.kt b/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MysqlJdbcPartitionFactory.kt index 1fa92d391805..2ba193cddb31 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MysqlJdbcPartitionFactory.kt +++ b/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MysqlJdbcPartitionFactory.kt @@ -28,7 +28,9 @@ import io.airbyte.cdk.util.Jsons import io.micronaut.context.annotation.Primary import java.time.LocalDateTime import java.time.format.DateTimeFormatter +import java.time.format.DateTimeFormatterBuilder import java.time.format.DateTimeParseException +import java.time.temporal.ChronoField import java.util.* import java.util.concurrent.ConcurrentHashMap import javax.inject.Singleton @@ -312,7 +314,13 @@ class MysqlJdbcPartitionFactory( val timestampInStatePattern = "yyyy-MM-dd'T'HH:mm:ss" try { val formatter: DateTimeFormatter = - DateTimeFormatter.ofPattern(timestampInStatePattern) + DateTimeFormatterBuilder() + .appendPattern(timestampInStatePattern) + .optionalStart() + .appendFraction(ChronoField.NANO_OF_SECOND, 1, 9, true) + .optionalEnd() + .toFormatter() + Jsons.textNode( LocalDateTime.parse(stateValue, formatter) .format(LocalDateTimeCodec.formatter) diff --git a/airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MysqlJdbcPartitionFactoryTest.kt b/airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MysqlJdbcPartitionFactoryTest.kt index ff732cf8a9a2..4a3e7b16cb4c 100644 --- a/airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MysqlJdbcPartitionFactoryTest.kt +++ b/airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MysqlJdbcPartitionFactoryTest.kt @@ -15,6 +15,7 @@ import io.airbyte.cdk.discover.MetaFieldDecorator import io.airbyte.cdk.jdbc.BinaryStreamFieldType import io.airbyte.cdk.jdbc.DefaultJdbcConstants import io.airbyte.cdk.jdbc.IntFieldType +import io.airbyte.cdk.jdbc.LocalDateTimeFieldType import io.airbyte.cdk.jdbc.OffsetDateTimeFieldType import io.airbyte.cdk.output.BufferingOutputConsumer import io.airbyte.cdk.read.ConcurrencyResource @@ -89,6 +90,20 @@ class MysqlJdbcPartitionFactoryTest { configuredCursor = binaryFieldId, ) + val datetimeFieldId = Field("id4", LocalDateTimeFieldType) + + val datetimeStream = + Stream( + id = + StreamIdentifier.from( + StreamDescriptor().withNamespace("test").withName("stream4") + ), + schema = setOf(datetimeFieldId), + configuredSyncMode = ConfiguredSyncMode.INCREMENTAL, + configuredPrimaryKey = listOf(datetimeFieldId), + configuredCursor = datetimeFieldId, + ) + private fun sharedState( global: Boolean = false, ): DefaultJdbcSharedState { @@ -235,6 +250,37 @@ class MysqlJdbcPartitionFactoryTest { ) } + @Test + fun testResumeFromCompletedCursorBasedReadTimestampWithoutTimezone() { + val incomingStateValue: OpaqueStateValue = + Jsons.readTree( + """ + { + "cursor": "2024-11-21T11:59:57.123", + "version": 2, + "state_type": "cursor_based", + "stream_name": "stream4", + "cursor_field": [ + "id4" + ], + "stream_namespace": "test", + "cursor_record_count": 1 + } + """.trimIndent() + ) + + val jdbcPartition = + mysqlJdbcPartitionFactory.create( + streamFeedBootstrap(datetimeStream, incomingStateValue) + ) + assertTrue(jdbcPartition is MysqlJdbcCursorIncrementalPartition) + + assertEquals( + Jsons.valueToTree("2024-11-21T11:59:57.123000"), + (jdbcPartition as MysqlJdbcCursorIncrementalPartition).cursorLowerBound + ) + } + @Test fun testResumeFromCursorBasedReadInitialRead() { val incomingStateValue: OpaqueStateValue =