Skip to content

Commit

Permalink
Include generated
Browse files Browse the repository at this point in the history
  • Loading branch information
akrambek committed Jan 8, 2025
1 parent d22ca46 commit e08bb0b
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,8 @@ write zilla:data.ext ${pgsql:dataEx()
write "CREATE MATERIALIZED VIEW zb_catalog.send_payment_handler AS\n"
" SELECT\n"
" c.correlation_id,\n"
" c.owner_id,\n"
" c.created_at,\n"
" CASE\n"
" WHEN balance >= c.amount THEN 'PaymentSent'\n"
" ELSE 'PaymentDeclined'\n"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,8 @@ read zilla:data.ext ${pgsql:dataEx()
read "CREATE MATERIALIZED VIEW zb_catalog.send_payment_handler AS\n"
" SELECT\n"
" c.correlation_id,\n"
" c.owner_id,\n"
" c.created_at,\n"
" CASE\n"
" WHEN balance >= c.amount THEN 'PaymentSent'\n"
" ELSE 'PaymentDeclined'\n"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,7 @@ private final class CreateHandlerMaterializedViewState implements RisingwaveMacr
CREATE MATERIALIZED VIEW %s.%s AS
SELECT
c.correlation_id,
%s,
%s
FROM %s.%s_commands c
%s
Expand Down Expand Up @@ -316,6 +317,11 @@ public void onStarted(
}
}

String include = command.columns().stream()
.filter(c -> ZILLA_MAPPINGS.containsKey(c.generatedAlways()))
.map(c -> "c.%s".formatted(c.name()))
.collect(Collectors.joining(",\n "));

String commandName = command.commandHandlers().entrySet().stream()
.filter(e -> e.getValue().equals(name))
.map(Map.Entry::getKey)
Expand All @@ -324,7 +330,8 @@ public void onStarted(

String where = "c.%s = '%s'".formatted(command.dispatchOn(), commandName);

String sqlQuery = String.format(sqlFormat, systemSchema, name, columns, systemSchema, from, join, where);
String sqlQuery = String.format(sqlFormat, systemSchema, name, include, columns,
systemSchema, from, join, where);

handler.doExecuteSystemClient(traceId, authorization, sqlQuery);
}
Expand Down

0 comments on commit e08bb0b

Please sign in to comment.