Skip to content

Commit

Permalink
Add kerberos support for confluent broker (#1670)
Browse files Browse the repository at this point in the history
* Add kerberos support for confluent broker

* Update tests
  • Loading branch information
kumaranvpl authored Aug 12, 2024
1 parent ffaf6a0 commit e2af24d
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 1 deletion.
11 changes: 10 additions & 1 deletion docs/docs/en/confluent/security.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,17 @@ This chapter discusses the security options available in **FastStream** and how
{!> docs_src/confluent/security/sasl_oauthbearer.py [ln:1-8] !}
```

### 5. SASLGSSAPI Object with SSL/TLS

### 5. Other security related usecases
**Purpose:** The `SASLGSSAPI` object is used for authentication using Kerberos.

**Usage:**

```python linenums="1"
{!> docs_src/confluent/security/sasl_gssapi.py [ln:1-10.25,11-] !}
```

### 6. Other security related usecases

**Purpose**: If you want to pass additional values to `confluent-kafka-python`, you can pass a dictionary called `config` to `KafkaBroker`. For example, to pass your own certificate file:

Expand Down
8 changes: 8 additions & 0 deletions docs/docs_src/confluent/security/sasl_gssapi.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
import ssl

from faststream.confluent import KafkaBroker
from faststream.security import SASLGSSAPI

security = SASLGSSAPI(use_ssl=True,)

broker = KafkaBroker("localhost:9092", security=security)
10 changes: 10 additions & 0 deletions faststream/confluent/security.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

from faststream.exceptions import SetupError
from faststream.security import (
SASLGSSAPI,
BaseSecurity,
SASLOAuthBearer,
SASLPlaintext,
Expand Down Expand Up @@ -30,6 +31,8 @@ def parse_security(security: Optional[BaseSecurity]) -> "AnyDict":
return _parse_sasl_scram512(security)
elif isinstance(security, SASLOAuthBearer):
return _parse_sasl_oauthbearer(security)
elif isinstance(security, SASLGSSAPI):
return _parse_sasl_gssapi(security)
elif isinstance(security, BaseSecurity):
return _parse_base_security(security)
else:
Expand Down Expand Up @@ -74,3 +77,10 @@ def _parse_sasl_oauthbearer(security: SASLOAuthBearer) -> "AnyDict":
"security_protocol": "SASL_SSL" if security.use_ssl else "SASL_PLAINTEXT",
"sasl_mechanism": "OAUTHBEARER",
}


def _parse_sasl_gssapi(security: SASLGSSAPI) -> "AnyDict":
return {
"security_protocol": "SASL_SSL" if security.use_ssl else "SASL_PLAINTEXT",
"sasl_mechanism": "GSSAPI",
}
24 changes: 24 additions & 0 deletions tests/a_docs/confluent/test_security.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,3 +113,27 @@ async def test_oathbearer():
producer_call_kwargs["security_protocol"]
== call_kwargs["security_protocol"]
)


@pytest.mark.asyncio()
@pytest.mark.confluent()
async def test_gssapi():
from docs.docs_src.confluent.security.sasl_gssapi import (
broker as gssapi_broker,
)

with patch_aio_consumer_and_producer() as producer:
async with gssapi_broker:
producer_call_kwargs = producer.call_args.kwargs

call_kwargs = {
"sasl_mechanism": "GSSAPI",
"security_protocol": "SASL_SSL",
}

assert call_kwargs.items() <= producer_call_kwargs.items()

assert (
producer_call_kwargs["security_protocol"]
== call_kwargs["security_protocol"]
)
24 changes: 24 additions & 0 deletions tests/asyncapi/confluent/test_security.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from faststream.asyncapi.generate import get_app_schema
from faststream.confluent import KafkaBroker
from faststream.security import (
SASLGSSAPI,
BaseSecurity,
SASLOAuthBearer,
SASLPlaintext,
Expand Down Expand Up @@ -195,3 +196,26 @@ async def test_topic(msg: str) -> str:
}

assert schema == sasl_oauthbearer_security_schema


def test_gssapi_security_schema():
ssl_context = ssl.create_default_context()
security = SASLGSSAPI(ssl_context=ssl_context)

broker = KafkaBroker("localhost:9092", security=security)
app = FastStream(broker)

@broker.publisher("test_2")
@broker.subscriber("test_1")
async def test_topic(msg: str) -> str:
pass

schema = get_app_schema(app).to_jsonable()

gssapi_security_schema = deepcopy(basic_schema)
gssapi_security_schema["servers"]["development"]["security"] = [{"gssapi": []}]
gssapi_security_schema["components"]["securitySchemes"] = {
"gssapi": {"type": "gssapi"}
}

assert schema == gssapi_security_schema

0 comments on commit e2af24d

Please sign in to comment.