Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add config param to pass additional parameters to confluent-kafka-python #1505

Merged
merged 10 commits into from
Jun 11, 2024
1 change: 1 addition & 0 deletions .codespell-whitelist.txt
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
dependant
unsecure
4 changes: 2 additions & 2 deletions .secrets.baseline
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@
"filename": "docs/docs/en/release.md",
"hashed_secret": "35675e68f4b5af7b995d9205ad0fc43842f16450",
"is_verified": false,
"line_number": 1308,
"line_number": 1325,
"is_secret": false
}
],
Expand Down Expand Up @@ -163,5 +163,5 @@
}
]
},
"generated_at": "2024-06-06T04:30:54Z"
"generated_at": "2024-06-10T09:56:52Z"
}
14 changes: 14 additions & 0 deletions docs/docs/SUMMARY.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ search:
- [Acknowledgement](confluent/ack.md)
- [Message Information](confluent/message.md)
- [Security Configuration](confluent/security.md)
- [Additional Configuration](confluent/additional-configuration.md)
- [RabbitMQ](rabbit/index.md)
- [Subscription](rabbit/examples/index.md)
- [Direct](rabbit/examples/direct.md)
Expand Down Expand Up @@ -443,6 +444,19 @@ search:
- [TopicPartition](api/faststream/confluent/client/TopicPartition.md)
- [check_msg_error](api/faststream/confluent/client/check_msg_error.md)
- [create_topics](api/faststream/confluent/client/create_topics.md)
- config
- [BrokerAddressFamily](api/faststream/confluent/config/BrokerAddressFamily.md)
- [BuiltinFeatures](api/faststream/confluent/config/BuiltinFeatures.md)
- [ClientDNSLookup](api/faststream/confluent/config/ClientDNSLookup.md)
- [CompressionCodec](api/faststream/confluent/config/CompressionCodec.md)
- [CompressionType](api/faststream/confluent/config/CompressionType.md)
- [ConfluentConfig](api/faststream/confluent/config/ConfluentConfig.md)
- [Debug](api/faststream/confluent/config/Debug.md)
- [GroupProtocol](api/faststream/confluent/config/GroupProtocol.md)
- [IsolationLevel](api/faststream/confluent/config/IsolationLevel.md)
- [OffsetStoreMethod](api/faststream/confluent/config/OffsetStoreMethod.md)
- [SASLOAUTHBearerMethod](api/faststream/confluent/config/SASLOAUTHBearerMethod.md)
- [SecurityProtocol](api/faststream/confluent/config/SecurityProtocol.md)
- fastapi
- [Context](api/faststream/confluent/fastapi/Context.md)
- [KafkaRouter](api/faststream/confluent/fastapi/KafkaRouter.md)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
# 0.5 - API
# 2 - Release
# 3 - Contributing
# 5 - Template Page
# 10 - Default
search:
boost: 0.5
---

::: faststream.confluent.config.BrokerAddressFamily
11 changes: 11 additions & 0 deletions docs/docs/en/api/faststream/confluent/config/BuiltinFeatures.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
# 0.5 - API
# 2 - Release
# 3 - Contributing
# 5 - Template Page
# 10 - Default
search:
boost: 0.5
---

::: faststream.confluent.config.BuiltinFeatures
11 changes: 11 additions & 0 deletions docs/docs/en/api/faststream/confluent/config/ClientDNSLookup.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
# 0.5 - API
# 2 - Release
# 3 - Contributing
# 5 - Template Page
# 10 - Default
search:
boost: 0.5
---

::: faststream.confluent.config.ClientDNSLookup
11 changes: 11 additions & 0 deletions docs/docs/en/api/faststream/confluent/config/CompressionCodec.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
# 0.5 - API
# 2 - Release
# 3 - Contributing
# 5 - Template Page
# 10 - Default
search:
boost: 0.5
---

::: faststream.confluent.config.CompressionCodec
11 changes: 11 additions & 0 deletions docs/docs/en/api/faststream/confluent/config/CompressionType.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
# 0.5 - API
# 2 - Release
# 3 - Contributing
# 5 - Template Page
# 10 - Default
search:
boost: 0.5
---

