Update tutorial docs to include confluent code examples (#1131)
* Remove link to spectacularfailure github profile
* Update missing docs for confluent in tutorial
* Add separate index page for kafka with links to confluent and aiokafka
1 parent 5cac95b · commit 7535b24 · 121 changed files with 1,499 additions and 145 deletions.
---
# 0.5 - API
# 2 - Release
# 3 - Contributing
# 5 - Template Page
# 10 - Default
search:
  boost: 10
---

# Kafka Routing
## Kafka Overview

### What is Kafka?

[Kafka](https://kafka.apache.org/){.external-link target="_blank"} is an open-source distributed streaming platform developed by the Apache Software Foundation. It is designed to handle high-throughput, fault-tolerant, real-time data streaming. Kafka is widely used for building real-time data pipelines and streaming applications.

### Key Kafka Concepts

#### 1. Publish-Subscribe Model

Kafka is built around the publish-subscribe messaging model. In this model, data is published to topics, and multiple consumers can subscribe to these topics to receive the data. This decouples the producers of data from the consumers, allowing for flexibility and scalability.
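The decoupling this model provides can be illustrated with a tiny in-memory sketch (the `MiniBroker` class and its methods are invented for illustration only and are unrelated to Kafka's actual protocol or FastStream's API):

```python
from collections import defaultdict
from typing import Callable


class MiniBroker:
    """Toy in-memory publish-subscribe broker (not real Kafka)."""

    def __init__(self) -> None:
        self._subscribers: dict[str, list[Callable[[str], None]]] = defaultdict(list)

    def subscribe(self, topic: str, handler: Callable[[str], None]) -> None:
        # A producer never needs to know who is listening.
        self._subscribers[topic].append(handler)

    def publish(self, topic: str, message: str) -> None:
        # Every subscriber of the topic receives its own copy of the message.
        for handler in self._subscribers[topic]:
            handler(message)


broker = MiniBroker()
received_a: list[str] = []
received_b: list[str] = []

# Two independent consumers of the same topic.
broker.subscribe("orders", received_a.append)
broker.subscribe("orders", received_b.append)
broker.publish("orders", "order-1")

print(received_a, received_b)  # ['order-1'] ['order-1']
```

The producer publishes once; both subscribers receive the message without either side knowing about the other.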
#### 2. Topics

A **topic** in Kafka is a logical channel or category to which messages are published by producers and from which messages are consumed by consumers. Topics are used to organize and categorize data streams. Each topic can have multiple **partitions**, which enable Kafka to distribute data and provide parallelism for both producers and consumers.

## Kafka Topics

### Understanding Kafka Topics

Topics are fundamental to Kafka and serve as the central point of data distribution. Here are some key points about topics:

- Topics allow you to logically group and categorize messages.
- Each message sent to Kafka is associated with a specific topic.
- Topics can have one or more partitions to enable parallel processing and scaling.
- Consumers subscribe to topics to receive messages.
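Partition assignment is key-driven: Kafka's default partitioner hashes a message's key and takes it modulo the partition count, so all messages with the same key land in the same partition and keep their relative order. A rough stdlib sketch of the idea (CRC32 stands in here for Kafka's actual murmur2 hash; `partition_for` is an invented helper):

```python
import zlib

NUM_PARTITIONS = 3


def partition_for(key: bytes, num_partitions: int = NUM_PARTITIONS) -> int:
    # Kafka hashes the key (murmur2) modulo the partition count;
    # CRC32 is only a stand-in for illustration.
    return zlib.crc32(key) % num_partitions


# Messages with the same key always map to the same partition,
# which is what preserves per-key ordering.
assert partition_for(b"user-42") == partition_for(b"user-42")

partitions = {partition_for(k) for k in (b"a", b"b", b"c", b"d", b"e")}
print(partitions)  # some subset of {0, 1, 2}
```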
## Library support

`FastStream` provides two different `KafkaBroker`s based on the following libraries:

- [Confluent Kafka](https://github.com/confluentinc/confluent-kafka-python){.external-link target="_blank"}
- [aiokafka](https://github.com/aio-libs/aiokafka){.external-link target="_blank"}

### Confluent's Python Client for Apache Kafka

The Confluent Kafka Python library is developed by Confluent, the company founded by the creators of Apache Kafka. It offers a high-level Kafka producer and consumer API that integrates well with the Kafka ecosystem. The Confluent library provides a comprehensive set of features, including support for Avro serialization, schema registry integration, and various configurations to fine-tune performance.

Developed by Confluent, this library enjoys strong support from the core team behind Kafka. This often translates to better compatibility with the latest Kafka releases and a more robust feature set.

Check out [Confluent's KafkaBroker](../confluent/index.md).
### AIOKafka library

The `aiokafka` library is an asynchronous Kafka client for Python, built on top of the `asyncio` framework. It is designed to work seamlessly with asynchronous code, making it suitable for applications with high concurrency requirements.

Check out [aiokafka's KafkaBroker](../kafka/index.md).
```python
from faststream import FastStream, ContextRepo
from faststream.confluent import KafkaBroker
from pydantic_settings import BaseSettings

broker = KafkaBroker()

app = FastStream(broker)


class Settings(BaseSettings):
    host: str = "localhost:9092"


@app.on_startup
async def setup(env: str, context: ContextRepo):
    settings = Settings(_env_file=env)
    await broker.connect(settings.host)
    context.set_global("settings", settings)
```
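Here `Settings(_env_file=env)` loads `KEY=value` pairs from the env file whose path is passed in at startup. As a rough illustration of what that parsing involves (a stdlib sketch only; `load_env_file` is an invented helper, and pydantic-settings does considerably more, including validation and environment-variable precedence):

```python
from pathlib import Path


def load_env_file(path: str) -> dict[str, str]:
    """Rough sketch of .env parsing (pydantic-settings does much more)."""
    values: dict[str, str] = {}
    for line in Path(path).read_text().splitlines():
        line = line.strip()
        # Skip blanks, comments, and malformed lines.
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        values[key.strip()] = value.strip()
    return values


# A file containing `HOST=localhost:9092` would yield
# {"HOST": "localhost:9092"}, which Settings maps onto its fields.
```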
**docs/docs_src/getting_started/context/confluent/annotated.py** (18 additions)
```python
from typing import Annotated

from faststream import Context, FastStream
from faststream.confluent import KafkaBroker
from faststream.confluent.message import KafkaMessage

Message = Annotated[KafkaMessage, Context()]

broker = KafkaBroker("localhost:9092")
app = FastStream(broker)


@broker.subscriber("test")
async def base_handler(
    body: str,
    message: Message,  # get access to raw message
):
    ...
```
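The `Message = Annotated[KafkaMessage, Context()]` alias works because `Annotated` lets a framework attach injection metadata to a type hint and recover it at call time. A minimal sketch of the mechanism (illustrative only; the `Context` marker and `inject` helper below are invented and are not FastStream internals):

```python
from typing import Annotated, get_args, get_origin, get_type_hints


class Context:
    """Marker standing in for a dependency-injection hint (illustrative)."""


def inject(func, container: dict) -> dict:
    """Resolve parameters whose hints carry a Context marker (sketch)."""
    hints = get_type_hints(func, include_extras=True)
    kwargs = {}
    for name, hint in hints.items():
        if name == "return":
            continue
        # Annotated[T, meta, ...] exposes its metadata via get_args.
        if get_origin(hint) is Annotated and any(
            isinstance(meta, Context) for meta in get_args(hint)[1:]
        ):
            kwargs[name] = container[name]
    return kwargs


Message = Annotated[str, Context()]


def handler(body: str, message: Message): ...


resolved = inject(handler, {"message": "raw-kafka-message"})
print(resolved)  # {'message': 'raw-kafka-message'}
```

Only `message` is resolved from the container; `body` carries no marker, so in a real framework it would come from the decoded payload instead.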
```python
from faststream import Context, FastStream
from faststream.confluent import KafkaBroker

broker = KafkaBroker("localhost:9092")
app = FastStream(broker)


@broker.subscriber("test")
async def base_handler(
    body: str,
    message=Context(),  # get access to raw message
):
    ...
```
```python
from faststream import Context, FastStream, context
from faststream.confluent import KafkaBroker

broker = KafkaBroker("localhost:9092")
app = FastStream(broker)
context.set_global("secret", "1")


@broker.subscriber("test-topic")
async def handle(
    secret: int = Context(),  # annotation alone is not enforced: the value stays a string
):
    assert secret == "1"


@broker.subscriber("test-topic2")
async def handle_int(
    secret: int = Context(cast=True),  # cast=True converts the value to the annotation
):
    assert secret == 1
```
**docs/docs_src/getting_started/context/confluent/custom_global_context.py** (18 additions)
```python
from faststream import FastStream, ContextRepo, Context
from faststream.confluent import KafkaBroker

broker = KafkaBroker("localhost:9092")
app = FastStream(broker)


@broker.subscriber("test-topic")
async def handle(
    msg: str,
    secret_str: str = Context(),
):
    assert secret_str == "my-perfect-secret"  # pragma: allowlist secret


@app.on_startup
async def set_global(context: ContextRepo):
    context.set_global("secret_str", "my-perfect-secret")
```
**docs/docs_src/getting_started/context/confluent/custom_local_context.py** (24 additions)
```python
from faststream import Context, FastStream, apply_types
from faststream.confluent import KafkaBroker
from faststream.confluent.annotations import ContextRepo, KafkaMessage

broker = KafkaBroker("localhost:9092")
app = FastStream(broker)


@broker.subscriber("test-topic")
async def handle(
    msg: str,
    message: KafkaMessage,
    context: ContextRepo,
):
    with context.scope("correlation_id", message.correlation_id):
        call()


@apply_types
def call(
    message: KafkaMessage,
    correlation_id=Context(),
):
    assert correlation_id == message.correlation_id
```
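`context.scope` sets a context value for the duration of the `with` block and removes it on exit, so nested calls can read it without it being passed explicitly. The same pattern can be sketched with the stdlib `contextvars` module (an illustration of the idea, not FastStream's implementation; `scope` and `call` here are invented names):

```python
from contextlib import contextmanager
from contextvars import ContextVar

correlation_id: ContextVar[str] = ContextVar("correlation_id")


@contextmanager
def scope(var: ContextVar, value):
    """Set `var` to `value` for the duration of the with-block."""
    token = var.set(value)
    try:
        yield
    finally:
        var.reset(token)  # the value disappears when the block exits


def call() -> str:
    # Nested calls see the scoped value without receiving it as an argument.
    return correlation_id.get()


with scope(correlation_id, "abc-123"):
    seen = call()

print(seen)  # abc-123
```

Outside the block, `correlation_id.get()` raises `LookupError` again, mirroring how the scoped context value is cleaned up.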