Skip to content

Commit

Permalink
[FLINK-34258][docs][table] Fix incorrect retract example for TableAgg…
Browse files Browse the repository at this point in the history
…regateFunction

This closes apache#24215
  • Loading branch information
LadyForest authored Feb 2, 2024
1 parent d6c7eee commit 0779c91
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 12 deletions.
13 changes: 8 additions & 5 deletions docs/content.zh/docs/dev/table/functions/udfs.md
Original file line number Diff line number Diff line change
Expand Up @@ -1892,7 +1892,10 @@ tab
{{< /tabs >}}


下面的例子展示了如何使用 `emitUpdateWithRetract` 方法来只发送更新的数据。为了只发送更新的结果,accumulator 保存了上一次的最大的2个值,也保存了当前最大的2个值。注意:如果 TopN 中的 n 非常大,这种既保存上次的结果,也保存当前的结果的方式不太高效。一种解决这种问题的方式是把输入数据直接存储到 `accumulator` 中,然后在调用 `emitUpdateWithRetract` 方法时再进行计算。
下面的例子展示了如何使用 `emitUpdateWithRetract` 方法来只发送更新的数据。为了只发送更新的结果,accumulator 保存了上一次的最大的2个值,也保存了当前最大的2个值。
{{< hint info >}}
注意:请不要在 `emitUpdateWithRetract` 方法中更新 accumulator,因为在调用 `function#emitUpdateWithRetract` 之后,`GroupTableAggFunction` 不会重新调用 `function#getAccumulators` 来将最新的 accumulator 更新到状态中。
{{< /hint >}}

{{< tabs "e0d841fe-8d95-4706-9e19-e76141171966" >}}
{{< tab "Java" >}}
Expand Down Expand Up @@ -1923,6 +1926,8 @@ public static class Top2 extends TableAggregateFunction<Tuple2<Integer, Integer>
}

public void accumulate(Top2Accum acc, Integer v) {
acc.oldFirst = acc.first;
acc.oldSecond = acc.second;
if (v > acc.first) {
acc.second = acc.first;
acc.first = v;
Expand All @@ -1938,7 +1943,6 @@ public static class Top2 extends TableAggregateFunction<Tuple2<Integer, Integer>
out.retract(Tuple2.of(acc.oldFirst, 1));
}
out.collect(Tuple2.of(acc.first, 1));
acc.oldFirst = acc.first;
}

if (!acc.second.equals(acc.oldSecond)) {
Expand All @@ -1947,7 +1951,6 @@ public static class Top2 extends TableAggregateFunction<Tuple2<Integer, Integer>
out.retract(Tuple2.of(acc.oldSecond, 2));
}
out.collect(Tuple2.of(acc.second, 2));
acc.oldSecond = acc.second;
}
}
}
Expand Down Expand Up @@ -1997,6 +2000,8 @@ class Top2 extends TableAggregateFunction[JTuple2[JInteger, JInteger], Top2Accum
}

def accumulate(acc: Top2Accum, v: Int) {
acc.oldFirst = acc.first
acc.oldSecond = acc.second
if (v > acc.first) {
acc.second = acc.first
acc.first = v
Expand All @@ -2015,15 +2020,13 @@ class Top2 extends TableAggregateFunction[JTuple2[JInteger, JInteger], Top2Accum
out.retract(JTuple2.of(acc.oldFirst, 1))
}
out.collect(JTuple2.of(acc.first, 1))
acc.oldFirst = acc.first
}
if (acc.second != acc.oldSecond) {
// if there is an update, retract old value then emit new value.
if (acc.oldSecond != Int.MinValue) {
out.retract(JTuple2.of(acc.oldSecond, 2))
}
out.collect(JTuple2.of(acc.second, 2))
acc.oldSecond = acc.second
}
}
}
Expand Down
14 changes: 7 additions & 7 deletions docs/content/docs/dev/table/functions/udfs.md
Original file line number Diff line number Diff line change
Expand Up @@ -1778,9 +1778,9 @@ def emitUpdateWithRetract(accumulator: ACC, out: RetractableCollector[T]): Unit
The following example shows how to use the `emitUpdateWithRetract(...)` method to emit only incremental
updates. In order to do so, the accumulator keeps both the old and new top 2 values.

If the N of Top N is big, it might be inefficient to keep both the old and new values. One way to
solve this case is to store only the input record in the accumulator in `accumulate` method and then perform
a calculation in `emitUpdateWithRetract`.
{{< hint info >}}
Note: Do not update accumulator within `emitUpdateWithRetract` because after `function#emitUpdateWithRetract` is invoked, `GroupTableAggFunction` will not re-invoke `function#getAccumulators` to update the latest accumulator to state.
{{< /hint >}}

{{< tabs "043e94c6-05b5-4800-9e5f-7d11235f3a11" >}}
{{< tab "Java" >}}
Expand Down Expand Up @@ -1809,6 +1809,8 @@ public static class Top2WithRetract
}

public void accumulate(Top2WithRetractAccumulator acc, Integer v) {
acc.oldFirst = acc.first;
acc.oldSecond = acc.second;
if (v > acc.first) {
acc.second = acc.first;
acc.first = v;
Expand All @@ -1826,15 +1828,13 @@ public static class Top2WithRetract
out.retract(Tuple2.of(acc.oldFirst, 1));
}
out.collect(Tuple2.of(acc.first, 1));
acc.oldFirst = acc.first;
}
if (!acc.second.equals(acc.oldSecond)) {
// if there is an update, retract the old value then emit a new value
if (acc.oldSecond != Integer.MIN_VALUE) {
out.retract(Tuple2.of(acc.oldSecond, 2));
}
out.collect(Tuple2.of(acc.second, 2));
acc.oldSecond = acc.second;
}
}
}
Expand Down Expand Up @@ -1866,6 +1866,8 @@ class Top2WithRetract
}

def accumulate(acc: Top2WithRetractAccumulator, value: Integer): Unit = {
acc.oldFirst = acc.first
acc.oldSecond = acc.second
if (value > acc.first) {
acc.second = acc.first
acc.first = value
Expand All @@ -1884,15 +1886,13 @@ class Top2WithRetract
out.retract(Tuple2.of(acc.oldFirst, 1))
}
out.collect(Tuple2.of(acc.first, 1))
acc.oldFirst = acc.first
}
if (!acc.second.equals(acc.oldSecond)) {
// if there is an update, retract the old value then emit a new value
if (acc.oldSecond != Integer.MIN_VALUE) {
out.retract(Tuple2.of(acc.oldSecond, 2))
}
out.collect(Tuple2.of(acc.second, 2))
acc.oldSecond = acc.second
}
}
}
Expand Down

0 comments on commit 0779c91

Please sign in to comment.