diff --git a/requirements-all.txt b/requirements-all.txt index d5c4ed0bfb..ad0706b88c 100644 --- a/requirements-all.txt +++ b/requirements-all.txt @@ -4,7 +4,7 @@ aiohttp[speedups]==3.9.1 aiohttp-jinja2==1.6 aiohttp-swagger3==0.8.0 aiosignal==1.3.1 -aiostream==0.4.5 +aiostream==0.5.2 annotated-types==0.6.0 apscheduler==3.10.4 asn1crypto==1.5.1 @@ -41,7 +41,6 @@ defusedxml==0.7.1 deprecated==1.2.14 dill==0.3.7 distlib==0.3.7 -docopt==0.6.2 fastjsonschema==2.19.0 filelock==3.13.1 flake8==6.1.0 @@ -63,7 +62,7 @@ importlib-metadata==7.0.0 inflect==7.0.0 iniconfig==2.0.0 isodate==0.6.1 -isort==5.13.0 +isort==5.12.0 jaraco-collections==5.0.0 jaraco-context==4.3.0 jaraco-functools==4.0.0 @@ -94,16 +93,12 @@ packaging==23.2 paramiko==3.3.1 parsy==2.1 pathspec==0.12.1 -pep517==0.13.1 pep8-naming==0.13.3 pint==0.23 pip==23.3.1 -pip-api==0.0.30 pip-tools==7.3.0 -pipreqs==0.4.13 plantuml==0.3.0 platformdirs==3.11.0 -plette[validation]==0.4.4 pluggy==1.3.0 portalocker==2.8.2 portend==3.2.0 @@ -125,7 +120,7 @@ pyflakes==3.1.0 pygithub==2.1.1 pygments==2.17.2 pyjwt[crypto]==2.8.0 -pylint==3.0.2 +pylint==3.0.3 pymysql==1.1.0 pynacl==1.5.0 pyopenssl==23.3.0 @@ -144,7 +139,6 @@ pyyaml==6.0.1 requests==2.31.0 requests-oauthlib==1.3.1 requests-toolbelt==1.0.0 -requirementslib==3.0.0 resotoclient==1.6.1 resotodata==0.2.2 resotodatalink[extra]==1.2.0 @@ -189,7 +183,6 @@ wcwidth==0.2.12 websocket-client==1.7.0 wheel==0.42.0 wrapt==1.16.0 -yarg==0.1.9 yarl==1.9.4 zc-lockfile==3.0.post1 zipp==3.17.0 diff --git a/requirements-extra.txt b/requirements-extra.txt index a1738fa947..d4c9f789df 100644 --- a/requirements-extra.txt +++ b/requirements-extra.txt @@ -4,7 +4,7 @@ aiohttp[speedups]==3.9.1 aiohttp-jinja2==1.6 aiohttp-swagger3==0.8.0 aiosignal==1.3.1 -aiostream==0.4.5 +aiostream==0.5.2 annotated-types==0.6.0 apscheduler==3.10.4 asn1crypto==1.5.1 diff --git a/requirements-test.txt b/requirements-test.txt index 2caea3c7d2..1ed8df9ec5 100644 --- a/requirements-test.txt +++ b/requirements-test.txt @@ -4,7 +4,7 @@ aiohttp[speedups]==3.9.1 aiohttp-jinja2==1.6 aiohttp-swagger3==0.8.0 aiosignal==1.3.1 -aiostream==0.4.5 +aiostream==0.5.2 annotated-types==0.6.0 apscheduler==3.10.4 astroid==3.0.1 @@ -40,7 +40,6 @@ defusedxml==0.7.1 deprecated==1.2.14 dill==0.3.7 distlib==0.3.7 -docopt==0.6.2 fastjsonschema==2.19.0 filelock==3.13.1 flake8==6.1.0 @@ -58,7 +57,7 @@ importlib-metadata==7.0.0 inflect==7.0.0 iniconfig==2.0.0 isodate==0.6.1 -isort==5.13.0 +isort==5.12.0 jaraco-collections==5.0.0 jaraco-context==4.3.0 jaraco-functools==4.0.0 @@ -88,16 +87,12 @@ packaging==23.2 paramiko==3.3.1 parsy==2.1 pathspec==0.12.1 -pep517==0.13.1 pep8-naming==0.13.3 pint==0.23 pip==23.3.1 -pip-api==0.0.30 pip-tools==7.3.0 -pipreqs==0.4.13 plantuml==0.3.0 platformdirs==3.11.0 -plette[validation]==0.4.4 pluggy==1.3.0 portalocker==2.8.2 portend==3.2.0 @@ -117,7 +112,7 @@ pyflakes==3.1.0 pygithub==2.1.1 pygments==2.17.2 pyjwt[crypto]==2.8.0 -pylint==3.0.2 +pylint==3.0.3 pynacl==1.5.0 pyparsing==3.1.1 pyproject-api==1.6.1 @@ -134,7 +129,6 @@ pyyaml==6.0.1 requests==2.31.0 requests-oauthlib==1.3.1 requests-toolbelt==1.0.0 -requirementslib==3.0.0 resotoclient==1.6.1 resotodata==0.2.2 resotodatalink==1.2.0 @@ -176,7 +170,6 @@ wcwidth==0.2.12 websocket-client==1.7.0 wheel==0.42.0 wrapt==1.16.0 -yarg==0.1.9 yarl==1.9.4 zc-lockfile==3.0.post1 zipp==3.17.0 diff --git a/requirements.txt b/requirements.txt index 2d2fec400e..d9bf2bc8e6 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,7 +4,7 @@ aiohttp[speedups]==3.9.1 aiohttp-jinja2==1.6 aiohttp-swagger3==0.8.0 aiosignal==1.3.1 -aiostream==0.4.5 +aiostream==0.5.2 annotated-types==0.6.0 apscheduler==3.10.4 attrs==23.1.0 diff --git a/resotocore/.pylintrc b/resotocore/.pylintrc index 9cb708e4ff..e2dc230225 100644 --- a/resotocore/.pylintrc +++ b/resotocore/.pylintrc @@ -85,7 +85,6 @@ disable= cyclic-import, unnecessary-lambda-assignment - [REPORTS] # Set the output format. Available formats are text, parseable, colorized, msvs diff --git a/resotocore/pyproject.toml b/resotocore/pyproject.toml index ac4af62665..4df52a5d0c 100644 --- a/resotocore/pyproject.toml +++ b/resotocore/pyproject.toml @@ -35,7 +35,7 @@ dependencies = [ "aiohttp-jinja2", "aiohttp-swagger3", "aiohttp[speedups]", - "aiostream < 0.5", # TODO: aiostream 0.5 introduces type parameters - need to be supported by mypy + "aiostream", "cryptography", "deepdiff", "frozendict", diff --git a/resotocore/resotocore/cli/__init__.py b/resotocore/resotocore/cli/__init__.py index ed6f24d61e..d334a63f4e 100644 --- a/resotocore/resotocore/cli/__init__.py +++ b/resotocore/resotocore/cli/__init__.py @@ -2,14 +2,25 @@ from argparse import ArgumentParser from datetime import datetime from functools import lru_cache -from typing import TypeVar, Union, Any, Callable, AsyncIterator, NoReturn, Optional, Awaitable, Tuple, List -from typing_extensions import TypeAlias +from typing import ( + TypeVar, + Union, + Any, + Callable, + NoReturn, + Optional, + Awaitable, + Tuple, + List, + AsyncIterable, +) +from aiostream import stream from aiostream.core import Stream from parsy import Parser, regex, string from resotocore.model.graph_access import Section -from resotocore.types import JsonElement +from resotocore.types import JsonElement, Json from resotocore.util import utc, parse_utc, AnyT from resotolib.durations import parse_duration, DurationRe from resotolib.parse_util import ( @@ -31,12 +42,12 @@ ) T = TypeVar("T") -# Allow the function to return either a coroutine or the result directly -Result = Union[T, Awaitable[T]] -JsStream: TypeAlias = Stream -JsGen = Union[JsStream, AsyncIterator[JsonElement]] +JsStream = Stream[JsonElement] +JsGen = AsyncIterable[JsonElement] # A sink function takes a stream and creates a result -Sink = Callable[[JsGen], Awaitable[T]] +Sink = Callable[[JsStream], Awaitable[T]] + +list_sink: Callable[[JsGen], Awaitable[Any]] = stream.list # type: ignore @make_parser @@ -117,6 +128,10 @@ def strip_quotes(maybe_quoted: str) -> str: # check if a is a json node element +def get_node(a: Any) -> Optional[Json]: + return a if isinstance(a, dict) and "id" in a and Section.reported in a else None + + def is_node(a: Any) -> bool: return "id" in a and Section.reported in a if isinstance(a, dict) else False diff --git a/resotocore/resotocore/cli/cli.py b/resotocore/resotocore/cli/cli.py index 6654fe1ac2..ef9befa9cd 100644 --- a/resotocore/resotocore/cli/cli.py +++ b/resotocore/resotocore/cli/cli.py @@ -11,14 +11,13 @@ from typing import Optional, Any, TYPE_CHECKING from aiostream import stream -from aiostream.core import Stream from attrs import evolve from parsy import Parser from rich.padding import Padding from resotocore import version from resotocore.analytics import CoreEvent -from resotocore.cli import cmd_with_args_parser, key_values_parser, T, Sink, args_values_parser +from resotocore.cli import cmd_with_args_parser, key_values_parser, T, Sink, args_values_parser, JsGen from resotocore.cli.command import ( SearchPart, PredecessorsPart, @@ -188,7 +187,7 @@ def overview() -> str: logo = ctx.render_console(Padding(WelcomeCommand.ck, pad=(0, 0, 0, middle))) if ctx.supports_color() else "" return headline + logo + ctx.render_console(result) - def help_command() -> Stream: + def help_command() -> JsGen: if not arg: result = overview() elif arg == "placeholders": diff --git a/resotocore/resotocore/cli/command.py b/resotocore/resotocore/cli/command.py index 2e6285cfec..388537297f 100644 --- a/resotocore/resotocore/cli/command.py +++ b/resotocore/resotocore/cli/command.py @@ -70,6 +70,7 @@ args_parts_unquoted_parser, is_edge, is_node, + get_node, js_value_at, key_values_parser, parse_time_or_delta, @@ -850,7 +851,7 @@ def args_info(self) -> ArgsInfo: @staticmethod async def aggregate_in( - content: JsStream, + content: JsGen, group_props: Optional[List[str]] = None, fn_props: Optional[List[AggregateFunction]] = None, ) -> Dict[tuple[Any, ...], _AggregateIntermediateResult]: @@ -990,7 +991,7 @@ def info(self) -> str: def parse(self, arg: Optional[str] = None, ctx: CLIContext = EmptyContext, **kwargs: Any) -> CLIAction: size = self.parse_size(arg) - return CLIFlow(lambda in_stream: stream.take(in_stream, size)) + return CLIFlow(lambda in_stream: in_stream | pipe.take(size)) def args_info(self) -> ArgsInfo: return [ArgInfo(expects_value=True, help_text="number of elements to take")] @@ -1044,7 +1045,7 @@ def args_info(self) -> ArgsInfo: def parse(self, arg: Optional[str] = None, ctx: CLIContext = EmptyContext, **kwargs: Any) -> CLIAction: size = HeadCommand.parse_size(arg) - return CLIFlow(lambda in_stream: stream.takelast(in_stream, size)) + return CLIFlow(lambda in_stream: in_stream | pipe.takelast(size)) class CountCommand(SearchCLIPart): @@ -1326,7 +1327,7 @@ def parse(self, arg: Optional[str] = None, ctx: CLIContext = EmptyContext, **kwa name_path = ["group", "name"] count_path = ["count"] - async def to_count(in_stream: AsyncIterator[JsonElement]) -> AsyncIterator[JsonElement]: + async def to_count(in_stream: JsStream) -> AsyncIterator[JsonElement]: null_value = 0 total = 0 in_streamer = in_stream if isinstance(in_stream, Stream) else stream.iterate(in_stream) @@ -1581,7 +1582,7 @@ def args_info(self) -> ArgsInfo: def parse(self, arg: Optional[str] = None, ctx: CLIContext = EmptyContext, **kwargs: Any) -> CLIFlow: size = int(arg) if arg else 100 - return CLIFlow(lambda in_stream: stream.chunks(in_stream, size), required_permissions={Permission.read}) + return CLIFlow(lambda in_stream: in_stream | pipe.chunks(size), required_permissions={Permission.read}) class FlattenCommand(CLICommand): @@ -1631,10 +1632,10 @@ def parse(self, arg: Optional[str] = None, ctx: CLIContext = EmptyContext, **kwa def iterable(it: Any) -> bool: return False if isinstance(it, str) else isinstance(it, Iterable) - def iterate(it: Any) -> JsStream: + def iterate(it: Any) -> JsGen: return stream.iterate(it) if is_async_iterable(it) or iterable(it) else stream.just(it) - return CLIFlow(lambda in_stream: stream.flatmap(in_stream, iterate), required_permissions={Permission.read}) + return CLIFlow(lambda i: i | pipe.flatmap(iterate), required_permissions={Permission.read}) # type: ignore class UniqCommand(CLICommand): @@ -1786,12 +1787,12 @@ def parse(self, arg: Optional[str] = None, ctx: CLIContext = EmptyContext, **kwa in_arg = args[1] if len(args) == 2 and args[0] == "--no-rewrite" else self.rewrite_props(strip_quotes(arg), ctx) compiled = jq.compile(strip_quotes(in_arg)) - def process(in_json: Json) -> Json: + def process(in_json: JsonElement) -> JsonElement: out = compiled.input(in_json).all() result = out[0] if len(out) == 1 else out return cast(Json, result) - return CLIFlow(lambda in_stream: stream.map(in_stream, process), required_permissions={Permission.read}) + return CLIFlow(lambda i: i | pipe.map(process), required_permissions={Permission.read}) # type: ignore class KindsCommand(CLICommand, PreserveOutputFormat): @@ -1930,7 +1931,7 @@ def property_defined_in(model: Model, path_: str) -> List[str]: if any(p for p in kind.resolved_properties() if p.path.same_as(path)) ] - async def source() -> Tuple[int, JsStream]: + async def source() -> Tuple[int, JsGen]: model = await self.dependencies.model_handler.load_model(graph_name) def show(k: ComplexKind) -> bool: @@ -1941,7 +1942,9 @@ def show(k: ComplexKind) -> bool: if args.name: kind = args.name - result = kind_to_js(model, model[kind]) if kind in model else f"No kind with this name: {kind}" + result: JsonElement = ( + kind_to_js(model, model[kind]) if kind in model else f"No kind with this name: {kind}" + ) return 1, stream.just(result) elif args.property_path: no_section = Section.without_section(args.property_path) @@ -1966,8 +1969,7 @@ def parse(self, arg: Optional[str] = None, ctx: CLIContext = EmptyContext, **kwa buffer_size = 1000 func = partial(self.set_desired, arg, ctx.graph_name, self.patch(arg, ctx)) return CLIFlow( - lambda in_stream: stream.flatmap(stream.chunks(in_stream, buffer_size), func), - required_permissions={Permission.write}, + lambda i: i | pipe.chunks(buffer_size) | pipe.flatmap(func), required_permissions={Permission.write} ) async def set_desired( @@ -2105,8 +2107,7 @@ def parse(self, arg: Optional[str] = None, ctx: CLIContext = EmptyContext, **kwa buffer_size = 1000 func = partial(self.set_metadata, ctx.graph_name, self.patch(arg, ctx)) return CLIFlow( - lambda in_stream: stream.flatmap(stream.chunks(in_stream, buffer_size), func), - required_permissions={Permission.write}, + lambda i: i | pipe.chunks(buffer_size) | pipe.flatmap(func), required_permissions={Permission.write} ) async def set_metadata(self, graph_name: GraphName, patch: Json, items: List[Json]) -> AsyncIterator[JsonElement]: @@ -2326,7 +2327,7 @@ async def format_stream(in_stream: JsStream) -> JsGen: else: raise ValueError(f"Unknown format: {use}") elif formatting_string: - return stream.map(in_stream, ctx.formatter(arg)) if arg else in_stream + return in_stream | pipe.map(ctx.formatter(arg)) if arg else in_stream # type: ignore else: return in_stream @@ -2668,11 +2669,11 @@ def unique_name(path: List[str], current: str) -> str: props_to_show = create_unique_names(props_to_show) def fmt_json(elem: Json) -> JsonElement: - if is_node(elem): + if node := get_node(elem): result = "" first = True for prop_path, name in props_to_show: - value = js_value_at(elem, prop_path) + value = js_value_at(node, prop_path) if value is not None: delim = "" if first else ", " result += f"{delim}{to_str(name, value)}" @@ -2700,10 +2701,10 @@ def to_csv_string(lst: List[Any]) -> str: async with in_stream.stream() as s: async for elem in s: - if is_node(elem): + if node := get_node(elem): result = [] for prop_path, _ in props_to_show: - value = js_value_at(elem, prop_path) + value = js_value_at(node, prop_path) result.append(value) yield to_csv_string(result) @@ -2733,10 +2734,10 @@ def kind_of(path: List[str]) -> str: # data columns async with in_stream.stream() as s: async for elem in s: - if is_node(elem): + if node := get_node(elem): yield { - "id": elem["id"], - "row": {name: js_value_at(elem, prop_path) for prop_path, name in props_to_show}, + "id": node["id"], + "row": {name: js_value_at(node, prop_path) for prop_path, name in props_to_show}, } def markdown_stream(in_stream: JsStream) -> JsGen: @@ -2797,15 +2798,15 @@ def to_str(elem: Any) -> str: markdown_chunks = ( in_stream | pipe.filter(is_node) - | pipe.map(extract_values) + | pipe.map(extract_values) # type: ignore | pipe.chunks(chunk_size) | pipe.enumerate() - | pipe.flatmap(generate_markdown) + | pipe.flatmap(generate_markdown) # type: ignore ) return markdown_chunks - def fmt(in_stream: JsGen) -> JsGen: + def fmt(in_stream: JsStream) -> JsGen: if parsed.csv: return csv_stream(in_stream) elif parsed.markdown: @@ -2815,9 +2816,12 @@ def fmt(in_stream: JsGen) -> JsGen: async def load_model() -> Model: return await self.dependencies.model_handler.load_model(ctx.graph_name) - return stream.flatmap(stream.call(load_model), partial(json_table_stream, in_stream)) + return stream.call(load_model) | pipe.flatmap(partial(json_table_stream, in_stream)) # type: ignore else: - return stream.map(in_stream, lambda elem: fmt_json(elem) if isinstance(elem, dict) else str(elem)) + return stream.map( + in_stream, + lambda elem: fmt_json(elem) if isinstance(elem, dict) else str(elem), # type: ignore + ) return CLIFlow(fmt, produces=MediaType.String, required_permissions={Permission.read}) @@ -3140,7 +3144,7 @@ async def send_to_queue(task_name: str, task_args: Dict[str, str], data: Json) - await self.dependencies.forked_tasks.put((result_task, f"WorkerTask {task_name}:{task.id}")) return f"Spawned WorkerTask {task_name}:{task.id}" - return stream.starmap(in_stream, send_to_queue, ordered=False, task_limit=self.task_limit()) + return in_stream | pipe.starmap(send_to_queue, ordered=False, task_limit=self.task_limit()) # type: ignore def load_by_id_merged( self, @@ -3176,7 +3180,7 @@ async def load_element(items: List[JsonElement]) -> AsyncIterator[JsonElement]: async for a in crs: yield a - return stream.flatmap(stream.chunks(in_stream, 1000), load_element) + return stream.chunks(in_stream, 1000) | pipe.flatmap(load_element) # type: ignore async def no_update(self, _: WorkerTask, future_result: Future[Json]) -> Json: return await future_result @@ -3186,10 +3190,10 @@ async def to_result(task: WorkerTask, future_result: Future[Json]) -> Json: nid = js_value_at(task.data, ["node", "id"]) try: result = await future_result - if is_node(result): + if node := get_node(result): db = self.dependencies.db_access.get_graph_db(GraphName(env["graph"])) try: - updated: Json = await db.update_node(model, result["id"], result, True, None) + updated: Json = await db.update_node(model, node["id"], node, True, None) return updated except ClientError as ex: # if the change could not be reflected in database, show success @@ -3197,7 +3201,7 @@ async def to_result(task: WorkerTask, future_result: Future[Json]) -> Json: f"Update not reflected in db. Wait until next collector run. Reason: {str(ex)}", exc_info=ex, ) - return result + return node else: log.warning( f"Result from worker is not a node. " @@ -3317,14 +3321,13 @@ def setup_stream(in_stream: JsStream) -> JsStream: def with_dependencies(model: Model) -> JsStream: load = self.load_by_id_merged(model, in_stream, variables, allowed_on_kind, **ctx.env) handler = self.update_node_in_graphdb(model, **ctx.env) if expect_node_result else self.no_update - return self.send_to_queue_stream(stream.map(load, fn), handler, True) + return self.send_to_queue_stream(load | pipe.map(fn), handler, True) # type: ignore # dependencies are not resolved directly (no async function is allowed here) async def load_model() -> Model: return await self.dependencies.model_handler.load_model(ctx.graph_name) - dependencies = stream.call(load_model) - return stream.flatmap(dependencies, with_dependencies) + return stream.call(load_model) | pipe.flatmap(with_dependencies) # type: ignore def setup_source() -> JsStream: arg = {"args": args_parts_unquoted_parser.parse(formatter({}))} @@ -3445,14 +3448,13 @@ def setup_stream(in_stream: JsStream) -> JsStream: def with_dependencies(model: Model) -> JsStream: load = self.load_by_id_merged(model, in_stream, variables, **ctx.env) result_handler = self.update_node_in_graphdb(model, **ctx.env) - return self.send_to_queue_stream(stream.map(load, fn), result_handler, not ns.nowait) + return self.send_to_queue_stream(load | pipe.map(fn), result_handler, not ns.nowait) # type: ignore async def load_model() -> Model: return await self.dependencies.model_handler.load_model(ctx.graph_name) # dependencies are not resolved directly (no async function is allowed here) - dependencies = stream.call(load_model) - return stream.flatmap(dependencies, with_dependencies) + return stream.call(load_model) | pipe.flatmap(with_dependencies) # type: ignore return CLIFlow(setup_stream, required_permissions={Permission.write}) @@ -3897,7 +3899,7 @@ async def get_template(name: str) -> AsyncIterator[JsonElement]: maybe_template = await self.dependencies.template_expander.get_template(name) yield maybe_template.template if maybe_template else f"No template with this name: {name}" - async def list_templates() -> Tuple[int, AsyncIterator[Json]]: + async def list_templates() -> Tuple[int, Stream[str]]: templates = await self.dependencies.template_expander.list_templates() return len(templates), stream.iterate(template_str(t) for t in templates) @@ -4375,7 +4377,7 @@ def info(rt: RunningTask) -> JsonElement: return len(tasks), stream.iterate(info(t) for t in tasks if isinstance(t.descriptor, Workflow)) - async def show_log(wf_id: str) -> Tuple[int, AsyncIterator[JsonElement]]: + async def show_log(wf_id: str) -> Tuple[int, JsStream]: rtd = await self.dependencies.db_access.running_task_db.get(wf_id) if rtd: messages = [msg.info() for msg in rtd.info_messages()] @@ -4417,7 +4419,7 @@ async def history_of(history_args: List[str]) -> Tuple[int, JsStream]: ) cursor: AsyncCursor = context.cursor try: - return cursor.count() or 0, stream.map(cursor, running_task_data) + return cursor.count() or 0, stream.map(cursor, running_task_data) # type: ignore finally: cursor.close() @@ -4742,7 +4744,7 @@ async def welcome() -> str: res = ctx.render_console(grid) return res - return CLISource.single(lambda: stream.just(welcome()), required_permissions={Permission.read}) + return CLISource.single(lambda: stream.just(welcome()), required_permissions={Permission.read}) # type: ignore class TipOfTheDayCommand(CLICommand): @@ -4779,7 +4781,7 @@ async def totd() -> str: res = ctx.render_console(info) return res - return CLISource.single(lambda: stream.just(totd()), required_permissions={Permission.read}) + return CLISource.single(lambda: stream.just(totd()), required_permissions={Permission.read}) # type: ignore class CertificateCommand(CLICommand): @@ -5244,7 +5246,7 @@ async def apps_list() -> AsyncIterator[JsonElement]: yield app async def app_run( - in_stream: JsGen, app_name: InfraAppName, dry_run: bool, config: Optional[str], argv: List[str] + in_stream: JsStream, app_name: InfraAppName, dry_run: bool, config: Optional[str], argv: List[str] ) -> AsyncIterator[JsonElement]: runtime = self.dependencies.infra_apps_runtime manifest = await self.dependencies.infra_apps_package_manager.get_manifest(app_name) @@ -5258,22 +5260,15 @@ async def app_run( else: raise ValueError(f"Config {config} not found.") - async def stream_to_iterator(in_stream: JsStream) -> AsyncIterator[JsonElement]: + async def stream_to_iterator() -> AsyncIterator[JsonElement]: async with in_stream.stream() as streamer: async for item in streamer: yield item - stdin: AsyncIterator[JsonElement] = ( - stream_to_iterator(in_stream) if isinstance(in_stream, Stream) else in_stream - ) - + stdin = stream_to_iterator() if dry_run: return runtime.generate_template( - graph=ctx.graph_name, - manifest=manifest, - config=app_config, - stdin=stdin, - argv=argv, + graph=ctx.graph_name, manifest=manifest, config=app_config, stdin=stdin, argv=argv ) else: return runtime.execute( diff --git a/resotocore/resotocore/cli/model.py b/resotocore/resotocore/cli/model.py index 980b6aa304..7cb1e88f5b 100644 --- a/resotocore/resotocore/cli/model.py +++ b/resotocore/resotocore/cli/model.py @@ -316,7 +316,7 @@ def empty() -> CLISource: class CLIFlow(CLIAction): def __init__( self, - fn: Callable[[JsGen], Union[JsGen, Awaitable[JsGen]]], + fn: Callable[[JsStream], Union[JsGen, Awaitable[JsGen]]], produces: MediaType = MediaType.Json, requires: Optional[List[CLICommandRequirement]] = None, envelope: Optional[Dict[str, str]] = None, @@ -729,7 +729,7 @@ async def execute(self) -> Tuple[CLISourceContext, JsStream]: flow = await flow_action.flow(flow) return context, flow else: - return CLISourceContext(count=0), stream.empty() + return CLISourceContext(count=0), stream.empty() # type: ignore class CLI(ABC): diff --git a/resotocore/resotocore/db/graphdb.py b/resotocore/resotocore/db/graphdb.py index 4a8a2b08a7..7895465345 100644 --- a/resotocore/resotocore/db/graphdb.py +++ b/resotocore/resotocore/db/graphdb.py @@ -24,7 +24,7 @@ Union, ) -from aiostream import stream +from aiostream import stream, pipe from arango import AnalyzerGetError from arango.collection import VertexCollection, StandardCollection, EdgeCollection from arango.graph import Graph @@ -558,7 +558,7 @@ async def move_security_temp_to_proper() -> None: try: # stream updates to the temp collection - async with stream.chunks(stream.iterate(iterator), 1000).stream() as streamer: + async with (stream.iterate(iterator) | pipe.chunks(1000)).stream() as streamer: async for part in streamer: await update_chunk(dict(part)) # move temp collection to proper and history collection diff --git a/resotocore/resotocore/infra_apps/local_runtime.py b/resotocore/resotocore/infra_apps/local_runtime.py index 15fa8d0229..26c568eeb2 100644 --- a/resotocore/resotocore/infra_apps/local_runtime.py +++ b/resotocore/resotocore/infra_apps/local_runtime.py @@ -1,21 +1,21 @@ +import logging +from argparse import Namespace +from pydoc import locate from typing import List, AsyncIterator, Type, Optional, Any -from resotocore.infra_apps.runtime import Runtime + +from aiostream import stream, pipe +from jinja2 import Environment + +from resotocore.cli import NoExitArgumentParser, JsStream, JsGen +from resotocore.cli.model import CLI, CLIContext +from resotocore.db.model import QueryModel +from resotocore.ids import GraphName from resotocore.infra_apps.manifest import AppManifest +from resotocore.infra_apps.runtime import Runtime from resotocore.service import Service from resotocore.types import Json, JsonElement -from resotocore.ids import GraphName -from resotocore.db.model import QueryModel -from resotocore.cli.model import CLI, CLIContext -from resotocore.cli import NoExitArgumentParser -from jinja2 import Environment -import logging -from aiostream.core import Stream -from aiostream import stream -from argparse import Namespace -from resotolib.durations import parse_optional_duration from resotolib.asynchronous.utils import async_lines -from pydoc import locate - +from resotolib.durations import parse_optional_duration log = logging.getLogger(__name__) @@ -108,8 +108,8 @@ def str_to_type(type_str: Optional[str]) -> Optional[Type[Any]]: return parser.parse_args(argv) - async def _interpret_line(self, line: str, ctx: CLIContext) -> Stream: - command_streams: List[Stream] = [] + async def _interpret_line(self, line: str, ctx: CLIContext) -> JsStream: + command_streams: List[JsGen] = [] total_nr_outputs: int = 0 parsed_commands = await self.cli.evaluate_cli_command(line, ctx, True) for parsed in parsed_commands: @@ -117,4 +117,4 @@ async def _interpret_line(self, line: str, ctx: CLIContext) -> Stream: total_nr_outputs = total_nr_outputs + (src_ctx.count or 0) command_streams.append(command_output_stream) - return stream.concat(stream.iterate(command_streams), task_limit=1) + return stream.iterate(command_streams) | pipe.concat(task_limit=1) diff --git a/resotocore/resotocore/model/db_updater.py b/resotocore/resotocore/model/db_updater.py index 1f63d2365c..dbe2db570c 100644 --- a/resotocore/resotocore/model/db_updater.py +++ b/resotocore/resotocore/model/db_updater.py @@ -16,7 +16,7 @@ from typing import Optional, Union, Any, Generator, List, AsyncIterator, Dict import aiofiles -from aiostream import stream +from aiostream import stream, pipe from aiostream.core import Stream from attrs import define @@ -270,8 +270,11 @@ async def __process_item(self, item: GraphUpdateTask) -> Union[GraphUpdate, Exce async def start(self) -> None: async def wait_for_update() -> None: log.info("Start waiting for graph updates") - cl = stream.cycle(stream.call(self.update_queue.get)) - fl = stream.map(cl, self.__process_item, task_limit=self.config.graph_update.parallel_imports) + fl = ( + stream.call(self.update_queue.get) # type: ignore + | pipe.cycle() + | pipe.map(self.__process_item, task_limit=self.config.graph_update.parallel_imports) # type: ignore + ) with suppress(CancelledError): async with fl.stream() as streamer: async for update in streamer: @@ -371,7 +374,7 @@ async def read_forever() -> GraphUpdate: if isinstance(content, Path): await send_to_child(ReadFile(content, task_id)) else: - chunked: Stream = stream.chunks(content, BatchSize) + chunked: Stream[List[Union[bytes, Json]]] = stream.chunks(content, BatchSize) # type: ignore async with chunked.stream() as streamer: async for lines in streamer: if not await send_to_child(ReadElement(lines, task_id)): diff --git a/resotocore/resotocore/report/benchmark_renderer.py b/resotocore/resotocore/report/benchmark_renderer.py index d4b8f36d45..9e4e65b30c 100644 --- a/resotocore/resotocore/report/benchmark_renderer.py +++ b/resotocore/resotocore/report/benchmark_renderer.py @@ -1,7 +1,6 @@ -from typing import AsyncIterator, AsyncGenerator, List, Union +from typing import AsyncGenerator, List, AsyncIterable from aiostream import stream -from aiostream.core import Stream from networkx import DiGraph from rich._emoji_codes import EMOJI @@ -90,7 +89,7 @@ def render_check_result(check_result: CheckResult, account: str) -> str: return result -async def respond_benchmark_result(gen: Union[Stream, AsyncIterator[JsonElement]]) -> AsyncGenerator[str, None]: +async def respond_benchmark_result(gen: AsyncIterable[JsonElement]) -> AsyncGenerator[str, None]: # step 1: read graph graph = DiGraph() async with stream.iterate(gen).stream() as streamer: diff --git a/resotocore/resotocore/report/inspector_service.py b/resotocore/resotocore/report/inspector_service.py index c74c318adb..ba3efb4837 100644 --- a/resotocore/resotocore/report/inspector_service.py +++ b/resotocore/resotocore/report/inspector_service.py @@ -5,10 +5,12 @@ from functools import lru_cache from typing import Optional, List, Dict, Tuple, Callable, AsyncIterator, cast -from aiostream import stream +from aiostream import stream, pipe +from aiostream.core import Stream from attr import define from resotocore.analytics import CoreEvent +from resotocore.cli import list_sink from resotocore.cli.model import CLIContext, CLI from resotocore.config import ConfigEntity, ConfigHandler from resotocore.db.model import QueryModel @@ -302,7 +304,7 @@ async def perform_cmd(cmd: str) -> AsyncIterator[Json]: if context.accounts: account_list = ",".join(f'"{a}"' for a in context.accounts) cmd = f"search /{account_id_prop} in [{account_list}] | " + cmd - cli_result = await self.cli.execute_cli_command(cmd, stream.list, CLIContext(env=env)) + cli_result = await self.cli.execute_cli_command(cmd, list_sink, CLIContext(env=env)) for result in cli_result[0]: yield result @@ -351,7 +353,7 @@ def to_result(cc: CheckCollection) -> CheckCollectionResult: id=benchmark.id, ) - async def __perform_checks( + async def __perform_checks( # type: ignore self, graph: GraphName, checks: List[ReportCheck], context: CheckContext ) -> Dict[str, SingleCheckResult]: # load model @@ -362,9 +364,10 @@ async def __perform_checks( async def perform_single(check: ReportCheck) -> Tuple[str, SingleCheckResult]: return check.id, await self.__perform_check(graph, model, check, cfg, context) - async with stream.map( - stream.iterate(checks), perform_single, ordered=False, task_limit=context.parallel_checks - ).stream() as streamer: + check_results: Stream[Tuple[str, SingleCheckResult]] = stream.iterate(checks) | pipe.map( + perform_single, ordered=False, task_limit=context.parallel_checks # type: ignore + ) + async with check_results.stream() as streamer: return {key: value async for key, value in streamer} async def __perform_check( diff --git a/resotocore/resotocore/task/task_handler.py b/resotocore/resotocore/task/task_handler.py index 3c2f3cfa6a..36fabb9cdb 100644 --- a/resotocore/resotocore/task/task_handler.py +++ b/resotocore/resotocore/task/task_handler.py @@ -495,7 +495,7 @@ async def execute_commands() -> None: results[command] = None elif isinstance(command, ExecuteOnCLI): ctx = evolve(self.cli_context, env={**command.env, **wi.descriptor.environment}) - result = await self.cli.execute_cli_command(command.command, stream.list, ctx) + result = await self.cli.execute_cli_command(command.command, stream.list, ctx) # type: ignore results[command] = result else: raise AttributeError(f"Does not understand this command: {wi.descriptor.name}: {command}") diff --git a/resotocore/resotocore/web/api.py b/resotocore/resotocore/web/api.py index f3689cf4f9..2590cb4fea 100644 --- a/resotocore/resotocore/web/api.py +++ b/resotocore/resotocore/web/api.py @@ -648,7 +648,7 @@ async def perform_benchmark_on_checks(self, request: Request, deps: TenantDepend ) return await single_result(request, to_js(result)) - async def perform_benchmark(self, request: Request, deps: TenantDependencies) -> StreamResponse: + async def perform_benchmark(self, request: Request, deps: TenantDependencies) -> StreamResponse: # type: ignore benchmark = request.match_info["benchmark"] graph = GraphName(request.match_info["graph_id"]) acc = request.query.get("accounts") @@ -1305,7 +1305,7 @@ async def execute(self, request: Request, deps: TenantDependencies) -> StreamRes if temp_dir: shutil.rmtree(temp_dir) - async def execute_parsed( + async def execute_parsed( # type: ignore self, request: Request, command: str, parsed: List[ParsedCommandLine], ctx: CLIContext ) -> StreamResponse: # what is the accepted content type diff --git a/resotocore/tests/resotocore/cli/cli_test.py b/resotocore/tests/resotocore/cli/cli_test.py index 16153763e6..174973c953 100644 --- a/resotocore/tests/resotocore/cli/cli_test.py +++ b/resotocore/tests/resotocore/cli/cli_test.py @@ -3,7 +3,7 @@ import pytest from aiostream import stream -from resotocore.cli import strip_quotes, js_value_at, args_parts_parser, args_parts_unquoted_parser +from resotocore.cli import strip_quotes, js_value_at, args_parts_parser, args_parts_unquoted_parser, list_sink from resotocore.cli.cli import multi_command_parser, CLIService from resotocore.cli.command import ( ExecuteSearchCommand, @@ -131,30 +131,30 @@ async def test_order_of_commands(cli: CLI) -> None: @pytest.mark.asyncio async def test_help(cli: CLI) -> None: - result = await cli.execute_cli_command("help", stream.list) + result = await cli.execute_cli_command("help", list_sink) assert len(result[0]) == 1 # help for command - result = await cli.execute_cli_command("help count", stream.list) + result = await cli.execute_cli_command("help count", list_sink) assert len(result[0]) == 1 # help for alias - result = await cli.execute_cli_command("help kind", stream.list) + result = await cli.execute_cli_command("help kind", list_sink) assert len(result[0]) == 1 # help for alias template - result = await cli.execute_cli_command("help discord", stream.list) + result = await cli.execute_cli_command("help discord", list_sink) assert len(result[0]) == 1 # help for infra app alias cli.register_infra_app_alias(InfraAppAlias("testcommand", "this is a test alias", "this is a readme", [])) - result = await cli.execute_cli_command("help testcommand", stream.list) + result = await cli.execute_cli_command("help testcommand", list_sink) assert len(result[0]) == 1 @pytest.mark.asyncio async def test_parse_env_vars(cli: CLI) -> None: - result = await cli.execute_cli_command('test=foo bla="bar" d=true env', stream.list) + result = await cli.execute_cli_command('test=foo bla="bar" d=true env', list_sink) # the env is allowed to have more items. Check only for this subset. assert {"test": "foo", "bla": "bar", "d": True}.items() <= result[0][0].items() diff --git a/resotocore/tests/resotocore/cli/command_test.py b/resotocore/tests/resotocore/cli/command_test.py index db6683da38..c86239c277 100644 --- a/resotocore/tests/resotocore/cli/command_test.py +++ b/resotocore/tests/resotocore/cli/command_test.py @@ -5,20 +5,19 @@ import sqlite3 from datetime import timedelta from functools import partial -from typing import List, Dict, Optional, Any, Tuple, Type, TypeVar, cast, Callable, Set +from typing import List, Dict, Optional, Any, Tuple, Type, TypeVar, cast, Callable, Set, Awaitable import pytest import yaml from _pytest.logging import LogCaptureFixture from aiohttp import ClientTimeout from aiohttp.web import Request -from aiostream import stream -from aiostream.core import Stream +from aiostream import stream, pipe from attrs import evolve from pytest import fixture from resotocore import version -from resotocore.cli import is_node +from resotocore.cli import is_node, JsGen, JsStream, list_sink from resotocore.cli.cli import CLIService from resotocore.cli.command import HttpCommand, JqCommand, AggregateCommand, all_commands from resotocore.dependencies import TenantDependencies @@ -61,91 +60,91 @@ def test_known_category(dependencies: TenantDependencies) -> None: @pytest.mark.asyncio async def test_echo_source(cli: CLI) -> None: # no arg passed to json - result = await cli.execute_cli_command("echo", stream.list) + result = await cli.execute_cli_command("echo", list_sink) assert result[0] == [""] # simple string passed to json - result = await cli.execute_cli_command("echo this is a string", stream.list) + result = await cli.execute_cli_command("echo this is a string", list_sink) assert result[0] == ["this is a string"] - result = await cli.execute_cli_command('echo "foo bla bar" ', stream.list) + result = await cli.execute_cli_command('echo "foo bla bar" ', list_sink) assert result[0] == ["foo bla bar"] @pytest.mark.asyncio async def test_json_source(cli: CLI) -> None: # json object passed to json - result = await cli.execute_cli_command('json {"a": 1}', stream.list) + result = await cli.execute_cli_command('json {"a": 1}', list_sink) assert result[0] == [{"a": 1}] # json array passed to json - result = await cli.execute_cli_command('json [{"a": 1}, {"b":2}]', stream.list) + result = await cli.execute_cli_command('json [{"a": 1}, {"b":2}]', list_sink) assert result[0] == [{"a": 1}, {"b": 2}] # json string passed to json - result = await cli.execute_cli_command('json "foo bla bar"', stream.list) + result = await cli.execute_cli_command('json "foo bla bar"', list_sink) assert result[0] == ["foo bla bar"] @pytest.mark.asyncio async def test_predecessors(cli: CLI) -> None: - r1 = await cli.execute_cli_command("search id(4_0) | predecessors", stream.list) + r1 = await cli.execute_cli_command("search id(4_0) | predecessors", list_sink) assert len(r1[0]) == 1 - r2 = await cli.execute_cli_command("search id(4_0) | predecessors --with-origin", stream.list) + r2 = await cli.execute_cli_command("search id(4_0) | predecessors --with-origin", list_sink) assert len(r2[0]) == 2 - r3 = await cli.execute_cli_command("search id(4_0) | predecessors --with-origin default", stream.list) + r3 = await cli.execute_cli_command("search id(4_0) | predecessors --with-origin default", list_sink) assert len(r3[0]) == 2 - r4 = await cli.execute_cli_command("search id(4_0) | predecessors delete", stream.list) + r4 = await cli.execute_cli_command("search id(4_0) | predecessors delete", list_sink) assert len(r4[0]) == 0 @pytest.mark.asyncio async def test_ancestors(cli: CLI) -> None: - r1 = await cli.execute_cli_command("search id(4_0) | ancestors", stream.list) + r1 = await cli.execute_cli_command("search id(4_0) | ancestors", list_sink) assert len(r1[0]) == 4 - r2 = await cli.execute_cli_command("search id(4_0) | ancestors --with-origin", stream.list) + r2 = await cli.execute_cli_command("search id(4_0) | ancestors --with-origin", list_sink) assert len(r2[0]) == 5 - r3 = await cli.execute_cli_command("search id(4_0) | ancestors --with-origin default", stream.list) + r3 = await cli.execute_cli_command("search id(4_0) | ancestors --with-origin default", list_sink) assert len(r3[0]) == 5 - r4 = await cli.execute_cli_command("search id(4_0) | ancestors delete", stream.list) + r4 = await cli.execute_cli_command("search id(4_0) | ancestors delete", list_sink) assert len(r4[0]) == 0 @pytest.mark.asyncio async def test_successors(cli: CLI) -> None: - r1 = await cli.execute_cli_command("search id(4) | successors", stream.list) + r1 = await cli.execute_cli_command("search id(4) | successors", list_sink) assert len(r1[0]) == 10 - r2 = await cli.execute_cli_command("search id(4) | successors --with-origin", stream.list) + r2 = await cli.execute_cli_command("search id(4) | successors --with-origin", list_sink) assert len(r2[0]) == 11 - r3 = await cli.execute_cli_command("search id(4) | successors --with-origin default", stream.list) + r3 = await cli.execute_cli_command("search id(4) | successors --with-origin default", list_sink) assert len(r3[0]) == 11 - r4 = await cli.execute_cli_command("search id(4) | successors delete", stream.list) + r4 = await cli.execute_cli_command("search id(4) | successors delete", list_sink) assert len(r4[0]) == 0 @pytest.mark.asyncio async def test_descendants(cli: CLI) -> None: - r1 = await cli.execute_cli_command("search id(4) | descendants", stream.list) + r1 = await cli.execute_cli_command("search id(4) | descendants", list_sink) assert len(r1[0]) == 10 - r2 = await cli.execute_cli_command("search id(4) | descendants --with-origin", stream.list) + r2 = await cli.execute_cli_command("search id(4) | descendants --with-origin", list_sink) assert len(r2[0]) == 11 - r3 = await cli.execute_cli_command("search id(4) | descendants --with-origin default", stream.list) + r3 = await cli.execute_cli_command("search id(4) | descendants --with-origin default", list_sink) assert len(r3[0]) == 11 - r4 = await cli.execute_cli_command("search id(4) | descendants delete", stream.list) + r4 = await cli.execute_cli_command("search id(4) | descendants delete", list_sink) assert len(r4[0]) == 0 @pytest.mark.asyncio async def test_search_source(cli: CLIService) -> None: - result = await cli.execute_cli_command('search is("foo") and some_int==0 --> identifier=~"9_"', stream.list) + result = await cli.execute_cli_command('search is("foo") and some_int==0 --> identifier=~"9_"', list_sink) assert len(result[0]) == 10 await cli.dependencies.template_expander.put_template( Template("test", 'is(foo) and some_int==0 --> identifier=~"{{fid}}"') ) - result2 = await cli.execute_cli_command('search expand(test, fid="9_")', stream.list) + result2 = await cli.execute_cli_command('search expand(test, fid="9_")', list_sink) assert len(result2[0]) == 10 - result3 = await cli.execute_cli_command("search --with-edges is(graph_root) -[0:1]->", stream.list) + result3 = await cli.execute_cli_command("search --with-edges is(graph_root) -[0:1]->", list_sink) # node: graph_root # node: collector # edge: graph_root -> collector @@ -153,7 +152,7 @@ async def test_search_source(cli: CLIService) -> None: # = 3 elements assert len(result3[0]) == 3 - result4 = await cli.execute_cli_command("search --explain --with-edges is(graph_root) -[0:1]->", stream.list) + result4 = await cli.execute_cli_command("search --explain --with-edges is(graph_root) -[0:1]->", list_sink) assert result4[0][0]["rating"] == "simple" # use absolute path syntax @@ -162,7 +161,7 @@ async def test_search_source(cli: CLIService) -> None: "is(foo) and not(/reported.some_int!=0) " "{child: --> /metadata!=null} some_int==0 " "with(any, --> /metadata!=null) sort /reported.name asc limit 1", - stream.list, + list_sink, ) assert result5 == [[{"group": {"kind": "foo"}, "si": 0}]] @@ -171,52 +170,52 @@ async def test_search_source(cli: CLIService) -> None: async def test_sleep_source(cli: CLI) -> None: with pytest.raises(CLIParseError): await cli.evaluate_cli_command("sleep forever") - result = await cli.execute_cli_command("sleep 0.001; echo hello", stream.list) + result = await cli.execute_cli_command("sleep 0.001; echo hello", list_sink) assert result == [[""], ["hello"]] @pytest.mark.asyncio async def test_count_command(cli: CLI, json_source: str) -> None: # count instances - result = await cli.execute_cli_command(f"{json_source} | count", stream.list) + result = await cli.execute_cli_command(f"{json_source} | count", list_sink) assert len(result[0]) == 2 assert result[0] == ["total matched: 200", "total unmatched: 0"] # count attributes - result = await cli.execute_cli_command(f"{json_source} | count num", stream.list) + result = await cli.execute_cli_command(f"{json_source} | count num", list_sink) assert len(result[0]) == 102 assert result[0][-2] == "total matched: 200" assert result[0][-1] == "total unmatched: 0" # count attributes with path - result = await cli.execute_cli_command(f"{json_source} | count inner.num", stream.list) + result = await cli.execute_cli_command(f"{json_source} | count inner.num", list_sink) assert len(result[0]) == 12 assert result[0][-2] == "total matched: 200" assert result[0][-1] == "total unmatched: 0" # count unknown attributes - result = await cli.execute_cli_command(f"{json_source} | count does_not_exist", stream.list) + result = await cli.execute_cli_command(f"{json_source} | count does_not_exist", list_sink) assert len(result[0]) == 2 assert result[0] == ["total matched: 0", "total unmatched: 200"] @pytest.mark.asyncio async def test_head_command(cli: CLI) -> None: - assert await cli.execute_cli_command("json [1,2,3,4,5] | head 2 | dump", stream.list) == [[1, 2]] - assert await cli.execute_cli_command("json [1,2,3,4,5] | head -2 | dump", stream.list) == [[1, 2]] - assert await cli.execute_cli_command("json [1,2,3,4,5] | head | dump", stream.list) == [[1, 2, 3, 4, 5]] + assert await cli.execute_cli_command("json [1,2,3,4,5] | head 2 | dump", list_sink) == [[1, 2]] + assert await cli.execute_cli_command("json [1,2,3,4,5] | head -2 | dump", list_sink) == [[1, 2]] + assert await cli.execute_cli_command("json [1,2,3,4,5] | head | dump", list_sink) == [[1, 2, 3, 4, 5]] @pytest.mark.asyncio async def test_tail_command(cli: CLI) -> None: - assert await cli.execute_cli_command("json [1,2,3,4,5] | tail 2 | dump", stream.list) == [[4, 5]] - assert await cli.execute_cli_command("json [1,2,3,4,5] | tail -2 | dump", stream.list) == [[4, 5]] - assert await cli.execute_cli_command("json [1,2,3,4,5] | tail | dump", stream.list) == [[1, 2, 3, 4, 5]] + assert await cli.execute_cli_command("json [1,2,3,4,5] | tail 2 | dump", list_sink) == [[4, 5]] + assert await cli.execute_cli_command("json [1,2,3,4,5] | tail -2 | dump", list_sink) == [[4, 5]] + assert await cli.execute_cli_command("json [1,2,3,4,5] | tail | dump", list_sink) == [[1, 2, 3, 4, 5]] @pytest.mark.asyncio async def test_chunk_command(cli: CLI, json_source: str) -> None: - result: List[List[str]] = await cli.execute_cli_command(f"{json_source} | chunk 50 | dump", stream.list) + result: List[List[str]] = await cli.execute_cli_command(f"{json_source} | chunk 50 | dump", list_sink) assert len(result[0]) == 4 # 200 in chunks of 50 for a in result[0]: assert len(a) == 50 @@ -224,19 +223,19 @@ async def test_chunk_command(cli: CLI, json_source: str) -> None: @pytest.mark.asyncio async def test_flatten_command(cli: CLI, json_source: str) -> None: - result = await cli.execute_cli_command(f"{json_source} | chunk 50 | flatten", stream.list) + result = await cli.execute_cli_command(f"{json_source} | chunk 50 | flatten", list_sink) assert len(result[0]) == 200 @pytest.mark.asyncio async def test_uniq_command(cli: CLI, json_source: str) -> None: - result = await cli.execute_cli_command(f"{json_source} | uniq", stream.list) + result = await cli.execute_cli_command(f"{json_source} | uniq", list_sink) assert len(result[0]) == 100 @pytest.mark.asyncio async def test_set_desired_command(cli: CLI) -> None: - result = await cli.execute_cli_command('search is("foo") | set_desired a="test" b=1 c=true | dump', stream.list) + result = await cli.execute_cli_command('search is("foo") | set_desired a="test" b=1 c=true | dump', list_sink) assert len(result[0]) == 10 for elem in result[0]: assert {"a": "test", "b": 1, "c": True}.items() <= elem["desired"].items() @@ -244,7 +243,7 @@ async def test_set_desired_command(cli: CLI) -> None: @pytest.mark.asyncio async def test_set_metadata_command(cli: CLI) -> None: - result = await cli.execute_cli_command('search is("foo") | set_metadata a="test" b=1 c=true | dump', stream.list) + result = await cli.execute_cli_command('search is("foo") | set_metadata a="test" b=1 c=true | dump', list_sink) assert len(result[0]) == 10 for elem in result[0]: assert {"a": "test", "b": 1, "c": True}.items() <= elem["metadata"].items() @@ -252,7 +251,7 @@ async def test_set_metadata_command(cli: CLI) -> None: @pytest.mark.asyncio async def test_clean_command(cli: CLI) -> None: - result = await cli.execute_cli_command('search is("foo") | clean | dump', stream.list) + result = await cli.execute_cli_command('search is("foo") | clean | dump', list_sink) assert len(result[0]) == 10 for elem in result[0]: assert {"clean": True}.items() <= elem["desired"].items() @@ -260,7 +259,7 @@ async def test_clean_command(cli: CLI) -> None: @pytest.mark.asyncio async def test_protect_command(cli: CLI) -> None: - result = await cli.execute_cli_command('search is("foo") | protect | dump', stream.list) + result = await cli.execute_cli_command('search is("foo") | protect | dump', list_sink) assert len(result[0]) == 10 for elem in result[0]: assert {"protected": True}.items() <= elem["metadata"].items() @@ -268,14 +267,14 @@ async def test_protect_command(cli: CLI) -> None: @pytest.mark.asyncio async def test_list_sink(cli: CLI, dependencies: TenantDependencies) -> None: - result = await cli.execute_cli_command("json [1,2,3] | dump", stream.list) + result = await cli.execute_cli_command("json [1,2,3] | dump", list_sink) assert result == [[1, 2, 3]] @pytest.mark.asyncio async def test_flat_sink(cli: CLI) -> None: parsed = await cli.evaluate_cli_command("json [1,2,3] | dump; json [4,5,6] | dump; json [7,8,9] | dump") - result = await stream.list(stream.concat(stream.iterate((await p.execute())[1] for p in parsed))) + result = await stream.list(stream.iterate((await p.execute())[1] for p in parsed) | pipe.concat()) assert result == [1, 2, 3, 4, 5, 6, 7, 8, 9] @@ -283,27 +282,27 @@ async def test_flat_sink(cli: CLI) -> None: async def test_format(cli: CLI) -> None: # access properties by name and path result = await cli.execute_cli_command( - 'json {"a":"b", "b": {"c":"d"}} | format a:{a} b:{b.c} na:{fuerty}', stream.list + 'json {"a":"b", "b": {"c":"d"}} | format a:{a} b:{b.c} na:{fuerty}', list_sink ) assert result[0] == ["a:b b:d na:null"] # use correct type props = dict(a="a", b=True, c=False, d=None, e=12, f=1.234) - result = await cli.execute_cli_command(f"json {json.dumps(props)}" " | format {a}:{b}:{c}:{d}:{e}:{f}", stream.list) + result = await cli.execute_cli_command(f"json {json.dumps(props)}" " | format {a}:{b}:{c}:{d}:{e}:{f}", list_sink) assert result[0] == ["a:true:false:null:12:1.234"] # access deeply nested properties with dict and array result = await cli.execute_cli_command( - 'json {"a":{"b":{"c":{"d":[0,1,2, {"e":"f"}]}}}} | format will be an >{a.b.c.d[3].e}<', stream.list + 'json {"a":{"b":{"c":{"d":[0,1,2, {"e":"f"}]}}}} | format will be an >{a.b.c.d[3].e}<', list_sink ) assert result[0] == ["will be an >f<"] # make sure any path that is not available leads to the null value - result = await cli.execute_cli_command("json {} | format {a}:{b.c.d}:{foo.bla[23].test}", stream.list) + result = await cli.execute_cli_command("json {} | format {a}:{b.c.d}:{foo.bla[23].test}", list_sink) assert result[0] == ["null:null:null"] # Queries that use the reported section, also interpret the format in the reported section result = await cli.execute_cli_command( "search id(sub_root) limit 1 | format {{aa}} {some_string} test}} {some_int} {/metadata.node_id} {{", - stream.list, + list_sink, ) assert result[0] == ["{aa} hello test} 0 sub_root {"] @@ -312,7 +311,7 @@ async def test_format(cli: CLI) -> None: async def test_workflows_command(cli: CLIService, task_handler: TaskHandlerService, test_workflow: Workflow) -> None: async def execute(cmd: str) -> List[JsonElement]: ctx = CLIContext(cli.cli_env) - return (await cli.execute_cli_command(cmd, stream.list, ctx))[0] # type: ignore + return (await cli.execute_cli_command(cmd, list_sink, ctx))[0] # type: ignore assert await execute("workflows list") == ["sleep_workflow", "wait_for_collect_done", "test_workflow"] assert await execute("workflows show test_workflow") == [to_js(test_workflow)] @@ -356,7 +355,7 @@ async def execute(cmd: str) -> List[JsonElement]: async def test_jobs_command(cli: CLIService, task_handler: TaskHandlerService, job_db: JobDb) -> None: async def execute(cmd: str) -> List[List[JsonElement]]: ctx = CLIContext(cli.cli_env) - return await cli.execute_cli_command(cmd, stream.list, ctx) + return await cli.execute_cli_command(cmd, list_sink, ctx) # add job with schedule result = await execute('jobs add --id hello --schedule "23 1 * * *" echo Hello World @NOW@') @@ -442,10 +441,10 @@ def nr_of_performed() -> int: nr_of_performed() # reset to 0 - assert await cli.execute_cli_command("echo id_does_not_exist | tag update foo bla", stream.list) == [[]] + assert await cli.execute_cli_command("echo id_does_not_exist | tag update foo bla", list_sink) == [[]] assert nr_of_performed() == 0 res1 = await cli.execute_cli_command( - 'json ["root", "collector"] | tag update foo "bla_{reported.some_int}" | dump', stream.list + 'json ["root", "collector"] | tag update foo "bla_{reported.some_int}" | dump', list_sink ) assert nr_of_performed() == 2 assert {a["id"] for a in res1[0]} == {"root", "collector"} @@ -457,18 +456,18 @@ def nr_of_performed() -> int: assert not data.node.metadata.is_none # the node metadata section is defined assert not data.node.ancestors.cloud.reported.is_none # the ancestors cloud section is defineda assert data["update"].foo == "bla_0" # using the renderer bla_{reported.some_int} - res2 = await cli.execute_cli_command('search is("foo") | tag update foo bla', stream.list) + res2 = await cli.execute_cli_command('search is("foo") | tag update foo bla', list_sink) assert nr_of_performed() == 10 assert len(res2[0]) == 10 - res2_tag_no_val = await cli.execute_cli_command('search is("foo") | tag update foobar', stream.list) + res2_tag_no_val = await cli.execute_cli_command('search is("foo") | tag update foobar', list_sink) assert nr_of_performed() == 10 assert len(res2_tag_no_val[0]) == 10 - res3 = await cli.execute_cli_command('search is("foo") | tag delete foo', stream.list) + res3 = await cli.execute_cli_command('search is("foo") | tag delete foo', list_sink) assert nr_of_performed() == 10 assert len(res3[0]) == 10 with caplog.at_level(logging.WARNING): caplog.clear() - res4 = await cli.execute_cli_command('search is("bla") limit 2 | tag delete foo', stream.list) + res4 = await cli.execute_cli_command('search is("bla") limit 2 | tag delete foo', list_sink) assert nr_of_performed() == 2 assert len(res4[0]) == 2 # make sure that 2 warnings are emitted @@ -476,7 +475,7 @@ def nr_of_performed() -> int: for res in caplog.records: assert res.message.startswith("Update not reflected in db. Wait until next collector run.") # tag updates can be put into background - res6 = await cli.execute_cli_command('json ["root", "collector"] | tag update --nowait foo bla', stream.list) + res6 = await cli.execute_cli_command('json ["root", "collector"] | tag update --nowait foo bla', list_sink) assert cli.dependencies.forked_tasks.qsize() == 2 for res in res6[0]: # in this case a message with the task id is emitted @@ -488,10 +487,10 @@ def nr_of_performed() -> int: @pytest.mark.asyncio async def test_kinds_command(cli: CLI, foo_model: Model) -> None: - result = await cli.execute_cli_command("kind", stream.list) + result = await cli.execute_cli_command("kind", list_sink) for kind in ["account", "bla", "child", "cloud", "parent", "region", "some_complex"]: assert kind in result[0] - result = await cli.execute_cli_command("kind foo", stream.list) + result = await cli.execute_cli_command("kind foo", list_sink) assert result[0][0] == { "name": "foo", "bases": ["base"], @@ -507,9 +506,9 @@ async def test_kinds_command(cli: CLI, foo_model: Model) -> None: }, "successors": ["bla"], } - result = await cli.execute_cli_command("kind string", stream.list) + result = await cli.execute_cli_command("kind string", list_sink) assert result[0][0] == {"name": "string", "runtime_kind": "string"} - result = await cli.execute_cli_command("kind -p reported.ctime", stream.list) + result = await cli.execute_cli_command("kind -p reported.ctime", list_sink) assert result[0][0] == { "name": "datetime", "runtime_kind": "datetime", @@ -527,13 +526,13 @@ async def test_kinds_command(cli: CLI, foo_model: Model) -> None: ], } with pytest.raises(Exception): - await cli.execute_cli_command("kind foo bla bar", stream.list) + await cli.execute_cli_command("kind foo bla bar", list_sink) @pytest.mark.asyncio async def test_sort_command(cli: CLI) -> None: async def identifiers(query: str) -> List[str]: - result = await cli.execute_cli_command(query + " | dump", stream.list) + result = await cli.execute_cli_command(query + " | dump", list_sink) return [r["reported"]["identifier"] for r in result[0]] id_wo = await identifiers("search is(bla) | sort identifier") @@ -548,7 +547,7 @@ async def identifiers(query: str) -> List[str]: @pytest.mark.asyncio async def test_limit_command(cli: CLI) -> None: async def identifiers(query: str) -> List[str]: - result = await cli.execute_cli_command(query + " | dump", stream.list) + result = await cli.execute_cli_command(query + " | dump", list_sink) return [r["reported"]["identifier"] for r in result[0]] assert await identifiers("search is(bla) sort identifier | limit 1") == ["0_0"] @@ -560,37 +559,37 @@ async def identifiers(query: str) -> List[str]: @pytest.mark.asyncio async def test_list_command(cli: CLI) -> None: - result = await cli.execute_cli_command('search is (foo) and identifier=="4" sort some_int | list', stream.list) + result = await cli.execute_cli_command('search is (foo) and identifier=="4" sort some_int | list', list_sink) assert len(result[0]) == 1 assert result[0][0].startswith("kind=foo, identifier=4, some_int=0, age=") list_cmd = "list some_int as si, some_string" - result = await cli.execute_cli_command(f'search is (foo) and identifier=="4" | {list_cmd}', stream.list) + result = await cli.execute_cli_command(f'search is (foo) and identifier=="4" | {list_cmd}', list_sink) assert result[0] == ["si=0, some_string=hello"] # list is added automatically when no output renderer is defined and has the same behaviour as if it was given - result = await cli.execute_cli_command('search is (foo) and identifier=="4" sort some_int', stream.list) + result = await cli.execute_cli_command('search is (foo) and identifier=="4" sort some_int', list_sink) assert result[0][0].startswith("kind=foo, identifier=4, some_int=0, age=") # List is using the correct type props = dict(id="test", a="a", b=True, c=False, d=None, e=12, f=1.234, reported={}) - result = await cli.execute_cli_command(f"json {json.dumps(props)} | list a,b,c,d,e,f", stream.list) + result = await cli.execute_cli_command(f"json {json.dumps(props)} | list a,b,c,d,e,f", list_sink) assert result[0] == ["a=a, b=true, c=false, e=12, f=1.234"] # Queries that use the reported section, also interpret the list format in the reported section result = await cli.execute_cli_command( - "search id(sub_root) limit 1 | list some_string, some_int, /metadata.node_id", stream.list + "search id(sub_root) limit 1 | list some_string, some_int, /metadata.node_id", list_sink ) assert result[0] == ["some_string=hello, some_int=0, node_id=sub_root"] # List supports csv output result = await cli.execute_cli_command( - f"json {json.dumps(props)} | list --csv a,`b`,c,`d`,e,`f`,non_existent", stream.list + f"json {json.dumps(props)} | list --csv a,`b`,c,`d`,e,`f`,non_existent", list_sink ) assert result[0] == ['"a","b","c","d","e","f","non_existent"', '"a",True,False,"",12,1.234,""'] # List supports markdown output result = await cli.execute_cli_command( - f"json {json.dumps(props)} | list --markdown a,b,c,d,e,f,non_existent", stream.list + f"json {json.dumps(props)} | list --markdown a,b,c,d,e,f,non_existent", list_sink ) assert result[0] == [ "|a|b |c |d |e |f |non_existent|", @@ -601,7 +600,7 @@ async def test_list_command(cli: CLI) -> None: # List supports markdown output result = await cli.execute_cli_command( 'json {"id": "foo", "reported":{}, "name": "a", "some_int": 1, "tags": {"foo․bla․bar.test.rest.best.":"yup"}} | list --json-table name, some_int, tags.`foo․bla․bar.test.rest.best.`', - stream.list, + list_sink, ) assert result[0] == [ { @@ -616,14 +615,14 @@ async def test_list_command(cli: CLI) -> None: # List supports only markdown or csv, but not both at the same time with pytest.raises(CLIParseError): - await cli.execute_cli_command(f"json {json.dumps(props)}" " | list --csv --markdown", stream.list) + await cli.execute_cli_command(f"json {json.dumps(props)}" " | list --csv --markdown", list_sink) # List command will make sure to make the column name unique props = dict(id="123", reported=props, ancestors={"account": {"reported": props}}) result = await cli.execute_cli_command( f"json {json.dumps(props)} | list reported.a, reported.b as a, reported.c as a, reported.c, " f"ancestors.account.reported.a, ancestors.account.reported.a, ancestors.account.reported.a as foo", - stream.list, + list_sink, ) # b as a ==> b, c as a ==> c, c ==> c_1, ancestors.account.reported.a ==> account_a, again ==> _1 assert result[0][0] == "a=a, b=true, c=false, c_1=false, account_a=a, account_a_1=a, foo=a" @@ -651,28 +650,28 @@ async def test_jq_command(cli: CLI) -> None: == ".reported.pod_status.container_statuses[].image_id" ) - result = await cli.execute_cli_command('json {"a":{"b":1}} | jq ".a.b"', stream.list) + result = await cli.execute_cli_command('json {"a":{"b":1}} | jq ".a.b"', list_sink) assert len(result[0]) == 1 assert result[0][0] == 1 # allow absolute paths as json path - result = await cli.execute_cli_command('json {"id":"123", "reported":{"b":1}} | jq "./reported"', stream.list) + result = await cli.execute_cli_command('json {"id":"123", "reported":{"b":1}} | jq "./reported"', list_sink) assert result == [[{"b": 1}]] # jq .kind is rewritten as .reported.kind - result = await cli.execute_cli_command("search is(foo) limit 2 | jq .kind", stream.list) + result = await cli.execute_cli_command("search is(foo) limit 2 | jq .kind", list_sink) assert result[0] == ["foo", "foo"] @pytest.mark.asyncio async def test_execute_search_command(cli: CLI) -> None: # regression test: this used to fail because the arg could not be parsed - await cli.execute_cli_command('execute_search (b= "0")', stream.list) + await cli.execute_cli_command('execute_search (b= "0")', list_sink) @pytest.mark.asyncio async def test_aggregation_to_count_command(cli: CLI) -> None: - r = await cli.execute_cli_command("search all | count kind", stream.list) + r = await cli.execute_cli_command("search all | count kind", list_sink) assert set(r[0]) == { "graph_root: 1", "cloud: 1", @@ -685,7 +684,7 @@ async def test_aggregation_to_count_command(cli: CLI) -> None: # exactly the same command as above (above search would be rewritten as this) r = await cli.execute_cli_command( "execute_search aggregate(reported.kind as name: sum(1) as count):all sort count asc | aggregate_to_count", - stream.list, + list_sink, ) assert set(r[0]) == { "graph_root: 1", @@ -701,7 +700,7 @@ async def test_aggregation_to_count_command(cli: CLI) -> None: @pytest.mark.skipif(not_in_path("arangodump"), reason="requires arangodump to be in path") @pytest.mark.asyncio async def test_system_backup_command(cli: CLI) -> None: - async def check_backup(res: Stream) -> None: + async def check_backup(res: JsStream) -> None: async with res.stream() as streamer: only_one = True async for s in streamer: @@ -717,7 +716,7 @@ async def check_backup(res: Stream) -> None: @pytest.mark.asyncio async def test_system_info_command(cli: CLI) -> None: - info = AccessJson.wrap_object((await cli.execute_cli_command("system info", stream.list))[0][0]) + info = AccessJson.wrap_object((await cli.execute_cli_command("system info", list_sink))[0][0]) assert info.version == version() assert info.name == "resotocore" assert info.cpus > 0 @@ -728,7 +727,7 @@ async def test_system_info_command(cli: CLI) -> None: async def test_system_restore_command(cli: CLI, tmp_directory: str) -> None: backup = os.path.join(tmp_directory, "backup") - async def move_backup(res: Stream) -> None: + async def move_backup(res: JsStream) -> None: async with res.stream() as streamer: async for s in streamer: path = FilePath.from_path(s) @@ -736,7 +735,7 @@ async def move_backup(res: Stream) -> None: await cli.execute_cli_command("system backup create", move_backup) ctx = CLIContext(uploaded_files={"backup": backup}) - restore = await cli.execute_cli_command(f"BACKUP_NO_SYS_EXIT=true system backup restore {backup}", stream.list, ctx) + restore = await cli.execute_cli_command(f"BACKUP_NO_SYS_EXIT=true system backup restore {backup}", list_sink, ctx) assert restore == [ [ "Database has been restored successfully!", @@ -749,25 +748,26 @@ async def move_backup(res: Stream) -> None: async def test_configs_command(cli: CLI, tmp_directory: str) -> None: config_file = os.path.join(tmp_directory, "config.yml") - async def check_file_is_yaml(res: Stream) -> None: + async def check_file_is_yaml(res: JsStream) -> None: async with res.stream() as streamer: async for s in streamer: + assert isinstance(s, str) with open(s, "r") as file: yaml.safe_load(file.read()) # create a new config entry - create_result = await cli.execute_cli_command("configs set test_config t1=1, t2=2, t3=3 ", stream.list) + create_result = await cli.execute_cli_command("configs set test_config t1=1, t2=2, t3=3 ", list_sink) assert create_result[0][0] == "t1: 1\nt2: 2\nt3: 3\n" # show the entry - should be the same as the created one - show_result = await cli.execute_cli_command("configs show test_config", stream.list) + show_result = await cli.execute_cli_command("configs show test_config", list_sink) assert show_result[0][0] == "t1: 1\nt2: 2\nt3: 3\n" # list all configs: only one is defined - list_result = await cli.execute_cli_command("configs list", stream.list) + list_result = await cli.execute_cli_command("configs list", list_sink) assert list_result[0] == ["test_config"] # copy the config - await cli.execute_cli_command("configs copy test_config test_config_copy", stream.list) - list_result = await cli.execute_cli_command("configs list", stream.list) + await cli.execute_cli_command("configs copy test_config test_config_copy", list_sink) + list_result = await cli.execute_cli_command("configs list", list_sink) assert list_result[0] == ["test_config", "test_config_copy"] # edit the config: will make the config available as file @@ -777,41 +777,41 @@ async def check_file_is_yaml(res: Stream) -> None: with open(config_file, "w") as file: file.write(update_doc) ctx = CLIContext(uploaded_files={"config.yaml": config_file}) - update_result = await cli.execute_cli_command(f"configs update test_config {config_file}", stream.list, ctx) + update_result = await cli.execute_cli_command(f"configs update test_config {config_file}", list_sink, ctx) assert update_result == [[]] # show the entry - should be the same as the created one - show_updated_result = await cli.execute_cli_command("configs show test_config", stream.list) + show_updated_result = await cli.execute_cli_command("configs show test_config", list_sink) assert show_updated_result[0][0] == update_doc # write a env var substitution to the config env_var_update = "foo: $(FOO)\n" with open(config_file, "w") as file: file.write(env_var_update) ctx = CLIContext(uploaded_files={"config.yaml": config_file}) - update_result = await cli.execute_cli_command(f"configs update test_config {config_file}", stream.list, ctx) + update_result = await cli.execute_cli_command(f"configs update test_config {config_file}", list_sink, ctx) # provide the env var os.environ["FOO"] = "bar" # check the configs: the env var should stay here and not be resolved when the user views the config - show_updated_result = await cli.execute_cli_command("configs show test_config", stream.list) + show_updated_result = await cli.execute_cli_command("configs show test_config", list_sink) assert show_updated_result[0][0] == env_var_update @pytest.mark.asyncio async def test_templates_command(cli: CLI) -> None: - result = await cli.execute_cli_command("templates test kind=volume is({{kind}})", stream.list) + result = await cli.execute_cli_command("templates test kind=volume is({{kind}})", list_sink) assert result == [["is(volume)"]] - result = await cli.execute_cli_command("templates add filter_kind is({{kind}})", stream.list) + result = await cli.execute_cli_command("templates add filter_kind is({{kind}})", list_sink) assert result == [["Template filter_kind added to the search library.\nis({{kind}})"]] - result = await cli.execute_cli_command("templates", stream.list) + result = await cli.execute_cli_command("templates", list_sink) assert result == [["filter_kind: is({{kind}})"]] - result = await cli.execute_cli_command("templates filter_kind", stream.list) + result = await cli.execute_cli_command("templates filter_kind", list_sink) assert result == [["is({{kind}})"]] - result = await cli.execute_cli_command("templates delete filter_kind", stream.list) + result = await cli.execute_cli_command("templates delete filter_kind", list_sink) assert result == [["Template filter_kind deleted from the search library."]] @pytest.mark.asyncio async def test_write_command(cli: CLI) -> None: - async def check_file(res: Stream, check_content: Optional[str] = None) -> None: + async def check_file(res: JsStream, check_content: Optional[str] = None) -> None: async with res.stream() as streamer: only_one = True async for s in streamer: @@ -832,14 +832,14 @@ async def check_file(res: Stream, check_content: Optional[str] = None) -> None: await cli.execute_cli_command("search all limit 3 | format --yaml | write write_test.yaml ", check_file) # throw an exception with pytest.raises(Exception): - await cli.execute_cli_command("echo hello | write", stream.list) # missing filename + await cli.execute_cli_command("echo hello | write", list_sink) # missing filename # write enforces unescaped output. env = {"now": utc_str()} # fix the time, so that replacements will stay equal truecolor = CLIContext(console_renderer=ConsoleRenderer(80, 25, ConsoleColorSystem.truecolor, True), env=env) monochrome = CLIContext(console_renderer=ConsoleRenderer.default_renderer(), env=env) # Make sure, that the truecolor output is different from monochrome output - mono_out = await cli.execute_cli_command("help", stream.list, monochrome) - assert await cli.execute_cli_command("help", stream.list, truecolor) != mono_out + mono_out = await cli.execute_cli_command("help", list_sink, monochrome) + assert await cli.execute_cli_command("help", list_sink, truecolor) != mono_out # We expect the content of the written file to contain monochrome output. assert await cli.execute_cli_command( "help | write write_test.txt", partial(check_file, check_content="".join(mono_out[0]) + "\n"), truecolor @@ -886,7 +886,7 @@ def test_if_set(prop: Any, value: Any) -> None: ) # take 3 instance of type bla and send it to the echo server - result = await cli.execute_cli_command(f"search is(bla) limit 3 | http :{port}/test", stream.list) + result = await cli.execute_cli_command(f"search is(bla) limit 3 | http :{port}/test", list_sink) # one line is returned to the user with a summary of the response types. assert result == [["3 requests with status 200 sent."]] # make sure all 3 requests have been received - the body is the complete json node @@ -897,7 +897,7 @@ def test_if_set(prop: Any, value: Any) -> None: # failing requests are retried requests.clear() - await cli.execute_cli_command(f"search is(bla) limit 1 | http --backoff-base 0.001 :{port}/fail", stream.list) + await cli.execute_cli_command(f"search is(bla) limit 1 | http --backoff-base 0.001 :{port}/fail", list_sink) # 1 request + 3 retries => 4 requests assert len(requests) == 4 @@ -907,7 +907,7 @@ async def test_jira_alias(cli: CLI, echo_http_server: Tuple[int, List[Tuple[Requ port, requests = echo_http_server result = await cli.execute_cli_command( f'search is(bla) | jira --url "http://localhost:{port}/success" --title test --message "test message" --username test --token test --project_id 10000 --reporter_id test', - stream.list, + list_sink, ) assert result == [["1 requests with status 200 sent."]] assert len(requests) == 1 @@ -929,7 +929,7 @@ async def test_pagerduty_alias(cli: CLI, echo_http_server: Tuple[int, List[Tuple port, requests = echo_http_server result = await cli.execute_cli_command( f'search id(0_0) | pagerduty --webhook-url "http://localhost:{port}/success" --summary test --routing-key 123 --dedup-key 234', - stream.list, + list_sink, ) assert result == [["1 requests with status 200 sent."]] assert len(requests) == 1 @@ -967,14 +967,14 @@ async def test_pagerduty_alias(cli: CLI, echo_http_server: Tuple[int, List[Tuple @pytest.mark.asyncio async def test_welcome(cli: CLI) -> None: ctx = CLIContext(console_renderer=ConsoleRenderer.default_renderer()) - result = await cli.execute_cli_command(f"welcome", stream.list, ctx) + result = await cli.execute_cli_command(f"welcome", list_sink, ctx) assert "Resoto" in result[0][0] @pytest.mark.asyncio async def test_tip_of_the_day(cli: CLI) -> None: ctx = CLIContext(console_renderer=ConsoleRenderer.default_renderer()) - result = await cli.execute_cli_command(f"totd", stream.list, ctx) + result = await cli.execute_cli_command(f"totd", list_sink, ctx) assert generic_tips[0].command_line in result[0][0] @@ -982,7 +982,7 @@ async def test_tip_of_the_day(cli: CLI) -> None: async def test_certificate(cli: CLI) -> None: result = await cli.execute_cli_command( f"certificate create --common-name foo.resoto.com --dns-names bla --ip-addresses 1.2.3.4 --days-valid 1", - stream.list, + list_sink, ) # will create 2 files assert len(result[0]) == 2 @@ -1001,14 +1001,14 @@ async def test_execute_task(cli: CLI) -> None: # execute-task in source position source_result = await cli.execute_cli_command( - f'execute-task --command success_task --arg "--foo bla test"', stream.list + f'execute-task --command success_task --arg "--foo bla test"', list_sink ) assert len(source_result[0]) == 1 assert source_result[0] == [{"result": "done!"}] # execute task in flow position: every incoming node creates a new task flow_result = await cli.execute_cli_command( - f'search all limit 3 | execute-task --command success_task --arg "--t {{id}}"', stream.list + f'search all limit 3 | execute-task --command success_task --arg "--t {{id}}"', list_sink ) assert len(flow_result[0]) == 3 @@ -1016,7 +1016,7 @@ async def test_execute_task(cli: CLI) -> None: @pytest.mark.asyncio async def test_history(cli: CLI, filled_graph_db: ArangoGraphDB) -> None: async def history_count(cmd: str) -> int: - result = await cli.execute_cli_command(cmd, stream.list) + result = await cli.execute_cli_command(cmd, list_sink) return len(result[0]) now = utc() @@ -1043,7 +1043,7 @@ async def test_aggregate(dependencies: TenantDependencies) -> None: [{"a": 1, "b": 1, "c": 1}, {"a": 2, "b": 1, "c": 1}, {"a": 3, "b": 2, "c": 1}, {"a": 4, "b": 2, "c": 1}] ) - async def aggregate(agg_str: str) -> List[Json]: + async def aggregate(agg_str: str) -> List[JsonElement]: # type: ignore res = AggregateCommand(dependencies).parse(agg_str) async with (await res.flow(in_stream)).stream() as flow: return [s async for s in flow] @@ -1071,7 +1071,7 @@ async def test_report(cli: CLI, inspector_service: Inspector, test_benchmark: Be T = TypeVar("T") async def execute(cmd: str, _: Type[T]) -> List[T]: - result = await cli.execute_cli_command(cmd, stream.list) + result = await cli.execute_cli_command(cmd, list_sink) return cast(List[T], result[0]) # all benchmarks are listed @@ -1099,12 +1099,13 @@ async def test_apps(cli: CLI, package_manager: PackageManager, infra_apps_runtim T = TypeVar("T") async def execute(cmd: str, _: Type[T]) -> List[T]: - result = await cli.execute_cli_command(cmd, stream.list) + result = await cli.execute_cli_command(cmd, list_sink) return cast(List[T], result[0]) - async def check_file_is_yaml(res: Stream) -> None: + async def check_file_is_yaml(res: JsStream) -> None: async with res.stream() as streamer: async for s in streamer: + assert isinstance(s, str) with open(s, "r") as file: yaml.safe_load(file.read()) @@ -1156,7 +1157,7 @@ async def check_file_is_yaml(res: Stream) -> None: with open(manifest_file, "w", encoding="utf-8") as file: file.write(updated_manifest_str) ctx = CLIContext(uploaded_files={"manifest.yaml": manifest_file}) - update_result = await cli.execute_cli_command(f"apps update cleanup-untagged {manifest_file}", stream.list, ctx) + update_result = await cli.execute_cli_command(f"apps update cleanup-untagged {manifest_file}", list_sink, ctx) assert update_result == [[]] # show the manifest - should be the same as the created one updated_result = await cli.dependencies.infra_apps_package_manager.get_manifest(InfraAppName("cleanup-untagged")) @@ -1175,7 +1176,7 @@ async def check_file_is_yaml(res: Stream) -> None: @pytest.mark.asyncio async def test_user(cli: CLI) -> None: async def execute(cmd: str) -> List[JsonElement]: - all_results = await cli.execute_cli_command(cmd, stream.list) + all_results = await cli.execute_cli_command(cmd, list_sink) return all_results[0] # type: ignore # remove all existing users @@ -1227,7 +1228,7 @@ async def test_graph(cli: CLI, graph_manager: GraphManager, tmp_directory: str) await graph_manager.delete(GraphName("graphtest_import")) async def execute(cmd: str, _: Type[T]) -> List[T]: - result = await cli.execute_cli_command(cmd, stream.list) + result = await cli.execute_cli_command(cmd, list_sink) return cast(List[T], result[0]) # cleanup everything @@ -1295,7 +1296,7 @@ async def execute(cmd: str, _: Type[T]) -> List[T]: dump = os.path.join(tmp_directory, "dump") - async def move_dump(res: Stream) -> None: + async def move_dump(res: JsStream) -> None: async with res.stream() as streamer: async for s in streamer: fp = FilePath.from_path(s) @@ -1307,7 +1308,7 @@ async def move_dump(res: Stream) -> None: ctx = CLIContext(uploaded_files={"dump": dump}) # graph import works too - await cli.execute_cli_command("graph import graphtest_import graphtest.backup", stream.list, ctx) + await cli.execute_cli_command("graph import graphtest_import graphtest.backup", list_sink, ctx) assert await graph_manager.list(GraphName("graphtest_import")) == [GraphName("graphtest_import")] # clean up @@ -1325,12 +1326,13 @@ async def sync_and_check( expected_table: Optional[Callable[[str, int], bool]] = None, expected_tables: Optional[Set[str]] = None, expected_table_count: Optional[int] = None, - ) -> str: - result: List[str] = [] + ) -> Json: + result: List[Json] = [] - async def check(in_: Stream) -> None: + async def check(in_: JsStream) -> None: async with in_.stream() as streamer: async for s in streamer: + assert isinstance(s, dict) path = FilePath.from_path(s) # open sqlite database conn = sqlite3.connect(path.local) @@ -1390,7 +1392,7 @@ async def check(in_: Stream) -> None: # calling db without command will yield an error with pytest.raises(Exception) as ex: - db_res = await cli.execute_cli_command("db", stream.list) + db_res = await cli.execute_cli_command("db", list_sink) assert "Execute `help db` to get more information." in str(ex.value) # make sure argsinfo is available @@ -1400,7 +1402,7 @@ async def check(in_: Stream) -> None: @pytest.mark.asyncio async def test_timeseries(cli: CLI) -> None: async def exec(cmd: str) -> List[JsonElement]: - res = await cli.execute_cli_command(cmd, stream.list) + res = await cli.execute_cli_command(cmd, list_sink) return cast(List[JsonElement], res[0]) tsdb = cli.dependencies.db_access.time_series_db diff --git a/resotolib/pyproject.toml b/resotolib/pyproject.toml index 7eab468ea0..3acb51bb5c 100644 --- a/resotolib/pyproject.toml +++ b/resotolib/pyproject.toml @@ -37,6 +37,7 @@ dependencies = [ "cattrs", "cryptography", "frozendict", + "isodate", "jsons", "networkx", "parsy",