Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Unified file source stage #1184

Open
wants to merge 20 commits into
base: branch-23.11
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docker/conda/environments/cuda11.8_dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ dependencies:
- automake=1.16.5
- benchmark=1.6.1
- boost-cpp=1.74
- boto3
- cachetools=5.0.0
- ccache>=3.7
- clangdev=14
Expand Down Expand Up @@ -91,6 +92,7 @@ dependencies:
- pytorch=2.0.1
- rapidjson=1.1.0
- requests=2.31
- s3fs>=2023.6
bsuryadevara marked this conversation as resolved.
Show resolved Hide resolved
- scikit-build=0.17.1
- scikit-learn=1.2.2
- sphinx
Expand Down
1 change: 1 addition & 0 deletions morpheus/cli/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -650,6 +650,7 @@ def post_pipeline(ctx: click.Context, *args, **kwargs):
add_command("delay", "morpheus.stages.general.delay_stage.DelayStage", modes=ALL)
add_command("deserialize", "morpheus.stages.preprocess.deserialize_stage.DeserializeStage", modes=NOT_AE)
add_command("dropna", "morpheus.stages.preprocess.drop_null_stage.DropNullStage", modes=NOT_AE)
add_command("file-source", "morpheus.stages.input.file_source.FileSource", modes=NOT_AE)
add_command("filter", "morpheus.stages.postprocess.filter_detections_stage.FilterDetectionsStage", modes=ALL)
add_command("from-azure", "morpheus.stages.input.azure_source_stage.AzureSourceStage", modes=AE_ONLY)
add_command("from-appshield", "morpheus.stages.input.appshield_source_stage.AppShieldSourceStage", modes=FIL_ONLY)
Expand Down
273 changes: 273 additions & 0 deletions morpheus/stages/input/file_source.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,273 @@
# Copyright (c) 2022-2023, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""File source stage."""

import logging
import time
import typing
from functools import partial
from urllib.parse import urlsplit

import fsspec
import mrc
import s3fs
from mrc.core import operators as ops

from morpheus.cli import register_stage
from morpheus.common import FileTypes
from morpheus.config import Config
from morpheus.io.deserializers import read_file_to_df
from morpheus.messages import MessageMeta
from morpheus.pipeline.preallocator_mixin import PreallocatorMixin
from morpheus.pipeline.single_output_source import SingleOutputSource
from morpheus.pipeline.stream_pair import StreamPair

logger = logging.getLogger(__name__)


@register_stage("file-source")
class FileSource(PreallocatorMixin, SingleOutputSource):
"""
Load messages from a file.

FileSource is used to produce messages loaded from a file. Useful for testing performance and
accuracy of a pipeline.

Parameters
----------
config : morpheus.config.Config
Pipeline configuration instance.
files : List[str]
List of paths to be read from, can be a list of S3 URLs (`s3://path`) and can include wildcard characters `*`
as defined by `fsspec`:
https://filesystem-spec.readthedocs.io/en/latest/api.html?highlight=open_files#fsspec.open_files
bsuryadevara marked this conversation as resolved.
Show resolved Hide resolved
watch : bool, default = False
When True, will check `files` for new files and emit them as they appear.
watch_interval : float, default = 1.0
When `watch` is True, this is the time in seconds between polling the paths in `files` for new files.
sort : bool, default = False
If true, the list of files will be processed in sorted order.
file_type : morpheus.common.FileTypes, optional, case_sensitive = False
Indicates what type of file to read. Specifying 'auto' will determine the file type from the extension.
Supported extensions: 'csv', 'json', 'jsonlines' and 'parquet'.
parser_kwargs : dict, default = None
Extra options to pass to the file parser.
max_files: int, default = -1
Max number of files to read. Useful for debugging to limit startup time. Default value of -1 is unlimited.
"""

