diff --git a/docs/layouts/shortcodes/generated/compute_column.html b/docs/layouts/shortcodes/generated/compute_column.html
index cabeb5276558..f70c66654edf 100644
--- a/docs/layouts/shortcodes/generated/compute_column.html
+++ b/docs/layouts/shortcodes/generated/compute_column.html
@@ -55,6 +55,19 @@
'format' is compatible with Java's DateTimeFormatter String (for example, 'yyyy-MM-dd'). Output is a string value in converted date format.
+
+ time_format(time-column,format,[time-unit]) |
+ Convert date format from an integer numeric value (such as INT, BIGINT). The value will be considered as the epoch time of 1970-01-01T00:00:00Z.
+ You can control the time unit of the integer value by time-unit:
+
+ - second: The time unit is second. This is the default time unit.
+ - millis: The time unit is millisecond.
+ - micros: The time unit is microsecond.
+ - nanos: The time unit is nanosecond.
+
+ 'format' is compatible with Java's DateTimeFormatter String (for example, 'yyyy-MM-dd'). Output is a string value in converted date format.
+ |
+
substring(column,beginInclusive) |
Get column.substring(beginInclusive). Output is a STRING. |
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/Expression.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/Expression.java
index 554b8fc9ea57..30ad9460efde 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/Expression.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/Expression.java
@@ -18,8 +18,12 @@
package org.apache.paimon.flink.action.cdc;
+import org.apache.paimon.data.Timestamp;
import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypeFamily;
import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.DateType;
+import org.apache.paimon.types.TimestampType;
import org.apache.paimon.utils.DateTimeUtils;
import javax.annotation.Nullable;
@@ -46,6 +50,7 @@ public interface Expression extends Serializable {
"minute",
"second",
"date_format",
+ "time_format",
"substring",
"truncate");
@@ -74,7 +79,9 @@ static Expression create(
case "second":
return second(fieldReference);
case "date_format":
- return dateFormat(fieldReference, literals);
+ return dateFormat(fieldReference, fieldType, literals);
+ case "time_format":
+ return timeFormat(fieldReference, fieldType, literals);
case "substring":
return substring(fieldReference, literals);
case "truncate":
@@ -112,7 +119,16 @@ static Expression second(String fieldReference) {
return new SecondComputer(fieldReference);
}
- static Expression dateFormat(String fieldReference, String... literals) {
+ static Expression dateFormat(String fieldReference, DataType fieldType, String... literals) {
+ String errMsg =
+ "'date_format' only supports to convert DATE, DATETIME OR TIMESTAMP data now.";
+ if (!(fieldType instanceof DateType || fieldType instanceof TimestampType)) {
+ if (fieldType.getTypeRoot().getFamilies().contains(DataTypeFamily.INTEGER_NUMERIC)) {
+ errMsg += " Perhaps you want to use 'time_format'?";
+ }
+ throw new IllegalArgumentException(errMsg);
+ }
+
checkArgument(
literals.length == 1,
String.format(
@@ -121,6 +137,26 @@ static Expression dateFormat(String fieldReference, String... literals) {
return new DateFormat(fieldReference, literals[0]);
}
+ static Expression timeFormat(String fieldReference, DataType fieldType, String... literals) {
+ String errMsg = "'time_format' only supports to convert integer numeric data now.";
+ if (!fieldType.getTypeRoot().getFamilies().contains(DataTypeFamily.INTEGER_NUMERIC)) {
+ if (fieldType instanceof DateType || fieldType instanceof TimestampType) {
+ errMsg += " Perhaps you want to use 'date_format'?";
+ }
+ throw new IllegalArgumentException(errMsg);
+ }
+
+ checkArgument(
+ literals.length == 1 || literals.length == 2,
+ String.format(
+ "'time_format' expression supports one or two arguments, but found '%s'.",
+ literals.length));
+
+ String timeunit = literals.length == 1 ? "second" : literals[1];
+
+ return new TimeFormat(fieldReference, literals[0], timeunit);
+ }
+
static Expression substring(String fieldReference, String... literals) {
checkArgument(
literals.length == 1 || literals.length == 2,
@@ -362,6 +398,77 @@ public String eval(String input) {
}
}
+ /** Format a time (int or long value) to string by formatter. */
+ final class TimeFormat implements Expression {
+
+ private static final long serialVersionUID = 1L;
+ private static final List SUPPORTED_TIME_UNIT =
+ Arrays.asList("second", "millis", "micros", "nanos");
+
+ private final String fieldReference;
+ private final String pattern;
+ private final String timeUnit;
+
+ private transient DateTimeFormatter formatter;
+
+ private TimeFormat(String fieldReference, String pattern, String timeUnit) {
+ this.fieldReference = fieldReference;
+ this.pattern = pattern;
+ this.timeUnit = timeUnit.toLowerCase();
+ checkArgument(
+ SUPPORTED_TIME_UNIT.contains(this.timeUnit),
+ "Unsupported time unit '%s' for time_format. Supported: %s",
+ timeUnit,
+ String.join(", ", SUPPORTED_TIME_UNIT));
+ }
+
+ @Override
+ public String fieldReference() {
+ return fieldReference;
+ }
+
+ @Override
+ public DataType outputType() {
+ return DataTypes.STRING();
+ }
+
+ @Override
+ public String eval(String input) {
+ if (formatter == null) {
+ formatter = DateTimeFormatter.ofPattern(pattern);
+ }
+ return toLocalDateTime(input).format(formatter);
+ }
+
+ private LocalDateTime toLocalDateTime(String input) {
+ long numericValue = Long.parseLong(input);
+ long milliseconds;
+ long nanosOfMillisecond;
+ switch (timeUnit) {
+ case "second":
+ milliseconds = numericValue * 1000L;
+ nanosOfMillisecond = 0;
+ break;
+ case "millis":
+ milliseconds = numericValue;
+ nanosOfMillisecond = 0;
+ break;
+ case "micros":
+ milliseconds = numericValue / 1000;
+ nanosOfMillisecond = numericValue % 1000 * 1000;
+ break;
+ case "nanos":
+ milliseconds = numericValue / 1_000_000;
+ nanosOfMillisecond = numericValue % 1_000_000;
+ break;
+ default:
+ throw new RuntimeException();
+ }
+ return Timestamp.fromEpochMillis(milliseconds, (int) nanosOfMillisecond)
+ .toLocalDateTime();
+ }
+ }
+
/** Get substring using {@link String#substring}. */
final class Substring implements Expression {
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
index aee4c484f27d..6b10406f2523 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
@@ -37,8 +37,10 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
+import java.sql.SQLException;
import java.sql.Statement;
import java.time.Duration;
+import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -819,6 +821,92 @@ private void innerTestComputedColumn(boolean executeMysql) throws Exception {
waitForResult(expected, table, rowType, Arrays.asList("pk", "_year_date"));
}
+ @Test
+ @Timeout(60)
+ public void testTimeFormatComputedColum() throws Exception {
+ Map mySqlConfig = getBasicMySqlConfig();
+ mySqlConfig.put("database-name", DATABASE_NAME);
+ mySqlConfig.put("table-name", "test_time_format");
+
+ List computedColumnDefs =
+ Arrays.asList(
+ "_from_second0_nounit=time_format(_second_val0, yyyy-MM-dd HH:mm:ss)",
+ "_from_second0=time_format(_second_val0, yyyy-MM-dd HH:mm:ss, second)",
+ "_from_second1=time_format(_second_val1, yyyy-MM-dd HH:mm:ss, second)",
+ "_from_millisecond=time_format(_millis_val, yyyy-MM-dd HH:mm:ss.SSS, millis)",
+ "_from_microsecond=time_format(_micros_val, yyyy-MM-dd HH:mm:ss.SSSSSS, micros)",
+ "_from_nanoseconds=time_format(_nanos_val, yyyy-MM-dd HH:mm:ss.SSSSSSSSS, nanos)");
+
+ MySqlSyncTableAction action =
+ syncTableActionBuilder(mySqlConfig)
+ .withComputedColumnArgs(computedColumnDefs)
+ .build();
+ runActionWithDefaultEnv(action);
+
+ try (Statement statement = getStatement()) {
+ statement.execute("USE " + DATABASE_NAME);
+ insertEpochTime(1, "2024-01-01T00:01:02.123456789Z", statement);
+ insertEpochTime(2, "2024-02-29T12:01:02.123456789Z", statement);
+ }
+
+ FileStoreTable table = getFileStoreTable();
+ RowType rowType =
+ RowType.of(
+ new DataType[] {
+ DataTypes.INT().notNull(),
+ DataTypes.INT(),
+ DataTypes.BIGINT(),
+ DataTypes.BIGINT(),
+ DataTypes.BIGINT(),
+ DataTypes.BIGINT(),
+ DataTypes.STRING(),
+ DataTypes.STRING(),
+ DataTypes.STRING(),
+ DataTypes.STRING(),
+ DataTypes.STRING(),
+ DataTypes.STRING()
+ },
+ new String[] {
+ "pk",
+ "_second_val0",
+ "_second_val1",
+ "_millis_val",
+ "_micros_val",
+ "_nanos_val",
+ "_from_second0_nounit",
+ "_from_second0",
+ "_from_second1",
+ "_from_millisecond",
+ "_from_microsecond",
+ "_from_nanoseconds"
+ });
+ List expected =
+ Arrays.asList(
+ "+I[1, 1704067262, 1704067262, 1704067262123, 1704067262123456, 1704067262123456789, "
+ + "2024-01-01 00:01:02, 2024-01-01 00:01:02, 2024-01-01 00:01:02, "
+ + "2024-01-01 00:01:02.123, 2024-01-01 00:01:02.123456, 2024-01-01 00:01:02.123456789]",
+ "+I[2, 1709208062, 1709208062, 1709208062123, 1709208062123456, 1709208062123456789, "
+ + "2024-02-29 12:01:02, 2024-02-29 12:01:02, 2024-02-29 12:01:02, "
+ + "2024-02-29 12:01:02.123, 2024-02-29 12:01:02.123456, 2024-02-29 12:01:02.123456789]");
+ waitForResult(expected, table, rowType, Collections.singletonList("pk"));
+ }
+
+ private void insertEpochTime(int pk, String dateStr, Statement statement) throws SQLException {
+ Instant instant = Instant.parse(dateStr);
+ long epochSecond = instant.getEpochSecond();
+ int nano = instant.getNano();
+
+ statement.executeUpdate(
+ String.format(
+ "INSERT INTO test_time_format VALUES (%d, %d, %d, %d, %d, %d)",
+ pk,
+ epochSecond,
+ epochSecond,
+ epochSecond * 1000 + nano / 1_000_000,
+ epochSecond * 1000_000 + nano / 1_000,
+ epochSecond * 1_000_000_000 + nano));
+ }
+
@Test
@Timeout(60)
public void testSyncShards() throws Exception {
diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql b/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql
index d119b1c899b6..81be396ddfeb 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql
+++ b/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql
@@ -281,6 +281,16 @@ CREATE TABLE test_computed_column (
PRIMARY KEY (pk)
);
+CREATE TABLE test_time_format (
+ pk INT,
+ _second_val0 INT,
+ _second_val1 BIGINT,
+ _millis_val BIGINT,
+ _micros_val BIGINT,
+ _nanos_val BIGINT,
+ PRIMARY KEY (pk)
+);
+
CREATE TABLE test_options_change (
pk INT,
_date DATE,