Skip to content

Commit

Permalink
streaming data input as ndjson: option -S
Browse files Browse the repository at this point in the history
Option -S reads data from stdin/files using existing mechanisms, as
newline-delimited json.  "_" is an iterator providing the json values.

Note that json parsing errors will not raise until running the user query, or
until formatting and printing output in the case that output is streamed and
flattened using -F.

Due to refactoring the order of loading data and performing certain
checks has changed.  As a result users may see different error messages
than in the past.
  • Loading branch information
stevenpelley committed Dec 5, 2024
1 parent fbb4299 commit 0fcd472
Show file tree
Hide file tree
Showing 3 changed files with 270 additions and 32 deletions.
100 changes: 70 additions & 30 deletions jello/cli.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
"""jello - query JSON at the command line with python syntax"""

import contextlib
import json
import os
import sys
import signal
Expand All @@ -8,6 +10,7 @@
import traceback
from textwrap import TextWrapper
import jello
import jello.lib as lib
from jello.lib import opts, load_json, read_file, pyquery, format_response


Expand All @@ -21,7 +24,7 @@ def get_stdin():
if sys.stdin.isatty():
return None
else:
return sys.stdin.read()
return sys.stdin


def print_help():
Expand All @@ -45,6 +48,8 @@ def print_help():
-r raw string output (no quotes)
-R raw string input (don't auto convert input to dict/list)
-s print the JSON schema in grep-able format
-S stream input newline-delimited json from STDIN or file
from -f. "_" is an iterator providing the entries
-t print type annotations in schema view
-v version info
-h help
Expand Down Expand Up @@ -126,6 +131,27 @@ def print_exception(e=None, data='', query='', response='', ex_type='Runtime'):
sys.exit(1)


def print_json_streaming_error(e, query='', ex_type='Json Load'):
cause = e.__cause__
data = None
if isinstance(cause, json.JSONDecodeError):
data = cause.doc
print_exception(
cause,
data=data,
query=query,
ex_type=ex_type
)


class nullclosing:
"""simpler version of contextlib.nullcontext, which was introduced in 3.7
(currently support 3.6)"""

def close(self):
pass


def main(data=None, query='_'):
# break on ctrl-c keyboard interrupt
signal.signal(signal.SIGINT, ctrlc)
Expand All @@ -140,12 +166,14 @@ def main(data=None, query='_'):
if sys.platform.startswith('win32'):
os.system('')

stdin = None
if data is None:
data = get_stdin()
stdin = get_stdin()

options = []
long_options = {}
arg_section = '' # can be query_file or data_files
data_files = []

for arg in sys.argv[1:]:
if arg == '-q':
Expand All @@ -164,10 +192,8 @@ def main(data=None, query='_'):
arg_section = ''

elif arg_section == 'data_files':
try:
data += '\n' + read_file(arg)
except Exception as e:
print_error(f'jello: Issue reading data file: {e}')
data_files.append(arg)
arg_section = ''

elif arg.startswith('-') and not arg.startswith('--'):
options.extend(arg[1:])
Expand Down Expand Up @@ -196,6 +222,7 @@ def main(data=None, query='_'):
opts.raw = opts.raw or 'r' in options
opts.raw_input = opts.raw_input or 'R' in options
opts.schema = opts.schema or 's' in options
opts.stream_input = opts.stream_input or 'S' in options
opts.types = opts.types or 't' in options
opts.version_info = opts.version_info or 'v' in options
opts.helpme = opts.helpme or 'h' in options
Expand All @@ -213,39 +240,52 @@ def main(data=None, query='_'):
'''))
sys.exit()

if data is None and not opts.empty:
if not opts.empty and data is None and not stdin and not data_files:
print_error('jello: Missing JSON or JSON Lines data via STDIN or file via -f option.\n')

# read all the file sources
input_context_manager = contextlib.closing(nullclosing())
if opts.empty:
data = '{}'

# load the data as a raw string or JSON
if opts.raw_input:
data = str(data).rstrip('\r\n')

elif opts.stream_input:
data = lib.StreamingJsonInput(data, stdin, data_files)
input_context_manager = data
else:
# load the JSON or JSON Lines into a dict or list of dicts
data = lib.read_data_nonstreaming(data, stdin, data_files)
if opts.raw_input:
data = str(data).rstrip('\r\n')
else:
try:
data = load_json(data)
except json.JSONDecodeError as e:
print_exception(e, ex_type='JSON Load')

# closes input data resources
with input_context_manager as _:
# Read .jelloconf.py (if it exists) and run the query
response = ''
try:
data = load_json(data)
response = pyquery(data, query)
except lib.StreamingJsonError as e:
# when streaming input errors are not raised until the data is
# pulled by the user query
print_json_streaming_error(e)
except Exception as e:
print_exception(e, ex_type='JSON Load')

# Read .jelloconf.py (if it exists) and run the query
response = ''
try:
response = pyquery(data, query)
except Exception as e:
print_exception(e, data, query, ex_type='Query')
print_exception(e, data, query, ex_type='Query')

# reset opts.mono after pyquery since initialization in pyquery can change values
if opts.force_color:
opts.mono = False
# reset opts.mono after pyquery since initialization in pyquery can change values
if opts.force_color:
opts.mono = False

# Create and print schema or JSON/JSON-Lines/Lines
try:
format_response(response)
except Exception as e:
print_exception(e, data, query, response, ex_type='Output')
# Create and print schema or JSON/JSON-Lines/Lines
try:
format_response(response)
except lib.StreamingJsonError as e:
# when streaming output using -F we don't parse input and process it
# until we pull from the output iterator here
print_json_streaming_error(e)
except Exception as e:
print_exception(e, data, query, response, ex_type='Output')


if __name__ == '__main__':
Expand Down
168 changes: 168 additions & 0 deletions jello/lib.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""jello - query JSON at the command line with python syntax"""

import collections.abc
from io import StringIO
import os
import sys
import types
Expand Down Expand Up @@ -59,6 +60,7 @@ class opts:
number_color = None
string_color = None
flatten = None
stream_input = None


class JelloTheme:
Expand Down Expand Up @@ -440,6 +442,172 @@ def read_file(file_path):
return f.read()


def read_data_nonstreaming(initial_data, stdin, data_files):
sio = StringIO()
sep = ""
if initial_data is not None:
sio.write(initial_data)
sio.write("\n")
sep = "\n"
if stdin:
sio.write(sep)
sio.write(stdin.read())
sep = "\n"
for file in data_files:
sio.write(sep)
# let the JsonDecoderError raise
sio.write(read_file(file))
sep = "\n"
return sio.getvalue()


class StreamingJsonError(Exception):
'''
Wraps exceptions raised while loading and parsing json data.
Raised from exception so that __cause__ provides the underlying error.
When streaming data is not deserialized until pulled from an iterator during
user query execution or output formatting (when using -F to flatten and
stream the output). One cannot rely on where an exception is caught to
indicate what went wrong. This class signifies that an exception occurred
during reading or deserializing input data even when the exception
propagates from later function calls.
'''


class CloseableIterator(collections.abc.Iterator):
'''
Iterator that also provides close() method.
Provides for safe file closing when reading from files within
iterators/generators, where the scope cannot be controlled to use a context
manager.
'''
closer = None
it = None

def __init__(self, closer, it):
self.closer = closer
self.it = it

def close(self):
self.closer()

def __iter__(self):
return self

def __next__(self):
return next(self.it)


def _generate_json_from_lines_iter(lines_iter):
"""
Returns iterator of json objects from newline-delimited json input iterable.
lines_iter is any iterable whose iterator returns strings of individual json
objects. For example, a list of strings or a file-like object of ndjson.
"""

# the set of exceptions file readline() may throw is not documented.
# separating this apart to isolate exceptions arising from reading an
# underlying iterator and file.
it = iter(lines_iter)
while True:
try:
line = next(it)
except StopIteration:
return
except Exception as e:
raise StreamingJsonError from e

stripped = line.strip()
if not stripped:
continue

try:
yield json.loads(stripped)
except json.JSONDecodeError as e:
raise StreamingJsonError from e


class StreamingJsonInput:
"""
Iterator and context manager for streaming json from stdin and files.
"""

# these are "closeable iterators" when backed by a closeable file
current_iterator = None
# functions to construct remaining cloaseable iterators
remaining_iterator_factories = None

def __init__(self, initial_data, stdin, files):
"""
initial_data. String from cli.main.
stdin_or_files. sys.stdin or data file paths.
"""
# set up the iterators
self.remaining_iterator_factories = collections.deque()
if initial_data:
self.remaining_iterator_factories.append(
lambda: CloseableIterator(
lambda: None,
_generate_json_from_lines_iter(initial_data.splitlines())
)
)

if stdin:
self.remaining_iterator_factories.append(
lambda f=stdin: CloseableIterator(
# don't close stdin
lambda: None,
_generate_json_from_lines_iter(f)
),
)

for file in files:
def create_file_iterator(f=file):
# the file must live beyond this function call.
# StreamingJsonInput closes it as a context manager via
# CloseableIterator
# pylint: disable-next=R1732:consider-using-with
try:
opened_file = open(f, 'r')
except OSError as e:
raise StreamingJsonError from e
return CloseableIterator(
lambda f2=opened_file: f2.close(),
_generate_json_from_lines_iter(opened_file)
)
self.remaining_iterator_factories.append(create_file_iterator)

def __enter__(self):
return self

def __exit__(self, exc_type, exc_val, exc_tb):
if self.current_iterator:
self.current_iterator.close()

def __iter__(self):
return self

def __next__(self):
"""
Returns the next json object.
Raises StreamingJsonException on any deserialization error, with
__cause__ as the original Exception.
"""
while True:
if self.current_iterator is None:
if not self.remaining_iterator_factories:
raise StopIteration
factory = self.remaining_iterator_factories.popleft()
self.current_iterator = factory()

try:
return next(self.current_iterator)
except StopIteration:
self.current_iterator.close()
self.current_iterator = None


def _compile_query(query):
"""
Compile the provided python code block into a function to transform json.
Expand Down
Loading

0 comments on commit 0fcd472

Please sign in to comment.