Skip to content

Commit

Permalink
Merge branch 'main' into nm/access-edge-creator-process
Browse files Browse the repository at this point in the history
  • Loading branch information
meln1k authored Sep 23, 2024
2 parents 13ac740 + 1735b14 commit 9177edd
Show file tree
Hide file tree
Showing 47 changed files with 2,803 additions and 261 deletions.
2 changes: 1 addition & 1 deletion fixcore/fixcore/cli/command.py
Original file line number Diff line number Diff line change
Expand Up @@ -4240,7 +4240,7 @@ async def perform_request(e: JsonElement) -> int:
headers=template.headers,
params=template.params,
data=data,
compress=template.compress,
compress="deflate" if template.compress else None,
timeout=template.timeout,
ssl=False if template.no_ssl_verify else self.dependencies.cert_handler.client_context,
auth=BasicAuth(login=authuser, password=(authpass if authpass else "")) if authuser else None,
Expand Down
2 changes: 1 addition & 1 deletion fixcore/fixcore/db/db_access.py
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ def get_graph_db(self, name: GraphName, no_check: bool = False) -> GraphDB:
else:
if not no_check and not self.database.has_graph(name):
raise NoSuchGraph(name)
graph_db = ArangoGraphDB(self.db, name, self.adjust_node, self.config.graph)
graph_db = ArangoGraphDB(self.db, name, self.adjust_node, self.config.graph, self.lock_db)
event_db = EventGraphDB(graph_db, self.event_sender)
self.graph_dbs[name] = event_db
return event_db
Expand Down
152 changes: 49 additions & 103 deletions fixcore/fixcore/db/graphdb.py

Large diffs are not rendered by default.

64 changes: 60 additions & 4 deletions fixcore/fixcore/db/model.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
from __future__ import annotations

from abc import ABC
from typing import Dict, Any, Optional, Tuple, List
from collections import defaultdict
from typing import Dict, Any, Optional, Tuple, List, DefaultDict

from attr import define
from attrs import field

from fixcore.model.graph_access import Section
from fixcore.model.graph_access import Section, EdgeTypes
from fixcore.model.model import Model, ResolvedPropertyPath, ComplexKind
from fixcore.model.resolve_in_graph import GraphResolver
from fixcore.query.model import Query
from fixcore.types import Json, EdgeType
from fixcore.util import first

ancestor_merges = {
Expand Down Expand Up @@ -49,7 +51,7 @@ def owners(self, path: str) -> List[ComplexKind]:


@define(repr=True, eq=True)
class GraphUpdate(ABC):
class GraphUpdate:
nodes_created: int = 0
nodes_updated: int = 0
nodes_deleted: int = 0
Expand All @@ -76,3 +78,57 @@ def __add__(self, other: GraphUpdate) -> GraphUpdate:
self.edges_updated + other.edges_updated,
self.edges_deleted + other.edges_deleted,
)


@define
class GraphChange:
node_inserts: List[Json] = field(factory=list)
node_updates: List[Json] = field(factory=list)
node_deletes: List[Json] = field(factory=list)
edge_inserts: DefaultDict[EdgeType, List[Json]] = field(factory=lambda: defaultdict(list))
edge_updates: DefaultDict[EdgeType, List[Json]] = field(factory=lambda: defaultdict(list))
edge_deletes: DefaultDict[EdgeType, List[Json]] = field(factory=lambda: defaultdict(list))

def to_update(self) -> GraphUpdate:
return GraphUpdate(
len(self.node_inserts),
len(self.node_updates),
len(self.node_deletes),
sum(len(edges) for edges in self.edge_inserts.values()),
sum(len(edges) for edges in self.edge_updates.values()),
sum(len(edges) for edges in self.edge_deletes.values()),
)

def change_count(self) -> int:
return self.to_update().all_changes()

