Skip to content

Commit

Permalink
Use pathlib instead of beets's path handling
Browse files Browse the repository at this point in the history
Internally, all paths are represented by `Path` instances instead of
`bytes`.

We replace the following beets utility functions with standard library
versions that handle `Path`s.

- `beets.util.mkdirall` -> `Path.mkdir`
- `beets.util.link` -> `Path.symlink_to`
- `beets.util.copy` -> `shutil.copfyile`
- `beets.util.samefile` -> `Path.__eq__ `
  • Loading branch information
geigerzaehler committed Jul 11, 2024
1 parent 4934186 commit f3bd54d
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 108 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
Change Log
==========

## Upcoming
* Consistently use Unicode paths to alternative items. This may result and
collection updates and orphaned files in alternatives. It may also improve
usability on non-standard file systems (see [#74]).

[#74]: https://github.com/geigerzaehler/beets-alternatives/issues/74

## v0.12.0 - 2024-06-25
* Fix an issue where items in a symlink collection with relative links were
always unnecessarily updated.
Expand Down
182 changes: 76 additions & 106 deletions beetsplug/alternatives.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,39 +13,24 @@
import argparse
import logging
import os.path
import shutil
import threading
import traceback
from concurrent import futures
from enum import Enum
from typing import Callable, Iterable, Iterator, Optional, Sequence, Set, Tuple, cast
from pathlib import Path
from typing import Callable, Iterable, Iterator, Optional, Sequence, Set, Tuple

import beets
import confuse
from beets import art, util
from beets.library import Item, Library, parse_query_string
from beets.plugins import BeetsPlugin
from beets.ui import Subcommand, UserError, decargs, get_path_formats, input_yn, print_
from beets.util import FilesystemError, bytestring_path, displayable_path, syspath
from typing_extensions import Never, override

import beetsplug.convert as convert


def _remove(path_: bytes, soft: bool = True):
"""Remove the file. If `soft`, then no error will be raised if the
file does not exist.
In contrast to beets' util.remove, this uses lexists such that it can
actually remove symlink links.
"""
path = syspath(path_)
if soft and not os.path.lexists(path):
return
try:
os.remove(path)
except OSError as exc:
raise FilesystemError(exc, "delete", (path,), traceback.format_exc()) from exc


class AlternativesPlugin(BeetsPlugin):
def __init__(self):
super().__init__()
Expand Down Expand Up @@ -198,52 +183,49 @@ def parse_config(self, config: confuse.ConfigView):
self.removable = config.get(dict).get("removable", True) # type: ignore

if "directory" in config:
dir = config["directory"].as_str()
assert isinstance(dir, str)
dir = config["directory"].as_path()
assert isinstance(dir, Path)
else:
dir = self.name
dir = bytestring_path(dir)
if not os.path.isabs(syspath(dir)):
dir = os.path.join(self.lib.directory, dir)
dir = Path(self.name)
if not dir.is_absolute():
dir = Path(str(self.lib.directory, "utf8")) / dir # type: ignore
self.directory = dir

def item_change_actions(
self, item: Item, path: bytes, dest: bytes
self, item: Item, actual: Path, dest: Path
) -> Sequence[Action]:
"""Returns the necessary actions for items that were previously in the
external collection, but might require metadata updates.
"""
actions = []

if not util.samefile(path, dest):
if actual != dest:
actions.append(Action.MOVE)

item_mtime_alt = os.path.getmtime(syspath(path))
if item_mtime_alt < os.path.getmtime(syspath(item.path)):
item_mtime_alt = actual.stat().st_mtime
if item_mtime_alt < Path(str(item.path, "utf8")).stat().st_mtime:
actions.append(Action.WRITE)
album = item.get_album()

if (
album
and album.artpath
and os.path.isfile(syspath(album.artpath))
and (item_mtime_alt < os.path.getmtime(syspath(album.artpath)))
and Path(str(album.artpath, "utf8")).is_file()
and (item_mtime_alt < Path(str(album.artpath, "utf8")).stat().st_mtime)
):
actions.append(Action.SYNC_ART)

return actions

def _matched_item_action(self, item: Item) -> Sequence[Action]:
path = self._get_stored_path(item)
if path and os.path.lexists(syspath(path)):
actual = self._get_stored_path(item)
if actual and (actual.is_file() or actual.is_symlink()):
dest = self.destination(item)
_, path_ext = os.path.splitext(path)
_, dest_ext = os.path.splitext(dest)
if path_ext != dest_ext:
if actual.suffix == dest.suffix:
return self.item_change_actions(item, actual, dest)
else:
# formats config option changed
return [Action.REMOVE, Action.ADD]
else:
return self.item_change_actions(item, path, dest)
else:
return [Action.ADD]

Expand All @@ -267,15 +249,15 @@ def ask_create(self, create: Optional[bool] = None) -> bool:
return create

msg = (
f"Collection at '{displayable_path(self.directory)}' does not exists. "
f"Collection at '{self.directory}' does not exists. "
"Maybe you forgot to mount it.\n"
"Do you want to create the collection? (y/n)"
)
return input_yn(msg, require=True)

def update(self, create: Optional[bool] = None):
if not os.path.isdir(syspath(self.directory)) and not self.ask_create(create):
print_(f"Skipping creation of {displayable_path(self.directory)}")
if not self.directory.is_dir() and not self.ask_create(create):
print_(f"Skipping creation of {self.directory}")
return

converter = self._converter()
Expand All @@ -285,32 +267,27 @@ def update(self, create: Optional[bool] = None):
for action in actions:
if action == Action.MOVE:
assert path is not None # action guarantees that `path` is not none
print_(f">{displayable_path(path)} -> {displayable_path(dest)}")
util.mkdirall(dest)
util.move(path, dest)
util.prune_dirs(
# Although the types for `prune_dirs()` require a `str`
# argument the function accepts a `bytes` argument.
cast(str, os.path.dirname(path)),
root=self.directory,
)
print_(f">{path} -> {dest}")
dest.parent.mkdir(parents=True, exist_ok=True)
path.rename(dest)
util.prune_dirs(str(path.parent), root=str(self.directory))
self._set_stored_path(item, dest)
item.store()
path = dest
elif action == Action.WRITE:
assert path is not None # action guarantees that `path` is not none
print_(f"*{displayable_path(path)}")
item.write(path=path)
print_(f"*{path}")
item.write(path=bytes(path))
elif action == Action.SYNC_ART:
assert path is not None # action guarantees that `path` is not none
print_(f"~{displayable_path(path)}")
print_(f"~{path}")
assert path is not None
self._sync_art(item, path)
elif action == Action.ADD:
print_(f"+{displayable_path(dest)}")
print_(f"+{dest}")
converter.run(item)
elif action == Action.REMOVE:
assert path is not None # action guarantees that `path` is not none
print_(f"-{displayable_path(path)}")
print_(f"-{path}")
self._remove_file(item)
item.store()

Expand All @@ -319,54 +296,50 @@ def update(self, create: Optional[bool] = None):
item.store()
converter.shutdown()

def destination(self, item: Item) -> bytes:
def destination(self, item: Item) -> Path:
"""Returns the path for `item` in the external collection."""
path = item.destination(basedir=self.directory, path_formats=self.path_formats)
assert isinstance(path, bytes)
return path
path = item.destination(path_formats=self.path_formats, fragment=True)
# When using `fragment=True` the returned path is guaranteed to be a
# string.
assert isinstance(path, str)
return self.directory / path

def _set_stored_path(self, item: Item, path: bytes):
item[self.path_key] = str(path, "utf8")
def _set_stored_path(self, item: Item, path: Path):
item[self.path_key] = str(path)

def _get_stored_path(self, item: Item) -> Optional[bytes]:
def _get_stored_path(self, item: Item) -> Optional[Path]:
try:
path = item[self.path_key]
except KeyError:
return None
if path:
return path.encode("utf8")

if isinstance(path, str):
return Path(path)
else:
return None

def _remove_file(self, item: Item):
"""Remove the external file for `item`."""
path = self._get_stored_path(item)
assert path, "File to remove does not have a path"
_remove(path)
util.prune_dirs(
# Although the types for `prune_dirs()` require a `str`
# argument the function accepts a `bytes` argument.
cast(str, path),
root=self.directory,
)
if path:
path.unlink(missing_ok=True)
util.prune_dirs(str(path), root=str(self.directory))
del item[self.path_key]

def _converter(self) -> "Worker":
def _convert(item: Item):
dest = self.destination(item)
util.mkdirall(dest)
util.copy(item.path, dest, replace=True)
dest.parent.mkdir(exist_ok=True, parents=True)
shutil.copyfile(item.path, dest)
return item, dest

return Worker(_convert, self.max_workers)

def _sync_art(self, item: Item, path: bytes):
def _sync_art(self, item: Item, path: Path):
"""Embed artwork in the file at `path`."""
album = item.get_album()
if album and album.artpath and os.path.isfile(syspath(album.artpath)):
self._log.debug(
f"Embedding art from {displayable_path(album.artpath)} into {displayable_path(path)}"
)
if album and album.artpath and Path(str(album.artpath, "utf8")).is_file():
self._log.debug(f"Embedding art from {album.artpath} into {path}")
art.embed_item(self._log, item, album.artpath, itempath=path)


Expand Down Expand Up @@ -394,26 +367,26 @@ def _converter(self) -> "Worker":
def _convert(item: Item):
dest = self.destination(item)
with fs_lock:
util.mkdirall(dest)
dest.parent.mkdir(exist_ok=True, parents=True)

if self._should_transcode(item):
self._encode(self.convert_cmd, item.path, dest)
self._encode(self.convert_cmd, item.path, bytes(dest))
# Don't rely on the converter to write correct/complete tags.
item.write(path=dest)
item.write(path=bytes(dest))
else:
self._log.debug(f"copying {displayable_path(dest)}")
util.copy(item.path, dest, replace=True)
self._log.debug(f"copying {dest}")
shutil.copyfile(item.path, dest)
if self._embed:
self._sync_art(item, dest)
return item, dest

return Worker(_convert, self.max_workers)

@override
def destination(self, item: Item) -> bytes:
def destination(self, item: Item) -> Path:
dest = super().destination(item)
if self._should_transcode(item):
return os.path.splitext(dest)[0] + b"." + self.ext
return dest.with_suffix("." + self.ext.decode("utf8"))
else:
return dest

Expand Down Expand Up @@ -444,21 +417,17 @@ def parse_config(self, config: confuse.ConfigView):

@override
def item_change_actions(
self, item: Item, path: bytes, dest: bytes
self, item: Item, actual: Path, dest: Path
) -> Sequence[Action]:
"""Returns the necessary actions for items that were previously in the
external collection, but might require metadata updates.
"""

if path != dest:
return [Action.MOVE]

try:
link_target_correct = os.path.samefile(path, item.path)
except FileNotFoundError:
link_target_correct = False

if link_target_correct:
if (
actual == dest
and actual.is_file() # Symlink not broken, `.samefile()` doesn’t throw
and actual.samefile(Path(str(item.path, "utf8")))
):
return []
else:
return [Action.MOVE]
Expand All @@ -471,43 +440,44 @@ def update(self, create: Optional[bool] = None):
for action in actions:
if action == Action.MOVE:
assert path is not None # action guarantees that `path` is not none
print_(f">{displayable_path(path)} -> {displayable_path(dest)}")
print_(f">{path} -> {dest}")
self._remove_file(item)
self._create_symlink(item)
self._set_stored_path(item, dest)
elif action == Action.ADD:
print_(f"+{displayable_path(dest)}")
print_(f"+{dest}")
self._create_symlink(item)
self._set_stored_path(item, dest)
elif action == Action.REMOVE:
assert path is not None # action guarantees that `path` is not none
print_(f"-{displayable_path(path)}")
print_(f"-{path}")
self._remove_file(item)
else:
continue
item.store()

def _create_symlink(self, item: Item):
dest = self.destination(item)
util.mkdirall(dest)
dest.parent.mkdir(exist_ok=True, parents=True)
item_path = Path(str(item.path, "utf8"))
link = (
os.path.relpath(item.path, os.path.dirname(dest))
os.path.relpath(item_path, dest.parent)
if self.relativelinks == SymlinkType.RELATIVE
else item.path
else item_path
)
util.link(link, dest)
dest.symlink_to(link)

@override
def _sync_art(self, item: Item, path: bytes):
def _sync_art(self, item: Item, path: Path):
pass


class Worker(futures.ThreadPoolExecutor):
def __init__(
self, fn: Callable[[Item], Tuple[Item, bytes]], max_workers: Optional[int]
self, fn: Callable[[Item], Tuple[Item, Path]], max_workers: Optional[int]
):
super().__init__(max_workers)
self._tasks: Set[futures.Future[Tuple[Item, bytes]]] = set()
self._tasks: Set[futures.Future[Tuple[Item, Path]]] = set()
self._fn = fn

def run(self, item: Item):
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ preview = true
extend-select = [
"I", # Sort imports
"C", # Pyflakes conventions
"PTH", # Use pathlib instead of os
"PIE", # Misc. lints
"UP", # Enforce modern Python syntax
"FURB", # Also enforce more modern Python syntax
Expand Down
Loading

0 comments on commit f3bd54d

Please sign in to comment.