diff --git a/pythonanywhere/api/files_api.py b/pythonanywhere/api/files_api.py index 97bef23..9129799 100644 --- a/pythonanywhere/api/files_api.py +++ b/pythonanywhere/api/files_api.py @@ -2,6 +2,7 @@ *Don't use* `Files` :class: in helper scripts, use `pythonanywhere.files` classes instead.""" import getpass +from os import path from urllib.parse import urljoin from pythonanywhere.api.base import call_api, get_api_endpoint @@ -14,23 +15,38 @@ class Files: which is stored in a class variable `Files.base_url`, then calls `call_api` with appropriate arguments to execute files action. - Covers 'GET' for files path endpoint. + Covers GET and POST for files path endpoint. ********************************** TODOS: - - POST, DELETE for path endpoint + - DELETE for path endpoint - POST for sharing - GET, DELETE for sharing path - GET for tree ********************************** - Use :method: `Files.get_path` to get contents of file or directory. + "path" methods: + - use :method: `Files.path_get` to get contents of file or directory from `path`, + - use :method: `Files.path_post` to upload or update file at given `dest_path` using contents + from `source`. """ base_url = get_api_endpoint().format(username=getpass.getuser(), flavor="files") + path_endpoint = urljoin(base_url, "path") + + def _error_msg(self, result): + if "application/json" in result.headers.get("content-type", ""): + return ": " + result.json()["detail"] + return "" + + def path_get(self, path): + """Returns dictionary of directory contents when `path` is an absolute path + to of an existing directory or file contents if `path` is an absolute path to an existing + file -- both available to the PythonAnywhere user. + Raises when `path` is invalid or unavailable.""" + + url = f"{self.path_endpoint}{path}" - def get_path(self, path): - url = urljoin(self.base_url, path) result = call_api(url, "GET") if result.status_code == 200: @@ -38,7 +54,35 @@ def get_path(self, path): return result.json() return result.content - if not result.ok: - raise Exception( - f"GET to fetch contents of {url} failed, got {result}: {result.text}" - ) + raise Exception( + f"GET to fetch contents of {url} failed, got {result}{self._error_msg(result)}" + ) + + def path_post(self, dest_path, source, as_string=False): + """Uploads contents of `source` to `dest_path` which should be a valid absolute path + of a file available to a PythonAnywhere user. If `dest_path` contains directories which + don't exist yet they will be created. + + With `as_string` optional keyword set to `True`, method interprets `source` as string + containing file contents, otherwise `source` is expected to be a valid path to e file. + + Returns 200 if existing file on PythonAnywhere has been updated with `source` contents, + or 201 if file from `dest_path` has been created with those contents.""" + + url = f"{self.path_endpoint}{dest_path}" + + if as_string: + content = source + else: + if not path.isfile(source): + raise Exception("Source should be an existing file or a string") + content = open(source, "rb") + + result = call_api(url, "POST", files={"content": content}) + + if result.ok: + return result.status_code + + raise Exception( + f"POST to upload contents to {url} failed, got {result}{self._error_msg(result)}" + ) diff --git a/tests/test_api_files.py b/tests/test_api_files.py index 6ed3182..428284c 100644 --- a/tests/test_api_files.py +++ b/tests/test_api_files.py @@ -1,5 +1,6 @@ import getpass import json +from unittest.mock import patch from urllib.parse import urljoin import pytest @@ -9,79 +10,149 @@ from pythonanywhere.api.files_api import Files -@pytest.fixture -def files_base_url(): - return get_api_endpoint().format(username=getpass.getuser(), flavor="files") - - -@pytest.fixture -def home_dir_path(files_base_url): - return urljoin(files_base_url, f"path/home/{getpass.getuser()}/") - - -@pytest.fixture -def default_home_dir_files(home_dir_path): - return { - ".bashrc": {"type": "file", "url": urljoin(home_dir_path, ".bashrc")}, - ".gitconfig": {"type": "file", "url": urljoin(home_dir_path, ".gitconfig")}, - ".local": {"type": "directory", "url": urljoin(home_dir_path, ".local")}, - ".profile": {"type": "file", "url": urljoin(home_dir_path, ".profile")}, - "README.txt": {"type": "file", "url": urljoin(home_dir_path, "README.txt")}, +class TestFiles: + username = getpass.getuser() + base_url = get_api_endpoint().format(username=username, flavor="files") + home_dir_path = f"/home/{username}" + default_home_dir_files = { + ".bashrc": {"type": "file", "url": f"{base_url}path{home_dir_path}/.bashrc"}, + ".gitconfig": {"type": "file", "url": f"{base_url}path{home_dir_path}/.gitconfig"}, + ".local": {"type": "directory", "url": f"{base_url}path{home_dir_path}/.local"}, + ".profile": {"type": "file", "url": f"{base_url}path{home_dir_path}/.profile"}, + "README.txt": {"type": "file", "url": f"{base_url}path{home_dir_path}/README.txt"}, } @pytest.mark.files -class TestFilesPath: +class TestFilesGetPath(TestFiles): def test_returns_contents_of_directory_when_path_to_dir_provided( - self, api_token, api_responses, home_dir_path, default_home_dir_files + self, api_token, api_responses, ): + dir_url = urljoin(self.base_url, f"path{self.home_dir_path}") api_responses.add( responses.GET, - url=home_dir_path, + url=dir_url, status=200, - body=json.dumps(default_home_dir_files), + body=json.dumps(self.default_home_dir_files), headers={"Content-Type": "application/json"}, ) - assert Files().get_path(home_dir_path) == default_home_dir_files + assert Files().path_get(self.home_dir_path) == self.default_home_dir_files - def test_returns_file_contents_when_file_path_provided( - self, api_token, api_responses, home_dir_path - ): - filepath = urljoin(home_dir_path, "README.txt") + def test_returns_file_contents_when_file_path_provided(self, api_token, api_responses): + filepath = urljoin(self.home_dir_path, "README.txt") + file_url = urljoin(self.base_url, f"path{filepath}") body = ( - b'# vim: set ft=rst:\n\nSee https://help.pythonanywhere.com/ ' + b"# vim: set ft=rst:\n\nSee https://help.pythonanywhere.com/ " b'(or click the "Help" link at the top\nright) ' - b'for help on how to use PythonAnywhere, including tips on copying and\n' - b'pasting from consoles, and writing your own web applications.\n' + b"for help on how to use PythonAnywhere, including tips on copying and\n" + b"pasting from consoles, and writing your own web applications.\n" ) api_responses.add( responses.GET, - url=filepath, + url=file_url, status=200, body=body, headers={"Content-Type": "application/octet-stream; charset=utf-8"}, ) - assert Files().get_path(filepath) == body + assert Files().path_get(filepath) == body - def test_raises_because_wrong_path_provided( - self, api_token, api_responses, home_dir_path, default_home_dir_files - ): - wrong_path = urljoin(home_dir_path, "foo") - body = f"{{'detail':'No such file or directory: {wrong_path}'}}" + def test_raises_because_wrong_path_provided(self, api_token, api_responses): + wrong_path = "/foo" + wrong_url = urljoin(self.base_url, f"path{wrong_path}") + body = bytes(f'{{"detail": "No such file or directory: {wrong_path}"}}', "utf") api_responses.add( responses.GET, - url=wrong_path, + url=wrong_url, status=404, body=body, + headers={"Content-Type": "application/json"}, + ) + + with pytest.raises(Exception) as e: + Files().path_get(wrong_path) + + expected_error_msg = ( + f"GET to fetch contents of {wrong_url} failed, got : " + f"No such file or directory: {wrong_path}" + ) + assert str(e.value) == expected_error_msg + + +@pytest.mark.files +class TestFilesPostPath(TestFiles): + def test_returns_200_when_file_updated(self, api_token, api_responses): + existing_file_path = f"{self.home_dir_path}/README.txt" + existing_file_url = self.default_home_dir_files["README.txt"]["url"] + api_responses.add( + responses.POST, + url=existing_file_url, + status=200, + ) + + result = Files().path_post(existing_file_path, "new contents\n", as_string=True) + + assert result == 200 + + def test_returns_201_when_file_uploaded(self, api_token, api_responses): + new_file_path = f"{self.home_dir_path}/new.txt" + new_file_url = f"{self.base_url}path{self.home_dir_path}/new.txt" + api_responses.add( + responses.POST, + url=new_file_url, + status=201, + ) + + result = Files().path_post(new_file_path, "new contents\n", as_string=True) + + assert result == 201 + + def test_raises_when_wrong_path(self, api_token, api_responses): + invalid_path = "foo" + url_with_invalid_path = urljoin(self.base_url, f"path{invalid_path}") + api_responses.add( + responses.POST, + url=url_with_invalid_path, + status=404, ) with pytest.raises(Exception) as e: - Files().get_path(wrong_path) + Files().path_post(invalid_path, "contents", as_string=True) expected_error_msg = ( - f"GET to fetch contents of {wrong_path} failed, got : {body}" + f"POST to upload contents to {url_with_invalid_path} failed, got " ) assert str(e.value) == expected_error_msg + @patch("os.path.isfile") + def test_raises_if_source_is_not_a_file_when_not_using_string(self, mock_isfile): + mock_isfile.return_value = False + dest_filepath = urljoin(self.home_dir_path, "README.txt") + valid_endpoint = urljoin(self.base_url, f"path{dest_filepath}") + + with pytest.raises(Exception) as e: + Files().path_post(valid_endpoint, "/xyz/zyx", as_string=False) + + assert str(e.value) == "Source should be an existing file or a string" + + def test_raises_when_no_contents(self, api_token, api_responses): + valid_path = f"{self.home_dir_path}/README.txt" + valid_url = urljoin(self.base_url, f"path{valid_path}") + body = bytes('{"detail": "You must provide a file with the name \'content\'."}', "utf") + api_responses.add( + responses.POST, + url=valid_url, + status=400, + body=body, + headers={"Content-Type": "application/json"}, + ) + + with pytest.raises(Exception) as e: + Files().path_post(valid_path, None, as_string=True) + + expected_error_msg = ( + f"POST to upload contents to {valid_url} failed, got : " + "You must provide a file with the name 'content'." + ) + assert str(e.value) == expected_error_msg