Skip to content

Commit

Permalink
Basic tests for TarArchiveOperations
Browse files Browse the repository at this point in the history
Modelled after the `ZipArchiveOperations` tests in
datalad#407
  • Loading branch information
mih committed Jun 11, 2023
1 parent 3cb5af6 commit f5ba968
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 5 deletions.
9 changes: 4 additions & 5 deletions datalad_next/archive_operations/tarfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
IO,
)

from datalad_next.config import ConfigManager
# TODO we might just want to do it in reverse:
# move the code of `iter_tar` in here and have it call
# `TarArchiveOperations(path).__iter__()` instead.
Expand All @@ -27,13 +28,12 @@
)

from . import ArchiveOperations
from datalad_next.config import ConfigManager

lgr = logging.getLogger('datalad.ext.next.archive_operations.tarfile')


class TarArchiveOperations(ArchiveOperations):
"""
"""Handler for a TAR archive on a local file system
"""
def __init__(self, location: Path, *, cfg: ConfigManager | None = None):
"""
Expand Down Expand Up @@ -66,9 +66,8 @@ def close(self) -> None:

@contextmanager
def open(self, item: Any) -> IO:
"""
"""
yield self.tarfile.extractfile(str(item))
with self.tarfile.extractfile(str(item)) as fp:
yield fp

def __contains__(self, item: Any) -> bool:
try:
Expand Down
Empty file.
81 changes: 81 additions & 0 deletions datalad_next/archive_operations/tests/test_tarfile.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
from __future__ import annotations

from dataclasses import dataclass
from pathlib import (
Path,
PurePosixPath,
)
from typing import Generator

import pytest

from datalad_next.iter_collections.utils import FileSystemItemType

from ..tarfile import TarArchiveOperations


@dataclass
class TestArchive:
    """Description of a sample archive and the properties tests expect.

    Attributes
    ----------
    path:
        Location of the archive file.
    item_count:
        Number of items that iterating over the archive should yield.
    content:
        Bytes that file members of the archive are expected to contain.
    target_hash:
        Mapping of a hash algorithm name to an expected hex digest.
    """
    # The class name matches pytest's ``Test*`` collection pattern, but this
    # is a plain data record. Opt out of collection explicitly to avoid a
    # collection warning. Being unannotated, this is a class attribute and
    # not a dataclass field.
    __test__ = False

    path: Path
    item_count: int
    content: bytes
    target_hash: dict[str, str]


@pytest.fixture(scope='session')
def structured_sample_tar_xz(
    sample_tar_xz
) -> Generator[TestArchive, None, None]:
    """Session-scoped description of the sample TAR archive.

    Combines the value of the ``sample_tar_xz`` fixture (used as the archive
    path) with the item count, member content, and hash values that are
    recorded as expectations for that archive.
    """
    expected_hashes = {
        'SHA1': 'b5dfcec4d1b6166067226fae102f7fbcf6bd1bd4',
        'md5': 'd700214df5487801e8ee23d31e60382a',
    }
    archive_description = TestArchive(
        path=sample_tar_xz,
        item_count=6,
        content=b'123\n',
        target_hash=expected_hashes,
    )
    yield archive_description


def test_tararchive_basics(structured_sample_tar_xz: TestArchive):
    """Open a known archive member and compare its full content."""
    # this is intentionally a hard-coded POSIX relpath
    member_name = 'test-archive/onetwothree.txt'
    sample = structured_sample_tar_xz
    # a single `with` enters the archive handler first, then the member
    with TarArchiveOperations(sample.path) as archive_ops, \
            archive_ops.open(member_name) as member:
        payload = member.read()
        assert payload == sample.content


def test_tararchive_contain(structured_sample_tar_xz: TestArchive):
    """Check membership tests for an existing and a non-existing member."""
    # this is intentionally a hard-coded POSIX relpath
    member_name = 'test-archive/onetwothree.txt'
    # Use the handler as a context manager, like the other tests in this
    # module, so that its cleanup also runs when an assertion fails; the
    # handler was previously left open.
    with TarArchiveOperations(structured_sample_tar_xz.path) as archive_ops:
        assert member_name in archive_ops
        assert 'bogus' not in archive_ops


def test_tararchive_iterator(structured_sample_tar_xz: TestArchive):
    """Iterate the archive and check every reported item is contained in it."""
    sample = structured_sample_tar_xz
    with TarArchiveOperations(sample.path) as archive_ops:
        items = list(archive_ops)
        assert len(items) == sample.item_count
        for item in items:
            # TODO should not require this conversion
            item_name = str(PurePosixPath(item.name))
            # directories are looked up with a trailing slash
            if item.type == FileSystemItemType.directory:
                item_name += '/'
            assert item_name in archive_ops


def test_open(structured_sample_tar_xz: TestArchive):
    """Check that `open()` yields readable members and closes them on exit.

    Every file item of the archive is opened and its leading bytes are
    compared with the expected content. After all `open()` contexts have
    been left, each yielded file object must report itself as closed.
    """
    expected = structured_sample_tar_xz.content
    opened = []
    # Use the handler as a context manager, like the other tests in this
    # module, so that its cleanup also runs when an assertion fails;
    # a trailing `close()` call would be skipped in that case.
    with TarArchiveOperations(structured_sample_tar_xz.path) as archive_ops:
        # NOTE(review): the item listing is materialized before any member
        # is opened, as before -- presumably to avoid interleaving archive
        # iteration with member reads; confirm before simplifying.
        for item in list(archive_ops):
            if item.type != FileSystemItemType.file:
                continue
            with archive_ops.open(str(PurePosixPath(item.name))) as fp:
                opened.append(fp)
                assert fp.read(len(expected)) == expected
        # leaving the `open()` context must have closed each file object
        for fp in opened:
            assert fp.closed is True

0 comments on commit f5ba968

Please sign in to comment.