diff --git a/src/orger/org_view.py b/src/orger/org_view.py index fd32e2a..c762b40 100644 --- a/src/orger/org_view.py +++ b/src/orger/org_view.py @@ -1,16 +1,18 @@ #!/usr/bin/env python3 import argparse from argparse import ArgumentParser, Namespace +from collections import Counter import logging import inspect +import json from pathlib import Path from subprocess import check_call -from tempfile import TemporaryDirectory -from typing import List, Tuple, Iterable, Optional, Union, Callable, Dict +import sys +from typing import Any, List, Tuple, Iterable, Optional, Union, Callable, Dict from .inorganic import OrgNode, TimestampStyle from .state import JsonState -from .atomic_append import PathIsh, atomic_append_check, assert_not_edited +from .atomic_append import atomic_append_check, assert_not_edited from .common import setup_logger, orger_user_dir # TODO tests for determinism? not sure where should they be... @@ -103,22 +105,29 @@ class Mirror(OrgView): @classmethod def main(cls, setup_parser=None) -> None: p = cls.parser() - p.add_argument('--to', type=Path, default=Path(cls.name() + '.org'), help='Filename to output') + og = p.add_mutually_exclusive_group() + og.add_argument('--to', type=Path, default=Path(cls.name() + '.org'), help='Filename to output') + og.add_argument('--stdout', action='store_true', help='pass to print output to stdout, useful for testing/debugging') if setup_parser is not None: setup_parser(p) args = p.parse_args() inst = cls(cmdline_args=args) inst.main_common() - inst._run(to=args.to) + inst._run(to=args.to, stdout=args.stdout) def get_items(self) -> Iterable: raise NotImplementedError - def _run(self, to: Path): + def _run(self, to: Path, stdout: bool) -> None: org_tree = self.make_tree() rtree = org_tree.render(level=0) + if stdout: + print(rtree) + return + + # otherwise output to file assert_not_edited(to) # again, not properly atomic, but hopefully enough # TODO create a github issue, maybe someone comes up with proper way of solving this @@ -190,23 +199,28 @@ class Queue(OrgView): def _run( self, to: Path, + stdout: bool, state_path: Path, init: bool=False, dry_run: bool=False, ) -> None: self.logger.info('Using state file %s', state_path) - if not to.exists() and not init: - err = RuntimeError(f"{to} doesn't exist! Try running with --init") - import sys - if sys.stdin.isatty(): - resp = input(f"{to} doesn't exist. Create empty file? y/n ").strip().lower() - if resp != 'y': + appender: Callable[[str], Any] + if stdout: + appender = lambda s: sys.stdout.write(s) + else: + appender = lambda s: atomic_append_check(to, s) + + if not to.exists() and not init: + err = RuntimeError(f"{to} doesn't exist! Try running with --init") + if sys.stdin.isatty(): + resp = input(f"{to} doesn't exist. Create empty file? y/n ").strip().lower() + if resp != 'y': + raise err + else: raise err - else: - raise err - state_path.parent.mkdir(parents=True, exist_ok=True) # not sure... state = JsonState( path=state_path, logger=self.logger, @@ -214,27 +228,23 @@ def _run( ) items = list(self.get_items()) - from collections import Counter dups = [k for k, cnt in Counter(i[0] for i in items).items() if cnt > 1] if len(dups) > 0: raise RuntimeError(f'Duplicate items {dups}') if not to.exists(): self.logger.warning("target %s didn't exist, initializing!", to) - atomic_append_check(to, self.file_header + '\n') + appender(self.file_header + '\n') for key, item in items: def action(item=item): # not sure about this newline, but better to have extra whitespace than rely on trailing rendered = '\n' + item.render(level=1) - atomic_append_check( - to, - rendered, - ) + appender(rendered) self.logger.debug('processing %s', key) state.feed( key=key, - value=item, # TODO not sure about this one... perhaps only link? + value=item, # TODO not sure about this one... perhaps only link? action=action, ) @@ -245,7 +255,9 @@ def get_items(self) -> Iterable[OrgWithKey]: def main(cls, setup_parser=None) -> None: default_state = orger_user_dir() / 'states' / (cls.name() + '.state.json') p = cls.parser() - p.add_argument('--to' , type=Path, default=Path(cls.name() + '.org') , help='file where new items are added') + og = p.add_mutually_exclusive_group() + og.add_argument('--to' , type=Path, default=Path(cls.name() + '.org') , help='file where new items are added') + og.add_argument('--stdout', action='store_true', help='pass to print output to stdout, useful for testing/debugging') p.add_argument('--state', type=Path, default=default_state, help='state file for keeping track of handled items') p.add_argument('--init', action='store_true') # todo not sure if I really need it? p.add_argument('--dry-run', action='store_true', help='Run without modifying the state file') @@ -257,6 +269,7 @@ def main(cls, setup_parser=None) -> None: inst.main_common() inst._run( to=args.to, + stdout=args.stdout, state_path=args.state, init=args.init, dry_run=args.dry_run, @@ -274,7 +287,7 @@ def get_items(self): rpath = tmp_path / 'test.org' - TestView([])._run(to=rpath) + TestView([])._run(to=rpath, stdout=False) assert rpath.read_text() == ''' # autogenerated! #+TITLE: sometitle @@ -285,7 +298,7 @@ def get_items(self): # TODO shit, it's gonna use implicit date?? ('first' , OrgNode(heading='whatever')), ('second', OrgNode(heading='alala')), # TODO why was that even necessary?? - ])._run(to=rpath) + ])._run(to=rpath, stdout=False) # TODO eh, perhaps use trailing space? assert rpath.read_text() == """ # autogenerated! @@ -296,8 +309,7 @@ def get_items(self): * alala""".lstrip() -def test_org_view_append(tmp_path: Path): - import json +def test_org_view_append(tmp_path: Path) -> None: class TestView(Queue): def __init__(self, items: List[OrgWithKey], *args, **kwargs) -> None: super().__init__(*args, file_header='# autogenerated!', **kwargs) # type: ignore @@ -313,6 +325,7 @@ def get_items(self): def run_view(items, **kwargs): TestView(items)._run( to=rpath, + stdout=False, state_path=spath, **kwargs, ) diff --git a/src/orger/state.py b/src/orger/state.py index 5671bbe..939c90f 100644 --- a/src/orger/state.py +++ b/src/orger/state.py @@ -9,7 +9,7 @@ PathIsh = Union[str, Path] State = Dict[str, Any] -from atomicwrites import atomic_write # type: ignore[import] +from atomicwrites import atomic_write # type: ignore[import-untyped] # TODO hmm. state should be ordered ideally? so it's easy to add/remove items? # would require storing as list of lists? or use that https://stackoverflow.com/a/6921760/706389 @@ -44,6 +44,7 @@ def __setitem__(self, key: str, value: Any) -> None: self.logger.debug('dry run! ignoring %s: %s', key, value) return + self.path.parent.mkdir(parents=True, exist_ok=True) with atomic_write(str(self.path), overwrite=True) as fo: json.dump(current, fo, indent=1, sort_keys=True) @@ -63,9 +64,7 @@ def feed(self, key: str, value: Any, action: Callable[[], None]) -> None: if key in self: self.logger.debug('already handled: %s: %s', key, value) return - self.logger.info('adding %s: %s', key, value) - # TODO not sure about print... - print(f'adding new item {key}: {value}') + self.logger.info('adding new item %s: %s', key, value) action() self[key] = repr(value)