Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
akrambek committed Jan 8, 2025
1 parent 811415b commit d22ca46
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ write "CREATE MATERIALIZED VIEW zb_catalog.send_payment_handler AS\n"
" WHEN balance >= c.amount THEN 'PaymentSent'\n"
" ELSE 'PaymentDeclined'\n"
" END AS event, c.user_id, c.amount\n"
" FROM app_events_commands c\n"
" FROM zb_catalog.app_events_commands c\n"
" JOIN balance ON WHERE user_id = c.user_id\n"
" WHERE c.command = 'SendPayment';"
[0x00]
Expand All @@ -174,7 +174,7 @@ write zilla:data.ext ${pgsql:dataEx()
.build()
.build()}
write "CREATE MATERIALIZED VIEW public.app_events AS\n"
" SELECT * FROM send_payment_handler;"
" SELECT * FROM zb_catalog.send_payment_handler;"
[0x00]

read advised zilla:flush ${pgsql:flushEx()
Expand Down Expand Up @@ -248,12 +248,12 @@ write "CREATE SINK zb_catalog.app_events_replies_sink AS\n"
" SELECT\n"
" COALESCE(r.status, '400') AS status,\n"
" c.correlation_id\n"
" FROM app_events_commands c\n"
" LEFT JOIN app_events_reply_handler r\n"
" FROM zb_catalog.app_events_commands c\n"
" LEFT JOIN zb_catalog.app_events_reply_handler r\n"
" ON c.correlation_id = r.correlation_id\n"
"WITH (\n"
" connector = 'kafka',\n"
" topic = 'public.app_events_replies',\n"
" topic = 'public.app_events_replies_sink',\n"
" properties.bootstrap.server = 'localhost:9092',\n"
") FORMAT PLAIN ENCODE AVRO (\n"
" force_append_only='true',\n"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ read "CREATE MATERIALIZED VIEW zb_catalog.send_payment_handler AS\n"
" WHEN balance >= c.amount THEN 'PaymentSent'\n"
" ELSE 'PaymentDeclined'\n"
" END AS event, c.user_id, c.amount\n"
" FROM app_events_commands c\n"
" FROM zb_catalog.app_events_commands c\n"
" JOIN balance ON WHERE user_id = c.user_id\n"
" WHERE c.command = 'SendPayment';"
[0x00]
Expand All @@ -176,7 +176,7 @@ read zilla:data.ext ${pgsql:dataEx()
.build()
.build()}
read "CREATE MATERIALIZED VIEW public.app_events AS\n"
" SELECT * FROM send_payment_handler;"
" SELECT * FROM zb_catalog.send_payment_handler;"
[0x00]

write advise zilla:flush ${pgsql:flushEx()
Expand Down Expand Up @@ -250,12 +250,12 @@ read "CREATE SINK zb_catalog.app_events_replies_sink AS\n"
" SELECT\n"
" COALESCE(r.status, '400') AS status,\n"
" c.correlation_id\n"
" FROM app_events_commands c\n"
" LEFT JOIN app_events_reply_handler r\n"
" FROM zb_catalog.app_events_commands c\n"
" LEFT JOIN zb_catalog.app_events_reply_handler r\n"
" ON c.correlation_id = r.correlation_id\n"
"WITH (\n"
" connector = 'kafka',\n"
" topic = 'public.app_events_replies',\n"
" topic = 'public.app_events_replies_sink',\n"
" properties.bootstrap.server = 'localhost:9092',\n"
") FORMAT PLAIN ENCODE AVRO (\n"
" force_append_only='true',\n"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ private final class CreateHandlerMaterializedViewState implements RisingwaveMacr
SELECT
c.correlation_id,
%s
FROM %s_commands c
FROM %s.%s_commands c
%s
WHERE %s;\u0000""";

Expand Down Expand Up @@ -324,7 +324,7 @@ public void onStarted(

String where = "c.%s = '%s'".formatted(command.dispatchOn(), commandName);

String sqlQuery = String.format(sqlFormat, systemSchema, name, columns, from, join, where);
String sqlQuery = String.format(sqlFormat, systemSchema, name, columns, systemSchema, from, join, where);

handler.doExecuteSystemClient(traceId, authorization, sqlQuery);
}
Expand Down Expand Up @@ -370,8 +370,8 @@ public void onStarted(
long authorization)
{
String selects = command.commandHandlers().values().stream()
.map("SELECT * FROM %s"::formatted)
.collect(Collectors.joining("UNION ALL\n"));
.map(value -> String.format("SELECT * FROM %s.%s", systemSchema, value))
.collect(Collectors.joining("\nUNION ALL\n"));

String sqlQuery = String.format(sqlFormat, command.schema(), command.name(), selects);

Expand Down Expand Up @@ -534,7 +534,7 @@ public RisingwaveMacroState onError(
private final class CreateReplySink implements RisingwaveMacroState
{
private final String sqlFormat = """
CREATE SINK %s.%s_replies_sink AS
CREATE SINK %s_replies_sink AS
SELECT
COALESCE(r.status, '400') AS status,
c.correlation_id
Expand All @@ -543,7 +543,7 @@ private final class CreateReplySink implements RisingwaveMacroState
ON c.correlation_id = r.correlation_id
WITH (
connector = 'kafka',
topic = '%s.%s_replies',
topic = '%s.%s_replies_sink',
properties.bootstrap.server = '%s',
) FORMAT PLAIN ENCODE AVRO (
force_append_only='true',
Expand All @@ -555,10 +555,12 @@ public void onStarted(
long traceId,
long authorization)
{
String name = command.name();
String schema = command.schema();
final String name = command.name();
final String schema = command.schema();

final String systemName = "%s.%s".formatted(systemSchema, name);

String sqlQuery = String.format(sqlFormat, systemSchema, name, name, name,
String sqlQuery = String.format(sqlFormat, systemName, systemName, systemName,
schema, name, bootstrapServer, schemaRegistry);

handler.doExecuteSystemClient(traceId, authorization, sqlQuery);
Expand Down

0 comments on commit d22ca46

Please sign in to comment.