Skip to content

Commit

Permalink
Merge pull request MaterializeInc#9366 from mjibson/tablefuncs
Browse files Browse the repository at this point in the history
sql: allow multiple table functions as long as they are identical
  • Loading branch information
maddyblue authored Dec 2, 2021
2 parents a740288 + cf1271c commit 2158364
Show file tree
Hide file tree
Showing 6 changed files with 208 additions and 56 deletions.
4 changes: 4 additions & 0 deletions doc/user/content/release-notes.md
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,10 @@ List new features before bug fixes.
- Fix a crash when decoding certain messages from Protobuf-formatted Kafka
topics {{% gh 8930 %}}.

- Improve support for table functions in `SELECT` lists.

- Support PgJDBC's `getPrimaryKeys()` API.

{{% version-header v0.10.0 %}}

- Allow ingesting avro schemas whose top node is not a record type.
Expand Down
13 changes: 2 additions & 11 deletions src/sql/src/plan/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3631,17 +3631,8 @@ fn plan_function<'a>(
Func::Aggregate(_) => {
bail!("aggregate functions are not allowed in {}", ecx.name);
}
Func::Table(_) => {
bail_unsupported!(
1546,
format!("table function ({}) in scalar position", name)
);
}
Func::Set(_) => {
bail_unsupported!(
1546,
format!("table function ({}) in scalar position", name)
);
Func::Table(_) | Func::Set(_) => {
bail!("table functions are not allowed in {}", ecx.name);
}
Func::Scalar(impls) => impls,
Func::ScalarWindow(impls) => {
Expand Down
158 changes: 121 additions & 37 deletions src/sql/src/plan/transform_ast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -347,57 +347,39 @@ impl<'a> Desugarer<'a> {
// =>
// `SELECT .., table_func, .. FROM x, LATERAL $table_func`
//
// We do not support LATERAL ROWS FROM, so we can only support a single select
// item. Additionally, we do not attempt to identify table functions wrapped by
// other functions.
// Table functions in SELECT projections are supported by rewriting them to a
// FROM LATERAL, which is limited to a single table function. After we find a
// table function, if we find another identical to it, they are all rewritten
// to the same identifier. A second unique table function will cause an error.
//
// See: https://www.postgresql.org/docs/14/xfunc-sql.html#XFUNC-SQL-FUNCTIONS-RETURNING-SET
// See: https://www.postgresql.org/docs/14/queries-table-expressions.html#QUERIES-FROM
let mut rewrote_table_func = false;
for sel_item in node.projection.iter_mut() {
let (func, alias) = match sel_item {
SelectItem::Expr {
expr: Expr::Function(func),
alias,
} => (func, alias),
_ => {
continue;
}
};
let item = match self.scx.resolve_function(func.name.clone()) {
Ok(item) => item,
Err(_) => continue,
};
if !matches!(item.func()?, Func::Set(_) | Func::Table(_)) {
continue;
}
if rewrote_table_func {
bail_unsupported!("multiple table functions in select projections");
}
let name = Ident::new(item.name().item.clone());
// We have a table func in a select item's position, move it to FROM.

// Look for table function in SELECT projections. We use a unique
// TableFuncRewriter per SELECT, which allows differing table functions to
// exist in the same statement, as long as they are in other SELECTs.
let mut tf = TableFuncRewriter::new(self.scx);
for item in node.projection.iter_mut() {
visit_mut::visit_select_item_mut(&mut tf, item);
}
tf.status?;

if let Some((func, name)) = tf.table_func {
// We have a table func in a select item's position, add it to FROM.
node.from.push(TableWithJoins {
relation: TableFactor::Function {
function: TableFunction {
name: func.name.clone(),
args: func.args.clone(),
name: func.name,
args: func.args,
},
alias: Some(TableAlias {
name: name.clone(),
name,
columns: vec![],
strict: false,
}),
},
joins: vec![],
});
*sel_item = SelectItem::Expr {
expr: Expr::Identifier(vec![name]),
alias: alias.clone(),
};

// We don't support LATERAL ROWS FROM (#9076), so can only support a single
// table function.
rewrote_table_func = true;
}

visit_mut::visit_select_mut(self, node);
Expand Down Expand Up @@ -644,3 +626,105 @@ impl<'a> Desugarer<'a> {
Ok(())
}
}

// A VisitMut that replaces table functions with the function name as an
// Identifier. After calls to visit_mut, if table_func is Some, it holds the
// extracted table function, which should be added as a FROM clause. Query
// nodes (and thus any subquery Expr) are ignored.
struct TableFuncRewriter<'a> {
scx: &'a StatementContext<'a>,
disallowed_context: Vec<&'static str>,
table_func: Option<(Function<Raw>, Ident)>,
status: Result<(), anyhow::Error>,
}

impl<'ast> VisitMut<'ast, Raw> for TableFuncRewriter<'_> {
fn visit_expr_mut(&mut self, expr: &'ast mut Expr<Raw>) {
self.visit_internal(Self::visit_expr_mut_internal, expr);
}

fn visit_query_mut(&mut self, _node: &'ast mut Query<Raw>) {
// Do not descend into Query nodes so subqueries can have their own table
// functions rewritten.
}
}

impl<'a> TableFuncRewriter<'a> {
fn new(scx: &'a StatementContext<'a>) -> TableFuncRewriter<'a> {
TableFuncRewriter {
scx,
disallowed_context: Vec::new(),
table_func: None,
status: Ok(()),
}
}

fn visit_internal<F, X>(&mut self, f: F, x: X)
where
F: Fn(&mut Self, X) -> Result<(), anyhow::Error>,
{
if self.status.is_ok() {
// self.status could have changed from a deeper call, so don't blindly
// overwrite it with the result of this call.
let status = f(self, x);
if self.status.is_ok() {
self.status = status;
}
}
}

fn visit_expr_mut_internal(&mut self, expr: &mut Expr<Raw>) -> Result<(), anyhow::Error> {
// This block does two things:
// - Check if expr is a context where table functions are disallowed.
// - Check if expr is a table function, and attempt to replace if so.
let disallowed_context = match expr {
Expr::Case { .. } => Some("CASE"),
Expr::Coalesce { .. } => Some("COALESCE"),
Expr::Function(func) => {
if let Ok(item) = self.scx.resolve_function(func.name.clone()) {
match item.func()? {
Func::Aggregate(_) => Some("aggregate function calls"),
Func::Set(_) | Func::Table(_) => {
if let Some(context) = self.disallowed_context.last() {
bail!("table functions are not allowed in {}", context);
}
let name = Ident::new(item.name().item.clone());
let ident = Expr::Identifier(vec![name.clone()]);
match self.table_func.as_mut() {
None => {
self.table_func = Some((func.clone(), name));
}
Some((table_func, _)) => {
if func != table_func {
bail_unsupported!(
1546,
"multiple table functions in select projections"
);
}
}
}
*expr = ident;
None
}
_ => None,
}
} else {
None
}
}
_ => None,
};

if let Some(context) = disallowed_context {
self.disallowed_context.push(context);
}

visit_mut::visit_expr_mut(self, expr);

if disallowed_context.is_some() {
self.disallowed_context.pop();
}

Ok(())
}
}
15 changes: 15 additions & 0 deletions test/lang/java/smoketest/SmokeTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -105,4 +105,19 @@ void testPgJDBCgetColumns() throws SQLException, ClassNotFoundException {
stmt.execute("DROP TABLE materialize.public.getcols");
stmt.close();
}

