Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Minor bug fixes in zstream #1364

Merged
merged 2 commits into from
Jan 9, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -145,11 +145,13 @@ write zilla:data.ext ${pgsql:dataEx()
write "CREATE MATERIALIZED VIEW zb_catalog.send_payment_handler AS\n"
" SELECT\n"
" c.correlation_id,\n"
" c.owner_id,\n"
" c.created_at,\n"
" CASE\n"
" WHEN balance >= c.amount THEN 'PaymentSent'\n"
" ELSE 'PaymentDeclined'\n"
" END AS event, c.user_id, c.amount\n"
" FROM app_events_commands c\n"
" FROM zb_catalog.app_events_commands c\n"
" JOIN balance ON WHERE user_id = c.user_id\n"
" WHERE c.command = 'SendPayment';"
[0x00]
Expand All @@ -174,7 +176,7 @@ write zilla:data.ext ${pgsql:dataEx()
.build()
.build()}
write "CREATE MATERIALIZED VIEW public.app_events AS\n"
" SELECT * FROM send_payment_handler;"
" SELECT * FROM zb_catalog.send_payment_handler;"
[0x00]

read advised zilla:flush ${pgsql:flushEx()
Expand Down Expand Up @@ -248,12 +250,12 @@ write "CREATE SINK zb_catalog.app_events_replies_sink AS\n"
" SELECT\n"
" COALESCE(r.status, '400') AS status,\n"
" c.correlation_id\n"
" FROM app_events_commands c\n"
" LEFT JOIN app_events_reply_handler r\n"
" FROM zb_catalog.app_events_commands c\n"
" LEFT JOIN zb_catalog.app_events_reply_handler r\n"
" ON c.correlation_id = r.correlation_id\n"
"WITH (\n"
" connector = 'kafka',\n"
" topic = 'public.app_events_replies',\n"
" topic = 'public.app_events_replies_sink',\n"
" properties.bootstrap.server = 'localhost:9092',\n"
") FORMAT PLAIN ENCODE AVRO (\n"
" force_append_only='true',\n"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,11 +147,13 @@ read zilla:data.ext ${pgsql:dataEx()
read "CREATE MATERIALIZED VIEW zb_catalog.send_payment_handler AS\n"
" SELECT\n"
" c.correlation_id,\n"
" c.owner_id,\n"
" c.created_at,\n"
" CASE\n"
" WHEN balance >= c.amount THEN 'PaymentSent'\n"
" ELSE 'PaymentDeclined'\n"
" END AS event, c.user_id, c.amount\n"
" FROM app_events_commands c\n"
" FROM zb_catalog.app_events_commands c\n"
" JOIN balance ON WHERE user_id = c.user_id\n"
" WHERE c.command = 'SendPayment';"
[0x00]
Expand All @@ -176,7 +178,7 @@ read zilla:data.ext ${pgsql:dataEx()
.build()
.build()}
read "CREATE MATERIALIZED VIEW public.app_events AS\n"
" SELECT * FROM send_payment_handler;"
" SELECT * FROM zb_catalog.send_payment_handler;"
[0x00]

write advise zilla:flush ${pgsql:flushEx()
Expand Down Expand Up @@ -250,12 +252,12 @@ read "CREATE SINK zb_catalog.app_events_replies_sink AS\n"
" SELECT\n"
" COALESCE(r.status, '400') AS status,\n"
" c.correlation_id\n"
" FROM app_events_commands c\n"
" LEFT JOIN app_events_reply_handler r\n"
" FROM zb_catalog.app_events_commands c\n"
" LEFT JOIN zb_catalog.app_events_reply_handler r\n"
" ON c.correlation_id = r.correlation_id\n"
"WITH (\n"
" connector = 'kafka',\n"
" topic = 'public.app_events_replies',\n"
" topic = 'public.app_events_replies_sink',\n"
" properties.bootstrap.server = 'localhost:9092',\n"
") FORMAT PLAIN ENCODE AVRO (\n"
" force_append_only='true',\n"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,8 +278,9 @@ private final class CreateHandlerMaterializedViewState implements RisingwaveMacr
CREATE MATERIALIZED VIEW %s.%s AS
SELECT
c.correlation_id,
%s,
%s
FROM %s_commands c
FROM %s.%s_commands c
%s
WHERE %s;\u0000""";

Expand Down Expand Up @@ -316,6 +317,11 @@ public void onStarted(
}
}

String include = command.columns().stream()
.filter(c -> ZILLA_MAPPINGS.containsKey(c.generatedAlways()))
.map(c -> "c.%s".formatted(c.name()))
.collect(Collectors.joining(",\n "));

String commandName = command.commandHandlers().entrySet().stream()
.filter(e -> e.getValue().equals(name))
.map(Map.Entry::getKey)
Expand All @@ -324,7 +330,8 @@ public void onStarted(

String where = "c.%s = '%s'".formatted(command.dispatchOn(), commandName);

String sqlQuery = String.format(sqlFormat, systemSchema, name, columns, from, join, where);
String sqlQuery = String.format(sqlFormat, systemSchema, name, include, columns,
systemSchema, from, join, where);

handler.doExecuteSystemClient(traceId, authorization, sqlQuery);
}
Expand Down Expand Up @@ -370,8 +377,8 @@ public void onStarted(
long authorization)
{
String selects = command.commandHandlers().values().stream()
.map("SELECT * FROM %s"::formatted)
.collect(Collectors.joining("UNION ALL\n"));
.map(value -> String.format("SELECT * FROM %s.%s", systemSchema, value))
.collect(Collectors.joining("\nUNION ALL\n"));

String sqlQuery = String.format(sqlFormat, command.schema(), command.name(), selects);

Expand Down Expand Up @@ -534,7 +541,7 @@ public RisingwaveMacroState onError(
private final class CreateReplySink implements RisingwaveMacroState
{
private final String sqlFormat = """
CREATE SINK %s.%s_replies_sink AS
CREATE SINK %s_replies_sink AS
SELECT
COALESCE(r.status, '400') AS status,
c.correlation_id
Expand All @@ -543,7 +550,7 @@ private final class CreateReplySink implements RisingwaveMacroState
ON c.correlation_id = r.correlation_id
WITH (
connector = 'kafka',
topic = '%s.%s_replies',
topic = '%s.%s_replies_sink',
properties.bootstrap.server = '%s',
) FORMAT PLAIN ENCODE AVRO (
force_append_only='true',
Expand All @@ -555,10 +562,12 @@ public void onStarted(
long traceId,
long authorization)
{
String name = command.name();
String schema = command.schema();
final String name = command.name();
final String schema = command.schema();

final String systemName = "%s.%s".formatted(systemSchema, name);

String sqlQuery = String.format(sqlFormat, systemSchema, name, name, name,
String sqlQuery = String.format(sqlFormat, systemName, systemName, systemName,
schema, name, bootstrapServer, schemaRegistry);

handler.doExecuteSystemClient(traceId, authorization, sqlQuery);
Expand Down
Loading