def __add__(self, other: GraphChange) -> GraphChange:
update = GraphChange()
# insert
update.node_inserts.extend(self.node_inserts)
update.node_inserts.extend(other.node_inserts)
# update
update.node_updates.extend(self.node_updates)
update.node_updates.extend(other.node_updates)
# delete
update.node_deletes.extend(self.node_deletes)
update.node_deletes.extend(other.node_deletes)
for edge_type in EdgeTypes.all:
# insert
update.edge_inserts[edge_type].extend(self.edge_inserts[edge_type])
update.edge_inserts[edge_type].extend(other.edge_inserts[edge_type])
# update
update.edge_updates[edge_type].extend(self.edge_updates[edge_type])
update.edge_updates[edge_type].extend(other.edge_updates[edge_type])
# delete
update.edge_deletes[edge_type].extend(self.edge_deletes[edge_type])
update.edge_deletes[edge_type].extend(other.edge_deletes[edge_type])
return update

def clear(self) -> None:
self.node_inserts.clear()
self.node_updates.clear()
self.node_deletes.clear()
self.edge_inserts.clear()
self.edge_updates.clear()
self.edge_deletes.clear()
3 changes: 2 additions & 1 deletion fixcore/fixcore/model/graph_access.py
Original file line number Diff line number Diff line change
Expand Up @@ -409,8 +409,9 @@ def check_complete(self) -> None:
for edge_type in EdgeTypes.all:
key = GraphAccess.edge_key(rid, succ, edge_type)
if self.graph.has_edge(rid, succ, key):
data = self.graph.get_edge_data(rid, succ, key)
self.graph.remove_edge(rid, succ, key)
self.add_edge("root", succ, edge_type)
self.add_edge("root", succ, edge_type, data.get("reported"))
self.graph.remove_node(rid)


Expand Down
1 change: 0 additions & 1 deletion fixcore/fixcore/web/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,3 @@
from aiohttp.web import Request, StreamResponse

RequestHandler = Callable[[Request], Awaitable[StreamResponse]]
Middleware = Callable[[Request, RequestHandler], Awaitable[StreamResponse]]
32 changes: 21 additions & 11 deletions fixcore/fixcore/web/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
)
from urllib.parse import urlencode, urlparse, parse_qs, urlunparse

import aiofiles
import aiohttp_jinja2
import jinja2
import prometheus_client
Expand All @@ -44,6 +45,7 @@
MultipartReader,
ClientSession,
TCPConnector,
BodyPartReader,
)
from aiohttp.abc import AbstractStreamWriter
from aiohttp.hdrs import METH_ANY
Expand Down Expand Up @@ -1380,6 +1382,23 @@ def line_to_js(line: ParsedCommandLine) -> Json:
@timed("api", "execute")
async def execute(self, request: Request, deps: TenantDependencies) -> StreamResponse:
temp_dir: Optional[str] = None

async def write_files(mpr: MultipartReader, tmp_dir: str) -> Dict[str, str]:
files: Dict[str, str] = {}
async for part in mpr:
if isinstance(part, MultipartReader):
files.update(await write_files(part, tmp_dir))
elif isinstance(part, BodyPartReader):
name = part.filename
if not name:
raise AttributeError("Multipart request: content disposition name is required!")
path = os.path.join(tmp_dir, rnd_str()) # use random local path to avoid clashes
files[name] = path
async with aiofiles.open(path, "wb") as writer:
while not part.at_eof():
await writer.write(await part.read_chunk())
return files

try:
ctx = self.cli_context_from_request(request)
if request.content_type.startswith("text"):
Expand All @@ -1388,18 +1407,9 @@ async def execute(self, request: Request, deps: TenantDependencies) -> StreamRes
command = request.headers["Fix-Shell-Command"].strip()
temp = tempfile.mkdtemp()
temp_dir = temp
files = {}
# for now, we assume that all multi-parts are file uploads
async for part in MultipartReader(request.headers, request.content):
name = part.name
if not name:
raise AttributeError("Multipart request: content disposition name is required!")
path = os.path.join(temp, rnd_str()) # use random local path to avoid clashes
files[name] = path
with open(path, "wb") as writer:
while not part.at_eof():
writer.write(await part.read_chunk())
ctx = evolve(ctx, uploaded_files=files)
uploaded = await write_files(MultipartReader(request.headers, request.content), temp)
ctx = evolve(ctx, uploaded_files=uploaded)
else:
raise AttributeError(f"Not able to handle: {request.content_type}")