def __init__(self,
config: Config,
files: typing.List[str],
watch: bool = False,
watch_interval: float = 1.0,
sort: bool = False,
file_type: FileTypes = FileTypes.Auto,
parser_kwargs: dict = None,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remark: Adding this argument makes me uneasy, since it will be difficult to deprecate in the future if necessary.

Question: Is this being added as a new feature, or is this something that existed on any of the other file source implementations?

max_files: int = -1):

super().__init__(config)

if not files or len(files) == 0:
raise ValueError("The 'files' cannot be empty.")

if watch and len(files) != 1:
raise ValueError("When 'watch' is True, the 'files' should contain exactly one file path.")

self._files = list(files)
self._protocols = self._extract_unique_protocols()

if len(self._protocols) > 1:
raise ValueError("Accepts same protocol input files, but it received multiple protocols.")

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remark: It appears that == 0 and < -1 are invalid values for max_files.

Important: Check that max_files is in a valid range (if you decide to keep -1 as the default, adjust accordingly).

Suggested change
if max_files and max_files <= 0:
raise ValueError(...)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Raising an error if self._files is None or []. We will get at least one value in the self._protocols, so i didn't put an extra check.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's true, but what about max_files? If max_files == 0 or max_files < -1, then this stage won't produce any files. In that case we should either warn or raise an exception.

Copy link
Contributor Author

@bsuryadevara bsuryadevara Sep 18, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The max_files flag takes effect only when set to a value greater than zero; otherwise, it is treated as continuous polling without any imposed limit. Default value is -1, so I thought raising an error or warn would not needed. Let me know if you still want to add the warning message.

self._watch = watch
self._sort = sort
self._file_type = file_type
self._parser_kwargs = parser_kwargs or {}
self._watch_interval = watch_interval
self._max_files = max_files
self._stop_requested = False

@property
def name(self) -> str:
"""Return the name of the stage."""
return "file-source"

def supports_cpp_node(self) -> bool:
"""Indicates whether or not this stage supports a C++ node."""
return False

def stop(self):
"""Performs cleanup steps when pipeline is stopped."""

# Indicate we need to stop
self._stop_requested = True

return super().stop()

def _extract_unique_protocols(self) -> set:
"""Extracts unique protocols from the given file paths."""
protocols = set()

for file in self._files:
scheme = urlsplit(file).scheme
if scheme:
protocols.add(scheme.lower())
else:
protocols.add("file")

return protocols

def _build_source(self, builder: mrc.Builder) -> StreamPair:

if self._build_cpp_node():
raise RuntimeError("Does not support C++ nodes.")

if self._watch:
generator_function = self._polling_generate_frames_fsspec
else:
generator_function = self._generate_frames_fsspec

out_stream = builder.make_source(self.unique_name, generator_function())
out_type = fsspec.core.OpenFiles

# Supposed to just return a source here
return out_stream, out_type

def _generate_frames_fsspec(self) -> typing.Iterable[fsspec.core.OpenFiles]:

files: fsspec.core.OpenFiles = fsspec.open_files(self._files)

if (len(files) == 0):
raise RuntimeError(f"No files matched input strings: '{self._files}'. "
"Check your input pattern and ensure any credentials are correct.")

if self._sort:
files = sorted(files, key=lambda f: f.full_name)

if self._max_files > 0:
files = files[:self._max_files]

yield files

def _polling_generate_frames_fsspec(self) -> typing.Iterable[fsspec.core.OpenFiles]:
files_seen = set()
curr_time = time.monotonic()
next_update_epoch = curr_time
processed_files_count = 0
has_s3_protocol = "s3" in self._protocols

while (not self._stop_requested):
# Before doing any work, find the next update epoch after the current time
while (next_update_epoch <= curr_time):
# Only ever add `self._watch_interval` to next_update_epoch so all updates are at repeating intervals
next_update_epoch += self._watch_interval

file_set = set()
filtered_files = []