::: faststream.confluent.config.CompressionType
11 changes: 11 additions & 0 deletions docs/docs/en/api/faststream/confluent/config/ConfluentConfig.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
# 0.5 - API
# 2 - Release
# 3 - Contributing
# 5 - Template Page
# 10 - Default
search:
boost: 0.5
---

::: faststream.confluent.config.ConfluentConfig
11 changes: 11 additions & 0 deletions docs/docs/en/api/faststream/confluent/config/Debug.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
# 0.5 - API
# 2 - Release
# 3 - Contributing
# 5 - Template Page
# 10 - Default
search:
boost: 0.5
---

::: faststream.confluent.config.Debug
11 changes: 11 additions & 0 deletions docs/docs/en/api/faststream/confluent/config/GroupProtocol.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
# 0.5 - API
# 2 - Release
# 3 - Contributing
# 5 - Template Page
# 10 - Default
search:
boost: 0.5
---

::: faststream.confluent.config.GroupProtocol
11 changes: 11 additions & 0 deletions docs/docs/en/api/faststream/confluent/config/IsolationLevel.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
# 0.5 - API
# 2 - Release
# 3 - Contributing
# 5 - Template Page
# 10 - Default
search:
boost: 0.5
---

::: faststream.confluent.config.IsolationLevel
11 changes: 11 additions & 0 deletions docs/docs/en/api/faststream/confluent/config/OffsetStoreMethod.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
# 0.5 - API
# 2 - Release
# 3 - Contributing
# 5 - Template Page
# 10 - Default
search:
boost: 0.5
---

::: faststream.confluent.config.OffsetStoreMethod
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
# 0.5 - API
# 2 - Release
# 3 - Contributing
# 5 - Template Page
# 10 - Default
search:
boost: 0.5
---

::: faststream.confluent.config.SASLOAUTHBearerMethod
11 changes: 11 additions & 0 deletions docs/docs/en/api/faststream/confluent/config/SecurityProtocol.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
# 0.5 - API
# 2 - Release
# 3 - Contributing
# 5 - Template Page
# 10 - Default
search:
boost: 0.5
---

::: faststream.confluent.config.SecurityProtocol
27 changes: 27 additions & 0 deletions docs/docs/en/confluent/additional-configuration.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
---
# 0.5 - API
# 2 - Release
# 3 - Contributing
# 5 - Template Page
# 10 - Default
search:
boost: 10
---

# Passing Additional Configuration to confluent-kafka-python

