Skip to content

Commit

Permalink
[cdc] Calculated columns support constant value. (#3156)
Browse files Browse the repository at this point in the history
  • Loading branch information
zhuangchong authored Apr 10, 2024
1 parent 0be4a21 commit 6f512c0
Show file tree
Hide file tree
Showing 6 changed files with 237 additions and 69 deletions.
4 changes: 4 additions & 0 deletions docs/layouts/shortcodes/generated/other_functions.html
Original file line number Diff line number Diff line change
Expand Up @@ -39,5 +39,9 @@
If the column is an INT or LONG, truncate(column,width) will truncate the number with the algorithm `v - (((v % W) + W) % W)`. The `redundant` compute part is to keep the result always positive.
If the column is a DECIMAL, truncate(column,width) will truncate the decimal with the algorithm: let `scaled_W = decimal(W, scale(v))`, then return `v - (v % scaled_W)`.</td>
</tr>
<tr>
<td><h5>cast(value,dataType)</h5></td>
<td>Get a constant value. The output is an atomic type, such as STRING, INT, BOOLEAN, etc.</td>
</tr>
</tbody>
</table>
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public static DataType parseDataType(JsonNode json) {
throw new IllegalArgumentException("Can not parse: " + json);
}

private static DataType parseAtomicTypeSQLString(String string) {
public static DataType parseAtomicTypeSQLString(String string) {
List<Token> tokens = tokenize(string);
TokenParser converter = new TokenParser(string, tokens);
return converter.parseTokens();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,15 @@ public DataType columnType() {
return expression.outputType();
}

@Nullable
public String fieldReference() {
return expression.fieldReference();
}

/** Compute column's value from given argument. Return null if input is null. */
@Nullable
public String eval(@Nullable String input) {
if (input == null) {
if (fieldReference() != null && input == null) {
return null;
}
return expression.eval(input);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,8 @@
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.StringUtils;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -71,25 +69,10 @@ public static List<ComputedColumn> buildComputedColumns(
String[] args = expression.substring(left + 1, right).split(",");
checkArgument(args.length >= 1, "Computed column needs at least one argument.");

String fieldReference = args[0].trim();
String[] literals =
Arrays.stream(args).skip(1).map(String::trim).toArray(String[]::new);
String fieldReferenceCheckForm =
StringUtils.caseSensitiveConversion(fieldReference, caseSensitive);
checkArgument(
typeMapping.containsKey(fieldReferenceCheckForm),
String.format(
"Referenced field '%s' is not in given fields: %s.",
fieldReferenceCheckForm, typeMapping.keySet()));

computedColumns.add(
new ComputedColumn(
columnName,
Expression.create(
exprName,
fieldReference,
typeMapping.get(fieldReferenceCheckForm),
literals)));
Expression.create(typeMapping, caseSensitive, exprName, args)));
}

return computedColumns;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@
import org.apache.paimon.data.Timestamp;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypeFamily;
import org.apache.paimon.types.DataTypeJsonParser;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.utils.DateTimeUtils;
import org.apache.paimon.utils.SerializableSupplier;
import org.apache.paimon.utils.StringUtils;

import javax.annotation.Nullable;

Expand All @@ -34,25 +36,16 @@
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.apache.paimon.utils.Preconditions.checkArgument;
import static org.apache.paimon.utils.Preconditions.checkNotNull;

/** Produce a computation result for computed column. */
public interface Expression extends Serializable {

List<String> SUPPORTED_EXPRESSION =
Arrays.asList(
"year",
"month",
"day",
"hour",
"minute",
"second",
"date_format",
"substring",
"truncate");

/** Return name of referenced field. */
String fieldReference();

Expand All @@ -62,40 +55,179 @@ public interface Expression extends Serializable {
/** Compute value from given input. Input and output are serialized to string. */
String eval(String input);

/** Expression function. */
enum ExpressionFunction {
YEAR(
(typeMapping, caseSensitive, args) -> {
ReferencedField referencedField =
ReferencedField.checkArgument(typeMapping, caseSensitive, args);
return TemporalToIntConverter.create(
referencedField.field(),
referencedField.fieldType(),
() -> LocalDateTime::getYear,
referencedField.literals());
}),
MONTH(
(typeMapping, caseSensitive, args) -> {
ReferencedField referencedField =
ReferencedField.checkArgument(typeMapping, caseSensitive, args);
return TemporalToIntConverter.create(
referencedField.field(),
referencedField.fieldType(),
() -> LocalDateTime::getMonthValue,
referencedField.literals());
}),
DAY(
(typeMapping, caseSensitive, args) -> {
ReferencedField referencedField =
ReferencedField.checkArgument(typeMapping, caseSensitive, args);
return TemporalToIntConverter.create(
referencedField.field(),
referencedField.fieldType(),
() -> LocalDateTime::getDayOfMonth,
referencedField.literals());
}),
HOUR(
(typeMapping, caseSensitive, args) -> {
ReferencedField referencedField =
ReferencedField.checkArgument(typeMapping, caseSensitive, args);
return TemporalToIntConverter.create(
referencedField.field(),
referencedField.fieldType(),
() -> LocalDateTime::getHour,
referencedField.literals());
}),
MINUTE(
(typeMapping, caseSensitive, args) -> {
ReferencedField referencedField =
ReferencedField.checkArgument(typeMapping, caseSensitive, args);
return TemporalToIntConverter.create(
referencedField.field(),
referencedField.fieldType(),
() -> LocalDateTime::getMinute,
referencedField.literals());
}),
SECOND(
(typeMapping, caseSensitive, args) -> {
ReferencedField referencedField =
ReferencedField.checkArgument(typeMapping, caseSensitive, args);
return TemporalToIntConverter.create(
referencedField.field(),
referencedField.fieldType(),
() -> LocalDateTime::getSecond,
referencedField.literals());
}),
DATE_FORMAT(
(typeMapping, caseSensitive, args) -> {
ReferencedField referencedField =
ReferencedField.checkArgument(typeMapping, caseSensitive, args);
return DateFormat.create(
referencedField.field(),
referencedField.fieldType(),
referencedField.literals());
}),
SUBSTRING(
(typeMapping, caseSensitive, args) -> {
ReferencedField referencedField =
ReferencedField.checkArgument(typeMapping, caseSensitive, args);
return substring(referencedField.field(), referencedField.literals());
}),
TRUNCATE(
(typeMapping, caseSensitive, args) -> {
ReferencedField referencedField =
ReferencedField.checkArgument(typeMapping, caseSensitive, args);
return truncate(
referencedField.field(),
referencedField.fieldType(),
referencedField.literals());
}),
CAST((typeMapping, caseSensitive, args) -> cast(args));

public final ExpressionCreator creator;

ExpressionFunction(ExpressionCreator creator) {
this.creator = creator;
}

public ExpressionCreator getCreator() {
return creator;
}

private static final Map<String, ExpressionCreator> EXPRESSION_FUNCTIONS =
Arrays.stream(ExpressionFunction.values())
.collect(
Collectors.toMap(
value -> value.name().toLowerCase(),
ExpressionFunction::getCreator));

public static ExpressionCreator creator(String exprName) {
return EXPRESSION_FUNCTIONS.get(exprName.toLowerCase());
}
}

/** Expression creator. */
@FunctionalInterface
interface ExpressionCreator {
Expression create(Map<String, DataType> typeMapping, boolean caseSensitive, String[] args);
}

/** Referenced field in expression input parameters. */
class ReferencedField {
private final String field;
private final DataType fieldType;
private final String[] literals;

private ReferencedField(String field, DataType fieldType, String[] literals) {
this.field = field;
this.fieldType = fieldType;
this.literals = literals;
}

public static ReferencedField checkArgument(
Map<String, DataType> typeMapping, boolean caseSensitive, String... args) {
String referencedField = args[0].trim();
String[] literals =
Arrays.stream(args).skip(1).map(String::trim).toArray(String[]::new);
String referencedFieldCheckForm =
StringUtils.caseSensitiveConversion(referencedField, caseSensitive);

DataType fieldType =
checkNotNull(
typeMapping.get(referencedFieldCheckForm),
String.format(
"Referenced field '%s' is not in given fields: %s.",
referencedFieldCheckForm, typeMapping.keySet()));
return new ReferencedField(referencedField, fieldType, literals);
}

public String field() {
return field;
}

public DataType fieldType() {
return fieldType;
}

public String[] literals() {
return literals;
}
}

static Expression create(
String exprName, String fieldReference, DataType fieldType, String... literals) {
switch (exprName.toLowerCase()) {
case "year":
return TemporalToIntConverter.create(
fieldReference, fieldType, () -> LocalDateTime::getYear, literals);
case "month":
return TemporalToIntConverter.create(
fieldReference, fieldType, () -> LocalDateTime::getMonthValue, literals);
case "day":
return TemporalToIntConverter.create(
fieldReference, fieldType, () -> LocalDateTime::getDayOfMonth, literals);
case "hour":
return TemporalToIntConverter.create(
fieldReference, fieldType, () -> LocalDateTime::getHour, literals);
case "minute":
return TemporalToIntConverter.create(
fieldReference, fieldType, () -> LocalDateTime::getMinute, literals);
case "second":
return TemporalToIntConverter.create(
fieldReference, fieldType, () -> LocalDateTime::getSecond, literals);
case "date_format":
return DateFormat.create(fieldReference, fieldType, literals);
case "substring":
return substring(fieldReference, literals);
case "truncate":
return truncate(fieldReference, fieldType, literals);
// TODO: support more expression
default:
throw new UnsupportedOperationException(
String.format(
"Unsupported expression: %s. Supported expressions are: %s",
exprName, String.join(",", SUPPORTED_EXPRESSION)));
Map<String, DataType> typeMapping,
boolean caseSensitive,
String exprName,
String... args) {

ExpressionCreator function = ExpressionFunction.creator(exprName.toLowerCase());
if (function == null) {
throw new UnsupportedOperationException(
String.format(
"Unsupported expression: %s. Supported expressions are: %s",
exprName,
String.join(",", ExpressionFunction.EXPRESSION_FUNCTIONS.keySet())));
}
return function.create(typeMapping, caseSensitive, args);
}

static Expression substring(String fieldReference, String... literals) {
Expand Down Expand Up @@ -137,6 +269,21 @@ static Expression truncate(String fieldReference, DataType fieldType, String...
return new TruncateComputer(fieldReference, fieldType, literals[0]);
}

static Expression cast(String... literals) {
checkArgument(
literals.length == 1 || literals.length == 2,
String.format(
"'cast' expression supports one or two arguments, but found '%s'.",
literals.length));
DataType dataType = DataTypes.STRING();
if (literals.length == 2) {
dataType = DataTypeJsonParser.parseAtomicTypeSQLString(literals[1]);
}
return new CastExpression(literals[0], dataType);
}

// ======================== Expression Implementations ========================

/** Expression to handle temporal value. */
abstract class TemporalExpressionBase<T> implements Expression {

Expand Down Expand Up @@ -431,4 +578,34 @@ private BigDecimal truncateDecimal(BigInteger unscaledWidth, BigDecimal value) {
return value.subtract(remainder);
}
}

/** Get constant value. */
final class CastExpression implements Expression {

private static final long serialVersionUID = 1L;

private final String value;

private final DataType dataType;

private CastExpression(String value, DataType dataType) {
this.value = value;
this.dataType = dataType;
}

@Override
public String fieldReference() {
return null;
}

@Override
public DataType outputType() {
return dataType;
}

@Override
public String eval(String input) {
return value;
}
}
}
Loading

0 comments on commit 6f512c0

Please sign in to comment.