Skip to content

Commit

Permalink
Merge pull request #11 from iterative/refactor-asyncfs
Browse files Browse the repository at this point in the history
Refactor asyncfs
  • Loading branch information
skshetry authored Sep 27, 2022
2 parents 6481513 + 8ed3ced commit fe9e3ff
Show file tree
Hide file tree
Showing 3 changed files with 277 additions and 84 deletions.
1 change: 1 addition & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ tests =
pytest-sugar==0.9.5
pytest-cov==3.0.0
pytest-mock==3.8.2
pytest-asyncio==0.19.0
pylint==2.15.0
mypy==0.971
%(all)s
Expand Down
148 changes: 64 additions & 84 deletions src/morefs/asyn_local.py
Original file line number Diff line number Diff line change
@@ -1,32 +1,21 @@
import asyncio
import datetime
import errno
import functools
import os
import posixpath
import shutil
from contextlib import asynccontextmanager

import aiofile
import aiofiles.os
from aiofiles.os import wrap # type: ignore[attr-defined]
from fsspec import AbstractFileSystem
from fsspec.asyn import AbstractBufferedFile, AsyncFileSystem
from fsspec.asyn import AbstractAsyncStreamedFile, AsyncFileSystem
from fsspec.implementations.local import LocalFileSystem

aiofiles.os.chmod = wrap(os.chmod) # type: ignore[attr-defined]
aiofiles.os.utime = wrap(os.utime) # type: ignore[attr-defined]
aiofiles.os.path.islink = wrap(os.path.islink) # type: ignore[attr-defined]
aiofiles.os.path.lexists = wrap(os.path.lexists) # type: ignore[attr-defined]
async_rmtree = wrap(shutil.rmtree) # type: ignore[attr-defined]
async_move = wrap(shutil.move) # type: ignore[attr-defined]
async_copyfile = wrap(shutil.copyfile) # type: ignore[attr-defined]


def _copy_to_fobj(fs, path1, fdst):
with fs.open(path1, "rb") as fsrc:
shutil.copyfileobj(fsrc, fdst)


async_copy_to_fobj = wrap(_copy_to_fobj)
async_utime = wrap(os.utime)
async_islink = wrap(os.path.islink)
async_rmtree = wrap(shutil.rmtree)
async_copyfile = wrap(shutil.copyfile)
async_get_file = wrap(LocalFileSystem.get_file)


async def copy_asyncfileobj(fsrc, fdst, length=shutil.COPY_BUFSIZE):
Expand All @@ -36,40 +25,31 @@ async def copy_asyncfileobj(fsrc, fdst, length=shutil.COPY_BUFSIZE):
await fdst_write(buf)


# pylint: disable=arguments-renamed

# pylint: disable=abstract-method

def wrapped(func):
@functools.wraps(func)
def inner(self, *args, **kwargs):
return func(self, *args, **kwargs)

return inner
class AsyncLocalFileSystem(AsyncFileSystem, LocalFileSystem):
# temporary hack, upstream should support `mirror_sync_methods` instead.
async_impl = False


class AsyncLocalFileSystem(AsyncFileSystem): # pylint: disable=abstract-method
find = wrapped(AbstractFileSystem.find)
walk = wrapped(AbstractFileSystem.walk)
exists = wrapped(AbstractFileSystem.exists)
isdir = wrapped(AbstractFileSystem.isdir)
isfile = wrapped(AbstractFileSystem.isfile)
lexists = staticmethod(LocalFileSystem.lexists)

ls = wrapped(LocalFileSystem.ls)
info = wrapped(LocalFileSystem.info)
_chmod = wrap(LocalFileSystem.chmod)
_created = wrap(LocalFileSystem.created)
_info = wrap(LocalFileSystem.info)
_lexists = wrap(LocalFileSystem.lexists)
_makedirs = wrap(LocalFileSystem.makedirs)
_modified = wrap(LocalFileSystem.modified)
_mv_file = wrap(LocalFileSystem.mv_file)
_rm_file = wrap(LocalFileSystem.rm_file)
_rmdir = wrap(LocalFileSystem.rmdir)

async def _ls(self, path, detail=True, **kwargs):
path = self._strip_protocol(path)
if detail:
entries = await aiofiles.os.scandir(path)
return [await self._info(f) for f in entries]
return [os.path.join(path, f) for f in await aiofiles.os.listdir(path)]

async def _rm_file(self, path, **kwargs):
await aiofiles.os.remove(path)

async def _rmdir(self, path):
await aiofiles.os.rmdir(path)
with await aiofiles.os.scandir(path) as entries:
return [await self._info(f) for f in entries]
return [
posixpath.join(path, f) for f in await aiofiles.os.listdir(path)
]

