From 3842251dad35c364ce3a63da04e0a5c593d1a156 Mon Sep 17 00:00:00 2001 From: Justin Mayfield Date: Tue, 25 Aug 2015 00:31:45 -0600 Subject: [PATCH] Initial extraction from my target project ecmcli. --- .gitignore | 54 ++++++++ LICENSE | 21 ++++ MANIFEST.in | 2 + README.md | 88 +++++++++++++ examples/hello_world.py | 28 +++++ setup.cfg | 2 + setup.py | 42 +++++++ shellish/__init__.py | 16 +++ shellish/command.py | 273 ++++++++++++++++++++++++++++++++++++++++ shellish/completer.py | 138 ++++++++++++++++++++ shellish/debug.py | 14 +++ shellish/shell.py | 101 +++++++++++++++ test/__init__.py | 0 test/basic.py | 28 +++++ 14 files changed, 807 insertions(+) create mode 100644 .gitignore create mode 100644 LICENSE create mode 100644 MANIFEST.in create mode 100644 README.md create mode 100644 examples/hello_world.py create mode 100644 setup.cfg create mode 100644 setup.py create mode 100644 shellish/__init__.py create mode 100644 shellish/command.py create mode 100644 shellish/completer.py create mode 100644 shellish/debug.py create mode 100644 shellish/shell.py create mode 100644 test/__init__.py create mode 100644 test/basic.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..51cbe85 --- /dev/null +++ b/.gitignore @@ -0,0 +1,54 @@ +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] + +# C extensions +*.so + +# Distribution / packaging +.Python +env/ +bin/ +build/ +develop-eggs/ +dist/ +eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +*.egg-info/ +.installed.cfg +*.egg + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.coverage +.cache +nosetests.xml +coverage.xml + +# Translations +*.mo + +# Mr Developer +.mr.developer.cfg +.project +.pydevproject + +# Rope +.ropeproject + +# Django stuff: +*.log +*.pot + +# Sphinx documentation +docs/_build/ + diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..352b170 --- /dev/null +++ b/LICENSE @@ -0,0 +1,21 @@ +The MIT License (MIT) + +Copyright (c) 2015 Justin Mayfield + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/MANIFEST.in b/MANIFEST.in new file mode 100644 index 0000000..04f196a --- /dev/null +++ b/MANIFEST.in @@ -0,0 +1,2 @@ +include README.md +include LICENSE diff --git a/README.md b/README.md new file mode 100644 index 0000000..ca362db --- /dev/null +++ b/README.md @@ -0,0 +1,88 @@ +shellish - Framework for creating heavy a shell-ish CLI. +=========== + +This module combines the Python standard library modules argparse and cmd +to provided a unified way to make cli programs that can also be interactive +when invoked in "shell" mode. + +The main benefit to using this package is streamlined command hierarchy when +you want to have rich set of subcommands along with a pretty powerful tab +completion layer that parses argparse arguments automagically. + +Requirements +-------- + +* None more black! + + +Installation +-------- + + python3 ./setup.py build + python3 ./setup.py install + + +Compatibility +-------- + +* Python 3.4+ + + +TODO +-------- + +* Documentation +* Documentation +* Documentation + + +Getting Started +-------- + +TBD + + +Examples +-------- + +**Hello World** + +A requisite Hello World.. + +```python +import shellish + + +class Hello(shellish.Command): + """ I am a required docstring used to document the --help output! """ + + name = 'hello' + + def __init__(self, *args, **kwargs): + self.add_subcommand(World, default=True) + + def run(self, args): + shellish.Shell(self).cmdloop() + + +class World(shellish.Command): + """ Say something. """ + + name = 'world' + + def run(self, args): + print('Hello World') + + +if __name__ == '__main__': + root = Hello() + args = root.argparser.parse_args() + try: + root.invoke(args) + except KeyboardInterrupt: + sys.exit(1) +``` + +```bash +python3 ./hello.py hello world +``` diff --git a/examples/hello_world.py b/examples/hello_world.py new file mode 100644 index 0000000..b7ea2a0 --- /dev/null +++ b/examples/hello_world.py @@ -0,0 +1,28 @@ +import shellish + +class Hello(shellish.Command): + """ I am a required docstring used to document the --help output! """ + + name = 'hello' + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.add_subcommand(World, default=True) + + def run(self, args): + shellish.Shell(self).cmdloop() + + +class World(shellish.Command): + """ Say something. """ + + name = 'world' + + def run(self, args): + print('Hello World') + + +if __name__ == '__main__': + root = Hello() + args = root.argparser.parse_args() + root.invoke(args) diff --git a/setup.cfg b/setup.cfg new file mode 100644 index 0000000..b88034e --- /dev/null +++ b/setup.cfg @@ -0,0 +1,2 @@ +[metadata] +description-file = README.md diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..c876c2e --- /dev/null +++ b/setup.py @@ -0,0 +1,42 @@ +#!/usr/bin/env python + +from setuptools import setup, find_packages + +README = 'README.md' + +def long_desc(): + try: + import pypandoc + except ImportError: + with open(README) as f: + return f.read() + else: + return pypandoc.convert(README, 'rst') + +setup( + name='shellish', + version='0.0.1', + description='A command line shell framework', + author='Justin Mayfield', + author_email='tooker@gmail.com', + url='https://github.com/mayfield/shellish/', + license='MIT', + long_description=long_desc(), + packages=find_packages(), + install_requires=[], + test_suite='test', + classifiers=[ + 'Development Status :: 1 - Planning', + #'Development Status :: 2 - Pre-Alpha', + #'Development Status :: 3 - Alpha', + #'Development Status :: 4 - Beta', + #'Development Status :: 5 - Production/Stable', + #'Development Status :: 6 - Mature', + 'Intended Audience :: Developers', + 'License :: OSI Approved :: MIT License', + 'Operating System :: OS Independent', + 'Programming Language :: Python', + 'Topic :: Software Development :: Libraries', + 'Topic :: System :: Shells', + ] +) diff --git a/shellish/__init__.py b/shellish/__init__.py new file mode 100644 index 0000000..ab06451 --- /dev/null +++ b/shellish/__init__.py @@ -0,0 +1,16 @@ +""" +Public interface. +""" + +from . import shell, completer, command + + +def export(module, symbol): + globals()[symbol] = getattr(module, symbol) + +for x in shell.__public__: + export(shell, x) +for x in completer.__public__: + export(completer, x) +for x in command.__public__: + export(command, x) diff --git a/shellish/command.py b/shellish/command.py new file mode 100644 index 0000000..ee359ce --- /dev/null +++ b/shellish/command.py @@ -0,0 +1,273 @@ +""" +The merger of argparse and cmd goes here. This holds the main base class +used by all commands. +""" + +import argparse +import collections +import functools +import itertools +import shlex +import time +import traceback +from . import debug, completer + +__public__ = ['Command'] + + +class Command(object): + """ The primary encapsulation for a shellish command. Each command or + subcommand should be an instance of this class. The docstring for sub- + classes is used in --help output for this command and is required. """ + + name = None # Single word string required by subclass. + + def setup_args(self, parser): + """ Subclasses should provide any setup for their parsers here. """ + pass + + def prerun(self, args): + """ Hook to do thing prior to any invocation. """ + pass + + def run(self, args): + """ Primary entry point for command exec. """ + self.argparser.print_usage() + raise SystemExit(1) + + def add_argument(self, *args, complete=None, **kwargs): + """ Allow cleaner action supplementation. """ + action = self.argparser.add_argument(*args, **kwargs) + if complete: + action.complete = complete + return action + + def __init__(self, parent=None, **context): + self.inject_context(parent, context) + self.parent = parent + self.depth = (parent.depth + 1) if parent else 0 + self.subcommands = [] + self.subparsers = None + self.default_subcommand = None + self.argparser = self.create_argparser() + self.last_invoke = None + self.setup_args(self.argparser) + + def inject_context(self, parent, context): + """ Map context attributes from the parent and from the context + argument into this instance (as attributes). """ + self.context_keys = set(context.keys()) + for key, value in context.items(): + setattr(self, key, value) + if parent: + for key in parent.context_keys: + setattr(self, key, getattr(parent, key)) + self.context_keys |= parent.context_keys + + def create_argparser(self): + """ Factory for arg parser, can be replaced with any ArgParser compat + instance. """ + Formatter = argparse.RawDescriptionHelpFormatter + desc = self.clean_docstring()[1] + parser = argparse.ArgumentParser(self.name, description=desc, + formatter_class=Formatter) + return parser + + def clean_docstring(self): + """ Return sanitized docstring from this class. + The first line of the docstring is the title, and remaining lines are + the details, aka git style. """ + if not self.__doc__: + raise SyntaxError('Docstring missing for: %s' % self) + doc = [x.strip() for x in self.__doc__.splitlines()] + if not doc[0]: + doc.pop(0) # Some people leave the first line blank. + title = doc.pop(0) + if doc: + desc = '%s\n\n%s' % (title, '\n'.join(doc)) + else: + desc = title + return title, desc + + def complete_wrap(self, *args, **kwargs): + """ Readline eats exceptions raised by completer functions. """ + try: + return self._complete_wrap(*args, **kwargs) + except BaseException as e: + traceback.print_exc() + raise e + + def _complete_wrap(self, text, line, begin, end): + """ Get and format completer choices. Note that all internal calls to + completer functions must use set()s but this wrapper has to return a + list to satisfy cmd.Cmd. """ + choices = self.complete(text, line, begin, end) + debug.log("PREFORMAT CHOICES", choices) + sz = len(choices) + if sz == 1: + return ['%s ' % shlex.quote(choices.pop())] + elif sz > 2: + # We don't need the sentinel choice to prevent completion + # when there is already more than 1 choice. + choices -= {completer.ActionCompleter.sentinel} + return ['%s ' % x for x in choices] + + def complete(self, text, line, begin, end): + """ Do naive argument parsing so the completer has better ability to + understand expansion rules. """ + debug.log() + import datetime + debug.log(datetime.datetime.now()) + line = line[:end] # Ignore characters following the cursor. + args = self.split_line(line)[1:] + options = self.deep_scan_parser(self.argparser) + + # Walk into options tree if subcommands are detected. + last_subcommand = None + while True: + for key, completers in options.items(): + if key in args and hasattr(completers[0], 'items'): + args.remove(key) + last_subcommand = key + options = completers[0] + break + else: + break + if text == last_subcommand: + # We have to specially catch the case where the last argument is + # the key used to find our subparser. More specifically when the + # cursor is not preceded by a space too, as this prevents the + # completion routines from continuing. The simplest way without + # complicating the algo for coming up with our options list is to + # simply shortcut the completer by returning a single item. + # Subsequent tabs will work normally. + return {text} + + # Look for incomplete actions. + choices = set(options) - {None} + arg_buf = [] + pos_args = [] + trailing_action = None + # The slice below skips the last arg if it is 'active'. + for x in reversed(args[:-1 if text else None]): + if x in options: + action = options[x][0] + action.consume(arg_buf) + pos_args.extend(arg_buf) + del arg_buf[:] + if not trailing_action: + trailing_action = action + if not action.full: + if action.reached_min: + choices |= action(self, text) + choices -= {action.key} + else: + choices = action(self, text) + break + else: + arg_buf.insert(0, x) + pos_args.extend(arg_buf) + + # Feed any remaining arguments in the buffer to positionals so long as + # there isn't a trailing action that can still consume. + if None in options and (not trailing_action or trailing_action.full): + for x_action in options[None]: + debug.log("CONSUME FOR POS", x_action, pos_args) + x_action.consume(pos_args) + if not x_action.reached_min: + choices = x_action(self, text) + break + elif not x_action.full: + choices |= x_action(self, text) + + debug.log('line ::%s::' % line) + debug.log("text ::%s::" % text) + debug.log('args', args) + debug.log('choices', choices) + return set(x for x in choices if x.startswith(text)) + + def split_line(self, line): + """ Try to do pure shlex.split unless it can't parse the line. In that + case we trim the input line until shlex can split the args and tack the + unparsable portion on as the last argument. """ + remainder = [] + while True: + try: + args = shlex.split(line) + except ValueError: + remainder.append(line[-1]) + line = line[:-1] + else: + if remainder: + args.append(''.join(reversed(remainder))) + return args + + @functools.lru_cache() + def deep_scan_parser(self, parser): + results = collections.defaultdict(list) + for x in parser._actions: + ac = completer.ActionCompleter(x) + if ac.subparsers: + for key, xx in ac.subparsers.items(): + results[key].append(self.deep_scan_parser(xx)) + else: + results[ac.key].append(ac) + return results + + def invoke(self, args): + """ If a subparser is present and configured we forward invocation to + the correct subcommand method. If all else fails we call run(). """ + commands = self.get_commands_from(args) + self.last_invoke = time.time() + if self.subparsers: + try: + command = commands[self.depth] + except IndexError: + if self.default_subcommand: + self.default_subcommand.argparser.parse_args([], namespace=args) + self.invoke(args) # retry + return + else: + self.prerun(args) + command.invoke(args) + return + self.prerun(args) + self.run(args) + + def get_commands_from(self, args): + """ We have to code the key names for each depth. This method scans + for each level and returns a list of the command arguments. """ + commands = [] + for i in itertools.count(0): + try: + commands.append(getattr(args, 'command%d' % i)) + except AttributeError: + break + return commands + + def add_subcommand(self, command_class, default=False): + command = command_class(parent=self) + if command.name is None: + raise TypeError('Cannot add unnamed command: %s' % command) + if not self.subparsers: + desc = 'Provide a subcommand argument to perform an operation.' + addsub = self.argparser.add_subparsers + self.subparsers = addsub(title='subcommands', description=desc, + metavar='COMMAND') + if default: + if self.default_subcommand: + raise ValueError("Default subcommand already exists.") + self.default_subcommand = command + title, desc = command.clean_docstring() + help_fmt = '%s (default)' if default else '%s' + help = help_fmt % title + prog = '%s %s' % (self.subparsers._prog_prefix, command.name) + if command.subparsers: + for x in command.subparsers.choices.values(): + x.prog = '%s %s' % (prog, x.prog.rsplit(' ', 1)[1]) + command.argparser.prog = prog + action = self.subparsers._ChoicesPseudoAction(command.name, (), help) + self.subparsers._choices_actions.append(action) + self.subparsers._name_parser_map[command.name] = command.argparser + command.argparser.set_defaults(**{'command%d' % self.depth: command}) + self.subcommands.append(command) diff --git a/shellish/completer.py b/shellish/completer.py new file mode 100644 index 0000000..66b8d86 --- /dev/null +++ b/shellish/completer.py @@ -0,0 +1,138 @@ +""" +Tab completion handling. +""" + +import argparse +from . import debug + +__public__ = [] + + +class ActionCompleter(object): + """ Stateful behavior for tab completion. Calling this instance returns + valid choices for the action with the given prefix (if any). The results + are cached and the cache will be used for repeat calls or narrowed down + prefixes. """ + + sentinel = ' ' + + def __init__(self, action): + self.action = action + self.key = None + self.choices = None + self.consumed = 0 + self.subparsers = None + self.last_complete = None + self.cache = {} + if action.choices and hasattr(action.choices, 'items'): + self.subparsers = action.choices.copy() + if action.option_strings: + # Only include the longest option string under the assumption + # that this most accurately describes the argument. + self.key = max(action.option_strings, key=len) + if action.choices: + self.completer = self.choice_complete + self.choices = action.choices + elif getattr(action, 'complete', None): + self.completer = self.proxy_complete(action.complete) + else: + self.completer = self.hint_complete + self.parse_nargs(action.nargs) + + def __str__(self): + return '<%s key:%s action:(%s)>' % (type(self).__name__, self.key, + self.about_action()) + + def __call__(self, command, prefix): + if self.last_complete is not command.last_invoke: + debug.log("CLEAR CACHE") + self.cache.clear() + self.last_complete = command.last_invoke + try: + choices = self.cache[prefix] + except KeyError: + choices = self.cache[prefix] = self.completer(prefix) + return choices + + def thru_cache(self, prefix): + """ Search the radix tree for this prefix. Partial matches will be + refined, stored and returned. Full miss will cause a lookup. """ + offt = self.cache + bestfit = offt.setdefault('__value__', self.cache_miss) + for c in prefix: + offt = offt[c] + bestfit = offt.setdefault('__value__', bestfit) + if bestfit is self.cache_miss: + value = self.completer(prefix) + self.cache_miss += 1 + else: + value = frozenset(x for x in bestfit if x.startswith(prefix)) + if value == bestfit: + self.cache_hit += 1 + else: + self.cache_partial += 1 + offt['__value__'] = value + return value + + def parse_nargs(self, nargs): + """ Nargs is essentially a multi-type encoding. We have to parse it + to understand how many values this action may consume. """ + self.max_args = self.min_args = 0 + if nargs is None: + self.max_args = self.min_args = 1 + elif nargs == argparse.OPTIONAL: + self.max_args = 1 + elif nargs == argparse.ZERO_OR_MORE: + self.max_args = None + elif nargs in (argparse.ONE_OR_MORE, argparse.REMAINDER): + self.min_args = 1 + self.max_args = None + elif nargs != argparse.PARSER: + self.max_args = self.min_args = nargs + + def consume(self, args): + """ Consume the arguments we support. The args are modified inline. + The return value is the number of args eaten. """ + consumable = args[:self.max_args] + self.consumed = len(consumable) + del args[:self.consumed] + return self.consumed + + @property + def full(self): + """ Can this action take more arguments? """ + if self.max_args is None: + return False + debug.log("is full?", self, self.max_args, self.consumed) + return self.consumed >= self.max_args + + @property + def reached_min(self): + """ Have we consumed the minimum number of args. """ + return self.consumed >= self.min_args + + def about_action(self): + """ Simple string describing the action. """ + name = self.action.metavar or self.action.dest + type_name = self.action.type.__name__ if self.action.type else '' + if self.action.help or type_name: + extra = ' (%s)' % (self.action.help or 'type: %s' % type_name) + else: + extra = '' + return name + extra + + def choice_complete(self, prefix): + return frozenset(x for x in self.choices if x.startswith(prefix)) + + def hint_complete(self, prefix): + """ For arguments that don't have complete functions or .choices we + can only hint about the argument details. The results are designed to + not self-expand (ie, len(choices) > 1). """ + return frozenset(( + '' % self.about_action(), + self.sentinel + )) + + def proxy_complete(self, func): + """ Pass completion work to foreign function. """ + return lambda *args, **kwargs: frozenset(func(*args, **kwargs)) diff --git a/shellish/debug.py b/shellish/debug.py new file mode 100644 index 0000000..34fa80e --- /dev/null +++ b/shellish/debug.py @@ -0,0 +1,14 @@ +""" +Debugging (printf mostly) facilities for the framework. +""" + +import pprint + +LOG_FILE = 'shellish.debug' + + +def log(*args): + formatted = [pprint.pformat(x, width=1) if isinstance(x, (list, dict)) + else str(x) for x in args] + with open(LOG_FILE, 'a') as f: + f.write(' '.join(formatted) + '\n') diff --git a/shellish/shell.py b/shellish/shell.py new file mode 100644 index 0000000..2ef4e76 --- /dev/null +++ b/shellish/shell.py @@ -0,0 +1,101 @@ +""" +The interactive portions of shellish. +""" + +import cmd +import os.path +import readline +import shlex +import shutil +import sys + +__public__ = ['Shell'] + + +class ShellQuit(Exception): + pass + + +class Shell(cmd.Cmd): + """ The interactive manager for a session of command calls. This babysits + a tree of commands until the user requests our exit. """ + + prompt = '$ ' + history_dir = os.path.expanduser('~') + intro = 'Type "help" or "?" to list commands and "exit" to quit.' + + def __init__(self, root_command): + self.root_command = root_command + self.name = root_command.name + self.history_file = os.path.join(self.history_dir, + '.%s_history' % self.name) + try: + readline.read_history_file(self.history_file) + except FileNotFoundError: + pass + for x in root_command.subcommands: + setattr(self, 'do_%s' % x.name, self.wrap_command_invoke(x)) + setattr(self, 'help_%s' % x.name, x.argparser.print_help) + setattr(self, 'complete_%s' % x.name, x.complete_wrap) + delims = set(readline.get_completer_delims()) + readline.set_completer_delims(''.join(delims - set('-+@:'))) + super().__init__() + + def wrap_command_invoke(self, cmd): + def wrap(arg): + args = cmd.argparser.parse_args(shlex.split(arg)) + cmd.invoke(args) + wrap.__doc__ = cmd.__doc__ + wrap.__name__ = 'do_%s' % cmd.name + return wrap + + def get_names(self): + names = super().get_names() + commands = self.root_command.subcommands + for op in ('help', 'do', 'complete'): + names.extend('%s_%s' % (op, x.name) for x in commands) + return names + + def complete_help(self, *args, **kwargs): + topics = super().complete_help(*args, **kwargs) + return ['%s ' % x.rstrip() for x in topics] + + def completenames(self, *args, **kwargs): + names = super().completenames(*args, **kwargs) + return ['%s ' % x.rstrip() for x in names] + + def emptyline(self): + """ Do not re-run the last command. """ + pass + + def columnize(self, items, displaywidth=None): + if displaywidth is None: + displaywidth, h = shutil.get_terminal_size() + return super().columnize(items, displaywidth=displaywidth) + + def cmdloop(self): + intro = () + while True: + try: + super().cmdloop(*intro) + except ShellQuit: + return + except KeyboardInterrupt: + print() + except SystemExit as e: + if not str(e).isnumeric(): + print(e, file=sys.stderr) + finally: + readline.write_history_file(self.history_file) + if not intro: + intro = ('',) + + def do_exit(self, arg): + raise ShellQuit() + + def default(self, line): + if line == 'EOF': + print('^D') + raise ShellQuit() + else: + return super().default(line) diff --git a/test/__init__.py b/test/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/test/basic.py b/test/basic.py new file mode 100644 index 0000000..77afa16 --- /dev/null +++ b/test/basic.py @@ -0,0 +1,28 @@ +""" +Sanity tests for the shellish library. +""" + +import shellish +import unittest + + +class ShellSanity(unittest.TestCase): + + def test_shell_init(self): + self.assertRaises(TypeError, shellish.Shell) + shellish.Shell(shellish.Command()) + + +class CommandSanity(unittest.TestCase): + + def test_command_init(self): + class Foo(shellish.Command): + """ foo """ + pass + Foo() + shellish.Command() + + def test_docstring_required(self): + class Foo(shellish.Command): + pass + self.assertRaises(SyntaxError, Foo)