From e83ce850f433fd8bbf8ff4e8d7649b942639db31 Mon Sep 17 00:00:00 2001 From: Barney Gale Date: Wed, 5 Jun 2024 18:54:50 +0100 Subject: [PATCH] pathlib ABCs: remove duplicate `realpath()` implementation. (#119178) Add private `posixpath._realpath()` function, which is a generic version of `realpath()` that can be parameterised with string tokens (`sep`, `curdir`, `pardir`) and query functions (`getcwd`, `lstat`, `readlink`). Also add support for limiting the number of symlink traversals. In the private `pathlib._abc.PathBase` class, call `posixpath._realpath()` and remove our re-implementation of the same algorithm. No change to any public APIs, either in `posixpath` or `pathlib`. Co-authored-by: Nice Zombies --- Lib/pathlib/_abc.py | 87 +++++++++++++++------------------------------ Lib/posixpath.py | 40 +++++++++++++++------ 2 files changed, 57 insertions(+), 70 deletions(-) diff --git a/Lib/pathlib/_abc.py b/Lib/pathlib/_abc.py index d7471b6927331d..1a74f457c3f5a7 100644 --- a/Lib/pathlib/_abc.py +++ b/Lib/pathlib/_abc.py @@ -12,8 +12,8 @@ """ import functools +import posixpath from glob import _Globber, _no_recurse_symlinks -from errno import ENOTDIR, ELOOP from stat import S_ISDIR, S_ISLNK, S_ISREG, S_ISSOCK, S_ISBLK, S_ISCHR, S_ISFIFO @@ -696,65 +696,34 @@ def resolve(self, strict=False): """ if self._resolving: return self - path_root, parts = self._stack - path = self.with_segments(path_root) - try: - path = path.absolute() - except UnsupportedOperation: - path_tail = [] - else: - path_root, path_tail = path._stack - path_tail.reverse() - - # If the user has *not* overridden the `readlink()` method, then symlinks are unsupported - # and (in non-strict mode) we can improve performance by not calling `stat()`. - querying = strict or getattr(self.readlink, '_supported', True) - link_count = 0 - while parts: - part = parts.pop() - if not part or part == '.': - continue - if part == '..': - if not path_tail: - if path_root: - # Delete '..' segment immediately following root - continue - elif path_tail[-1] != '..': - # Delete '..' segment and its predecessor - path_tail.pop() - continue - path_tail.append(part) - if querying and part != '..': - path = self.with_segments(path_root + self.parser.sep.join(path_tail)) + + def getcwd(): + return str(self.with_segments().absolute()) + + if strict or getattr(self.readlink, '_supported', True): + def lstat(path_str): + path = self.with_segments(path_str) path._resolving = True - try: - st = path.stat(follow_symlinks=False) - if S_ISLNK(st.st_mode): - # Like Linux and macOS, raise OSError(errno.ELOOP) if too many symlinks are - # encountered during resolution. - link_count += 1 - if link_count >= self._max_symlinks: - raise OSError(ELOOP, "Too many symbolic links in path", self._raw_path) - target_root, target_parts = path.readlink()._stack - # If the symlink target is absolute (like '/etc/hosts'), set the current - # path to its uppermost parent (like '/'). - if target_root: - path_root = target_root - path_tail.clear() - else: - path_tail.pop() - # Add the symlink target's reversed tail parts (like ['hosts', 'etc']) to - # the stack of unresolved path parts. - parts.extend(target_parts) - continue - elif parts and not S_ISDIR(st.st_mode): - raise NotADirectoryError(ENOTDIR, "Not a directory", self._raw_path) - except OSError: - if strict: - raise - else: - querying = False - return self.with_segments(path_root + self.parser.sep.join(path_tail)) + return path.lstat() + + def readlink(path_str): + path = self.with_segments(path_str) + path._resolving = True + return str(path.readlink()) + else: + # If the user has *not* overridden the `readlink()` method, then + # symlinks are unsupported and (in non-strict mode) we can improve + # performance by not calling `path.lstat()`. + def skip(path_str): + # This exception will be internally consumed by `_realpath()`. + raise OSError("Operation skipped.") + + lstat = readlink = skip + + return self.with_segments(posixpath._realpath( + str(self), strict, self.parser.sep, + getcwd=getcwd, lstat=lstat, readlink=readlink, + maxlinks=self._max_symlinks)) def symlink_to(self, target, target_is_directory=False): """ diff --git a/Lib/posixpath.py b/Lib/posixpath.py index 47b2aa572e5c65..fccca4e066b76f 100644 --- a/Lib/posixpath.py +++ b/Lib/posixpath.py @@ -22,6 +22,7 @@ altsep = None devnull = '/dev/null' +import errno import os import sys import stat @@ -401,7 +402,10 @@ def realpath(filename, *, strict=False): curdir = '.' pardir = '..' getcwd = os.getcwd + return _realpath(filename, strict, sep, curdir, pardir, getcwd) +def _realpath(filename, strict=False, sep=sep, curdir=curdir, pardir=pardir, + getcwd=os.getcwd, lstat=os.lstat, readlink=os.readlink, maxlinks=None): # The stack of unresolved path parts. When popped, a special value of None # indicates that a symlink target has been resolved, and that the original # symlink path can be retrieved by popping again. The [::-1] slice is a @@ -418,6 +422,10 @@ def realpath(filename, *, strict=False): # the same links. seen = {} + # Number of symlinks traversed. When the number of traversals is limited + # by *maxlinks*, this is used instead of *seen* to detect symlink loops. + link_count = 0 + while rest: name = rest.pop() if name is None: @@ -436,11 +444,19 @@ def realpath(filename, *, strict=False): else: newpath = path + sep + name try: - st = os.lstat(newpath) + st = lstat(newpath) if not stat.S_ISLNK(st.st_mode): path = newpath continue - if newpath in seen: + elif maxlinks is not None: + link_count += 1 + if link_count > maxlinks: + if strict: + raise OSError(errno.ELOOP, os.strerror(errno.ELOOP), + newpath) + path = newpath + continue + elif newpath in seen: # Already seen this path path = seen[newpath] if path is not None: @@ -448,26 +464,28 @@ def realpath(filename, *, strict=False): continue # The symlink is not resolved, so we must have a symlink loop. if strict: - # Raise OSError(errno.ELOOP) - os.stat(newpath) + raise OSError(errno.ELOOP, os.strerror(errno.ELOOP), + newpath) path = newpath continue - target = os.readlink(newpath) + target = readlink(newpath) except OSError: if strict: raise path = newpath continue # Resolve the symbolic link - seen[newpath] = None # not resolved symlink if target.startswith(sep): # Symlink target is absolute; reset resolved path. path = sep - # Push the symlink path onto the stack, and signal its specialness by - # also pushing None. When these entries are popped, we'll record the - # fully-resolved symlink target in the 'seen' mapping. - rest.append(newpath) - rest.append(None) + if maxlinks is None: + # Mark this symlink as seen but not fully resolved. + seen[newpath] = None + # Push the symlink path onto the stack, and signal its specialness + # by also pushing None. When these entries are popped, we'll + # record the fully-resolved symlink target in the 'seen' mapping. + rest.append(newpath) + rest.append(None) # Push the unresolved symlink target parts onto the stack. rest.extend(target.split(sep)[::-1])