Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[cdc] Calculated columns support constant value. #3156

Merged
merged 4 commits into from
Apr 10, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions docs/layouts/shortcodes/generated/other_functions.html
Original file line number Diff line number Diff line change
Expand Up @@ -39,5 +39,9 @@
If the column is an INT or LONG, truncate(column,width) will truncate the number with the algorithm `v - (((v % W) + W) % W)`. The `redundant` compute part is to keep the result always positive.
If the column is a DECIMAL, truncate(column,width) will truncate the decimal with the algorithm: let `scaled_W = decimal(W, scale(v))`, then return `v - (v % scaled_W)`.</td>
</tr>
<tr>
<td><h5>constant(value)</h5></td>
<td>Get constant value. Output is a STRING.</td>
</tr>
</tbody>
</table>
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public String fieldReference() {
/** Compute column's value from given argument. Return null if input is null. */
@Nullable
public String eval(@Nullable String input) {
if (input == null) {
if (fieldReference() != null && input == null) {
return null;
}
return expression.eval(input);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,15 @@ public static List<ComputedColumn> buildComputedColumns(
String[] args = expression.substring(left + 1, right).split(",");
checkArgument(args.length >= 1, "Computed column needs at least one argument.");

// handle constant expression
if (Expression.ExpressionProcessor.CONSTANT.name().equalsIgnoreCase(exprName)) {
computedColumns.add(
new ComputedColumn(
columnName,
Expression.create(exprName, null, null, args[0].trim())));
continue;
}

String fieldReference = args[0].trim();
String[] literals =
Arrays.stream(args).skip(1).map(String::trim).toArray(String[]::new);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,25 +34,15 @@
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.apache.paimon.utils.Preconditions.checkArgument;

/** Produce a computation result for computed column. */
public interface Expression extends Serializable {

List<String> SUPPORTED_EXPRESSION =
Arrays.asList(
"year",
"month",
"day",
"hour",
"minute",
"second",
"date_format",
"substring",
"truncate");

/** Return name of referenced field. */
String fieldReference();

Expand All @@ -62,40 +52,90 @@ public interface Expression extends Serializable {
/** Compute value from given input. Input and output are serialized to string. */
String eval(String input);

/** Enumerates the expression processing behaviors for computed column. */
enum ExpressionProcessor {
YEAR(
(fieldReference, fieldType, literals) ->
TemporalToIntConverter.create(
fieldReference, fieldType, () -> LocalDateTime::getYear, literals)),
MONTH(
(fieldReference, fieldType, literals) ->
TemporalToIntConverter.create(
fieldReference,
fieldType,
() -> LocalDateTime::getMonthValue,
literals)),
DAY(
(fieldReference, fieldType, literals) ->
TemporalToIntConverter.create(
fieldReference,
fieldType,
() -> LocalDateTime::getDayOfMonth,
literals)),
HOUR(
(fieldReference, fieldType, literals) ->
TemporalToIntConverter.create(
fieldReference, fieldType, () -> LocalDateTime::getHour, literals)),
MINUTE(
(fieldReference, fieldType, literals) ->
TemporalToIntConverter.create(
fieldReference,
fieldType,
() -> LocalDateTime::getMinute,
literals)),
SECOND(
(fieldReference, fieldType, literals) ->
TemporalToIntConverter.create(
fieldReference,
fieldType,
() -> LocalDateTime::getSecond,
literals)),
DATE_FORMAT(DateFormat::create),
SUBSTRING((fieldReference, fieldType, literals) -> substring(fieldReference, literals)),
TRUNCATE(Expression::truncate),
CONSTANT((fieldReference, fieldType, literals) -> constant(literals));

private final ExpressionFactory factory;

ExpressionProcessor(ExpressionFactory factory) {
this.factory = factory;
}

private ExpressionFactory factory() {
return this.factory;
}

private static final Map<String, ExpressionFactory> EXPRESSION_FACTORIES =
Arrays.stream(ExpressionProcessor.values())
.collect(
Collectors.toMap(
value -> value.name().toLowerCase(),
ExpressionProcessor::factory));

public static ExpressionFactory factory(String exprName) {
return EXPRESSION_FACTORIES.get(exprName.toLowerCase());
}
}

/** Define the function interface for creating Expression. */
@FunctionalInterface
interface ExpressionFactory {
Expression create(String fieldReference, DataType fieldType, String... literals);
}

static Expression create(
String exprName, String fieldReference, DataType fieldType, String... literals) {
switch (exprName.toLowerCase()) {
case "year":
return TemporalToIntConverter.create(
fieldReference, fieldType, () -> LocalDateTime::getYear, literals);
case "month":
return TemporalToIntConverter.create(
fieldReference, fieldType, () -> LocalDateTime::getMonthValue, literals);
case "day":
return TemporalToIntConverter.create(
fieldReference, fieldType, () -> LocalDateTime::getDayOfMonth, literals);
case "hour":
return TemporalToIntConverter.create(
fieldReference, fieldType, () -> LocalDateTime::getHour, literals);
case "minute":
return TemporalToIntConverter.create(
fieldReference, fieldType, () -> LocalDateTime::getMinute, literals);
case "second":
return TemporalToIntConverter.create(
fieldReference, fieldType, () -> LocalDateTime::getSecond, literals);
case "date_format":
return DateFormat.create(fieldReference, fieldType, literals);
case "substring":
return substring(fieldReference, literals);
case "truncate":
return truncate(fieldReference, fieldType, literals);
// TODO: support more expression
default:
throw new UnsupportedOperationException(
String.format(
"Unsupported expression: %s. Supported expressions are: %s",
exprName, String.join(",", SUPPORTED_EXPRESSION)));

ExpressionFactory factory = ExpressionProcessor.factory(exprName.toLowerCase());
if (factory == null) {
throw new UnsupportedOperationException(
String.format(
"Unsupported expression: %s. Supported expressions are: %s",
exprName,
String.join(",", ExpressionProcessor.EXPRESSION_FACTORIES.keySet())));
}

return factory.create(fieldReference, fieldType, literals);
}

static Expression substring(String fieldReference, String... literals) {
Expand Down Expand Up @@ -137,6 +177,15 @@ static Expression truncate(String fieldReference, DataType fieldType, String...
return new TruncateComputer(fieldReference, fieldType, literals[0]);
}

static Expression constant(String... literals) {
checkArgument(
literals.length == 1,
String.format(
"'constant' expression supports one argument, but found '%s'.",
literals.length));
return new Constant(literals[0]);
}

/** Expression to handle temporal value. */
abstract class TemporalExpressionBase<T> implements Expression {

Expand Down Expand Up @@ -431,4 +480,31 @@ private BigDecimal truncateDecimal(BigInteger unscaledWidth, BigDecimal value) {
return value.subtract(remainder);
}
}

/** Get constant value. */
final class Constant implements Expression {

private static final long serialVersionUID = 1L;

private final String value;

private Constant(String value) {
this.value = value;
}

@Override
public String fieldReference() {
return null;
}

@Override
public DataType outputType() {
return DataTypes.STRING();
}

@Override
public String eval(String input) {
return value;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -734,7 +734,8 @@ private void innerTestComputedColumn(boolean executeMysql) throws Exception {
"_date_format_timestamp=date_format(_timestamp,yyyyMMdd)",
"_substring_date1=substring(_date,2)",
"_substring_date2=substring(_timestamp,5,10)",
"_truncate_date=trUNcate(pk,2)"); // test case-insensitive too
"_truncate_date=trUNcate(pk,2)", // test case-insensitive too
"_constant=constant(ApachePaimon)");

MySqlSyncTableAction action =
syncTableActionBuilder(mySqlConfig)
Expand Down Expand Up @@ -785,7 +786,8 @@ private void innerTestComputedColumn(boolean executeMysql) throws Exception {
DataTypes.STRING(),
DataTypes.STRING(),
DataTypes.STRING(),
DataTypes.INT().notNull()
DataTypes.INT().notNull(),
DataTypes.STRING()
},
new String[] {
"pk",
Expand Down Expand Up @@ -815,12 +817,13 @@ private void innerTestComputedColumn(boolean executeMysql) throws Exception {
"_date_format_timestamp",
"_substring_date1",
"_substring_date2",
"_truncate_date"
"_truncate_date",
"_constant"
});
List<String> expected =
Arrays.asList(
"+I[1, 19439, 2022-01-01T14:30, 2021-09-15T15:00:10, 2023, 2022, 2021, 3, 1, 9, 23, 1, 15, 0, 14, 15, 0, 30, 0, 0, 0, 10, 2023, 2022-01-01, 20210915, 23-03-23, 09-15, 0]",
"+I[2, 19439, NULL, NULL, 2023, NULL, NULL, 3, NULL, NULL, 23, NULL, NULL, 0, NULL, NULL, 0, NULL, NULL, 0, NULL, NULL, 2023, NULL, NULL, 23-03-23, NULL, 2]");
"+I[1, 19439, 2022-01-01T14:30, 2021-09-15T15:00:10, 2023, 2022, 2021, 3, 1, 9, 23, 1, 15, 0, 14, 15, 0, 30, 0, 0, 0, 10, 2023, 2022-01-01, 20210915, 23-03-23, 09-15, 0, ApachePaimon]",
"+I[2, 19439, NULL, NULL, 2023, NULL, NULL, 3, NULL, NULL, 23, NULL, NULL, 0, NULL, NULL, 0, NULL, NULL, 0, NULL, NULL, 2023, NULL, NULL, 23-03-23, NULL, 2, ApachePaimon]");
waitForResult(expected, table, rowType, Arrays.asList("pk", "_year_date"));
}

Expand Down
Loading