diff --git a/stix2/datastore/relational_db/database_backends/database_backend_base.py b/stix2/datastore/relational_db/database_backends/database_backend_base.py index e5082451..7e5ad4af 100644 --- a/stix2/datastore/relational_db/database_backends/database_backend_base.py +++ b/stix2/datastore/relational_db/database_backends/database_backend_base.py @@ -120,6 +120,10 @@ def determine_stix_type(stix_object): def array_allowed(): return False + @staticmethod + def create_regex_constraint_expression(column, pattern): + pass + def process_value_for_insert(self, stix_type, value): sql_type = stix_type.determine_sql_type(self) if sql_type == self.determine_sql_type_for_string_property(): diff --git a/stix2/datastore/relational_db/database_backends/postgres_backend.py b/stix2/datastore/relational_db/database_backends/postgres_backend.py index 931636e2..c90f5256 100644 --- a/stix2/datastore/relational_db/database_backends/postgres_backend.py +++ b/stix2/datastore/relational_db/database_backends/postgres_backend.py @@ -73,3 +73,7 @@ def determine_sql_type_for_timestamp_property(): # noqa: F811 @staticmethod def array_allowed(): return True + + @staticmethod + def create_regex_constraint_expression(column_name, pattern): + return f"{column_name} ~ {pattern}" diff --git a/stix2/datastore/relational_db/table_creation.py b/stix2/datastore/relational_db/table_creation.py index f57ff856..ea71b947 100644 --- a/stix2/datastore/relational_db/table_creation.py +++ b/stix2/datastore/relational_db/table_creation.py @@ -162,6 +162,7 @@ def create_kill_chain_phases_table(name, metadata, db_backend, schema_name, tabl def create_granular_markings_table(metadata, db_backend, sco_or_sdo): schema_name = db_backend.schema_for_core() tables = list() + reg_ex = f"'^marking-definition--[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[1-5][0-9a-fA-F]{3}-[89abAB][0-9a-fA-F]{3}-[0-9a-fA-F]{12}$'" # noqa: E131 columns = [ Column( "id", @@ -173,10 +174,7 @@ def create_granular_markings_table(metadata, db_backend, sco_or_sdo): Column( "marking_ref", db_backend.determine_sql_type_for_reference_property(), - CheckConstraint( - "marking_ref ~ '^marking-definition--[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[1-5][0-9a-fA-F]{3}-[89abAB][0-9a-fA-F]{3}-[0-9a-fA-F]{12}$'", - # noqa: E131 - ), + CheckConstraint(db_backend.create_regex_constraint_expression("marking_ref", reg_ex)), ), ] if db_backend.array_allowed(): @@ -230,15 +228,13 @@ def create_granular_markings_table(metadata, db_backend, sco_or_sdo): def create_external_references_tables(metadata, db_backend): + reg_ex = "'^[a-z][a-z0-9-]+[a-z0-9]--[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[1-5][0-9a-fA-F]{3}-[89abAB][0-9a-fA-F]{3}-[0-9a-fA-F]{12}$'" # noqa: E131 columns = [ Column( "id", db_backend.determine_sql_type_for_key_as_id(), ForeignKey("common.core_sdo" + ".id", ondelete="CASCADE"), - CheckConstraint( - "id ~ '^[a-z][a-z0-9-]+[a-z0-9]--[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[1-5][0-9a-fA-F]{3}-[89abAB][0-9a-fA-F]{3}-[0-9a-fA-F]{12}$'", # noqa: E131 - ), - ), + CheckConstraint(db_backend.create_regex_constraint_expression("id", reg_ex))), Column("source_name", db_backend.determine_sql_type_for_string_property()), Column("description", db_backend.determine_sql_type_for_string_property()), Column("url", db_backend.determine_sql_type_for_string_property()), @@ -255,26 +251,23 @@ def create_external_references_tables(metadata, db_backend): def create_core_table(metadata, db_backend, stix_type_name): tables = list() table_name = "core_" + stix_type_name + reg_ex = "'^[a-z][a-z0-9-]+[a-z0-9]--[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[1-5][0-9a-fA-F]{3}-[89abAB][0-9a-fA-F]{3}-[0-9a-fA-F]{12}$'" # noqa: E131 columns = [ Column( "id", db_backend.determine_sql_type_for_key_as_id(), - CheckConstraint( - "id ~ '^[a-z][a-z0-9-]+[a-z0-9]--[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[1-5][0-9a-fA-F]{3}-[89abAB][0-9a-fA-F]{3}-[0-9a-fA-F]{12}$'", # noqa: E131 - ), + CheckConstraint(db_backend.create_regex_constraint_expression("id", reg_ex)), primary_key=True, ), Column("spec_version", db_backend.determine_sql_type_for_string_property(), default="2.1"), ] if stix_type_name == "sdo": + reg_ex = "'^identity--[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[1-5][0-9a-fA-F]{3}-[89abAB][0-9a-fA-F]{3}-[0-9a-fA-F]{12}$'" # noqa: E131 sdo_columns = [ Column( "created_by_ref", db_backend.determine_sql_type_for_reference_property(), - CheckConstraint( - "created_by_ref ~ '^identity--[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[1-5][0-9a-fA-F]{3}-[89abAB][0-9a-fA-F]{3}-[0-9a-fA-F]{12}$'", # noqa: E131 - ), - ), + CheckConstraint(db_backend.create_regex_constraint_expression("created_by_ref", reg_ex))), Column("created", db_backend.determine_sql_type_for_timestamp_property()), Column("modified", db_backend.determine_sql_type_for_timestamp_property()), Column("revoked", db_backend.determine_sql_type_for_boolean_property()), @@ -405,9 +398,8 @@ def generate_table_information(self, name, db_backend, **kwargs): # noqa: F811 return Column( name, self.determine_sql_type(db_backend), - CheckConstraint( - # this regular expression might accept or reject some legal base64 strings - f"{name} ~ " + "'^[-A-Za-z0-9+/]*={0,3}$'", + # this regular expression might accept or reject some legal base64 strings + CheckConstraint(db_backend.create_regex_constraint_expression(name, "'^[-A-Za-z0-9+/]*={0,3}$'") ), nullable=not self.required, ) @@ -534,9 +526,7 @@ def generate_table_information(self, name, db_backend, **kwargs): # noqa: F811 return Column( name, self.determine_sql_type(db_backend), - CheckConstraint( - f"{name} ~ '^{enum_re}$'", - ), + CheckConstraint(db_backend.create_regex_constraint_expression(name, f"'^{enum_re}$'")), nullable=not self.required, ) @@ -609,18 +599,7 @@ def generate_table_information(self, name, db_backend, **kwargs): # noqa: F811 schema_name = kwargs.get('schema_name') table_name = kwargs.get("table_name") core_table = kwargs.get("core_table") - # if schema_name == "common": - # return Column( - # name, - # Text, - # CheckConstraint( - # f"{name} ~ '^{table_name}" + "--[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[1-5][0-9a-fA-F]{3}-[89abAB][0-9a-fA-F]{3}-[0-9a-fA-F]{12}$'", - # # noqa: E131 - # ), - # primary_key=True, - # nullable=not (self.required), - # ) - # else: + id_req_exp = f"'^{table_name}" + "--[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[1-5][0-9a-fA-F]{3}-[89abAB][0-9a-fA-F]{3}-[0-9a-fA-F]{12}$'" # noqa: E131 if schema_name: foreign_key_column = f"common.core_{core_table}.id" else: @@ -630,8 +609,7 @@ def generate_table_information(self, name, db_backend, **kwargs): # noqa: F811 db_backend.determine_sql_type_for_key_as_id(), ForeignKey(foreign_key_column, ondelete="CASCADE"), CheckConstraint( - f"{name} ~ '^{table_name}" + "--[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[1-5][0-9a-fA-F]{3}-[89abAB][0-9a-fA-F]{3}-[0-9a-fA-F]{12}$'", - # noqa: E131 + db_backend.create_regex_constraint_expression(name, id_req_exp) ), primary_key=True, nullable=not (self.required), @@ -743,17 +721,14 @@ def ref_column(name, specifics, db_backend, auth_type=0): if specifics: types = "|".join(specifics) if auth_type == 0: + reg_ex = f"'^({types})" + "--[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[1-5][0-9a-fA-F]{3}-[89abAB][0-9a-fA-F]{3}-[0-9a-fA-F]{12}$'" # noqa: F811 constraint = \ - CheckConstraint( - f"{name} ~ '^({types})" + - "--[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[1-5][0-9a-fA-F]{3}-[89abAB][0-9a-fA-F]{3}-[0-9a-fA-F]{12}$'", - ) + CheckConstraint(db_backend.create_regex_constraint_expression(name, reg_ex)) else: + reg_ex = "'--[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[1-5][0-9a-fA-F]{3}-[89abAB][0-9a-fA-F]{3}-[0-9a-fA-F]{12}$')" constraint = \ - CheckConstraint( - f"(NOT({name} ~ '^({types})')) AND ({name} ~ " + - "'--[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[1-5][0-9a-fA-F]{3}-[89abAB][0-9a-fA-F]{3}-[0-9a-fA-F]{12}$')", - ) + CheckConstraint(db_backend.create_regex_constraint_expression(f"NOT({name}", f"'^({types})'") + " AND " + + db_backend.create_regex_constraint_expression(name, reg_ex)) return Column(name, db_backend.determine_sql_type_for_reference_property(), constraint) else: return Column( @@ -789,9 +764,6 @@ def generate_table_information(self, name, db_backend, **kwargs): # noqa: F811 return Column( name, self.determine_sql_type(db_backend), - # CheckConstraint( - # f"{name} ~ '^{enum_re}$'" - # ), nullable=not (self.required), )