Skip to content

Commit

Permalink
port to python3 (#4) (#5)
Browse files Browse the repository at this point in the history
* initial 2to3 run
* format to PEP8 spec
  • Loading branch information
pymonger authored Mar 20, 2019
1 parent fb39760 commit 6a19411
Show file tree
Hide file tree
Showing 6 changed files with 321 additions and 242 deletions.
69 changes: 41 additions & 28 deletions aws_get.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,19 @@
import json, requests, types, re, getpass, sys, os
import json
import requests
import types
import re
import getpass
import sys
import os
from pprint import pformat
import logging
import tarfile
import notify_by_email
from hysds.celery import app
import boto3
from urlparse import urlparse
from urllib.parse import urlparse

#TODO: Setup logger for this job here. Should log to STDOUT or STDERR as this is a job
# TODO: Setup logger for this job here. Should log to STDOUT or STDERR as this is a job
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger("aws_get")

Expand All @@ -19,16 +25,19 @@ def aws_get_script(dataset=None):
es_url = app.conf["GRQ_ES_URL"]
index = app.conf["DATASET_ALIAS"]
#facetview_url = app.conf["GRQ_URL"]
print('%s/%s/_search?search_type=scan&scroll=10m&size=100' % (es_url, index))
logging.debug('%s/%s/_search?search_type=scan&scroll=10m&size=100' % (es_url, index))
print json.dumps(dataset)
print(('%s/%s/_search?search_type=scan&scroll=10m&size=100' % (es_url, index)))
logging.debug(
'%s/%s/_search?search_type=scan&scroll=10m&size=100' % (es_url, index))
print(json.dumps(dataset))
logging.debug(json.dumps(dataset))

r = requests.post('%s/%s/_search?search_type=scan&scroll=10m&size=100' % (es_url, index), json.dumps(dataset))
r = requests.post('%s/%s/_search?search_type=scan&scroll=10m&size=100' %
(es_url, index), json.dumps(dataset))
if r.status_code != 200:
print("Failed to query ES. Got status code %d:\n%s" %(r.status_code, json.dumps(r.json(), indent=2)))
logger.debug("Failed to query ES. Got status code %d:\n%s" %
(r.status_code, json.dumps(r.json(), indent=2)))
print(("Failed to query ES. Got status code %d:\n%s" %
(r.status_code, json.dumps(r.json(), indent=2))))
logger.debug("Failed to query ES. Got status code %d:\n%s" %
(r.status_code, json.dumps(r.json(), indent=2)))
r.raise_for_status()
logger.debug("result: %s" % pformat(r.json()))

Expand All @@ -47,32 +56,35 @@ def stream_aws_get(scroll_id):
aws_get_cmd = 'aws s3 sync {} {}\n'

while True:
r = requests.post('%s/_search/scroll?scroll=10m' % es_url, data=scroll_id)
r = requests.post('%s/_search/scroll?scroll=10m' %
es_url, data=scroll_id)
res = r.json()
logger.debug("res: %s" % pformat(res))
scroll_id = res['_scroll_id']
if len(res['hits']['hits']) == 0: break
if len(res['hits']['hits']) == 0:
break
# Elastic Search seems like it's returning duplicate urls. Remove duplicates
unique_urls=[]
for hit in res['hits']['hits']:
[unique_urls.append(url) for url in hit['_source']['urls'] if url not in unique_urls and url.startswith("s3")]
unique_urls = []
for hit in res['hits']['hits']:
[unique_urls.append(url) for url in hit['_source']['urls']
if url not in unique_urls and url.startswith("s3")]

for url in unique_urls:
logging.debug("urls in unique urls: %s",url)
logging.debug("urls in unique urls: %s", url)
parsed_url = urlparse(url)
yield 'echo "downloading %s"\n' % os.path.basename(parsed_url.path)
yield aws_get_cmd.format("{}://{}".format(parsed_url.scheme,
parsed_url.path[1:] if parsed_url.path.startswith('/') else parsed_url.path),
os.path.basename(parsed_url.path))

# malarout: interate over each line of stream_aws_get response, and write to a file which is later attached to the email.
with open('aws_get_script.sh','w') as f:
with open('aws_get_script.sh', 'w') as f:
for i in stream_aws_get(scroll_id):
f.write(i)
f.write(i)

# for gzip compressed use file extension .tar.gz and modifier "w:gz"
os.rename('aws_get_script.sh','aws_get_script.bash')
tar = tarfile.open("aws_get.tar.gz", "w:gz")
os.rename('aws_get_script.sh', 'aws_get_script.bash')
tar = tarfile.open("aws_get.tar.gz", "w:gz")
tar.add('aws_get_script.bash')
tar.close()

Expand All @@ -81,12 +93,12 @@ def stream_aws_get(scroll_id):
'''
Main program of aws_get_script
'''
#encoding to a JSON object
query = {}
query = json.loads(sys.argv[1])
# encoding to a JSON object
query = {}
query = json.loads(sys.argv[1])
emails = sys.argv[2]
rule_name = sys.argv[3]

# getting the script
aws_get_script(query)
# now email the query
Expand All @@ -98,6 +110,7 @@ def stream_aws_get(scroll_id):
body += "\n\nYou can use this AWS get script attached to download products.\n"
body += "Please rename aws_get_script.bash to aws_get_script.sh before running it."
if os.path.isfile('aws_get.tar.gz'):
aws_get_content = open('aws_get.tar.gz','r').read()
attachments = { 'aws_get.tar.gz':aws_get_content}
notify_by_email.send_email(getpass.getuser(), cc_recipients, bcc_recipients, subject, body, attachments=attachments)
aws_get_content = open('aws_get.tar.gz', 'r').read()
attachments = {'aws_get.tar.gz': aws_get_content}
notify_by_email.send_email(getpass.getuser(
), cc_recipients, bcc_recipients, subject, body, attachments=attachments)
5 changes: 3 additions & 2 deletions lib/get_component_configuration.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
from hysds.celery import app


def get_component_config(component):
'''
From a component get the common configuration values
@param component - component
'''
if component=="mozart" or component=="figaro":
if component == "mozart" or component == "figaro":
es_url = app.conf["JOBS_ES_URL"]
query_idx = app.conf["STATUS_ALIAS"]
facetview_url = app.conf["MOZART_URL"]
elif component=="tosca":
elif component == "tosca":
es_url = app.conf["GRQ_ES_URL"]
query_idx = app.conf["DATASET_ALIAS"]
facetview_url = app.conf["GRQ_URL"]
Expand Down
132 changes: 80 additions & 52 deletions notify_by_email.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
#!/usr/bin/env python
import os, sys, getpass, requests, json, types, base64, socket
import os
import sys
import getpass
import requests
import json
import types
import base64
import socket
from smtplib import SMTP
from email.MIMEMultipart import MIMEMultipart
from email.MIMEText import MIMEText
Expand All @@ -15,13 +22,16 @@ def get_hostname():
"""Get hostname."""

# get hostname
try: return socket.getfqdn()
try:
return socket.getfqdn()
except:
# get IP
try: return socket.gethostbyname(socket.gethostname())
try:
return socket.gethostbyname(socket.gethostname())
except:
raise RuntimeError("Failed to resolve hostname for full email address. Check system.")

raise RuntimeError(
"Failed to resolve hostname for full email address. Check system.")


def send_email(sender, cc_recipients, bcc_recipients, subject, body, attachments=None):
"""Send an email.
Expand All @@ -37,7 +47,7 @@ def send_email(sender, cc_recipients, bcc_recipients, subject, body, attachments
The charset of the email will be the first one out of US-ASCII, ISO-8859-1
and UTF-8 that can represent all the characters occurring in the email.
"""

# combined recipients
recipients = cc_recipients + bcc_recipients

Expand All @@ -62,16 +72,16 @@ def send_email(sender, cc_recipients, bcc_recipients, subject, body, attachments

# We must always pass Unicode strings to Header, otherwise it will
# use RFC 2047 encoding even on plain ASCII strings.
sender_name = str(Header(unicode(sender_name), header_charset))
sender_name = str(Header(str(sender_name), header_charset))
unicode_parsed_cc_recipients = []
for recipient_name, recipient_addr in parsed_cc_recipients:
recipient_name = str(Header(unicode(recipient_name), header_charset))
recipient_name = str(Header(str(recipient_name), header_charset))
# Make sure email addresses do not contain non-ASCII characters
recipient_addr = recipient_addr.encode('ascii')
unicode_parsed_cc_recipients.append((recipient_name, recipient_addr))
unicode_parsed_bcc_recipients = []
for recipient_name, recipient_addr in parsed_bcc_recipients:
recipient_name = str(Header(unicode(recipient_name), header_charset))
recipient_name = str(Header(str(recipient_name), header_charset))
# Make sure email addresses do not contain non-ASCII characters
recipient_addr = recipient_addr.encode('ascii')
unicode_parsed_bcc_recipients.append((recipient_name, recipient_addr))
Expand All @@ -85,25 +95,26 @@ def send_email(sender, cc_recipients, bcc_recipients, subject, body, attachments
for recipient_name, recipient_addr in unicode_parsed_cc_recipients])
msg['BCC'] = COMMASPACE.join([formataddr((recipient_name, recipient_addr))
for recipient_name, recipient_addr in unicode_parsed_bcc_recipients])
msg['Subject'] = Header(unicode(subject), header_charset)
msg['Subject'] = Header(str(subject), header_charset)
msg['FROM'] = "[email protected]"
msg.attach(MIMEText(body.encode(body_charset), 'plain', body_charset))

# Add attachments
if isinstance(attachments, types.DictType):
if isinstance(attachments, dict):
for fname in attachments:
part = MIMEBase('application', "octet-stream")
part.set_payload(attachments[fname])
Encoders.encode_base64(part)
part.add_header('Content-Disposition', 'attachment; filename="%s"' % fname)
part.add_header('Content-Disposition',
'attachment; filename="%s"' % fname)
msg.attach(part)

#print "#" * 80
#print msg.as_string()
# print "#" * 80
# print msg.as_string()

# Send the message via SMTP to docker host
smtp_url = "smtp://%s:25" % get_container_host_ip()
print "smtp_url : %s",smtp_url
print("smtp_url : %s", smtp_url)
smtp = SMTP(get_container_host_ip())
smtp.sendmail(sender, recipients, msg.as_string())
smtp.quit()
Expand All @@ -124,31 +135,40 @@ def get_source(es_url, query_idx, objectid):
}
}
}
print 'get_source debug:', '%s/%s/_search',es_url," ", query_idx,' ',json.dumps(query)
r = requests.post('%s/%s/_search' % (es_url, query_idx), data=json.dumps(query))
print('get_source debug:', '%s/%s/_search', es_url,
" ", query_idx, ' ', json.dumps(query))
r = requests.post('%s/%s/_search' %
(es_url, query_idx), data=json.dumps(query))
r.raise_for_status()
result = r.json()
if result['hits']['total'] == 0: return None
else: return result['hits']['hits'][0]['_source']
if result['hits']['total'] == 0:
return None
else:
return result['hits']['hits'][0]['_source']


