Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[cdc] Calculated columns support constant value. #3156

Merged
merged 4 commits into from
Apr 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions docs/layouts/shortcodes/generated/other_functions.html
Original file line number Diff line number Diff line change
Expand Up @@ -39,5 +39,9 @@
If the column is an INT or LONG, truncate(column,width) will truncate the number with the algorithm `v - (((v % W) + W) % W)`. The `redundant` compute part is to keep the result always positive.
If the column is a DECIMAL, truncate(column,width) will truncate the decimal with the algorithm: let `scaled_W = decimal(W, scale(v))`, then return `v - (v % scaled_W)`.</td>
</tr>
<tr>
<td><h5>cast(value,dataType)</h5></td>
<td>Get a constant value. The output is an atomic type, such as STRING, INT, BOOLEAN, etc.</td>
</tr>
</tbody>
</table>
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public static DataType parseDataType(JsonNode json) {
throw new IllegalArgumentException("Can not parse: " + json);
}

private static DataType parseAtomicTypeSQLString(String string) {
public static DataType parseAtomicTypeSQLString(String string) {
List<Token> tokens = tokenize(string);
TokenParser converter = new TokenParser(string, tokens);
return converter.parseTokens();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,15 @@ public DataType columnType() {
return expression.outputType();
}

@Nullable
public String fieldReference() {
return expression.fieldReference();
}

/** Compute column's value from given argument. Return null if input is null. */
@Nullable
public String eval(@Nullable String input) {
if (input == null) {
if (fieldReference() != null && input == null) {
return null;
}
return expression.eval(input);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,8 @@
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.StringUtils;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -71,25 +69,10 @@ public static List<ComputedColumn> buildComputedColumns(
String[] args = expression.substring(left + 1, right).split(",");
checkArgument(args.length >= 1, "Computed column needs at least one argument.");

String fieldReference = args[0].trim();
String[] literals =
Arrays.stream(args).skip(1).map(String::trim).toArray(String[]::new);
String fieldReferenceCheckForm =
StringUtils.caseSensitiveConversion(fieldReference, caseSensitive);
checkArgument(
typeMapping.containsKey(fieldReferenceCheckForm),
String.format(
"Referenced field '%s' is not in given fields: %s.",
fieldReferenceCheckForm, typeMapping.keySet()));

computedColumns.add(
new ComputedColumn(
columnName,
Expression.create(
exprName,
fieldReference,
typeMapping.get(fieldReferenceCheckForm),
literals)));
Expression.create(typeMapping, caseSensitive, exprName, args)));
}

return computedColumns;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@
import org.apache.paimon.data.Timestamp;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypeFamily;
import org.apache.paimon.types.DataTypeJsonParser;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.utils.DateTimeUtils;
import org.apache.paimon.utils.SerializableSupplier;
import org.apache.paimon.utils.StringUtils;

import javax.annotation.Nullable;

Expand All @@ -34,25 +36,16 @@
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.apache.paimon.utils.Preconditions.checkArgument;
import static org.apache.paimon.utils.Preconditions.checkNotNull;

/** Produce a computation result for computed column. */
public interface Expression extends Serializable {

List<String> SUPPORTED_EXPRESSION =
Arrays.asList(
"year",
"month",
"day",
"hour",
"minute",
"second",
"date_format",
"substring",
"truncate");

/** Return name of referenced field. */
String fieldReference();

Expand All @@ -62,40 +55,179 @@ public interface Expression extends Serializable {
/** Compute value from given input. Input and output are serialized to string. */
String eval(String input);

/** Expression function. */
enum ExpressionFunction {
YEAR(
(typeMapping, caseSensitive, args) -> {
ReferencedField referencedField =
ReferencedField.checkArgument(typeMapping, caseSensitive, args);
return TemporalToIntConverter.create(
referencedField.field(),
referencedField.fieldType(),
() -> LocalDateTime::getYear,
referencedField.literals());
}),
MONTH(
(typeMapping, caseSensitive, args) -> {
ReferencedField referencedField =
ReferencedField.checkArgument(typeMapping, caseSensitive, args);
return TemporalToIntConverter.create(
referencedField.field(),
referencedField.fieldType(),
() -> LocalDateTime::getMonthValue,
referencedField.literals());
}),
DAY(
(typeMapping, caseSensitive, args) -> {
ReferencedField referencedField =
ReferencedField.checkArgument(typeMapping, caseSensitive, args);
return TemporalToIntConverter.create(
referencedField.field(),
referencedField.fieldType(),
() -> LocalDateTime::getDayOfMonth,
referencedField.literals());
}),
HOUR(
(typeMapping, caseSensitive, args) -> {
ReferencedField referencedField =
ReferencedField.checkArgument(typeMapping, caseSensitive, args);
return TemporalToIntConverter.create(
referencedField.field(),
referencedField.fieldType(),
() -> LocalDateTime::getHour,
referencedField.literals());
}),
MINUTE(
(typeMapping, caseSensitive, args) -> {
ReferencedField referencedField =
ReferencedField.checkArgument(typeMapping, caseSensitive, args);
return TemporalToIntConverter.create(
referencedField.field(),
referencedField.fieldType(),
() -> LocalDateTime::getMinute,
referencedField.literals());
}),
SECOND(
(typeMapping, caseSensitive, args) -> {
ReferencedField referencedField =
ReferencedField.checkArgument(typeMapping, caseSensitive, args);
return TemporalToIntConverter.create(
referencedField.field(),
referencedField.fieldType(),
() -> LocalDateTime::getSecond,
referencedField.literals());
}),
DATE_FORMAT(
(typeMapping, caseSensitive, args) -> {
ReferencedField referencedField =
ReferencedField.checkArgument(typeMapping, caseSensitive, args);
return DateFormat.create(
referencedField.field(),
referencedField.fieldType(),
referencedField.literals());
}),
SUBSTRING(
(typeMapping, caseSensitive, args) -> {
ReferencedField referencedField =
ReferencedField.checkArgument(typeMapping, caseSensitive, args);
return substring(referencedField.field(), referencedField.literals());
}),
TRUNCATE(
(typeMapping, caseSensitive, args) -> {
ReferencedField referencedField =
ReferencedField.checkArgument(typeMapping, caseSensitive, args);
return truncate(
referencedField.field(),
referencedField.fieldType(),
referencedField.literals());
}),
CAST((typeMapping, caseSensitive, args) -> cast(args));

public final ExpressionCreator creator;

ExpressionFunction(ExpressionCreator creator) {
this.creator = creator;
}

public ExpressionCreator getCreator() {
return creator;
}

private static final Map<String, ExpressionCreator> EXPRESSION_FUNCTIONS =
Arrays.stream(ExpressionFunction.values())
.collect(
Collectors.toMap(
value -> value.name().toLowerCase(),
ExpressionFunction::getCreator));

public static ExpressionCreator creator(String exprName) {
return EXPRESSION_FUNCTIONS.get(exprName.toLowerCase());
}
}

/** Expression creator. */
@FunctionalInterface
interface ExpressionCreator {
Expression create(Map<String, DataType> typeMapping, boolean caseSensitive, String[] args);
}

/** Referenced field in expression input parameters. */
class ReferencedField {
private final String field;
private final DataType fieldType;
private final String[] literals;

private ReferencedField(String field, DataType fieldType, String[] literals) {
this.field = field;
this.fieldType = fieldType;
this.literals = literals;
}

public static ReferencedField checkArgument(
Map<String, DataType> typeMapping, boolean caseSensitive, String... args) {
String referencedField = args[0].trim();
String[] literals =
Arrays.stream(args).skip(1).map(String::trim).toArray(String[]::new);
String referencedFieldCheckForm =
StringUtils.caseSensitiveConversion(referencedField, caseSensitive);

DataType fieldType =
checkNotNull(
typeMapping.get(referencedFieldCheckForm),
String.format(
"Referenced field '%s' is not in given fields: %s.",
referencedFieldCheckForm, typeMapping.keySet()));
return new ReferencedField(referencedField, fieldType, literals);
}

public String field() {
return field;
}

public DataType fieldType() {
return fieldType;
}

public String[] literals() {
return literals;
}
}

static Expression create(
String exprName, String fieldReference, DataType fieldType, String... literals) {
switch (exprName.toLowerCase()) {
case "year":
return TemporalToIntConverter.create(
fieldReference, fieldType, () -> LocalDateTime::getYear, literals);
case "month":
return TemporalToIntConverter.create(
fieldReference, fieldType, () -> LocalDateTime::getMonthValue, literals);
case "day":
return TemporalToIntConverter.create(
fieldReference, fieldType, () -> LocalDateTime::getDayOfMonth, literals);
case "hour":
return TemporalToIntConverter.create(
fieldReference, fieldType, () -> LocalDateTime::getHour, literals);
case "minute":
return TemporalToIntConverter.create(
fieldReference, fieldType, () -> LocalDateTime::getMinute, literals);
case "second":
return TemporalToIntConverter.create(
fieldReference, fieldType, () -> LocalDateTime::getSecond, literals);
case "date_format":
return DateFormat.create(fieldReference, fieldType, literals);
case "substring":
return substring(fieldReference, literals);
case "truncate":
return truncate(fieldReference, fieldType, literals);
// TODO: support more expression
default:
throw new UnsupportedOperationException(
String.format(
"Unsupported expression: %s. Supported expressions are: %s",
exprName, String.join(",", SUPPORTED_EXPRESSION)));
Map<String, DataType> typeMapping,
boolean caseSensitive,
String exprName,
String... args) {

ExpressionCreator function = ExpressionFunction.creator(exprName.toLowerCase());
if (function == null) {
throw new UnsupportedOperationException(
String.format(
"Unsupported expression: %s. Supported expressions are: %s",
exprName,
String.join(",", ExpressionFunction.EXPRESSION_FUNCTIONS.keySet())));
}
return function.create(typeMapping, caseSensitive, args);
}

static Expression substring(String fieldReference, String... literals) {
Expand Down Expand Up @@ -137,6 +269,21 @@ static Expression truncate(String fieldReference, DataType fieldType, String...
return new TruncateComputer(fieldReference, fieldType, literals[0]);
}

static Expression cast(String... literals) {
checkArgument(
literals.length == 1 || literals.length == 2,
String.format(
"'cast' expression supports one or two arguments, but found '%s'.",
literals.length));
DataType dataType = DataTypes.STRING();
if (literals.length == 2) {
dataType = DataTypeJsonParser.parseAtomicTypeSQLString(literals[1]);
}
return new CastExpression(literals[0], dataType);
}

// ======================== Expression Implementations ========================

/** Expression to handle temporal value. */
abstract class TemporalExpressionBase<T> implements Expression {

Expand Down Expand Up @@ -431,4 +578,34 @@ private BigDecimal truncateDecimal(BigInteger unscaledWidth, BigDecimal value) {
return value.subtract(remainder);
}
}

/** Get constant value. */
final class CastExpression implements Expression {

private static final long serialVersionUID = 1L;

private final String value;

private final DataType dataType;

private CastExpression(String value, DataType dataType) {
this.value = value;
this.dataType = dataType;
}

@Override
public String fieldReference() {
return null;
}

@Override
public DataType outputType() {
return dataType;
}

@Override
public String eval(String input) {
return value;
}
}
}
Loading
Loading