diff --git a/integration_tests/test_secure_flask_session_config.py b/integration_tests/test_secure_flask_session_config.py new file mode 100644 index 00000000..f138ba18 --- /dev/null +++ b/integration_tests/test_secure_flask_session_config.py @@ -0,0 +1,16 @@ +from core_codemods.secure_flask_session_config import SecureFlaskSessionConfig +from integration_tests.base_test import ( + BaseIntegrationTest, + original_and_expected_from_code_path, +) + + +class TestSecureFlaskSessionConfig(BaseIntegrationTest): + codemod = SecureFlaskSessionConfig + code_path = "tests/samples/flask_app.py" + original_code, expected_new_code = original_and_expected_from_code_path( + code_path, [(2, "app.config['SESSION_COOKIE_HTTPONLY'] = True\n")] + ) + expected_diff = "--- \n+++ \n@@ -1,6 +1,6 @@\n from flask import Flask\n app = Flask(__name__)\n-app.config['SESSION_COOKIE_HTTPONLY'] = False\n+app.config['SESSION_COOKIE_HTTPONLY'] = True\n @app.route('/')\n def hello_world():\n return 'Hello World!'\n" + expected_line_change = "3" + change_description = SecureFlaskSessionConfig.CHANGE_DESCRIPTION diff --git a/src/codemodder/project_analysis/file_parsers/setup_py_file_parser.py b/src/codemodder/project_analysis/file_parsers/setup_py_file_parser.py index 2fbd78d7..371d0a59 100644 --- a/src/codemodder/project_analysis/file_parsers/setup_py_file_parser.py +++ b/src/codemodder/project_analysis/file_parsers/setup_py_file_parser.py @@ -1,11 +1,11 @@ from codemodder.project_analysis.file_parsers.package_store import PackageStore +from codemodder.utils.utils import clean_simplestring from pathlib import Path import libcst as cst from libcst import matchers from packaging.requirements import Requirement from .base_parser import BaseParser -from .utils import clean_simplestring class SetupPyParser(BaseParser): diff --git a/src/codemodder/project_analysis/file_parsers/utils.py b/src/codemodder/project_analysis/file_parsers/utils.py index 4513523e..e69de29b 100644 --- a/src/codemodder/project_analysis/file_parsers/utils.py +++ b/src/codemodder/project_analysis/file_parsers/utils.py @@ -1,7 +0,0 @@ -import libcst as cst - - -def clean_simplestring(node: cst.SimpleString | str) -> str: - if isinstance(node, str): - return node.strip('"') - return node.raw_value diff --git a/src/codemodder/scripts/generate_docs.py b/src/codemodder/scripts/generate_docs.py index 4f39cfb6..bf253c4f 100644 --- a/src/codemodder/scripts/generate_docs.py +++ b/src/codemodder/scripts/generate_docs.py @@ -134,6 +134,10 @@ class DocMetadata: importance="Low", guidance_explained="We believe this replacement is safe and leads to better performance.", ), + "secure-flask-session-configuration": DocMetadata( + importance="Medium", + guidance_explained="Our change fixes explicitly insecure session configuration for a Flask application. However, there may be valid cases to use these insecure configurations, such as for testing or backward compatibility.", + ), } diff --git a/src/codemodder/utils/utils.py b/src/codemodder/utils/utils.py new file mode 100644 index 00000000..4f223778 --- /dev/null +++ b/src/codemodder/utils/utils.py @@ -0,0 +1,38 @@ +import libcst as cst + + +def clean_simplestring(node: cst.SimpleString | str) -> str: + if isinstance(node, str): + return node.strip('"') + return node.raw_value + + +def true_value(node: cst.Name | cst.SimpleString) -> str | int | bool: + match node: + case cst.SimpleString(): + return clean_simplestring(node) + case cst.Name(): + val = node.value + if val.lower() == "true": + return True + elif val.lower() == "false": + return False + return val + return "" + + +def extract_targets_of_assignment( + assignment: cst.AnnAssign | cst.Assign | cst.WithItem | cst.NamedExpr, +) -> list[cst.BaseExpression]: + match assignment: + case cst.AnnAssign(): + if assignment.target: + return [assignment.target] + case cst.Assign(): + return [t.target for t in assignment.targets] + case cst.NamedExpr(): + return [assignment.target] + case cst.WithItem(): + if assignment.asname: + return [assignment.asname.name] + return [] diff --git a/src/core_codemods/__init__.py b/src/core_codemods/__init__.py index ee5dcd18..120915ce 100644 --- a/src/core_codemods/__init__.py +++ b/src/core_codemods/__init__.py @@ -27,6 +27,7 @@ from .use_generator import UseGenerator from .use_walrus_if import UseWalrusIf from .with_threading_lock import WithThreadingLock +from .secure_flask_session_config import SecureFlaskSessionConfig registry = CodemodCollection( origin="pixee", @@ -60,5 +61,6 @@ UseWalrusIf, WithThreadingLock, SQLQueryParameterization, + SecureFlaskSessionConfig, ], ) diff --git a/src/core_codemods/docs/pixee_python_secure-flask-session-configuration.md b/src/core_codemods/docs/pixee_python_secure-flask-session-configuration.md new file mode 100644 index 00000000..3c17e42e --- /dev/null +++ b/src/core_codemods/docs/pixee_python_secure-flask-session-configuration.md @@ -0,0 +1,13 @@ +Flask applications can configure sessions behavior at the application level. +This codemod looks for Flask application configuration that set `SESSION_COOKIE_HTTPONLY`, `SESSION_COOKIE_SECURE`, or `SESSION_COOKIE_SAMESITE` to an insecure value and changes it to a secure one. + +The changes from this codemod look like this: + +```diff + from flask import Flask + app = Flask(__name__) +- app.config['SESSION_COOKIE_HTTPONLY'] = False +- app.config.update(SESSION_COOKIE_SECURE=False) ++ app.config['SESSION_COOKIE_HTTPONLY'] = True ++ app.config.update(SESSION_COOKIE_SECURE=True) +``` diff --git a/src/core_codemods/secure_flask_session_config.py b/src/core_codemods/secure_flask_session_config.py new file mode 100644 index 00000000..0bc4c7ee --- /dev/null +++ b/src/core_codemods/secure_flask_session_config.py @@ -0,0 +1,190 @@ +import libcst as cst +from libcst.codemod import Codemod, CodemodContext +from libcst.metadata import ParentNodeProvider, PositionProvider + +from libcst import matchers +from codemodder.codemods.base_codemod import ReviewGuidance +from codemodder.codemods.api import BaseCodemod +from codemodder.codemods.utils_mixin import NameResolutionMixin +from codemodder.utils.utils import extract_targets_of_assignment, true_value +from codemodder.codemods.base_visitor import BaseTransformer +from codemodder.change import Change +from codemodder.file_context import FileContext + + +class SecureFlaskSessionConfig(BaseCodemod, Codemod): + NAME = "secure-flask-session-configuration" + SUMMARY = "Flip Insecure `Flask` Session Configurations" + REVIEW_GUIDANCE = ReviewGuidance.MERGE_AFTER_REVIEW + DESCRIPTION = "Flip Flask session configuration if defined as insecure." + REFERENCES = [ + { + "url": "https://owasp.org/www-community/controls/SecureCookieAttribute", + "description": "", + }, + { + "url": "https://cheatsheetseries.owasp.org/cheatsheets/Session_Management_Cheat_Sheet.html", + "description": "", + }, + ] + + def transform_module_impl(self, tree: cst.Module) -> cst.Module: + flask_codemod = FixFlaskConfig(self.context, self.file_context) + result_tree = flask_codemod.transform_module(tree) + + if not flask_codemod.flask_app_name: + return tree + + # Later: if we want to write at the end of the module any + # default insecure configs. + # if flask_codemod.configs_to_write: + # return self.insert_secure_configs( + # tree, + # result_tree, + # flask_codemod.flask_app_name, + # flask_codemod.configs_to_write, + # ) + return result_tree + + # def insert_secure_configs( + # self, + # original_node: cst.Module, + # updated_node: cst.Module, + # app_name: str, + # configs: dict, + # ) -> cst.Module: + # if not configs: + # return updated_node + # + # config_string = ", ".join( + # f"{key}='{value[0]}'" if isinstance(value[0], str) else f"{key}={value[0]}" + # for key, value in configs.items() + # if value and value[0] is not None + # ) + # if not config_string: + # return updated_node + # + # self.report_change_endof_module(original_node) + # final_line = cst.parse_statement(f"{app_name}.config.update({config_string})") + # new_body = updated_node.body + (final_line,) + # return updated_node.with_changes(body=new_body) + # + # def report_change_endof_module(self, original_node: cst.Module) -> None: + # # line_number is the end of the module where we will insert the new line. + # pos_to_match = self.node_position(original_node) + # line_number = pos_to_match.end.line + # self.file_context.codemod_changes.append( + # Change(line_number, self.CHANGE_DESCRIPTION) + # ) + + +class FixFlaskConfig(BaseTransformer, NameResolutionMixin): + """ + Visitor to find calls to flask.Flask and related `.config` accesses. + """ + + METADATA_DEPENDENCIES = (PositionProvider, ParentNodeProvider) + SECURE_SESSION_CONFIGS = { + # None value indicates unassigned, using default is safe + # values in order of precedence + "SESSION_COOKIE_HTTPONLY": [None, True], + "SESSION_COOKIE_SECURE": [True], + "SESSION_COOKIE_SAMESITE": ["Lax", "Strict"], + } + + def __init__(self, codemod_context: CodemodContext, file_context: FileContext): + super().__init__(codemod_context, []) + self.flask_app_name = "" + # Later: if we want to store configs to write later + # self.configs_to_write = self.SECURE_SESSION_CONFIGS.copy() + self.file_context = file_context + + def _store_flask_app(self, original_node) -> None: + flask_app_parent = self.get_metadata(ParentNodeProvider, original_node) + match flask_app_parent: + case cst.AnnAssign() | cst.Assign(): + targets = extract_targets_of_assignment(flask_app_parent) + # TODO: handle other assignments ex. l[0] = Flask(...) , a.b = Flask(...) + if targets and matchers.matches( + first_target := targets[0], matchers.Name() + ): + self.flask_app_name = first_target.value + + # def _remove_config(self, key): + # try: + # del self.configs_to_write[key] + # except KeyError: + # pass + + def _get_secure_config_val(self, key): + val = self.SECURE_SESSION_CONFIGS[key][0] or self.SECURE_SESSION_CONFIGS[key][1] + return cst.parse_expression(f'"{val}"' if isinstance(val, str) else f"{val}") + + @property + def flask_app_is_assigned(self): + return bool(self.flask_app_name) + + def leave_Call(self, original_node: cst.Call, updated_node: cst.Call): + true_name = self.find_base_name(original_node.func) + if true_name == "flask.Flask": + self._store_flask_app(original_node) + + if self.flask_app_is_assigned and self._is_config_update_call(original_node): + return self.call_node_with_secure_configs(original_node, updated_node) + return updated_node + + def call_node_with_secure_configs( + self, original_node: cst.Call, updated_node: cst.Call + ) -> cst.Call: + new_args = [] + for arg in updated_node.args: + if (key := arg.keyword.value) in self.SECURE_SESSION_CONFIGS: + # self._remove_config(key) + if true_value(arg.value) not in self.SECURE_SESSION_CONFIGS[key]: # type: ignore + safe_value = self._get_secure_config_val(key) + arg = arg.with_changes(value=safe_value) + new_args.append(arg) + + if updated_node.args != new_args: + self.report_change(original_node) + return updated_node.with_changes(args=new_args) + + def leave_Assign(self, original_node: cst.Assign, updated_node: cst.Assign): + if self.flask_app_is_assigned and self._is_config_subscript(original_node): + return self.assign_node_with_secure_config(original_node, updated_node) + return updated_node + + def assign_node_with_secure_config( + self, original_node: cst.Assign, updated_node: cst.Assign + ) -> cst.Assign: + key = true_value(updated_node.targets[0].target.slice[0].slice.value) + if key in self.SECURE_SESSION_CONFIGS: + # self._remove_config(key) + if true_value(updated_node.value) not in self.SECURE_SESSION_CONFIGS[key]: # type: ignore + safe_value = self._get_secure_config_val(key) + self.report_change(original_node) + return updated_node.with_changes(value=safe_value) + return updated_node + + def _is_config_update_call(self, original_node: cst.Call): + config = matchers.Name(value="config") + app_name = matchers.Name(value=self.flask_app_name) + app_config_node = matchers.Attribute(value=app_name, attr=config) + update = cst.Name(value="update") + return matchers.matches( + original_node.func, matchers.Attribute(value=app_config_node, attr=update) + ) + + def _is_config_subscript(self, original_node: cst.Assign): + config = matchers.Name(value="config") + app_name = matchers.Name(value=self.flask_app_name) + app_config_node = matchers.Attribute(value=app_name, attr=config) + return matchers.matches( + original_node.targets[0].target, matchers.Subscript(value=app_config_node) + ) + + def report_change(self, original_node): + line_number = self.lineno_for_node(original_node) + self.file_context.codemod_changes.append( + Change(line_number, SecureFlaskSessionConfig.CHANGE_DESCRIPTION) + ) diff --git a/tests/codemods/test_secure_flask_session_config.py b/tests/codemods/test_secure_flask_session_config.py new file mode 100644 index 00000000..0700e868 --- /dev/null +++ b/tests/codemods/test_secure_flask_session_config.py @@ -0,0 +1,235 @@ +import pytest +from core_codemods.secure_flask_session_config import SecureFlaskSessionConfig +from tests.codemods.base_codemod_test import BaseCodemodTest +from textwrap import dedent + + +class TestSecureFlaskSessionConfig(BaseCodemodTest): + codemod = SecureFlaskSessionConfig + + def test_name(self): + assert self.codemod.name() == "secure-flask-session-configuration" + + def test_no_flask_app(self, tmpdir): + input_code = """\ + import flask + + response = flask.Response() + var = "hello" + response.set_cookie("name", "value") + """ + self.run_and_assert(tmpdir, dedent(input_code), dedent(input_code)) + assert len(self.file_context.codemod_changes) == 0 + + def test_app_defined_separate_module(self, tmpdir): + # TODO: test this as an integration test with two real modules + input_code = """\ + from my_app_module import app + + app.config["SESSION_COOKIE_SECURE"] = False + """ + self.run_and_assert(tmpdir, dedent(input_code), dedent(input_code)) + assert len(self.file_context.codemod_changes) == 0 + + def test_app_not_assigned(self, tmpdir): + input_code = """\ + import flask + + flask.Flask(__name__) + print(1) + """ + self.run_and_assert(tmpdir, dedent(input_code), dedent(input_code)) + assert len(self.file_context.codemod_changes) == 0 + + def test_app_accessed_config_not_called(self, tmpdir): + input_code = """\ + import flask + + app = flask.Flask(__name__) + app.secret_key = "dev" + # more code + """ + self.run_and_assert(tmpdir, dedent(input_code), dedent(input_code)) + assert len(self.file_context.codemod_changes) == 0 + + def test_from_import(self, tmpdir): + input_code = """\ + from flask import Flask + + app = Flask(__name__) + app.secret_key = "dev" + app.config.update(SESSION_COOKIE_SECURE=False) + """ + expexted_output = """\ + from flask import Flask + + app = Flask(__name__) + app.secret_key = "dev" + app.config.update(SESSION_COOKIE_SECURE=True) + """ + self.run_and_assert(tmpdir, dedent(input_code), dedent(expexted_output)) + assert len(self.file_context.codemod_changes) == 1 + + def test_import_alias(self, tmpdir): + input_code = """\ + from flask import Flask as flask_app + app = flask_app(__name__) + app.secret_key = "dev" + # more code + app.config.update(SESSION_COOKIE_SECURE=False) + """ + expexted_output = """\ + from flask import Flask as flask_app + app = flask_app(__name__) + app.secret_key = "dev" + # more code + app.config.update(SESSION_COOKIE_SECURE=True) + """ + self.run_and_assert(tmpdir, dedent(input_code), dedent(expexted_output)) + assert len(self.file_context.codemod_changes) == 1 + + def test_annotated_assign(self, tmpdir): + input_code = """\ + import flask + app: flask.Flask = flask.Flask(__name__) + app.secret_key = "dev" + # more code + app.config.update(SESSION_COOKIE_SECURE=False) + """ + expexted_output = """\ + import flask + app: flask.Flask = flask.Flask(__name__) + app.secret_key = "dev" + # more code + app.config.update(SESSION_COOKIE_SECURE=True) + """ + self.run_and_assert(tmpdir, dedent(input_code), dedent(expexted_output)) + assert len(self.file_context.codemod_changes) == 1 + + def test_other_assignment_type(self, tmpdir): + input_code = """\ + import flask + class AppStore: + pass + store = AppStore() + store.app = flask.Flask(__name__) + store.app.secret_key = "dev" + # more code + store.app.config.update(SESSION_COOKIE_SECURE=False) + """ + self.run_and_assert(tmpdir, dedent(input_code), dedent(input_code)) + assert len(self.file_context.codemod_changes) == 0 + + @pytest.mark.parametrize( + "config_lines,expected_config_lines", + [ + ( + """app.config""", + """app.config""", + ), + ( + """app.config["TESTING"] = True""", + """app.config["TESTING"] = True""", + ), + ( + """app.config.testing = True""", + """app.config.testing = True""", + ), + ( + """app.config.update(SESSION_COOKIE_SECURE=True, SESSION_COOKIE_SAMESITE='Lax')""", + """app.config.update(SESSION_COOKIE_SECURE=True, SESSION_COOKIE_SAMESITE='Lax')""", + ), + ( + """app.config.update(SESSION_COOKIE_SECURE=True)""", + """app.config.update(SESSION_COOKIE_SECURE=True)""", + ), + ( + """app.config.update(SESSION_COOKIE_HTTPONLY=True)""", + """app.config.update(SESSION_COOKIE_HTTPONLY=True)""", + ), + ( + """app.config.update(SESSION_COOKIE_HTTPONLY=False)""", + """app.config.update(SESSION_COOKIE_HTTPONLY=True)""", + ), + ( + """app.config['SESSION_COOKIE_SECURE'] = False""", + """app.config['SESSION_COOKIE_SECURE'] = True""", + ), + ( + """app.config['SESSION_COOKIE_HTTPONLY'] = False""", + """app.config['SESSION_COOKIE_HTTPONLY'] = True""", + ), + ( + """app.config["SESSION_COOKIE_SECURE"] = True +app.config["SESSION_COOKIE_SAMESITE"] = "Lax" +""", + """app.config["SESSION_COOKIE_SECURE"] = True +app.config["SESSION_COOKIE_SAMESITE"] = "Lax" +""", + ), + ( + """app.config["SESSION_COOKIE_SECURE"] = False +app.config["SESSION_COOKIE_SAMESITE"] = None +""", + """app.config["SESSION_COOKIE_SECURE"] = True +app.config["SESSION_COOKIE_SAMESITE"] = "Lax" +""", + ), + ( + """app.config["SESSION_COOKIE_SECURE"] = False +app.config["SESSION_COOKIE_HTTPONLY"] = False +app.config["SESSION_COOKIE_SAMESITE"] = "Strict" +""", + """app.config["SESSION_COOKIE_SECURE"] = True +app.config["SESSION_COOKIE_HTTPONLY"] = True +app.config["SESSION_COOKIE_SAMESITE"] = "Strict" +""", + ), + ( + """app.config["SESSION_COOKIE_SECURE"] = False +app.config["SESSION_COOKIE_SECURE"] = True +""", + """app.config["SESSION_COOKIE_SECURE"] = True +app.config["SESSION_COOKIE_SECURE"] = True +""", + ), + ], + ) + def test_config_accessed_variations( + self, tmpdir, config_lines, expected_config_lines + ): + input_code = f"""import flask +app = flask.Flask(__name__) +app.secret_key = "dev" +{config_lines} +""" + expected_output = f"""import flask +app = flask.Flask(__name__) +app.secret_key = "dev" +{expected_config_lines} +""" + self.run_and_assert(tmpdir, dedent(input_code), dedent(expected_output)) + + @pytest.mark.skip() + def test_func_scope(self, tmpdir): + input_code = """\ + from flask import Flask + app = Flask(__name__) + + @app.route('/') + def hello_world(): + return 'Hello World!' + + def configure(): + app.config['TESTING'] = True + + if __name__ == '__main__': + configure() + app.run() + """ + expexted_output = """\ + # TODO correct fix could be multiple options, + # either within configure() call or after it's called + """ + self.run_and_assert(tmpdir, dedent(input_code), dedent(expexted_output)) + assert len(self.file_context.codemod_changes) == 1 diff --git a/tests/samples/flask_app.py b/tests/samples/flask_app.py new file mode 100644 index 00000000..879f6d37 --- /dev/null +++ b/tests/samples/flask_app.py @@ -0,0 +1,6 @@ +from flask import Flask +app = Flask(__name__) +app.config['SESSION_COOKIE_HTTPONLY'] = False +@app.route('/') +def hello_world(): + return 'Hello World!'