From c9664953dce9a52499a9a6b49138257c2bb43e7f Mon Sep 17 00:00:00 2001 From: ScottDemarest <38106816+ScottDemarest@users.noreply.github.com> Date: Fri, 22 Jul 2022 14:36:06 -0400 Subject: [PATCH] File ops feature (#7) * file system functions added * Passes old tests * mkdir fixed mkdir and mkdirs fixed update mode removed passes tests * stuff * passes broken server test * info() and exists() * working dir cache * timeouts added * invalidate cache * exceptions changed * cache issues fixed * walk, find, glob, du tests added * touch() works * Most changes made * lint * all changes implemented * lint --- src/fsspec_xrootd/xrootd.py | 212 +++++++++++++++++++++++++++++------ tests/test_basicio.py | 215 +++++++++++++++++++++++++++++++++--- 2 files changed, 380 insertions(+), 47 deletions(-) diff --git a/src/fsspec_xrootd/xrootd.py b/src/fsspec_xrootd/xrootd.py index 3217e06..0b39a7a 100644 --- a/src/fsspec_xrootd/xrootd.py +++ b/src/fsspec_xrootd/xrootd.py @@ -1,30 +1,56 @@ from __future__ import annotations import io +import os.path import warnings +from enum import IntEnum from typing import Any +from fsspec.dircache import DirCache # type: ignore[import] from fsspec.spec import AbstractBufferedFile, AbstractFileSystem # type: ignore[import] from XRootD import client # type: ignore[import] from XRootD.client.flags import ( # type: ignore[import] DirListFlags, + MkDirFlags, OpenFlags, StatInfoFlags, ) +class ErrorCodes(IntEnum): + INVALID_PATH = 400 + + class XRootDFileSystem(AbstractFileSystem): # type: ignore[misc] protocol = "root" root_marker = "/" + default_timeout = 60 - def __init__(self, *args: list[Any], **storage_options: str) -> None: + def __init__(self, *args: list[Any], **storage_options: Any) -> None: + self.timeout = storage_options.get("timeout", XRootDFileSystem.default_timeout) self._path = storage_options["path"] self._myclient = client.FileSystem( storage_options["protocol"] + "://" + storage_options["hostid"] ) - self.storage_options = storage_options + status, _n = self._myclient.ping(15) + if not status.ok: + raise OSError(f"Could not connect to server {storage_options['hostid']}") self._intrans = False + self.dircache = DirCache( + use_listings_cache=True, + listings_expiry_time=storage_options.get("listings_expiry_time", 0), + ) + self.storage_options = storage_options + + def invalidate_cache(self, path: str | None = None) -> None: + if path is None: + self.dircache.clear() + else: + try: + del self.dircache[path] + except KeyError: + pass @staticmethod def _get_kwargs_from_urls(u: str) -> dict[Any, Any]: @@ -43,39 +69,157 @@ def _get_kwargs_from_urls(u: str) -> dict[Any, Any]: } @classmethod - def _strip_protocol(cls, path: str) -> Any: - url = client.URL(path) + def _strip_protocol(cls, path: str | list[str]) -> Any: + if type(path) == str: + return client.URL(path).path + elif type(path) == list: + return [client.URL(item).path for item in path] + else: + raise ValueError("Strip protocol not given string or list") - return url.path + def mkdir(self, path: str, create_parents: bool = True, **kwargs: Any) -> None: + if create_parents: + status, n = self._myclient.mkdir( + path, MkDirFlags.MAKEPATH, timeout=self.timeout + ) + else: + status, n = self._myclient.mkdir(path, timeout=self.timeout) + if not status.ok: + raise OSError(f"Directory not made properly: {status.message}") - def ls(self, path: str, detail: bool = True, **kwargs: Any) -> list[Any]: + def makedirs(self, path: str, exist_ok: bool = False) -> None: + if not exist_ok: + if self.exists(path): + raise OSError( + "Location already exists and exist_ok arg was set to false" + ) + status, n = self._myclient.mkdir( + path, MkDirFlags.MAKEPATH, timeout=self.timeout + ) + if not status.ok and not (status.code == ErrorCodes.INVALID_PATH and exist_ok): + raise OSError(f"Directory not made properly: {status.message}") - stats, deets = self._myclient.dirlist(path, DirListFlags.STAT) + def rmdir(self, path: str) -> None: + status, n = self._myclient.rmdir(path, timeout=self.timeout) + if not status.ok: + raise OSError(f"Directory not removed properly: {status.message}") - listing = [] + def _rm(self, path: str) -> None: + status, n = self._myclient.rm(path, timeout=self.timeout) + if not status.ok: + raise OSError(f"File not removed properly: {status.message}") - if detail: - for item in deets: - t = "" - if item.statinfo.flags and StatInfoFlags.IS_DIR: - t = "directory" - elif item.statinfo.flags and StatInfoFlags.OTHER: - t = "other" - else: - t = "file" + def touch(self, path: str, truncate: bool = False, **kwargs: Any) -> None: + if truncate or not self.exists(path): + status, _ = self._myclient.truncate(path, 0, timeout=self.timeout) + if not status.ok: + raise OSError(f"File not touched properly: {status.message}") + else: + status, _ = self._myclient.truncate( + path, self.info(path).get("size"), timeout=self.timeout + ) + if not status.ok: + raise OSError(f"File not touched properly: {status.message}") - listing.append( - { - "name": path + "/" + item.name, - "size": item.statinfo.size, - "type": t, + def modified(self, path: str) -> Any: + status, statInfo = self._myclient.stat(path, timeout=self.timeout) + return statInfo.modtime + + def exists(self, path: str, **kwargs: Any) -> bool: + if path in self.dircache: + return True + else: + status, _ = self._myclient.stat(path, timeout=self.timeout) + if status.code == ErrorCodes.INVALID_PATH: + return False + elif not status.ok: + raise OSError(f"status check failed with message: {status.message}") + return True + + def info(self, path: str, **kwargs: Any) -> dict[str, Any]: + spath = os.path.split(path) + deet = self._ls_from_cache(spath[0]) + if deet is not None: + for item in deet: + if item["name"] == path: + return { + "name": path, + "size": item["size"], + "type": item["type"], } - ) + raise OSError("_ls_from_cache() failed to function") else: - for item in deets: - listing.append(item.name) + status, deet = self._myclient.stat(path, timeout=self.timeout) + if not status.ok: + raise OSError(f"File stat request failed: {status.message}") + if deet.flags & StatInfoFlags.IS_DIR: + ret = { + "name": path, + "size": deet.size, + "type": "directory", + } + elif deet.flags & StatInfoFlags.OTHER: + ret = { + "name": path, + "size": deet.size, + "type": "other", + } + else: + ret = { + "name": path, + "size": deet.size, + "type": "file", + } + return ret - return listing + def ls(self, path: str, detail: bool = True, **kwargs: Any) -> list[Any]: + listing = [] + if path in self.dircache and not kwargs.get("force_update", False): + if detail: + listing = self._ls_from_cache(path) + return listing + else: + return [ + os.path.basename(item["name"]) for item in self._ls_from_cache(path) + ] + else: + status, deets = self._myclient.dirlist( + path, DirListFlags.STAT, timeout=self.timeout + ) + if not status.ok: + raise OSError( + f"Server failed to provide directory info: {status.message}" + ) + for item in deets: + if item.statinfo.flags & StatInfoFlags.IS_DIR: + listing.append( + { + "name": path + "/" + item.name, + "size": item.statinfo.size, + "type": "directory", + } + ) + elif item.statinfo.flags & StatInfoFlags.OTHER: + listing.append( + { + "name": path + "/" + item.name, + "size": item.statinfo.size, + "type": "other", + } + ) + else: + listing.append( + { + "name": path + "/" + item.name, + "size": item.statinfo.size, + "type": "file", + } + ) + self.dircache[path] = listing + if detail: + return listing + else: + return [os.path.basename(item["name"].rstrip("/")) for item in listing] def _open( self, @@ -166,13 +310,13 @@ def __init__( ) -> None: from fsspec.core import caches + self.timeout = fs.timeout # by this point, mode will have a "b" in it + # update "+" mode removed for now since seek() is read only if "x" in mode: self.mode = OpenFlags.NEW elif "a" in mode: self.mode = OpenFlags.UPDATE - elif "+" in mode: - self.mode = OpenFlags.UPDATE elif "w" in mode: self.mode = OpenFlags.DELETE elif "r" in mode: @@ -187,6 +331,7 @@ def __init__( + "/" + path, self.mode, + timeout=self.timeout, ) if not status.ok: @@ -194,7 +339,7 @@ def __init__( self.metaOffset = 0 if "a" in mode: - _stats, _deets = self._myFile.stat() + _stats, _deets = self._myFile.stat(timeout=self.timeout) self.metaOffset = _deets.size self.path = path @@ -241,7 +386,7 @@ def __init__( def _fetch_range(self, start: int, end: int) -> Any: status, data = self._myFile.read( - self.metaOffset + start, self.metaOffset + end - start + self.metaOffset + start, self.metaOffset + end - start, timeout=self.timeout ) if not status.ok: raise OSError(f"File did not read properly: {status.message}") @@ -269,7 +414,10 @@ def flush(self, force: bool = False) -> None: def _upload_chunk(self, final: bool = False) -> Any: status, _n = self._myFile.write( - self.buffer.getvalue(), self.offset + self.metaOffset, self.buffer.tell() + self.buffer.getvalue(), + self.offset + self.metaOffset, + self.buffer.tell(), + timeout=self.timeout, ) if final: self.closed @@ -292,7 +440,7 @@ def close(self) -> None: if self.fs is not None: self.fs.invalidate_cache(self.path) self.fs.invalidate_cache(self.fs._parent(self.path)) - status, _n = self._myFile.close() + status, _n = self._myFile.close(timeout=self.timeout) if not status.ok: raise OSError(f"File did not close properly: {status.message}") self.closed = True diff --git a/tests/test_basicio.py b/tests/test_basicio.py index bd18b24..c613d40 100644 --- a/tests/test_basicio.py +++ b/tests/test_basicio.py @@ -8,15 +8,17 @@ import fsspec import pytest -TESTDATA = "apple\nbanana\norange\ngrape" -TESTWRITEDATA = "the end is never the end is never the end" +TESTDATA1 = "apple\nbanana\norange\ngrape" +TESTDATA2 = "red\ngreen\nyellow\nblue" +sleep_time = 0.2 +expiry_time = 0.1 @pytest.fixture(scope="module") def localserver(tmpdir_factory): srvdir = tmpdir_factory.mktemp("srv") with open(srvdir.join("testfile.txt"), "w") as fout: - fout.write(TESTDATA) + fout.write(TESTDATA1) xrdexe = shutil.which("xrootd") proc = subprocess.Popen([xrdexe, srvdir]) @@ -26,9 +28,8 @@ def localserver(tmpdir_factory): proc.wait(timeout=10) -@pytest.mark.skip("not implemented") def test_broken_server(): - with pytest.raises(IOError): + with pytest.raises(OSError): # try to connect on the wrong port should fail _ = fsspec.open("root://localhost:12345/") @@ -52,13 +53,13 @@ def test_read_xrd(localserver): status, res = f.read() if not status.ok: raise RuntimeError(status) - assert res.decode("ascii") == TESTDATA + assert res.decode("ascii") == TESTDATA1 f.close() def test_read_fsspec(localserver): with fsspec.open(localserver + "/testfile.txt", "rt") as f: - assert f.read() == TESTDATA + assert f.read() == TESTDATA1 f.seek(0) assert f.readline() == "apple\n" f.seek(0) @@ -72,19 +73,203 @@ def test_read_fsspec(localserver): fs, token, path = fsspec.get_fs_token_paths(localserver + "/testfile.txt", "rt") assert fs.read_block(path[0], 0, 4) == b"appl" + fs.rm(path[0], True) def test_write_fsspec(localserver): - with fsspec.open(localserver + "/testfile2.txt", "wt") as f: - f.write(TESTWRITEDATA) + with fsspec.open(localserver + "/testfile.txt", "wt") as f: + f.write(TESTDATA1) f.flush() - with fsspec.open(localserver + "/testfile2.txt", "rt") as f: - assert f.read() == TESTWRITEDATA + with fsspec.open(localserver + "/testfile.txt", "rt") as f: + assert f.read() == TESTDATA1 + fs, token, path = fsspec.get_fs_token_paths(localserver + "/testfile.txt", "rt") + fs.rm(path[0], True) def test_append_fsspec(localserver): - with fsspec.open(localserver + "/testfile2.txt", "at") as f: - f.write(TESTWRITEDATA) + with fsspec.open(localserver + "/testfile.txt", "wt") as f: + f.write(TESTDATA1) + f.flush() + with fsspec.open(localserver + "/testfile.txt", "at") as f: + f.write(TESTDATA2) + f.flush() + with fsspec.open(localserver + "/testfile.txt", "rt") as f: + assert f.read() == TESTDATA1 + TESTDATA2 + fs, token, path = fsspec.get_fs_token_paths(localserver + "/testfile.txt", "rt") + fs.rm(path[0], True) + + +@pytest.mark.parametrize("cache_expiry", [0, expiry_time]) +def test_mk_and_rm_dir_fsspec(localserver, cache_expiry): + with fsspec.open(localserver + "/Folder1/testfile1.txt", "wt") as f: + f.write(TESTDATA2) + f.flush() + with fsspec.open(localserver + "/Folder2/testfile2.txt", "wt") as f: + f.write(TESTDATA2) + f.flush() + fs, token, path = fsspec.get_fs_token_paths( + localserver, "rt", storage_options={"listings_expiry_time": cache_expiry} + ) + time.sleep(sleep_time) + + assert set(fs.ls(path[0], False)) == {"Folder1", "Folder2"} + fs.mkdir(path[0] + "/Folder3/Folder33") + time.sleep(sleep_time) + assert set(fs.ls(path[0], False)) == { + "Folder1", + "Folder2", + "Folder3", + } + + with pytest.raises(OSError): + fs.mkdir(path[0] + "/Folder4/Folder44", False) + + fs.mkdirs(path[0] + "/Folder4/Folder44") + time.sleep(sleep_time) + assert set(fs.ls(path[0], False)) == { + "Folder1", + "Folder2", + "Folder3", + "Folder4", + } + fs.mkdirs(path[0] + "/Folder4", True) + time.sleep(sleep_time) + with pytest.raises(OSError): + fs.mkdirs(path[0] + "/Folder4", False) + + fs.rm(path[0] + "/Folder4", True) + time.sleep(sleep_time) + assert set(fs.ls(path[0], False)) == { + "Folder1", + "Folder2", + "Folder3", + } + + with pytest.raises(OSError): + fs.rm(path[0] + "/Folder3", False) + with pytest.raises(OSError): + fs.rmdir(path[0] + "/Folder3") + fs.rm(path[0] + "/Folder1", True) + fs.rm(path[0] + "/Folder2", True) + fs.rm(path[0] + "/Folder3", True) + + +def test_touch_modified(localserver): + time.sleep(sleep_time) + with fsspec.open(localserver + "/testfile.txt", "wt") as f: + f.write(TESTDATA1) + f.flush() + fs, token, path = fsspec.get_fs_token_paths( + localserver, "rt", storage_options={"listings_expiry_time": expiry_time} + ) + t1 = fs.modified(path[0] + "/testfile.txt") + assert fs.read_block(path[0] + "/testfile.txt", 0, 4) == b"appl" + time.sleep(1) + fs.touch(path[0] + "/testfile.txt", False) + t2 = fs.modified(path[0] + "/testfile.txt") + assert fs.read_block(path[0] + "/testfile.txt", 0, 4) == b"appl" + time.sleep(1) + fs.touch(path[0] + "/testfile.txt", True) + t3 = fs.modified(path[0] + "/testfile.txt") + assert fs.read_block(path[0] + "/testfile.txt", 0, 4) == b"" + assert t1 < t2 and t2 < t3 + fs.rm(path[0] + "/testfile.txt", True) + + +def test_dir_cache(localserver): + fs, token, path = fsspec.get_fs_token_paths( + localserver, "rt", storage_options={"listings_expiry_time": expiry_time} + ) + fs.mkdir(path[0] + "/Folder1") + fs.mkdir(path[0] + "/Folder2") + time.sleep(sleep_time) + dirs = fs.ls(path[0], True) + dirs_cached = fs._ls_from_cache(path[0]) + assert dirs == dirs_cached + fs.rm(path[0] + "/Folder1") + fs.rm(path[0] + "/Folder2") + + +@pytest.mark.parametrize("cache_expiry", [0, expiry_time]) +def test_info(localserver, cache_expiry): + with fsspec.open(localserver + "/testfile.txt", "wt") as f: + f.write(TESTDATA1) + f.flush() + fs, token, path = fsspec.get_fs_token_paths( + localserver, "rt", storage_options={"listings_expiry_time": cache_expiry} + ) + time.sleep(sleep_time) + assert fs.info(path[0] + "/testfile.txt") in fs.ls(path[0], True) + _ = fs.ls(path[0], True) + assert fs.info(path[0] + "/testfile.txt") in fs.ls(path[0], True) + fs.rm(path[0] + "/testfile.txt") + + +@pytest.mark.parametrize("cache_expiry", [0, expiry_time]) +def test_walk_find(localserver, cache_expiry): + fs, token, path = fsspec.get_fs_token_paths( + localserver, "rt", storage_options={"listings_expiry_time": cache_expiry} + ) + with fsspec.open(localserver + "/WalkFolder/testfile1.txt", "wt") as f: + f.write(TESTDATA2) + f.flush() + with fsspec.open(localserver + "/WalkFolder/InnerFolder/testfile2.txt", "wt") as f: + f.write(TESTDATA2) + f.flush() + out = fs.walk(path[0] + "/WalkFolder") + listing = [] + for item in out: + listing.append(item) + assert listing == [ + (path[0] + "/WalkFolder", ["InnerFolder"], ["testfile1.txt"]), + (path[0] + "/WalkFolder/InnerFolder", [], ["testfile2.txt"]), + ] + # unable to use sets here^, would rather + out = fs.find(path[0] + "/WalkFolder") + listing = [] + for item in out: + listing.append(item) + assert set(listing) == { + path[0] + "/WalkFolder/InnerFolder/testfile2.txt", + path[0] + "/WalkFolder/testfile1.txt", + } + fs.rm(path[0] + "/WalkFolder", True) + + +@pytest.mark.parametrize("cache_expiry", [0, expiry_time]) +def test_du(localserver, cache_expiry): + with fsspec.open(localserver + "/WalkFolder/testfile1.txt", "wt") as f: + f.write(TESTDATA2) + f.flush() + with fsspec.open(localserver + "/WalkFolder/InnerFolder/testfile2.txt", "wt") as f: + f.write(TESTDATA2) + f.flush() + fs, token, path = fsspec.get_fs_token_paths( + localserver, "rt", storage_options={"listings_expiry_time": cache_expiry} + ) + assert fs.du(path[0] + "/WalkFolder", False) == { + path[0] + "/WalkFolder/InnerFolder/testfile2.txt": 21, + path[0] + "/WalkFolder/testfile1.txt": 21, + } + assert fs.du(path[0] + "/WalkFolder", True) == 42 + fs.rm(path[0] + "/WalkFolder", True) + + +@pytest.mark.parametrize("cache_expiry", [0, expiry_time]) +def test_glob(localserver, cache_expiry): + with fsspec.open(localserver + "/WalkFolder/testfile1.txt", "wt") as f: + f.write(TESTDATA2) + f.flush() + with fsspec.open(localserver + "/WalkFolder/testfile2.txt", "wt") as f: + f.write(TESTDATA2) f.flush() - with fsspec.open(localserver + "/testfile2.txt", "rt") as f: - assert f.read() == TESTWRITEDATA + TESTWRITEDATA + time.sleep(sleep_time) + fs, token, path = fsspec.get_fs_token_paths( + localserver, "rt", storage_options={"listings_expiry_time": cache_expiry} + ) + print(fs.glob(path[0] + "/*.txt")) + assert set(fs.glob(path[0] + "/WalkFolder/*.txt")) == { + path[0] + "/WalkFolder/testfile1.txt", + path[0] + "/WalkFolder/testfile2.txt", + } + fs.rm(path[0] + "/WalkFolder", True)