From acfd4254a7ca79308718ce4e42bb645a92c6ef91 Mon Sep 17 00:00:00 2001 From: Bhargav Suryadevara Date: Mon, 11 Sep 2023 08:18:06 -0500 Subject: [PATCH 01/15] added unified filesource stage --- morpheus/cli/commands.py | 1 + morpheus/stages/input/file_source.py | 314 +++++++++++++++++++++++++++ 2 files changed, 315 insertions(+) create mode 100644 morpheus/stages/input/file_source.py diff --git a/morpheus/cli/commands.py b/morpheus/cli/commands.py index fae791cf0f..8fc7096f1a 100644 --- a/morpheus/cli/commands.py +++ b/morpheus/cli/commands.py @@ -650,6 +650,7 @@ def post_pipeline(ctx: click.Context, *args, **kwargs): add_command("delay", "morpheus.stages.general.delay_stage.DelayStage", modes=ALL) add_command("deserialize", "morpheus.stages.preprocess.deserialize_stage.DeserializeStage", modes=NOT_AE) add_command("dropna", "morpheus.stages.preprocess.drop_null_stage.DropNullStage", modes=NOT_AE) +add_command("file-source", "morpheus.stages.input.file_source.FileSource", modes=NOT_AE) add_command("filter", "morpheus.stages.postprocess.filter_detections_stage.FilterDetectionsStage", modes=ALL) add_command("from-azure", "morpheus.stages.input.azure_source_stage.AzureSourceStage", modes=AE_ONLY) add_command("from-appshield", "morpheus.stages.input.appshield_source_stage.AppShieldSourceStage", modes=FIL_ONLY) diff --git a/morpheus/stages/input/file_source.py b/morpheus/stages/input/file_source.py new file mode 100644 index 0000000000..8b58a5a8e0 --- /dev/null +++ b/morpheus/stages/input/file_source.py @@ -0,0 +1,314 @@ +# Copyright (c) 2021-2023, NVIDIA CORPORATION. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""File source stage.""" + +import logging +import time +import typing +from functools import partial +from urllib.parse import urlsplit + +import fsspec +import mrc +from mrc.core import operators as ops + +from morpheus.cli import register_stage +from morpheus.common import FileTypes +from morpheus.config import Config +from morpheus.config import PipelineModes +from morpheus.io.deserializers import read_file_to_df +from morpheus.messages import MessageMeta +from morpheus.pipeline.preallocator_mixin import PreallocatorMixin +from morpheus.pipeline.single_output_source import SingleOutputSource +from morpheus.pipeline.stream_pair import StreamPair +from morpheus.utils.directory_watcher import DirectoryWatcher + +logger = logging.getLogger(__name__) + + +@register_stage("file-source", modes=[PipelineModes.FIL, PipelineModes.NLP, PipelineModes.OTHER]) +class FileSource(PreallocatorMixin, SingleOutputSource): + """ + Load messages from a file. + + Source stage is used to load messages from a file and dumping the contents into the pipeline immediately. Useful for + testing performance and accuracy of a pipeline. + + Parameters + ---------- + c : `morpheus.config.Config` + Pipeline configuration instance. 
+    files : List[str]
+        List of paths to be read from, can be a list of S3 URLs (`s3://path`) and can include wildcard characters `*`
+        as defined by `fsspec`:
+        https://filesystem-spec.readthedocs.io/en/latest/api.html?highlight=open_files#fsspec.open_files
+    watch : bool, default = False
+        When True, will check `files` for new files and emit them as they appear. (Note: `watch_interval` is
+        applicable when `watch` is True and there are no remote paths in `files`.)
+    watch_interval : float, default = 1.0
+        When `watch` is True, this is the time in seconds between polling the paths in `files` for new files.
+        (Note: Applicable when paths in `files` are remote and `watch` is True)
+    sort_glob : bool, default = False
+        If true, the list of files matching `input_glob` will be processed in sorted order.
+        (Note: Applicable when all paths in `files` are local.)
+    recursive : bool, default = True
+        If true, events will be emitted for the files in subdirectories matching `input_glob`.
+        (Note: Applicable when all paths in `files` are local.)
+    queue_max_size : int, default = 128
+        Maximum queue size to hold the file paths to be processed that match `input_glob`.
+        (Note: Applicable when all paths in `files` are local.)
+    batch_timeout : float, default = 5.0
+        Timeout to retrieve batch messages from the queue.
+        (Note: Applicable when all paths in `files` are local.)
+    file_type : `morpheus.common.FileTypes`, optional, case_sensitive = False
+        Indicates what type of file to read. Specifying 'auto' will determine the file type from the extension.
+        Supported extensions: 'csv', 'json', 'jsonlines' and 'parquet'.
+    repeat : int, default = 1, min = 1
+        Repeats the input dataset multiple times. Useful to extend small datasets for debugging.
+    filter_null : bool, default = True
+        Whether or not to filter rows with a null 'data' column. Null values in the 'data' column can cause issues down
+        the line with processing. Setting this to True is recommended.
+    parser_kwargs : dict, default = {}
+        Extra options to pass to the file parser.
+ """ + + def __init__(self, + c: Config, + files: typing.List[str], + watch: bool = False, + watch_interval: float = 1.0, + sort_glob: bool = False, + recursive: bool = True, + queue_max_size: int = 128, + batch_timeout: float = 5.0, + file_type: FileTypes = FileTypes.Auto, + repeat: int = 1, + filter_null: bool = True, + parser_kwargs: dict = None): + + super().__init__(c) + + self._batch_size = c.pipeline_batch_size + + if not files: + raise ValueError("The 'files' cannot be empty.") + + if watch and len(files) != 1: + raise ValueError("When 'watch' is True, the 'files' should contain exactly one file path.") + + self._files = list(files) + self._watch = watch + self._sort_glob = sort_glob + self._recursive = recursive + self._queue_max_size = queue_max_size + self._batch_timeout = batch_timeout + self._file_type = file_type + self._filter_null = filter_null + self._parser_kwargs = parser_kwargs or {} + self._watch_interval = watch_interval + self._repeat_count = repeat + + @property + def name(self) -> str: + """Return the name of the stage""" + return "file-source" + + def supports_cpp_node(self) -> bool: + """Indicates whether or not this stage supports a C++ node""" + return True + + def _has_remote_paths(self): + return any(urlsplit(file).scheme for file in self._files if "://" in file) + + def _build_source(self, builder: mrc.Builder) -> StreamPair: + if self._build_cpp_node(): + raise RuntimeError("Does not support C++ nodes") + + if self._watch and not self._has_remote_paths(): + # When watching a directory, we use the directory path for monitoring. + input_glob = self._files[0] + watcher = DirectoryWatcher(input_glob=input_glob, + watch_directory=self._watch, + max_files=None, + sort_glob=self._sort_glob, + recursive=self._recursive, + queue_max_size=self._queue_max_size, + batch_timeout=self._batch_timeout) + out_stream = watcher.build_node(self.unique_name, builder) + + out_type = typing.List[str] + else: + if self._watch: + generator_function = self._polling_generate_frames_fsspec + else: + generator_function = self._generate_frames_fsspec + + out_stream = builder.make_source(self.unique_name, generator_function()) + out_type = fsspec.core.OpenFiles + + # Supposed to just return a source here + return out_stream, out_type + + def _generate_frames_fsspec(self) -> typing.Iterable[fsspec.core.OpenFiles]: + + files: fsspec.core.OpenFiles = fsspec.open_files(self._files) + + if (len(files) == 0): + raise RuntimeError(f"No files matched input strings: '{self._files}'. " + "Check your input pattern and ensure any credentials are correct") + + yield files + + def _polling_generate_frames_fsspec(self) -> typing.Iterable[fsspec.core.OpenFiles]: + files_seen = set() + curr_time = time.monotonic() + next_update_epoch = curr_time + + while (True): + # Before doing any work, find the next update epoch after the current time + while (next_update_epoch <= curr_time): + # Only ever add `self._watch_interval` to next_update_epoch so all updates are at repeating intervals + next_update_epoch += self._watch_interval + + file_set = set() + filtered_files = [] + + files = fsspec.open_files(self._files) + for file in files: + file_set.add(file.full_name) + if file.full_name not in files_seen: + filtered_files.append(file) + + # Replace files_seen with the new set of files. This prevents a memory leak that could occurr if files are + # deleted from the input directory. 
In addition if a file with a given name was created, seen/processed by + # the stage, and then deleted, and a new file with the same name appeared sometime later, the stage will + # need to re-ingest that new file. + files_seen = file_set + + if len(filtered_files) > 0: + yield fsspec.core.OpenFiles(filtered_files, fs=files.fs) + + curr_time = time.monotonic() + + # If we spent more than `self._watch_interval` doing work and/or yielding to the output channel blocked, + # then we should only sleep for the remaining time until the next update epoch. + sleep_duration = next_update_epoch - curr_time + if (sleep_duration > 0): + time.sleep(sleep_duration) + curr_time = time.monotonic() + + @staticmethod + def generate_frames(file: fsspec.core.OpenFiles, + file_type: FileTypes, + filter_null: bool, + parser_kwargs: dict, + repeat_count: int) -> list[MessageMeta]: + """ + Generate message frames from a file. + + This function reads data from a file and generates message frames (MessageMeta) based on the file's content. + It can be used to load and process messages from a file for testing and analysis within a Morpheus pipeline. + + Parameters + ---------- + file : fsspec.core.OpenFiles + An open file object obtained using fsspec's `open_files` function. + file_type : FileTypes + Indicates the type of the file to read. Supported types include 'csv', 'json', 'jsonlines', and 'parquet'. + filter_null : bool + Determines whether to filter out rows with null values in the 'data' column. Filtering null values is + recommended to prevent potential issues during processing. + parser_kwargs : dict + Additional keyword arguments to pass to the file parser. + repeat_count : int + The number of times to repeat the data reading process. Each repetition generates a new set of message + frames. + + Returns + ------- + List[MessageMeta] + MessageMeta objects, each containing a dataframe of messages from the file. + """ + df = read_file_to_df( + file.full_name, + file_type=file_type, + filter_nulls=filter_null, + parser_kwargs=parser_kwargs, + df_type="cudf", + ) + + metas = [] + + for i in range(repeat_count): + + x = MessageMeta(df) + + # If we are looping, copy the object. Do this before we push the object in case it changes + if (i + 1 < repeat_count): + df = df.copy() + + # Shift the index to allow for unique indices without reading more data + df.index += len(df) + + metas.append(x) + + return metas + + @staticmethod + def convert_list_to_fsspec_files( + files: typing.Union[typing.List[str], fsspec.core.OpenFiles]) -> fsspec.core.OpenFiles: + """ + Convert a list of file paths to fsspec OpenFiles. + + This static method takes a list of file paths or an existing fsspec OpenFiles object and ensures that the + input is converted to an OpenFiles object for uniform handling in Morpheus pipeline stages. + + Parameters + ---------- + files : Union[List[str], fsspec.core.OpenFiles] + A list of file paths or an existing fsspec OpenFiles object. + + Returns + ------- + fsspec.core.OpenFiles + An fsspec OpenFiles object representing the input files. 
+ """ + + if isinstance(files, list): + files: fsspec.core.OpenFiles = fsspec.open_files(files) + + return files + + def _post_build_single(self, builder: mrc.Builder, out_pair: StreamPair) -> StreamPair: + + out_stream = out_pair[0] + + post_node = builder.make_node( + self.unique_name + "-post", + ops.map(self.convert_list_to_fsspec_files), + ops.flatten(), + ops.map( + partial(self.generate_frames, + file_type=self._file_type, + filter_null=self._filter_null, + parser_kwargs=self._parser_kwargs, + repeat_count=self._repeat_count)), + ops.flatten()) + + builder.make_edge(out_stream, post_node) + + out_stream = post_node + out_type = MessageMeta + + return super()._post_build_single(builder, (out_stream, out_type)) From dca5e9160c8481302b9c4cc35f77e43081b1c6be Mon Sep 17 00:00:00 2001 From: Bhargav Suryadevara Date: Mon, 11 Sep 2023 08:27:18 -0500 Subject: [PATCH 02/15] type annotation correction --- morpheus/stages/input/file_source.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/morpheus/stages/input/file_source.py b/morpheus/stages/input/file_source.py index 8b58a5a8e0..6e03c36b58 100644 --- a/morpheus/stages/input/file_source.py +++ b/morpheus/stages/input/file_source.py @@ -128,7 +128,7 @@ def supports_cpp_node(self) -> bool: """Indicates whether or not this stage supports a C++ node""" return True - def _has_remote_paths(self): + def _has_remote_paths(self) -> bool: return any(urlsplit(file).scheme for file in self._files if "://" in file) def _build_source(self, builder: mrc.Builder) -> StreamPair: @@ -147,7 +147,7 @@ def _build_source(self, builder: mrc.Builder) -> StreamPair: batch_timeout=self._batch_timeout) out_stream = watcher.build_node(self.unique_name, builder) - out_type = typing.List[str] + out_type = list[str] else: if self._watch: generator_function = self._polling_generate_frames_fsspec @@ -267,7 +267,7 @@ def generate_frames(file: fsspec.core.OpenFiles, @staticmethod def convert_list_to_fsspec_files( - files: typing.Union[typing.List[str], fsspec.core.OpenFiles]) -> fsspec.core.OpenFiles: + files: typing.Union[list[str], fsspec.core.OpenFiles]) -> fsspec.core.OpenFiles: """ Convert a list of file paths to fsspec OpenFiles. From d2f8ff8ead2eebac1add8b1e5273850e2ee5895b Mon Sep 17 00:00:00 2001 From: Bhargav Suryadevara Date: Mon, 11 Sep 2023 10:28:02 -0500 Subject: [PATCH 03/15] added unified filesource stage --- morpheus/stages/input/file_source.py | 15 +++--- tests/test_file_source.py | 77 ++++++++++++++++++++++++++++ 2 files changed, 83 insertions(+), 9 deletions(-) create mode 100644 tests/test_file_source.py diff --git a/morpheus/stages/input/file_source.py b/morpheus/stages/input/file_source.py index 6e03c36b58..1908cf7ca8 100644 --- a/morpheus/stages/input/file_source.py +++ b/morpheus/stages/input/file_source.py @@ -126,14 +126,12 @@ def name(self) -> str: def supports_cpp_node(self) -> bool: """Indicates whether or not this stage supports a C++ node""" - return True + return False def _has_remote_paths(self) -> bool: return any(urlsplit(file).scheme for file in self._files if "://" in file) def _build_source(self, builder: mrc.Builder) -> StreamPair: - if self._build_cpp_node(): - raise RuntimeError("Does not support C++ nodes") if self._watch and not self._has_remote_paths(): # When watching a directory, we use the directory path for monitoring. 
@@ -209,7 +207,7 @@ def _polling_generate_frames_fsspec(self) -> typing.Iterable[fsspec.core.OpenFil curr_time = time.monotonic() @staticmethod - def generate_frames(file: fsspec.core.OpenFiles, + def generate_frames(file: fsspec.core.OpenFile, file_type: FileTypes, filter_null: bool, parser_kwargs: dict, @@ -222,8 +220,8 @@ def generate_frames(file: fsspec.core.OpenFiles, Parameters ---------- - file : fsspec.core.OpenFiles - An open file object obtained using fsspec's `open_files` function. + file : fsspec.core.OpenFile + An open file object using fsspec. file_type : FileTypes Indicates the type of the file to read. Supported types include 'csv', 'json', 'jsonlines', and 'parquet'. filter_null : bool @@ -266,8 +264,7 @@ def generate_frames(file: fsspec.core.OpenFiles, return metas @staticmethod - def convert_list_to_fsspec_files( - files: typing.Union[list[str], fsspec.core.OpenFiles]) -> fsspec.core.OpenFiles: + def convert_to_fsspec_files(files: typing.Union[list[str], fsspec.core.OpenFiles]) -> fsspec.core.OpenFiles: """ Convert a list of file paths to fsspec OpenFiles. @@ -296,7 +293,7 @@ def _post_build_single(self, builder: mrc.Builder, out_pair: StreamPair) -> Stre post_node = builder.make_node( self.unique_name + "-post", - ops.map(self.convert_list_to_fsspec_files), + ops.map(self.convert_to_fsspec_files), ops.flatten(), ops.map( partial(self.generate_frames, diff --git a/tests/test_file_source.py b/tests/test_file_source.py new file mode 100644 index 0000000000..d2a30585af --- /dev/null +++ b/tests/test_file_source.py @@ -0,0 +1,77 @@ +#!/usr/bin/env python +# SPDX-FileCopyrightText: Copyright (c) 2022-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
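A note on the signature change above in this patch: `generate_frames` now takes a single `fsspec.core.OpenFile` rather than the `fsspec.core.OpenFiles` container, because the stage's post-build node applies `ops.flatten()` to each `OpenFiles` batch before mapping `generate_frames` over the results. A minimal sketch of the two fsspec types involved (the local glob is illustrative, not taken from the patch):

    import fsspec

    # open_files expands a pattern into an OpenFiles container (list-like).
    files = fsspec.open_files("data/*.jsonlines")  # fsspec.core.OpenFiles

    for file in files:
        # Each element is a single fsspec.core.OpenFile; full_name carries the
        # protocol, e.g. "file:///abs/path/data/part-00.jsonlines".
        print(file.full_name)

This is also why `_generate_frames_fsspec` and the polling generator keep yielding whole `OpenFiles` batches, while the per-file callback operates on one `OpenFile` at a time.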
+ + +import os + +import fsspec +import pytest + +from _utils import TEST_DIRS +from morpheus.common import FileTypes +from morpheus.stages.input.file_source import FileSource + + +@pytest.fixture(name="input_file", scope="function") +def file_fixture(): + return fsspec.open(os.path.join(TEST_DIRS.tests_data_dir, "filter_probs.json")) + + +@pytest.mark.use_python +def test_constructor(config): + file_source = FileSource( + config, + files=["path/to/file.json", "path/to/another.json"], + watch=True, + sort_glob=True, + recursive=False, + queue_max_size=256, + batch_timeout=10.0, + file_type="json", + repeat=3, + filter_null=False, + parser_kwargs={"key": "value"}, + watch_interval=2.0, + ) + + assert file_source._files == ["path/to/file.json", "path/to/another.json"] + assert file_source._watch + assert file_source._sort_glob + assert not file_source._recursive + assert file_source._queue_max_size == 256 + assert file_source._batch_timeout == 10.0 + assert file_source._file_type == "json" + assert not file_source._filter_null + assert file_source._parser_kwargs == {"key": "value"} + assert file_source._watch_interval == 2.0 + assert file_source._repeat_count == 3 + + +@pytest.mark.use_python +@pytest.mark.parametrize("input_files", [["file1.json", "file2.json"], []]) +def test_constructor_with_invalid_params(config, input_files): + with pytest.raises(ValueError): + # 'watch' is True, but multiple files are provided + FileSource(config, files=input_files, watch=True) + + +@pytest.mark.parametrize("input_files", [["file1.json", "file2.json"]]) +def test_convert_to_fsspec_files(input_files): + actual_output = FileSource.convert_to_fsspec_files(files=input_files) + + assert isinstance(actual_output, fsspec.core.OpenFiles) + assert os.path.basename(actual_output[0].full_name) == input_files[0] + assert os.path.basename(actual_output[1].full_name) == input_files[1] From 51cdc0c31340d2c6b21251e6fe97c8786d6cb15d Mon Sep 17 00:00:00 2001 From: Bhargav Suryadevara Date: Mon, 11 Sep 2023 17:35:05 -0500 Subject: [PATCH 04/15] Added tests to file source stage --- morpheus/stages/input/file_source.py | 34 +++++++----- tests/test_file_source.py | 78 +++++++++++++++++++++++++--- 2 files changed, 91 insertions(+), 21 deletions(-) diff --git a/morpheus/stages/input/file_source.py b/morpheus/stages/input/file_source.py index 1908cf7ca8..02fad45cc7 100644 --- a/morpheus/stages/input/file_source.py +++ b/morpheus/stages/input/file_source.py @@ -1,4 +1,4 @@ -# Copyright (c) 2021-2023, NVIDIA CORPORATION. +# Copyright (c) 2022-2023, NVIDIA CORPORATION. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -99,8 +99,6 @@ def __init__(self, super().__init__(c) - self._batch_size = c.pipeline_batch_size - if not files: raise ValueError("The 'files' cannot be empty.") @@ -133,18 +131,21 @@ def _has_remote_paths(self) -> bool: def _build_source(self, builder: mrc.Builder) -> StreamPair: + if self._build_cpp_node(): + raise RuntimeError("Does not support C++ nodes") + if self._watch and not self._has_remote_paths(): - # When watching a directory, we use the directory path for monitoring. 
input_glob = self._files[0] - watcher = DirectoryWatcher(input_glob=input_glob, - watch_directory=self._watch, - max_files=None, - sort_glob=self._sort_glob, - recursive=self._recursive, - queue_max_size=self._queue_max_size, - batch_timeout=self._batch_timeout) - out_stream = watcher.build_node(self.unique_name, builder) + watcher = DirectoryWatcher( + input_glob=input_glob, + watch_directory=self._watch, + max_files=None, # This is not being used in the latest version. + sort_glob=self._sort_glob, + recursive=self._recursive, + queue_max_size=self._queue_max_size, + batch_timeout=self._batch_timeout) + out_stream = watcher.build_node(self.unique_name, builder) out_type = list[str] else: if self._watch: @@ -166,6 +167,9 @@ def _generate_frames_fsspec(self) -> typing.Iterable[fsspec.core.OpenFiles]: raise RuntimeError(f"No files matched input strings: '{self._files}'. " "Check your input pattern and ensure any credentials are correct") + if self._sort_glob: + files = sorted(files, key=lambda f: f.full_name) + yield files def _polling_generate_frames_fsspec(self) -> typing.Iterable[fsspec.core.OpenFiles]: @@ -195,6 +199,8 @@ def _polling_generate_frames_fsspec(self) -> typing.Iterable[fsspec.core.OpenFil files_seen = file_set if len(filtered_files) > 0: + if self._sort_glob: + filtered_files = sorted(filtered_files, key=lambda f: f.full_name) yield fsspec.core.OpenFiles(filtered_files, fs=files.fs) curr_time = time.monotonic() @@ -281,8 +287,8 @@ def convert_to_fsspec_files(files: typing.Union[list[str], fsspec.core.OpenFiles fsspec.core.OpenFiles An fsspec OpenFiles object representing the input files. """ - - if isinstance(files, list): + # Check if the list contains string items by checking the type of the first element + if files and isinstance(files[0], str): files: fsspec.core.OpenFiles = fsspec.open_files(files) return files diff --git a/tests/test_file_source.py b/tests/test_file_source.py index d2a30585af..1b811bbea1 100644 --- a/tests/test_file_source.py +++ b/tests/test_file_source.py @@ -14,19 +14,24 @@ # See the License for the specific language governing permissions and # limitations under the License. 
- import os import fsspec import pytest +import cudf + from _utils import TEST_DIRS from morpheus.common import FileTypes +from morpheus.messages.message_meta import MessageMeta +from morpheus.pipeline.pipeline import Pipeline from morpheus.stages.input.file_source import FileSource +from morpheus.stages.output.in_memory_sink_stage import InMemorySinkStage + +@pytest.fixture(name="files", scope="function") +def files_fixture(): -@pytest.fixture(name="input_file", scope="function") -def file_fixture(): return fsspec.open(os.path.join(TEST_DIRS.tests_data_dir, "filter_probs.json")) @@ -34,26 +39,26 @@ def file_fixture(): def test_constructor(config): file_source = FileSource( config, - files=["path/to/file.json", "path/to/another.json"], + files=["path/to/*.json"], watch=True, sort_glob=True, recursive=False, queue_max_size=256, batch_timeout=10.0, - file_type="json", + file_type=FileTypes.JSON, repeat=3, filter_null=False, parser_kwargs={"key": "value"}, watch_interval=2.0, ) - assert file_source._files == ["path/to/file.json", "path/to/another.json"] + assert file_source._files == ["path/to/*.json"] assert file_source._watch assert file_source._sort_glob assert not file_source._recursive assert file_source._queue_max_size == 256 assert file_source._batch_timeout == 10.0 - assert file_source._file_type == "json" + assert file_source._file_type == FileTypes.JSON assert not file_source._filter_null assert file_source._parser_kwargs == {"key": "value"} assert file_source._watch_interval == 2.0 @@ -75,3 +80,62 @@ def test_convert_to_fsspec_files(input_files): assert isinstance(actual_output, fsspec.core.OpenFiles) assert os.path.basename(actual_output[0].full_name) == input_files[0] assert os.path.basename(actual_output[1].full_name) == input_files[1] + + +@pytest.mark.use_python +@pytest.mark.parametrize( + "input_file,filetypes,filter_null,parser_kwargs, repeat_count, expected_count, expected_df_count", + [("filter_probs.json", FileTypes.Auto, False, { + "lines": False + }, 1, 1, 20), ("filter_probs.csv", FileTypes.CSV, False, {}, 2, 2, 20), + ("filter_probs.jsonlines", FileTypes.JSON, False, { + "lines": True + }, 1, 1, 20)]) +def test_generate_frames(input_file, + filetypes, + filter_null, + parser_kwargs, + repeat_count, + expected_count, + expected_df_count): + in_file = fsspec.open(os.path.join(TEST_DIRS.tests_data_dir, input_file)) + + metas = FileSource.generate_frames(file=in_file, + file_type=filetypes, + filter_null=filter_null, + parser_kwargs=parser_kwargs, + repeat_count=repeat_count) + + assert len(metas) == expected_count + assert len(metas[0].df.columns) == 4 + assert len(metas[0].df) == expected_df_count + assert isinstance(metas[0], MessageMeta) + assert isinstance(metas[0].df, cudf.DataFrame) + + +@pytest.mark.use_python +@pytest.mark.parametrize("input_files,parser_kwargs,repeat,expected_count", + [([ + "s3://rapidsai-data/cyber/morpheus/dfp/duo/DUO_2022-08-01T00_05_06.806Z.json", + "s3://rapidsai-data/cyber/morpheus/dfp/duo/DUO_2022-08-01T12_09_47.901Z.json" + ], { + "lines": False, "orient": "records" + }, + 1, + 2), ([os.path.join(TEST_DIRS.tests_data_dir, "triton_*.csv")], None, 1, 3), + ([os.path.join(TEST_DIRS.tests_data_dir, "triton_*.csv")], None, 2, 6)]) +def test_filesource_with_watch_false(config, input_files, parser_kwargs, repeat, expected_count): + + pipe = Pipeline(config) + + file_source_stage = FileSource(config, files=input_files, watch=False, parser_kwargs=parser_kwargs, repeat=repeat) + sink_stage = InMemorySinkStage(config) + + 
pipe.add_stage(file_source_stage) + pipe.add_stage(sink_stage) + + pipe.add_edge(file_source_stage, sink_stage) + + pipe.run() + + assert len(sink_stage.get_messages()) == expected_count From 32883c0097094bc5672039689353dcfdc5866770 Mon Sep 17 00:00:00 2001 From: Bhargav Suryadevara Date: Mon, 11 Sep 2023 20:40:23 -0500 Subject: [PATCH 05/15] Added tests to file source stage --- morpheus/stages/input/file_source.py | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/morpheus/stages/input/file_source.py b/morpheus/stages/input/file_source.py index 02fad45cc7..175cf7be57 100644 --- a/morpheus/stages/input/file_source.py +++ b/morpheus/stages/input/file_source.py @@ -47,7 +47,7 @@ class FileSource(PreallocatorMixin, SingleOutputSource): Parameters ---------- - c : `morpheus.config.Config` + config : `morpheus.config.Config` Pipeline configuration instance. files : List[str] List of paths to be read from, can be a list of S3 URLs (`s3://path`) and can include wildcard characters `*` @@ -84,7 +84,7 @@ class FileSource(PreallocatorMixin, SingleOutputSource): """ def __init__(self, - c: Config, + config: Config, files: typing.List[str], watch: bool = False, watch_interval: float = 1.0, @@ -97,7 +97,7 @@ def __init__(self, filter_null: bool = True, parser_kwargs: dict = None): - super().__init__(c) + super().__init__(config) if not files: raise ValueError("The 'files' cannot be empty.") @@ -201,6 +201,7 @@ def _polling_generate_frames_fsspec(self) -> typing.Iterable[fsspec.core.OpenFil if len(filtered_files) > 0: if self._sort_glob: filtered_files = sorted(filtered_files, key=lambda f: f.full_name) + yield fsspec.core.OpenFiles(filtered_files, fs=files.fs) curr_time = time.monotonic() @@ -287,9 +288,10 @@ def convert_to_fsspec_files(files: typing.Union[list[str], fsspec.core.OpenFiles fsspec.core.OpenFiles An fsspec OpenFiles object representing the input files. """ - # Check if the list contains string items by checking the type of the first element - if files and isinstance(files[0], str): - files: fsspec.core.OpenFiles = fsspec.open_files(files) + + # Convert fsspec open files + if not isinstance(files, fsspec.core.OpenFiles): + files = fsspec.open_files(files) return files @@ -300,13 +302,13 @@ def _post_build_single(self, builder: mrc.Builder, out_pair: StreamPair) -> Stre post_node = builder.make_node( self.unique_name + "-post", ops.map(self.convert_to_fsspec_files), - ops.flatten(), + ops.flatten(), # Flatten list of open fsspec files ops.map( partial(self.generate_frames, file_type=self._file_type, filter_null=self._filter_null, parser_kwargs=self._parser_kwargs, - repeat_count=self._repeat_count)), + repeat_count=self._repeat_count)), # Generate dataframe for each file ops.flatten()) builder.make_edge(out_stream, post_node) From 5aa68e508a2109b8473199afc3aad55b3e0ab881 Mon Sep 17 00:00:00 2001 From: Bhargav Suryadevara Date: Tue, 12 Sep 2023 16:40:33 -0500 Subject: [PATCH 06/15] Update morpheus/stages/input/file_source.py Co-authored-by: Christopher Harris --- morpheus/stages/input/file_source.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/morpheus/stages/input/file_source.py b/morpheus/stages/input/file_source.py index 175cf7be57..81834fdc7e 100644 --- a/morpheus/stages/input/file_source.py +++ b/morpheus/stages/input/file_source.py @@ -47,7 +47,7 @@ class FileSource(PreallocatorMixin, SingleOutputSource): Parameters ---------- - config : `morpheus.config.Config` + config : morpheus.config.Config Pipeline configuration instance. 
files : List[str] List of paths to be read from, can be a list of S3 URLs (`s3://path`) and can include wildcard characters `*` From bc73877efc90ebc1ff69243b9eb1165adf33a26b Mon Sep 17 00:00:00 2001 From: Bhargav Suryadevara Date: Tue, 12 Sep 2023 16:40:43 -0500 Subject: [PATCH 07/15] Update morpheus/stages/input/file_source.py Co-authored-by: Christopher Harris --- morpheus/stages/input/file_source.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/morpheus/stages/input/file_source.py b/morpheus/stages/input/file_source.py index 81834fdc7e..fad6ae20c5 100644 --- a/morpheus/stages/input/file_source.py +++ b/morpheus/stages/input/file_source.py @@ -42,8 +42,7 @@ class FileSource(PreallocatorMixin, SingleOutputSource): """ Load messages from a file. - Source stage is used to load messages from a file and dumping the contents into the pipeline immediately. Useful for - testing performance and accuracy of a pipeline. + FileSource is used to produce messages loaded from a file. Useful for testing performance and accuracy of a pipeline. Parameters ---------- From f27ea806d3db53eb764230cea270c25ede357143 Mon Sep 17 00:00:00 2001 From: Bhargav Suryadevara Date: Tue, 12 Sep 2023 16:50:38 -0500 Subject: [PATCH 08/15] Update morpheus/stages/input/file_source.py Co-authored-by: Christopher Harris --- morpheus/stages/input/file_source.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/morpheus/stages/input/file_source.py b/morpheus/stages/input/file_source.py index fad6ae20c5..42253e7fb7 100644 --- a/morpheus/stages/input/file_source.py +++ b/morpheus/stages/input/file_source.py @@ -98,7 +98,7 @@ def __init__(self, super().__init__(config) - if not files: + if not files or len(files) == 0: raise ValueError("The 'files' cannot be empty.") if watch and len(files) != 1: From ea56c5f647a2247d5cd887529118983409c00d80 Mon Sep 17 00:00:00 2001 From: Bhargav Suryadevara Date: Tue, 12 Sep 2023 23:14:39 -0500 Subject: [PATCH 09/15] updates to file source stage --- morpheus/stages/input/file_source.py | 196 +++++++++++---------------- tests/test_file_source.py | 91 +++++-------- 2 files changed, 109 insertions(+), 178 deletions(-) diff --git a/morpheus/stages/input/file_source.py b/morpheus/stages/input/file_source.py index 175cf7be57..8ba2eaba09 100644 --- a/morpheus/stages/input/file_source.py +++ b/morpheus/stages/input/file_source.py @@ -21,6 +21,7 @@ import fsspec import mrc +import s3fs from mrc.core import operators as ops from morpheus.cli import register_stage @@ -32,7 +33,6 @@ from morpheus.pipeline.preallocator_mixin import PreallocatorMixin from morpheus.pipeline.single_output_source import SingleOutputSource from morpheus.pipeline.stream_pair import StreamPair -from morpheus.utils.directory_watcher import DirectoryWatcher logger = logging.getLogger(__name__) @@ -54,33 +54,18 @@ class FileSource(PreallocatorMixin, SingleOutputSource): as defined by `fsspec`: https://filesystem-spec.readthedocs.io/en/latest/api.html?highlight=open_files#fsspec.open_files watch : bool, default = False - When True, will check `files` for new files and emit them as they appear. (Note: `watch_interval` is - applicable when `watch` is True and there are no remote paths in `files`.) + When True, will check `files` for new files and emit them as they appear. watch_interval : float, default = 1.0 When `watch` is True, this is the time in seconds between polling the paths in `files` for new files. 
- (Note: Applicable when path in `files` are remote and when `watch` is True) - sort_glob : bool, default = False - If true, the list of files matching `input_glob` will be processed in sorted order. - (Note: Applicable when all paths in `files` are local.) - recursive : bool, default = True - If true, events will be emitted for the files in subdirectories matching `input_glob`. - (Note: Applicable when all paths in `files` are local.) - queue_max_size : int, default = 128 - Maximum queue size to hold the file paths to be processed that match `input_glob`. - (Note: Applicable when all paths in `files` are local.) - batch_timeout : float, default = 5.0 - Timeout to retrieve batch messages from the queue. - (Note: Applicable when all paths in `files` are local.) + sort : bool, default = False + If true, the list of files will be processed in sorted order. file_type : `morpheus.common.FileTypes`, optional, case_sensitive = False Indicates what type of file to read. Specifying 'auto' will determine the file type from the extension. Supported extensions: 'csv', 'json', 'jsonlines' and 'parquet'. - repeat : int, default = 1, min = 1 - Repeats the input dataset multiple times. Useful to extend small datasets for debugging. - filter_null : bool, default = True - Whether or not to filter rows with a null 'data' column. Null values in the 'data' column can cause issues down - the line with processing. Setting this to True is recommended. parser_kwargs : dict, default = {} Extra options to pass to the file parser. + max_files: int + Max number of files to read. Useful for debugging to limit startup time. Default value of -1 is unlimited. """ def __init__(self, @@ -88,34 +73,32 @@ def __init__(self, files: typing.List[str], watch: bool = False, watch_interval: float = 1.0, - sort_glob: bool = False, - recursive: bool = True, - queue_max_size: int = 128, - batch_timeout: float = 5.0, + sort: bool = False, file_type: FileTypes = FileTypes.Auto, - repeat: int = 1, - filter_null: bool = True, - parser_kwargs: dict = None): + parser_kwargs: dict = None, + max_files: int = -1): super().__init__(config) - if not files: + if not files and len(files) > 0: raise ValueError("The 'files' cannot be empty.") if watch and len(files) != 1: raise ValueError("When 'watch' is True, the 'files' should contain exactly one file path.") self._files = list(files) + self._protocols = self._extract_unique_protocols() + + if len(self._protocols) > 1: + raise ValueError(f"Supports single protocol input files., but received multiple {self._protocols}") + self._watch = watch - self._sort_glob = sort_glob - self._recursive = recursive - self._queue_max_size = queue_max_size - self._batch_timeout = batch_timeout + self._sort = sort self._file_type = file_type - self._filter_null = filter_null self._parser_kwargs = parser_kwargs or {} self._watch_interval = watch_interval - self._repeat_count = repeat + self._max_files = max_files + self._stop_requested = False @property def name(self) -> str: @@ -126,35 +109,40 @@ def supports_cpp_node(self) -> bool: """Indicates whether or not this stage supports a C++ node""" return False - def _has_remote_paths(self) -> bool: - return any(urlsplit(file).scheme for file in self._files if "://" in file) + def stop(self): + """ + Performs cleanup steps when pipeline is stopped. 
+ """ + + # Indicate we need to stop + self._stop_requested = True + + return super().stop() + + def _extract_unique_protocols(self): + protocols = set() + + for file in self._files: + scheme = urlsplit(file).scheme + if scheme: + protocols.add(scheme.lower()) + else: + protocols.add("file") + + return protocols def _build_source(self, builder: mrc.Builder) -> StreamPair: if self._build_cpp_node(): raise RuntimeError("Does not support C++ nodes") - if self._watch and not self._has_remote_paths(): - input_glob = self._files[0] - watcher = DirectoryWatcher( - input_glob=input_glob, - watch_directory=self._watch, - max_files=None, # This is not being used in the latest version. - sort_glob=self._sort_glob, - recursive=self._recursive, - queue_max_size=self._queue_max_size, - batch_timeout=self._batch_timeout) - - out_stream = watcher.build_node(self.unique_name, builder) - out_type = list[str] + if self._watch: + generator_function = self._polling_generate_frames_fsspec else: - if self._watch: - generator_function = self._polling_generate_frames_fsspec - else: - generator_function = self._generate_frames_fsspec + generator_function = self._generate_frames_fsspec - out_stream = builder.make_source(self.unique_name, generator_function()) - out_type = fsspec.core.OpenFiles + out_stream = builder.make_source(self.unique_name, generator_function()) + out_type = fsspec.core.OpenFiles # Supposed to just return a source here return out_stream, out_type @@ -167,17 +155,22 @@ def _generate_frames_fsspec(self) -> typing.Iterable[fsspec.core.OpenFiles]: raise RuntimeError(f"No files matched input strings: '{self._files}'. " "Check your input pattern and ensure any credentials are correct") - if self._sort_glob: + if self._sort: files = sorted(files, key=lambda f: f.full_name) + if self._max_files > 0: + files = files[:self._max_files] + yield files def _polling_generate_frames_fsspec(self) -> typing.Iterable[fsspec.core.OpenFiles]: files_seen = set() curr_time = time.monotonic() next_update_epoch = curr_time + processed_files_count = 0 + has_s3_protocol = "s3" in self._protocols - while (True): + while (not self._stop_requested): # Before doing any work, find the next update epoch after the current time while (next_update_epoch <= curr_time): # Only ever add `self._watch_interval` to next_update_epoch so all updates are at repeating intervals @@ -186,7 +179,12 @@ def _polling_generate_frames_fsspec(self) -> typing.Iterable[fsspec.core.OpenFil file_set = set() filtered_files = [] + # Clear cached instance, otherwise we don't receive newly touched files. + if has_s3_protocol: + s3fs.S3FileSystem.clear_instance_cache() + files = fsspec.open_files(self._files) + for file in files: file_set.add(file.full_name) if file.full_name not in files_seen: @@ -198,10 +196,17 @@ def _polling_generate_frames_fsspec(self) -> typing.Iterable[fsspec.core.OpenFil # need to re-ingest that new file. 
files_seen = file_set - if len(filtered_files) > 0: - if self._sort_glob: + filtered_files_count = len(filtered_files) + + if filtered_files_count > 0: + + if self._sort: filtered_files = sorted(filtered_files, key=lambda f: f.full_name) + if self._max_files > 0: + filtered_files = filtered_files[:self._max_files - processed_files_count] + processed_files_count += len(filtered_files) + yield fsspec.core.OpenFiles(filtered_files, fs=files.fs) curr_time = time.monotonic() @@ -213,12 +218,12 @@ def _polling_generate_frames_fsspec(self) -> typing.Iterable[fsspec.core.OpenFil time.sleep(sleep_duration) curr_time = time.monotonic() + if self._max_files > 0 and self._max_files <= processed_files_count: + logger.debug("Maximum file limit reached. Exiting directory watcher...") + self._stop_requested = True + @staticmethod - def generate_frames(file: fsspec.core.OpenFile, - file_type: FileTypes, - filter_null: bool, - parser_kwargs: dict, - repeat_count: int) -> list[MessageMeta]: + def generate_frames(file: fsspec.core.OpenFile, file_type: FileTypes, parser_kwargs: dict) -> list[MessageMeta]: """ Generate message frames from a file. @@ -231,14 +236,8 @@ def generate_frames(file: fsspec.core.OpenFile, An open file object using fsspec. file_type : FileTypes Indicates the type of the file to read. Supported types include 'csv', 'json', 'jsonlines', and 'parquet'. - filter_null : bool - Determines whether to filter out rows with null values in the 'data' column. Filtering null values is - recommended to prevent potential issues during processing. parser_kwargs : dict Additional keyword arguments to pass to the file parser. - repeat_count : int - The number of times to repeat the data reading process. Each repetition generates a new set of message - frames. Returns ------- @@ -248,52 +247,14 @@ def generate_frames(file: fsspec.core.OpenFile, df = read_file_to_df( file.full_name, file_type=file_type, - filter_nulls=filter_null, + filter_nulls=False, parser_kwargs=parser_kwargs, df_type="cudf", ) - metas = [] - - for i in range(repeat_count): + meta = MessageMeta(df) - x = MessageMeta(df) - - # If we are looping, copy the object. Do this before we push the object in case it changes - if (i + 1 < repeat_count): - df = df.copy() - - # Shift the index to allow for unique indices without reading more data - df.index += len(df) - - metas.append(x) - - return metas - - @staticmethod - def convert_to_fsspec_files(files: typing.Union[list[str], fsspec.core.OpenFiles]) -> fsspec.core.OpenFiles: - """ - Convert a list of file paths to fsspec OpenFiles. - - This static method takes a list of file paths or an existing fsspec OpenFiles object and ensures that the - input is converted to an OpenFiles object for uniform handling in Morpheus pipeline stages. - - Parameters - ---------- - files : Union[List[str], fsspec.core.OpenFiles] - A list of file paths or an existing fsspec OpenFiles object. - - Returns - ------- - fsspec.core.OpenFiles - An fsspec OpenFiles object representing the input files. 
- """ - - # Convert fsspec open files - if not isinstance(files, fsspec.core.OpenFiles): - files = fsspec.open_files(files) - - return files + return meta def _post_build_single(self, builder: mrc.Builder, out_pair: StreamPair) -> StreamPair: @@ -301,15 +262,10 @@ def _post_build_single(self, builder: mrc.Builder, out_pair: StreamPair) -> Stre post_node = builder.make_node( self.unique_name + "-post", - ops.map(self.convert_to_fsspec_files), ops.flatten(), # Flatten list of open fsspec files - ops.map( - partial(self.generate_frames, - file_type=self._file_type, - filter_null=self._filter_null, - parser_kwargs=self._parser_kwargs, - repeat_count=self._repeat_count)), # Generate dataframe for each file - ops.flatten()) + ops.map(partial(self.generate_frames, file_type=self._file_type, + parser_kwargs=self._parser_kwargs)) # Generate dataframe for each file + ) builder.make_edge(out_stream, post_node) diff --git a/tests/test_file_source.py b/tests/test_file_source.py index 1b811bbea1..7af7e0da3b 100644 --- a/tests/test_file_source.py +++ b/tests/test_file_source.py @@ -15,6 +15,7 @@ # limitations under the License. import os +from unittest import mock import fsspec import pytest @@ -29,40 +30,24 @@ from morpheus.stages.output.in_memory_sink_stage import InMemorySinkStage -@pytest.fixture(name="files", scope="function") -def files_fixture(): - - return fsspec.open(os.path.join(TEST_DIRS.tests_data_dir, "filter_probs.json")) - - @pytest.mark.use_python def test_constructor(config): file_source = FileSource( config, files=["path/to/*.json"], watch=True, - sort_glob=True, - recursive=False, - queue_max_size=256, - batch_timeout=10.0, + sort=True, file_type=FileTypes.JSON, - repeat=3, - filter_null=False, parser_kwargs={"key": "value"}, watch_interval=2.0, ) assert file_source._files == ["path/to/*.json"] assert file_source._watch - assert file_source._sort_glob - assert not file_source._recursive - assert file_source._queue_max_size == 256 - assert file_source._batch_timeout == 10.0 + assert file_source._sort assert file_source._file_type == FileTypes.JSON - assert not file_source._filter_null assert file_source._parser_kwargs == {"key": "value"} assert file_source._watch_interval == 2.0 - assert file_source._repeat_count == 3 @pytest.mark.use_python @@ -73,62 +58,38 @@ def test_constructor_with_invalid_params(config, input_files): FileSource(config, files=input_files, watch=True) -@pytest.mark.parametrize("input_files", [["file1.json", "file2.json"]]) -def test_convert_to_fsspec_files(input_files): - actual_output = FileSource.convert_to_fsspec_files(files=input_files) - - assert isinstance(actual_output, fsspec.core.OpenFiles) - assert os.path.basename(actual_output[0].full_name) == input_files[0] - assert os.path.basename(actual_output[1].full_name) == input_files[1] - - @pytest.mark.use_python -@pytest.mark.parametrize( - "input_file,filetypes,filter_null,parser_kwargs, repeat_count, expected_count, expected_df_count", - [("filter_probs.json", FileTypes.Auto, False, { - "lines": False - }, 1, 1, 20), ("filter_probs.csv", FileTypes.CSV, False, {}, 2, 2, 20), - ("filter_probs.jsonlines", FileTypes.JSON, False, { - "lines": True - }, 1, 1, 20)]) -def test_generate_frames(input_file, - filetypes, - filter_null, - parser_kwargs, - repeat_count, - expected_count, - expected_df_count): +@pytest.mark.parametrize("input_file,filetypes,parser_kwargs,expected_df_count", + [("filter_probs.json", FileTypes.Auto, { + "lines": False + }, 20), ("filter_probs.jsonlines", FileTypes.JSON, { + "lines": True + 
}, 20)]) +def test_generate_frames(input_file, filetypes, parser_kwargs, expected_df_count): in_file = fsspec.open(os.path.join(TEST_DIRS.tests_data_dir, input_file)) - metas = FileSource.generate_frames(file=in_file, - file_type=filetypes, - filter_null=filter_null, - parser_kwargs=parser_kwargs, - repeat_count=repeat_count) + meta = FileSource.generate_frames(file=in_file, file_type=filetypes, parser_kwargs=parser_kwargs) - assert len(metas) == expected_count - assert len(metas[0].df.columns) == 4 - assert len(metas[0].df) == expected_df_count - assert isinstance(metas[0], MessageMeta) - assert isinstance(metas[0].df, cudf.DataFrame) + assert len(meta.df.columns) == 4 + assert len(meta.df) == expected_df_count + assert isinstance(meta, MessageMeta) + assert isinstance(meta.df, cudf.DataFrame) @pytest.mark.use_python -@pytest.mark.parametrize("input_files,parser_kwargs,repeat,expected_count", +@pytest.mark.parametrize("input_files,parser_kwargs,expected_count", [([ "s3://rapidsai-data/cyber/morpheus/dfp/duo/DUO_2022-08-01T00_05_06.806Z.json", "s3://rapidsai-data/cyber/morpheus/dfp/duo/DUO_2022-08-01T12_09_47.901Z.json" ], { "lines": False, "orient": "records" }, - 1, - 2), ([os.path.join(TEST_DIRS.tests_data_dir, "triton_*.csv")], None, 1, 3), - ([os.path.join(TEST_DIRS.tests_data_dir, "triton_*.csv")], None, 2, 6)]) -def test_filesource_with_watch_false(config, input_files, parser_kwargs, repeat, expected_count): + 2), ([os.path.join(TEST_DIRS.tests_data_dir, "triton_*.csv")], None, 3)]) +def test_filesource_with_watch_false(config, input_files, parser_kwargs, expected_count): pipe = Pipeline(config) - file_source_stage = FileSource(config, files=input_files, watch=False, parser_kwargs=parser_kwargs, repeat=repeat) + file_source_stage = FileSource(config, files=input_files, watch=False, parser_kwargs=parser_kwargs) sink_stage = InMemorySinkStage(config) pipe.add_stage(file_source_stage) @@ -139,3 +100,17 @@ def test_filesource_with_watch_false(config, input_files, parser_kwargs, repeat, pipe.run() assert len(sink_stage.get_messages()) == expected_count + + +@pytest.mark.use_python +def test_build_source_watch_remote_files(config): + files = ["s3://rapidsai-data/cyber/morpheus/dfp/duo/DUO_2022*.json"] + source = FileSource(config=config, files=files, watch=True) + + mock_node = mock.MagicMock() + mock_builder = mock.MagicMock() + mock_builder.make_source.return_value = mock_node + out_stream, out_type = source._build_source(mock_builder) + + assert out_stream == mock_node + assert out_type == fsspec.core.OpenFiles From 7bd158458544648bb75af38832ee5214a5d507b5 Mon Sep 17 00:00:00 2001 From: Bhargav Suryadevara Date: Wed, 13 Sep 2023 01:18:43 -0500 Subject: [PATCH 10/15] updated filesource tests --- morpheus/stages/input/file_source.py | 39 +++++----- tests/test_file_source.py | 102 +++++++++++++++++---------- 2 files changed, 82 insertions(+), 59 deletions(-) diff --git a/morpheus/stages/input/file_source.py b/morpheus/stages/input/file_source.py index 83f2941a75..dbc28aaaff 100644 --- a/morpheus/stages/input/file_source.py +++ b/morpheus/stages/input/file_source.py @@ -27,7 +27,6 @@ from morpheus.cli import register_stage from morpheus.common import FileTypes from morpheus.config import Config -from morpheus.config import PipelineModes from morpheus.io.deserializers import read_file_to_df from morpheus.messages import MessageMeta from morpheus.pipeline.preallocator_mixin import PreallocatorMixin @@ -37,12 +36,13 @@ logger = logging.getLogger(__name__) -@register_stage("file-source", 
modes=[PipelineModes.FIL, PipelineModes.NLP, PipelineModes.OTHER]) +@register_stage("file-source") class FileSource(PreallocatorMixin, SingleOutputSource): """ Load messages from a file. - FileSource is used to produce messages loaded from a file. Useful for testing performance and accuracy of a pipeline. + FileSource is used to produce messages loaded from a file. Useful for testing performance and + accuracy of a pipeline. Parameters ---------- @@ -58,12 +58,12 @@ class FileSource(PreallocatorMixin, SingleOutputSource): When `watch` is True, this is the time in seconds between polling the paths in `files` for new files. sort : bool, default = False If true, the list of files will be processed in sorted order. - file_type : `morpheus.common.FileTypes`, optional, case_sensitive = False + file_type : morpheus.common.FileTypes, optional, default = `FileTypes.Auto`; case_sensitive = False Indicates what type of file to read. Specifying 'auto' will determine the file type from the extension. Supported extensions: 'csv', 'json', 'jsonlines' and 'parquet'. - parser_kwargs : dict, default = {} + parser_kwargs : dict, default = None Extra options to pass to the file parser. - max_files: int + max_files: int, default = -1 Max number of files to read. Useful for debugging to limit startup time. Default value of -1 is unlimited. """ @@ -89,7 +89,7 @@ def __init__(self, self._protocols = self._extract_unique_protocols() if len(self._protocols) > 1: - raise ValueError(f"Supports single protocol input files., but received multiple {self._protocols}") + raise ValueError("Accepts same protocol input files, but it received multiple protocols.") self._watch = watch self._sort = sort @@ -101,24 +101,23 @@ def __init__(self, @property def name(self) -> str: - """Return the name of the stage""" + """Return the name of the stage.""" return "file-source" def supports_cpp_node(self) -> bool: - """Indicates whether or not this stage supports a C++ node""" + """Indicates whether or not this stage supports a C++ node.""" return False def stop(self): - """ - Performs cleanup steps when pipeline is stopped. - """ + """Performs cleanup steps when pipeline is stopped.""" # Indicate we need to stop self._stop_requested = True return super().stop() - def _extract_unique_protocols(self): + def _extract_unique_protocols(self) -> set: + """Extracts unique protocols from the given file paths.""" protocols = set() for file in self._files: @@ -133,7 +132,7 @@ def _extract_unique_protocols(self): def _build_source(self, builder: mrc.Builder) -> StreamPair: if self._build_cpp_node(): - raise RuntimeError("Does not support C++ nodes") + raise RuntimeError("Does not support C++ nodes.") if self._watch: generator_function = self._polling_generate_frames_fsspec @@ -152,7 +151,7 @@ def _generate_frames_fsspec(self) -> typing.Iterable[fsspec.core.OpenFiles]: if (len(files) == 0): raise RuntimeError(f"No files matched input strings: '{self._files}'. " - "Check your input pattern and ensure any credentials are correct") + "Check your input pattern and ensure any credentials are correct.") if self._sort: files = sorted(files, key=lambda f: f.full_name) @@ -218,13 +217,13 @@ def _polling_generate_frames_fsspec(self) -> typing.Iterable[fsspec.core.OpenFil curr_time = time.monotonic() if self._max_files > 0 and self._max_files <= processed_files_count: - logger.debug("Maximum file limit reached. Exiting directory watcher...") + logger.debug("Maximum file limit reached. 
Exiting polling service...") self._stop_requested = True @staticmethod - def generate_frames(file: fsspec.core.OpenFile, file_type: FileTypes, parser_kwargs: dict) -> list[MessageMeta]: + def generate_frames(file: fsspec.core.OpenFile, file_type: FileTypes, parser_kwargs: dict) -> MessageMeta: """ - Generate message frames from a file. + Generate message frame from a file. This function reads data from a file and generates message frames (MessageMeta) based on the file's content. It can be used to load and process messages from a file for testing and analysis within a Morpheus pipeline. @@ -240,8 +239,8 @@ def generate_frames(file: fsspec.core.OpenFile, file_type: FileTypes, parser_kwa Returns ------- - List[MessageMeta] - MessageMeta objects, each containing a dataframe of messages from the file. + MessageMeta + MessageMeta object, each containing a dataframe of messages from the file. """ df = read_file_to_df( file.full_name, diff --git a/tests/test_file_source.py b/tests/test_file_source.py index 7af7e0da3b..9fd23994bf 100644 --- a/tests/test_file_source.py +++ b/tests/test_file_source.py @@ -31,31 +31,19 @@ @pytest.mark.use_python -def test_constructor(config): - file_source = FileSource( - config, - files=["path/to/*.json"], - watch=True, - sort=True, - file_type=FileTypes.JSON, - parser_kwargs={"key": "value"}, - watch_interval=2.0, - ) - - assert file_source._files == ["path/to/*.json"] - assert file_source._watch - assert file_source._sort - assert file_source._file_type == FileTypes.JSON - assert file_source._parser_kwargs == {"key": "value"} - assert file_source._watch_interval == 2.0 - - -@pytest.mark.use_python -@pytest.mark.parametrize("input_files", [["file1.json", "file2.json"], []]) -def test_constructor_with_invalid_params(config, input_files): - with pytest.raises(ValueError): - # 'watch' is True, but multiple files are provided - FileSource(config, files=input_files, watch=True) +@pytest.mark.parametrize( + "input_files,watch,error_msg", + [(["file1.json", "file2.json"], True, "When 'watch' is True, the 'files' should contain exactly one file path."), + ([], True, "The 'files' cannot be empty."), (None, True, "The 'files' cannot be empty."), + (["file1.json", "s3://test_data/file2.json"], + True, + "When 'watch' is True, the 'files' should contain exactly one file path."), + (["file1.json", "s3://test_data/file2.json"], + False, + "Accepts same protocol input files, but it received multiple protocols.")]) +def test_constructor_with_invalid_params(config, input_files, watch, error_msg): + with pytest.raises(ValueError, match=error_msg): + FileSource(config, files=input_files, watch=watch) @pytest.mark.use_python @@ -77,19 +65,39 @@ def test_generate_frames(input_file, filetypes, parser_kwargs, expected_df_count @pytest.mark.use_python -@pytest.mark.parametrize("input_files,parser_kwargs,expected_count", - [([ - "s3://rapidsai-data/cyber/morpheus/dfp/duo/DUO_2022-08-01T00_05_06.806Z.json", - "s3://rapidsai-data/cyber/morpheus/dfp/duo/DUO_2022-08-01T12_09_47.901Z.json" - ], { - "lines": False, "orient": "records" - }, - 2), ([os.path.join(TEST_DIRS.tests_data_dir, "triton_*.csv")], None, 3)]) -def test_filesource_with_watch_false(config, input_files, parser_kwargs, expected_count): +@pytest.mark.parametrize( + "input_files,parser_kwargs,max_files,watch,expected_result", + [([ + "s3://rapidsai-data/cyber/morpheus/dfp/duo/DUO_2022-08-01T00_05_06.806Z.json", + "s3://rapidsai-data/cyber/morpheus/dfp/duo/DUO_2022-08-01T12_09_47.901Z.json" + ], { + "lines": False, "orient": 
"records" + }, + -1, + False, + 2), ([os.path.join(TEST_DIRS.tests_data_dir, "triton_*.csv")], None, -1, False, 3), + ([f'file:/{os.path.join(TEST_DIRS.tests_data_dir, "triton_*.csv")}'], None, -1, False, RuntimeError), + ([f'file:/{os.path.join(TEST_DIRS.tests_data_dir, "triton_abp_inf_results.csv")}'], + None, + -1, + False, + FileNotFoundError), ([f'file://{os.path.join(TEST_DIRS.tests_data_dir, "triton_*.csv")}'], None, -1, False, 3), + (["s3://rapidsai-data/cyber/morpheus/dfp/duo/DUO_2022-08-01T00_05_06.806Z.json"], { + "lines": False, "orient": "records" + }, + 1, + True, + 1), ([os.path.join(TEST_DIRS.tests_data_dir, "triton_*.csv")], None, 2, False, 2), + ([f'file://{os.path.join(TEST_DIRS.tests_data_dir, "triton_*.csv")}'], None, 3, True, 3)]) +def test_filesource_pipe(config, input_files, parser_kwargs, max_files, watch, expected_result): pipe = Pipeline(config) - file_source_stage = FileSource(config, files=input_files, watch=False, parser_kwargs=parser_kwargs) + file_source_stage = FileSource(config, + files=input_files, + watch=watch, + max_files=max_files, + parser_kwargs=parser_kwargs) sink_stage = InMemorySinkStage(config) pipe.add_stage(file_source_stage) @@ -97,20 +105,36 @@ def test_filesource_with_watch_false(config, input_files, parser_kwargs, expecte pipe.add_edge(file_source_stage, sink_stage) - pipe.run() + if expected_result in (RuntimeError, FileNotFoundError): + with pytest.raises(expected_result): + pipe.run() + else: + pipe.run() - assert len(sink_stage.get_messages()) == expected_count + assert len(sink_stage.get_messages()) == expected_result @pytest.mark.use_python -def test_build_source_watch_remote_files(config): +@pytest.mark.parametrize("watch", [True, False]) +@mock.patch.object(FileSource, '_polling_generate_frames_fsspec') +@mock.patch.object(FileSource, '_generate_frames_fsspec') +def test_build_source(mock_generate_frames_fsspec, mock_polling_generate_frames_fsspec, watch, config): files = ["s3://rapidsai-data/cyber/morpheus/dfp/duo/DUO_2022*.json"] - source = FileSource(config=config, files=files, watch=True) + source = FileSource(config=config, files=files, watch=watch) mock_node = mock.MagicMock() mock_builder = mock.MagicMock() mock_builder.make_source.return_value = mock_node out_stream, out_type = source._build_source(mock_builder) + if watch: + mock_polling_generate_frames_fsspec.assert_called_once() + with pytest.raises(Exception): + mock_generate_frames_fsspec.assert_called_once() + else: + mock_generate_frames_fsspec.assert_called_once() + with pytest.raises(Exception): + mock_polling_generate_frames_fsspec.assert_called_once() + assert out_stream == mock_node assert out_type == fsspec.core.OpenFiles From e6093ec2fa7b8a2bbb1373be3d930136c5dbbf9d Mon Sep 17 00:00:00 2001 From: Bhargav Suryadevara Date: Wed, 13 Sep 2023 09:54:54 -0500 Subject: [PATCH 11/15] added s3fs dependency --- docker/conda/environments/cuda11.8_dev.yml | 2 ++ tests/test_file_source.py | 17 +++++++++++++++-- 2 files changed, 17 insertions(+), 2 deletions(-) diff --git a/docker/conda/environments/cuda11.8_dev.yml b/docker/conda/environments/cuda11.8_dev.yml index 9f835123c9..d8a918e11d 100644 --- a/docker/conda/environments/cuda11.8_dev.yml +++ b/docker/conda/environments/cuda11.8_dev.yml @@ -26,6 +26,7 @@ dependencies: - automake=1.16.5 - benchmark=1.6.1 - boost-cpp=1.74 + - boto3 - cachetools=5.0.0 - ccache>=3.7 - clangdev=14 @@ -91,6 +92,7 @@ dependencies: - pytorch=2.0.1 - rapidjson=1.1.0 - requests=2.31 + - s3fs>=2023.6 - scikit-build=0.17.1 - scikit-learn=1.2.2 - 
sphinx diff --git a/tests/test_file_source.py b/tests/test_file_source.py index 9fd23994bf..81b28a76d6 100644 --- a/tests/test_file_source.py +++ b/tests/test_file_source.py @@ -30,18 +30,31 @@ from morpheus.stages.output.in_memory_sink_stage import InMemorySinkStage +@pytest.mark.use_python +@pytest.mark.parametrize("input_files,watch, protocols", + [(["file1.json", "file2.json"], False, ["file"]), + (["file://file1.json", "file2.json"], False, ["file"]), + (["file:///file1.json"], False, ["file"]), (["test_data/*.json"], True, ["file"]), + (["s3://test_data/file1.json", "s3://test_data/file2.json"], False, ["s3"]), + (["s3://test_data/*.json"], True, ["s3"])]) +def test_constructor(config, input_files, watch, protocols): + source = FileSource(config, files=input_files, watch=watch) + assert sorted(source._protocols) == protocols + + @pytest.mark.use_python @pytest.mark.parametrize( "input_files,watch,error_msg", [(["file1.json", "file2.json"], True, "When 'watch' is True, the 'files' should contain exactly one file path."), - ([], True, "The 'files' cannot be empty."), (None, True, "The 'files' cannot be empty."), + ([], True, "The 'files' cannot be empty."), ([], False, "The 'files' cannot be empty."), + (None, True, "The 'files' cannot be empty."), (None, False, "The 'files' cannot be empty."), (["file1.json", "s3://test_data/file2.json"], True, "When 'watch' is True, the 'files' should contain exactly one file path."), (["file1.json", "s3://test_data/file2.json"], False, "Accepts same protocol input files, but it received multiple protocols.")]) -def test_constructor_with_invalid_params(config, input_files, watch, error_msg): +def test_constructor_error(config, input_files, watch, error_msg): with pytest.raises(ValueError, match=error_msg): FileSource(config, files=input_files, watch=watch) From 0feb8487db894dc19f546a6b496a716ce6083b8f Mon Sep 17 00:00:00 2001 From: Bhargav Suryadevara Date: Wed, 13 Sep 2023 10:08:21 -0500 Subject: [PATCH 12/15] added s3fs dependency --- morpheus/stages/input/file_source.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/morpheus/stages/input/file_source.py b/morpheus/stages/input/file_source.py index dbc28aaaff..e17fc9165f 100644 --- a/morpheus/stages/input/file_source.py +++ b/morpheus/stages/input/file_source.py @@ -58,7 +58,7 @@ class FileSource(PreallocatorMixin, SingleOutputSource): When `watch` is True, this is the time in seconds between polling the paths in `files` for new files. sort : bool, default = False If true, the list of files will be processed in sorted order. - file_type : morpheus.common.FileTypes, optional, default = `FileTypes.Auto`; case_sensitive = False + file_type : morpheus.common.FileTypes, optional, case_sensitive = False Indicates what type of file to read. Specifying 'auto' will determine the file type from the extension. Supported extensions: 'csv', 'json', 'jsonlines' and 'parquet'. 
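# Assumption-heavy sketch of reading one of the public S3 fixtures above with
# the new s3fs dependency: public buckets are usually reachable with anonymous
# credentials (anon=True). This requires network access and assumes the bucket
# still permits anonymous reads.
import fsspec

url = "s3://rapidsai-data/cyber/morpheus/dfp/duo/DUO_2022-08-01T00_05_06.806Z.json"

with fsspec.open(url, mode="rt", anon=True) as fh:
    print(fh.read(120))  # first bytes of the JSON payload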
parser_kwargs : dict, default = None From 9a6b9e6cc1d4027ba6625b259574fe0872c14c0e Mon Sep 17 00:00:00 2001 From: Bhargav Suryadevara Date: Wed, 13 Sep 2023 16:18:53 -0500 Subject: [PATCH 13/15] updates to tests --- morpheus/stages/input/file_source.py | 14 ++++++---- tests/test_file_source.py | 38 ++++++++++++++++++++++------ 2 files changed, 39 insertions(+), 13 deletions(-) diff --git a/morpheus/stages/input/file_source.py b/morpheus/stages/input/file_source.py index e17fc9165f..bc63631b1d 100644 --- a/morpheus/stages/input/file_source.py +++ b/morpheus/stages/input/file_source.py @@ -57,14 +57,16 @@ class FileSource(PreallocatorMixin, SingleOutputSource): watch_interval : float, default = 1.0 When `watch` is True, this is the time in seconds between polling the paths in `files` for new files. sort : bool, default = False - If true, the list of files will be processed in sorted order. + When True, the list of files will be processed in sorted order. file_type : morpheus.common.FileTypes, optional, case_sensitive = False Indicates what type of file to read. Specifying 'auto' will determine the file type from the extension. Supported extensions: 'csv', 'json', 'jsonlines' and 'parquet'. parser_kwargs : dict, default = None Extra options to pass to the file parser. - max_files: int, default = -1 + max_files : int, default = -1 Max number of files to read. Useful for debugging to limit startup time. Default value of -1 is unlimited. + storage_connection_kwargs : dict, default = None + Extra settings that are relevant to a specific storage connection used by `fsspec.open_files`. """ def __init__(self, @@ -75,7 +77,8 @@ def __init__(self, sort: bool = False, file_type: FileTypes = FileTypes.Auto, parser_kwargs: dict = None, - max_files: int = -1): + max_files: int = -1, + storage_connection_kwargs: dict = None): super().__init__(config) @@ -95,6 +98,7 @@ def __init__(self, self._sort = sort self._file_type = file_type self._parser_kwargs = parser_kwargs or {} + self._storage_connection_kwargs = storage_connection_kwargs or {} self._watch_interval = watch_interval self._max_files = max_files self._stop_requested = False @@ -147,7 +151,7 @@ def _build_source(self, builder: mrc.Builder) -> StreamPair: def _generate_frames_fsspec(self) -> typing.Iterable[fsspec.core.OpenFiles]: - files: fsspec.core.OpenFiles = fsspec.open_files(self._files) + files: fsspec.core.OpenFiles = fsspec.open_files(self._files, **self._storage_connection_kwargs) if (len(files) == 0): raise RuntimeError(f"No files matched input strings: '{self._files}'. 
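# A hedged usage sketch for the new `storage_connection_kwargs` parameter: the
# dict is forwarded verbatim to fsspec.open_files(), so any s3fs option can be
# supplied. The bucket name and MinIO endpoint below are illustrative
# assumptions, not values from the patch.
from morpheus.config import Config
from morpheus.stages.input.file_source import FileSource

source = FileSource(
    Config(),
    files=["s3://my-bucket/logs/*.json"],  # hypothetical bucket
    watch=False,
    storage_connection_kwargs={
        "anon": False,
        "client_kwargs": {
            "endpoint_url": "http://localhost:9000"  # e.g. a local MinIO
        },
    })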
" @@ -181,7 +185,7 @@ def _polling_generate_frames_fsspec(self) -> typing.Iterable[fsspec.core.OpenFil if has_s3_protocol: s3fs.S3FileSystem.clear_instance_cache() - files = fsspec.open_files(self._files) + files = fsspec.open_files(self._files, **self._storage_connection_kwargs) for file in files: file_set.add(file.full_name) diff --git a/tests/test_file_source.py b/tests/test_file_source.py index 81b28a76d6..07d5557f0a 100644 --- a/tests/test_file_source.py +++ b/tests/test_file_source.py @@ -79,7 +79,7 @@ def test_generate_frames(input_file, filetypes, parser_kwargs, expected_df_count @pytest.mark.use_python @pytest.mark.parametrize( - "input_files,parser_kwargs,max_files,watch,expected_result", + "input_files,parser_kwargs,max_files,watch,storage_connection_kwargs,expected_result", [([ "s3://rapidsai-data/cyber/morpheus/dfp/duo/DUO_2022-08-01T00_05_06.806Z.json", "s3://rapidsai-data/cyber/morpheus/dfp/duo/DUO_2022-08-01T12_09_47.901Z.json" @@ -88,21 +88,42 @@ def test_generate_frames(input_file, filetypes, parser_kwargs, expected_df_count }, -1, False, - 2), ([os.path.join(TEST_DIRS.tests_data_dir, "triton_*.csv")], None, -1, False, 3), - ([f'file:/{os.path.join(TEST_DIRS.tests_data_dir, "triton_*.csv")}'], None, -1, False, RuntimeError), + None, + 2), + ([ + "/rapidsai-data/cyber/morpheus/dfp/duo/DUO_2022-08-01T00_05_06.806Z.json", + "/rapidsai-data/cyber/morpheus/dfp/duo/DUO_2022-08-01T12_09_47.901Z.json" + ], { + "lines": False, "orient": "records" + }, + -1, + False, { + "protocol": "s3" + }, + 2), ([os.path.join(TEST_DIRS.tests_data_dir, "triton_*.csv")], None, -1, False, None, 3), + ([f'file:/{os.path.join(TEST_DIRS.tests_data_dir, "triton_*.csv")}'], None, -1, False, None, RuntimeError), ([f'file:/{os.path.join(TEST_DIRS.tests_data_dir, "triton_abp_inf_results.csv")}'], None, -1, False, - FileNotFoundError), ([f'file://{os.path.join(TEST_DIRS.tests_data_dir, "triton_*.csv")}'], None, -1, False, 3), + None, + FileNotFoundError), + ([f'file://{os.path.join(TEST_DIRS.tests_data_dir, "triton_*.csv")}'], None, -1, False, None, 3), (["s3://rapidsai-data/cyber/morpheus/dfp/duo/DUO_2022-08-01T00_05_06.806Z.json"], { "lines": False, "orient": "records" }, 1, True, - 1), ([os.path.join(TEST_DIRS.tests_data_dir, "triton_*.csv")], None, 2, False, 2), - ([f'file://{os.path.join(TEST_DIRS.tests_data_dir, "triton_*.csv")}'], None, 3, True, 3)]) -def test_filesource_pipe(config, input_files, parser_kwargs, max_files, watch, expected_result): + None, + 1), ([os.path.join(TEST_DIRS.tests_data_dir, "triton_*.csv")], None, 2, False, None, 2), + ([f'file://{os.path.join(TEST_DIRS.tests_data_dir, "triton_*.csv")}'], None, 3, True, None, 3)]) +def test_filesource_pipe(config, + input_files, + parser_kwargs, + max_files, + watch, + storage_connection_kwargs, + expected_result): pipe = Pipeline(config) @@ -110,7 +131,8 @@ def test_filesource_pipe(config, input_files, parser_kwargs, max_files, watch, e files=input_files, watch=watch, max_files=max_files, - parser_kwargs=parser_kwargs) + parser_kwargs=parser_kwargs, + storage_connection_kwargs=storage_connection_kwargs) sink_stage = InMemorySinkStage(config) pipe.add_stage(file_source_stage) From f5d051003773c7571b307f87e3c764a88da1814d Mon Sep 17 00:00:00 2001 From: Bhargav Suryadevara Date: Wed, 13 Sep 2023 16:37:16 -0500 Subject: [PATCH 14/15] Update morpheus/stages/input/file_source.py Co-authored-by: Christopher Harris --- morpheus/stages/input/file_source.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git 
a/morpheus/stages/input/file_source.py b/morpheus/stages/input/file_source.py index e17fc9165f..de5e8d21ad 100644 --- a/morpheus/stages/input/file_source.py +++ b/morpheus/stages/input/file_source.py @@ -194,9 +194,7 @@ def _polling_generate_frames_fsspec(self) -> typing.Iterable[fsspec.core.OpenFil # need to re-ingest that new file. files_seen = file_set - filtered_files_count = len(filtered_files) - - if filtered_files_count > 0: + if len(filtered_files) > 0: if self._sort: filtered_files = sorted(filtered_files, key=lambda f: f.full_name) From 54e1f5cace3166b6521631b67d3cc0ecf2ebfaa1 Mon Sep 17 00:00:00 2001 From: Bhargav Suryadevara Date: Wed, 13 Sep 2023 18:21:14 -0500 Subject: [PATCH 15/15] updated filesource --- morpheus/stages/input/file_source.py | 24 +++++++----------------- 1 file changed, 7 insertions(+), 17 deletions(-) diff --git a/morpheus/stages/input/file_source.py b/morpheus/stages/input/file_source.py index bc63631b1d..2e9873f3bb 100644 --- a/morpheus/stages/input/file_source.py +++ b/morpheus/stages/input/file_source.py @@ -101,7 +101,6 @@ def __init__(self, self._storage_connection_kwargs = storage_connection_kwargs or {} self._watch_interval = watch_interval self._max_files = max_files - self._stop_requested = False @property def name(self) -> str: @@ -112,14 +111,6 @@ def supports_cpp_node(self) -> bool: """Indicates whether or not this stage supports a C++ node.""" return False - def stop(self): - """Performs cleanup steps when pipeline is stopped.""" - - # Indicate we need to stop - self._stop_requested = True - - return super().stop() - def _extract_unique_protocols(self) -> set: """Extracts unique protocols from the given file paths.""" protocols = set() @@ -172,7 +163,7 @@ def _polling_generate_frames_fsspec(self) -> typing.Iterable[fsspec.core.OpenFil processed_files_count = 0 has_s3_protocol = "s3" in self._protocols - while (not self._stop_requested): + while (True): # Before doing any work, find the next update epoch after the current time while (next_update_epoch <= curr_time): # Only ever add `self._watch_interval` to next_update_epoch so all updates are at repeating intervals @@ -198,9 +189,7 @@ def _polling_generate_frames_fsspec(self) -> typing.Iterable[fsspec.core.OpenFil # need to re-ingest that new file. files_seen = file_set - filtered_files_count = len(filtered_files) - - if filtered_files_count > 0: + if len(filtered_files) > 0: if self._sort: filtered_files = sorted(filtered_files, key=lambda f: f.full_name) @@ -209,6 +198,11 @@ def _polling_generate_frames_fsspec(self) -> typing.Iterable[fsspec.core.OpenFil filtered_files = filtered_files[:self._max_files - processed_files_count] processed_files_count += len(filtered_files) + if self._max_files <= processed_files_count: + logger.debug("Maximum file limit reached. Exiting polling service...") + yield fsspec.core.OpenFiles(filtered_files, fs=files.fs) + break + yield fsspec.core.OpenFiles(filtered_files, fs=files.fs) curr_time = time.monotonic() @@ -220,10 +214,6 @@ def _polling_generate_frames_fsspec(self) -> typing.Iterable[fsspec.core.OpenFil time.sleep(sleep_duration) curr_time = time.monotonic() - if self._max_files > 0 and self._max_files <= processed_files_count: - logger.debug("Maximum file limit reached. Exiting polling service...") - self._stop_requested = True - @staticmethod def generate_frames(file: fsspec.core.OpenFile, file_type: FileTypes, parser_kwargs: dict) -> MessageMeta: """
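# A minimal sketch of the fixed-interval cadence in
# _polling_generate_frames_fsspec() above: `next_update_epoch` only ever
# advances in whole multiples of `watch_interval`, so wake-ups stay on a
# repeating grid even when one polling iteration runs long (names simplified,
# file listing omitted).
import time

watch_interval = 1.0
next_update_epoch = time.monotonic()

for _ in range(3):
    curr_time = time.monotonic()
    # Find the next grid point strictly after the current time
    while next_update_epoch <= curr_time:
        next_update_epoch += watch_interval
    time.sleep(next_update_epoch - curr_time)
    print("poll at", time.monotonic())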
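# A hedged, self-contained sketch of the `max_files` bookkeeping added above:
# each batch is trimmed so the running total never exceeds the cap, the final
# (possibly partial) batch is still emitted, and iteration then stops. Plain
# lists stand in for fsspec OpenFiles here.
def emit_batches(batches, max_files=-1):
    processed = 0
    for batch in batches:
        if max_files > 0:
            batch = batch[:max_files - processed]
        processed += len(batch)
        yield batch
        if 0 < max_files <= processed:
            break  # maximum file limit reached


assert list(emit_batches([["a", "b"], ["c", "d"]], max_files=3)) == [["a", "b"], ["c"]]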