Skip to content

Commit

Permalink
[Optimization-2734][cdc] Optimize CDCSOURCE convert type and add exte…
Browse files Browse the repository at this point in the history
…nds path (#2735)

Co-authored-by: wenmo <[email protected]>
  • Loading branch information
aiwenmo and aiwenmo authored Dec 24, 2023
1 parent 594ca90 commit e286965
Show file tree
Hide file tree
Showing 8 changed files with 144 additions and 37 deletions.
3 changes: 1 addition & 2 deletions dinky-admin/src/main/java/org/dinky/init/SystemInit.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.dinky.daemon.pool.ScheduleThreadPool;
import org.dinky.daemon.task.DaemonTask;
import org.dinky.daemon.task.DaemonTaskConfig;
import org.dinky.data.exception.BusException;
import org.dinky.data.exception.DinkyException;
import org.dinky.data.model.Configuration;
import org.dinky.data.model.SystemConfiguration;
Expand Down Expand Up @@ -204,7 +203,7 @@ private void aboutDolphinSchedulerInitOperation(Object v) {
}
} catch (Exception e) {
log.error("Error in DolphinScheduler: ", e);
throw new BusException(
log.error(
"get or create DolphinScheduler project failed, please check the config of DolphinScheduler!");
}
}
Expand Down
13 changes: 6 additions & 7 deletions dinky-assembly/src/main/assembly/package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
<include>**/*.yaml</include>
<include>**/log4j2.xml</include>
<include>**/DinkyFlinkDockerfile</include>

