-
-
Notifications
You must be signed in to change notification settings - Fork 5
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: Minimal infrastructure for http, httpApiKey, and oauth2 security schemes #120
Changes from 3 commits
5e44d44
d02b1a7
7fef7a7
70cd039
7f31070
434ad27
9f7bc36
0d8f327
eca58da
4ce814f
34974e6
acc2b78
256cfff
a8e4e58
24ac1ac
7b29e59
7e460ff
811f628
becef94
5f67ace
6a11f0f
6ade4e2
4b3b132
fece4c6
c72c818
512cef3
edf7647
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
from typing import Any, Mapping | ||
|
||
JSONMappingValue = Any | ||
JSONMapping = Mapping[str, JSONMappingValue] | ||
JSONSchema = JSONMapping | ||
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,7 @@ | ||
def default_on_connect_handler(*args, **kwargs): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No need to use a separate module for this. |
||
"""Injected into api when security is specified by no connect handler is provided""" | ||
|
||
pass | ||
|
||
|
||
DEFAULT_ON_CONNECT_HANDLER = "asynction.default_handlers.default_on_connect_handler" |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,9 @@ | ||
from importlib import import_module | ||
from typing import Callable | ||
|
||
|
||
def load_handler(handler_id: str) -> Callable: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Would prefer this to remain under |
||
*module_path_elements, object_name = handler_id.split(".") | ||
module = import_module(".".join(module_path_elements)) | ||
|
||
return getattr(module, object_name) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,56 @@ | ||
from typing import Sequence | ||
|
||
from asynction.common_types import JSONMapping | ||
|
||
from .exceptions import UnregisteredSecurityScheme | ||
from .exceptions import UnsupportedSecurityScheme | ||
from .types import SecurityRequirement | ||
from .types import SecurityScheme | ||
from .types import SecuritySchemesType | ||
from .validation import security_handler_factory | ||
|
||
|
||
def _resolve_security_scheme( | ||
security: Sequence[JSONMapping], schemes: JSONMapping | ||
) -> Sequence[JSONMapping]: | ||
new_security = [] | ||
for item in security: | ||
for scheme_name, scopes in item.items(): | ||
if scheme_name not in schemes: | ||
raise UnregisteredSecurityScheme | ||
scheme = schemes[scheme_name] | ||
new_security.append(dict(name=scheme_name, scopes=scopes, scheme=scheme)) | ||
|
||
return new_security | ||
|
||
|
||
def _resolve_server_security_schemes( | ||
raw_spec: JSONMapping, schemes: JSONMapping | ||
) -> JSONMapping: | ||
for name, server in raw_spec.get("servers", {}).items(): | ||
if "security" in server: | ||
server["security"] = ( | ||
_resolve_security_scheme(server["security"], schemes) or None | ||
) | ||
|
||
return raw_spec | ||
|
||
|
||
def resolve_security_schemes(raw_spec: JSONMapping) -> JSONMapping: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we can get away with not having this resolver function. Since the AsyncAPI specification does not use JSONSchema references for these objects, I think it would be more intuitive to not move data structures from one place of the spec to another. Let's deserialise the complete map of security scheme objects under the components filed of the |
||
schemes = raw_spec.get("components", {}).get("securitySchemes", {}) | ||
if not schemes: | ||
return raw_spec | ||
raw_spec = _resolve_server_security_schemes(raw_spec, schemes) | ||
|
||
return raw_spec | ||
|
||
|
||
__all__ = [ | ||
"SecurityRequirement", | ||
"SecurityScheme", | ||
"SecuritySchemesType", | ||
"security_handler_factory", | ||
"resolve_security_schemes", | ||
"UnregisteredSecurityScheme", | ||
"UnsupportedSecurityScheme", | ||
] |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,23 @@ | ||
from asynction.exceptions import AsynctionException | ||
|
||
|
||
class SecurityException(AsynctionException): | ||
""" | ||
Base Security Exception type. | ||
""" | ||
pass | ||
|
||
|
||
class UnregisteredSecurityScheme(SecurityException): | ||
""" | ||
Raised when a security scheme not listed in the securitySchemes section of the | ||
spec is used in a ``security`` or ``x-security`` specification | ||
""" | ||
pass | ||
|
||
|
||
class UnsupportedSecurityScheme(SecurityException): | ||
""" | ||
Raised when a specified security scheme is not supported by asynction | ||
""" | ||
pass |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,190 @@ | ||
from dataclasses import dataclass | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since the top level I wouldn't want to spread types across multiple places. For me the top level types module should be treated as a stand-alone entity that can serialise/deserialise AsyncAPI data. This top lvl For now let's concentrate all the AsyncAPI relevant types under the top level types module. If it starts getting too big we can make this module a |
||
from enum import Enum | ||
from typing import Mapping, Optional, Sequence, Type | ||
|
||
from svarog import register_forge | ||
from svarog.types import Forge | ||
|
||
from asynction.common_types import JSONMapping | ||
from .exceptions import UnsupportedSecurityScheme | ||
|
||
|
||
class HTTPSecuritySchemeType(Enum): | ||
BASIC = "basic" | ||
DIGEST = "digest" | ||
BEARER = "bearer" | ||
alex-zywicki marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
|
||
class OAuth2FlowType(Enum): | ||
""" | ||
https://www.asyncapi.com/docs/specifications/v2.2.0#oauthFlowsObject | ||
""" | ||
IMPLICIT = "implicit" | ||
PASSWORD = "password" | ||
CLIENT_CREDENTIALS = "clientCredentials " | ||
alex-zywicki marked this conversation as resolved.
Show resolved
Hide resolved
|
||
AUTHORIZATION_CODE = "authorizationCode" | ||
|
||
|
||
@dataclass | ||
class OAuth2Flow: | ||
""" | ||
https://www.asyncapi.com/docs/specifications/v2.2.0#oauthFlowObject | ||
""" | ||
scopes: Sequence[str] | ||
alex-zywicki marked this conversation as resolved.
Show resolved
Hide resolved
|
||
authorization_url: Optional[str] = None | ||
token_url: Optional[str] = None | ||
refresh_url: Optional[str] = None | ||
|
||
@staticmethod | ||
def forge( | ||
type_: Type["OAuth2Flow"], | ||
data: JSONMapping, | ||
forge: Forge | ||
) -> "OAuth2Flow": | ||
return type_( | ||
scopes=forge( | ||
type_.__annotations__["scopes"], | ||
data.get("scopes") | ||
), | ||
authorization_url=forge( | ||
type_.__annotations__["authorization_url"], | ||
data.get("authorizationUrl") | ||
), | ||
token_url=forge( | ||
type_.__annotations__["token_url"], | ||
data.get("tokenUrl") | ||
), | ||
refresh_url=forge( | ||
type_.__annotations__["refresh_url"], | ||
data.get("refreshUrl") | ||
) | ||
) | ||
|
||
|
||
register_forge(OAuth2Flow, OAuth2Flow.forge) | ||
|
||
|
||
class SecuritySchemesType(Enum): | ||
""" | ||
https://www.asyncapi.com/docs/specifications/v2.2.0#securitySchemeObject | ||
alex-zywicki marked this conversation as resolved.
Show resolved
Hide resolved
|
||
""" | ||
USER_PASSWORD = "userPassword" | ||
API_KEY = "apiKey" | ||
X509 = "X509" | ||
SYMMETRIC_ENCRYPTION = "symmetricEncryption" | ||
ASYMMETRIC_ENCRYPTION = "asymmetricEncryption" | ||
HTTP_API_KEY = "httpApiKey" | ||
HTTP = "http" | ||
OAUTH2 = "oauth2" | ||
OPENID_CONNECT = "openIdConnect" | ||
PLAIN = "plain" | ||
SCRAM_SHA256 = "scramSha256" | ||
SCRAM_SHA512 = "scramSha512" | ||
GSSAPI = "gssapi" | ||
|
||
|
||
@dataclass | ||
class SecurityScheme: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This dataclass should also have a Validation required:
|
||
""" | ||
https://www.asyncapi.com/docs/specifications/v2.2.0#securitySchemeObject | ||
""" | ||
type: SecuritySchemesType | ||
alex-zywicki marked this conversation as resolved.
Show resolved
Hide resolved
|
||
description: Optional[str] = None | ||
name: Optional[str] = None # Required for httpApiKey | ||
in_: Optional[str] = None # Required for httpApiKey | apiKey | ||
scheme: Optional[HTTPSecuritySchemeType] = None # Required for http | ||
alex-zywicki marked this conversation as resolved.
Show resolved
Hide resolved
|
||
bearer_format: Optional[str] = None # Optional for http ("bearer") | ||
flows: Optional[Mapping[OAuth2FlowType, OAuth2Flow]] = None # Required for oauth2 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I do see the reason behind choosing a For example: @dataclass
class OAuthFlows:
"""https://www.asyncapi.com/docs/specifications/v2.2.0#oauthFlowsObject"""
internal: Optional[OAuthFlow] = None
password: Optional[OAuthFlow] = None
client_credentials: Optional[OAuthFlow] = None
authorization_code: Optional[OAuthFlow] = None
def __post_init__(self):
if self.internal is not None:
if self.internal.authorization_url is None:
raise ValueError("Internal OAuth flow is missing Authorization URL")
# and similar rules for the rest of the flow kinds, as described in the AsyncAPI docs |
||
open_id_connect_url: Optional[str] = None # Required for openIdConnect | ||
|
||
x_basic_info_func: Optional[str] = None # Required for http(basic) | ||
x_token_info_func: Optional[str] = None # Required for oauth2 | ||
x_api_key_info_func: Optional[str] = None # Required for apiKey | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe also add |
||
|
||
@staticmethod | ||
def forge( | ||
type_: Type["SecurityScheme"], | ||
data: JSONMapping, | ||
forge: Forge | ||
) -> "SecurityScheme": | ||
|
||
scheme_type_raw = data.get("type") | ||
if not scheme_type_raw: | ||
raise UnsupportedSecurityScheme | ||
try: | ||
SecuritySchemesType(scheme_type_raw) | ||
except ValueError: | ||
raise UnsupportedSecurityScheme(scheme_type_raw) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This can be removed. Svarog and dataclasses will take care of this validation. |
||
|
||
return type_( | ||
type=forge( | ||
type_.__annotations__["type"], | ||
data.get("type") | ||
), | ||
description=forge( | ||
type_.__annotations__["description"], | ||
data.get("description") | ||
), | ||
name=forge( | ||
type_.__annotations__["name"], | ||
data.get("name") | ||
), | ||
in_=forge( | ||
type_.__annotations__["in_"], | ||
data.get("in") | ||
), | ||
scheme=forge( | ||
type_.__annotations__["scheme"], | ||
data.get("scheme") | ||
), | ||
bearer_format=forge( | ||
type_.__annotations__["bearer_format"], | ||
data.get("bearerFormat") | ||
), | ||
flows=forge( | ||
type_.__annotations__["flows"], | ||
data.get("flows") | ||
), | ||
open_id_connect_url=forge( | ||
type_.__annotations__["open_id_connect_url"], | ||
data.get("openIdConnectUrl") | ||
), | ||
x_basic_info_func=forge( | ||
type_.__annotations__["x_basic_info_func"], | ||
data.get("x-basicInfoFunc") | ||
), | ||
x_token_info_func=forge( | ||
type_.__annotations__["x_token_info_func"], | ||
data.get("x-tokenInfoFunc") | ||
), | ||
x_api_key_info_func=forge( | ||
type_.__annotations__["x_api_key_info_func"], | ||
data.get("x-apiKeyInfoFunc") | ||
) | ||
) | ||
|
||
|
||
register_forge(SecurityScheme, SecurityScheme.forge) | ||
|
||
|
||
@dataclass | ||
class SecurityRequirement: | ||
alex-zywicki marked this conversation as resolved.
Show resolved
Hide resolved
|
||
# https://www.asyncapi.com/docs/specifications/v2.2.0#securityRequirementObject | ||
name: str | ||
scopes: Sequence[str] | ||
scheme: SecurityScheme | ||
|
||
@staticmethod | ||
def forge( | ||
type_: Type["SecurityRequirement"], | ||
data: JSONMapping, | ||
forge: Forge | ||
) -> "SecurityRequirement": | ||
return type_( | ||
name=forge(type_.__annotations__["name"], data.get("name")), | ||
scopes=forge(type_.__annotations__["scopes"], data.get("scopes")), | ||
scheme=forge(type_.__annotations__["scheme"], data.get("scheme")) | ||
) | ||
|
||
|
||
register_forge(SecurityRequirement, SecurityRequirement.forge) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. An explicit |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These could stay in the top level types module. See the comments below with regards to keeping everything under a single types module/package.