From cffda898f5a11f64edba74612f39375c7a7dbea2 Mon Sep 17 00:00:00 2001 From: p1c2u Date: Sun, 24 Sep 2023 09:56:40 +0000 Subject: [PATCH] Mimetype parameters handling --- .../media_types/deserializers.py | 4 ++- .../deserializing/media_types/factories.py | 6 +++- .../deserializing/media_types/util.py | 17 ++++++++--- .../templating/media_types/datatypes.py | 5 +++- .../templating/media_types/finders.py | 30 +++++++++++++++---- openapi_core/validation/validators.py | 11 +++++-- tests/integration/test_petstore.py | 4 ++- 7 files changed, 60 insertions(+), 17 deletions(-) diff --git a/openapi_core/deserializing/media_types/deserializers.py b/openapi_core/deserializing/media_types/deserializers.py index 43f99c81..2bdef976 100644 --- a/openapi_core/deserializing/media_types/deserializers.py +++ b/openapi_core/deserializing/media_types/deserializers.py @@ -16,9 +16,11 @@ def __init__( self, mimetype: str, deserializer_callable: Optional[DeserializerCallable] = None, + **parameters: str, ): self.mimetype = mimetype self.deserializer_callable = deserializer_callable + self.parameters = parameters def deserialize(self, value: Any) -> Any: if self.deserializer_callable is None: @@ -26,6 +28,6 @@ def deserialize(self, value: Any) -> Any: return value try: - return self.deserializer_callable(value) + return self.deserializer_callable(value, **self.parameters) except (ParseError, ValueError, TypeError, AttributeError): raise MediaTypeDeserializeError(self.mimetype, value) diff --git a/openapi_core/deserializing/media_types/factories.py b/openapi_core/deserializing/media_types/factories.py index f35257b2..00301d65 100644 --- a/openapi_core/deserializing/media_types/factories.py +++ b/openapi_core/deserializing/media_types/factories.py @@ -1,3 +1,4 @@ +from typing import Mapping from typing import Optional from openapi_core.deserializing.media_types.datatypes import ( @@ -23,6 +24,7 @@ def __init__( def create( self, mimetype: str, + parameters: Mapping[str, str], extra_media_type_deserializers: Optional[ MediaTypeDeserializersDict ] = None, @@ -34,7 +36,9 @@ def create( extra_media_type_deserializers=extra_media_type_deserializers, ) - return CallableMediaTypeDeserializer(mimetype, deserialize_callable) + return CallableMediaTypeDeserializer( + mimetype, deserialize_callable, **parameters + ) def get_deserializer_callable( self, diff --git a/openapi_core/deserializing/media_types/util.py b/openapi_core/deserializing/media_types/util.py index df03eba2..c73315d7 100644 --- a/openapi_core/deserializing/media_types/util.py +++ b/openapi_core/deserializing/media_types/util.py @@ -5,17 +5,26 @@ from urllib.parse import parse_qsl -def plain_loads(value: Union[str, bytes]) -> str: +def plain_loads(value: Union[str, bytes], **parameters: str) -> str: + charset = "utf-8" + if "charset" in parameters: + charset = parameters["charset"] if isinstance(value, bytes): - value = value.decode("ASCII", errors="surrogateescape") + try: + return value.decode(charset) + # fallback safe decode + except UnicodeDecodeError: + return value.decode("ASCII", errors="surrogateescape") return value -def urlencoded_form_loads(value: Any) -> Dict[str, Any]: +def urlencoded_form_loads(value: Any, **parameters: str) -> Dict[str, Any]: return dict(parse_qsl(value)) -def data_form_loads(value: Union[str, bytes]) -> Dict[str, Any]: +def data_form_loads( + value: Union[str, bytes], **parameters: str +) -> Dict[str, Any]: if isinstance(value, bytes): value = value.decode("ASCII", errors="surrogateescape") parser = Parser() diff --git a/openapi_core/templating/media_types/datatypes.py b/openapi_core/templating/media_types/datatypes.py index d76fe9d2..37c4c064 100644 --- a/openapi_core/templating/media_types/datatypes.py +++ b/openapi_core/templating/media_types/datatypes.py @@ -1,3 +1,6 @@ from collections import namedtuple +from dataclasses import dataclass +from typing import Mapping +from typing import Optional -MediaType = namedtuple("MediaType", ["value", "key"]) +MediaType = namedtuple("MediaType", ["mime_type", "parameters", "media_type"]) diff --git a/openapi_core/templating/media_types/finders.py b/openapi_core/templating/media_types/finders.py index 6477c9d7..7cf25704 100644 --- a/openapi_core/templating/media_types/finders.py +++ b/openapi_core/templating/media_types/finders.py @@ -1,5 +1,7 @@ """OpenAPI core templating media types finders module""" import fnmatch +from typing import Mapping +from typing import Tuple from openapi_core.spec import Spec from openapi_core.templating.media_types.datatypes import MediaType @@ -12,15 +14,31 @@ def __init__(self, content: Spec): def get_first(self) -> MediaType: mimetype, media_type = next(self.content.items()) - return MediaType(media_type, mimetype) + return MediaType(mimetype, {}, media_type) def find(self, mimetype: str) -> MediaType: - if mimetype in self.content: - return MediaType(self.content / mimetype, mimetype) + mime_type, parameters = self._parse_mimetype(mimetype) - if mimetype: + # simple mime type + for m in [mimetype, mime_type]: + if m in self.content: + return MediaType(mime_type, parameters, self.content / m) + + # range mime type + if mime_type: for key, value in self.content.items(): - if fnmatch.fnmatch(mimetype, key): - return MediaType(value, key) + if fnmatch.fnmatch(mime_type, key): + return MediaType(key, parameters, value) raise MediaTypeNotFound(mimetype, list(self.content.keys())) + + def _parse_mimetype(self, mimetype: str) -> Tuple[str, Mapping[str, str]]: + mimetype_parts = mimetype.split("; ") + mime_type = mimetype_parts[0] + parameters = {} + if len(mimetype_parts) > 1: + parameters_list = ( + param_str.split("=") for param_str in mimetype_parts[1:] + ) + parameters = dict(parameters_list) + return mime_type, parameters diff --git a/openapi_core/validation/validators.py b/openapi_core/validation/validators.py index 20166ae9..b9e7f397 100644 --- a/openapi_core/validation/validators.py +++ b/openapi_core/validation/validators.py @@ -86,10 +86,13 @@ def _find_media_type( return finder.get_first() return finder.find(mimetype) - def _deserialise_media_type(self, mimetype: str, value: Any) -> Any: + def _deserialise_media_type( + self, mimetype: str, parameters: Mapping[str, str], value: Any + ) -> Any: deserializer = self.media_type_deserializers_factory.create( mimetype, extra_media_type_deserializers=self.extra_media_type_deserializers, + parameters=parameters, ) return deserializer.deserialize(value) @@ -194,8 +197,10 @@ def _convert_content_schema_value_and_schema( content: Spec, mimetype: Optional[str] = None, ) -> Tuple[Any, Optional[Spec]]: - media_type, mime_type = self._find_media_type(content, mimetype) - deserialised = self._deserialise_media_type(mime_type, raw) + mime_type, parameters, media_type = self._find_media_type( + content, mimetype + ) + deserialised = self._deserialise_media_type(mime_type, parameters, raw) casted = self._cast(media_type, deserialised) if "schema" not in media_type: diff --git a/tests/integration/test_petstore.py b/tests/integration/test_petstore.py index 2d8794d5..16e8d79d 100644 --- a/tests/integration/test_petstore.py +++ b/tests/integration/test_petstore.py @@ -231,7 +231,9 @@ def test_get_pets_response_no_schema(self, spec): assert result.body is None data = "" - response = MockResponse(data, status_code=404, mimetype="text/html") + response = MockResponse( + data, status_code=404, mimetype="text/html; charset=utf-8" + ) response_result = unmarshal_response(request, response, spec=spec)