Expand Down
3 changes: 2 additions & 1 deletion fixcore/fixcore/web/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import jwt
from aiohttp import web
from aiohttp.typedefs import Middleware
from aiohttp.web import Request, StreamResponse
from aiohttp.web import middleware
from attr import define
Expand All @@ -26,7 +27,7 @@
from fixcore.web.certificate_handler import CertificateHandler
from fixcore.web.permission import PermissionChecker
from fixlib import jwt as ck_jwt
from fixlib.asynchronous.web import RequestHandler, Middleware, TenantRequestHandler
from fixlib.asynchronous.web import RequestHandler, TenantRequestHandler
from fixlib.jwt import encode_jwt, create_jwk_dict
from fixlib.types import Json
from fixlib.utils import utc
Expand Down
9 changes: 4 additions & 5 deletions fixcore/fixcore/web/directives.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import logging
from re import RegexFlag, fullmatch
from typing import Optional, Callable, Awaitable, Tuple
from typing import Optional, Tuple

from aiohttp.hdrs import METH_OPTIONS, METH_GET, METH_HEAD, METH_POST, METH_PUT, METH_DELETE, METH_PATCH
from aiohttp.typedefs import Middleware
from aiohttp.web import HTTPRedirection, HTTPNotFound, HTTPBadRequest, HTTPException, HTTPNoContent
from aiohttp.web_exceptions import HTTPServiceUnavailable, HTTPError
from aiohttp.web_middlewares import middleware
Expand Down Expand Up @@ -84,9 +85,7 @@ async def metrics_handler(request: Request, handler: RequestHandler) -> StreamRe
RequestInProgress.labels(request.path, request.method).dec()


def error_handler(
config: CoreConfig, event_sender: AnalyticsEventSender
) -> Callable[[Request, RequestHandler], Awaitable[StreamResponse]]:
def error_handler(config: CoreConfig, event_sender: AnalyticsEventSender) -> Middleware:
is_debug = (logging.root.level < logging.INFO) or config.runtime.debug

def exc_info(ex: Exception) -> Optional[Exception]:
Expand Down Expand Up @@ -129,7 +128,7 @@ def message_from_error(e: Exception) -> Tuple[str, str, str]:
return error_handler_middleware


def default_middleware(api_handler: "api.Api") -> Callable[[Request, RequestHandler], Awaitable[StreamResponse]]:
def default_middleware(api_handler: "api.Api") -> Middleware:
@middleware
async def default_handler(request: Request, handler: RequestHandler) -> StreamResponse:
if api_handler.in_shutdown:
Expand Down
4 changes: 2 additions & 2 deletions fixcore/tests/fixcore/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,8 +198,8 @@ def test_db(local_client: ArangoClient, system_db: StandardDatabase) -> Standard


@fixture
async def graph_db(async_db: AsyncArangoDB) -> ArangoGraphDB:
graph_db = ArangoGraphDB(async_db, GraphName("ns"), NoAdjust(), GraphConfig(use_view=False))
async def graph_db(async_db: AsyncArangoDB, lock_db: LockDB) -> ArangoGraphDB:
graph_db = ArangoGraphDB(async_db, GraphName("ns"), NoAdjust(), GraphConfig(use_view=False), lock_db)
await graph_db.create_update_schema()
await model_db(async_db, "ns_model").create_update_schema()
await async_db.truncate(graph_db.in_progress)
Expand Down
4 changes: 2 additions & 2 deletions fixcore/tests/fixcore/web/api_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

import pytest
from _pytest.fixtures import fixture
from aiohttp import ClientSession, MultipartReader
from aiohttp import ClientSession, MultipartReader, BodyPartReader
from networkx import MultiDiGraph
from datetime import timedelta
from fixclient import models as rc
Expand Down Expand Up @@ -394,7 +394,7 @@ async def test_cli(core_client: FixInventoryClient) -> None:
# execute multiple commands
response = await core_client.cli_execute_raw("echo foo; echo bar; echo bla")
reader: MultipartReader = MultipartReader.from_response(response.undrelying) # type: ignore
assert [await p.text() async for p in reader] == ['"foo"', '"bar"', '"bla"']
assert [await p.text() async for p in reader if isinstance(p, BodyPartReader)] == ['"foo"', '"bar"', '"bla"']

# list all cli commands
info = AccessJson(await core_client.cli_info())
Expand Down
1 change: 0 additions & 1 deletion fixlib/fixlib/asynchronous/web/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,3 @@

