Skip to content

Commit

Permalink
[FLINK-26948][table] Add-ARRAY_SORT-function.
Browse files Browse the repository at this point in the history
[FLINK-26948][table] Add-ARRAY_SORT-function.
  • Loading branch information
hanyuzheng7 authored and dawidwys committed Feb 16, 2024
1 parent 6e93394 commit 620e597
Show file tree
Hide file tree
Showing 11 changed files with 345 additions and 87 deletions.
3 changes: 3 additions & 0 deletions docs/data/sql_functions.yml
Original file line number Diff line number Diff line change
Expand Up @@ -646,6 +646,9 @@ collection:
- sql: ARRAY_SLICE(array, start_offset[, end_offset])
table: array.arraySlice(start_offset[, end_offset])
description: Returns a subarray of the input array between 'start_offset' and 'end_offset' inclusive. The offsets are 1-based however 0 is also treated as the beginning of the array. Positive values are counted from the beginning of the array while negative from the end. If 'end_offset' is omitted then this offset is treated as the length of the array. If 'start_offset' is after 'end_offset' or both are out of array bounds an empty array will be returned. Returns null if any input is null.
- sql: ARRAY_SORT(array[, ascending_order[, null_first]])
table: array.arraySort([ascending_order[, null_first]])
description: Returns the array in sorted order. When only the array is given, it is sorted in ascending order with NULLs at the start. Specifying ascending_order as true sorts the array in ascending order with NULLs first, and setting it to false sorts it in descending order with NULLs last. Independently, null_first as true moves NULLs to the beginning, and as false to the end, irrespective of the sorting order. The function returns null if any input is null.
- sql: ARRAY_UNION(array1, array2)
table: haystack.arrayUnion(array)
description: Returns an array of the elements in the union of array1 and array2, without duplicates. If any of the array is null, the function will return null.
Expand Down
1 change: 1 addition & 0 deletions flink-python/docs/reference/pyflink.table/expressions.rst
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ advanced type helper functions
Expression.array_max
Expression.array_slice
Expression.array_min
Expression.array_sort
Expression.array_union
Expression.map_entries
Expression.map_keys
Expand Down
16 changes: 16 additions & 0 deletions flink-python/pyflink/table/expression.py
Original file line number Diff line number Diff line change
Expand Up @@ -1531,6 +1531,22 @@ def array_slice(self, start_offset, end_offset=None) -> 'Expression':
else:
return _ternary_op("array_slice")(self, start_offset, end_offset)

def array_sort(self, ascending_order=None, null_first=None) -> 'Expression':
    """
    Returns the array in sorted order.

    When only the array is given, it is sorted in ascending order with NULLs at the start.
    Specifying ascending_order as true sorts the array in ascending order with NULLs first,
    and setting it to false sorts it in descending order with NULLs last.
    Independently, null_first as true moves NULLs to the beginning, and as false to the end,
    irrespective of the sorting order. The function returns null if any input is null.

    :param ascending_order: optional boolean (or boolean expression) selecting ascending
                            (true) or descending (false) order; omitted when None.
    :param null_first: optional boolean (or boolean expression) selecting whether NULLs are
                       placed first (true) or last (false); omitted when None.
    :return: an expression representing the ARRAY_SORT call with one, two or three
             arguments, depending on which optional arguments were supplied.
    """
    # Compare against None explicitly. The previous check
    # `ascending_order and null_first is None` tested the truthiness of ascending_order,
    # so a call without any arguments fell through to the two-argument form and passed
    # None as the ordering argument.
    if ascending_order is None and null_first is None:
        return _unary_op("array_sort")(self)
    elif null_first is None:
        return _binary_op("array_sort")(self, ascending_order)
    else:
        return _ternary_op("array_sort")(self, ascending_order, null_first)