def get_cities(src):
"""Return list of cities."""

cities = []
for city in src.get('city',[]):
cities.append("%s, %s" % (city.get('name', ''), city.get('admin1_name', '')))
for city in src.get('city', []):
cities.append("%s, %s" %
(city.get('name', ''), city.get('admin1_name', '')))
return cities


def get_value(d, key):
"""Return value from source based on key."""

for k in key.split('.'):
if k in d: d = d[k]
else: return None
if isinstance(d, types.ListType): return ', '.join([str(i) for i in d])
else: return d
if k in d:
d = d[k]
else:
return None
if isinstance(d, list):
return ', '.join([str(i) for i in d])
else:
return d


def get_metadata_snippet(src, snippet_cfg):
Expand All @@ -157,9 +177,11 @@ def get_metadata_snippet(src, snippet_cfg):
body = ""
for k, label in snippet_cfg:
val = get_value(src, k)
if val is not None: body += "%s: %s\n" % (label, val)
if val is not None:
body += "%s: %s\n" % (label, val)
body += "location type: %s\n" % src.get('location', {}).get('type', None)
body += "location coordinates: %s\n" % src.get('location', {}).get('coordinates', [])
body += "location coordinates: %s\n" % src.get(
'location', {}).get('coordinates', [])
cities = get_cities(src)
body += "Closest cities: %s" % "\n ".join(cities)
return body
Expand All @@ -169,36 +191,39 @@ def get_facetview_link(facetview_url, objectid, system_version=None):
"""Return link to objectid in FacetView interface."""

