diff --git a/ogc/bblocks/oas30.py b/ogc/bblocks/oas30.py index 2d16e8e..191eb09 100644 --- a/ogc/bblocks/oas30.py +++ b/ogc/bblocks/oas30.py @@ -1,7 +1,9 @@ #!/usr/bin/env python3 from __future__ import annotations +import argparse import logging +import os import re from builtins import isinstance from collections import deque @@ -10,7 +12,7 @@ from urllib.parse import urlparse from jsonpointer import resolve_pointer -from ogc.na.util import is_url, load_yaml +from ogc.na.util import is_url, load_yaml, dump_yaml from ogc.bblocks.util import load_file_cached, PathOrUrl @@ -302,9 +304,15 @@ def schema_to_oas30(schema_fn: Path, schema_url: str, bbr: BuildingBlockRegister } -def oas31_to_oas30(document: dict, document_location: PathOrUrl | str, bbr: BuildingBlockRegister | None, +def oas31_to_oas30(document: dict, document_location: PathOrUrl | str, bbr: BuildingBlockRegister | None = None, x_defs_path='/x-defs', target_version='3.0.3'): + if not isinstance(document, dict): + if isinstance(document, Path) or not is_url(document): + document = load_yaml(filename=document) + else: + document = load_yaml(url=document) + # == 1. Bundle if x_defs_path[0] != '/': x_defs_path = '/' + x_defs_path @@ -430,6 +438,12 @@ def fn(subschema: dict[str, Any], parent_is_properties=False, property=None, **k def process_path_item_object(o: dict): if not o: return + + ref = o.get('$ref') + if ref: + process_path_item_object(resolve_pointer(root_schema, ref[1:])) + return + for op_key in OAS_OPERATION_KEYS: operation = o.get(op_key) if operation: @@ -442,17 +456,39 @@ def process_path_item_object(o: dict): def process_operation_object(o: dict): if not o: return - for parameter in o.get('parameters', []): - resolve_parameter(parameter) - process_schema_object(parameter) - process_content_object(parameter) + + ref = o.get('$ref') + if ref: + process_operation_object(resolve_pointer(root_schema, ref[1:])) + return + + parameters = o.get('parameters') + if isinstance(parameters, dict): # $ref! + parameters = resolve_pointer(root_schema, parameters['$ref'][1:]) + if parameters: + for parameter in parameters: + resolve_parameter(parameter) + process_schema_object(parameter) + process_content_object(parameter) process_content_object(o.get('requestBody')) - for response in o.get('responses', {}).values(): - process_content_object(response) - for header in response.get('headers', {}).values(): - process_schema_object(header) + responses = o.get('responses') + if responses: + ref = responses.get('$ref') + if ref: + responses = resolve_pointer(root_schema, ref[1:]) + if responses: + for response in o.get('responses', {}).values(): + process_content_object(response) + headers = response.get('headers') + if headers: + ref = responses.get('$ref') + if ref: + headers = resolve_pointer(root_schema, ref[1:]) + if headers: + for header in headers.values(): + process_schema_object(header) for callback in o.get('callbacks', {}).values(): process_operation_object(callback) @@ -460,7 +496,18 @@ def process_operation_object(o: dict): def process_content_object(o: dict): if not o: return + + ref = o.get('$ref') + if ref: + process_content_object(resolve_pointer(root_schema, ref[1:])) + return + content = o.get('content') + if content: + ref = content.get('$ref') + if ref: + content = resolve_pointer(root_schema, ref[1:]) + if content: for schema_object in content.values(): # remove description because 3.0 doesn't support it @@ -515,3 +562,40 @@ def process_document(): root_schema['openapi'] = target_version return root_schema + + +def _main(): + parser = argparse.ArgumentParser( + description='Downcompiles OpenApi 3.1 documents to 3.0', + ) + + parser.add_argument( + 'document', + help='Document to downcompile', + ) + + parser.add_argument( + '--url', + help='Canonical URL of document', + ) + + parser.add_argument( + '-o', + '--output', + help='Output file' + ) + + args = parser.parse_args() + + document_location = PathOrUrl(args.url or args.document) + + result = oas31_to_oas30(args.document, document_location) + + if not args.output: + print(dump_yaml(result)) + else: + dump_yaml(result, args.output) + + +if __name__ == '__main__': + _main()