diff --git a/docs/docs/en/confluent/security.md b/docs/docs/en/confluent/security.md index 7a1d91e411..38c7404e58 100644 --- a/docs/docs/en/confluent/security.md +++ b/docs/docs/en/confluent/security.md @@ -63,8 +63,17 @@ This chapter discusses the security options available in **FastStream** and how {!> docs_src/confluent/security/sasl_oauthbearer.py [ln:1-8] !} ``` +### 5. SASLGSSAPI Object with SSL/TLS -### 5. Other security related usecases +**Purpose:** The `SASLGSSAPI` object is used for authentication using Kerberos. + +**Usage:** + +```python linenums="1" +{!> docs_src/confluent/security/sasl_gssapi.py [ln:1-10.25,11-] !} +``` + +### 6. Other security related usecases **Purpose**: If you want to pass additional values to `confluent-kafka-python`, you can pass a dictionary called `config` to `KafkaBroker`. For example, to pass your own certificate file: diff --git a/docs/docs_src/confluent/security/sasl_gssapi.py b/docs/docs_src/confluent/security/sasl_gssapi.py new file mode 100644 index 0000000000..230cda2cea --- /dev/null +++ b/docs/docs_src/confluent/security/sasl_gssapi.py @@ -0,0 +1,8 @@ +import ssl + +from faststream.confluent import KafkaBroker +from faststream.security import SASLGSSAPI + +security = SASLGSSAPI(use_ssl=True,) + +broker = KafkaBroker("localhost:9092", security=security) diff --git a/faststream/confluent/security.py b/faststream/confluent/security.py index 14ce8e08da..4e3e0c0e48 100644 --- a/faststream/confluent/security.py +++ b/faststream/confluent/security.py @@ -3,6 +3,7 @@ from faststream.exceptions import SetupError from faststream.security import ( + SASLGSSAPI, BaseSecurity, SASLOAuthBearer, SASLPlaintext, @@ -30,6 +31,8 @@ def parse_security(security: Optional[BaseSecurity]) -> "AnyDict": return _parse_sasl_scram512(security) elif isinstance(security, SASLOAuthBearer): return _parse_sasl_oauthbearer(security) + elif isinstance(security, SASLGSSAPI): + return _parse_sasl_gssapi(security) elif isinstance(security, BaseSecurity): return _parse_base_security(security) else: @@ -74,3 +77,10 @@ def _parse_sasl_oauthbearer(security: SASLOAuthBearer) -> "AnyDict": "security_protocol": "SASL_SSL" if security.use_ssl else "SASL_PLAINTEXT", "sasl_mechanism": "OAUTHBEARER", } + + +def _parse_sasl_gssapi(security: SASLGSSAPI) -> "AnyDict": + return { + "security_protocol": "SASL_SSL" if security.use_ssl else "SASL_PLAINTEXT", + "sasl_mechanism": "GSSAPI", + } diff --git a/tests/a_docs/confluent/test_security.py b/tests/a_docs/confluent/test_security.py index 5dbab775e0..d82cffd566 100644 --- a/tests/a_docs/confluent/test_security.py +++ b/tests/a_docs/confluent/test_security.py @@ -113,3 +113,27 @@ async def test_oathbearer(): producer_call_kwargs["security_protocol"] == call_kwargs["security_protocol"] ) + + +@pytest.mark.asyncio() +@pytest.mark.confluent() +async def test_gssapi(): + from docs.docs_src.confluent.security.sasl_gssapi import ( + broker as gssapi_broker, + ) + + with patch_aio_consumer_and_producer() as producer: + async with gssapi_broker: + producer_call_kwargs = producer.call_args.kwargs + + call_kwargs = { + "sasl_mechanism": "GSSAPI", + "security_protocol": "SASL_SSL", + } + + assert call_kwargs.items() <= producer_call_kwargs.items() + + assert ( + producer_call_kwargs["security_protocol"] + == call_kwargs["security_protocol"] + ) diff --git a/tests/asyncapi/confluent/test_security.py b/tests/asyncapi/confluent/test_security.py index f865ba9163..0621cbf756 100644 --- a/tests/asyncapi/confluent/test_security.py +++ b/tests/asyncapi/confluent/test_security.py @@ -5,6 +5,7 @@ from faststream.asyncapi.generate import get_app_schema from faststream.confluent import KafkaBroker from faststream.security import ( + SASLGSSAPI, BaseSecurity, SASLOAuthBearer, SASLPlaintext, @@ -195,3 +196,26 @@ async def test_topic(msg: str) -> str: } assert schema == sasl_oauthbearer_security_schema + + +def test_gssapi_security_schema(): + ssl_context = ssl.create_default_context() + security = SASLGSSAPI(ssl_context=ssl_context) + + broker = KafkaBroker("localhost:9092", security=security) + app = FastStream(broker) + + @broker.publisher("test_2") + @broker.subscriber("test_1") + async def test_topic(msg: str) -> str: + pass + + schema = get_app_schema(app).to_jsonable() + + gssapi_security_schema = deepcopy(basic_schema) + gssapi_security_schema["servers"]["development"]["security"] = [{"gssapi": []}] + gssapi_security_schema["components"]["securitySchemes"] = { + "gssapi": {"type": "gssapi"} + } + + assert schema == gssapi_security_schema