if system_version is None:
b64 = base64.urlsafe_b64encode('{"query":{"query_string":{"query":"_id:%s"}}}' % objectid)
b64 = base64.urlsafe_b64encode(
'{"query":{"query_string":{"query":"_id:%s"}}}' % objectid)
else:
b64 = base64.urlsafe_b64encode('{"query":{"query_string":{"query":"_id:%s AND system_version:%s"}}}' % (objectid, system_version))
if facetview_url.endswith('/'): facetview_url = facetview_url[:-1]
b64 = base64.urlsafe_b64encode(
'{"query":{"query_string":{"query":"_id:%s AND system_version:%s"}}}' % (objectid, system_version))
if facetview_url.endswith('/'):
facetview_url = facetview_url[:-1]
return '%s/?base64=%s' % (facetview_url, b64)


if __name__ == "__main__":
settings_file = os.path.normpath(
os.path.join(
os.path.dirname(os.path.realpath(__file__)),
'settings.json')
)
os.path.join(
os.path.dirname(os.path.realpath(__file__)),
'settings.json')
)
settings = json.load(open(settings_file))

objectid = sys.argv[1]
url = sys.argv[2]
emails = sys.argv[3]
rule_name = sys.argv[4]
component = sys.argv[5]

if component=="mozart" or component=="figaro":
es_url = app.conf["JOBS_ES_URL"]
query_idx = app.conf["STATUS_ALIAS"]
facetview_url = app.conf["MOZART_URL"]
elif component=="tosca":
es_url = app.conf["GRQ_ES_URL"]
query_idx = app.conf["DATASET_ALIAS"]
#facetview_url = app.conf["TOSCA_URL"]
#updating facetview_url with updated aria-search-beta hostname
if component == "mozart" or component == "figaro":
es_url = app.conf["JOBS_ES_URL"]
query_idx = app.conf["STATUS_ALIAS"]
facetview_url = app.conf["MOZART_URL"]
elif component == "tosca":
es_url = app.conf["GRQ_ES_URL"]
query_idx = app.conf["DATASET_ALIAS"]
#facetview_url = app.conf["TOSCA_URL"]
# updating facetview_url with updated aria-search-beta hostname
facetview_url = "https://aria-search-beta.jpl.nasa.gov/search"

