Skip to content

Commit

Permalink
✨ [#100] Add ConfigurationStep for Service model
Browse files Browse the repository at this point in the history
which can load any number of Services, configured in a YAML file
  • Loading branch information
stevenbal committed Nov 29, 2024
1 parent 9148ba2 commit a98c6db
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 0 deletions.
Empty file.
Empty file.
38 changes: 38 additions & 0 deletions zgw_consumers/contrib/setup_configuration/models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
from django_setup_configuration.models import ConfigurationModel, DjangoModelRef
from pydantic import Field

from zgw_consumers.models import Service


class SingleServiceConfigurationModel(ConfigurationModel):
# TODO these should probably be defined in simple_certmanager and referred to?
# client_certificate: FilePath = DjangoModelRef(Service, "client_certificate")
# server_certificate: FilePath = DjangoModelRef(Service, "server_certificate")
connection_check_expected_status: int | None = Field(
description="The status code that indicates that the connection check is successful",
default=200,
)
timeout: int | None = DjangoModelRef(Service, "timeout")

class Meta:
django_model_refs = {
Service: [
"slug",
"label",
"api_type",
"api_root",
"api_connection_check_path",
"auth_type",
"client_id",
"secret",
"header_key",
"header_value",
"nlx",
"user_id",
"user_representation",
]
}


class ServicesConfigurationModel(ConfigurationModel):
services: list[SingleServiceConfigurationModel] = Field(default_factory=list)
48 changes: 48 additions & 0 deletions zgw_consumers/contrib/setup_configuration/steps.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
from django_setup_configuration.configuration import BaseConfigurationStep
from django_setup_configuration.exceptions import SelfTestFailed

from zgw_consumers.models import Service

from .models import ServicesConfigurationModel


class ServiceConfigurationStep(BaseConfigurationStep[ServicesConfigurationModel]):
"""
Configure Services to connect with external APIs
"""

verbose_name = "Configuration to connect with external services"
config_model = ServicesConfigurationModel
namespace = "ZGW_CONSUMERS"
enable_setting = "ZGW_CONSUMERS_CONFIG_ENABLE"

def is_configured(self, model: ServicesConfigurationModel) -> bool:
slugs = [config.slug for config in model.services]
return Service.objects.filter(slug__in=slugs).count() == len(slugs)

def execute(self, model: ServicesConfigurationModel):
ignore_fields = ["slug", "connection_check_expected_status"]
for config in model.services:
Service.objects.update_or_create(
slug=config.slug,
defaults={
k: v
for k, v in config.model_dump().items()
if k not in ignore_fields
},
)

def validate_result(self, model: ServicesConfigurationModel) -> None:
slugs = [config.slug for config in model.services]
failed_checks = []
ordered_models = sorted(model.services, key=lambda x: x.slug)
for service, model in zip(
Service.objects.filter(slug__in=slugs).order_by("slug"), ordered_models
):
if service.connection_check != model.connection_check_expected_status:
failed_checks.append(service.slug)

if failed_checks:
raise SelfTestFailed(
f"non-success response from the following service(s): {failed_checks}"
)

0 comments on commit a98c6db

Please sign in to comment.