Skip to content

Commit

Permalink
Optimization implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
jpbruinsslot committed Nov 2, 2023
1 parent 78dd737 commit afa502b
Show file tree
Hide file tree
Showing 9 changed files with 244 additions and 29 deletions.
25 changes: 13 additions & 12 deletions mula/scheduler/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,22 +175,23 @@ def collect_metrics(self) -> None:
This method that allows to collect metrics throughout the application.
"""
for s in self.schedulers.values():
self.ctx.metrics_qsize.labels(
scheduler_id=s.scheduler_id,
).set(
s.queue.qsize(),
)

status_counts = self.ctx.datastores.task_store.get_status_counts(s.scheduler_id)
for status, count in status_counts.items():
self.ctx.metrics_task_status_counts.labels(
with self.lock:
for s in self.schedulers.values():
self.ctx.metrics_qsize.labels(
scheduler_id=s.scheduler_id,
status=status,
).set(
count,
s.queue.qsize(),
)

status_counts = self.ctx.datastores.task_store.get_status_counts(s.scheduler_id)
for status, count in status_counts.items():
self.ctx.metrics_task_status_counts.labels(
scheduler_id=s.scheduler_id,
status=status,
).set(
count,
)

def run(self) -> None:
"""Start the main scheduler application, and run in threads the
following processes:
Expand Down
51 changes: 46 additions & 5 deletions mula/scheduler/config/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,6 @@ class Settings(BaseSettings):
)

# Application settings
katalogus_cache_ttl: int = Field(
30,
description="The lifetime of the katalogus cache in seconds",
)

monitor_organisations_interval: int = Field(
60,
description="Interval in seconds of the execution of the "
Expand All @@ -90,11 +85,57 @@ class Settings(BaseSettings):
"their schedulers.",
)

# External services settings
octopoes_request_timeout: int = Field(
10,
description="The timeout in seconds for the requests to the octopoes api",
)

octopoes_pool_maxsize: int = Field(
10,
description="The maximum number of connections to the octopoes api",
)

octopoes_pool_connections: int = Field(
10,
description="The maximum number of connections to save in the pool for the octopoes api",
)

katalogus_cache_ttl: int = Field(
30,
description="The lifetime of the katalogus cache in seconds",
)

katalogus_request_timeout: int = Field(
10,
description="The timeout in seconds for the requests to the katalogus api",
)

katalogus_pool_maxsize: int = Field(
10,
description="The maximum number of connections to save in the pool for the katalogus api",
)

katalogus_pool_connections: int = Field(
10,
description="The maximum number of connections to save in the pool for the katalogus api",
)

bytes_request_timeout: int = Field(
10,
description="The timeout in seconds for the requests to the bytes api",
)

bytes_pool_maxsize: int = Field(
10,
description="The maximum number of connections to save in the pool for the bytes api",
)

bytes_pool_connections: int = Field(
10,
description="The maximum number of connections to save in the pool for the bytes api",
)

