diff --git a/docs/layouts/shortcodes/generated/other_functions.html b/docs/layouts/shortcodes/generated/other_functions.html
index f367c97d0b97..2135fc8172d3 100644
--- a/docs/layouts/shortcodes/generated/other_functions.html
+++ b/docs/layouts/shortcodes/generated/other_functions.html
@@ -39,5 +39,9 @@
If the column is an INT or LONG, truncate(column,width) will truncate the number with the algorithm `v - (((v % W) + W) % W)`. The `redundant` compute part is to keep the result always positive.
If the column is a DECIMAL, truncate(column,width) will truncate the decimal with the algorithm: let `scaled_W = decimal(W, scale(v))`, then return `v - (v % scaled_W)`.
+
+ cast(value,dataType) |
+ Get a constant value of the given data type. The dataType argument is optional and defaults to STRING; it must be an atomic type, such as STRING, INT, BOOLEAN, etc. |
+
\ No newline at end of file
diff --git a/paimon-common/src/main/java/org/apache/paimon/types/DataTypeJsonParser.java b/paimon-common/src/main/java/org/apache/paimon/types/DataTypeJsonParser.java
index 0af68df9de2b..1afde1b2db03 100644
--- a/paimon-common/src/main/java/org/apache/paimon/types/DataTypeJsonParser.java
+++ b/paimon-common/src/main/java/org/apache/paimon/types/DataTypeJsonParser.java
@@ -76,7 +76,7 @@ public static DataType parseDataType(JsonNode json) {
throw new IllegalArgumentException("Can not parse: " + json);
}
- private static DataType parseAtomicTypeSQLString(String string) {
+ public static DataType parseAtomicTypeSQLString(String string) {
List tokens = tokenize(string);
TokenParser converter = new TokenParser(string, tokens);
return converter.parseTokens();
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/ComputedColumn.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/ComputedColumn.java
index 3b262fafdae8..5e9041a12072 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/ComputedColumn.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/ComputedColumn.java
@@ -48,6 +48,7 @@ public DataType columnType() {
return expression.outputType();
}
+ /**
+  * Return the name of the field referenced by the underlying expression. May be null: an
+  * expression is allowed to reference no field at all.
+  */
+ @Nullable
public String fieldReference() {
return expression.fieldReference();
}
@@ -55,7 +56,7 @@ public String fieldReference() {
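- /** Compute column's value from given argument. Return null if input is null. */
+ /**
+  * Compute column's value from given argument. Return null if the expression references a field
+  * and the input is null; an expression without a field reference is evaluated even when the
+  * input is null.
+  */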
@Nullable
public String eval(@Nullable String input) {
- if (input == null) {
+ if (fieldReference() != null && input == null) {
return null;
}
return expression.eval(input);
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/ComputedColumnUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/ComputedColumnUtils.java
index a18355ca6015..24ca0599bfed 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/ComputedColumnUtils.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/ComputedColumnUtils.java
@@ -21,10 +21,8 @@
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.utils.Preconditions;
-import org.apache.paimon.utils.StringUtils;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -71,25 +69,10 @@ public static List buildComputedColumns(
String[] args = expression.substring(left + 1, right).split(",");
checkArgument(args.length >= 1, "Computed column needs at least one argument.");
- String fieldReference = args[0].trim();
- String[] literals =
- Arrays.stream(args).skip(1).map(String::trim).toArray(String[]::new);
- String fieldReferenceCheckForm =
- StringUtils.caseSensitiveConversion(fieldReference, caseSensitive);
- checkArgument(
- typeMapping.containsKey(fieldReferenceCheckForm),
- String.format(
- "Referenced field '%s' is not in given fields: %s.",
- fieldReferenceCheckForm, typeMapping.keySet()));
-
computedColumns.add(
new ComputedColumn(
columnName,
- Expression.create(
- exprName,
- fieldReference,
- typeMapping.get(fieldReferenceCheckForm),
- literals)));
+ Expression.create(typeMapping, caseSensitive, exprName, args)));
}
return computedColumns;
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/Expression.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/Expression.java
index 417758f4bc95..2e0a1319293f 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/Expression.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/Expression.java
@@ -21,9 +21,11 @@
import org.apache.paimon.data.Timestamp;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypeFamily;
+import org.apache.paimon.types.DataTypeJsonParser;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.utils.DateTimeUtils;
import org.apache.paimon.utils.SerializableSupplier;
+import org.apache.paimon.utils.StringUtils;
import javax.annotation.Nullable;
@@ -34,25 +36,16 @@
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.List;
+import java.util.Locale;
+import java.util.Map;
import java.util.function.Function;
+import java.util.stream.Collectors;
import static org.apache.paimon.utils.Preconditions.checkArgument;
+import static org.apache.paimon.utils.Preconditions.checkNotNull;
/** Produce a computation result for computed column. */
public interface Expression extends Serializable {
- List SUPPORTED_EXPRESSION =
- Arrays.asList(
- "year",
- "month",
- "day",
- "hour",
- "minute",
- "second",
- "date_format",
- "substring",
- "truncate");
-
- /** Return name of referenced field. */
+ /** Return name of referenced field, or null if the expression does not reference a field. */
+ @Nullable
String fieldReference();
@@ -62,40 +55,179 @@ public interface Expression extends Serializable {
/** Compute value from given input. Input and output are serialized to string. */
String eval(String input);
+ /**
+  * Registry of the functions usable in a computed column expression. Each constant carries the
+  * {@link ExpressionCreator} that builds its {@link Expression}; the name accepted by {@link
+  * #creator(String)} is the constant name in lower case.
+  */
+ enum ExpressionFunction {
+     YEAR(
+             (typeMapping, caseSensitive, args) -> {
+                 ReferencedField referencedField =
+                         ReferencedField.checkArgument(typeMapping, caseSensitive, args);
+                 return TemporalToIntConverter.create(
+                         referencedField.field(),
+                         referencedField.fieldType(),
+                         () -> LocalDateTime::getYear,
+                         referencedField.literals());
+             }),
+     MONTH(
+             (typeMapping, caseSensitive, args) -> {
+                 ReferencedField referencedField =
+                         ReferencedField.checkArgument(typeMapping, caseSensitive, args);
+                 return TemporalToIntConverter.create(
+                         referencedField.field(),
+                         referencedField.fieldType(),
+                         () -> LocalDateTime::getMonthValue,
+                         referencedField.literals());
+             }),
+     DAY(
+             (typeMapping, caseSensitive, args) -> {
+                 ReferencedField referencedField =
+                         ReferencedField.checkArgument(typeMapping, caseSensitive, args);
+                 return TemporalToIntConverter.create(
+                         referencedField.field(),
+                         referencedField.fieldType(),
+                         () -> LocalDateTime::getDayOfMonth,
+                         referencedField.literals());
+             }),
+     HOUR(
+             (typeMapping, caseSensitive, args) -> {
+                 ReferencedField referencedField =
+                         ReferencedField.checkArgument(typeMapping, caseSensitive, args);
+                 return TemporalToIntConverter.create(
+                         referencedField.field(),
+                         referencedField.fieldType(),
+                         () -> LocalDateTime::getHour,
+                         referencedField.literals());
+             }),
+     MINUTE(
+             (typeMapping, caseSensitive, args) -> {
+                 ReferencedField referencedField =
+                         ReferencedField.checkArgument(typeMapping, caseSensitive, args);
+                 return TemporalToIntConverter.create(
+                         referencedField.field(),
+                         referencedField.fieldType(),
+                         () -> LocalDateTime::getMinute,
+                         referencedField.literals());
+             }),
+     SECOND(
+             (typeMapping, caseSensitive, args) -> {
+                 ReferencedField referencedField =
+                         ReferencedField.checkArgument(typeMapping, caseSensitive, args);
+                 return TemporalToIntConverter.create(
+                         referencedField.field(),
+                         referencedField.fieldType(),
+                         () -> LocalDateTime::getSecond,
+                         referencedField.literals());
+             }),
+     DATE_FORMAT(
+             (typeMapping, caseSensitive, args) -> {
+                 ReferencedField referencedField =
+                         ReferencedField.checkArgument(typeMapping, caseSensitive, args);
+                 return DateFormat.create(
+                         referencedField.field(),
+                         referencedField.fieldType(),
+                         referencedField.literals());
+             }),
+     SUBSTRING(
+             (typeMapping, caseSensitive, args) -> {
+                 ReferencedField referencedField =
+                         ReferencedField.checkArgument(typeMapping, caseSensitive, args);
+                 return substring(referencedField.field(), referencedField.literals());
+             }),
+     TRUNCATE(
+             (typeMapping, caseSensitive, args) -> {
+                 ReferencedField referencedField =
+                         ReferencedField.checkArgument(typeMapping, caseSensitive, args);
+                 return truncate(
+                         referencedField.field(),
+                         referencedField.fieldType(),
+                         referencedField.literals());
+             }),
+     // cast takes no field reference, so its arguments are not resolved against typeMapping
+     CAST((typeMapping, caseSensitive, args) -> cast(args));
+
+     /** Creator that builds the expression of this function. */
+     public final ExpressionCreator creator;
+
+     ExpressionFunction(ExpressionCreator creator) {
+         this.creator = creator;
+     }
+
+     /** Return the creator that builds the expression of this function. */
+     public ExpressionCreator getCreator() {
+         return creator;
+     }
+
+     /**
+      * Creators keyed by lower-case function name. Locale.ROOT is used for the case mapping so
+      * that the keys do not depend on the JVM default locale (with a Turkish default locale
+      * "MINUTE".toLowerCase() does not produce "minute").
+      */
+     private static final Map<String, ExpressionCreator> EXPRESSION_FUNCTIONS =
+             Arrays.stream(ExpressionFunction.values())
+                     .collect(
+                             Collectors.toMap(
+                                     value -> value.name().toLowerCase(Locale.ROOT),
+                                     ExpressionFunction::getCreator));
+
+     /**
+      * Look up the creator registered under the given function name.
+      *
+      * @param exprName function name, matched case-insensitively
+      * @return the creator, or null if no function has this name
+      */
+     public static ExpressionCreator creator(String exprName) {
+         return EXPRESSION_FUNCTIONS.get(exprName.toLowerCase(Locale.ROOT));
+     }
+ }
+
+ /** Factory that builds an {@link Expression} from the raw arguments of a computed column. */
+ @FunctionalInterface
+ interface ExpressionCreator {
+     /**
+      * Create the expression.
+      *
+      * @param typeMapping mapping from field name to field type, used to resolve a referenced
+      *     field
+      * @param caseSensitive forwarded to the field name conversion done before the lookup in
+      *     typeMapping
+      * @param args raw arguments of the expression, not yet trimmed
+      * @return the created expression
+      */
+     Expression create(
+             Map<String, DataType> typeMapping, boolean caseSensitive, String[] args);
+ }
+
+ /**
+  * Referenced field in expression input parameters: the field name given as first argument, its
+  * type resolved from the type mapping, and the remaining arguments as trimmed literals.
+  */
+ class ReferencedField {
+     private final String field;
+     private final DataType fieldType;
+     private final String[] literals;
+
+     private ReferencedField(String field, DataType fieldType, String[] literals) {
+         this.field = field;
+         this.fieldType = fieldType;
+         this.literals = literals;
+     }
+
+     /**
+      * Split the raw arguments of an expression into the referenced field and the literals, and
+      * resolve the type of the referenced field.
+      *
+      * @param typeMapping mapping from field name to field type
+      * @param caseSensitive passed to the field name conversion applied before the lookup
+      * @param args raw arguments; the first one is the field name, the rest are literals
+      * @return the resolved field together with the trimmed literals
+      * @throws IllegalArgumentException if no argument is given or the referenced field is not
+      *     a key of typeMapping
+      */
+     public static ReferencedField checkArgument(
+             Map<String, DataType> typeMapping, boolean caseSensitive, String... args) {
+         // Inside this class the simple name checkArgument resolves to this method rather
+         // than to the statically imported precondition, so the checks throw explicitly.
+         if (args == null || args.length == 0) {
+             throw new IllegalArgumentException("Computed column needs at least one argument.");
+         }
+
+         String referencedField = args[0].trim();
+         String[] literals =
+                 Arrays.stream(args).skip(1).map(String::trim).toArray(String[]::new);
+         String referencedFieldCheckForm =
+                 StringUtils.caseSensitiveConversion(referencedField, caseSensitive);
+
+         DataType fieldType = typeMapping.get(referencedFieldCheckForm);
+         if (fieldType == null) {
+             // an unknown field is an invalid argument, not a null dereference
+             throw new IllegalArgumentException(
+                     String.format(
+                             "Referenced field '%s' is not in given fields: %s.",
+                             referencedFieldCheckForm, typeMapping.keySet()));
+         }
+         return new ReferencedField(referencedField, fieldType, literals);
+     }
+
+     /** Return the trimmed field name exactly as written in the expression. */
+     public String field() {
+         return field;
+     }
+
+     /** Return the type of the referenced field as found in the type mapping. */
+     public DataType fieldType() {
+         return fieldType;
+     }
+
+     /** Return the trimmed arguments that follow the field name. */
+     public String[] literals() {
+         return literals;
+     }
+ }
+
+ /**
+  * Create the expression of the function with the given name.
+  *
+  * @param typeMapping mapping from field name to field type, forwarded to the function's
+  *     creator
+  * @param caseSensitive forwarded to the function's creator
+  * @param exprName name of the function, matched case-insensitively
+  * @param args raw arguments of the function
+  * @return the created expression
+  * @throws UnsupportedOperationException if no function is registered under exprName
+  */
static Expression create(
- String exprName, String fieldReference, DataType fieldType, String... literals) {
- switch (exprName.toLowerCase()) {
- case "year":
- return TemporalToIntConverter.create(
- fieldReference, fieldType, () -> LocalDateTime::getYear, literals);
- case "month":
- return TemporalToIntConverter.create(
- fieldReference, fieldType, () -> LocalDateTime::getMonthValue, literals);
- case "day":
- return TemporalToIntConverter.create(
- fieldReference, fieldType, () -> LocalDateTime::getDayOfMonth, literals);
- case "hour":
- return TemporalToIntConverter.create(
- fieldReference, fieldType, () -> LocalDateTime::getHour, literals);
- case "minute":
- return TemporalToIntConverter.create(
- fieldReference, fieldType, () -> LocalDateTime::getMinute, literals);
- case "second":
- return TemporalToIntConverter.create(
- fieldReference, fieldType, () -> LocalDateTime::getSecond, literals);
- case "date_format":
- return DateFormat.create(fieldReference, fieldType, literals);
- case "substring":
- return substring(fieldReference, literals);
- case "truncate":
- return truncate(fieldReference, fieldType, literals);
- // TODO: support more expression
- default:
- throw new UnsupportedOperationException(
- String.format(
- "Unsupported expression: %s. Supported expressions are: %s",
- exprName, String.join(",", SUPPORTED_EXPRESSION)));
+         Map<String, DataType> typeMapping,
+         boolean caseSensitive,
+         String exprName,
+         String... args) {
+
+     // creator() normalizes the case of the name itself
+     ExpressionCreator function = ExpressionFunction.creator(exprName);
+     if (function == null) {
+         // list the names in declaration order so that the message is deterministic
+         String supported =
+                 Arrays.stream(ExpressionFunction.values())
+                         .map(f -> f.name().toLowerCase(Locale.ROOT))
+                         .collect(Collectors.joining(","));
+         throw new UnsupportedOperationException(
+                 String.format(
+                         "Unsupported expression: %s. Supported expressions are: %s",
+                         exprName, supported));
}
+     return function.create(typeMapping, caseSensitive, args);
}
static Expression substring(String fieldReference, String... literals) {
@@ -137,6 +269,21 @@ static Expression truncate(String fieldReference, DataType fieldType, String...
return new TruncateComputer(fieldReference, fieldType, literals[0]);
}
+ /**
+  * Create an expression that yields a constant value.
+  *
+  * @param literals the constant value, optionally followed by the SQL string of an atomic data
+  *     type; the output type is STRING when the type is omitted. Both arguments are trimmed.
+  * @return an expression whose result is the given value and whose output type is the given
+  *     type
+  */
+ static Expression cast(String... literals) {
+     checkArgument(
+             literals.length == 1 || literals.length == 2,
+             String.format(
+                     "'cast' expression supports one or two arguments, but found '%s'.",
+                     literals.length));
+     // Trim like the field-based expressions do, so that "cast( 11, INT)" and "cast(11,INT)"
+     // produce the same constant.
+     String value = literals[0].trim();
+     DataType dataType =
+             literals.length == 2
+                     ? DataTypeJsonParser.parseAtomicTypeSQLString(literals[1].trim())
+                     : DataTypes.STRING();
+     return new CastExpression(value, dataType);
+ }
+
+ // ======================== Expression Implementations ========================
+
/** Expression to handle temporal value. */
abstract class TemporalExpressionBase implements Expression {
@@ -431,4 +578,34 @@ private BigDecimal truncateDecimal(BigInteger unscaledWidth, BigDecimal value) {
return value.subtract(remainder);
}
}
+
+ /**
+  * Get constant value. The expression references no field, ignores the input passed to {@link
+  * #eval(String)} and always returns the value given at construction; the value is stored as
+  * is and is not checked against the declared data type here.
+  */
+ final class CastExpression implements Expression {
+
+     private static final long serialVersionUID = 1L;
+
+     /** Constant returned by every evaluation, in its string form. */
+     private final String value;
+
+     /** Declared output type of the constant. */
+     private final DataType dataType;
+
+     private CastExpression(String value, DataType dataType) {
+         this.value = value;
+         this.dataType = dataType;
+     }
+
+     /** Always null: a constant does not depend on any field. */
+     @Nullable
+     @Override
+     public String fieldReference() {
+         return null;
+     }
+
+     @Override
+     public DataType outputType() {
+         return dataType;
+     }
+
+     /** Return the constant, whatever the input is (the input may be null). */
+     @Override
+     public String eval(String input) {
+         return value;
+     }
+ }
}
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
index fb54763cb7c6..67358f848c79 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
@@ -734,7 +734,8 @@ private void innerTestComputedColumn(boolean executeMysql) throws Exception {
"_date_format_timestamp=date_format(_timestamp,yyyyMMdd)",
"_substring_date1=substring(_date,2)",
"_substring_date2=substring(_timestamp,5,10)",
- "_truncate_date=trUNcate(pk,2)"); // test case-insensitive too
+ "_truncate_date=trUNcate(pk,2)", // test case-insensitive too
+ "_constant=cast(11,INT)");
MySqlSyncTableAction action =
syncTableActionBuilder(mySqlConfig)
@@ -785,7 +786,8 @@ private void innerTestComputedColumn(boolean executeMysql) throws Exception {
DataTypes.STRING(),
DataTypes.STRING(),
DataTypes.STRING(),
- DataTypes.INT().notNull()
+ DataTypes.INT().notNull(),
+ DataTypes.INT()
},
new String[] {
"pk",
@@ -815,12 +817,13 @@ private void innerTestComputedColumn(boolean executeMysql) throws Exception {
"_date_format_timestamp",
"_substring_date1",
"_substring_date2",
- "_truncate_date"
+ "_truncate_date",
+ "_constant"
});
List expected =
Arrays.asList(
- "+I[1, 19439, 2022-01-01T14:30, 2021-09-15T15:00:10, 2023, 2022, 2021, 3, 1, 9, 23, 1, 15, 0, 14, 15, 0, 30, 0, 0, 0, 10, 2023, 2022-01-01, 20210915, 23-03-23, 09-15, 0]",
- "+I[2, 19439, NULL, NULL, 2023, NULL, NULL, 3, NULL, NULL, 23, NULL, NULL, 0, NULL, NULL, 0, NULL, NULL, 0, NULL, NULL, 2023, NULL, NULL, 23-03-23, NULL, 2]");
+ "+I[1, 19439, 2022-01-01T14:30, 2021-09-15T15:00:10, 2023, 2022, 2021, 3, 1, 9, 23, 1, 15, 0, 14, 15, 0, 30, 0, 0, 0, 10, 2023, 2022-01-01, 20210915, 23-03-23, 09-15, 0, 11]",
+ "+I[2, 19439, NULL, NULL, 2023, NULL, NULL, 3, NULL, NULL, 23, NULL, NULL, 0, NULL, NULL, 0, NULL, NULL, 0, NULL, NULL, 2023, NULL, NULL, 23-03-23, NULL, 2, 11]");
waitForResult(expected, table, rowType, Arrays.asList("pk", "_year_date"));
}