From 111ed2af8721210210386d07fa14bc96daee4111 Mon Sep 17 00:00:00 2001
From: Matthias Veit
Date: Mon, 26 Feb 2024 14:16:58 +0100
Subject: [PATCH] [feat] Introduce managed_kubernetes_cluster and clean up collect (#1939)

---
 plugins/aws/resoto_plugin_aws/resource/ecs.py |   4 +-
 plugins/aws/resoto_plugin_aws/resource/eks.py |  10 +-
 .../resoto_plugin_digitalocean/client.py      |   6 +-
 .../resoto_plugin_digitalocean/collector.py   |   3 +-
 .../resoto_plugin_digitalocean/resources.py   |   9 +-
 plugins/digitalocean/test/test_collector.py   |   2 +-
 .../resoto_plugin_gcp/resources/container.py  |   6 +-
 plugins/k8s/resoto_plugin_k8s/__init__.py     |   6 +-
 resotolib/resotolib/baseplugin.py             |   3 +-
 resotolib/resotolib/baseresources.py          |  10 +
 resotoworker/resotoworker/__main__.py         |   6 +
 resotoworker/resotoworker/collect.py          | 242 ++++++++++--------
 12 files changed, 177 insertions(+), 130 deletions(-)

diff --git a/plugins/aws/resoto_plugin_aws/resource/ecs.py b/plugins/aws/resoto_plugin_aws/resource/ecs.py
index 412374e70a..dccd4b4710 100644
--- a/plugins/aws/resoto_plugin_aws/resource/ecs.py
+++ b/plugins/aws/resoto_plugin_aws/resource/ecs.py
@@ -1784,7 +1784,7 @@ class AwsEcsContainerInstance(EcsTaggable, AwsResource):
         "arn": S("containerInstanceArn"),
         "ec2_instance_id": S("ec2InstanceId"),
         "capacity_provider_name": S("capacityProviderName"),
-        "version": S("version"),
+        "version": S("version") >> F(str),
         "version_info": S("versionInfo") >> Bend(AwsEcsVersionInfo.mapping),
         "remaining_resources": S("remainingResources", default=[]) >> ForallBend(AwsEcsResource.mapping),
         "registered_resources": S("registeredResources", default=[]) >> ForallBend(AwsEcsResource.mapping),
@@ -1800,7 +1800,7 @@ class AwsEcsContainerInstance(EcsTaggable, AwsResource):
     }
     ec2_instance_id: Optional[str] = field(default=None)
     capacity_provider_name: Optional[str] = field(default=None)
-    version: Optional[int] = field(default=None)
+    version: Optional[str] = field(default=None)
     version_info: Optional[AwsEcsVersionInfo] = field(default=None)
     remaining_resources: List[AwsEcsResource] = field(factory=list)
     registered_resources: List[AwsEcsResource] = field(factory=list)
diff --git a/plugins/aws/resoto_plugin_aws/resource/eks.py b/plugins/aws/resoto_plugin_aws/resource/eks.py
index b64f1840ed..dd417acb8a 100644
--- a/plugins/aws/resoto_plugin_aws/resource/eks.py
+++ b/plugins/aws/resoto_plugin_aws/resource/eks.py
@@ -6,7 +6,7 @@
 from resoto_plugin_aws.resource.autoscaling import AwsAutoScalingGroup
 from resoto_plugin_aws.resource.base import AwsResource, GraphBuilder, AwsApiSpec
 from resoto_plugin_aws.resource.iam import AwsIamRole
-from resotolib.baseresources import ModelReference
+from resotolib.baseresources import ModelReference, BaseManagedKubernetesClusterProvider
 from resotolib.graph import Graph
 from resotolib.json_bender import Bender, S, Bend, ForallBend
 from resotolib.types import Json
@@ -399,7 +399,7 @@ class AwsEksConnectorConfig:
 
 
 @define(eq=False, slots=False)
-class AwsEksCluster(EKSTaggable, AwsResource):
+class AwsEksCluster(EKSTaggable, BaseManagedKubernetesClusterProvider, AwsResource):
     kind: ClassVar[str] = "aws_eks_cluster"
     kind_display: ClassVar[str] = "AWS EKS Cluster"
     aws_metadata: ClassVar[Dict[str, Any]] = {"provider_link_tpl": "https://{region_id}.console.aws.amazon.com/eks/home?region={region}#/clusters/{name}", "arn_tpl": "arn:{partition}:eks:{region}:{account}:cluster/{name}"}  # fmt: skip
@@ -421,8 +421,8 @@ class AwsEksCluster(EKSTaggable, AwsResource):
         "name": S("name"),
         "arn": S("arn"),
         "ctime": S("createdAt"),
"cluster_version": S("version"), - "cluster_endpoint": S("endpoint"), + "version": S("version"), + "endpoint": S("endpoint"), "cluster_role_arn": S("roleArn"), "cluster_resources_vpc_config": S("resourcesVpcConfig") >> Bend(AwsEksVpcConfigResponse.mapping), "cluster_kubernetes_network_config": S("kubernetesNetworkConfig") @@ -436,8 +436,6 @@ class AwsEksCluster(EKSTaggable, AwsResource): "cluster_encryption_config": S("encryptionConfig", default=[]) >> ForallBend(AwsEksEncryptionConfig.mapping), "cluster_connector_config": S("connectorConfig") >> Bend(AwsEksConnectorConfig.mapping), } - cluster_version: Optional[str] = field(default=None) - cluster_endpoint: Optional[str] = field(default=None) cluster_role_arn: Optional[str] = field(default=None) cluster_resources_vpc_config: Optional[AwsEksVpcConfigResponse] = field(default=None) cluster_kubernetes_network_config: Optional[AwsEksKubernetesNetworkConfigResponse] = field(default=None) diff --git a/plugins/digitalocean/resoto_plugin_digitalocean/client.py b/plugins/digitalocean/resoto_plugin_digitalocean/client.py index 1e761d1d19..d8a2eba92e 100644 --- a/plugins/digitalocean/resoto_plugin_digitalocean/client.py +++ b/plugins/digitalocean/resoto_plugin_digitalocean/client.py @@ -1,14 +1,14 @@ import logging -from attrs import define +from datetime import datetime from functools import lru_cache from typing import List, Any, Optional, Union, TypeVar, Callable, Mapping +from urllib.parse import urljoin, urlencode import boto3 import requests +from attrs import define from botocore.exceptions import EndpointConnectionError, HTTPClientError from retrying import retry as retry_decorator -from urllib.parse import urljoin, urlencode -from datetime import datetime from resoto_plugin_digitalocean.utils import RetryableHttpError from resoto_plugin_digitalocean.utils import retry_on_error diff --git a/plugins/digitalocean/resoto_plugin_digitalocean/collector.py b/plugins/digitalocean/resoto_plugin_digitalocean/collector.py index 6fee2a22df..16c69f05e2 100644 --- a/plugins/digitalocean/resoto_plugin_digitalocean/collector.py +++ b/plugins/digitalocean/resoto_plugin_digitalocean/collector.py @@ -1,4 +1,5 @@ import logging + import math from pprint import pformat from typing import Tuple, Type, List, Dict, Callable, Any, Optional, cast, DefaultDict @@ -833,7 +834,7 @@ def collect_k8s_clusters(self) -> None: attr_map={ "id": "id", "urn": lambda c: kubernetes_id(c["id"]), - "k8s_version": "version", + "version": "version", "k8s_cluster_subnet": "cluster_subnet", "k8s_service_subnet": "service_subnet", "ipv4_address": "ipv4", diff --git a/plugins/digitalocean/resoto_plugin_digitalocean/resources.py b/plugins/digitalocean/resoto_plugin_digitalocean/resources.py index 8a34459bdb..bbf4ac9973 100644 --- a/plugins/digitalocean/resoto_plugin_digitalocean/resources.py +++ b/plugins/digitalocean/resoto_plugin_digitalocean/resources.py @@ -1,4 +1,6 @@ import logging +from dataclasses import field + from attrs import define from typing import ClassVar, Dict, List, Optional, Tuple, Any @@ -25,6 +27,7 @@ BaseDNSRecord, ModelReference, PhantomBaseResource, + BaseManagedKubernetesClusterProvider, ) from resotolib.graph import Graph import time @@ -285,7 +288,7 @@ class DigitalOceanDropletNeighborhood(DigitalOceanResource, PhantomBaseResource) @define(eq=False, slots=False) -class DigitalOceanKubernetesCluster(DigitalOceanResource, BaseResource): +class DigitalOceanKubernetesCluster(DigitalOceanResource, BaseManagedKubernetesClusterProvider): """DigitalOcean Kubernetes 
Cluster""" kind: ClassVar[str] = "digitalocean_kubernetes_cluster" @@ -301,13 +304,11 @@ class DigitalOceanKubernetesCluster(DigitalOceanResource, BaseResource): } } - k8s_version: Optional[str] = None k8s_cluster_subnet: Optional[str] = None k8s_service_subnet: Optional[str] = None ipv4_address: Optional[str] = None - endpoint: Optional[str] = None auto_upgrade_enabled: Optional[bool] = None - cluster_status: Optional[str] = None + cluster_status: Optional[str] = field(default=None, metadata=dict(ignore_history=True)) surge_upgrade_enabled: Optional[bool] = None registry_enabled: Optional[bool] = None ha_enabled: Optional[bool] = None diff --git a/plugins/digitalocean/test/test_collector.py b/plugins/digitalocean/test/test_collector.py index 536feda43a..9e6b88e4c7 100644 --- a/plugins/digitalocean/test/test_collector.py +++ b/plugins/digitalocean/test/test_collector.py @@ -330,7 +330,7 @@ def test_collect_k8s_clusters() -> None: cluster: DigitalOceanKubernetesCluster = graph.search_first("urn", "do:kubernetes:e1c48631-b382-4001-2168-c47c54795a26") # type: ignore # noqa: E501 assert cluster.urn == "do:kubernetes:e1c48631-b382-4001-2168-c47c54795a26" assert cluster.name == "k8s-1-22-7-do-0-fra1-test" - assert cluster.k8s_version == "1.22.7-do.0" + assert cluster.version == "1.22.7-do.0" assert cluster.region().urn == "do:region:fra1" # type: ignore assert cluster.k8s_cluster_subnet == "10.244.0.0/16" assert cluster.k8s_service_subnet == "10.245.0.0/16" diff --git a/plugins/gcp/resoto_plugin_gcp/resources/container.py b/plugins/gcp/resoto_plugin_gcp/resources/container.py index 32f3a90639..86e0df4027 100644 --- a/plugins/gcp/resoto_plugin_gcp/resources/container.py +++ b/plugins/gcp/resoto_plugin_gcp/resources/container.py @@ -5,7 +5,7 @@ from resoto_plugin_gcp.gcp_client import GcpApiSpec from resoto_plugin_gcp.resources.base import GcpResource, GcpDeprecationStatus, GraphBuilder -from resotolib.baseresources import ModelReference +from resotolib.baseresources import ModelReference, BaseManagedKubernetesClusterProvider from resotolib.json_bender import Bender, S, Bend, ForallBend, MapDict from resotolib.types import Json @@ -1083,7 +1083,7 @@ class GcpContainerResourceUsageExportConfig: @define(eq=False, slots=False) -class GcpContainerCluster(GcpResource): +class GcpContainerCluster(BaseManagedKubernetesClusterProvider, GcpResource): kind: ClassVar[str] = "gcp_container_cluster" kind_display: ClassVar[str] = "GCP Container Cluster" kind_description: ClassVar[str] = ( @@ -1124,6 +1124,7 @@ class GcpContainerCluster(GcpResource): "cost_management_config": S("costManagementConfig", "enabled"), "create_time": S("createTime"), "current_master_version": S("currentMasterVersion"), + "version": S("currentMasterVersion"), "current_node_count": S("currentNodeCount"), "current_node_version": S("currentNodeVersion"), "database_encryption": S("databaseEncryption", default={}) >> Bend(GcpContainerDatabaseEncryption.mapping), @@ -1192,7 +1193,6 @@ class GcpContainerCluster(GcpResource): default_max_pods_constraint: Optional[str] = field(default=None) enable_kubernetes_alpha: Optional[bool] = field(default=None) enable_tpu: Optional[bool] = field(default=None) - endpoint: Optional[str] = field(default=None) etag: Optional[str] = field(default=None) expire_time: Optional[datetime] = field(default=None) identity_service_config: Optional[bool] = field(default=None) diff --git a/plugins/k8s/resoto_plugin_k8s/__init__.py b/plugins/k8s/resoto_plugin_k8s/__init__.py index ca6a22e540..d72b3120c5 100644 --- 
diff --git a/plugins/k8s/resoto_plugin_k8s/__init__.py b/plugins/k8s/resoto_plugin_k8s/__init__.py
index ca6a22e540..d72b3120c5 100644
--- a/plugins/k8s/resoto_plugin_k8s/__init__.py
+++ b/plugins/k8s/resoto_plugin_k8s/__init__.py
@@ -5,14 +5,14 @@
 from tempfile import TemporaryDirectory
 from typing import Dict, Any, Type, Optional
 
-import resotolib.logger
-import resotolib.proc
 from kubernetes.client import ApiException
 from kubernetes.client import Configuration
 
+import resotolib.logger
+import resotolib.proc
 from resoto_plugin_k8s.base import K8sApiClient, K8sClient
-from resoto_plugin_k8s.collector import KubernetesCollector
 from resoto_plugin_k8s.base import K8sConfig
+from resoto_plugin_k8s.collector import KubernetesCollector
 from resoto_plugin_k8s.deferred_edges import create_deferred_edges
 from resotolib.args import ArgumentParser, Namespace
 from resotolib.baseplugin import BaseCollectorPlugin
diff --git a/resotolib/resotolib/baseplugin.py b/resotolib/resotolib/baseplugin.py
index 9282457dcb..60270227ae 100644
--- a/resotolib/resotolib/baseplugin.py
+++ b/resotolib/resotolib/baseplugin.py
@@ -1,9 +1,9 @@
 import time
 from abc import ABC, abstractmethod
 from enum import Enum, auto
+from queue import Queue
 from threading import Thread, current_thread
 from typing import Dict, Optional, Any
-from queue import Queue
 
 from prometheus_client import Counter
 
@@ -19,7 +19,6 @@
 from resotolib.logger import log
 from resotolib.types import Json
 
-
 metrics_unhandled_plugin_exceptions = Counter(
     "resoto_unhandled_plugin_exceptions_total",
     "Unhandled plugin exceptions",
diff --git a/resotolib/resotolib/baseresources.py b/resotolib/resotolib/baseresources.py
index 57dbbabac5..972423d2a1 100644
--- a/resotolib/resotolib/baseresources.py
+++ b/resotolib/resotolib/baseresources.py
@@ -1264,6 +1264,16 @@ class BaseOrganizationalUnit(BaseResource):
     metadata: ClassVar[Dict[str, Any]] = {"icon": "resource", "group": "misc"}
 
 
+@define(eq=False, slots=False)
+class BaseManagedKubernetesClusterProvider(BaseResource):
+    kind: ClassVar[str] = "managed_kubernetes_cluster_provider"
+    kind_display: ClassVar[str] = "Managed Kubernetes Cluster Provider"
+    kind_description: ClassVar[str] = "A managed Kubernetes cluster provider."
+    metadata: ClassVar[Dict[str, Any]] = {"icon": "cluster", "group": "compute"}
+    version: Optional[str] = field(default=None, metadata={"description": "The Kubernetes version"})
+    endpoint: Optional[str] = field(default=None, metadata={"description": "The Kubernetes API endpoint"})
+
+
 @define(eq=False, slots=False)
 class UnknownCloud(BaseCloud):
     kind: ClassVar[str] = "unknown_cloud"
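
Reviewer note, not part of the patch: every plugin above adopts the new base class the same way. It is added to the class bases, and the provider's API response is mapped onto the inherited version and endpoint fields. A minimal sketch of the pattern, following the AwsEksCluster mapping style above; ExampleCloud and the two API field names are invented for illustration, only BaseManagedKubernetesClusterProvider itself comes from this patch:

    from typing import ClassVar, Dict
    from attrs import define
    from resotolib.baseresources import BaseManagedKubernetesClusterProvider
    from resotolib.json_bender import Bender, S

    @define(eq=False, slots=False)
    class ExampleCloudKubernetesCluster(BaseManagedKubernetesClusterProvider):
        kind: ClassVar[str] = "examplecloud_kubernetes_cluster"
        mapping: ClassVar[Dict[str, Bender]] = {
            "id": S("id"),
            "name": S("name"),
            "version": S("kubernetesVersion"),  # invented API field name
            "endpoint": S("apiServerUrl"),  # invented API field name
        }

Since AWS, GCP, and DigitalOcean clusters now share this kind, a single query such as is(managed_kubernetes_cluster_provider) should match EKS, GKE, and DOKS clusters alike, and version and endpoint become comparable across clouds.
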
diff --git a/resotoworker/resotoworker/__main__.py b/resotoworker/resotoworker/__main__.py
index 0c810a4932..26c972af8a 100644
--- a/resotoworker/resotoworker/__main__.py
+++ b/resotoworker/resotoworker/__main__.py
@@ -236,6 +236,12 @@ def core_actions_processor(
     if kind == "action":
         try:
             if message_type == "collect":
+                if len(collectors) == 0:
+                    log.error("No collector plugins loaded - skipping collect")
+                    return None
+                if config.resotoworker.pool_size == 0:
+                    log.error("Zero workers configured - skipping collect")
+                    return None
                 collect_event.set()
                 start_time = time.time()
                 collector.collect_and_send(collectors, task_data=data)
"collect", core_messages) + self.mp_manager = SyncManager(ctx=multiprocessing.get_context("spawn")) + self.graph_queue: Optional[Queue[Optional[Graph]]] = None + self.graph_sender_threads: List[threading.Thread] = [] + self.tempdir = mkdtemp(prefix=f"resoto-{self.task_id}", dir=config.resotoworker.tempdir) + self.pool_executor: Optional[Executor] = None + self.futures_to_wait_for: List[Future[bool]] = [] + + def __enter__(self) -> CollectRun: + log.debug("Create multi process manager") + self.mp_manager.start(initializer=resotolib.proc.increase_limits) + graph_queue = self.mp_manager.Queue() + self.graph_queue = graph_queue + for i in range(self.config.resotoworker.graph_sender_pool_size): + graph_sender_t = threading.Thread( + target=self.__graph_sender, + args=(graph_queue, self.task_id, self.tempdir), + name=f"graph_sender_{i}", + ) + graph_sender_t.daemon = True + graph_sender_t.start() + self.graph_sender_threads.append(graph_sender_t) + pool_executor_class: Type[Executor] + pool_args = {"max_workers": max(len(self.collectors), self.config.resotoworker.pool_size)} + if self.config.resotoworker.fork_process: + pool_args["mp_context"] = multiprocessing.get_context("spawn") + pool_args["initializer"] = resotolib.proc.initializer + pool_executor_class = futures.ProcessPoolExecutor + else: + pool_executor_class = futures.ThreadPoolExecutor + + self.pool_executor = pool_executor_class(**pool_args) + self.pool_executor.__enter__() + return self + + def __exit__( + self, exc_type: Optional[Type[BaseException]], exc_val: Optional[BaseException], exc_tb: Optional[TracebackType] + ) -> Optional[bool]: + log.debug("Telling graph sender threads to end") + if self.graph_queue: + for _ in self.graph_sender_threads: + self.graph_queue.put(None) + for t in self.graph_sender_threads: + t.join(300) + if self.pool_executor: + log.debug("Stopping executor") + self.pool_executor.__exit__(exc_type, exc_val, exc_tb) + self.mp_manager.shutdown() + if not self.config.resotoworker.debug_dump_json: + rmtree(self.tempdir, ignore_errors=True) + return None + + def collect(self) -> None: + assert self.graph_queue, "No GraphQueue - CollectRun started?" + self.__collect_all(self.collectors, self.config.resotoworker.graph_merge_kind) + while self.futures_to_wait_for or not self.graph_queue.empty(): + for future in futures.as_completed(self.futures_to_wait_for.copy()): + self.futures_to_wait_for.remove(future) + log.info("Collect done. 
tearing down.") + + def __graph_sender(self, graph_queue: Queue[Optional[Graph]], task_id: TaskId, tempdir: str) -> None: log.debug("Waiting for collector graphs") start_time = time() while True: + # wait for the next element to come in collector_graph = graph_queue.get() if collector_graph is None: run_time = time() - start_time log.debug(f"Ending graph sender thread for task id {task_id} after {run_time} seconds") break + # signal to the outside world, that we are busy + import_graph: Future[bool] = Future() + self.futures_to_wait_for.append(import_graph) + + # Create and sanitize the graph graph = Graph(root=GraphRoot(id="root", tags={})) graph.merge(collector_graph) del collector_graph sanitize(graph) + # Create a human-readable description of the graph graph_info = "" assert isinstance(graph.root, BaseResource) for cloud in graph.successors(graph.root): @@ -55,20 +134,52 @@ def graph_sender(self, graph_queue: Queue[Optional[Graph]], task_id: TaskId, tem for account in graph.successors(cloud): if isinstance(account, BaseAccount): graph_info += f" {account.kdname}" - log.info(f"Received collector graph for{graph_info}") + # Make sure the graph is not cyclic if (cycle := graph.find_cycle()) is not None: desc = ", ".join, [f"{key.edge_type}: {key.src.kdname}-->{key.dst.kdname}" for key in cycle] log.error(f"Graph of {graph_info} is not acyclic - ignoring. Cycle {desc}") continue + # send it to core try: - self._resotocore.send_to_resotocore(graph, task_id, tempdir) + self.resotocore.send_to_resotocore(graph, task_id, tempdir) except Exception as e: log.error(f"Error sending graph of {graph_info} to resotocore: {e}") + + # delete the graph del graph + # mark work as done + import_graph.set_result(True) + + def __collect_all(self, collectors: List[Type[BaseCollectorPlugin]], merge_kind: GraphMergeKind) -> None: + assert self.graph_queue, "No GraphQueue - CollectRun started?" + assert self.pool_executor, "No Executor - CollectRun started?" 
+        for collector in collectors:
+            self.futures_to_wait_for.append(
+                self.pool_executor.submit(
+                    collect_plugin_graph,
+                    collector,
+                    self.core_feedback,
+                    self.graph_queue,
+                    merge_kind,
+                    task_data=self.task_data,
+                    args=ArgumentParser.args,
+                    running_config=self.config.running_config,
+                )
+            )
+
+
+class Collector:
+    def __init__(self, config: Config, resotocore: Resotocore, core_messages: Queue[Json]) -> None:
+        self.resotocore = resotocore
+        self.config = config
+        self.core_messages = core_messages
+        self.processing: Set[str] = set()
+        self.processing_lock = Lock()
+
     def collect_and_send(
         self,
         collectors: List[Type[BaseCollectorPlugin]],
@@ -76,94 +187,15 @@ def collect_and_send(
     ) -> None:
         task_id = task_data["task"]
         step_name = task_data["step"]
-        core_feedback = CoreFeedback(task_id, step_name, "collect", self.core_messages)
-
-        def collect(
-            collectors: List[Type[BaseCollectorPlugin]],
-            graph_queue: Queue[Optional[Graph]],
-            task_data: Json,
-        ) -> bool:
-            all_success = True
-            graph_merge_kind = self._config.resotoworker.graph_merge_kind
-
-            max_workers = (
-                len(collectors)
-                if len(collectors) < self._config.resotoworker.pool_size
-                else self._config.resotoworker.pool_size
-            )
-            if max_workers == 0:
-                log.error("No workers configured or no collector plugins loaded - skipping collect")
-                return False
-            pool_args = {"max_workers": max_workers}
-            pool_executor: Type[futures.Executor]
-            collect_args: Dict[str, Any]
-            if self._config.resotoworker.fork_process:
-                pool_args["mp_context"] = multiprocessing.get_context("spawn")
-                pool_args["initializer"] = resotolib.proc.initializer
-                pool_executor = futures.ProcessPoolExecutor
-                collect_args = {
-                    "args": ArgumentParser.args,
-                    "running_config": self._config.running_config,
-                }
-            else:
-                pool_executor = futures.ThreadPoolExecutor
-                collect_args = {}
-
-            with pool_executor(**pool_args) as executor:
-                wait_for = [
-                    executor.submit(
-                        collect_plugin_graph,
-                        collector,
-                        core_feedback,
-                        graph_queue,
-                        graph_merge_kind,
-                        task_data=task_data,
-                        **collect_args,
-                    )
-                    for collector in collectors
-                ]
-                for future in futures.as_completed(wait_for):
-                    collector_success = future.result()
-                    if not collector_success:
-                        all_success = False
-            return all_success
-
         processing_id = f"{task_id}:{step_name}"
         try:
             with self.processing_lock:
                 if processing_id in self.processing:
                     raise DuplicateMessageError(f"Already processing {processing_id} - ignoring message")
                 self.processing.add(processing_id)
-
-            ctx = multiprocessing.get_context("spawn")
-            mp_manager = SyncManager(ctx=ctx)
-            mp_manager.start(initializer=resotolib.proc.increase_limits)
-            graph_queue: Queue[Optional[Graph]] = mp_manager.Queue()
-            graph_sender_threads = []
-            graph_sender_pool_size = self._config.resotoworker.graph_sender_pool_size
-            tempdir = mkdtemp(prefix=f"resoto-{task_id}", dir=self._config.resotoworker.tempdir)
-            try:
-                for i in range(graph_sender_pool_size):
-                    graph_sender_t = threading.Thread(
-                        target=self.graph_sender,
-                        args=(graph_queue, task_id, tempdir),
-                        name=f"graph_sender_{i}",
-                    )
-                    graph_sender_t.daemon = True
-                    graph_sender_t.start()
-                    graph_sender_threads.append(graph_sender_t)
-
-                self._resotocore.create_graph_and_update_model(tempdir=tempdir)
-                collect(collectors, graph_queue, task_data)
-            finally:
-                log.debug("Telling graph sender threads to end")
-                for _ in range(graph_sender_pool_size):
-                    graph_queue.put(None)
-                for t in graph_sender_threads:
-                    t.join(300)
-                mp_manager.shutdown()
-                if not self._config.resotoworker.debug_dump_json:
-                    rmtree(tempdir, ignore_errors=True)
+            with CollectRun(self.config, self.resotocore, self.core_messages, collectors, task_data) as run:
+                self.resotocore.create_graph_and_update_model(tempdir=run.tempdir)
+                run.collect()
         finally:
             with self.processing_lock:
                 if processing_id in self.processing:
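
Reviewer note, not part of the patch: the gate in CollectRun.collect() waits on two kinds of work, the collector futures submitted by __collect_all and the per-graph import_graph futures that a sender thread creates when it dequeues a graph and resolves once the upload is done. The sketch below reproduces that pattern in a self-contained form; every name in it is invented, and the print call stands in for send_to_resotocore:

    import threading
    from concurrent.futures import Future, as_completed
    from queue import Queue
    from typing import List, Optional

    work_queue: Queue[Optional[str]] = Queue()
    outstanding: List[Future[bool]] = []

    def sender() -> None:
        # drain the queue until the None sentinel arrives, like __graph_sender
        while (item := work_queue.get()) is not None:
            done: Future[bool] = Future()
            outstanding.append(done)  # signal "busy" before doing the work
            print(f"uploaded {item}")  # stand-in for send_to_resotocore
            done.set_result(True)  # mark this graph as fully processed

    t = threading.Thread(target=sender, daemon=True)
    t.start()
    for name in ("aws", "gcp"):
        work_queue.put(name)

    # same gate as CollectRun.collect(): wait while work is queued or in flight
    while outstanding or not work_queue.empty():
        for fut in as_completed(outstanding.copy()):
            outstanding.remove(fut)

    work_queue.put(None)  # sentinel ends the sender thread
    t.join()

One thing worth double-checking in review: between work_queue.get() returning and the future being appended, the gate can observe an empty queue with no outstanding futures and fall through early. The patch has the same narrow window between collect() and __graph_sender.
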