diff --git a/src/snowflake/cli/plugins/nativeapp/commands.py b/src/snowflake/cli/plugins/nativeapp/commands.py index 4b340281fa..2154dd768e 100644 --- a/src/snowflake/cli/plugins/nativeapp/commands.py +++ b/src/snowflake/cli/plugins/nativeapp/commands.py @@ -14,11 +14,12 @@ from __future__ import annotations +import itertools import logging from enum import Enum from pathlib import Path from textwrap import dedent -from typing import List, Optional +from typing import Generator, Iterable, List, Optional, cast import typer from click import ClickException @@ -420,6 +421,11 @@ class RecordType(Enum): SPAN_EVENT = "span_event" +# The default number of lines to print before streaming when running +# snow app events --follow +DEFAULT_EVENT_FOLLOW_LAST = 20 + + @app.command("events", hidden=True, requires_connection=True) @with_project_definition() @nativeapp_definition_v2_to_v1 @@ -445,39 +451,79 @@ def app_events( help="Restrict results to a specific scope name. Can be specified multiple times.", ), first: int = typer.Option( - default=0, help="Fetch only the first N events. Cannot be used with --last." + default=-1, + show_default=False, + help="Fetch only the first N events. Cannot be used with --last.", ), last: int = typer.Option( - default=0, help="Fetch only the last N events. Cannot be used with --first." + default=-1, + show_default=False, + help="Fetch only the last N events. Cannot be used with --first.", + ), + follow: bool = typer.Option( + False, + "--follow", + "-f", + help=( + f"Continue polling for events. Implies --last {DEFAULT_EVENT_FOLLOW_LAST} " + f"unless overridden or the --since flag is used." + ), + ), + follow_interval: int = typer.Option( + 10, + help=f"Polling interval in seconds when using the --follow flag.", ), **options, ): """Fetches events for this app from the event table configured in Snowflake.""" - if first and last: + if first >= 0 and last >= 0: raise ClickException("--first and --last cannot be used together.") + if follow: + if until: + raise ClickException("--follow and --until cannot be used together.") + if first >= 0: + raise ClickException("--follow and --first cannot be used together.") + assert_project_type("native_app") + record_type_names = [r.name for r in record_types] manager = NativeAppManager( project_definition=get_cli_context().project_definition.native_app, project_root=get_cli_context().project_root, ) - events = manager.get_events( - since_interval=since, - until_interval=until, - record_types=[r.name for r in record_types], - scopes=scopes, - first=first, - last=last, - ) - if not events: - return MessageResult("No events found.") - - def g(): - for event in events: - yield EventResult(event) + if follow: + if last == -1 and not since: + # If we don't have a value for --last or --since, assume a value + # for --last so we at least print something before starting the stream + last = DEFAULT_EVENT_FOLLOW_LAST + stream: Iterable[CommandResult] = ( + EventResult(event) + for event in manager.stream_events( + since=since, + last=last, + interval_seconds=follow_interval, + record_types=record_type_names, + scopes=scopes, + ) + ) + # Append a newline at the end to make the CLI output clean when we hit Ctrl-C + stream = itertools.chain(stream, [MessageResult("")]) + else: + stream = ( + EventResult(event) + for event in manager.get_events( + since=since, + until=until, + record_types=record_type_names, + scopes=scopes, + first=first, + last=last, + ) + ) - return StreamResult(g()) + # Cast the stream to a Generator since that's what StreamResult wants + return StreamResult(cast(Generator[CommandResult, None, None], stream)) class EventResult(ObjectResult, MessageResult): diff --git a/src/snowflake/cli/plugins/nativeapp/manager.py b/src/snowflake/cli/plugins/nativeapp/manager.py index 01392cb20e..10334756b4 100644 --- a/src/snowflake/cli/plugins/nativeapp/manager.py +++ b/src/snowflake/cli/plugins/nativeapp/manager.py @@ -16,12 +16,14 @@ import json import os +import time from abc import ABC, abstractmethod from contextlib import contextmanager +from datetime import datetime from functools import cached_property from pathlib import Path from textwrap import dedent -from typing import Any, List, NoReturn, Optional, TypedDict +from typing import Any, Generator, List, NoReturn, Optional, TypedDict import jinja2 from click import ClickException @@ -714,17 +716,17 @@ def get_validation_result(self, use_scratch_stage: bool): def get_events( self, - since_interval: str = "", - until_interval: str = "", + since: str | datetime | None = None, + until: str | datetime | None = None, record_types: list[str] | None = None, scopes: list[str] | None = None, - first: int = 0, - last: int = 0, + first: int = -1, + last: int = -1, ) -> list[dict]: record_types = record_types or [] scopes = scopes or [] - if first and last: + if first >= 0 and last >= 0: raise ValueError("first and last cannot be used together") if not self.account_event_table: @@ -732,16 +734,18 @@ def get_events( # resource_attributes:"snow.database.name" uses the unquoted/uppercase app name app_name = unquote_identifier(self.app_name) - since_clause = ( - f"and timestamp >= sysdate() - interval '{since_interval}'" - if since_interval - else "" - ) - until_clause = ( - f"and timestamp <= sysdate() - interval '{until_interval}'" - if until_interval - else "" - ) + if isinstance(since, datetime): + since_clause = f"and timestamp >= '{since}'" + elif isinstance(since, str) and since: + since_clause = f"and timestamp >= sysdate() - interval '{since}'" + else: + since_clause = "" + if isinstance(until, datetime): + until_clause = f"and timestamp <= '{until}'" + elif isinstance(until, str) and until: + until_clause = f"and timestamp <= sysdate() - interval '{until}'" + else: + until_clause = "" type_in_values = ",".join(f"'{v}'" for v in record_types) types_clause = ( f"and record_type in ({type_in_values})" if type_in_values else "" @@ -750,8 +754,8 @@ def get_events( scopes_clause = ( f"and scope:name in ({scope_in_values})" if scope_in_values else "" ) - first_clause = f"limit {first}" if first else "" - last_clause = f"limit {last}" if last else "" + first_clause = f"limit {first}" if first >= 0 else "" + last_clause = f"limit {last}" if last >= 0 else "" query = dedent( f"""\ select * from ( @@ -773,6 +777,56 @@ def get_events( except ProgrammingError as err: generic_sql_error_handler(err) + def stream_events( + self, + interval_seconds: int, + since: str | datetime | None = None, + record_types: list[str] | None = None, + scopes: list[str] | None = None, + last: int = -1, + ) -> Generator[dict, None, None]: + try: + events = self.get_events( + since=since, record_types=record_types, scopes=scopes, last=last + ) + yield from events # Yield the initial batch of events + last_event_time = events[-1]["TIMESTAMP"] + + while True: # Then infinite poll for new events + time.sleep(interval_seconds) + previous_events = events + events = self.get_events( + since=last_event_time, record_types=record_types, scopes=scopes + ) + if not events: + continue + + yield from _new_events_only(previous_events, events) + last_event_time = events[-1]["TIMESTAMP"] + except KeyboardInterrupt: + return + + +def _new_events_only(previous_events: list[dict], new_events: list[dict]) -> list[dict]: + # The timestamp that overlaps between both sets of events + overlap_time = new_events[0]["TIMESTAMP"] + + # Remove all the events from the new result set + # if they were already printed. We iterate and remove + # instead of filtering in order to handle duplicates + # (i.e. if an event is present 3 times in new_events + # but only once in previous_events, it should still + # appear twice in new_events at the end + new_events = new_events.copy() + for event in reversed(previous_events): + if event["TIMESTAMP"] < overlap_time: + break + # No need to handle ValueError here since we know + # that events that pass the above if check will + # either be in both lists or in new_events only + new_events.remove(event) + return new_events + def _validation_item_to_str(item: dict[str, str | int]): s = item["message"] diff --git a/tests/__snapshots__/test_help_messages.ambr b/tests/__snapshots__/test_help_messages.ambr index 572bf00050..62df3f2862 100644 --- a/tests/__snapshots__/test_help_messages.ambr +++ b/tests/__snapshots__/test_help_messages.ambr @@ -253,31 +253,46 @@ Fetches events for this app from the event table configured in Snowflake. +- Options --------------------------------------------------------------------+ - | --since TEXT Fetch events that are newer than | - | this time ago, in Snowflake | - | interval syntax. | - | --until TEXT Fetch events that are older than | - | this time ago, in Snowflake | - | interval syntax. | - | --type [log|span|span_event] Restrict results to specific | - | record type. Can be specified | - | multiple times. | - | --scope TEXT Restrict results to a specific | - | scope name. Can be specified | - | multiple times. | - | --first INTEGER Fetch only the first N events. | - | Cannot be used with --last. | - | [default: 0] | - | --last INTEGER Fetch only the last N events. | - | Cannot be used with --first. | - | [default: 0] | - | --project -p TEXT Path where Snowflake project | - | resides. Defaults to current | - | working directory. | - | --env TEXT String in format of key=value. | - | Overrides variables from env | - | section used for templating. | - | --help -h Show this message and exit. | + | --since TEXT Fetch events that are | + | newer than this time ago, | + | in Snowflake interval | + | syntax. | + | --until TEXT Fetch events that are | + | older than this time ago, | + | in Snowflake interval | + | syntax. | + | --type [log|span|span_event] Restrict results to | + | specific record type. Can | + | be specified multiple | + | times. | + | --scope TEXT Restrict results to a | + | specific scope name. Can | + | be specified multiple | + | times. | + | --first INTEGER Fetch only the first N | + | events. Cannot be used | + | with --last. | + | --last INTEGER Fetch only the last N | + | events. Cannot be used | + | with --first. | + | --follow -f Continue polling for | + | events. Implies --last 20 | + | unless overridden or the | + | --since flag is used. | + | --follow-interval INTEGER Polling interval in | + | seconds when using the | + | --follow flag. | + | [default: 10] | + | --project -p TEXT Path where Snowflake | + | project resides. Defaults | + | to current working | + | directory. | + | --env TEXT String in format of | + | key=value. Overrides | + | variables from env section | + | used for templating. | + | --help -h Show this message and | + | exit. | +------------------------------------------------------------------------------+ +- Connection configuration ---------------------------------------------------+ | --connection,--environment -c TEXT Name of the connection, as defined | diff --git a/tests/nativeapp/test_manager.py b/tests/nativeapp/test_manager.py index 597e5125c1..01bb8b0561 100644 --- a/tests/nativeapp/test_manager.py +++ b/tests/nativeapp/test_manager.py @@ -15,6 +15,7 @@ import json import os +from datetime import datetime from pathlib import Path from textwrap import dedent from typing import Optional @@ -1358,6 +1359,7 @@ def test_account_event_table_not_set_up(mock_execute, temp_dir, mock_cursor): [ ("", ""), ("1 hour", "and timestamp >= sysdate() - interval '1 hour'"), + (datetime(2024, 1, 1), "and timestamp >= '2024-01-01 00:00:00'"), ], ) @pytest.mark.parametrize( @@ -1365,6 +1367,7 @@ def test_account_event_table_not_set_up(mock_execute, temp_dir, mock_cursor): [ ("", ""), ("20 minutes", "and timestamp <= sysdate() - interval '20 minutes'"), + (datetime(2024, 1, 1), "and timestamp <= '2024-01-01 00:00:00'"), ], ) @pytest.mark.parametrize( @@ -1386,14 +1389,16 @@ def test_account_event_table_not_set_up(mock_execute, temp_dir, mock_cursor): @pytest.mark.parametrize( ["first", "expected_first_clause"], [ - (0, ""), + (-1, ""), + (0, "limit 0"), (10, "limit 10"), ], ) @pytest.mark.parametrize( ["last", "expected_last_clause"], [ - (0, ""), + (-1, ""), + (0, "limit 0"), (20, "limit 20"), ], ) @@ -1427,7 +1432,7 @@ def test_get_events( contents=[mock_snowflake_yml_file], ) - events = [dict(TIMESTAMP="2020-01-01T00:00:00Z", VALUE="test")] * 100 + events = [dict(TIMESTAMP=datetime(2024, 1, 1), VALUE="test")] * 100 side_effects, expected = mock_execute_helper( [ ( @@ -1459,15 +1464,15 @@ def test_get_events( def get_events(): native_app_manager = _get_na_manager() return native_app_manager.get_events( - since_interval=since, - until_interval=until, + since=since, + until=until, record_types=types, scopes=scopes, first=first, last=last, ) - if first and last: + if first >= 0 and last >= 0: # Filtering on first and last events at the same time doesn't make sense with pytest.raises(ValueError): get_events() @@ -1496,7 +1501,7 @@ def test_get_events_quoted_app_name( contents=[quoted_override_yml_file], ) - events = [dict(TIMESTAMP="2020-01-01T00:00:00Z", VALUE="test")] * 100 + events = [dict(TIMESTAMP=datetime(2024, 1, 1), VALUE="test")] * 100 side_effects, expected = mock_execute_helper( [ ( @@ -1545,3 +1550,91 @@ def test_get_events_no_event_table(mock_account_event_table, temp_dir, mock_curs native_app_manager = _get_na_manager() with pytest.raises(NoEventTableForAccount): native_app_manager.get_events() + + +@mock.patch( + NATIVEAPP_MANAGER_ACCOUNT_EVENT_TABLE, + return_value="db.schema.event_table", + new_callable=mock.PropertyMock, +) +@mock.patch(NATIVEAPP_MANAGER_EXECUTE) +def test_stream_events(mock_execute, mock_account_event_table, temp_dir, mock_cursor): + create_named_file( + file_name="snowflake.yml", + dir_name=temp_dir, + contents=[mock_snowflake_yml_file], + ) + + events = [ + [dict(TIMESTAMP=datetime(2024, 1, 1), VALUE="test")] * 10, + [dict(TIMESTAMP=datetime(2024, 1, 2), VALUE="test")] * 10, + ] + side_effects, expected = mock_execute_helper( + [ + ( + mock_cursor(events[0], []), + mock.call( + dedent( + f"""\ + select * from ( + select timestamp, value::varchar value + from db.schema.event_table + where resource_attributes:"snow.database.name" = 'MYAPP' + + + + + order by timestamp desc + limit {len(events[0])} + ) order by timestamp asc + + """ + ), + cursor_class=DictCursor, + ), + ), + ( + mock_cursor(events[1], []), + mock.call( + dedent( + """\ + select * from ( + select timestamp, value::varchar value + from db.schema.event_table + where resource_attributes:"snow.database.name" = 'MYAPP' + and timestamp >= '2024-01-01 00:00:00' + + + + order by timestamp desc + + ) order by timestamp asc + + """ + ), + cursor_class=DictCursor, + ), + ), + ] + ) + mock_execute.side_effect = side_effects + + native_app_manager = _get_na_manager() + stream = native_app_manager.stream_events(interval_seconds=0, last=len(events[0])) + for i in range(len(events[0])): + # Exhaust the initial set of events + assert next(stream) == events[0][i] + assert mock_execute.call_count == 1 + + for i in range(len(events[1])): + # Then it'll make another query which returns the second set of events + assert next(stream) == events[1][i] + assert mock_execute.call_count == 2 + assert mock_execute.mock_calls == expected + + try: + stream.throw(KeyboardInterrupt) + except StopIteration: + pass + else: + pytest.fail("stream_events didn't end when receiving a KeyboardInterrupt") diff --git a/tests_integration/nativeapp/test_events.py b/tests_integration/nativeapp/test_events.py index bfed23363d..7216cd9a52 100644 --- a/tests_integration/nativeapp/test_events.py +++ b/tests_integration/nativeapp/test_events.py @@ -26,18 +26,38 @@ @pytest.mark.integration @enable_definition_v2_feature_flag @pytest.mark.parametrize("test_project", ["napp_init_v1", "napp_init_v2"]) -def test_app_events_cant_specify_first_and_last( - test_project, runner, project_directory +@pytest.mark.parametrize( + ["flag_names", "command"], + [ + [ + ["--first", "--last"], + ["--first", "10", "--last", "20"], + ], + [ + ["--follow", "--first"], + ["--first", "10", "--follow"], + ], + [ + ["--follow", "--until"], + ["--until", "5 minutes", "--follow"], + ], + ], +) +def test_app_events_mutually_exclusive_options( + test_project, runner, project_directory, flag_names, command ): with project_directory(test_project): # The integration test account doesn't have an event table set up # but this test is still useful to validate the negative case result = runner.invoke_with_connection( - ["app", "events", "--first", "10", "--last", "20"], + ["app", "events", *command], env=TEST_ENV, ) assert result.exit_code == 1, result.output - assert "--first and --last cannot be used together." in result.output + assert ( + f"{flag_names[0]} and {flag_names[1]} cannot be used together." + in result.output + ) @pytest.mark.integration