rabbitmq_prefetch_count: int = Field(
100,
description="RabbitMQ prefetch_count for `channel.basic_qos()`, "
Expand Down
12 changes: 9 additions & 3 deletions mula/scheduler/connectors/services/bytes.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import threading
import typing
from functools import wraps
from typing import Any, Callable, Dict, Optional
Expand Down Expand Up @@ -33,7 +34,9 @@ class Bytes(HTTPService):

name = "bytes"

def __init__(self, host: str, source: str, user: str, password: str, timeout: int = 5):
def __init__(
self, host: str, source: str, user: str, password: str, timeout: int, pool_maxsize: int, pool_connections: int
):
"""Initialize the Bytes service.
Args:
Expand All @@ -48,10 +51,13 @@ def __init__(self, host: str, source: str, user: str, password: str, timeout: in
"password": password,
}

super().__init__(host=host, source=source, timeout=timeout)
self.lock: threading.Lock = threading.Lock()

super().__init__(host, source, timeout, pool_maxsize, pool_connections)

def login(self) -> None:
self.headers.update({"Authorization": f"bearer {self.get_token()}"})
with self.lock:
self.headers.update({"Authorization": f"bearer {self.get_token()}"})

@staticmethod
def _verify_response(response: requests.Response) -> None:
Expand Down
6 changes: 4 additions & 2 deletions mula/scheduler/connectors/services/katalogus.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@ class Katalogus(HTTPService):

name = "katalogus"

def __init__(self, host: str, source: str, timeout: int = 5, cache_ttl: int = 30):
super().__init__(host, source, timeout)
def __init__(
self, host: str, source: str, timeout: int, pool_maxsize: int, pool_connections: int, cache_ttl: int = 30
):
super().__init__(host, source, timeout, pool_maxsize, pool_connections)

# For every organisation we cache its plugins, it references the
# plugin-id as key and the plugin as value.
Expand Down
4 changes: 3 additions & 1 deletion mula/scheduler/connectors/services/octopoes.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@ def __init__(
host: str,
source: str,
orgs: List[Organisation],
pool_maxsize: int,
pool_connections: int,
timeout: int = 10,
):
self.orgs: List[Organisation] = orgs
super().__init__(host, source, timeout)
super().__init__(host, source, timeout, pool_maxsize, pool_connections)

@exception_handler
def get_objects_by_object_types(
Expand Down
34 changes: 30 additions & 4 deletions mula/scheduler/connectors/services/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,15 @@ class HTTPService(Connector):
name: Optional[str] = None
health_endpoint: Optional[str] = "health"

def __init__(self, host: str, source: str, timeout: int = 5, retries: int = 5):
def __init__(
self,
host: str,
source: str,
timeout: int = 10,
pool_maxsize: int = 10,
pool_connections: int = 10,
retries: int = 5,
):
"""Initializer of the HTTPService class. During initialization the
host will be checked if it is available and healthy.
Expand All @@ -51,6 +59,10 @@ def __init__(self, host: str, source: str, timeout: int = 5, retries: int = 5):
from where the requests came from.
timeout:
An integer defining the timeout of requests.
pool_maxsize:
The maximum number of connections to save in the pool.
pool_connections:
The maximum number of connections to allow to a single host.
retries:
An integer defining the number of retries to make before
giving up.
Expand All @@ -61,16 +73,30 @@ def __init__(self, host: str, source: str, timeout: int = 5, retries: int = 5):
self.session: requests.Session = requests.Session()
self.host: str = host
self.timeout: int = timeout
self.retries = retries
self.retries: int = retries
self.pool_maxsize: int = pool_maxsize
self.pool_connections: int = pool_connections
self.source: str = source

max_retries = Retry(
total=self.retries,
backoff_factor=0.1,
status_forcelist=[500, 502, 503, 504],
)
self.session.mount("http://", HTTPAdapter(max_retries=max_retries))
self.session.mount("https://", HTTPAdapter(max_retries=max_retries))

# Mount the HTTPAdapter to the session
self.session.mount(
"http://",
HTTPAdapter(
max_retries=max_retries, pool_maxsize=self.pool_maxsize, pool_connections=self.pool_connections
),
)
self.session.mount(
"https://",
HTTPAdapter(
max_retries=max_retries, pool_maxsize=self.pool_maxsize, pool_connections=self.pool_connections
),
)

if self.source:
self.headers["User-Agent"] = self.source
Expand Down
12 changes: 10 additions & 2 deletions mula/scheduler/context/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,21 +45,29 @@ def __init__(self) -> None:
katalogus_service = services.Katalogus(
host=remove_trailing_slash(str(self.config.host_katalogus)),
source=f"scheduler/{scheduler.__version__}",
timeout=self.config.katalogus_request_timeout,
pool_maxsize=self.config.katalogus_pool_maxsize,
pool_connections=self.config.katalogus_pool_connections,
cache_ttl=self.config.katalogus_cache_ttl,
)

bytes_service = services.Bytes(
host=remove_trailing_slash(str(self.config.host_bytes)),
source=f"scheduler/{scheduler.__version__}",
user=self.config.host_bytes_user,
password=self.config.host_bytes_password,
source=f"scheduler/{scheduler.__version__}",
timeout=self.config.bytes_request_timeout,
pool_maxsize=self.config.bytes_pool_maxsize,
pool_connections=self.config.bytes_pool_connections,
)

octopoes_service = services.Octopoes(
host=remove_trailing_slash(str(self.config.host_octopoes)),
source=f"scheduler/{scheduler.__version__}",
orgs=katalogus_service.get_organisations(),
timeout=self.config.octopoes_request_timeout,
pool_maxsize=self.config.octopoes_pool_maxsize,
pool_connections=self.config.octopoes_pool_connections,
orgs=katalogus_service.get_organisations(),
)

# Register external services, SimpleNamespace allows us to use dot
Expand Down
6 changes: 6 additions & 0 deletions mula/tests/integration/test_services.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ def setUp(self) -> None:
host=remove_trailing_slash(str(self.config.host_bytes)),
user=self.config.host_bytes_user,
password=self.config.host_bytes_password,
timeout=self.config.bytes_request_timeout,
pool_maxsize=self.config.bytes_pool_maxsize,
pool_connections=self.config.bytes_pool_connections,
source="scheduler_test",
)

Expand Down Expand Up @@ -54,6 +57,9 @@ def setUp(self) -> None:
self.service_katalogus = services.Katalogus(
host=remove_trailing_slash(str(self.config.host_katalogus)),
source="scheduler_test",
timeout=self.config.katalogus_request_timeout,
pool_maxsize=self.config.katalogus_pool_maxsize,
pool_connections=self.config.katalogus_pool_connections,
cache_ttl=12345,
)

Expand Down
Loading

0 comments on commit afa502b

Please sign in to comment.