diff --git a/integration_tests/test_flask_json_response_type.py b/integration_tests/test_flask_json_response_type.py new file mode 100644 index 000000000..a508971ab --- /dev/null +++ b/integration_tests/test_flask_json_response_type.py @@ -0,0 +1,36 @@ +from core_codemods.flask_json_response_type import FlaskJsonResponseType +from integration_tests.base_test import ( + BaseIntegrationTest, + original_and_expected_from_code_path, +) + + +class TestFlaskJsonResponseType(BaseIntegrationTest): + codemod = FlaskJsonResponseType + code_path = "tests/samples/flask_json_response_type.py" + original_code, expected_new_code = original_and_expected_from_code_path( + code_path, + [ + ( + 8, + """ return (make_response(json_response), {'Content-Type': 'application/json'})\n""", + ), + ], + ) + + # fmt: off + expected_diff =( + """--- \n""" + """+++ \n""" + """@@ -6,4 +6,4 @@\n""" + """ @app.route("/test")\n""" + """ def foo(request):\n""" + """ json_response = json.dumps({ "user_input": request.GET.get("input") })\n""" + """- return make_response(json_response)\n""" + """+ return (make_response(json_response), {'Content-Type': 'application/json'})\n""" + ) + # fmt: on + + expected_line_change = "9" + change_description = FlaskJsonResponseType.CHANGE_DESCRIPTION + num_changed_files = 1 diff --git a/src/codemodder/codemods/utils_mixin.py b/src/codemodder/codemods/utils_mixin.py index 9d80baf0d..18c665700 100644 --- a/src/codemodder/codemods/utils_mixin.py +++ b/src/codemodder/codemods/utils_mixin.py @@ -228,6 +228,20 @@ def is_value_of_assignment( return parent return None + def is_target_of_assignment( + self, expr + ) -> Optional[cst.AnnAssign | cst.Assign | cst.WithItem | cst.NamedExpr]: + """ + Tests if expr is the value in an assignment. + """ + parent = self.get_metadata(ParentNodeProvider, expr) + parent = parent if isinstance(parent, cst.AssignTarget) else None + gparent = self.get_metadata(ParentNodeProvider, parent) if parent else None + match gparent: + case cst.AnnAssign() | cst.Assign() | cst.WithItem() | cst.NamedExpr(): + return gparent + return None + def has_attr_called(self, node: cst.CSTNode) -> Optional[cst.Name]: """ Checks if node is part of an expression of the form: .call(). @@ -298,6 +312,21 @@ def is_attribute_value(self, node: cst.CSTNode) -> Optional[cst.Attribute]: return maybe_parent return None + def find_immediate_function_def( + self, node: cst.CSTNode + ) -> Optional[cst.FunctionDef]: + """ + Find if node is inside a function definition. In case of nested functions it returns the most immediate one. + """ + # We disregard nested functions, we consider only the immediate one + ancestors = self.path_to_root(node) + first_fdef = None + for ancestor in ancestors: + if isinstance(ancestor, cst.FunctionDef): + first_fdef = ancestor + break + return first_fdef + def path_to_root(self, node: cst.CSTNode) -> list[cst.CSTNode]: """ Returns node's path to root. Includes self. @@ -338,6 +367,47 @@ def get_parent(self, node: cst.CSTNode) -> Optional[cst.CSTNode]: return None +class NameAndAncestorResolutionMixin(NameResolutionMixin, AncestorPatternsMixin): + METADATA_DEPENDENCIES: Tuple[Any, ...] = ( + ScopeProvider, + ParentNodeProvider, + ) + + def extract_value(self, node: cst.AnnAssign | cst.Assign | cst.WithItem): + match node: + case cst.AnnAssign(value=value) | cst.Assign(value=value) | cst.WithItem( + item=value + ) | cst.NamedExpr(value=value): + return value + return None + + def resolve_expression(self, node: cst.BaseExpression) -> cst.BaseExpression: + """ + If the expression is a Name, transitively resolves the name to another type of expression. Otherwise returns self. + """ + maybe_expr = None + match node: + case cst.Name(): + maybe_expr = self._resolve_name_transitive(node) + if maybe_expr: + return maybe_expr + return node + + def _resolve_name_transitive(self, node: cst.Name) -> Optional[cst.BaseExpression]: + maybe_assignment = self.find_single_assignment(node) + if maybe_assignment: + if maybe_target_assignment := self.is_target_of_assignment( + maybe_assignment.node + ): + value = self.extract_value(maybe_target_assignment) + match value: + case cst.Name(): + return self._resolve_name_transitive(value) + case _: + return value + return None + + def iterate_left_expressions(node: cst.BaseExpression): yield node match node: diff --git a/src/codemodder/scripts/generate_docs.py b/src/codemodder/scripts/generate_docs.py index 8ae4180c9..fc942c1f9 100644 --- a/src/codemodder/scripts/generate_docs.py +++ b/src/codemodder/scripts/generate_docs.py @@ -158,6 +158,10 @@ class DocMetadata: importance="Low", guidance_explained="This change fixes deprecated uses and is safe.", ), + "flask-json-response-type": DocMetadata( + importance="Medium", + guidance_explained="This change will only restrict the response type and will not alter the response data itself. Thus we deem it safe.", + ), } diff --git a/src/core_codemods/__init__.py b/src/core_codemods/__init__.py index 7b99fa264..4bd6a792e 100644 --- a/src/core_codemods/__init__.py +++ b/src/core_codemods/__init__.py @@ -33,6 +33,7 @@ from .file_resource_leak import FileResourceLeak from .django_receiver_on_top import DjangoReceiverOnTop from .django_json_response_type import DjangoJsonResponseType +from .flask_json_response_type import FlaskJsonResponseType registry = CodemodCollection( origin="pixee", @@ -72,5 +73,6 @@ DjangoReceiverOnTop, NumpyNanEquality, DjangoJsonResponseType, + FlaskJsonResponseType, ], ) diff --git a/src/core_codemods/django_json_response_type.py b/src/core_codemods/django_json_response_type.py index f79ebfdb9..bc1a963bd 100644 --- a/src/core_codemods/django_json_response_type.py +++ b/src/core_codemods/django_json_response_type.py @@ -6,9 +6,9 @@ class DjangoJsonResponseType(SemgrepCodemod): NAME = "django-json-response-type" - SUMMARY = "Set content type to `json/application` for `django.http.HttpResponse` with JSON data" + SUMMARY = "Set content type to `application/json` for `django.http.HttpResponse` with JSON data" REVIEW_GUIDANCE = ReviewGuidance.MERGE_WITHOUT_REVIEW - DESCRIPTION = "Sets `content_type` to `json/application`." + DESCRIPTION = "Sets `content_type` to `application/json`." REFERENCES = [ { "url": "https://docs.djangoproject.com/en/4.0/ref/request-response/#django.http.HttpResponse.__init__", diff --git a/src/core_codemods/docs/pixee_python_flask-json-response-type.md b/src/core_codemods/docs/pixee_python_flask-json-response-type.md new file mode 100644 index 000000000..0c0c939cc --- /dev/null +++ b/src/core_codemods/docs/pixee_python_flask-json-response-type.md @@ -0,0 +1,16 @@ +The default `mimetype` for `make_response` in Flask is `'text/html'`. This is true even when the response contains JSON data. +If the JSON contains (unsanitized) user-supplied input, a malicious user may supply HTML code which leaves the application vulnerable to cross-site scripting (XSS). +This fix explicitly sets the response type to `application/json` when the response body is JSON data to avoid this vulnerability. Our changes look something like this: + +```diff +from flask import make_response, Flask +import json + +app = Flask(__name__) + +@app.route("/test") +def foo(request): + json_response = json.dumps({ "user_input": request.GET.get("input") }) +- return make_response(json_response) ++ return (make_response(json_response), {'Content-Type':'application/json'} +``` diff --git a/src/core_codemods/flask_json_response_type.py b/src/core_codemods/flask_json_response_type.py new file mode 100644 index 000000000..116d0d1a6 --- /dev/null +++ b/src/core_codemods/flask_json_response_type.py @@ -0,0 +1,193 @@ +from typing import Optional, Union +import libcst as cst +from codemodder.codemods.api import BaseCodemod + +from codemodder.codemods.base_codemod import ReviewGuidance + +from codemodder.codemods.utils_mixin import NameAndAncestorResolutionMixin + + +class FlaskJsonResponseType(BaseCodemod, NameAndAncestorResolutionMixin): + NAME = "flask-json-response-type" + SUMMARY = "Set content type to `application/json` for `flask.make_response` with JSON data" + REVIEW_GUIDANCE = ReviewGuidance.MERGE_WITHOUT_REVIEW + DESCRIPTION = "Sets `mimetype` to `application/json`." + REFERENCES = [ + { + "url": "https://flask.palletsprojects.com/en/2.3.x/patterns/javascript/#return-json-from-views", + "description": "", + }, + { + "url": "https://cheatsheetseries.owasp.org/cheatsheets/Cross_Site_Scripting_Prevention_Cheat_Sheet.html#output-encoding-for-javascript-contexts", + "description": "", + }, + ] + + content_type_key = "Content-Type" + json_content_type = "application/json" + + def leave_Return( + self, original_node: cst.Return, updated_node: cst.Return + ) -> Union[ + cst.BaseSmallStatement, + cst.FlattenSentinel[cst.BaseSmallStatement], + cst.RemovalSentinel, + ]: + if original_node.value: + # is inside a function def with a route decorator + maybe_function_def = self.find_immediate_function_def(original_node) + maybe_has_decorator = ( + self._has_route_decorator(maybe_function_def) + if maybe_function_def + else None + ) + if maybe_has_decorator: + # json.dumps(...) + maybe_json_dumps = self._is_json_dumps_call(original_node.value) + if maybe_json_dumps: + self.add_change(original_node, self.CHANGE_DESCRIPTION) + return updated_node.with_changes( + value=self._wrap_into_tuple_with_content_type( + original_node.value + ) + ) + + # make_response(json.dumps(...),...) + maybe_make_response = self.is_make_response_with_json( + original_node.value + ) + if maybe_make_response: + self.add_change(original_node, self.CHANGE_DESCRIPTION) + return updated_node.with_changes( + value=self._wrap_into_tuple_with_content_type( + original_node.value + ) + ) + + # tuple case + match original_node.value: + case cst.Tuple(): + maybe_fixed_tuple = self._fix_response_tuple( + original_node.value + ) + if maybe_fixed_tuple: + self.add_change(original_node, self.CHANGE_DESCRIPTION) + return updated_node.with_changes(value=maybe_fixed_tuple) + return updated_node + + def _is_string_or_int(self, node: cst.BaseExpression): + expr = self.resolve_expression(node) + if expr and isinstance(expr, cst.SimpleString | cst.Integer): + return True + return False + + def _fix_response_tuple(self, node: cst.Tuple) -> Optional[cst.Tuple]: + elements = list(node.elements) + # (make_response | json.dumps, ..., {...}) + if len(elements) == 3: + last = elements[-1].value + match last: + case cst.Dict() if not self._has_content_type_key(last): + elements[-1] = cst.Element( + self._add_key_value( + last, + cst.SimpleString(f"'{self.content_type_key}'"), + cst.SimpleString(f"'{self.json_content_type}'"), + ) + ) + return node.with_changes(elements=elements) + # (make_response | json.dumps, string|number) + # (make_response | json.dumps, {...}) + if len(elements) == 2: + last = elements[-1].value + expr = self.resolve_expression(last) + match expr: + case cst.Dict() if not self._has_content_type_key(expr): + if last == expr: + elements[-1] = cst.Element( + self._add_key_value( + expr, + cst.SimpleString(f"'{self.content_type_key}'"), + cst.SimpleString(f"'{self.json_content_type}'"), + ) + ) + return node.with_changes(elements=elements) + # TODO last != expr case. + case cst.Integer() | cst.SimpleString(): + elements.append(cst.Element(self._build_dict())) + return node.with_changes(elements=elements) + return None + + def _wrap_into_tuple_with_content_type(self, node: cst.BaseExpression) -> cst.Tuple: + return cst.Tuple([cst.Element(node), cst.Element(self._build_dict())]) + + def _build_dict(self) -> cst.Dict: + return cst.Dict( + [ + cst.DictElement( + cst.SimpleString(f"'{self.content_type_key}'"), + cst.SimpleString(f"'{self.json_content_type}'"), + ) + ] + ) + + def _has_route_decorator(self, node: cst.FunctionDef) -> bool: + # We cannot guarantee that this decorator originates from a flask app object + # thus we just check for the name + for decorator in node.decorators: + match decorator.decorator: + case cst.Call(func=cst.Attribute() as func): + if func.attr.value == "route": + return True + return False + + def _is_json_dumps_call(self, node: cst.BaseExpression) -> Optional[cst.Call]: + expr = node + match node: + case cst.Name(): + expr = self._resolve_name_transitive(node) + match expr: + case cst.Call(): + true_name = self.find_base_name(expr) + if true_name == "json.dumps": + return expr + return None + + def is_make_response_with_json( + self, node: cst.BaseExpression + ) -> Optional[cst.Call]: + expr = node + match node: + case cst.Name(): + expr = self._resolve_name_transitive(node) + match expr: + case cst.Call(): + true_name = self.find_base_name(expr) + if true_name != "flask.make_response": + return None + first_arg = expr.args[0].value if expr.args else None + if first_arg and self._is_json_dumps_call(first_arg): + return expr + return None + + def _has_content_type_key(self, dict_expr: cst.Dict): + for element in dict_expr.elements: + match element: + case cst.StarredDictElement(): + return True + case cst.DictElement(key=key): + match key: + case cst.SimpleString(): + if key.raw_value == self.content_type_key: + return True + # it may use variable or other expreesions that resolves to Content-Type + case _: + return True + return False + + def _add_key_value( + self, dict_expr: cst.Dict, key: cst.BaseExpression, value: cst.BaseExpression + ) -> cst.Dict: + elements = list(dict_expr.elements) + elements.append(cst.DictElement(key, value)) + return dict_expr.with_changes(elements=elements) diff --git a/tests/codemods/test_flask_json_response_type.py b/tests/codemods/test_flask_json_response_type.py new file mode 100644 index 000000000..6b1e547d5 --- /dev/null +++ b/tests/codemods/test_flask_json_response_type.py @@ -0,0 +1,212 @@ +from core_codemods.flask_json_response_type import FlaskJsonResponseType +from tests.codemods.base_codemod_test import BaseCodemodTest +from textwrap import dedent + + +class TestFlaskJsonResponseType(BaseCodemodTest): + codemod = FlaskJsonResponseType + + def test_name(self): + assert self.codemod.name() == "flask-json-response-type" + + def test_simple(self, tmpdir): + input_code = """\ + from flask import make_response, Flask + import json + + app = Flask(__name__) + + @app.route("/test") + def foo(request): + json_response = json.dumps({ "user_input": request.GET.get("input") }) + return make_response(json_response) + """ + expected = """\ + from flask import make_response, Flask + import json + + app = Flask(__name__) + + @app.route("/test") + def foo(request): + json_response = json.dumps({ "user_input": request.GET.get("input") }) + return (make_response(json_response), {'Content-Type': 'application/json'}) + """ + self.run_and_assert(tmpdir, dedent(input_code), dedent(expected)) + assert len(self.file_context.codemod_changes) == 1 + + def test_alias(self, tmpdir): + input_code = """\ + from flask import make_response as response, Flask + import json + + app = Flask(__name__) + + @app.route("/test") + def foo(request): + json_response = json.dumps({ "user_input": request.GET.get("input") }) + return response(json_response) + """ + expected = """\ + from flask import make_response as response, Flask + import json + + app = Flask(__name__) + + @app.route("/test") + def foo(request): + json_response = json.dumps({ "user_input": request.GET.get("input") }) + return (response(json_response), {'Content-Type': 'application/json'}) + """ + self.run_and_assert(tmpdir, dedent(input_code), dedent(expected)) + assert len(self.file_context.codemod_changes) == 1 + + def test_direct(self, tmpdir): + input_code = """\ + from flask import make_response, Flask + import json + + app = Flask(__name__) + + @app.route("/test") + def foo(request): + return make_response(json.dumps({ "user_input": request.GET.get("input") })) + """ + expected = """\ + from flask import make_response, Flask + import json + + app = Flask(__name__) + + @app.route("/test") + def foo(request): + return (make_response(json.dumps({ "user_input": request.GET.get("input") })), {'Content-Type': 'application/json'}) + """ + self.run_and_assert(tmpdir, dedent(input_code), dedent(expected)) + assert len(self.file_context.codemod_changes) == 1 + + def test_tuple_no_dict(self, tmpdir): + input_code = """\ + from flask import make_response, Flask + import json + + app = Flask(__name__) + + @app.route("/test") + def foo(request): + json_response = json.dumps({ "user_input": request.GET.get("input") }) + code = 404 + return (make_response(json_response), code) + """ + expected = """\ + from flask import make_response, Flask + import json + + app = Flask(__name__) + + @app.route("/test") + def foo(request): + json_response = json.dumps({ "user_input": request.GET.get("input") }) + code = 404 + return (make_response(json_response), code, {'Content-Type': 'application/json'}) + """ + self.run_and_assert(tmpdir, dedent(input_code), dedent(expected)) + assert len(self.file_context.codemod_changes) == 1 + + def test_tuple_dict_no_key(self, tmpdir): + input_code = """\ + from flask import make_response, Flask + import json + + app = Flask(__name__) + + @app.route("/test") + def foo(request): + json_response = json.dumps({ "user_input": request.GET.get("input") }) + return (make_response(json_response), {}) + """ + expected = """\ + from flask import make_response, Flask + import json + + app = Flask(__name__) + + @app.route("/test") + def foo(request): + json_response = json.dumps({ "user_input": request.GET.get("input") }) + return (make_response(json_response), {'Content-Type': 'application/json'}) + """ + self.run_and_assert(tmpdir, dedent(input_code), dedent(expected)) + assert len(self.file_context.codemod_changes) == 1 + + def test_triple_dict(self, tmpdir): + input_code = """\ + from flask import make_response, Flask + import json + + app = Flask(__name__) + + @app.route("/test") + def foo(request): + json_response = json.dumps({ "user_input": request.GET.get("input") }) + code = 404 + return (make_response(json_response), code, {}) + """ + expected = """\ + from flask import make_response, Flask + import json + + app = Flask(__name__) + + @app.route("/test") + def foo(request): + json_response = json.dumps({ "user_input": request.GET.get("input") }) + code = 404 + return (make_response(json_response), code, {'Content-Type': 'application/json'}) + """ + self.run_and_assert(tmpdir, dedent(input_code), dedent(expected)) + assert len(self.file_context.codemod_changes) == 1 + + def test_no_route_decorator(self, tmpdir): + input_code = """\ + from flask import make_response, Flask + import json + + app = Flask(__name__) + + def foo(request): + json_response = json.dumps({ "user_input": request.GET.get("input") }) + return make_response(json_response) + """ + self.run_and_assert(tmpdir, dedent(input_code), dedent(input_code)) + assert len(self.file_context.codemod_changes) == 0 + + def test_content_type_set(self, tmpdir): + input_code = """\ + from flask import make_response, Flask + import json + + app = Flask(__name__) + + @app.route("/test") + def foo(request): + json_response = json.dumps({ "user_input": request.GET.get("input") }) + return (make_response(json_response), {'Content-Type': 'application/json'}) + """ + self.run_and_assert(tmpdir, dedent(input_code), dedent(input_code)) + assert len(self.file_context.codemod_changes) == 0 + + def test_no_json_input(self, tmpdir): + input_code = """\ + from flask import make_response, Flask + import json + + app = Flask(__name__) + + @app.route("/test") + def foo(request): + dict_response = { "user_input": request.GET.get("input") } + return make_response(dict_response) + """ + self.run_and_assert(tmpdir, dedent(input_code), dedent(input_code)) + assert len(self.file_context.codemod_changes) == 0 diff --git a/tests/samples/flask_json_response_type.py b/tests/samples/flask_json_response_type.py new file mode 100644 index 000000000..ded1df4bc --- /dev/null +++ b/tests/samples/flask_json_response_type.py @@ -0,0 +1,9 @@ +from flask import make_response, Flask +import json + +app = Flask(__name__) + +@app.route("/test") +def foo(request): + json_response = json.dumps({ "user_input": request.GET.get("input") }) + return make_response(json_response)