From d22ca4678ed48287266ef7abdd9d940df8e5e717 Mon Sep 17 00:00:00 2001 From: Akram Yakubov Date: Wed, 8 Jan 2025 10:33:04 -0800 Subject: [PATCH 1/2] WIP --- .../effective/create.zstream/client.rpt | 10 +++++----- .../effective/create.zstream/server.rpt | 10 +++++----- .../macro/RisingwaveCreateZstreamMacro.java | 20 ++++++++++--------- 3 files changed, 21 insertions(+), 19 deletions(-) diff --git a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.zstream/client.rpt b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.zstream/client.rpt index 2119a63375..eabc1a88dc 100644 --- a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.zstream/client.rpt +++ b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.zstream/client.rpt @@ -149,7 +149,7 @@ write "CREATE MATERIALIZED VIEW zb_catalog.send_payment_handler AS\n" " WHEN balance >= c.amount THEN 'PaymentSent'\n" " ELSE 'PaymentDeclined'\n" " END AS event, c.user_id, c.amount\n" - " FROM app_events_commands c\n" + " FROM zb_catalog.app_events_commands c\n" " JOIN balance ON WHERE user_id = c.user_id\n" " WHERE c.command = 'SendPayment';" [0x00] @@ -174,7 +174,7 @@ write zilla:data.ext ${pgsql:dataEx() .build() .build()} write "CREATE MATERIALIZED VIEW public.app_events AS\n" - " SELECT * FROM send_payment_handler;" + " SELECT * FROM zb_catalog.send_payment_handler;" [0x00] read advised zilla:flush ${pgsql:flushEx() @@ -248,12 +248,12 @@ write "CREATE SINK zb_catalog.app_events_replies_sink AS\n" " SELECT\n" " COALESCE(r.status, '400') AS status,\n" " c.correlation_id\n" - " FROM app_events_commands c\n" - " LEFT JOIN app_events_reply_handler r\n" + " FROM zb_catalog.app_events_commands c\n" + " LEFT JOIN zb_catalog.app_events_reply_handler r\n" " ON c.correlation_id = r.correlation_id\n" "WITH (\n" " connector = 'kafka',\n" - " topic = 'public.app_events_replies',\n" + " topic = 'public.app_events_replies_sink',\n" " properties.bootstrap.server = 'localhost:9092',\n" ") FORMAT PLAIN ENCODE AVRO (\n" " force_append_only='true',\n" diff --git a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.zstream/server.rpt b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.zstream/server.rpt index 23f6ee11ad..12e986d549 100644 --- a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.zstream/server.rpt +++ b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.zstream/server.rpt @@ -151,7 +151,7 @@ read "CREATE MATERIALIZED VIEW zb_catalog.send_payment_handler AS\n" " WHEN balance >= c.amount THEN 'PaymentSent'\n" " ELSE 'PaymentDeclined'\n" " END AS event, c.user_id, c.amount\n" - " FROM app_events_commands c\n" + " FROM zb_catalog.app_events_commands c\n" " JOIN balance ON WHERE user_id = c.user_id\n" " WHERE c.command = 'SendPayment';" [0x00] @@ -176,7 +176,7 @@ read zilla:data.ext ${pgsql:dataEx() .build() .build()} read "CREATE MATERIALIZED VIEW public.app_events AS\n" - " SELECT * FROM send_payment_handler;" + " SELECT * FROM zb_catalog.send_payment_handler;" [0x00] write advise zilla:flush ${pgsql:flushEx() @@ -250,12 +250,12 @@ read "CREATE SINK zb_catalog.app_events_replies_sink AS\n" " SELECT\n" " COALESCE(r.status, '400') AS status,\n" " c.correlation_id\n" - " FROM app_events_commands c\n" - " LEFT JOIN app_events_reply_handler r\n" + " FROM zb_catalog.app_events_commands c\n" + " LEFT JOIN zb_catalog.app_events_reply_handler r\n" " ON c.correlation_id = r.correlation_id\n" "WITH (\n" " connector = 'kafka',\n" - " topic = 'public.app_events_replies',\n" + " topic = 'public.app_events_replies_sink',\n" " properties.bootstrap.server = 'localhost:9092',\n" ") FORMAT PLAIN ENCODE AVRO (\n" " force_append_only='true',\n" diff --git a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/macro/RisingwaveCreateZstreamMacro.java b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/macro/RisingwaveCreateZstreamMacro.java index d20bf63ab4..dfeec8c604 100644 --- a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/macro/RisingwaveCreateZstreamMacro.java +++ b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/macro/RisingwaveCreateZstreamMacro.java @@ -279,7 +279,7 @@ private final class CreateHandlerMaterializedViewState implements RisingwaveMacr SELECT c.correlation_id, %s - FROM %s_commands c + FROM %s.%s_commands c %s WHERE %s;\u0000"""; @@ -324,7 +324,7 @@ public void onStarted( String where = "c.%s = '%s'".formatted(command.dispatchOn(), commandName); - String sqlQuery = String.format(sqlFormat, systemSchema, name, columns, from, join, where); + String sqlQuery = String.format(sqlFormat, systemSchema, name, columns, systemSchema, from, join, where); handler.doExecuteSystemClient(traceId, authorization, sqlQuery); } @@ -370,8 +370,8 @@ public void onStarted( long authorization) { String selects = command.commandHandlers().values().stream() - .map("SELECT * FROM %s"::formatted) - .collect(Collectors.joining("UNION ALL\n")); + .map(value -> String.format("SELECT * FROM %s.%s", systemSchema, value)) + .collect(Collectors.joining("\nUNION ALL\n")); String sqlQuery = String.format(sqlFormat, command.schema(), command.name(), selects); @@ -534,7 +534,7 @@ public RisingwaveMacroState onError( private final class CreateReplySink implements RisingwaveMacroState { private final String sqlFormat = """ - CREATE SINK %s.%s_replies_sink AS + CREATE SINK %s_replies_sink AS SELECT COALESCE(r.status, '400') AS status, c.correlation_id @@ -543,7 +543,7 @@ private final class CreateReplySink implements RisingwaveMacroState ON c.correlation_id = r.correlation_id WITH ( connector = 'kafka', - topic = '%s.%s_replies', + topic = '%s.%s_replies_sink', properties.bootstrap.server = '%s', ) FORMAT PLAIN ENCODE AVRO ( force_append_only='true', @@ -555,10 +555,12 @@ public void onStarted( long traceId, long authorization) { - String name = command.name(); - String schema = command.schema(); + final String name = command.name(); + final String schema = command.schema(); + + final String systemName = "%s.%s".formatted(systemSchema, name); - String sqlQuery = String.format(sqlFormat, systemSchema, name, name, name, + String sqlQuery = String.format(sqlFormat, systemName, systemName, systemName, schema, name, bootstrapServer, schemaRegistry); handler.doExecuteSystemClient(traceId, authorization, sqlQuery); From e08bb0b8f946e0edb7789e2e154c53206dea1988 Mon Sep 17 00:00:00 2001 From: Akram Yakubov Date: Wed, 8 Jan 2025 11:54:37 -0800 Subject: [PATCH 2/2] Include generated --- .../streams/effective/create.zstream/client.rpt | 2 ++ .../streams/effective/create.zstream/server.rpt | 2 ++ .../internal/macro/RisingwaveCreateZstreamMacro.java | 9 ++++++++- 3 files changed, 12 insertions(+), 1 deletion(-) diff --git a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.zstream/client.rpt b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.zstream/client.rpt index eabc1a88dc..56d6f619a8 100644 --- a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.zstream/client.rpt +++ b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.zstream/client.rpt @@ -145,6 +145,8 @@ write zilla:data.ext ${pgsql:dataEx() write "CREATE MATERIALIZED VIEW zb_catalog.send_payment_handler AS\n" " SELECT\n" " c.correlation_id,\n" + " c.owner_id,\n" + " c.created_at,\n" " CASE\n" " WHEN balance >= c.amount THEN 'PaymentSent'\n" " ELSE 'PaymentDeclined'\n" diff --git a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.zstream/server.rpt b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.zstream/server.rpt index 12e986d549..e244b3f962 100644 --- a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.zstream/server.rpt +++ b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.zstream/server.rpt @@ -147,6 +147,8 @@ read zilla:data.ext ${pgsql:dataEx() read "CREATE MATERIALIZED VIEW zb_catalog.send_payment_handler AS\n" " SELECT\n" " c.correlation_id,\n" + " c.owner_id,\n" + " c.created_at,\n" " CASE\n" " WHEN balance >= c.amount THEN 'PaymentSent'\n" " ELSE 'PaymentDeclined'\n" diff --git a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/macro/RisingwaveCreateZstreamMacro.java b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/macro/RisingwaveCreateZstreamMacro.java index dfeec8c604..c12c52af25 100644 --- a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/macro/RisingwaveCreateZstreamMacro.java +++ b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/macro/RisingwaveCreateZstreamMacro.java @@ -278,6 +278,7 @@ private final class CreateHandlerMaterializedViewState implements RisingwaveMacr CREATE MATERIALIZED VIEW %s.%s AS SELECT c.correlation_id, + %s, %s FROM %s.%s_commands c %s @@ -316,6 +317,11 @@ public void onStarted( } } + String include = command.columns().stream() + .filter(c -> ZILLA_MAPPINGS.containsKey(c.generatedAlways())) + .map(c -> "c.%s".formatted(c.name())) + .collect(Collectors.joining(",\n ")); + String commandName = command.commandHandlers().entrySet().stream() .filter(e -> e.getValue().equals(name)) .map(Map.Entry::getKey) @@ -324,7 +330,8 @@ public void onStarted( String where = "c.%s = '%s'".formatted(command.dispatchOn(), commandName); - String sqlQuery = String.format(sqlFormat, systemSchema, name, columns, systemSchema, from, join, where); + String sqlQuery = String.format(sqlFormat, systemSchema, name, include, columns, + systemSchema, from, join, where); handler.doExecuteSystemClient(traceId, authorization, sqlQuery); }