Skip to content

Commit

Permalink
[source-mysql] fix for datetime in mysql (airbytehq#48628)
Browse files Browse the repository at this point in the history
  • Loading branch information
xiaohansong authored Nov 25, 2024
1 parent 2f12036 commit 86a0d52
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 2 deletions.
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mysql/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad
dockerImageTag: 3.9.0-rc.19
dockerImageTag: 3.9.0-rc.20
dockerRepository: airbyte/source-mysql
documentationUrl: https://docs.airbyte.com/integrations/sources/mysql
githubIssueLabel: source-mysql
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ import io.airbyte.cdk.util.Jsons
import io.micronaut.context.annotation.Primary
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter
import java.time.format.DateTimeFormatterBuilder
import java.time.format.DateTimeParseException
import java.time.temporal.ChronoField
import java.util.*
import java.util.concurrent.ConcurrentHashMap
import javax.inject.Singleton
Expand Down Expand Up @@ -312,7 +314,13 @@ class MysqlJdbcPartitionFactory(
val timestampInStatePattern = "yyyy-MM-dd'T'HH:mm:ss"
try {
val formatter: DateTimeFormatter =
DateTimeFormatter.ofPattern(timestampInStatePattern)
DateTimeFormatterBuilder()
.appendPattern(timestampInStatePattern)
.optionalStart()
.appendFraction(ChronoField.NANO_OF_SECOND, 1, 9, true)
.optionalEnd()
.toFormatter()

Jsons.textNode(
LocalDateTime.parse(stateValue, formatter)
.format(LocalDateTimeCodec.formatter)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import io.airbyte.cdk.discover.MetaFieldDecorator
import io.airbyte.cdk.jdbc.BinaryStreamFieldType
import io.airbyte.cdk.jdbc.DefaultJdbcConstants
import io.airbyte.cdk.jdbc.IntFieldType
import io.airbyte.cdk.jdbc.LocalDateTimeFieldType
import io.airbyte.cdk.jdbc.OffsetDateTimeFieldType
import io.airbyte.cdk.output.BufferingOutputConsumer
import io.airbyte.cdk.read.ConcurrencyResource
Expand Down Expand Up @@ -89,6 +90,20 @@ class MysqlJdbcPartitionFactoryTest {
configuredCursor = binaryFieldId,
)

val datetimeFieldId = Field("id4", LocalDateTimeFieldType)

val datetimeStream =
Stream(
id =
StreamIdentifier.from(
StreamDescriptor().withNamespace("test").withName("stream4")
),
schema = setOf(datetimeFieldId),
configuredSyncMode = ConfiguredSyncMode.INCREMENTAL,
configuredPrimaryKey = listOf(datetimeFieldId),
configuredCursor = datetimeFieldId,
)

private fun sharedState(
global: Boolean = false,
): DefaultJdbcSharedState {
Expand Down Expand Up @@ -235,6 +250,37 @@ class MysqlJdbcPartitionFactoryTest {
)
}

@Test
fun testResumeFromCompletedCursorBasedReadTimestampWithoutTimezone() {
val incomingStateValue: OpaqueStateValue =
Jsons.readTree(
"""
{
"cursor": "2024-11-21T11:59:57.123",
"version": 2,
"state_type": "cursor_based",
"stream_name": "stream4",
"cursor_field": [
"id4"
],
"stream_namespace": "test",
"cursor_record_count": 1
}
""".trimIndent()
)

val jdbcPartition =
mysqlJdbcPartitionFactory.create(
streamFeedBootstrap(datetimeStream, incomingStateValue)
)
assertTrue(jdbcPartition is MysqlJdbcCursorIncrementalPartition)

assertEquals(
Jsons.valueToTree("2024-11-21T11:59:57.123000"),
(jdbcPartition as MysqlJdbcCursorIncrementalPartition).cursorLowerBound
)
}

@Test
fun testResumeFromCursorBasedReadInitialRead() {
val incomingStateValue: OpaqueStateValue =
Expand Down

0 comments on commit 86a0d52

Please sign in to comment.