Skip to content

Commit

Permalink
Merge branch 'jpl-master' into upstream-master
Browse files Browse the repository at this point in the history
  • Loading branch information
pymonger committed Apr 19, 2018
2 parents fce6ce3 + 53830fb commit 26a4b0e
Show file tree
Hide file tree
Showing 5 changed files with 160 additions and 17 deletions.
22 changes: 22 additions & 0 deletions docker/hysds-io.json.lw-tosca-wget-email
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
{
"label":"Wget Script via E-mail",
"component":"tosca",
"submission_type":"individual",
"params" : [
{
"name": "query",
"type": "text",
"from": "passthrough"
},
{
"name": "emails",
"type": "text",
"from": "submitter"
},
{
"name": "name",
"type": "text",
"from": "passthrough"
}
]
}
28 changes: 28 additions & 0 deletions docker/hysds-io.json.lw-tosca-wget-product
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
{
"label":"Wget Script via Product",
"component":"tosca",
"submission_type":"individual",
"params" : [
{
"name": "query",
"type": "text",
"from": "passthrough"
},
{
"name": "emails",
"type": "text",
"from": "value",
"value": "unused"
},
{
"name": "name",
"type": "text",
"from": "passthrough"
},
{
"name": "username",
"from": "passthrough",
"type": "text"
}
]
}
20 changes: 20 additions & 0 deletions docker/job-spec.json.lw-tosca-wget-email
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
{
"required_queues":["system-jobs-queue"],
"command":"/home/ops/verdi/ops/lightweight-jobs/wget.sh",
"imported_worker_files":{"/home/ops/.aws":"/home/ops/.aws"},
"disk_usage":"3GB",
"params" : [
{
"name": "query",
"destination": "positional"
},
{
"name": "emails",
"destination": "positional"
},
{
"name": "name",
"destination": "positional"
}
]
}
24 changes: 24 additions & 0 deletions docker/job-spec.json.lw-tosca-wget-product
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
{
"required_queues":["system-jobs-queue"],
"command":"/home/ops/verdi/ops/lightweight-jobs/wget.sh",
"imported_worker_files":{"/home/ops/.aws":"/home/ops/.aws"},
"disk_usage":"3GB",
"params" : [
{
"name": "query",
"destination": "positional"
},
{
"name": "emails",
"destination": "positional"
},
{
"name": "name",
"destination": "positional"
},
{
"name": "username",
"destination": "context"
}
]
}
83 changes: 66 additions & 17 deletions wget.py
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,16 @@
from hysds.celery import app
import boto3
from urlparse import urlparse
import datetime


PRODUCT_TEMPLATE = "product_downloader-{0}-{1}-{2}"

#TODO: Setup logger for this job here. Should log to STDOUT or STDERR as this is a job
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger("hysds")


def wget_script(dataset=None):
"""Return wget script."""

Expand All @@ -33,6 +38,21 @@ def wget_script(dataset=None):

scan_result = r.json()
count = scan_result['hits']['total']
#size = int(math.ceil(count/10.0))
#print("SIZE : %d" %size)
#scroll_id = scan_result['_scroll_id']
logging.debug('%s/%s/_search?search_type=scan&scroll=10m&size=%s' % (es_url, index, count))
r = requests.post('%s/%s/_search?search_type=scan&scroll=10m&size=%s' % (es_url, index, count), json.dumps(dataset))
if r.status_code != 200:
print("Failed to query ES. Got status code %d:\n%s" %(r.status_code, json.dumps(r.json(), indent=2)))
logger.debug("Failed to query ES. Got status code %d:\n%s" %
(r.status_code, json.dumps(r.json(), indent=2)))
r.raise_for_status()
logger.debug("result: %s" % pformat(r.json()))

scan_result = r.json()
count = scan_result['hits']['total']

scroll_id = scan_result['_scroll_id']

# stream output a page at a time for better performance and lower memory footprint
Expand Down Expand Up @@ -95,10 +115,10 @@ def stream_wget(scroll_id):
f.write(i)

# for gzip compressed use file extension .tar.gz and modifier "w:gz"
os.rename('wget_script.sh','wget_script.bash')
tar = tarfile.open("wget.tar.gz", "w:gz")
tar.add('wget_script.bash')
tar.close()
#os.rename('wget_script.sh','wget_script.bash')
#tar = tarfile.open("wget.tar.gz", "w:gz")
#tar.add('wget_script.bash')
#tar.close()