RequestHandler = Callable[[Request], Awaitable[StreamResponse]]
TenantRequestHandler = Callable[[Request, TenantDependencies], Awaitable[StreamResponse]]
Middleware = Callable[[Request, RequestHandler], Awaitable[StreamResponse]]
25 changes: 25 additions & 0 deletions fixlib/fixlib/baseresources.py
Original file line number Diff line number Diff line change
Expand Up @@ -1514,6 +1514,31 @@ class BaseManagedKubernetesClusterProvider(BaseResource):
endpoint: Optional[str] = field(default=None, metadata={"description": "The kubernetes API endpoint"})


@define(eq=False, slots=False)
class BaseAIResource(BaseResource):
kind: ClassVar[str] = "ai_resource"
kind_display: ClassVar[str] = "AI resource"
kind_description: ClassVar[str] = "An AI Resource."
metadata: ClassVar[Dict[str, Any]] = {"icon": "resource", "group": "ai"}
_categories: ClassVar[List[Category]] = [Category.ai, Category.compute]


@define(eq=False, slots=False)
class BaseAIJob(BaseAIResource):
kind: ClassVar[str] = "ai_job"
kind_display: ClassVar[str] = "AI Job"
kind_description: ClassVar[str] = "An AI Job resource."
metadata: ClassVar[Dict[str, Any]] = {"icon": "job", "group": "ai"}


@define(eq=False, slots=False)
class BaseAIModel(BaseAIResource):
kind: ClassVar[str] = "ai_model"
kind_display: ClassVar[str] = "AI Model"
kind_description: ClassVar[str] = "An AI Model resource."
metadata: ClassVar[Dict[str, Any]] = {"icon": "resource", "group": "ai"}


@define(eq=False, slots=False)
class UnknownCloud(BaseCloud):
kind: ClassVar[str] = "unknown_cloud"
Expand Down
30 changes: 30 additions & 0 deletions plugins/aws/fix_plugin_aws/collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
acm,
waf,
backup,
bedrock,
)
from fix_plugin_aws.resource.base import AwsAccount, AwsApiSpec, AwsRegion, AwsResource, GraphBuilder
from fixlib.baseresources import Cloud, EdgeType, BaseOrganizationalRoot, BaseOrganizationalUnit
Expand Down Expand Up @@ -108,6 +109,7 @@
+ redshift.resources
+ backup.resources
+ amazonq.resources
+ bedrock.resources
)
all_resources: List[Type[AwsResource]] = global_resources + regional_resources

Expand Down Expand Up @@ -264,6 +266,8 @@ def get_last_run() -> Optional[datetime]:

# wait for all futures to finish
shared_queue.wait_for_submitted_work()
# remove unused nodes
self.remove_unused()
self.core_feedback.progress_done(self.account.dname, 1, 1, context=[self.cloud.id])
self.error_accumulator.report_all(global_builder.core_feedback)

Expand Down Expand Up @@ -335,6 +339,32 @@ def collect_and_set_metrics(

builder.submit_work("cloudwatch", collect_and_set_metrics, start, region, queries)

def remove_unused(self) -> None:
remove_nodes = []

def rm_nodes(cls, ignore_kinds: Optional[Type[Any]] = None, check_pred: bool = True) -> None: # type: ignore
for node in self.graph.nodes:
if not isinstance(node, cls):
continue
if check_pred:
nodes = list(self.graph.predecessors(node))
else:
nodes = list(self.graph.successors(node))
if ignore_kinds is not None:
nodes = [n for n in nodes if not isinstance(n, ignore_kinds)]
if not nodes:
remove_nodes.append(node)
log.debug(f"Removing {len(remove_nodes)} unreferenced nodes of type {cls}")
removed = set()
for node in remove_nodes:
if node in removed:
continue
removed.add(node)
self.graph.remove_node(node)
remove_nodes.clear()

rm_nodes(bedrock.AwsBedrockFoundationModel, check_pred=False)

# TODO: move into separate AwsAccountSettings
def update_account(self) -> None:
log.info(f"Collecting AWS IAM Account Summary in account {self.account.dname}")
Expand Down
Loading

0 comments on commit 9177edd

Please sign in to comment.