Skip to content

Commit

Permalink
#138 - kafka.read merge fixes - topic and group to block
Browse files Browse the repository at this point in the history
  • Loading branch information
wheelly committed Feb 11, 2024
1 parent 7b501e1 commit 69f1235
Show file tree
Hide file tree
Showing 5 changed files with 16 additions and 16 deletions.
4 changes: 2 additions & 2 deletions core/src/datayoga_core/blocks/kafka/read/block.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ def init(self, context: Optional[Context] = None):
self.port = int(connection_details.get("port", 9092))
logger.debug(f"Connection details: {json.dumps(connection_details)}")
self.bootstrap_servers = connection_details.get("bootstrap_servers")
self.group = connection_details.get("group")
self.topic = connection_details.get("topic", "integration-tests")
self.group = self.properties.get("group")
self.topic = self.properties["topic"]
self.seek_to_beginning = self.properties.get("seek_to_beginning", False)
self.snapshot = self.properties.get("snapshot", False)
self.consumer = Consumer({
Expand Down
11 changes: 10 additions & 1 deletion core/src/datayoga_core/blocks/kafka/read/block.schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,14 @@
"description": "host name",
"type": "string"
},
"topic": {
"description": "Kafka topic",
"type": "string"
},
"group": {
"description": "Kafka group",
"type": "string"
},
"seek_to_beginning": {
"description": "Consumer seek to beginning",
"type": "boolean"
Expand All @@ -19,6 +27,7 @@
}
},
"required": [
"bootstrap_servers"
"bootstrap_servers",
"topic"
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,24 +11,15 @@
"bootstrap_servers": {
"description": "Kafka Hosts",
"type": "string"
},
"topic": {
"description": "Kafka Topics",
"type": "string"
},
"group": {
"description": "Kafka Group",
"type": "string"
}
},
"additionalProperties": false,
"required": ["type", "bootstrap_servers", "topic"],
"required": ["type", "bootstrap_servers"],
"examples": [
{
"kafka": {
"type": "kafka",
"bootstrap_servers": ["localhost"],
"topic": "test"
"bootstrap_servers": ["localhost:9092"]
}
}
]
Expand Down
2 changes: 0 additions & 2 deletions integration-tests/resources/connections.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -47,5 +47,3 @@ cassandra:
kafka:
type: kafka
bootstrap_servers: "localhost:9093"
group: "integration-tests"
topic: "integration-tests"
2 changes: 2 additions & 0 deletions integration-tests/resources/jobs/tests/kafka_to_stdout.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ input:
uses: kafka.read
with:
bootstrap_servers: kafka
topic: "integration-tests"
group: "integration-tests"
snapshot: true
seek_to_beginning: true
steps:
Expand Down

0 comments on commit 69f1235

Please sign in to comment.