def get_s3_files(url):
Expand All @@ -120,6 +140,43 @@ def get_s3_files(url):
files.extend(get_s3_files(folder))
return files

def email(query, emails, rule_name):
    '''
    Sends out an email with the wget script attached as a gzipped tarball.

    Renames wget_script.sh (in the current working directory) to
    wget_script.bash, packs it into wget.tar.gz and hands that archive to
    notify_by_email.send_email as an attachment.

    query: query text quoted in the message body
    emails: comma-separated addresses; each entry is stripped of surrounding
        whitespace and passed to send_email as the recipient list
    rule_name: name inserted into the subject line
    '''
    # for gzip compressed use file extension .tar.gz and modifier "w:gz"
    os.rename('wget_script.sh', 'wget_script.bash')
    # the context manager closes the archive even if add() raises
    with tarfile.open("wget.tar.gz", "w:gz") as tar:
        tar.add('wget_script.bash')

    cc_recipients = [i.strip() for i in emails.split(',')]
    bcc_recipients = []
    subject = "[monitor] (wget_script:%s)" % (rule_name)
    body = "Product was ingested from query: %s" % query
    body += "\n\nYou can use this wget script attached to download products.\n"
    body += "Please rename wget_script.bash to wget_script.sh before running it."

    attachments = None
    if os.path.isfile('wget.tar.gz'):
        # gzip data is binary: read it in 'rb' mode and release the file
        # handle as soon as the content is in memory
        with open('wget.tar.gz', 'rb') as archive:
            attachments = {'wget.tar.gz': archive.read()}

    # the sender is the name of the user running this job
    notify_by_email.send_email(getpass.getuser(), cc_recipients,
                               bcc_recipients, subject, body,
                               attachments=attachments)

def make_product(rule_name, query):
    '''
    Package the generated wget script as a product directory.

    Reads "username" from _context.json in the current working directory,
    creates a directory named from PRODUCT_TEMPLATE (rule name, username and
    the current local timestamp), moves wget_script.sh into it as
    wget_script.bash, and writes the <name>.met.json and <name>.dataset.json
    files next to it.

    rule_name: name used as the first component of the product name
    query: query recorded (JSON-encoded) under "source_query" in the met file
    '''
    with open("_context.json", "r") as context_file:
        job_context = json.load(context_file)

    username = job_context["username"]
    timestamp = datetime.datetime.now().strftime("%Y%m%dT%H%M%S")
    name = PRODUCT_TEMPLATE.format(rule_name, username, timestamp)

    # the product directory holds the script plus its two JSON descriptors
    os.mkdir(name)
    script_path = "{0}/wget_script.bash".format(name)
    os.rename("wget_script.sh", script_path)

    metadata = {"source_query": json.dumps(query)}
    met_path = "{0}/{0}.met.json".format(name)
    with open(met_path, "w") as met_file:
        json.dump(metadata, met_file)

    dataset = {"id": name, "version": "v0.1"}
    dataset_path = "{0}/{0}.dataset.json".format(name)
    with open(dataset_path, "w") as dataset_file:
        json.dump(dataset, dataset_file)

if __name__ == "__main__":
'''
Main program of wget_script
Expand All @@ -132,16 +189,8 @@ def get_s3_files(url):

# getting the script
wget_script(query)
# now email the query
attachments = None
cc_recipients = [i.strip() for i in emails.split(',')]
bcc_recipients = []
subject = "[monitor] (wget_script:%s)" % (rule_name)
body = "Product was ingested from query: %s" % query
body += "\n\nYou can use this wget script attached to download products.\n"
body += "Please rename wget_script.bash to wget_script.sh before running it."
if os.path.isfile('wget.tar.gz'):
wget_content = open('wget.tar.gz','r').read()
attachments = { 'wget.tar.gz':wget_content}
notify_by_email.send_email(getpass.getuser(), cc_recipients, bcc_recipients, subject, body, attachments=attachments)

if emails=="unused":
make_product(rule_name, query)
else:
# now email the query
email(query, emails, rule_name)

0 comments on commit 26a4b0e

Please sign in to comment.