@Test
void testPgJDBCgetPrimaryKeys() throws SQLException, ClassNotFoundException {
Statement stmt = conn.createStatement();
stmt.execute("CREATE TABLE materialize.public.getpks (a INT, b STRING)");
stmt.close();

ResultSet columns = conn.getMetaData().getPrimaryKeys("materialize", "public", "getpks");
Assertions.assertFalse(columns.next());
columns.close();

stmt = conn.createStatement();
stmt.execute("DROP TABLE materialize.public.getpks");
stmt.close();
}
}
2 changes: 1 addition & 1 deletion test/sqllogictest/cockroach/aggregate.slt
Original file line number Diff line number Diff line change
Expand Up @@ -1444,7 +1444,7 @@ DROP TABLE string_agg_test2
query error supported
SELECT string_agg('foo', CAST ((SELECT NULL) AS BYTEA)) OVER ();

query error supported
query error table functions are not allowed in aggregate function calls
SELECT array_agg(generate_series(1, 2))

# Regression test for #31882.
Expand Down
72 changes: 65 additions & 7 deletions test/sqllogictest/table_func.slt
Original file line number Diff line number Diff line change
Expand Up @@ -708,13 +708,20 @@ jsonb_each
query error multiple table functions in select projections not yet supported
SELECT 1, jsonb_object_keys('{"1":2,"3":4}'::JSONB), jsonb_object_keys('{"1":2,"3":4,"5":6}'::JSONB) ORDER BY 1

