Skip to content

Commit

Permalink
fix(analyzer): correct embedded analytic funcs (#2653)
Browse files Browse the repository at this point in the history
Signed-off-by: Jiyong Huang <[email protected]>
  • Loading branch information
ngjaying authored Mar 1, 2024
1 parent fc8fdc0 commit 4240c53
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 12 deletions.
6 changes: 3 additions & 3 deletions internal/topo/planner/analyzer.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022-2023 EMQ Technologies Co., Ltd.
// Copyright 2022-2024 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -175,15 +175,15 @@ func decorateStmt(s *ast.SelectStatement, store kv.KeyValue) ([]*streamInfo, []*
if function.IsAnalyticFunc(f.Name) {
f.CachedField = fmt.Sprintf("%s_%s_%d", function.AnalyticPrefix, f.Name, f.FuncId)
f.Cached = true
analyticFuncs = append(analyticFuncs, &ast.Call{
analyticFuncs = append([]*ast.Call{{
Name: f.Name,
FuncId: f.FuncId,
FuncType: f.FuncType,
Args: f.Args,
CachedField: f.CachedField,
Partition: f.Partition,
WhenExpr: f.WhenExpr,
})
}}, analyticFuncs...)
}
}
return true
Expand Down
42 changes: 33 additions & 9 deletions internal/topo/planner/planner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1813,7 +1813,7 @@ func Test_createLogicalPlan(t *testing.T) {
}.Init(),
},
{ // 15 analytic function over partition plan
sql: `SELECT latest(lag(name)) OVER (PARTITION BY temp), id1 FROM src1 WHERE lag(temp) > temp`,
sql: `SELECT latest(lag(name)) OVER (PARTITION BY temp), id1 FROM src1 WHERE latest(lag(temp)) > temp`,
p: ProjectPlan{
baseLogicalPlan: baseLogicalPlan{
children: []LogicalPlan{
Expand Down Expand Up @@ -1852,6 +1852,21 @@ func Test_createLogicalPlan(t *testing.T) {
StreamName: "src1",
}},
},
{
Name: "latest",
FuncId: 3,
CachedField: "$$a_latest_3",
Args: []ast.Expr{&ast.Call{
Name: "lag",
FuncId: 2,
CachedField: "$$a_lag_2",
Cached: true,
Args: []ast.Expr{&ast.FieldRef{
Name: "temp",
StreamName: "src1",
}},
}},
},
},
fieldFuncs: []*ast.Call{
{
Expand All @@ -1866,14 +1881,20 @@ func Test_createLogicalPlan(t *testing.T) {
},
condition: &ast.BinaryExpr{
LHS: &ast.Call{
Name: "lag",
FuncId: 2,
Args: []ast.Expr{&ast.FieldRef{
Name: "temp",
StreamName: "src1",
}},
CachedField: "$$a_lag_2",
Name: "latest",
FuncId: 3,
CachedField: "$$a_latest_3",
Cached: true,
Args: []ast.Expr{&ast.Call{
Name: "lag",
FuncId: 2,
CachedField: "$$a_lag_2",
Args: []ast.Expr{&ast.FieldRef{
Name: "temp",
StreamName: "src1",
}},
Cached: true,
}},
},
OP: ast.GT,
RHS: &ast.FieldRef{
Expand Down Expand Up @@ -2737,7 +2758,10 @@ func Test_createLogicalPlan(t *testing.T) {
if !reflect.DeepEqual(tt.err, testx.Errstring(err)) {
t.Errorf("%d. %v: error mismatch:\n exp=%s\n got=%s\n\n", i, tt.sql, tt.err, err)
} else {
assert.Equal(t, tt.p, p, "plan mismatch")
ok := assert.Equal(t, tt.p, p, "%d plan mismatch %s", i)
if !ok {
t.Errorf("%d. %q\n\nstmt mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, render.AsCode(tt.p), render.AsCode(p))
}
}
}
}
Expand Down

0 comments on commit 4240c53

Please sign in to comment.