From 8cb6d9f5cec7c020be57a951bfa9fcba812ac950 Mon Sep 17 00:00:00 2001 From: Phil Tran Date: Mon, 1 Oct 2018 00:50:58 -0700 Subject: [PATCH 1/4] Added timerange, no-dup, m2m options --- mi/core/instrument/playback.py | 99 +++++++++++++++++++++++++++++++--- 1 file changed, 91 insertions(+), 8 deletions(-) diff --git a/mi/core/instrument/playback.py b/mi/core/instrument/playback.py index 041d2f91..344104be 100644 --- a/mi/core/instrument/playback.py +++ b/mi/core/instrument/playback.py @@ -6,14 +6,21 @@ @brief Playback process using ZMQ messaging. Usage: - playback datalog [--allowed=] [--max_events=] ... - playback ascii [--allowed=] [--max_events=] ... - playback chunky [--allowed=] [--max_events=] ... - playback zplsc [--allowed=] [--max_events=] ... + playback datalog [--allowed=] [--max_events=] + [--timerange=] [--no-dup] [--m2m] ... + playback ascii [--allowed=] [--max_events=] + [--timerange=] [--no-dup] [--m2m] ... + playback chunky [--allowed=] [--max_events=] + [--timerange=] [--no-dup] [--m2m] ... + playback zplsc [--allowed=] [--max_events=] + [--timerange=] [--no-dup] [--m2m] ... Options: - -h, --help Show this screen - --allowed= Comma-separated list of publishable particles + -h, --help Show this screen + --allowed= Comma-separated list of publishable particles + --timerange= Comma-seperated range for times in YYYY-MM-DDTHH:MM:SS.SSSSZ format + --no-dup Don't allow duplicates to be ingested + --m2m Use m2m To run without installing: python -m mi.core.instrument.playback ... @@ -37,6 +44,7 @@ from ooi_port_agent.common import PacketType from ooi_port_agent.packet import Packet, PacketHeader from wrapper import EventKeys, encode_exception, DriverWrapper +execfile('/home/asadev/uframes/repos/preload-database/tools/m2m.py') __author__ = 'Ronald Ronquillo' __license__ = 'Apache 2.0' @@ -124,7 +132,8 @@ def packet_from_fh(file_handle): class PlaybackWrapper(object): - def __init__(self, module, refdes, event_url, particle_url, reader_klass, allowed, files, max_events, handler=None): + def __init__(self, module, refdes, event_url, particle_url, reader_klass, allowed, files, max_events, + handler=None, times=None, no_dup=None, ): version = DriverWrapper.get_version(module) headers = {'sensor': refdes, 'deliveryType': 'streamed', 'version': version, 'module': module} self.max_events = max_events @@ -132,6 +141,8 @@ def __init__(self, module, refdes, event_url, particle_url, reader_klass, allowe self.particle_publisher = Publisher.from_url(particle_url, handler, headers, allowed, max_events) self.protocol = self.construct_protocol(module) self.reader = reader_klass(files, self.got_data) + self.times = times + self.no_dup = no_dup def set_header_filename(self, filename): self.event_publisher.set_source(filename) @@ -343,17 +354,80 @@ def _process_packet(self): return False +class M2mPlayback(MachineToMachine): + """ + Inherits from MachineToMachine to handle cabled-specific m2m requests. + """ + + URLS = \ + { + 'uframe-test': 'https://ooinet-dev-01.oceanobservatories.org', + 'uframe-2-test': 'https://ooinet-dev-02.oceanobservatories.org', + 'uframe-3-test': 'https://ooinet-dev-03.oceanobservatories.org', + } + + def __init__(self, base_url, refdes): + + self.refdes = refdes + self.limit = 5 + self.timeframe = 'YYYY-MM-DDTHH:MM:SS.SSSSZ' + + with open('api', 'r') as fd: + api = fd.readlines() + + # get api user/key and delete the '\n' char at end + api_user = api[0].split(' = ')[1][:-1] + api_key = api[1].split(' = ')[1][:-1] + + MachineToMachine.__init__(self, base_url, api_user, api_key) + + self.base_cabled_url = self.inv_url + '/' + '/'.join(self.refdes.split('-', 2)) + '/streamed/' + self.limit_url = '?limit=' + str(self.limit) + + self.streams_list = [] + self.streamed_data = {} + self.preferred_timestamp = {} + self.timestamps = {} + + def request_streams_list(self): + """ + Will acquire a list of streams according to the reference designator + """ + self.streams_list = self.requests(self.base_cabled_url) + + def request_stream_data(self): + """ + Will acquire JSON data from the list of streams + """ + for stream in self.streams_list: + self.streamed_data[stream] = self.requests(self.base_cabled_url + stream + '/' + self.limit_url) + self.preferred_timestamp[stream] = self.streamed_data[stream][0]['preferred_timestamp'] + + def get_times(self): + """ + Will build a table with a list of timestamps according to the stream name. NTP timestamp + """ + for stream in self.streams_list: + times_list = [] + for data in self.streamed_data[stream]: + times_list.append(data[self.preferred_timestamp[stream]]) + + self.timestamps[stream] = times_list + + def main(): options = docopt(__doc__) - module = options[''] refdes = options[''] event_url = options[''] particle_url = options[''] files = options.get('') allowed = options.get('--allowed') + timerange = options.get('--timerange') + if allowed is not None: allowed = [_.strip() for _ in allowed.split(',')] + max_events = options.get('--max_events') if not max_events: max_events = Publisher.DEFAULT_MAX_EVENTS @@ -379,7 +453,15 @@ def main(): else: reader = None + # need the timerange, no-dup, and m2m options to perform M2M + if timerange and options['--no-dup'] and options['--m2m']: + timerange = timerange.split(',') + machine = particle_url.split('@')[1].split('?')[0] + base_url = M2mPlayback.URLS[machine] + m2m_playback = M2mPlayback(base_url, refdes) + wrapper = PlaybackWrapper(module, refdes, event_url, particle_url, reader, allowed, files, max_events) + if zplsc_reader: wrapper.zplsc_playback() else: @@ -387,3 +469,4 @@ def main(): if __name__ == '__main__': main() + From 58bd7cdfe57b9f83970401cde0188171187ea2c3 Mon Sep 17 00:00:00 2001 From: Phil Tran Date: Mon, 1 Oct 2018 01:13:24 -0700 Subject: [PATCH 2/4] Added limit option --- mi/core/instrument/playback.py | 28 +++++++++++++++++----------- 1 file changed, 17 insertions(+), 11 deletions(-) diff --git a/mi/core/instrument/playback.py b/mi/core/instrument/playback.py index 344104be..8085499d 100644 --- a/mi/core/instrument/playback.py +++ b/mi/core/instrument/playback.py @@ -7,13 +7,13 @@ Usage: playback datalog [--allowed=] [--max_events=] - [--timerange=] [--no-dup] [--m2m] ... + [--timerange=] [--no-dup] [--m2m] [--limit=] ... playback ascii [--allowed=] [--max_events=] - [--timerange=] [--no-dup] [--m2m] ... + [--timerange=] [--no-dup] [--m2m] [--limit=] ... playback chunky [--allowed=] [--max_events=] - [--timerange=] [--no-dup] [--m2m] ... + [--timerange=] [--no-dup] [--m2m] [--limit=] ... playback zplsc [--allowed=] [--max_events=] - [--timerange=] [--no-dup] [--m2m] ... + [--timerange=] [--no-dup] [--m2m] [--limit=] ... Options: -h, --help Show this screen @@ -366,11 +366,11 @@ class M2mPlayback(MachineToMachine): 'uframe-3-test': 'https://ooinet-dev-03.oceanobservatories.org', } - def __init__(self, base_url, refdes): + def __init__(self, base_url, refdes, limit): self.refdes = refdes - self.limit = 5 self.timeframe = 'YYYY-MM-DDTHH:MM:SS.SSSSZ' + self.limit = limit with open('api', 'r') as fd: api = fd.readlines() @@ -382,7 +382,8 @@ def __init__(self, base_url, refdes): MachineToMachine.__init__(self, base_url, api_user, api_key) self.base_cabled_url = self.inv_url + '/' + '/'.join(self.refdes.split('-', 2)) + '/streamed/' - self.limit_url = '?limit=' + str(self.limit) + + self.limit_url = '?limit=' + str(limit) self.streams_list = [] self.streamed_data = {} @@ -400,7 +401,11 @@ def request_stream_data(self): Will acquire JSON data from the list of streams """ for stream in self.streams_list: - self.streamed_data[stream] = self.requests(self.base_cabled_url + stream + '/' + self.limit_url) + if self.limit: + self.streamed_data[stream] = self.requests(self.base_cabled_url + stream + '/' + self.limit_url) + else: + self.streamed_data[stream] = self.requests(self.base_cabled_url + stream + '/') + self.preferred_timestamp[stream] = self.streamed_data[stream][0]['preferred_timestamp'] def get_times(self): @@ -424,6 +429,7 @@ def main(): files = options.get('') allowed = options.get('--allowed') timerange = options.get('--timerange') + limit = options.get('--limit') if allowed is not None: allowed = [_.strip() for _ in allowed.split(',')] @@ -454,12 +460,12 @@ def main(): reader = None # need the timerange, no-dup, and m2m options to perform M2M - if timerange and options['--no-dup'] and options['--m2m']: + if timerange and options['--no-dup'] and options['--m2m'] and limit: timerange = timerange.split(',') machine = particle_url.split('@')[1].split('?')[0] base_url = M2mPlayback.URLS[machine] - m2m_playback = M2mPlayback(base_url, refdes) - + m2m_playback = M2mPlayback(base_url, refdes, limit) + wrapper = PlaybackWrapper(module, refdes, event_url, particle_url, reader, allowed, files, max_events) if zplsc_reader: From 86eec7d0d87204d93cb0cfa5395c4b0d87ed1ab4 Mon Sep 17 00:00:00 2001 From: Phil Tran Date: Mon, 1 Oct 2018 01:15:24 -0700 Subject: [PATCH 3/4] Added limit option --- mi/core/instrument/playback.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/mi/core/instrument/playback.py b/mi/core/instrument/playback.py index 8085499d..6bf07fc6 100644 --- a/mi/core/instrument/playback.py +++ b/mi/core/instrument/playback.py @@ -133,7 +133,7 @@ def packet_from_fh(file_handle): class PlaybackWrapper(object): def __init__(self, module, refdes, event_url, particle_url, reader_klass, allowed, files, max_events, - handler=None, times=None, no_dup=None, ): + handler=None): version = DriverWrapper.get_version(module) headers = {'sensor': refdes, 'deliveryType': 'streamed', 'version': version, 'module': module} self.max_events = max_events @@ -141,8 +141,6 @@ def __init__(self, module, refdes, event_url, particle_url, reader_klass, allowe self.particle_publisher = Publisher.from_url(particle_url, handler, headers, allowed, max_events) self.protocol = self.construct_protocol(module) self.reader = reader_klass(files, self.got_data) - self.times = times - self.no_dup = no_dup def set_header_filename(self, filename): self.event_publisher.set_source(filename) @@ -464,8 +462,7 @@ def main(): timerange = timerange.split(',') machine = particle_url.split('@')[1].split('?')[0] base_url = M2mPlayback.URLS[machine] - m2m_playback = M2mPlayback(base_url, refdes, limit) - + m2m_playback = M2mPlayback(base_url, refdes, limit) wrapper = PlaybackWrapper(module, refdes, event_url, particle_url, reader, allowed, files, max_events) if zplsc_reader: From a1f55eb63cd41ad2311e0423892fe96359e19e8b Mon Sep 17 00:00:00 2001 From: Phil Tran Date: Mon, 1 Oct 2018 01:15:24 -0700 Subject: [PATCH 4/4] Added limit option --- mi/core/instrument/playback.py | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/mi/core/instrument/playback.py b/mi/core/instrument/playback.py index 8085499d..b3e7475d 100644 --- a/mi/core/instrument/playback.py +++ b/mi/core/instrument/playback.py @@ -132,8 +132,7 @@ def packet_from_fh(file_handle): class PlaybackWrapper(object): - def __init__(self, module, refdes, event_url, particle_url, reader_klass, allowed, files, max_events, - handler=None, times=None, no_dup=None, ): + def __init__(self, module, refdes, event_url, particle_url, reader_klass, allowed, files, max_events, handler=None): version = DriverWrapper.get_version(module) headers = {'sensor': refdes, 'deliveryType': 'streamed', 'version': version, 'module': module} self.max_events = max_events @@ -141,8 +140,6 @@ def __init__(self, module, refdes, event_url, particle_url, reader_klass, allowe self.particle_publisher = Publisher.from_url(particle_url, handler, headers, allowed, max_events) self.protocol = self.construct_protocol(module) self.reader = reader_klass(files, self.got_data) - self.times = times - self.no_dup = no_dup def set_header_filename(self, filename): self.event_publisher.set_source(filename) @@ -464,8 +461,7 @@ def main(): timerange = timerange.split(',') machine = particle_url.split('@')[1].split('?')[0] base_url = M2mPlayback.URLS[machine] - m2m_playback = M2mPlayback(base_url, refdes, limit) - + m2m_playback = M2mPlayback(base_url, refdes, limit) wrapper = PlaybackWrapper(module, refdes, event_url, particle_url, reader, allowed, files, max_events) if zplsc_reader: