From 08b75b1c8663d8fc155df7ec1ede2004d56c70eb Mon Sep 17 00:00:00 2001 From: Jacob Walls Date: Mon, 11 Nov 2024 08:01:10 -0500 Subject: [PATCH] Improve SQL safety --- arches/app/models/lookups.py | 49 ++++++++++++++---------------------- 1 file changed, 19 insertions(+), 30 deletions(-) diff --git a/arches/app/models/lookups.py b/arches/app/models/lookups.py index f3940b13e7..5d4f543f6a 100644 --- a/arches/app/models/lookups.py +++ b/arches/app/models/lookups.py @@ -1,66 +1,55 @@ from django.db.models import JSONField, Lookup +from psycopg2.extensions import AsIs, QuotedString -# TODO: manually merging parameters is usually a no-no, -# but are these already safe via the \" quotes? -# Look into sql.Identifier or AsIs(). +class JSONPathFilter: + """Handle the double-quoting and escaping for JSONPath filters.""" -# Seems like a Django bug that I need to override get_db_prep_lookup(). TODO: Ask. + def process_rhs(self, compiler, connection): + rhs, params = super().process_rhs(compiler, connection) + escaped = AsIs(QuotedString(params[0]).getquoted().decode()[1:-1]) + return rhs, (escaped,) @JSONField.register_lookup -class AnyLanguageEquals(Lookup): +class AnyLanguageEquals(JSONPathFilter, Lookup): lookup_name = "any_lang" - def get_db_prep_lookup(self, value, connection): - return ("%s", (value,)) - def as_sql(self, compiler, connection): lhs, lhs_params = self.process_lhs(compiler, connection) rhs, rhs_params = self.process_rhs(compiler, connection) - placeholder = "%s @? '$.*.value ? (@ == \"" + rhs_params[0] + "\")'" - return placeholder % (lhs,), lhs_params + params = lhs_params + rhs_params + return "%s @? '$.*.value ? (@ == \"%s\")'" % (lhs, rhs), params @JSONField.register_lookup -class AnyLanguageContains(Lookup): +class AnyLanguageContains(JSONPathFilter, Lookup): lookup_name = "any_lang_contains" - def get_db_prep_lookup(self, value, connection): - return ("%s", (value,)) - def as_sql(self, compiler, connection): lhs, lhs_params = self.process_lhs(compiler, connection) rhs, rhs_params = self.process_rhs(compiler, connection) - placeholder = "%s @? '$.*.value ? (@ like_regex \"" + rhs_params[0] + "\")'" - return placeholder % (lhs,), lhs_params + params = lhs_params + rhs_params + return "%s @? '$.*.value ? (@ like_regex \"%s\")'" % (lhs, rhs), params @JSONField.register_lookup -class AnyLanguageIContains(Lookup): +class AnyLanguageIContains(JSONPathFilter, Lookup): lookup_name = "any_lang_icontains" - def get_db_prep_lookup(self, value, connection): - return ("%s", (value,)) - def as_sql(self, compiler, connection): lhs, lhs_params = self.process_lhs(compiler, connection) rhs, rhs_params = self.process_rhs(compiler, connection) - placeholder = ( - "%s @? '$.*.value ? (@ like_regex \"" + rhs_params[0] + '" flag "i")\'' - ) - return placeholder % (lhs,), lhs_params + params = lhs_params + rhs_params + return '%s @? \'$.*.value ? (@ like_regex "%s" flag "i")\'' % (lhs, rhs), params @JSONField.register_lookup -class AnyLanguageStartsWith(Lookup): +class AnyLanguageStartsWith(JSONPathFilter, Lookup): lookup_name = "any_lang_startswith" - def get_db_prep_lookup(self, value, connection): - return ("%s", (value,)) - def as_sql(self, compiler, connection): lhs, lhs_params = self.process_lhs(compiler, connection) rhs, rhs_params = self.process_rhs(compiler, connection) - placeholder = "%s @? '$.*.value ? (@ starts with \"" + rhs_params[0] + "\")'" - return placeholder % (lhs,), lhs_params + params = lhs_params + rhs_params + return "%s @? '$.*.value ? (@ starts with \"%s\")'" % (lhs, rhs), params