implement a faster dataset traverser
christian-monch committed Mar 23, 2023
1 parent 5d55cfc commit 7cea3e2
Showing 3 changed files with 79 additions and 60 deletions.
11 changes: 2 additions & 9 deletions datalad_metalad/extract.py
@@ -50,7 +50,6 @@
from datalad.ui import ui

from .extractors.base import (
AnnexedFileInfo,
BaseMetadataExtractor,
DataOutputCategory,
DatasetMetadataExtractor,
@@ -279,11 +278,7 @@ def __call__(
file_info = json.load(f)
else:
file_info = json.loads(file_info)
file_info = (
AnnexedFileInfo.from_dict(file_info)
if "key" in file_info
else FileInfo.from_dict(file_info)
)
file_info = FileInfo.from_dict(file_info)

source_dataset = check_dataset(dataset or curdir, "extract metadata")

@@ -613,10 +608,8 @@ def get_file_info(dataset: Dataset,

# noinspection PyUnresolvedReferences
return FileInfo(
type="file", # TODO: what about the situation where path_status["type"] == "symlink"?
type="file",
gitshasum=path_status["gitshasum"],
prev_gitshasum=path_status["prev_gitshasum"],
bytesize=path_status.get("bytesize", 0),
state=path_status["state"],
dataset_path=path_status["parentds"],
path=path_status["path"], # Absolute path, used by extractors
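Editor's sketch, not part of the commit: with AnnexedFileInfo gone, a caller-supplied file_info (accepted either as a path to a JSON file or as an inline JSON string, as handled in __call__ above) is always parsed into the plain FileInfo dataclass. A hypothetical inline payload, restricted to the fields visible in this diff, with made-up values:

    # hypothetical payload for the file_info parameter; field names taken from
    # the trimmed FileInfo/DatasetInfo definitions after this commit
    file_info = json.dumps({
        "type": "file",
        "gitshasum": "e69de29bb2d1d6434b8b29ae775ad8c2e48c5391",
        "state": "clean",
        "path": "/data/study-17/README.md",
        "dataset_path": "/data/study-17",
        "intra_dataset_path": "README.md",
    })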
15 changes: 0 additions & 15 deletions datalad_metalad/extractors/base.py
@@ -33,7 +33,6 @@
class DatasetInfo:
type: str
gitshasum: str
prev_gitshasum: str
state: str
path: str
dataset_path: str
@@ -43,20 +42,6 @@ class DatasetInfo:
@dataclass
class FileInfo(DatasetInfo):
intra_dataset_path: str
bytesize: int


@dataclass_json
@dataclass
class AnnexedFileInfo(FileInfo):
humansize: str
key: str
backend: str
mtime: str
keyname: str
has_content: bool
hashdirlower: str = ""
hashdirmixed: str = ""


@dataclass
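Editor's sketch, not part of the commit: the hunks above only show removals, so for orientation this is roughly what the trimmed hierarchy looks like after the change (reconstructed from the visible diff; fields or decorators hidden in collapsed context may be missing):

    from dataclasses import dataclass
    from dataclasses_json import dataclass_json

    @dataclass_json
    @dataclass
    class DatasetInfo:
        type: str
        gitshasum: str
        state: str
        path: str
        dataset_path: str

    @dataclass_json
    @dataclass
    class FileInfo(DatasetInfo):
        intra_dataset_path: str

The dataclass_json decorator supplies the from_dict/to_dict pair that extract.py and the traverser rely on; AnnexedFileInfo and its annex-specific fields (key, backend, has_content, ...) are no longer part of this hierarchy.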
113 changes: 77 additions & 36 deletions datalad_metalad/pipeline/provider/datasettraverse.py
@@ -21,6 +21,11 @@
require_dataset,
resolve_path,
)
from datalad.runner import GitRunner
from datalad.runner.coreprotocols import StdOutErrCapture
from datalad.runner.nonasyncrunner import STDOUT_FILENO, STDERR_FILENO
from datalad.runner.protocol import GeneratorMixIn
from datalad.runner.utils import LineSplitter
from datalad.support.annexrepo import AnnexRepo
from datalad.support.constraints import (
EnsureBool,
@@ -40,7 +45,6 @@
ResultState,
)
from ...extractors.base import (
AnnexedFileInfo,
DatasetInfo,
FileInfo,
)
@@ -80,7 +84,7 @@ class DatasetTraverseResult(PipelineResult):
root_dataset_id: Optional[str] = None
root_dataset_version: Optional[str] = None
message: Optional[str] = ""
element_info: Optional[AnnexedFileInfo | DatasetInfo | FileInfo] = None
element_info: Optional[DatasetInfo | FileInfo] = None

def to_dict(self) -> dict:

@@ -132,6 +136,57 @@ def get_annexstatus(ds, paths=None):
return info


def ls_files(dataset: Dataset) -> Generator:
class GeneratorStdOutErrCapture(StdOutErrCapture, GeneratorMixIn):
def pipe_data_received(self, fd, data):
self.send_result((fd, data))

line_splitter = LineSplitter()
runner = GitRunner()
generator = runner.run(
['git', 'ls-files', '-s', '-m', '-t', '--exclude-standard'],
protocol=GeneratorStdOutErrCapture,
cwd=dataset.repo.pathobj
)
stderr = bytearray()
for file_number, data in generator:
if file_number == STDERR_FILENO:
stderr += data
continue
for line in line_splitter.process(data.decode()):
yield line

data = line_splitter.finish_processing()
if data:
yield data
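Editor's example, not part of the commit: ls_files streams the raw output of "git ls-files -s -m -t --exclude-standard" line by line through a GeneratorMixIn protocol, so a large index never has to be buffered as a whole. A hypothetical use, assuming ds is an installed datalad Dataset:

    # each yielded line follows the "git ls-files -s -t" format:
    #     <tag> <mode> <object-sha> <stage><TAB><path>
    # for example:
    #     H 100644 e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 0    README.md
    for line in ls_files(ds):
        print(line)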


def ls_struct(dataset: Dataset) -> dict[Path, dict]:

flag_2_type = {
"100644": "file",
"120000": "file",
"160000": "dataset",
}

tag_2_status = {
"C": "modified",
"H": "clean",
}

result = {}
for line in ls_files(dataset):
tag, flag, shasum, number, path = line.split()
full_path = dataset.repo.pathobj / path
result[full_path] = {
"type": flag_2_type[flag],
"path": full_path,
"gitshasum": shasum,
"state": tag_2_status[tag],
}
return result
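Editor's example, not part of the commit: ls_struct turns each of those lines into a status record keyed by absolute path. Mode 100644 (regular file) and 120000 (symlink, which is how locked annexed files typically appear in the index) both map to "file", mode 160000 (gitlink, i.e. a subdataset) maps to "dataset", and the -t tags H (cached) and C (changed) map to "clean" and "modified". For the sample line shown above and a dataset rooted at /data/study-17, the resulting entry would be roughly:

    {
        Path("/data/study-17/README.md"): {
            "type": "file",
            "path": Path("/data/study-17/README.md"),
            "gitshasum": "e69de29bb2d1d6434b8b29ae775ad8c2e48c5391",
            "state": "clean",
        },
    }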


class DatasetTraverser(Provider):

name_to_item_set = {
@@ -193,27 +248,35 @@ def __init__(self,
self.seen = dict()
self.annex_info = None

def _already_visited(self, dataset: Dataset, relative_element_path: Path):
def _already_visited(self,
dataset: Dataset,
relative_element_path: Path
) -> bool:
if dataset.id not in self.seen:
self.seen[dataset.id] = set()
if relative_element_path in self.seen[dataset.id]:
lgr.info(f"ignoring already visited element: "
f"{dataset.id}:{relative_element_path}\t"
f"({dataset.repo.pathobj / relative_element_path})")
lgr.debug(
"ignoring already visited element: %s:%s\t%s",
dataset.id, relative_element_path,
dataset.repo.pathobj / relative_element_path
)
return True
self.seen[dataset.id].add(relative_element_path)
return False

def _get_base_dataset_result(self,
dataset: Dataset,
id_key: str = "dataset_id",
version_key: str = "dataset_version"):
version_key: str = "dataset_version"
) -> dict[str, str]:
return {
id_key: str(dataset.id),
version_key: str(dataset.repo.get_hexsha())
}

def _get_dataset_result_part(self, dataset: Dataset):
def _get_dataset_result_part(self,
dataset: Dataset
) -> dict[str, str | Path]:
if dataset.pathobj == self.fs_base_path:
return {
"dataset_path": Path(""),
@@ -230,19 +293,6 @@ def _get_dataset_result_part(self, dataset: Dataset):
)
}

def get_annex_file_info(self,
annex_repo: AnnexRepo,
dataset_path: Path
) -> list:
annex_status = get_annexstatus(annex_repo)
return [
AnnexedFileInfo.from_annex_status(status, path, str(Path(path).relative_to(dataset_path)))
if len(status) == 13
else FileInfo.from_dict({**status, "dataset_path": str(dataset_path), "path": str(path), "intra_dataset_path": str(Path(path).relative_to(dataset_path)), "bytesize": status["bytesize"]})
for path, status in annex_status.items()
if status["state"] != "untracked"
]

def _traverse_dataset(self, dataset_path: Path) -> Generator:
"""Traverse all elements of dataset, and potentially its subdatasets.
@@ -278,16 +328,8 @@ def _traverse_dataset(self, dataset_path: Path) -> Generator:
))

if TraversalType.FILE in self.item_set:
if isinstance(dataset.repo, AnnexRepo):
status = get_annexstatus
else:
status = GitRepo.status

for path_str, element_info in status(dataset.repo).items():
if element_info["state"] == "untracked":
continue
for element_path, element_info in ls_struct(dataset).items():
if element_info["type"] == "file":
element_path = Path(path_str)
if self.is_excluded(element_path):
lgr.debug(f"Ignoring excluded path {element_path}")
continue
@@ -311,8 +353,9 @@ def _traverse_dataset(self, dataset_path: Path) -> Generator:
yield from self._traverse_dataset(submodule_info["path"])
else:
lgr.debug(
f"ignoring not installed sub-dataset at "
f"{submodule_path}")
"ignoring not installed sub-dataset at %s",
submodule_path
)
return

def is_excluded(self, path: Path) -> bool:
@@ -335,7 +378,7 @@ def _get_element_info_object(self,
dataset_path: str,
element_path: Path,
element_info: dict
) -> AnnexedFileInfo | DatasetInfo | FileInfo:
) -> DatasetInfo | FileInfo:
dataset_keys = {
"path": str(element_path),
"dataset_path": dataset_path
@@ -348,9 +391,7 @@
**dataset_keys,
"intra_dataset_path": str(intra_dataset_path)
}
if len(element_info) == 5:
return FileInfo.from_dict({**element_info, **file_keys})
return AnnexedFileInfo.from_dict({**element_info, **file_keys})
return FileInfo.from_dict({**element_info, **file_keys})
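Editor's note, not part of the commit: for file elements the ls_struct record is merged with the dataset-relative keys computed here and handed to FileInfo.from_dict, so element_info now has one shape whether or not a file is annexed. Continuing the hypothetical /data/study-17 example, the dictionary reaching from_dict would be roughly:

    {
        "type": "file",
        "gitshasum": "e69de29bb2d1d6434b8b29ae775ad8c2e48c5391",
        "state": "clean",
        "path": "/data/study-17/README.md",
        "dataset_path": "",                  # depends on the traversal root
        "intra_dataset_path": "README.md",
    }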

def _generate_result(self,
dataset: Dataset,
