Skip to content

Commit

Permalink
Mimetype parameters handling
Browse files Browse the repository at this point in the history
  • Loading branch information
p1c2u committed Sep 24, 2023
1 parent 7a17349 commit a570e60
Show file tree
Hide file tree
Showing 8 changed files with 76 additions and 19 deletions.
4 changes: 3 additions & 1 deletion openapi_core/deserializing/media_types/deserializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,18 @@ def __init__(
self,
mimetype: str,
deserializer_callable: Optional[DeserializerCallable] = None,
**parameters: str,
):
self.mimetype = mimetype
self.deserializer_callable = deserializer_callable
self.parameters = parameters

def deserialize(self, value: Any) -> Any:
if self.deserializer_callable is None:
warnings.warn(f"Unsupported {self.mimetype} mimetype")
return value

try:
return self.deserializer_callable(value)
return self.deserializer_callable(value, **self.parameters)
except (ParseError, ValueError, TypeError, AttributeError):
raise MediaTypeDeserializeError(self.mimetype, value)
8 changes: 7 additions & 1 deletion openapi_core/deserializing/media_types/factories.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from typing import Mapping
from typing import Optional

from openapi_core.deserializing.media_types.datatypes import (
Expand All @@ -23,18 +24,23 @@ def __init__(
def create(
self,
mimetype: str,
parameters: Optional[Mapping[str, str]] = None,
extra_media_type_deserializers: Optional[
MediaTypeDeserializersDict
] = None,
) -> CallableMediaTypeDeserializer:
if parameters is None:
parameters = {}
if extra_media_type_deserializers is None:
extra_media_type_deserializers = {}
deserialize_callable = self.get_deserializer_callable(
mimetype,
extra_media_type_deserializers=extra_media_type_deserializers,
)

return CallableMediaTypeDeserializer(mimetype, deserialize_callable)
return CallableMediaTypeDeserializer(
mimetype, deserialize_callable, **parameters
)

def get_deserializer_callable(
self,
Expand Down
17 changes: 13 additions & 4 deletions openapi_core/deserializing/media_types/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,26 @@
from urllib.parse import parse_qsl


def plain_loads(value: Union[str, bytes]) -> str:
def plain_loads(value: Union[str, bytes], **parameters: str) -> str:
charset = "utf-8"
if "charset" in parameters:
charset = parameters["charset"]
if isinstance(value, bytes):
value = value.decode("ASCII", errors="surrogateescape")
try:
return value.decode(charset)
# fallback safe decode
except UnicodeDecodeError:
return value.decode("ASCII", errors="surrogateescape")
return value


def urlencoded_form_loads(value: Any) -> Dict[str, Any]:
def urlencoded_form_loads(value: Any, **parameters: str) -> Dict[str, Any]:
return dict(parse_qsl(value))


def data_form_loads(value: Union[str, bytes]) -> Dict[str, Any]:
def data_form_loads(
value: Union[str, bytes], **parameters: str
) -> Dict[str, Any]:
if isinstance(value, bytes):
value = value.decode("ASCII", errors="surrogateescape")
parser = Parser()
Expand Down
5 changes: 4 additions & 1 deletion openapi_core/templating/media_types/datatypes.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
from collections import namedtuple
from dataclasses import dataclass
from typing import Mapping
from typing import Optional

MediaType = namedtuple("MediaType", ["value", "key"])
MediaType = namedtuple("MediaType", ["mime_type", "parameters", "media_type"])
33 changes: 27 additions & 6 deletions openapi_core/templating/media_types/finders.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
"""OpenAPI core templating media types finders module"""
import fnmatch
from typing import Mapping
from typing import Tuple

from openapi_core.spec import Spec
from openapi_core.templating.media_types.datatypes import MediaType
Expand All @@ -12,15 +14,34 @@ def __init__(self, content: Spec):

def get_first(self) -> MediaType:
mimetype, media_type = next(self.content.items())
return MediaType(media_type, mimetype)
return MediaType(mimetype, {}, media_type)

def find(self, mimetype: str) -> MediaType:
if mimetype in self.content:
return MediaType(self.content / mimetype, mimetype)
if mimetype is None:
raise MediaTypeNotFound(mimetype, list(self.content.keys()))

if mimetype:
mime_type, parameters = self._parse_mimetype(mimetype)

# simple mime type
for m in [mimetype, mime_type]:
if m in self.content:
return MediaType(mime_type, parameters, self.content / m)

# range mime type
if mime_type:
for key, value in self.content.items():
if fnmatch.fnmatch(mimetype, key):
return MediaType(value, key)
if fnmatch.fnmatch(mime_type, key):
return MediaType(key, parameters, value)

raise MediaTypeNotFound(mimetype, list(self.content.keys()))

def _parse_mimetype(self, mimetype: str) -> Tuple[str, Mapping[str, str]]:
mimetype_parts = mimetype.split("; ")
mime_type = mimetype_parts[0]
parameters = {}
if len(mimetype_parts) > 1:
parameters_list = (
param_str.split("=") for param_str in mimetype_parts[1:]
)
parameters = dict(parameters_list)
return mime_type, parameters
11 changes: 8 additions & 3 deletions openapi_core/validation/validators.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,13 @@ def _find_media_type(
return finder.get_first()
return finder.find(mimetype)

def _deserialise_media_type(self, mimetype: str, value: Any) -> Any:
def _deserialise_media_type(
self, mimetype: str, parameters: Mapping[str, str], value: Any
) -> Any:
deserializer = self.media_type_deserializers_factory.create(
mimetype,
extra_media_type_deserializers=self.extra_media_type_deserializers,
parameters=parameters,
)
return deserializer.deserialize(value)

Expand Down Expand Up @@ -194,8 +197,10 @@ def _convert_content_schema_value_and_schema(
content: Spec,
mimetype: Optional[str] = None,
) -> Tuple[Any, Optional[Spec]]:
media_type, mime_type = self._find_media_type(content, mimetype)
deserialised = self._deserialise_media_type(mime_type, raw)
mime_type, parameters, media_type = self._find_media_type(
content, mimetype
)
deserialised = self._deserialise_media_type(mime_type, parameters, raw)
casted = self._cast(media_type, deserialised)

if "schema" not in media_type:
Expand Down
4 changes: 3 additions & 1 deletion tests/integration/test_petstore.py
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,9 @@ def test_get_pets_response_no_schema(self, spec):
assert result.body is None

data = "<html></html>"
response = MockResponse(data, status_code=404, mimetype="text/html")
response = MockResponse(
data, status_code=404, mimetype="text/html; charset=utf-8"
)

response_result = unmarshal_response(request, response, spec=spec)

Expand Down
13 changes: 11 additions & 2 deletions tests/unit/templating/test_media_types_finders.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,26 @@ def content(self, spec):
def finder(self, content):
return MediaTypeFinder(content)

def test_charset(self, finder, content):
mimetype = "text/html; charset=utf-8"

mimetype, parameters, _ = finder.find(mimetype)
assert mimetype == "text/*"
assert parameters == {"charset": "utf-8"}

def test_exact(self, finder, content):
mimetype = "application/json"

_, mimetype = finder.find(mimetype)
mimetype, parameters, _ = finder.find(mimetype)
assert mimetype == "application/json"
assert parameters == {}

def test_match(self, finder, content):
mimetype = "text/html"

_, mimetype = finder.find(mimetype)
mimetype, parameters, _ = finder.find(mimetype)
assert mimetype == "text/*"
assert parameters == {}

def test_not_found(self, finder, content):
mimetype = "unknown"
Expand Down

0 comments on commit a570e60

Please sign in to comment.