Skip to content

Commit

Permalink
Merge branch 'main' into switch-to-granian
Browse files Browse the repository at this point in the history
  • Loading branch information
underdarknl authored Nov 10, 2023
2 parents 10f3b87 + 7e6fa2b commit 54bd976
Show file tree
Hide file tree
Showing 13 changed files with 212 additions and 32 deletions.
3 changes: 1 addition & 2 deletions boefjes/boefjes/job_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from octopoes.models.types import OOIType

logger = logging.getLogger(__name__)

bytes_api_client = BytesAPIClient(
settings.bytes_api,
username=settings.bytes_username,
Expand Down Expand Up @@ -153,7 +154,6 @@ def handle(self, boefje_meta: BoefjeMeta) -> None:
boefje_meta.ended_at = datetime.now(timezone.utc)
logger.info("Saving to Bytes for boefje %s[%s]", boefje_meta.boefje.id, str(boefje_meta.id))

bytes_api_client.login()
bytes_api_client.save_boefje_meta(boefje_meta)

if boefje_results:
Expand All @@ -175,7 +175,6 @@ def __init__(self, job_runner):
def handle(self, normalizer_meta: NormalizerMeta) -> None:
logger.info("Handling normalizer %s[%s]", normalizer_meta.normalizer.id, normalizer_meta.id)

bytes_api_client.login()
raw = bytes_api_client.get_raw(normalizer_meta.raw_data.id)

normalizer_meta.started_at = datetime.now(timezone.utc)
Expand Down
2 changes: 1 addition & 1 deletion docs/source/developer_documentation/boefjes.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
This module has several entry points discussed below, but let us first consider the prerequisites and scope.
If you already have running setup and want to learn where each bit of functionality goes, read the following page:

[Developing Openkat Plugins](README.md#your-first-boefje)
[Developing Openkat Plugins]([https://docs.openkat.nl/introduction/makeyourown.html])

## Prerequisites

Expand Down
4 changes: 4 additions & 0 deletions docs/source/modules/api.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
API
The API is used to query the status of tasks within OpenKAT. By querying a task identifier you can see if the task has completed or failed. In the future the API can be used to create your own boefjes.

Boefjes receive their task input and save their output using REST APIs.
25 changes: 13 additions & 12 deletions mula/scheduler/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,22 +175,23 @@ def collect_metrics(self) -> None:
This method that allows to collect metrics throughout the application.
"""
for s in self.schedulers.values():
self.ctx.metrics_qsize.labels(
scheduler_id=s.scheduler_id,
).set(
s.queue.qsize(),
)

status_counts = self.ctx.datastores.task_store.get_status_counts(s.scheduler_id)
for status, count in status_counts.items():
self.ctx.metrics_task_status_counts.labels(
with self.lock:
for s in self.schedulers.values():
self.ctx.metrics_qsize.labels(
scheduler_id=s.scheduler_id,
status=status,
).set(
count,
s.queue.qsize(),
)

status_counts = self.ctx.datastores.task_store.get_status_counts(s.scheduler_id)
for status, count in status_counts.items():
self.ctx.metrics_task_status_counts.labels(
scheduler_id=s.scheduler_id,
status=status,
).set(
count,
)

def run(self) -> None:
"""Start the main scheduler application, and run in threads the
following processes:
Expand Down
36 changes: 31 additions & 5 deletions mula/scheduler/config/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,6 @@ class Settings(BaseSettings):
)

# Application settings
katalogus_cache_ttl: int = Field(
30,
description="The lifetime of the katalogus cache in seconds",
)

monitor_organisations_interval: int = Field(
60,
description="Interval in seconds of the execution of the "
Expand All @@ -90,11 +85,42 @@ class Settings(BaseSettings):
"their schedulers.",
)

# External services settings
octopoes_request_timeout: int = Field(
10,
description="The timeout in seconds for the requests to the octopoes api",
)

octopoes_pool_connections: int = Field(
10,
description="The maximum number of connections to save in the pool for the octopoes api",
)

katalogus_cache_ttl: int = Field(
30,
description="The lifetime of the katalogus cache in seconds",
)

katalogus_request_timeout: int = Field(
10,
description="The timeout in seconds for the requests to the katalogus api",
)

katalogus_pool_connections: int = Field(
10,
description="The maximum number of connections to save in the pool for the katalogus api",
)

bytes_request_timeout: int = Field(
10,
description="The timeout in seconds for the requests to the bytes api",
)

bytes_pool_connections: int = Field(
10,
description="The maximum number of connections to save in the pool for the bytes api",
)

rabbitmq_prefetch_count: int = Field(
100,
description="RabbitMQ prefetch_count for `channel.basic_qos()`, "
Expand Down
10 changes: 7 additions & 3 deletions mula/scheduler/connectors/services/bytes.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import threading
import typing
from functools import wraps
from typing import Any, Callable, Dict, Optional
Expand Down Expand Up @@ -33,7 +34,7 @@ class Bytes(HTTPService):

name = "bytes"

def __init__(self, host: str, source: str, user: str, password: str, timeout: int = 5):
def __init__(self, host: str, source: str, user: str, password: str, timeout: int, pool_connections: int):
"""Initialize the Bytes service.
Args:
Expand All @@ -48,10 +49,13 @@ def __init__(self, host: str, source: str, user: str, password: str, timeout: in
"password": password,
}

super().__init__(host=host, source=source, timeout=timeout)
self.lock: threading.Lock = threading.Lock()

super().__init__(host, source, timeout, pool_connections)

def login(self) -> None:
self.headers.update({"Authorization": f"bearer {self.get_token()}"})
with self.lock:
self.headers.update({"Authorization": f"bearer {self.get_token()}"})

@staticmethod
def _verify_response(response: requests.Response) -> None:
Expand Down
4 changes: 2 additions & 2 deletions mula/scheduler/connectors/services/katalogus.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ class Katalogus(HTTPService):

name = "katalogus"

def __init__(self, host: str, source: str, timeout: int = 5, cache_ttl: int = 30):
super().__init__(host, source, timeout)
def __init__(self, host: str, source: str, timeout: int, pool_connections: int, cache_ttl: int = 30):
super().__init__(host, source, timeout, pool_connections)

# For every organisation we cache its plugins, it references the
# plugin-id as key and the plugin as value.
Expand Down
3 changes: 2 additions & 1 deletion mula/scheduler/connectors/services/octopoes.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@ def __init__(
host: str,
source: str,
orgs: List[Organisation],
pool_connections: int,
timeout: int = 10,
):
self.orgs: List[Organisation] = orgs
super().__init__(host, source, timeout)
super().__init__(host, source, timeout, pool_connections)

@exception_handler
def get_objects_by_object_types(
Expand Down
21 changes: 17 additions & 4 deletions mula/scheduler/connectors/services/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,14 @@ class HTTPService(Connector):
name: Optional[str] = None
health_endpoint: Optional[str] = "health"

def __init__(self, host: str, source: str, timeout: int = 5, retries: int = 5):
def __init__(
self,
host: str,
source: str,
timeout: int = 10,
pool_connections: int = 10,
retries: int = 5,
):
"""Initializer of the HTTPService class. During initialization the
host will be checked if it is available and healthy.
Expand All @@ -51,6 +58,8 @@ def __init__(self, host: str, source: str, timeout: int = 5, retries: int = 5):
from where the requests came from.
timeout:
An integer defining the timeout of requests.
pool_connections:
The number of connections kept alive in the pool.
retries:
An integer defining the number of retries to make before
giving up.
Expand All @@ -61,16 +70,20 @@ def __init__(self, host: str, source: str, timeout: int = 5, retries: int = 5):
self.session: requests.Session = requests.Session()
self.host: str = host
self.timeout: int = timeout
self.retries = retries
self.retries: int = retries
self.pool_connections: int = pool_connections
self.source: str = source

max_retries = Retry(
total=self.retries,
backoff_factor=0.1,
status_forcelist=[500, 502, 503, 504],
)
self.session.mount("http://", HTTPAdapter(max_retries=max_retries))
self.session.mount("https://", HTTPAdapter(max_retries=max_retries))

# Mount the HTTPAdapter to the session
http_adapter = HTTPAdapter(max_retries=max_retries, pool_connections=self.pool_connections)
self.session.mount("http://", http_adapter)
self.session.mount("https://", http_adapter)

if self.source:
self.headers["User-Agent"] = self.source
Expand Down
9 changes: 7 additions & 2 deletions mula/scheduler/context/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,21 +45,26 @@ def __init__(self) -> None:
katalogus_service = services.Katalogus(
host=remove_trailing_slash(str(self.config.host_katalogus)),
source=f"scheduler/{scheduler.__version__}",
timeout=self.config.katalogus_request_timeout,
pool_connections=self.config.katalogus_pool_connections,
cache_ttl=self.config.katalogus_cache_ttl,
)

bytes_service = services.Bytes(
host=remove_trailing_slash(str(self.config.host_bytes)),
source=f"scheduler/{scheduler.__version__}",
user=self.config.host_bytes_user,
password=self.config.host_bytes_password,
source=f"scheduler/{scheduler.__version__}",
timeout=self.config.bytes_request_timeout,
pool_connections=self.config.bytes_pool_connections,
)

octopoes_service = services.Octopoes(
host=remove_trailing_slash(str(self.config.host_octopoes)),
source=f"scheduler/{scheduler.__version__}",
orgs=katalogus_service.get_organisations(),
timeout=self.config.octopoes_request_timeout,
pool_connections=self.config.octopoes_pool_connections,
orgs=katalogus_service.get_organisations(),
)

# Register external services, SimpleNamespace allows us to use dot
Expand Down
4 changes: 4 additions & 0 deletions mula/tests/integration/test_services.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ def setUp(self) -> None:
host=remove_trailing_slash(str(self.config.host_bytes)),
user=self.config.host_bytes_user,
password=self.config.host_bytes_password,
timeout=self.config.bytes_request_timeout,
pool_connections=self.config.bytes_pool_connections,
source="scheduler_test",
)

Expand Down Expand Up @@ -54,6 +56,8 @@ def setUp(self) -> None:
self.service_katalogus = services.Katalogus(
host=remove_trailing_slash(str(self.config.host_katalogus)),
source="scheduler_test",
timeout=self.config.katalogus_request_timeout,
pool_connections=self.config.katalogus_pool_connections,
cache_ttl=12345,
)

Expand Down
Empty file added mula/tests/scripts/__init__.py
Empty file.
Loading

0 comments on commit 54bd976

Please sign in to comment.