async def _mkdir(self, path, create_parents=True, **kwargs):
if create_parents:
Expand All @@ -78,11 +58,9 @@ async def _mkdir(self, path, create_parents=True, **kwargs):
errno.EEXIST, os.strerror(errno.EEXIST), path
)
return await self._makedirs(path, exist_ok=True)
path = self._strip_protocol(path)
await aiofiles.os.mkdir(path)

async def _makedirs(self, path, exist_ok=False):
await aiofiles.os.makedirs(path, exist_ok=exist_ok)

async def _cat_file(self, path, start=None, end=None, **kwargs):
async with self.open_async(path, "rb") as f:
if start is not None:
Expand All @@ -100,76 +78,78 @@ async def _pipe_file(self, path, value, **kwargs):
async with self.open_async(path, "wb") as f:
await f.write(value)

async def _put_file(self, path1, path2, **kwargs):
await self._cp_file(path1, path2, **kwargs)

async def _get_file(self, path1, path2, **kwargs):
async def _get_file( # pylint: disable=arguments-renamed
self, path1, path2, **kwargs
):
write_method = getattr(path2, "write", None)
if not write_method:
return await self._cp_file(path1, path2, **kwargs)
if isinstance(
path2, AbstractBufferedFile
path2, AbstractAsyncStreamedFile
) or asyncio.iscoroutinefunction(write_method):
async with self.open_async(path1, "rb") as fsrc:
return await async_copy_to_fobj(fsrc, path2)
return await async_copy_to_fobj(path1, path2)
return await copy_asyncfileobj(fsrc, path2)

path1 = self._strip_protocol(path1)
return await async_get_file(self, path1, path2)

async def _cp_file(self, path1, path2, **kwargs):
path1 = self._strip_protocol(path1)
path2 = self._strip_protocol(path2)
if self.auto_mkdir:
await self._makedirs(self._parent(path2), exist_ok=True)
if await self._isfile(path1):
return await async_copyfile(path1, path2)
if await self._isdir(path1):
return await self._makedirs(path2, exist_ok=True)
raise FileNotFoundError

async def _mv_file(self, path1, path2, **kwargs):
await async_move(path1, path2)

async def _lexists(self, path, **kwargs):
return await aiofiles.os.path.lexists(path)
raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), path1)

async def _created(self, path):
info = await self._info(path=path)
return datetime.datetime.utcfromtimestamp(info["created"])

async def _modified(self, path):
info = await self._info(path=path)
return datetime.datetime.utcfromtimestamp(info["mtime"])
_put_file = _cp_file

async def _rm(
self, path, recursive=False, maxdepth=None
): # pylint: disable=arguments-differ, unused-argument
if isinstance(path, str):
self, path, recursive=False, batch_size=None, maxdepth=None, **kwargs
):
if isinstance(path, (str, os.PathLike)):
path = [path]

assert not maxdepth and not batch_size
for p in path:
p = self._strip_protocol(p)
if recursive and await self._isdir(p):
if os.path.abspath(p) == os.getcwd():
raise ValueError("Cannot delete current working directory")
await async_rmtree(p)
else:
await aiofiles.os.remove(p)

async def _chmod(self, path, mode):
await aiofiles.os.chmod(path, mode)

async def _link(self, src, dst):
src = self._strip_protocol(src)
dst = self._strip_protocol(dst)
await aiofiles.os.link(src, dst)

async def _symlink(self, src, dst):
src = self._strip_protocol(src)
dst = self._strip_protocol(dst)
await aiofiles.os.symlink(src, dst)

async def _islink(self, path):
return await aiofiles.os.path.islink(path)
path = self._strip_protocol(path)
return await async_islink(path)

async def _touch(self, path, **kwargs):
if self._exists(path):
return await aiofiles.os.utime(path, None)
if self.auto_mkdir:
await self._makedirs(self._parent(path), exist_ok=True)
if await self._exists(path):
path = self._strip_protocol(path)
return await async_utime(path, None)
async with self.open_async(path, "a"):
pass

_open = LocalFileSystem._open # pylint: disable=protected-access
@asynccontextmanager
async def open_async(self, path, mode="rb", **kwargs):
path = self._strip_protocol(path)
if self.auto_mkdir and "w" in mode:
await self._makedirs(self._parent(path), exist_ok=True)

def open_async( # pylint: disable=invalid-overridden-method
self, path, mode="rb", **kwargs
):
return aiofile.async_open(path, mode, **kwargs)
async with aiofile.async_open(path, mode, **kwargs) as f:
yield f
Loading

0 comments on commit fe9e3ff

Please sign in to comment.