# Clear cached instance, otherwise we don't receive newly touched files.
if has_s3_protocol:
s3fs.S3FileSystem.clear_instance_cache()

files = fsspec.open_files(self._files)

for file in files:
file_set.add(file.full_name)
if file.full_name not in files_seen:
filtered_files.append(file)

# Replace files_seen with the new set of files. This prevents a memory leak that could occurr if files are
# deleted from the input directory. In addition if a file with a given name was created, seen/processed by
# the stage, and then deleted, and a new file with the same name appeared sometime later, the stage will
# need to re-ingest that new file.
files_seen = file_set

filtered_files_count = len(filtered_files)

if filtered_files_count > 0:
bsuryadevara marked this conversation as resolved.
Show resolved Hide resolved

if self._sort:
filtered_files = sorted(filtered_files, key=lambda f: f.full_name)

if self._max_files > 0:
filtered_files = filtered_files[:self._max_files - processed_files_count]
Copy link
Contributor

@cwharris cwharris Sep 13, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remark: If processed_files_count > self._max_files we get filtered_files[:n] where n < 0, meaning we'll take the last n files, which doesn't sound like what we want.

Important: make sure we don't accidentally read from the end of the list of filtered_files.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh I see we won't get a negative number because processed_files_count is calculated based on _max_files. My bad. No change needed.

processed_files_count += len(filtered_files)
cwharris marked this conversation as resolved.
Show resolved Hide resolved

yield fsspec.core.OpenFiles(filtered_files, fs=files.fs)

curr_time = time.monotonic()

# If we spent more than `self._watch_interval` doing work and/or yielding to the output channel blocked,
# then we should only sleep for the remaining time until the next update epoch.
sleep_duration = next_update_epoch - curr_time
if (sleep_duration > 0):
time.sleep(sleep_duration)
curr_time = time.monotonic()

if self._max_files > 0 and self._max_files <= processed_files_count:
logger.debug("Maximum file limit reached. Exiting polling service...")
self._stop_requested = True
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remark: Oh, I see. This is how we are stopping the source when we reach the max file limit. This is fine, but in general cancellation tokens are reserved for flagging from outside of the function that checks them. I think we can move this logic up in to the previous if self._max_files > 0 condition and use break or return there rather than flagging the cancellation token. Up to you.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have updated as suggested. I tried to avoid break and yield (multiple times), which is the reason i choosed this approach.


@staticmethod
def generate_frames(file: fsspec.core.OpenFile, file_type: FileTypes, parser_kwargs: dict) -> MessageMeta:
"""
Generate message frame from a file.

This function reads data from a file and generates message frames (MessageMeta) based on the file's content.
It can be used to load and process messages from a file for testing and analysis within a Morpheus pipeline.

Parameters
----------
file : fsspec.core.OpenFile
An open file object using fsspec.
file_type : FileTypes
Indicates the type of the file to read. Supported types include 'csv', 'json', 'jsonlines', and 'parquet'.
parser_kwargs : dict
Additional keyword arguments to pass to the file parser.

Returns
-------
MessageMeta
MessageMeta object, each containing a dataframe of messages from the file.
"""
df = read_file_to_df(
file.full_name,
file_type=file_type,
filter_nulls=False,
parser_kwargs=parser_kwargs,
df_type="cudf",
)

meta = MessageMeta(df)

return meta

def _post_build_single(self, builder: mrc.Builder, out_pair: StreamPair) -> StreamPair:

out_stream = out_pair[0]

post_node = builder.make_node(
self.unique_name + "-post",
ops.flatten(), # Flatten list of open fsspec files
ops.map(partial(self.generate_frames, file_type=self._file_type,
parser_kwargs=self._parser_kwargs)) # Generate dataframe for each file
)

builder.make_edge(out_stream, post_node)

out_stream = post_node
out_type = MessageMeta

return super()._post_build_single(builder, (out_stream, out_type))
Loading