def array_union(self, array) -> 'Expression':
"""
Returns an array of the elements in the union of array1 and array2, without duplicates.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@
import static org.apache.flink.table.types.inference.InputTypeStrategies.OUTPUT_IF_NULL;
import static org.apache.flink.table.types.inference.InputTypeStrategies.TYPE_LITERAL;
import static org.apache.flink.table.types.inference.InputTypeStrategies.and;
import static org.apache.flink.table.types.inference.InputTypeStrategies.arrayFullyComparableElementType;
import static org.apache.flink.table.types.inference.InputTypeStrategies.commonArrayType;
import static org.apache.flink.table.types.inference.InputTypeStrategies.commonMultipleArrayType;
import static org.apache.flink.table.types.inference.InputTypeStrategies.commonType;
Expand All @@ -99,6 +98,7 @@
import static org.apache.flink.table.types.inference.TypeStrategies.nullableIfArgs;
import static org.apache.flink.table.types.inference.TypeStrategies.varyingString;
import static org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies.ARRAY_ELEMENT_ARG;
import static org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies.ARRAY_FULLY_COMPARABLE;
import static org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies.JSON_ARGUMENT;
import static org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies.TWO_EQUALS_COMPARABLE;
import static org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies.TWO_FULLY_COMPARABLE;
Expand Down Expand Up @@ -231,6 +231,25 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL)
"org.apache.flink.table.runtime.functions.scalar.ArrayContainsFunction")
.build();

/**
 * Definition of the built-in scalar function {@code ARRAY_SORT}.
 *
 * <p>Three call shapes are accepted: the array alone, the array followed by one BOOLEAN
 * argument (ascending order flag), or the array followed by two BOOLEAN arguments (ascending
 * order flag and null-first flag). In every shape the first argument must satisfy {@code
 * ARRAY_FULLY_COMPARABLE}.
 *
 * <p>The output type is derived from the first argument via {@code
 * nullableIfArgs(argument(0))}; the function is evaluated by {@code ArraySortFunction}.
 */
public static final BuiltInFunctionDefinition ARRAY_SORT =
BuiltInFunctionDefinition.newBuilder()
.name("ARRAY_SORT")
.kind(SCALAR)
.inputTypeStrategy(
or(
sequence(ARRAY_FULLY_COMPARABLE),
sequence(
ARRAY_FULLY_COMPARABLE,
logical(LogicalTypeRoot.BOOLEAN)),
sequence(
ARRAY_FULLY_COMPARABLE,
logical(LogicalTypeRoot.BOOLEAN),
logical(LogicalTypeRoot.BOOLEAN))))
.outputTypeStrategy(nullableIfArgs(argument(0)))
.runtimeClass(
"org.apache.flink.table.runtime.functions.scalar.ArraySortFunction")
.build();

public static final BuiltInFunctionDefinition ARRAY_DISTINCT =
BuiltInFunctionDefinition.newBuilder()
.name("ARRAY_DISTINCT")
Expand Down Expand Up @@ -327,7 +346,7 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL)
BuiltInFunctionDefinition.newBuilder()
.name("ARRAY_MAX")
.kind(SCALAR)
.inputTypeStrategy(arrayFullyComparableElementType())
.inputTypeStrategy(sequence(ARRAY_FULLY_COMPARABLE))
.outputTypeStrategy(forceNullable(SpecificTypeStrategies.ARRAY_ELEMENT))
.runtimeClass(
"org.apache.flink.table.runtime.functions.scalar.ArrayMaxFunction")
Expand Down Expand Up @@ -355,7 +374,7 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL)
BuiltInFunctionDefinition.newBuilder()
.name("ARRAY_MIN")
.kind(SCALAR)
.inputTypeStrategy(arrayFullyComparableElementType())
.inputTypeStrategy(sequence(ARRAY_FULLY_COMPARABLE))
.outputTypeStrategy(forceNullable(SpecificTypeStrategies.ARRAY_ELEMENT))
.runtimeClass(
"org.apache.flink.table.runtime.functions.scalar.ArrayMinFunction")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.inference.strategies.AndArgumentTypeStrategy;
import org.apache.flink.table.types.inference.strategies.AnyArgumentTypeStrategy;
import org.apache.flink.table.types.inference.strategies.ArrayComparableElementTypeStrategy;
import org.apache.flink.table.types.inference.strategies.CommonArgumentTypeStrategy;
import org.apache.flink.table.types.inference.strategies.CommonArrayInputTypeStrategy;
import org.apache.flink.table.types.inference.strategies.CommonInputTypeStrategy;
Expand Down Expand Up @@ -366,10 +365,6 @@ public static InputTypeStrategy commonMultipleArrayType(int minCount) {
return new CommonArrayInputTypeStrategy(ConstantArgumentCount.from(minCount));
}

public static InputTypeStrategy arrayFullyComparableElementType() {
return new ArrayComparableElementTypeStrategy(StructuredComparison.FULL);
}

/** @see ItemAtIndexArgumentTypeStrategy */
public static final ArgumentTypeStrategy ITEM_AT_INDEX = new ItemAtIndexArgumentTypeStrategy();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,8 @@
import org.apache.flink.table.functions.FunctionDefinition;
import org.apache.flink.table.types.CollectionDataType;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.inference.ArgumentCount;
import org.apache.flink.table.types.inference.ArgumentTypeStrategy;
import org.apache.flink.table.types.inference.CallContext;
import org.apache.flink.table.types.inference.ConstantArgumentCount;
import org.apache.flink.table.types.inference.InputTypeStrategy;
import org.apache.flink.table.types.inference.Signature;
import org.apache.flink.table.types.logical.LegacyTypeInformationType;
import org.apache.flink.table.types.logical.LogicalType;
Expand All @@ -34,42 +32,35 @@
import org.apache.flink.table.types.logical.StructuredType.StructuredComparison;
import org.apache.flink.util.Preconditions;

import java.util.Collections;
import java.util.List;
import java.util.Optional;

/**
* An {@link InputTypeStrategy} that checks if the input argument is an ARRAY type and check whether
* its' elements are comparable.
* An {@link ArgumentTypeStrategy} that checks if the input argument is an ARRAY type and checks
* whether its elements are comparable.
*
* <p>It requires one argument.
*
* <p>For the rules which types are comparable with which types see {@link
* #areComparable(LogicalType, LogicalType)}.
*/
@Internal
public final class ArrayComparableElementTypeStrategy implements InputTypeStrategy {
public final class ArrayComparableElementArgumentTypeStrategy implements ArgumentTypeStrategy {

private final StructuredComparison requiredComparison;
private final ConstantArgumentCount argumentCount;

public ArrayComparableElementTypeStrategy(StructuredComparison requiredComparison) {
public ArrayComparableElementArgumentTypeStrategy(StructuredComparison requiredComparison) {
Preconditions.checkArgument(requiredComparison != StructuredComparison.NONE);
this.requiredComparison = requiredComparison;
this.argumentCount = ConstantArgumentCount.of(1);
}

@Override
public ArgumentCount getArgumentCount() {
return argumentCount;
}

@Override
public Optional<List<DataType>> inferInputTypes(
CallContext callContext, boolean throwOnFailure) {
public Optional<DataType> inferArgumentType(
CallContext callContext, int argumentPos, boolean throwOnFailure) {
final List<DataType> argumentDataTypes = callContext.getArgumentDataTypes();
final DataType argumentType = argumentDataTypes.get(0);
final DataType argumentType = argumentDataTypes.get(argumentPos);
if (!argumentType.getLogicalType().is(LogicalTypeRoot.ARRAY)) {
return callContext.fail(throwOnFailure, "All arguments requires to be an ARRAY type");
return callContext.fail(throwOnFailure, "The argument requires to be an ARRAY type");
}
final DataType elementDataType = ((CollectionDataType) argumentType).getElementDataType();
final LogicalType elementLogicalDataType = elementDataType.getLogicalType();
Expand All @@ -80,7 +71,7 @@ public Optional<List<DataType>> inferInputTypes(
elementLogicalDataType,
comparisonToString());
}
return Optional.of(argumentDataTypes);
return Optional.of(argumentType);
}

private String comparisonToString() {
Expand Down Expand Up @@ -131,8 +122,8 @@ private boolean areComparableWithNormalizedNullability(
}

@Override
public List<Signature> getExpectedSignatures(FunctionDefinition definition) {
return Collections.singletonList(
Signature.of(Signature.Argument.ofGroup("ARRAY<COMPARABLE>")));
public Signature.Argument getExpectedArgument(
FunctionDefinition functionDefinition, int argumentPos) {
return Signature.Argument.ofGroup("ARRAY<COMPARABLE>");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import static org.apache.flink.table.types.inference.InputTypeStrategies.or;
import static org.apache.flink.table.types.inference.InputTypeStrategies.repeatingSequence;
import static org.apache.flink.table.types.inference.InputTypeStrategies.symbol;
import static org.apache.flink.table.types.logical.StructuredType.StructuredComparison;

/**
* Entry point for specific input type strategies not covered in {@link InputTypeStrategies}.
Expand Down Expand Up @@ -89,6 +90,10 @@ public static InputTypeStrategy windowTimeIndicator() {
public static final ArgumentTypeStrategy ARRAY_ELEMENT_ARG =
new ArrayElementArgumentTypeStrategy();

/**
 * Argument type strategy that requires the argument to be an ARRAY type whose elements are
 * comparable under {@link StructuredComparison#FULL}.
 *
 * @see ArrayComparableElementArgumentTypeStrategy
 */
public static final ArgumentTypeStrategy ARRAY_FULLY_COMPARABLE =
new ArrayComparableElementArgumentTypeStrategy(StructuredComparison.FULL);

/**
* Input strategy for {@link BuiltInFunctionDefinitions#JSON_OBJECT}.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -640,6 +640,19 @@ ANY, explicit(DataTypes.INT())
.expectArgumentTypes(
DataTypes.ARRAY(DataTypes.INT().notNull()).notNull(),
DataTypes.INT()),
TestSpec.forStrategy(sequence(SpecificInputTypeStrategies.ARRAY_FULLY_COMPARABLE))
.expectSignature("f(<ARRAY<COMPARABLE>>)")
.calledWithArgumentTypes(DataTypes.ARRAY(DataTypes.ROW()))
.expectErrorMessage(
"Invalid input arguments. Expected signatures are:\n"
+ "f(<ARRAY<COMPARABLE>>)"),
TestSpec.forStrategy(
"Strategy fails if input argument type is not ARRAY",
sequence(SpecificInputTypeStrategies.ARRAY_FULLY_COMPARABLE))
.calledWithArgumentTypes(DataTypes.INT())
.expectErrorMessage(
"Invalid input arguments. Expected signatures are:\n"
+ "f(<ARRAY<COMPARABLE>>)"),
TestSpec.forStrategy(
"PROCTIME type strategy",
SpecificInputTypeStrategies.windowTimeIndicator(
Expand Down

This file was deleted.

Loading

0 comments on commit 620e597

Please sign in to comment.