Skip to content

Commit

Permalink
[core] introduce theta_sketch aggregate function. (#3723)
Browse files Browse the repository at this point in the history
  • Loading branch information
zhoulii authored Jul 11, 2024
1 parent 9306dd8 commit b55ef81
Show file tree
Hide file tree
Showing 8 changed files with 249 additions and 0 deletions.
59 changes: 59 additions & 0 deletions docs/content/primary-key-table/merge-engine.md
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,65 @@ Current supported aggregate functions and data types are:
* `merge_map`:
The merge_map function merge input maps. It only supports MAP type.

* `theta_sketch`:
The theta_sketch function aggregates multiple serialized Sketch objects into a single Sketch.
It supports VARBINARY data type.

An example:

{{< tabs "nested_update-example" >}}

{{< tab "Flink" >}}

```sql
-- source table
CREATE TABLE VISITS (
id INT PRIMARY KEY NOT ENFORCED,
user_id STRING
);

-- agg table
CREATE TABLE UV_AGG (
id INT PRIMARY KEY NOT ENFORCED,
uv VARBINARY
) WITH (
'merge-engine' = 'aggregation',
'fields.f0.aggregate-function' = 'theta_sketch'
);

-- Register the following class as a Flink function with the name "SKETCH"
-- which is used to transform input to sketch bytes array:
--
-- public static class SketchFunction extends ScalarFunction {
-- public byte[] eval(String user_id) {
-- UpdateSketch updateSketch = UpdateSketch.builder().build();
-- updateSketch.update(user_id);
-- return updateSketch.compact().toByteArray();
-- }
-- }
--
INSERT INTO UV_AGG SELECT id, SKETCH(user_id) FROM VISITS;

-- Register the following class as a Flink function with the name "SKETCH_COUNT"
-- which is used to get cardinality from sketch bytes array:
--
-- public static class SketchCountFunction extends ScalarFunction {
-- public Double eval(byte[] sketchBytes) {
-- if (sketchBytes == null) {
-- return 0d;
-- }
-- return Sketches.wrapCompactSketch(Memory.wrap(sketchBytes)).getEstimate();
-- }
-- }
--
-- Then we can get user cardinality based on the aggregated field.
SELECT id, SKETCH_COUNT(UV) as uv FROM UV_AGG;
```

{{< /tab >}}

{{< /tabs >}}

{{< hint info >}}
For streaming queries, `aggregation` merge engine must be used together with `lookup` or `full-compaction`
[changelog producer]({{< ref "primary-key-table/changelog-producer" >}}). ('input' changelog producer is also supported, but only returns input records.)
Expand Down
11 changes: 11 additions & 0 deletions paimon-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,12 @@ under the License.
<version>0.2.2</version>
</dependency>

<dependency>
<groupId>org.apache.datasketches</groupId>
<artifactId>datasketches-java</artifactId>
<version>4.2.0</version>
</dependency>

<!-- Test -->

<dependency>
Expand Down Expand Up @@ -294,6 +300,7 @@ under the License.
<include>net.openhft:zero-allocation-hashing</include>
<include>com.github.davidmoten:hilbert-curve</include>
<include>com.github.davidmoten:guava-mini</include>
<include>org.apache.datasketches:*</include>
</includes>
</artifactSet>
<filters>
Expand Down Expand Up @@ -345,6 +352,10 @@ under the License.
<pattern>com.github.davidmoten.guavamini</pattern>
<shadedPattern>org.apache.paimon.shade.com.github.davidmoten.guavamini</shadedPattern>
</relocation>
<relocation>
<pattern>org.apache.datasketches</pattern>
<shadedPattern>org.apache.paimon.shade.org.apache.datasketches</shadedPattern>
</relocation>
</relocations>
<minimizeJar>true</minimizeJar>
</configuration>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.utils;

import org.apache.paimon.annotation.VisibleForTesting;

import org.apache.datasketches.memory.Memory;
import org.apache.datasketches.theta.Sketches;
import org.apache.datasketches.theta.Union;
import org.apache.datasketches.theta.UpdateSketch;

/** A compressed bitmap for 32-bit integer. */
public class ThetaSketch {

public static byte[] union(byte[] sketchBytes1, byte[] sketchBytes2) {
Union union = Sketches.setOperationBuilder().buildUnion();
union.union(Memory.wrap(sketchBytes1));
union.union(Memory.wrap(sketchBytes2));
return union.getResult().toByteArray();
}

@VisibleForTesting
public static byte[] sketchOf(int... values) {
UpdateSketch updateSketch = UpdateSketch.builder().build();
for (int value : values) {
updateSketch.update(value);
}
return updateSketch.compact().toByteArray();
}
}
2 changes: 2 additions & 0 deletions paimon-common/src/main/resources/META-INF/NOTICE
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ The Apache Software Foundation (http://www.apache.org/).

This project bundles the following dependencies under the Apache Software License 2.0 (http://www.apache.org/licenses/LICENSE-2.0.txt)
- org.roaringbitmap:RoaringBitmap:1.0.5
- org.apache.datasketches:datasketches-java:4.2.0
- org.apache.datasketches:datasketches-memory:2.2.0

This project bundles the following dependencies under the BSD 3-clause license.
You find them under licenses/LICENSE.antlr-runtime and licenses/LICENSE.janino.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.MapType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.types.VarBinaryType;

import javax.annotation.Nullable;

Expand Down Expand Up @@ -116,6 +117,13 @@ public static FieldAggregator createFieldAggregator(
fieldType);
fieldAggregator = new FieldMergeMapAgg((MapType) fieldType);
break;
case FieldThetaSketchAgg.NAME:
checkArgument(
fieldType instanceof VarBinaryType,
"Data type for theta sketch column must be 'VarBinaryType' but was '%s'.",
fieldType);
fieldAggregator = new FieldThetaSketchAgg((VarBinaryType) fieldType);
break;
default:
throw new RuntimeException(
String.format(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.mergetree.compact.aggregate;

import org.apache.paimon.types.VarBinaryType;
import org.apache.paimon.utils.ThetaSketch;

/** ThetaSketch aggregate a field of a row. */
public class FieldThetaSketchAgg extends FieldAggregator {

public static final String NAME = "theta_sketch";

private static final long serialVersionUID = 1L;

public FieldThetaSketchAgg(VarBinaryType dataType) {
super(dataType);
}

@Override
public String name() {
return NAME;
}

@Override
public Object agg(Object accumulator, Object inputField) {
if (accumulator == null && inputField == null) {
return null;
}

if (accumulator == null || inputField == null) {
return accumulator == null ? inputField : accumulator;
}

return ThetaSketch.union((byte[]) accumulator, (byte[]) inputField);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.apache.paimon.utils.ThetaSketch.sketchOf;
import static org.assertj.core.api.Assertions.assertThat;

/** test whether {@link FieldAggregator}' subclasses behaviors are expected. */
Expand Down Expand Up @@ -725,6 +726,29 @@ public void testFieldMergeMapAggRetract() {
assertThat(toJavaMap(result)).containsExactlyInAnyOrderEntriesOf(toMap(3, "C"));
}

@Test
public void testFieldThetaSketchAgg() {
FieldThetaSketchAgg agg = new FieldThetaSketchAgg(DataTypes.VARBINARY(20));

byte[] inputVal = sketchOf(1);
byte[] acc1 = sketchOf(2, 3);
byte[] acc2 = sketchOf(1, 2, 3);

assertThat(agg.agg(null, null)).isNull();

byte[] result1 = (byte[]) agg.agg(null, inputVal);
assertThat(inputVal).isEqualTo(result1);

byte[] result2 = (byte[]) agg.agg(acc1, null);
assertThat(result2).isEqualTo(acc1);

byte[] result3 = (byte[]) agg.agg(acc1, inputVal);
assertThat(result3).isEqualTo(acc2);

byte[] result4 = (byte[]) agg.agg(acc2, inputVal);
assertThat(result4).isEqualTo(acc2);
}

private Map<Object, Object> toMap(Object... kvs) {
Map<Object, Object> result = new HashMap<>();
for (int i = 0; i < kvs.length; i += 2) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.paimon.mergetree.compact.aggregate.FieldNestedUpdateAgg;
import org.apache.paimon.utils.BlockingIterator;

import org.apache.commons.codec.binary.Hex;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.types.Row;
Expand All @@ -46,6 +47,7 @@
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.apache.paimon.utils.ThetaSketch.sketchOf;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

Expand Down Expand Up @@ -1810,4 +1812,49 @@ public void testStreamingRead() {
"Pre-aggregate continuous reading is not supported");
}
}

/** ITCase for {@link org.apache.paimon.mergetree.compact.aggregate.FieldThetaSketchAgg}. */
public static class ThetaSketchAggAggregationITCase extends CatalogITCaseBase {

@Test
public void testThetaSketchAgg() {
sql(
"CREATE TABLE test_collect("
+ " id INT PRIMARY KEY NOT ENFORCED,"
+ " f0 VARBINARY"
+ ") WITH ("
+ " 'merge-engine' = 'aggregation',"
+ " 'fields.f0.aggregate-function' = 'theta_sketch'"
+ ")");

String str1 = Hex.encodeHexString(sketchOf(1)).toUpperCase();
String str2 = Hex.encodeHexString(sketchOf(2)).toUpperCase();
String str3 = Hex.encodeHexString(sketchOf(3)).toUpperCase();

sql(
String.format(
"INSERT INTO test_collect VALUES (1, CAST (NULL AS VARBINARY)),(2, CAST(x'%s' AS VARBINARY)), (3, CAST(x'%s' AS VARBINARY))",
str1, str2));

List<Row> result = queryAndSort("SELECT * FROM test_collect");
checkOneRecord(result.get(0), 1, null);
checkOneRecord(result.get(1), 2, sketchOf(1));
checkOneRecord(result.get(2), 3, sketchOf(2));

sql(
String.format(
"INSERT INTO test_collect VALUES (1, CAST (x'%s' AS VARBINARY)),(2, CAST(x'%s' AS VARBINARY)), (2, CAST(x'%s' AS VARBINARY)), (3, CAST(x'%s' AS VARBINARY))",
str1, str2, str2, str3));

result = queryAndSort("SELECT * FROM test_collect");
checkOneRecord(result.get(0), 1, sketchOf(1));
checkOneRecord(result.get(1), 2, sketchOf(1, 2));
checkOneRecord(result.get(2), 3, sketchOf(2, 3));
}

private void checkOneRecord(Row row, int id, byte[] expected) {
assertThat(row.getField(0)).isEqualTo(id);
assertThat(row.getField(1)).isEqualTo(expected);
}
}
}

0 comments on commit b55ef81

Please sign in to comment.