Skip to content

Commit

Permalink
[cdc] Support minute and second expressions for cdc sync (apache#2575)
Browse files Browse the repository at this point in the history
  • Loading branch information
TaoZex authored Jan 9, 2024
1 parent 860e4cd commit ce2d51a
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 3 deletions.
8 changes: 8 additions & 0 deletions docs/layouts/shortcodes/generated/compute_column.html
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,14 @@
<td><h5>hour(date-column)</h5></td>
<td>Extract hour from a DATE, DATETIME or TIMESTAMP (or its corresponding string format). Output is an INT value represent the hour.</td>
</tr>
<tr>
<td><h5>minute(date-column)</h5></td>
<td>Extract minute from a DATE, DATETIME or TIMESTAMP (or its corresponding string format). Output is an INT value represent the minute.</td>
</tr>
<tr>
<td><h5>second(date-column)</h5></td>
<td>Extract second from a DATE, DATETIME or TIMESTAMP (or its corresponding string format). Output is an INT value represent the second.</td>
</tr>
<tr>
<td><h5>date_format(date-column,format)</h5></td>
<td>Convert date format from a DATE, DATETIME or TIMESTAMP (or its corresponding string format).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,16 @@
public interface Expression extends Serializable {

List<String> SUPPORTED_EXPRESSION =
Arrays.asList("year", "month", "day", "hour", "date_format", "substring", "truncate");
Arrays.asList(
"year",
"month",
"day",
"hour",
"minute",
"second",
"date_format",
"substring",
"truncate");

/** Return name of referenced field. */
String fieldReference();
Expand All @@ -60,6 +69,10 @@ static Expression create(
return day(fieldReference);
case "hour":
return hour(fieldReference);
case "minute":
return minute(fieldReference);
case "second":
return second(fieldReference);
case "date_format":
return dateFormat(fieldReference, literals);
case "substring":
Expand Down Expand Up @@ -91,6 +104,14 @@ static Expression hour(String fieldReference) {
return new HourComputer(fieldReference);
}

static Expression minute(String fieldReference) {
return new MinuteComputer(fieldReference);
}

static Expression second(String fieldReference) {
return new SecondComputer(fieldReference);
}

static Expression dateFormat(String fieldReference, String... literals) {
checkArgument(
literals.length == 1,
Expand Down Expand Up @@ -251,6 +272,62 @@ public String eval(String input) {
}
}

/** Compute minute from a time input. */
final class MinuteComputer implements Expression {

private static final long serialVersionUID = 1L;

private final String fieldReference;

private MinuteComputer(String fieldReference) {
this.fieldReference = fieldReference;
}

@Override
public String fieldReference() {
return fieldReference;
}

@Override
public DataType outputType() {
return DataTypes.INT();
}

@Override
public String eval(String input) {
LocalDateTime localDateTime = DateTimeUtils.toLocalDateTime(input, 0);
return String.valueOf(localDateTime.getMinute());
}
}

/** Compute second from a time input. */
final class SecondComputer implements Expression {

private static final long serialVersionUID = 1L;

private final String fieldReference;

private SecondComputer(String fieldReference) {
this.fieldReference = fieldReference;
}

@Override
public String fieldReference() {
return fieldReference;
}

@Override
public DataType outputType() {
return DataTypes.INT();
}

@Override
public String eval(String input) {
LocalDateTime localDateTime = DateTimeUtils.toLocalDateTime(input, 0);
return String.valueOf(localDateTime.getSecond());
}
}

/** date format from a time input. */
final class DateFormat implements Expression {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -716,6 +716,12 @@ private void innerTestComputedColumn(boolean executeMysql) throws Exception {
"_hour_date=hour(_date)",
"_hour_datetime=hour(_datetime)",
"_hour_timestamp=hour(_timestamp)",
"_minute_date=minute(_date)",
"_minute_datetime=minute(_datetime)",
"_minute_timestamp=minute(_timestamp)",
"_second_date=second(_date)",
"_second_datetime=second(_datetime)",
"_second_timestamp=second(_timestamp)",
"_date_format_date=date_format(_date,yyyy)",
"_date_format_datetime=date_format(_datetime,yyyy-MM-dd)",
"_date_format_timestamp=date_format(_timestamp,yyyyMMdd)",
Expand Down Expand Up @@ -761,6 +767,12 @@ private void innerTestComputedColumn(boolean executeMysql) throws Exception {
DataTypes.INT(),
DataTypes.INT(),
DataTypes.INT(),
DataTypes.INT(),
DataTypes.INT(),
DataTypes.INT(),
DataTypes.INT(),
DataTypes.INT(),
DataTypes.INT(),
DataTypes.STRING(),
DataTypes.STRING(),
DataTypes.STRING(),
Expand All @@ -785,6 +797,12 @@ private void innerTestComputedColumn(boolean executeMysql) throws Exception {
"_hour_date",
"_hour_datetime",
"_hour_timestamp",
"_minute_date",
"_minute_datetime",
"_minute_timestamp",
"_second_date",
"_second_datetime",
"_second_timestamp",
"_date_format_date",
"_date_format_datetime",
"_date_format_timestamp",
Expand All @@ -794,8 +812,8 @@ private void innerTestComputedColumn(boolean executeMysql) throws Exception {
});
List<String> expected =
Arrays.asList(
"+I[1, 19439, 2022-01-01T14:30, 2021-09-15T15:00:10, 2023, 2022, 2021, 3, 1, 9, 23, 1, 15, 0, 14, 15, 2023, 2022-01-01, 20210915, 23-03-23, 09-15, 0]",
"+I[2, 19439, NULL, NULL, 2023, NULL, NULL, 3, NULL, NULL, 23, NULL, NULL, 0, NULL, NULL, 2023, NULL, NULL, 23-03-23, NULL, 2]");
"+I[1, 19439, 2022-01-01T14:30, 2021-09-15T15:00:10, 2023, 2022, 2021, 3, 1, 9, 23, 1, 15, 0, 14, 15, 0, 30, 0, 0, 0, 10, 2023, 2022-01-01, 20210915, 23-03-23, 09-15, 0]",
"+I[2, 19439, NULL, NULL, 2023, NULL, NULL, 3, NULL, NULL, 23, NULL, NULL, 0, NULL, NULL, 0, NULL, NULL, 0, NULL, NULL, 2023, NULL, NULL, 23-03-23, NULL, 2]");
waitForResult(expected, table, rowType, Arrays.asList("pk", "_year_date"));
}

Expand Down

0 comments on commit ce2d51a

Please sign in to comment.