diff --git a/core/src/datayoga_core/blocks/relational/write/block.py b/core/src/datayoga_core/blocks/relational/write/block.py index b0d42fd5..ed302021 100644 --- a/core/src/datayoga_core/blocks/relational/write/block.py +++ b/core/src/datayoga_core/blocks/relational/write/block.py @@ -187,6 +187,8 @@ def execute(self, statement: Any, records: List[Dict[str, Any]]): if not connected: raise ConnectionError(e) from e + raise + def execute_upsert(self, records: List[Dict[str, Any]]): """Upserts records into the table.""" if records: diff --git a/schemas/job.schema.json b/schemas/job.schema.json index 0c71fc46..555e500f 100644 --- a/schemas/job.schema.json +++ b/schemas/job.schema.json @@ -19,28 +19,28 @@ "properties": { "uses": { "enum": [ - "remove_field", - "std.read", - "std.write", + "files.read_csv", + "redis.read_stream", + "redis.write", + "redis.lookup", "map", - "relational.read", "relational.write", "relational.lookup", - "jinja_template", - "sequence", - "add_field", - "filter", - "files.read_csv", + "relational.read", "azure.read_event_hub", - "parquet.read", "parquet.write", - "http.receiver", - "http.write", - "rename_field", + "parquet.read", + "std.write", + "std.read", + "jinja_template", + "filter", "cassandra.write", - "redis.read_stream", - "redis.write", - "redis.lookup" + "add_field", + "remove_field", + "sequence", + "http.write", + "http.receiver", + "rename_field" ] } }, @@ -51,7 +51,7 @@ "uses": { "description": "Block type", "type": "string", - "const": "remove_field" + "const": "files.read_csv" } }, "required": ["uses"] @@ -59,50 +59,63 @@ "then": { "properties": { "with": { - "title": "remove_field", - "description": "Remove fields", + "title": "files.read_csv", + "description": "Read data from CSV", "type": "object", - "oneOf": [ - { - "description": "Remove multiple fields", - "properties": { - "fields": { - "type": "array", - "description": "Fields", - "items": { - "type": "object", - "properties": { - "field": { - "description": "Field", - "type": "string" - } - }, - "additionalProperties": false, - "required": ["field"] - } - } - }, - "required": ["fields"], - "additionalProperties": false, - "examples": [ - { - "fields": [ - { "field": "credit_card" }, - { "field": "name.mname" } - ] - } - ] + "properties": { + "file": { + "description": "Filename. Can contain a regexp or glob expression", + "type": "string" }, - { - "description": "Remove one field", - "properties": { - "field": { "description": "Field", "type": "string" } - }, - "additionalProperties": false, - "required": ["field"], - "examples": [{ "field": "credit_card" }] + "encoding": { + "description": "Encoding to use for reading the file", + "type": "string", + "default": "utf-8" + }, + "fields": { + "type": "array", + "title": "List of columns to use", + "description": "List of columns to use for extract", + "default": null, + "examples": [["fname", "lname"]], + "minLength": 1, + "additionalItems": true, + "items": { + "type": "string", + "description": "field name", + "examples": ["fname"] + } + }, + "skip": { + "description": "Number of lines to skip", + "type": "number", + "minimum": 0, + "default": 0 + }, + "delimiter": { + "description": "Delimiter to use for splitting the csv records", + "type": "string", + "minLength": 1, + "maxLength": 1, + "default": "," + }, + "batch_size": { + "description": "Number of records to read per batch", + "type": "number", + "minimum": 1, + "default": 1000 + }, + "quotechar": { + "description": "A one-character string used to quote fields containing special characters, such as the delimiter or quotechar, or which contain new-line characters. It defaults to '", + "type": "string", + "minLength": 1, + "maxLength": 1, + "default": "\"" } - ] + }, + "additionalProperties": false, + "required": ["file"], + "examples": [{ "file": "archive.csv", "delimiter": ";" }] } } } @@ -113,7 +126,7 @@ "uses": { "description": "Block type", "type": "string", - "const": "std.read" + "const": "redis.read_stream" } }, "required": ["uses"] @@ -121,8 +134,28 @@ "then": { "properties": { "with": { - "title": "std.read", - "description": "Read from the standard input" + "title": "redis.read_stream", + "description": "Read from Redis stream", + "type": "object", + "properties": { + "connection": { + "description": "Connection name", + "type": "string" + }, + "stream_name": { + "type": "string", + "title": "Source stream name", + "description": "Source stream name" + }, + "snapshot": { + "type": "boolean", + "title": "Snapshot current entries and quit", + "description": "Snapshot current entries and quit", + "default": false + } + }, + "additionalProperties": false, + "required": ["connection", "stream_name"] } } } @@ -133,7 +166,7 @@ "uses": { "description": "Block type", "type": "string", - "const": "std.write" + "const": "redis.write" } }, "required": ["uses"] @@ -141,8 +174,98 @@ "then": { "properties": { "with": { - "title": "std.write", - "description": "Write to the standard output" + "title": "redis.write", + "description": "Write to a Redis data structure", + "type": "object", + "properties": { + "connection": { + "title": "Connection name", + "type": "string" + }, + "command": { + "enum": [ + "HSET", + "SADD", + "XADD", + "RPUSH", + "LPUSH", + "SET", + "ZADD" + ], + "default": "HSET", + "type": "string", + "title": "Redis command", + "description": "Redis command" + }, + "key": { + "description": "Field to use as the Redis key", + "type": "object", + "properties": { + "expression": { + "description": "Expression", + "type": "string" + }, + "language": { + "description": "Language", + "type": "string", + "enum": ["jmespath", "sql"] + } + }, + "required": ["expression", "language"] + } + }, + "additionalProperties": false, + "required": ["connection", "key"] + } + } + } + }, + { + "if": { + "properties": { + "uses": { + "description": "Block type", + "type": "string", + "const": "redis.lookup" + } + }, + "required": ["uses"] + }, + "then": { + "properties": { + "with": { + "title": "redis.lookup", + "description": "Lookup data from Redis using the given command and key", + "type": "object", + "properties": { + "connection": { + "title": "Connection name", + "type": "string" + }, + "cmd": { + "title": "Redis command", + "description": "The command to execute", + "type": "string" + }, + "args": { + "title": "Redis command arguments", + "description": "The list of expressions produces arguments", + "type": "array", + "items": { "type": "string" } + }, + "language": { + "description": "Language", + "type": "string", + "enum": ["jmespath", "sql"] + }, + "field": { + "type": "string", + "title": "Target field", + "description": "The field to write the result to" + } + }, + "additionalProperties": false, + "required": ["connection", "cmd", "args", "language", "field"] } } } @@ -203,7 +326,7 @@ "uses": { "description": "Block type", "type": "string", - "const": "relational.read" + "const": "relational.write" } }, "required": ["uses"] @@ -211,71 +334,8 @@ "then": { "properties": { "with": { - "title": "relational.read", - "description": "Read a table from an SQL-compatible data store", - "type": "object", - "additionalProperties": false, - "examples": [ - { - "id": "read_snowflake", - "type": "relational.read", - "properties": { - "connection": "eu_datalake", - "table": "employees", - "schema": "dbo" - } - } - ], - "properties": { - "connection": { - "type": "string", - "title": "The connection to use for loading", - "description": "Logical connection name as defined in the connections.dy.yaml", - "examples": ["europe_db", "target", "eu_dwh"] - }, - "schema": { - "type": "string", - "title": "The table schema of the table", - "description": "If left blank, the default schema of this connection will be used as defined in the connections.dy.yaml", - "examples": ["dbo"] - }, - "table": { - "type": "string", - "title": "The table name", - "description": "Table name", - "examples": ["employees"] - }, - "columns": { - "type": "array", - "title": "Optional subset of columns to load", - "items": { - "type": ["string", "object"], - "title": "name of column" - }, - "examples": [["fname", { "lname": "last_name" }]] - } - }, - "required": ["connection", "table"] - } - } - } - }, - { - "if": { - "properties": { - "uses": { - "description": "Block type", - "type": "string", - "const": "relational.write" - } - }, - "required": ["uses"] - }, - "then": { - "properties": { - "with": { - "title": "relational.write", - "description": "Write into a SQL-compatible data store", + "title": "relational.write", + "description": "Write into a SQL-compatible data store", "type": "object", "additionalProperties": false, "examples": [ @@ -464,7 +524,7 @@ "uses": { "description": "Block type", "type": "string", - "const": "jinja_template" + "const": "relational.read" } }, "required": ["uses"] @@ -472,28 +532,51 @@ "then": { "properties": { "with": { - "title": "jinja_template", - "description": "Apply Jinja template to a field", + "title": "relational.read", + "description": "Read a table from an SQL-compatible data store", "type": "object", - "properties": { - "field": { "description": "Field", "type": "string" }, - "template": { - "description": "Jinja Template", - "type": "string" - } - }, "additionalProperties": false, - "required": ["field", "template"], "examples": [ { - "field": "name.full_name", - "template": "{{ name.fname }} {{ name.lname }}" + "id": "read_snowflake", + "type": "relational.read", + "properties": { + "connection": "eu_datalake", + "table": "employees", + "schema": "dbo" + } + } + ], + "properties": { + "connection": { + "type": "string", + "title": "The connection to use for loading", + "description": "Logical connection name as defined in the connections.dy.yaml", + "examples": ["europe_db", "target", "eu_dwh"] }, - { - "field": "name.fname_upper", - "template": "{{ name.fname | upper }}" + "schema": { + "type": "string", + "title": "The table schema of the table", + "description": "If left blank, the default schema of this connection will be used as defined in the connections.dy.yaml", + "examples": ["dbo"] + }, + "table": { + "type": "string", + "title": "The table name", + "description": "Table name", + "examples": ["employees"] + }, + "columns": { + "type": "array", + "title": "Optional subset of columns to load", + "items": { + "type": ["string", "object"], + "title": "name of column" + }, + "examples": [["fname", { "lname": "last_name" }]] } - ] + }, + "required": ["connection", "table"] } } } @@ -504,7 +587,7 @@ "uses": { "description": "Block type", "type": "string", - "const": "sequence" + "const": "azure.read_event_hub" } }, "required": ["uses"] @@ -512,29 +595,43 @@ "then": { "properties": { "with": { - "title": "sequence", - "description": "Add a sequence number field to data", + "title": "azure.read_event_hub", + "description": "Read from Azure Event Hub", "type": "object", - "additionalProperties": false, - "examples": [], - "required": [], "properties": { - "field": { + "event_hub_connection_string": { "type": "string", - "title": "Name of new sequence field" + "description": "The connection string for the Azure Event Hub namespace." }, - "start": { - "type": "number", - "title": "Start entry", - "default": 1, - "examples": [] + "event_hub_consumer_group_name": { + "type": "string", + "description": "The name of the consumer group to read events from." }, - "increment": { - "type": "number", - "title": "Increment between sequences", - "examples": [] + "event_hub_name": { + "type": "string", + "description": "The name of the Azure Event Hub." + }, + "checkpoint_store_connection_string": { + "type": "string", + "description": "The connection string for the Azure Storage account used as the checkpoint store." + }, + "checkpoint_store_container_name": { + "type": "string", + "description": "The name of the container within the checkpoint store to store the checkpoints." + }, + "batch_size": { + "type": "integer", + "description": "The maximum number of events to receive in each batch.", + "default": 300 } - } + }, + "required": [ + "event_hub_connection_string", + "event_hub_consumer_group_name", + "event_hub_name", + "checkpoint_store_connection_string", + "checkpoint_store_container_name" + ] } } } @@ -545,7 +642,7 @@ "uses": { "description": "Block type", "type": "string", - "const": "add_field" + "const": "parquet.write" } }, "required": ["uses"] @@ -553,82 +650,18 @@ "then": { "properties": { "with": { - "title": "add_field", - "description": "Add fields to a record", + "title": "parquet.write", + "description": "Write data to parquet", "type": "object", - "oneOf": [ - { - "description": "Add multiple fields", - "properties": { - "fields": { - "type": "array", - "description": "Fields", - "items": { - "type": "object", - "properties": { - "field": { - "description": "Field", - "type": "string" - }, - "expression": { - "description": "Expression", - "type": "string" - }, - "language": { - "description": "Language", - "type": "string", - "enum": ["jmespath", "sql"] - } - }, - "additionalProperties": false, - "required": ["field", "expression", "language"] - } - } - }, - "required": ["fields"], - "additionalProperties": false, - "examples": [ - { - "fields": [ - { - "field": "name.full_name", - "language": "jmespath", - "expression": "concat([name.fname, ' ', name.lname])" - }, - { - "field": "name.fname_upper", - "language": "jmespath", - "expression": "upper(name.fname)" - } - ] - } - ] - }, - { - "description": "Add one field", - "properties": { - "field": { "description": "Field", "type": "string" }, - "expression": { - "description": "Expression", - "type": "string" - }, - "language": { - "description": "Language", - "type": "string", - "enum": ["jmespath", "sql"] - } - }, - "additionalProperties": false, - "required": ["field", "expression", "language"], - "examples": [ - { - "field": "country", - "language": "sql", - "expression": "country_code || ' - ' || UPPER(country_name)" - } - ] + "properties": { + "file": { + "description": "Filename. Can contain a regexp or glob expression", + "type": "string" } - ] + }, + "additionalProperties": false, + "required": ["file"], + "examples": [{ "file": "data.parquet" }] } } } @@ -639,7 +672,7 @@ "uses": { "description": "Block type", "type": "string", - "const": "filter" + "const": "parquet.read" } }, "required": ["uses"] @@ -647,23 +680,18 @@ "then": { "properties": { "with": { - "title": "filter", - "description": "Filter records", + "title": "parquet.read", + "description": "Read data from parquet", "type": "object", "properties": { - "expression": { - "description": "Expression", + "file": { + "description": "Filename. Can contain a regexp or glob expression", "type": "string" - }, - "language": { - "description": "Language", - "type": "string", - "enum": ["jmespath", "sql"] } }, "additionalProperties": false, - "required": ["expression", "language"], - "examples": [{ "language": "sql", "expression": "age>20" }] + "required": ["file"], + "examples": [{ "file": "data.parquet" }] } } } @@ -674,7 +702,7 @@ "uses": { "description": "Block type", "type": "string", - "const": "files.read_csv" + "const": "std.write" } }, "required": ["uses"] @@ -682,63 +710,8 @@ "then": { "properties": { "with": { - "title": "files.read_csv", - "description": "Read data from CSV", - "type": "object", - "properties": { - "file": { - "description": "Filename. Can contain a regexp or glob expression", - "type": "string" - }, - "encoding": { - "description": "Encoding to use for reading the file", - "type": "string", - "default": "utf-8" - }, - "fields": { - "type": "array", - "title": "List of columns to use", - "description": "List of columns to use for extract", - "default": null, - "examples": [["fname", "lname"]], - "minLength": 1, - "additionalItems": true, - "items": { - "type": "string", - "description": "field name", - "examples": ["fname"] - } - }, - "skip": { - "description": "Number of lines to skip", - "type": "number", - "minimum": 0, - "default": 0 - }, - "delimiter": { - "description": "Delimiter to use for splitting the csv records", - "type": "string", - "minLength": 1, - "maxLength": 1, - "default": "," - }, - "batch_size": { - "description": "Number of records to read per batch", - "type": "number", - "minimum": 1, - "default": 1000 - }, - "quotechar": { - "description": "A one-character string used to quote fields containing special characters, such as the delimiter or quotechar, or which contain new-line characters. It defaults to '", - "type": "string", - "minLength": 1, - "maxLength": 1, - "default": "\"" - } - }, - "additionalProperties": false, - "required": ["file"], - "examples": [{ "file": "archive.csv", "delimiter": ";" }] + "title": "std.write", + "description": "Write to the standard output" } } } @@ -749,7 +722,7 @@ "uses": { "description": "Block type", "type": "string", - "const": "azure.read_event_hub" + "const": "std.read" } }, "required": ["uses"] @@ -757,43 +730,8 @@ "then": { "properties": { "with": { - "title": "azure.read_event_hub", - "description": "Read from Azure Event Hub", - "type": "object", - "properties": { - "event_hub_connection_string": { - "type": "string", - "description": "The connection string for the Azure Event Hub namespace." - }, - "event_hub_consumer_group_name": { - "type": "string", - "description": "The name of the consumer group to read events from." - }, - "event_hub_name": { - "type": "string", - "description": "The name of the Azure Event Hub." - }, - "checkpoint_store_connection_string": { - "type": "string", - "description": "The connection string for the Azure Storage account used as the checkpoint store." - }, - "checkpoint_store_container_name": { - "type": "string", - "description": "The name of the container within the checkpoint store to store the checkpoints." - }, - "batch_size": { - "type": "integer", - "description": "The maximum number of events to receive in each batch.", - "default": 300 - } - }, - "required": [ - "event_hub_connection_string", - "event_hub_consumer_group_name", - "event_hub_name", - "checkpoint_store_connection_string", - "checkpoint_store_container_name" - ] + "title": "std.read", + "description": "Read from the standard input" } } } @@ -804,7 +742,7 @@ "uses": { "description": "Block type", "type": "string", - "const": "parquet.read" + "const": "jinja_template" } }, "required": ["uses"] @@ -812,18 +750,28 @@ "then": { "properties": { "with": { - "title": "parquet.read", - "description": "Read data from parquet", + "title": "jinja_template", + "description": "Apply Jinja template to a field", "type": "object", "properties": { - "file": { - "description": "Filename. Can contain a regexp or glob expression", + "field": { "description": "Field", "type": "string" }, + "template": { + "description": "Jinja Template", "type": "string" } }, "additionalProperties": false, - "required": ["file"], - "examples": [{ "file": "data.parquet" }] + "required": ["field", "template"], + "examples": [ + { + "field": "name.full_name", + "template": "{{ name.fname }} {{ name.lname }}" + }, + { + "field": "name.fname_upper", + "template": "{{ name.fname | upper }}" + } + ] } } } @@ -834,7 +782,7 @@ "uses": { "description": "Block type", "type": "string", - "const": "parquet.write" + "const": "filter" } }, "required": ["uses"] @@ -842,18 +790,23 @@ "then": { "properties": { "with": { - "title": "parquet.write", - "description": "Write data to parquet", + "title": "filter", + "description": "Filter records", "type": "object", "properties": { - "file": { - "description": "Filename. Can contain a regexp or glob expression", + "expression": { + "description": "Expression", "type": "string" + }, + "language": { + "description": "Language", + "type": "string", + "enum": ["jmespath", "sql"] } }, "additionalProperties": false, - "required": ["file"], - "examples": [{ "file": "data.parquet" }] + "required": ["expression", "language"], + "examples": [{ "language": "sql", "expression": "age>20" }] } } } @@ -864,7 +817,7 @@ "uses": { "description": "Block type", "type": "string", - "const": "http.receiver" + "const": "cassandra.write" } }, "required": ["uses"] @@ -872,23 +825,74 @@ "then": { "properties": { "with": { - "title": "http.receiver", - "description": "Receives HTTP requests and process the data.", + "title": "cassandra.write", + "description": "Write into a Cassandra data store", "type": "object", + "additionalProperties": false, + "examples": [ + { + "id": "load_snowflake", + "type": "relational.write", + "properties": { + "connection": "eu_datalake", + "table": "employees", + "schema": "dbo", + "load_strategy": "APPEND" + } + } + ], "properties": { - "host": { - "description": "Host to listen", + "connection": { "type": "string", - "default": "0.0.0.0" + "title": "The connection to use for loading", + "description": "Logical connection name as defined in the connections.dy.yaml", + "examples": ["europe_db", "target", "eu_dwh"] }, - "port": { - "description": "Port to listen", - "type": "integer", - "default": 8080 + "keyspace": { + "type": "string", + "title": "Keyspace", + "description": "Keyspace", + "examples": ["employees"] + }, + "table": { + "type": "string", + "title": "The target table name", + "description": "Target table name", + "examples": ["employees"] + }, + "keys": { + "type": "array", + "title": "Business keys", + "items": { + "type": ["string", "object"], + "title": "name of column" + }, + "examples": [["fname", { "lname": "last_name" }]] + }, + "mapping": { + "type": "array", + "title": "Fields to write", + "items": { + "type": ["string", "object"], + "title": "name of column" + }, + "examples": [ + ["fname", { "lname": "last_name" }, "address", "gender"] + ] + }, + "opcode_field": { + "type": "string", + "description": "Name of the field in the payload that holds the operation (c - create, d - delete, u - update) for this record in the DB" } }, - "additionalProperties": false, - "examples": [{ "host": "localhost", "port": 8080 }] + "required": [ + "connection", + "keyspace", + "table", + "keys", + "mapping", + "opcode_field" + ] } } } @@ -899,7 +903,7 @@ "uses": { "description": "Block type", "type": "string", - "const": "http.write" + "const": "add_field" } }, "required": ["uses"] @@ -907,23 +911,220 @@ "then": { "properties": { "with": { - "title": "http.write", - "description": "Write data using an HTTP request", + "title": "add_field", + "description": "Add fields to a record", "type": "object", - "properties": { - "connection": { - "type": "string", - "title": "The connection to use for the HTTP request", - "description": "Logical connection name as defined in the connections.dy.yaml", - "examples": ["api_connection", "external_service"] - }, - "endpoint": { - "oneOf": [ - { - "type": "string", - "title": "API Endpoint", - "description": "The endpoint URL for the HTTP request", - "examples": ["/users"] + "oneOf": [ + { + "description": "Add multiple fields", + "properties": { + "fields": { + "type": "array", + "description": "Fields", + "items": { + "type": "object", + "properties": { + "field": { + "description": "Field", + "type": "string" + }, + "expression": { + "description": "Expression", + "type": "string" + }, + "language": { + "description": "Language", + "type": "string", + "enum": ["jmespath", "sql"] + } + }, + "additionalProperties": false, + "required": ["field", "expression", "language"] + } + } + }, + "required": ["fields"], + "additionalProperties": false, + "examples": [ + { + "fields": [ + { + "field": "name.full_name", + "language": "jmespath", + "expression": "concat([name.fname, ' ', name.lname])" + }, + { + "field": "name.fname_upper", + "language": "jmespath", + "expression": "upper(name.fname)" + } + ] + } + ] + }, + { + "description": "Add one field", + "properties": { + "field": { "description": "Field", "type": "string" }, + "expression": { + "description": "Expression", + "type": "string" + }, + "language": { + "description": "Language", + "type": "string", + "enum": ["jmespath", "sql"] + } + }, + "additionalProperties": false, + "required": ["field", "expression", "language"], + "examples": [ + { + "field": "country", + "language": "sql", + "expression": "country_code || ' - ' || UPPER(country_name)" + } + ] + } + ] + } + } + } + }, + { + "if": { + "properties": { + "uses": { + "description": "Block type", + "type": "string", + "const": "remove_field" + } + }, + "required": ["uses"] + }, + "then": { + "properties": { + "with": { + "title": "remove_field", + "description": "Remove fields", + "type": "object", + "oneOf": [ + { + "description": "Remove multiple fields", + "properties": { + "fields": { + "type": "array", + "description": "Fields", + "items": { + "type": "object", + "properties": { + "field": { + "description": "Field", + "type": "string" + } + }, + "additionalProperties": false, + "required": ["field"] + } + } + }, + "required": ["fields"], + "additionalProperties": false, + "examples": [ + { + "fields": [ + { "field": "credit_card" }, + { "field": "name.mname" } + ] + } + ] + }, + { + "description": "Remove one field", + "properties": { + "field": { "description": "Field", "type": "string" } + }, + "additionalProperties": false, + "required": ["field"], + "examples": [{ "field": "credit_card" }] + } + ] + } + } + } + }, + { + "if": { + "properties": { + "uses": { + "description": "Block type", + "type": "string", + "const": "sequence" + } + }, + "required": ["uses"] + }, + "then": { + "properties": { + "with": { + "title": "sequence", + "description": "Add a sequence number field to data", + "type": "object", + "additionalProperties": false, + "examples": [], + "required": [], + "properties": { + "field": { + "type": "string", + "title": "Name of new sequence field" + }, + "start": { + "type": "number", + "title": "Start entry", + "default": 1, + "examples": [] + }, + "increment": { + "type": "number", + "title": "Increment between sequences", + "examples": [] + } + } + } + } + } + }, + { + "if": { + "properties": { + "uses": { + "description": "Block type", + "type": "string", + "const": "http.write" + } + }, + "required": ["uses"] + }, + "then": { + "properties": { + "with": { + "title": "http.write", + "description": "Write data using an HTTP request", + "type": "object", + "properties": { + "connection": { + "type": "string", + "title": "The connection to use for the HTTP request", + "description": "Logical connection name as defined in the connections.dy.yaml", + "examples": ["api_connection", "external_service"] + }, + "endpoint": { + "oneOf": [ + { + "type": "string", + "title": "API Endpoint", + "description": "The endpoint URL for the HTTP request", + "examples": ["/users"] }, { "type": "object", @@ -1098,6 +1299,41 @@ } } }, + { + "if": { + "properties": { + "uses": { + "description": "Block type", + "type": "string", + "const": "http.receiver" + } + }, + "required": ["uses"] + }, + "then": { + "properties": { + "with": { + "title": "http.receiver", + "description": "Receives HTTP requests and process the data.", + "type": "object", + "properties": { + "host": { + "description": "Host to listen", + "type": "string", + "default": "0.0.0.0" + }, + "port": { + "description": "Port to listen", + "type": "integer", + "default": 8080 + } + }, + "additionalProperties": false, + "examples": [{ "host": "localhost", "port": 8080 }] + } + } + } + }, { "if": { "properties": { @@ -1181,242 +1417,6 @@ } } } - }, - { - "if": { - "properties": { - "uses": { - "description": "Block type", - "type": "string", - "const": "cassandra.write" - } - }, - "required": ["uses"] - }, - "then": { - "properties": { - "with": { - "title": "cassandra.write", - "description": "Write into a Cassandra data store", - "type": "object", - "additionalProperties": false, - "examples": [ - { - "id": "load_snowflake", - "type": "relational.write", - "properties": { - "connection": "eu_datalake", - "table": "employees", - "schema": "dbo", - "load_strategy": "APPEND" - } - } - ], - "properties": { - "connection": { - "type": "string", - "title": "The connection to use for loading", - "description": "Logical connection name as defined in the connections.dy.yaml", - "examples": ["europe_db", "target", "eu_dwh"] - }, - "keyspace": { - "type": "string", - "title": "Keyspace", - "description": "Keyspace", - "examples": ["employees"] - }, - "table": { - "type": "string", - "title": "The target table name", - "description": "Target table name", - "examples": ["employees"] - }, - "keys": { - "type": "array", - "title": "Business keys", - "items": { - "type": ["string", "object"], - "title": "name of column" - }, - "examples": [["fname", { "lname": "last_name" }]] - }, - "mapping": { - "type": "array", - "title": "Fields to write", - "items": { - "type": ["string", "object"], - "title": "name of column" - }, - "examples": [ - ["fname", { "lname": "last_name" }, "address", "gender"] - ] - }, - "opcode_field": { - "type": "string", - "description": "Name of the field in the payload that holds the operation (c - create, d - delete, u - update) for this record in the DB" - } - }, - "required": [ - "connection", - "keyspace", - "table", - "keys", - "mapping", - "opcode_field" - ] - } - } - } - }, - { - "if": { - "properties": { - "uses": { - "description": "Block type", - "type": "string", - "const": "redis.read_stream" - } - }, - "required": ["uses"] - }, - "then": { - "properties": { - "with": { - "title": "redis.read_stream", - "description": "Read from Redis stream", - "type": "object", - "properties": { - "connection": { - "description": "Connection name", - "type": "string" - }, - "stream_name": { - "type": "string", - "title": "Source stream name", - "description": "Source stream name" - }, - "snapshot": { - "type": "boolean", - "title": "Snapshot current entries and quit", - "description": "Snapshot current entries and quit", - "default": false - } - }, - "additionalProperties": false, - "required": ["connection", "stream_name"] - } - } - } - }, - { - "if": { - "properties": { - "uses": { - "description": "Block type", - "type": "string", - "const": "redis.write" - } - }, - "required": ["uses"] - }, - "then": { - "properties": { - "with": { - "title": "redis.write", - "description": "Write to a Redis data structure", - "type": "object", - "properties": { - "connection": { - "title": "Connection name", - "type": "string" - }, - "command": { - "enum": [ - "HSET", - "SADD", - "XADD", - "RPUSH", - "LPUSH", - "SET", - "ZADD" - ], - "default": "HSET", - "type": "string", - "title": "Redis command", - "description": "Redis command" - }, - "key": { - "description": "Field to use as the Redis key", - "type": "object", - "properties": { - "expression": { - "description": "Expression", - "type": "string" - }, - "language": { - "description": "Language", - "type": "string", - "enum": ["jmespath", "sql"] - } - }, - "required": ["expression", "language"] - } - }, - "additionalProperties": false, - "required": ["connection", "key"] - } - } - } - }, - { - "if": { - "properties": { - "uses": { - "description": "Block type", - "type": "string", - "const": "redis.lookup" - } - }, - "required": ["uses"] - }, - "then": { - "properties": { - "with": { - "title": "redis.lookup", - "description": "Lookup data from Redis using the given command and key", - "type": "object", - "properties": { - "connection": { - "title": "Connection name", - "type": "string" - }, - "cmd": { - "title": "Redis command", - "description": "The command to execute", - "type": "string" - }, - "args": { - "title": "Redis command arguments", - "description": "The list of expressions produces arguments", - "type": "array", - "items": { "type": "string" } - }, - "language": { - "description": "Language", - "type": "string", - "enum": ["jmespath", "sql"] - }, - "field": { - "type": "string", - "title": "Target field", - "description": "The field to write the result to" - } - }, - "additionalProperties": false, - "required": ["connection", "cmd", "args", "language", "field"] - } - } - } } ] }