cc_recipients = [i.strip() for i in emails.split(',')]
Expand All @@ -211,7 +236,7 @@ def get_facetview_link(facetview_url, objectid, system_version=None):
# attach metadata json
body += "\n\n%s" % get_metadata_snippet(src, settings['SNIPPET_CFG'])
body += "\n\nThe entire metadata json for this product has been attached for your convenience.\n\n"
attachments = { 'metadata.json': json.dumps(src, indent=2) }
attachments = {'metadata.json': json.dumps(src, indent=2)}

# attach browse images
if len(src['browse_urls']) > 0:
Expand All @@ -222,14 +247,17 @@ def get_facetview_link(facetview_url, objectid, system_version=None):
small_img = i['small_img']
small_img_url = os.path.join(browse_url, small_img)
r = requests.get(small_img_url)
if r.status_code != 200: continue
if r.status_code != 200:
continue
attachments[small_img] = r.content
else: body += "\n\n"
else:
body += "\n\n"
body += "You may access the product here:\n\n%s" % url
facet_url = get_facetview_link(facetview_url, objectid, None if src is None else src.get('system_version', None))
facet_url = get_facetview_link(
facetview_url, objectid, None if src is None else src.get('system_version', None))
if facet_url is not None:
body += "\n\nYou may view this product in FacetView here:\n\n%s" % facet_url
body += "\n\nNOTE: You may have to cut and paste the FacetView link into your "
body += "browser's address bar to prevent your email client from escaping the curly brackets."
send_email("%s@%s" % (getpass.getuser(), get_hostname()), cc_recipients,
send_email("%s@%s" % (getpass.getuser(), get_hostname()), cc_recipients,
bcc_recipients, subject, body, attachments=attachments)
Loading

0 comments on commit 6a19411

Please sign in to comment.