The `confluent-kafka-python` package is a Python wrapper around [librdkakfa](https://github.com/confluentinc/librdkafka), which is a C/C++ client library for Apache Kafka.

`confluent-kafka-python` accepts a `config` dictionary that is then passed on to `librdkafka`. `librdkafka` provides plenty of [configuration properties](https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md) to configure the Kafka client.

**FastStream** also provides users with the ability to pass the config dictionary to `librdkafka` to provide greater customizability.

## Example

In the following example, we are setting the parameter `topic.metadata.refresh.fast.interval.ms`'s value to `300` instead of the default value `100` via the `config` parameter.

```python linenums="1" hl_lines="15 16"
{! docs_src/confluent/additional_config/app.py !}
```

Similarly, you could use the `config` parameter to pass any [configuration properties](https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md) to `librdkafka`.
28 changes: 24 additions & 4 deletions docs/docs/en/confluent/security.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ This chapter discusses the security options available in **FastStream** and how

**Usage:**

```python linenums="1" hl_lines="4 7 9"
```python linenums="1" hl_lines="2 4 6"
{! docs_src/confluent/security/basic.py !}
```

Expand All @@ -33,7 +33,7 @@ This chapter discusses the security options available in **FastStream** and how
**Usage:**

```python linenums="1"
{! docs_src/confluent/security/plaintext.py [ln:1-10.25,11-] !}
{! docs_src/confluent/security/plaintext.py !}
```

**Using any SASL authentication without SSL:**
Expand All @@ -58,10 +58,30 @@ If the user does not want to use SSL encryption without the warning getting logg

=== "SCRAM256"
```python linenums="1"
{!> docs_src/confluent/security/sasl_scram256.py [ln:1-10.25,11-] !}
{!> docs_src/confluent/security/sasl_scram256.py [ln:1-6.25,7-] !}
```

=== "SCRAM512"
```python linenums="1"
{!> docs_src/confluent/security/sasl_scram512.py [ln:1-10.25,11-] !}
{!> docs_src/confluent/security/sasl_scram512.py [ln:1-6.25,7-] !}
```

### 4. Other security related usecases

**Purpose**: If you want to pass additional values to `confluent-kafka-python`, you can pass a dictionary called `config` to `KafkaBroker`. For example, to pass your own certificate file:

**Usage:**

```python
from faststream.confluent import KafkaBroker
from faststream.security import SASLPlaintext

security = SASLPlaintext(
username="admin",
password="password", # pragma: allowlist secret
)

config = {"ssl.ca.location": "~/my_certs/CRT_cacerts.pem"}

broker = KafkaBroker("localhost:9092", security=security, config=config)
```
1 change: 1 addition & 0 deletions docs/docs/navigation_template.txt
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ search:
- [Acknowledgement](confluent/ack.md)
- [Message Information](confluent/message.md)
- [Security Configuration](confluent/security.md)
- [Additional Configuration](confluent/additional-configuration.md)
- [RabbitMQ](rabbit/index.md)
- [Subscription](rabbit/examples/index.md)
- [Direct](rabbit/examples/direct.md)
Expand Down
Empty file.
22 changes: 22 additions & 0 deletions docs/docs_src/confluent/additional_config/app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
from pydantic import BaseModel, Field

from faststream import FastStream, Logger
from faststream.confluent import KafkaBroker


class HelloWorld(BaseModel):
msg: str = Field(
...,
examples=["Hello"],
description="Demo hello world message",
)


config = {"topic.metadata.refresh.fast.interval.ms": 300}
broker = KafkaBroker("localhost:9092", config=config)
app = FastStream(broker)


@broker.subscriber("hello_world")
async def on_hello_world(msg: HelloWorld, logger: Logger):
logger.info(msg)
5 changes: 1 addition & 4 deletions docs/docs_src/confluent/security/basic.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
import ssl

from faststream.confluent import KafkaBroker
from faststream.security import BaseSecurity

ssl_context = ssl.create_default_context()
security = BaseSecurity(ssl_context=ssl_context)
security = BaseSecurity(use_ssl=True)

broker = KafkaBroker("localhost:9092", security=security)
5 changes: 1 addition & 4 deletions docs/docs_src/confluent/security/plaintext.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,10 @@
import ssl

from faststream.confluent import KafkaBroker
from faststream.security import SASLPlaintext

ssl_context = ssl.create_default_context()
security = SASLPlaintext(
ssl_context=ssl_context,
username="admin",
password="password", # pragma: allowlist secret
use_ssl=True,
)

broker = KafkaBroker("localhost:9092", security=security)
5 changes: 1 addition & 4 deletions docs/docs_src/confluent/security/sasl_scram256.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,10 @@
import ssl

from faststream.confluent import KafkaBroker
from faststream.security import SASLScram256

ssl_context = ssl.create_default_context()
security = SASLScram256(
ssl_context=ssl_context,
username="admin",
password="password", # pragma: allowlist secret
use_ssl=True,
)

broker = KafkaBroker("localhost:9092", security=security)
5 changes: 1 addition & 4 deletions docs/docs_src/confluent/security/sasl_scram512.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,10 @@
import ssl

from faststream.confluent import KafkaBroker
from faststream.security import SASLScram512

ssl_context = ssl.create_default_context()
security = SASLScram512(
ssl_context=ssl_context,
username="admin",
password="password", # pragma: allowlist secret
use_ssl=True,
)

broker = KafkaBroker("localhost:9092", security=security)
Loading
Loading