query error table function .* in scalar position not yet supported
query T colnames
SELECT jsonb_build_object(jsonb_object_keys('{"a":2, "b":3}'), 1, 'c', 3) ORDER BY 1
----
jsonb_build_object
{"a":1,"c":3}
{"b":1,"c":3}

query error table function .* in scalar position not yet supported
query TT
SELECT jsonb_build_object(jsonb_object_keys('{"a":2, "b":3}'), 1, 'c', 3), jsonb_build_object(jsonb_object_keys('{"a":2, "b":3}'), 1, 'c', 3) ORDER BY 1
----
{"a":1,"c":3} {"a":1,"c":3}
{"b":1,"c":3} {"b":1,"c":3}

query error table function .* in scalar position not yet supported
query error multiple table functions in select projections not yet supported
SELECT jsonb_build_object(jsonb_object_keys('{"a":2, "b":3}'), 1, 'c', 3), jsonb_build_object(jsonb_object_keys('{"a":2}'), 1, 'c', 3);

query error multiple table functions in select projections not yet supported
Expand All @@ -723,12 +730,25 @@ SELECT jsonb_array_elements(jsonb_array_elements('[[1,2],[3,4]]')), jsonb_array_
query error multiple table functions in select projections not yet supported
SELECT jsonb_array_elements(jsonb_array_elements('[[1,2],[3,4]]')), jsonb_array_elements('[7,8,9]') ORDER BY 1

# Postgres explicitly disallows this use case.
query error table function .* in scalar position not yet supported
# Postgres explicitly disallows table funcs (although it uses "set-returning
# functions") in CASE and COALESCE.
query error table functions are not allowed in CASE
SELECT i, CASE WHEN i > 0 THEN generate_series(1, 5) ELSE 0 END FROM t

query error multiple table functions in select projections not yet supported
SELECT generate_series(1,2), generate_series(1,2)
query error table functions are not allowed in COALESCE
SELECT COALESCE(1, generate_series(1, 1))

query error table functions are not allowed in aggregate function calls
SELECT array_agg(generate_series(1, 2))

# Subqueries avoid the CASE errors.
query I
SELECT COALESCE(1, (SELECT generate_series(1, 1)))
----
1

query error table functions are not allowed in WHERE clause
SELECT 1 WHERE generate_series(1, 1)

# timestamp-based generate series

Expand Down Expand Up @@ -1026,3 +1046,41 @@ SELECT * FROM LATERAL ROWS FROM (generate_series(1,1), information_schema._pg_ex
----
a b c
1 1 1

# Multiple table funcs in SELECT projection.

query II colnames rowsort
SELECT generate_series(1,2) x, generate_series(1,2) ORDER BY 1
----
x generate_series
1 1
2 2

query II colnames rowsort
SELECT generate_series(1,2), generate_series(1,2) y ORDER BY 1
----
generate_series y
1 1
2 2

query II colnames rowsort
SELECT generate_series(1,2) x, generate_series(1,2) y ORDER BY 1
----
x y
1 1
2 2

query IT colnames
SELECT (information_schema._pg_expandarray(array[10])).x, information_schema._pg_expandarray(array[10])
----
x _pg_expandarray
10 (10,1)

query error multiple table functions in select projections not yet supported
SELECT generate_series(1, 1), generate_series(2, 2)

# Subqueries avoid the uniqueness check.
query II
SELECT generate_series(1, 1), (SELECT generate_series(2, 2))
----
1 2

0 comments on commit 2158364

Please sign in to comment.