</includes>
</fileSet>
<fileSet>
Expand Down Expand Up @@ -70,7 +69,7 @@
</fileSet>
<fileSet>
<directory>${project.parent.basedir}/build/extends/</directory>
<outputDirectory>plugins/flink1.14</outputDirectory>
<outputDirectory>extends/flink1.14/dinky</outputDirectory>
<includes>
<include>dinky-catalog-mysql-1.14-${project.version}.jar</include>
<include>dinky-client-1.14-${project.version}.jar</include>
Expand All @@ -79,39 +78,39 @@
</fileSet>
<fileSet>
<directory>${project.parent.basedir}/build/extends/</directory>
<outputDirectory>plugins/flink1.15</outputDirectory>
<outputDirectory>extends/flink1.15/dinky</outputDirectory>
<includes>
<include>dinky-catalog-mysql-1.15-${project.version}.jar</include>
<include>dinky-client-1.15-${project.version}.jar</include>
</includes>
</fileSet>
<fileSet>
<directory>${project.parent.basedir}/build/extends/</directory>
<outputDirectory>plugins/flink1.16</outputDirectory>
<outputDirectory>extends/flink1.16/dinky</outputDirectory>
<includes>
<include>dinky-catalog-mysql-1.16-${project.version}.jar</include>
<include>dinky-client-1.16-${project.version}.jar</include>
</includes>
</fileSet>
<fileSet>
<directory>${project.parent.basedir}/build/extends/</directory>
<outputDirectory>plugins/flink1.17</outputDirectory>
<outputDirectory>extends/flink1.17/dinky</outputDirectory>
<includes>
<include>dinky-catalog-mysql-1.17-${project.version}.jar</include>
<include>dinky-client-1.17-${project.version}.jar</include>
</includes>
</fileSet>
<fileSet>
<directory>${project.parent.basedir}/build/extends/</directory>
<outputDirectory>plugins/flink1.18</outputDirectory>
<outputDirectory>extends/flink1.18/dinky</outputDirectory>
<includes>
<include>dinky-catalog-mysql-1.18-${project.version}.jar</include>
<include>dinky-client-1.18-${project.version}.jar</include>
</includes>
</fileSet>
<fileSet>
<directory>${project.parent.basedir}/build/extends/</directory>
<outputDirectory>plugins</outputDirectory>
<outputDirectory>extends</outputDirectory>
<includes>
<include>dinky-client-base-${project.version}.jar</include>
</includes>
Expand Down
46 changes: 46 additions & 0 deletions dinky-cdc/dinky-cdc-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -38,5 +38,51 @@
<artifactId>dinky-flink-${dinky.flink.version}</artifactId>
<scope>${scope.runtime}</scope>
</dependency>
<!-- test dependencies -->
<dependency>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<type>jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.powermock</groupId>
<artifactId>powermock-module-junit4</artifactId>
<type>jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.powermock</groupId>
<artifactId>powermock-api-mockito2</artifactId>
<type>jar</type>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-all</artifactId>
<type>jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>junit-jupiter</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.operations.ModifyOperation;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.types.logical.BigIntType;
Expand Down Expand Up @@ -337,7 +336,11 @@ public LogicalType getLogicalType(Column column) {
return new DateType();
case LOCAL_DATETIME:
case TIMESTAMP:
return new TimestampType();
if (column.getLength() != null) {
return new TimestampType(column.getLength());
} else {
return new TimestampType(3);
}
case BYTES:
return new VarBinaryType(Integer.MAX_VALUE);
default:
Expand Down Expand Up @@ -409,18 +412,28 @@ protected Optional<Object> convertDecimalType(Object value, LogicalType logicalT
protected Optional<Object> convertTimestampType(Object value, LogicalType logicalType) {
if (logicalType instanceof TimestampType) {
if (value instanceof Integer) {
return Optional.of(TimestampData.fromLocalDateTime(Instant.ofEpochMilli(((Integer) value).longValue())
return Optional.of(Instant.ofEpochMilli(((Integer) value).longValue())
.atZone(sinkTimeZone)
.toLocalDateTime()));
}

if (value instanceof Long) {
return Optional.of(TimestampData.fromLocalDateTime(
Instant.ofEpochMilli((long) value).atZone(sinkTimeZone).toLocalDateTime()));
.toLocalDateTime());
} else if (value instanceof String) {
return Optional.of(
Instant.parse((String) value).atZone(sinkTimeZone).toLocalDateTime());
} else {
TimestampType logicalType1 = (TimestampType) logicalType;
if (logicalType1.getPrecision() == 3) {
return Optional.of(Instant.ofEpochMilli((long) value)
.atZone(sinkTimeZone)
.toLocalDateTime());
} else if (logicalType1.getPrecision() > 3) {
return Optional.of(
Instant.ofEpochMilli(((long) value) / (long) Math.pow(10, logicalType1.getPrecision() - 3))
.atZone(sinkTimeZone)
.toLocalDateTime());
}
return Optional.of(Instant.ofEpochSecond(((long) value))
.atZone(sinkTimeZone)
.toLocalDateTime());
}

return Optional.of(TimestampData.fromLocalDateTime(
Instant.parse(value.toString()).atZone(sinkTimeZone).toLocalDateTime()));
}
return Optional.empty();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,23 +164,25 @@ protected Optional<Object> convertTimestampType(Object value, LogicalType logica
return Optional.of(Instant.ofEpochMilli(((Integer) value).longValue())
.atZone(sinkTimeZone)
.toLocalDateTime());
}

if (value instanceof String) {
} else if (value instanceof String) {
return Optional.of(
Instant.parse((String) value).atZone(sinkTimeZone).toLocalDateTime());
} else {
TimestampType logicalType1 = (TimestampType) logicalType;
if (logicalType1.getPrecision() == 3) {
return Optional.of(Instant.ofEpochMilli((long) value)
.atZone(sinkTimeZone)
.toLocalDateTime());
} else if (logicalType1.getPrecision() > 3) {
return Optional.of(
Instant.ofEpochMilli(((long) value) / (long) Math.pow(10, logicalType1.getPrecision() - 3))
.atZone(sinkTimeZone)
.toLocalDateTime());
}
return Optional.of(Instant.ofEpochSecond(((long) value))
.atZone(sinkTimeZone)
.toLocalDateTime());
}

TimestampType timestampType = (TimestampType) logicalType;
// 转换为毫秒
if (timestampType.getPrecision() > 3) {
return Optional.of(
Instant.ofEpochMilli(((long) value) / (long) Math.pow(10, timestampType.getPrecision() - 3.0))
.atZone(sinkTimeZone)
.toLocalDateTime());
}
return Optional.of(
Instant.ofEpochSecond(((long) value)).atZone(sinkTimeZone).toLocalDateTime());
}
return Optional.empty();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.dinky.cdc;

import org.dinky.cdc.sql.SQLSinkBuilder;

import org.apache.flink.table.types.logical.TimestampType;

import org.junit.Assert;
import org.junit.Test;

/**
* CDCSOURCETest
*
*/
public class SinkBuilderTest {

@Test
public void convertValueTimestampTest() {
SQLSinkBuilder sqlSinkBuilder = new SQLSinkBuilder();
Object value0 = sqlSinkBuilder.convertValue(1688946316L, new TimestampType(0));
Object value3 = sqlSinkBuilder.convertValue(1688946316123L, new TimestampType(3));
Object value6 = sqlSinkBuilder.convertValue(1688946316123456L, new TimestampType(6));
String target0 = "2023-07-09T23:45:16";
String target3 = "2023-07-09T23:45:16.123";
String target6 = "2023-07-09T23:45:16.123";
Assert.assertEquals(target0, value0.toString());
Assert.assertEquals(target3, value3.toString());
Assert.assertEquals(target6, value6.toString());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ private static void loadExpressionVariableClass() {
log.info("load class : {}", fullClassName);
} catch (ClassNotFoundException e) {
log.error(
"The class [{}] that needs to be loaded may not be loaded by dinky or there is no jar file of this class under dinky's lib/plugins. Please check, and try again. {}",
"The class [{}] that needs to be loaded may not be loaded by dinky or there is no jar file of this class under dinky's lib/plugins/extends. Please check, and try again. {}",
fullClassName,
e.getMessage(),
e);
Expand Down
2 changes: 1 addition & 1 deletion script/bin/auto.sh
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ FLINK_VERSION=${2:-1.18}
JAR_NAME="dinky-admin"

# Use FLINK_HOME:
CLASS_PATH=".:./lib/*:config:./plugins/*:./customJar/*:./plugins/flink${FLINK_VERSION}/dinky/*:./plugins/flink${FLINK_VERSION}/*"
CLASS_PATH=".:./lib/*:config:./plugins/*:./customJar/*:./plugins/flink${FLINK_VERSION}/dinky/*:./plugins/flink${FLINK_VERSION}/*:./extends/flink${FLINK_VERSION}/dinky/*:./extends/flink${FLINK_VERSION}/*"

PID_FILE="dinky.pid"

Expand Down

0 comments on commit e286965

Please sign in to comment.