From 2ca746da80152a79eea8f078f49312cfc654aef7 Mon Sep 17 00:00:00 2001 From: zhuangchong Date: Wed, 3 Apr 2024 17:41:11 +0800 Subject: [PATCH 1/4] Calculated columns support constant value. --- .../shortcodes/generated/other_functions.html | 4 + .../flink/action/cdc/ComputedColumn.java | 2 +- .../flink/action/cdc/ComputedColumnUtils.java | 9 + .../paimon/flink/action/cdc/Expression.java | 160 +++++++++++++----- .../cdc/mysql/MySqlSyncTableActionITCase.java | 13 +- 5 files changed, 139 insertions(+), 49 deletions(-) diff --git a/docs/layouts/shortcodes/generated/other_functions.html b/docs/layouts/shortcodes/generated/other_functions.html index f367c97d0b97..37a2957218bd 100644 --- a/docs/layouts/shortcodes/generated/other_functions.html +++ b/docs/layouts/shortcodes/generated/other_functions.html @@ -39,5 +39,9 @@ If the column is an INT or LONG, truncate(column,width) will truncate the number with the algorithm `v - (((v % W) + W) % W)`. The `redundant` compute part is to keep the result always positive. If the column is a DECIMAL, truncate(column,width) will truncate the decimal with the algorithm: let `scaled_W = decimal(W, scale(v))`, then return `v - (v % scaled_W)`. + +
constant(value)
+ Get constant value. Output is a STRING. + \ No newline at end of file diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/ComputedColumn.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/ComputedColumn.java index 3b262fafdae8..969f854171c9 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/ComputedColumn.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/ComputedColumn.java @@ -55,7 +55,7 @@ public String fieldReference() { /** Compute column's value from given argument. Return null if input is null. */ @Nullable public String eval(@Nullable String input) { - if (input == null) { + if (fieldReference() != null && input == null) { return null; } return expression.eval(input); diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/ComputedColumnUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/ComputedColumnUtils.java index a18355ca6015..c4ebfd6bc512 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/ComputedColumnUtils.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/ComputedColumnUtils.java @@ -71,6 +71,15 @@ public static List buildComputedColumns( String[] args = expression.substring(left + 1, right).split(","); checkArgument(args.length >= 1, "Computed column needs at least one argument."); + // handle constant expression + if (Expression.ExpressionProcessor.CONSTANT.name().equalsIgnoreCase(exprName)) { + computedColumns.add( + new ComputedColumn( + columnName, + Expression.create(exprName, null, null, args[0].trim()))); + continue; + } + String fieldReference = args[0].trim(); String[] literals = Arrays.stream(args).skip(1).map(String::trim).toArray(String[]::new); diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/Expression.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/Expression.java index 417758f4bc95..a3095ac7f263 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/Expression.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/Expression.java @@ -34,25 +34,15 @@ import java.time.format.DateTimeFormatter; import java.util.Arrays; import java.util.List; +import java.util.Map; import java.util.function.Function; +import java.util.stream.Collectors; import static org.apache.paimon.utils.Preconditions.checkArgument; /** Produce a computation result for computed column. */ public interface Expression extends Serializable { - List SUPPORTED_EXPRESSION = - Arrays.asList( - "year", - "month", - "day", - "hour", - "minute", - "second", - "date_format", - "substring", - "truncate"); - /** Return name of referenced field. */ String fieldReference(); @@ -62,40 +52,88 @@ public interface Expression extends Serializable { /** Compute value from given input. Input and output are serialized to string. */ String eval(String input); + enum ExpressionProcessor { + YEAR( + (fieldReference, fieldType, literals) -> + TemporalToIntConverter.create( + fieldReference, fieldType, () -> LocalDateTime::getYear, literals)), + MONTH( + (fieldReference, fieldType, literals) -> + TemporalToIntConverter.create( + fieldReference, + fieldType, + () -> LocalDateTime::getMonthValue, + literals)), + DAY( + (fieldReference, fieldType, literals) -> + TemporalToIntConverter.create( + fieldReference, + fieldType, + () -> LocalDateTime::getDayOfMonth, + literals)), + HOUR( + (fieldReference, fieldType, literals) -> + TemporalToIntConverter.create( + fieldReference, fieldType, () -> LocalDateTime::getHour, literals)), + MINUTE( + (fieldReference, fieldType, literals) -> + TemporalToIntConverter.create( + fieldReference, + fieldType, + () -> LocalDateTime::getMinute, + literals)), + SECOND( + (fieldReference, fieldType, literals) -> + TemporalToIntConverter.create( + fieldReference, + fieldType, + () -> LocalDateTime::getSecond, + literals)), + DATE_FORMAT(DateFormat::create), + SUBSTRING((fieldReference, fieldType, literals) -> substring(fieldReference, literals)), + TRUNCATE(Expression::truncate), + CONSTANT((fieldReference, fieldType, literals) -> constant(literals)); + + private final ExpressionFactory factory; + + ExpressionProcessor(ExpressionFactory factory) { + this.factory = factory; + } + + private ExpressionFactory factory() { + return this.factory; + } + + private static final Map EXPRESSION_FACTORIES = + Arrays.stream(ExpressionProcessor.values()) + .collect( + Collectors.toMap( + value -> value.name().toLowerCase(), + ExpressionProcessor::factory)); + + public static ExpressionFactory factory(String exprName) { + return EXPRESSION_FACTORIES.get(exprName.toLowerCase()); + } + } + + @FunctionalInterface + interface ExpressionFactory { + Expression create(String fieldReference, DataType fieldType, String... literals); + } + static Expression create( String exprName, String fieldReference, DataType fieldType, String... literals) { - switch (exprName.toLowerCase()) { - case "year": - return TemporalToIntConverter.create( - fieldReference, fieldType, () -> LocalDateTime::getYear, literals); - case "month": - return TemporalToIntConverter.create( - fieldReference, fieldType, () -> LocalDateTime::getMonthValue, literals); - case "day": - return TemporalToIntConverter.create( - fieldReference, fieldType, () -> LocalDateTime::getDayOfMonth, literals); - case "hour": - return TemporalToIntConverter.create( - fieldReference, fieldType, () -> LocalDateTime::getHour, literals); - case "minute": - return TemporalToIntConverter.create( - fieldReference, fieldType, () -> LocalDateTime::getMinute, literals); - case "second": - return TemporalToIntConverter.create( - fieldReference, fieldType, () -> LocalDateTime::getSecond, literals); - case "date_format": - return DateFormat.create(fieldReference, fieldType, literals); - case "substring": - return substring(fieldReference, literals); - case "truncate": - return truncate(fieldReference, fieldType, literals); - // TODO: support more expression - default: - throw new UnsupportedOperationException( - String.format( - "Unsupported expression: %s. Supported expressions are: %s", - exprName, String.join(",", SUPPORTED_EXPRESSION))); + + ExpressionFactory factory = ExpressionProcessor.factory(exprName.toLowerCase()); + if (factory == null) { + throw new UnsupportedOperationException( + String.format( + "Unsupported expression: %s. Supported expressions are: %s", + exprName, + String.join(",", ExpressionProcessor.EXPRESSION_FACTORIES.keySet()))); } + + return factory.create(fieldReference, fieldType, literals); } static Expression substring(String fieldReference, String... literals) { @@ -137,6 +175,15 @@ static Expression truncate(String fieldReference, DataType fieldType, String... return new TruncateComputer(fieldReference, fieldType, literals[0]); } + static Expression constant(String... literals) { + checkArgument( + literals.length == 1, + String.format( + "'constant' expression supports one argument, but found '%s'.", + literals.length)); + return new Constant(literals[0]); + } + /** Expression to handle temporal value. */ abstract class TemporalExpressionBase implements Expression { @@ -431,4 +478,31 @@ private BigDecimal truncateDecimal(BigInteger unscaledWidth, BigDecimal value) { return value.subtract(remainder); } } + + /** Get constant value. */ + final class Constant implements Expression { + + private static final long serialVersionUID = 1L; + + private final String value; + + private Constant(String value) { + this.value = value; + } + + @Override + public String fieldReference() { + return null; + } + + @Override + public DataType outputType() { + return DataTypes.STRING(); + } + + @Override + public String eval(String input) { + return value; + } + } } diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java index fb54763cb7c6..0b03912a16f1 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java @@ -734,7 +734,8 @@ private void innerTestComputedColumn(boolean executeMysql) throws Exception { "_date_format_timestamp=date_format(_timestamp,yyyyMMdd)", "_substring_date1=substring(_date,2)", "_substring_date2=substring(_timestamp,5,10)", - "_truncate_date=trUNcate(pk,2)"); // test case-insensitive too + "_truncate_date=trUNcate(pk,2)", // test case-insensitive too + "_constant=constant(ApachePaimon)"); MySqlSyncTableAction action = syncTableActionBuilder(mySqlConfig) @@ -785,7 +786,8 @@ private void innerTestComputedColumn(boolean executeMysql) throws Exception { DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), - DataTypes.INT().notNull() + DataTypes.INT().notNull(), + DataTypes.STRING() }, new String[] { "pk", @@ -815,12 +817,13 @@ private void innerTestComputedColumn(boolean executeMysql) throws Exception { "_date_format_timestamp", "_substring_date1", "_substring_date2", - "_truncate_date" + "_truncate_date", + "_constant" }); List expected = Arrays.asList( - "+I[1, 19439, 2022-01-01T14:30, 2021-09-15T15:00:10, 2023, 2022, 2021, 3, 1, 9, 23, 1, 15, 0, 14, 15, 0, 30, 0, 0, 0, 10, 2023, 2022-01-01, 20210915, 23-03-23, 09-15, 0]", - "+I[2, 19439, NULL, NULL, 2023, NULL, NULL, 3, NULL, NULL, 23, NULL, NULL, 0, NULL, NULL, 0, NULL, NULL, 0, NULL, NULL, 2023, NULL, NULL, 23-03-23, NULL, 2]"); + "+I[1, 19439, 2022-01-01T14:30, 2021-09-15T15:00:10, 2023, 2022, 2021, 3, 1, 9, 23, 1, 15, 0, 14, 15, 0, 30, 0, 0, 0, 10, 2023, 2022-01-01, 20210915, 23-03-23, 09-15, 0, ApachePaimon]", + "+I[2, 19439, NULL, NULL, 2023, NULL, NULL, 3, NULL, NULL, 23, NULL, NULL, 0, NULL, NULL, 0, NULL, NULL, 0, NULL, NULL, 2023, NULL, NULL, 23-03-23, NULL, 2, ApachePaimon]"); waitForResult(expected, table, rowType, Arrays.asList("pk", "_year_date")); } From 3fab154f803e1a7685482f6e9981462587146e0e Mon Sep 17 00:00:00 2001 From: zhuangchong Date: Wed, 3 Apr 2024 18:24:05 +0800 Subject: [PATCH 2/4] add comment --- .../java/org/apache/paimon/flink/action/cdc/Expression.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/Expression.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/Expression.java index a3095ac7f263..7e75c23f2edb 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/Expression.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/Expression.java @@ -52,6 +52,7 @@ public interface Expression extends Serializable { /** Compute value from given input. Input and output are serialized to string. */ String eval(String input); + /** Enumerates the expression processing behaviors for computed column. */ enum ExpressionProcessor { YEAR( (fieldReference, fieldType, literals) -> @@ -116,6 +117,7 @@ public static ExpressionFactory factory(String exprName) { } } + /** Define the function interface for creating Expression. */ @FunctionalInterface interface ExpressionFactory { Expression create(String fieldReference, DataType fieldType, String... literals); From ae31e5a3dcc86d0233e9b7eea766b27f642230da Mon Sep 17 00:00:00 2001 From: zhuangchong Date: Mon, 8 Apr 2024 15:11:55 +0800 Subject: [PATCH 3/4] add cast function --- .../shortcodes/generated/other_functions.html | 4 +- .../paimon/types/DataTypeJsonParser.java | 2 +- .../flink/action/cdc/ComputedColumn.java | 1 + .../flink/action/cdc/ComputedColumnUtils.java | 28 +-- .../paimon/flink/action/cdc/Expression.java | 235 +++++++++++++----- .../cdc/mysql/MySqlSyncTableActionITCase.java | 8 +- 6 files changed, 177 insertions(+), 101 deletions(-) diff --git a/docs/layouts/shortcodes/generated/other_functions.html b/docs/layouts/shortcodes/generated/other_functions.html index 37a2957218bd..2135fc8172d3 100644 --- a/docs/layouts/shortcodes/generated/other_functions.html +++ b/docs/layouts/shortcodes/generated/other_functions.html @@ -40,8 +40,8 @@ If the column is a DECIMAL, truncate(column,width) will truncate the decimal with the algorithm: let `scaled_W = decimal(W, scale(v))`, then return `v - (v % scaled_W)`. -
constant(value)
- Get constant value. Output is a STRING. +
cast(value,dataType)
+ Get a constant value. The output is an atomic type, such as STRING, INT, BOOLEAN, etc. \ No newline at end of file diff --git a/paimon-common/src/main/java/org/apache/paimon/types/DataTypeJsonParser.java b/paimon-common/src/main/java/org/apache/paimon/types/DataTypeJsonParser.java index 0af68df9de2b..1afde1b2db03 100644 --- a/paimon-common/src/main/java/org/apache/paimon/types/DataTypeJsonParser.java +++ b/paimon-common/src/main/java/org/apache/paimon/types/DataTypeJsonParser.java @@ -76,7 +76,7 @@ public static DataType parseDataType(JsonNode json) { throw new IllegalArgumentException("Can not parse: " + json); } - private static DataType parseAtomicTypeSQLString(String string) { + public static DataType parseAtomicTypeSQLString(String string) { List tokens = tokenize(string); TokenParser converter = new TokenParser(string, tokens); return converter.parseTokens(); diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/ComputedColumn.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/ComputedColumn.java index 969f854171c9..5e9041a12072 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/ComputedColumn.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/ComputedColumn.java @@ -48,6 +48,7 @@ public DataType columnType() { return expression.outputType(); } + @Nullable public String fieldReference() { return expression.fieldReference(); } diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/ComputedColumnUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/ComputedColumnUtils.java index c4ebfd6bc512..24ca0599bfed 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/ComputedColumnUtils.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/ComputedColumnUtils.java @@ -21,10 +21,8 @@ import org.apache.paimon.types.DataField; import org.apache.paimon.types.DataType; import org.apache.paimon.utils.Preconditions; -import org.apache.paimon.utils.StringUtils; import java.util.ArrayList; -import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @@ -71,34 +69,10 @@ public static List buildComputedColumns( String[] args = expression.substring(left + 1, right).split(","); checkArgument(args.length >= 1, "Computed column needs at least one argument."); - // handle constant expression - if (Expression.ExpressionProcessor.CONSTANT.name().equalsIgnoreCase(exprName)) { - computedColumns.add( - new ComputedColumn( - columnName, - Expression.create(exprName, null, null, args[0].trim()))); - continue; - } - - String fieldReference = args[0].trim(); - String[] literals = - Arrays.stream(args).skip(1).map(String::trim).toArray(String[]::new); - String fieldReferenceCheckForm = - StringUtils.caseSensitiveConversion(fieldReference, caseSensitive); - checkArgument( - typeMapping.containsKey(fieldReferenceCheckForm), - String.format( - "Referenced field '%s' is not in given fields: %s.", - fieldReferenceCheckForm, typeMapping.keySet())); - computedColumns.add( new ComputedColumn( columnName, - Expression.create( - exprName, - fieldReference, - typeMapping.get(fieldReferenceCheckForm), - literals))); + Expression.create(typeMapping, caseSensitive, exprName, args))); } return computedColumns; diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/Expression.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/Expression.java index 7e75c23f2edb..bb040b0d4fd2 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/Expression.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/Expression.java @@ -21,9 +21,11 @@ import org.apache.paimon.data.Timestamp; import org.apache.paimon.types.DataType; import org.apache.paimon.types.DataTypeFamily; +import org.apache.paimon.types.DataTypeJsonParser; import org.apache.paimon.types.DataTypes; import org.apache.paimon.utils.DateTimeUtils; import org.apache.paimon.utils.SerializableSupplier; +import org.apache.paimon.utils.StringUtils; import javax.annotation.Nullable; @@ -39,6 +41,7 @@ import java.util.stream.Collectors; import static org.apache.paimon.utils.Preconditions.checkArgument; +import static org.apache.paimon.utils.Preconditions.checkNotNull; /** Produce a computation result for computed column. */ public interface Expression extends Serializable { @@ -52,90 +55,179 @@ public interface Expression extends Serializable { /** Compute value from given input. Input and output are serialized to string. */ String eval(String input); - /** Enumerates the expression processing behaviors for computed column. */ - enum ExpressionProcessor { + /** Expression function. */ + enum ExpressionFunction { YEAR( - (fieldReference, fieldType, literals) -> - TemporalToIntConverter.create( - fieldReference, fieldType, () -> LocalDateTime::getYear, literals)), + (typeMapping, caseSensitive, args) -> { + ReferencedField referencedField = + ReferencedField.checkArgument(typeMapping, caseSensitive, args); + return TemporalToIntConverter.create( + referencedField.field(), + referencedField.fieldType(), + () -> LocalDateTime::getYear, + referencedField.literals()); + }), MONTH( - (fieldReference, fieldType, literals) -> - TemporalToIntConverter.create( - fieldReference, - fieldType, - () -> LocalDateTime::getMonthValue, - literals)), + (typeMapping, caseSensitive, args) -> { + ReferencedField referencedField = + ReferencedField.checkArgument(typeMapping, caseSensitive, args); + return TemporalToIntConverter.create( + referencedField.field(), + referencedField.fieldType(), + () -> LocalDateTime::getMonthValue, + referencedField.literals()); + }), DAY( - (fieldReference, fieldType, literals) -> - TemporalToIntConverter.create( - fieldReference, - fieldType, - () -> LocalDateTime::getDayOfMonth, - literals)), + (typeMapping, caseSensitive, args) -> { + ReferencedField referencedField = + ReferencedField.checkArgument(typeMapping, caseSensitive, args); + return TemporalToIntConverter.create( + referencedField.field(), + referencedField.fieldType(), + () -> LocalDateTime::getDayOfMonth, + referencedField.literals()); + }), HOUR( - (fieldReference, fieldType, literals) -> - TemporalToIntConverter.create( - fieldReference, fieldType, () -> LocalDateTime::getHour, literals)), + (typeMapping, caseSensitive, args) -> { + ReferencedField referencedField = + ReferencedField.checkArgument(typeMapping, caseSensitive, args); + return TemporalToIntConverter.create( + referencedField.field(), + referencedField.fieldType(), + () -> LocalDateTime::getHour, + referencedField.literals()); + }), MINUTE( - (fieldReference, fieldType, literals) -> - TemporalToIntConverter.create( - fieldReference, - fieldType, - () -> LocalDateTime::getMinute, - literals)), + (typeMapping, caseSensitive, args) -> { + ReferencedField referencedField = + ReferencedField.checkArgument(typeMapping, caseSensitive, args); + return TemporalToIntConverter.create( + referencedField.field(), + referencedField.fieldType(), + () -> LocalDateTime::getMinute, + referencedField.literals()); + }), SECOND( - (fieldReference, fieldType, literals) -> - TemporalToIntConverter.create( - fieldReference, - fieldType, - () -> LocalDateTime::getSecond, - literals)), - DATE_FORMAT(DateFormat::create), - SUBSTRING((fieldReference, fieldType, literals) -> substring(fieldReference, literals)), - TRUNCATE(Expression::truncate), - CONSTANT((fieldReference, fieldType, literals) -> constant(literals)); + (typeMapping, caseSensitive, args) -> { + ReferencedField referencedField = + ReferencedField.checkArgument(typeMapping, caseSensitive, args); + return TemporalToIntConverter.create( + referencedField.field(), + referencedField.fieldType(), + () -> LocalDateTime::getSecond, + referencedField.literals()); + }), + DATE_FORMAT( + (typeMapping, caseSensitive, args) -> { + ReferencedField referencedField = + ReferencedField.checkArgument(typeMapping, caseSensitive, args); + return DateFormat.create( + referencedField.field(), + referencedField.fieldType(), + referencedField.literals()); + }), + SUBSTRING( + (typeMapping, caseSensitive, args) -> { + ReferencedField referencedField = + ReferencedField.checkArgument(typeMapping, caseSensitive, args); + return substring(referencedField.field(), referencedField.literals()); + }), + TRUNCATE( + (typeMapping, caseSensitive, args) -> { + ReferencedField referencedField = + ReferencedField.checkArgument(typeMapping, caseSensitive, args); + return truncate( + referencedField.field(), + referencedField.fieldType(), + referencedField.literals()); + }), + CAST((typeMapping, caseSensitive, args) -> cast(args)); + + public final ExpressionCreator creator; + + ExpressionFunction(ExpressionCreator creator) { + this.creator = creator; + } + + public ExpressionCreator getCreator() { + return creator; + } + + private static final Map EXPRESSION_FUNCTIONS = + Arrays.stream(ExpressionFunction.values()) + .collect( + Collectors.toMap( + value -> value.name().toLowerCase(), + ExpressionFunction::getCreator)); + + public static ExpressionCreator creator(String exprName) { + return EXPRESSION_FUNCTIONS.get(exprName.toLowerCase()); + } + } + + /** Expression creator. */ + @FunctionalInterface + interface ExpressionCreator { + Expression create(Map typeMapping, boolean caseSensitive, String[] args); + } - private final ExpressionFactory factory; + /** Referenced field in expression input parameters. */ + class ReferencedField { + private final String field; + private final DataType fieldType; + private final String[] literals; - ExpressionProcessor(ExpressionFactory factory) { - this.factory = factory; + private ReferencedField(String field, DataType fieldType, String[] literals) { + this.field = field; + this.fieldType = fieldType; + this.literals = literals; } - private ExpressionFactory factory() { - return this.factory; + public static ReferencedField checkArgument( + Map typeMapping, boolean caseSensitive, String... args) { + String referencedField = args[0].trim(); + String[] literals = + Arrays.stream(args).skip(1).map(String::trim).toArray(String[]::new); + String referencedFieldCheckForm = + StringUtils.caseSensitiveConversion(referencedField, caseSensitive); + + DataType fieldType = + checkNotNull( + typeMapping.get(referencedFieldCheckForm), + String.format( + "Referenced field '%s' is not in given fields: %s.", + referencedFieldCheckForm, typeMapping.keySet())); + return new ReferencedField(referencedFieldCheckForm, fieldType, literals); } - private static final Map EXPRESSION_FACTORIES = - Arrays.stream(ExpressionProcessor.values()) - .collect( - Collectors.toMap( - value -> value.name().toLowerCase(), - ExpressionProcessor::factory)); + public String field() { + return field; + } - public static ExpressionFactory factory(String exprName) { - return EXPRESSION_FACTORIES.get(exprName.toLowerCase()); + public DataType fieldType() { + return fieldType; } - } - /** Define the function interface for creating Expression. */ - @FunctionalInterface - interface ExpressionFactory { - Expression create(String fieldReference, DataType fieldType, String... literals); + public String[] literals() { + return literals; + } } static Expression create( - String exprName, String fieldReference, DataType fieldType, String... literals) { + Map typeMapping, + boolean caseSensitive, + String exprName, + String... args) { - ExpressionFactory factory = ExpressionProcessor.factory(exprName.toLowerCase()); - if (factory == null) { + ExpressionCreator function = ExpressionFunction.creator(exprName.toLowerCase()); + if (function == null) { throw new UnsupportedOperationException( String.format( "Unsupported expression: %s. Supported expressions are: %s", exprName, - String.join(",", ExpressionProcessor.EXPRESSION_FACTORIES.keySet()))); + String.join(",", ExpressionFunction.EXPRESSION_FUNCTIONS.keySet()))); } - - return factory.create(fieldReference, fieldType, literals); + return function.create(typeMapping, caseSensitive, args); } static Expression substring(String fieldReference, String... literals) { @@ -177,15 +269,21 @@ static Expression truncate(String fieldReference, DataType fieldType, String... return new TruncateComputer(fieldReference, fieldType, literals[0]); } - static Expression constant(String... literals) { + static Expression cast(String... literals) { checkArgument( - literals.length == 1, + literals.length == 1 || literals.length == 2, String.format( - "'constant' expression supports one argument, but found '%s'.", + "'cast' expression supports one or two arguments, but found '%s'.", literals.length)); - return new Constant(literals[0]); + DataType dataType = DataTypes.STRING(); + if (literals.length == 2) { + dataType = DataTypeJsonParser.parseAtomicTypeSQLString(literals[1]); + } + return new CastExpression(literals[0], dataType); } + // ======================== Expression Implementations ======================== + /** Expression to handle temporal value. */ abstract class TemporalExpressionBase implements Expression { @@ -482,14 +580,17 @@ private BigDecimal truncateDecimal(BigInteger unscaledWidth, BigDecimal value) { } /** Get constant value. */ - final class Constant implements Expression { + final class CastExpression implements Expression { private static final long serialVersionUID = 1L; private final String value; - private Constant(String value) { + private final DataType dataType; + + private CastExpression(String value, DataType dataType) { this.value = value; + this.dataType = dataType; } @Override @@ -499,7 +600,7 @@ public String fieldReference() { @Override public DataType outputType() { - return DataTypes.STRING(); + return dataType; } @Override diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java index 0b03912a16f1..67358f848c79 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java @@ -735,7 +735,7 @@ private void innerTestComputedColumn(boolean executeMysql) throws Exception { "_substring_date1=substring(_date,2)", "_substring_date2=substring(_timestamp,5,10)", "_truncate_date=trUNcate(pk,2)", // test case-insensitive too - "_constant=constant(ApachePaimon)"); + "_constant=cast(11,INT)"); MySqlSyncTableAction action = syncTableActionBuilder(mySqlConfig) @@ -787,7 +787,7 @@ private void innerTestComputedColumn(boolean executeMysql) throws Exception { DataTypes.STRING(), DataTypes.STRING(), DataTypes.INT().notNull(), - DataTypes.STRING() + DataTypes.INT() }, new String[] { "pk", @@ -822,8 +822,8 @@ private void innerTestComputedColumn(boolean executeMysql) throws Exception { }); List expected = Arrays.asList( - "+I[1, 19439, 2022-01-01T14:30, 2021-09-15T15:00:10, 2023, 2022, 2021, 3, 1, 9, 23, 1, 15, 0, 14, 15, 0, 30, 0, 0, 0, 10, 2023, 2022-01-01, 20210915, 23-03-23, 09-15, 0, ApachePaimon]", - "+I[2, 19439, NULL, NULL, 2023, NULL, NULL, 3, NULL, NULL, 23, NULL, NULL, 0, NULL, NULL, 0, NULL, NULL, 0, NULL, NULL, 2023, NULL, NULL, 23-03-23, NULL, 2, ApachePaimon]"); + "+I[1, 19439, 2022-01-01T14:30, 2021-09-15T15:00:10, 2023, 2022, 2021, 3, 1, 9, 23, 1, 15, 0, 14, 15, 0, 30, 0, 0, 0, 10, 2023, 2022-01-01, 20210915, 23-03-23, 09-15, 0, 11]", + "+I[2, 19439, NULL, NULL, 2023, NULL, NULL, 3, NULL, NULL, 23, NULL, NULL, 0, NULL, NULL, 0, NULL, NULL, 0, NULL, NULL, 2023, NULL, NULL, 23-03-23, NULL, 2, 11]"); waitForResult(expected, table, rowType, Arrays.asList("pk", "_year_date")); } From c1d4cd8ebcf47703465656ac84b69201209be927 Mon Sep 17 00:00:00 2001 From: zhuangchong Date: Mon, 8 Apr 2024 16:29:14 +0800 Subject: [PATCH 4/4] fix --- .../java/org/apache/paimon/flink/action/cdc/Expression.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/Expression.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/Expression.java index bb040b0d4fd2..2e0a1319293f 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/Expression.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/Expression.java @@ -197,7 +197,7 @@ public static ReferencedField checkArgument( String.format( "Referenced field '%s' is not in given fields: %s.", referencedFieldCheckForm, typeMapping.keySet())); - return new ReferencedField(referencedFieldCheckForm, fieldType, literals); + return new ReferencedField(referencedField, fieldType, literals); } public String field() {