Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add plugins for fsspec caching and cache cleaning #205

Merged
merged 17 commits into from
Sep 11, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions doc/source/plugins.rst
Original file line number Diff line number Diff line change
Expand Up @@ -359,3 +359,20 @@ Settings:
The S3 connection options are handled by the
`fsspec <https://filesystem-spec.readthedocs.io/en/latest/features.html#configuration>`_
configuration mechanism.

Caching remotely read data
**************************

The ``use_fsspec_cache`` plugin can be used to turn on cachcing for remotely read files. See below for
pnuu marked this conversation as resolved.
Show resolved Hide resolved
cache cleaning.

Settings used from the product list:
- ``fsspec_cache`` - name of the cache implementation. Can be one of ``blockcache``, ``filecache`` or
``simplecache``
- ``fsspec_cache_dir`` - location where the caching is done. If it does not exist, it will be created
mraspaud marked this conversation as resolved.
Show resolved Hide resolved

Cleaning file cache
*******************

If remotely read files are cached, this plugin can be used to automatically clean the caches.
No additional settings are available.
34 changes: 34 additions & 0 deletions trollflow2/plugins/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1114,3 +1114,37 @@ def callback_close(obj, targs, job, fmat_config):
for targ in targs:
targ.close()
return obj


def use_fsspec_cache(job):
"""Use the caching from fsspec for (remote) files."""
import fsspec
from satpy.readers import FSFile

cache = job["product_list"]["fsspec_cache"]
cache_dir = job["product_list"].get("fsspec_cache_dir")
filenames = job["input_filenames"]
cached_filenames = [f"{cache}::{f}" for f in filenames]
if cache_dir:
if cache == "blockcache":
open_files = fsspec.open_files(cached_filenames,
blockcache={"cache_storage": cache_dir})
elif cache == "filecache":
open_files = fsspec.open_files(cached_filenames,
filecache={"cache_storage": cache_dir})
elif cache == "simplecache":
open_files = fsspec.open_files(cached_filenames,
simplecache={"cache_storage": cache_dir})
mraspaud marked this conversation as resolved.
Show resolved Hide resolved
else:
open_files = fsspec.open_files(cached_filenames)
fs_files = [FSFile(open_file) for open_file in open_files]
job["input_filenames"] = fs_files


def clear_fsspec_cache(job):
"""Clear all files in fsspec cache directory."""
filenames = job["input_filenames"]

for f in filenames:
if hasattr(f, "_fs"):
f._fs.clear_cache()
mraspaud marked this conversation as resolved.
Show resolved Hide resolved
83 changes: 82 additions & 1 deletion trollflow2/tests/test_trollflow2.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import pathlib
import queue
import unittest
from contextlib import suppress
from functools import partial
from unittest import mock

Expand Down Expand Up @@ -2018,7 +2019,8 @@ def test_filepublisher_kwargs_direct_instance_no_nameserver(self):

_ = FilePublisher(port=40000, nameservers=False)
NoisyPublisher.assert_not_called()
Publisher.assert_called_once_with('tcp://*:40000', name='l2processor', min_port=None, max_port=None)
assert "tcp://*:40000" in Publisher.mock_calls[0].args
mraspaud marked this conversation as resolved.
Show resolved Hide resolved
assert Publisher.mock_calls[0].kwargs["name"] == "l2processor"

def test_filepublisher_kwargs(self):
"""Test filepublisher keyword argument usage.
Expand Down Expand Up @@ -2298,5 +2300,84 @@ def test_format_decoration_plain_text():
assert format_decoration(fmat, fmat_config) == fmat_config


@pytest.fixture
def local_test_file(tmp_path):
"""Create a local test file for fsspec tests."""
fname = tmp_path / "file.nc"
with open(fname, "w") as fid:
fid.write("42\n")
return fname


def _get_fsspec_job(tmp_path, test_file, fsspec_cache, use_cache_dir=True):
input_filenames = [f"file://{os.fspath(test_file)}"]
product_list = {"fsspec_cache": fsspec_cache}
if use_cache_dir:
cache_dir = os.fspath(tmp_path / fsspec_cache)
product_list["fsspec_cache_dir"] = cache_dir
job = {
"product_list": product_list,
"input_filenames": input_filenames,
}
return job


def test_use_fsspec_cache(local_test_file, tmp_path):
"""Test that the configured cache method is applied to the given input files."""
import fsspec
from satpy.readers import FSFile

from trollflow2.plugins import use_fsspec_cache

job = _get_fsspec_job(tmp_path, local_test_file, "simplecache", use_cache_dir=False)

use_fsspec_cache(job)

for f in job["input_filenames"]:
assert isinstance(f, FSFile)
assert isinstance(f._fs, fsspec.implementations.cached.SimpleCacheFileSystem)


def _access_fsspec_file(fname):
# For blockcache we need to ignore the AttributeError
with suppress(AttributeError):
with fname.open("r") as fid:
_ = fid.read()


@pytest.mark.parametrize("fsspec_cache", ("blockcache", "filecache", "simplecache"))
def test_use_fsspec_cache_dir(local_test_file, tmp_path, fsspec_cache):
"""Test that the configured cache directory is used."""
from trollflow2.plugins import use_fsspec_cache

job = _get_fsspec_job(tmp_path, local_test_file, fsspec_cache)

use_fsspec_cache(job)

# Access the file and check the data has been cached
_access_fsspec_file(job["input_filenames"][0])

assert os.listdir(job["product_list"]["fsspec_cache_dir"])


@pytest.mark.parametrize("fsspec_cache", ("blockcache", "filecache", "simplecache"))
def test_clear_fsspec_cache(tmp_path, local_test_file, fsspec_cache):
"""Test clearing fsspec created caches."""
from trollflow2.plugins import clear_fsspec_cache, use_fsspec_cache

# Access some data and use fsspec caching
job = _get_fsspec_job(tmp_path, local_test_file, fsspec_cache)
use_fsspec_cache(job)
_access_fsspec_file(job["input_filenames"][0])
# Make sure the caches exist
assert os.listdir(job["product_list"]["fsspec_cache_dir"])

clear_fsspec_cache(job)

# simplecache cleaning removes the whole cache directory so we need to account for that
with suppress(FileNotFoundError):
assert os.listdir(job["product_list"]["fsspec_cache_dir"]) == []


if __name__ == '__main__':
unittest.main()