Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue #13148 - Added options for future M2M usage to ingest missing dates #62

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 92 additions & 7 deletions mi/core/instrument/playback.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,21 @@
@brief Playback process using ZMQ messaging.

Usage:
playback datalog <module> <refdes> <event_url> <particle_url> [--allowed=<particles>] [--max_events=<events>] <files>...
playback ascii <module> <refdes> <event_url> <particle_url> [--allowed=<particles>] [--max_events=<events>] <files>...
playback chunky <module> <refdes> <event_url> <particle_url> [--allowed=<particles>] [--max_events=<events>] <files>...
playback zplsc <module> <refdes> <event_url> <particle_url> [--allowed=<particles>] [--max_events=<events>] <files>...
playback datalog <module> <refdes> <event_url> <particle_url> [--allowed=<particles>] [--max_events=<events>]
[--timerange=<times>] [--no-dup] [--m2m] [--limit=<rangelimit>] <files>...
playback ascii <module> <refdes> <event_url> <particle_url> [--allowed=<particles>] [--max_events=<events>]
[--timerange=<times>] [--no-dup] [--m2m] [--limit=<rangelimit>] <files>...
playback chunky <module> <refdes> <event_url> <particle_url> [--allowed=<particles>] [--max_events=<events>]
[--timerange=<times>] [--no-dup] [--m2m] [--limit=<rangelimit>] <files>...
playback zplsc <module> <refdes> <event_url> <particle_url> [--allowed=<particles>] [--max_events=<events>]
[--timerange=<times>] [--no-dup] [--m2m] [--limit=<rangelimit>] <files>...

Options:
-h, --help Show this screen
--allowed=<particles> Comma-separated list of publishable particles
-h, --help Show this screen
--allowed=<particles> Comma-separated list of publishable particles
--timerange=<times> Comma-seperated range for times in YYYY-MM-DDTHH:MM:SS.SSSSZ format
--no-dup Don't allow duplicates to be ingested
--m2m Use m2m

To run without installing:
python -m mi.core.instrument.playback ...
Expand All @@ -37,6 +44,7 @@
from ooi_port_agent.common import PacketType
from ooi_port_agent.packet import Packet, PacketHeader
from wrapper import EventKeys, encode_exception, DriverWrapper
execfile('/home/asadev/uframes/repos/preload-database/tools/m2m.py')

__author__ = 'Ronald Ronquillo'
__license__ = 'Apache 2.0'
Expand Down Expand Up @@ -343,17 +351,86 @@ def _process_packet(self):
return False


class M2mPlayback(MachineToMachine):
"""
Inherits from MachineToMachine to handle cabled-specific m2m requests.
"""

URLS = \
{
'uframe-test': 'https://ooinet-dev-01.oceanobservatories.org',
'uframe-2-test': 'https://ooinet-dev-02.oceanobservatories.org',
'uframe-3-test': 'https://ooinet-dev-03.oceanobservatories.org',
}

def __init__(self, base_url, refdes, limit):

self.refdes = refdes
self.timeframe = 'YYYY-MM-DDTHH:MM:SS.SSSSZ'
self.limit = limit

with open('api', 'r') as fd:
api = fd.readlines()

# get api user/key and delete the '\n' char at end
api_user = api[0].split(' = ')[1][:-1]
api_key = api[1].split(' = ')[1][:-1]

MachineToMachine.__init__(self, base_url, api_user, api_key)

self.base_cabled_url = self.inv_url + '/' + '/'.join(self.refdes.split('-', 2)) + '/streamed/'

self.limit_url = '?limit=' + str(limit)

self.streams_list = []
self.streamed_data = {}
self.preferred_timestamp = {}
self.timestamps = {}

def request_streams_list(self):
"""
Will acquire a list of streams according to the reference designator
"""
self.streams_list = self.requests(self.base_cabled_url)

def request_stream_data(self):
"""
Will acquire JSON data from the list of streams
"""
for stream in self.streams_list:
if self.limit:
self.streamed_data[stream] = self.requests(self.base_cabled_url + stream + '/' + self.limit_url)
else:
self.streamed_data[stream] = self.requests(self.base_cabled_url + stream + '/')

self.preferred_timestamp[stream] = self.streamed_data[stream][0]['preferred_timestamp']

def get_times(self):
"""
Will build a table with a list of timestamps according to the stream name. NTP timestamp
"""
for stream in self.streams_list:
times_list = []
for data in self.streamed_data[stream]:
times_list.append(data[self.preferred_timestamp[stream]])

self.timestamps[stream] = times_list


def main():
options = docopt(__doc__)

module = options['<module>']
refdes = options['<refdes>']
event_url = options['<event_url>']
particle_url = options['<particle_url>']
files = options.get('<files>')
allowed = options.get('--allowed')
timerange = options.get('--timerange')
limit = options.get('--limit')

if allowed is not None:
allowed = [_.strip() for _ in allowed.split(',')]

max_events = options.get('--max_events')
if not max_events:
max_events = Publisher.DEFAULT_MAX_EVENTS
Expand All @@ -379,11 +456,19 @@ def main():
else:
reader = None

# need the timerange, no-dup, and m2m options to perform M2M
if timerange and options['--no-dup'] and options['--m2m'] and limit:
timerange = timerange.split(',')
machine = particle_url.split('@')[1].split('?')[0]
base_url = M2mPlayback.URLS[machine]
m2m_playback = M2mPlayback(base_url, refdes, limit)
wrapper = PlaybackWrapper(module, refdes, event_url, particle_url, reader, allowed, files, max_events)

if zplsc_reader:
wrapper.zplsc_playback()
else:
wrapper.playback()

if __name__ == '__main__':
main()