diff --git a/aws_get.py b/aws_get.py index f7296a2..5b660bc 100644 --- a/aws_get.py +++ b/aws_get.py @@ -1,13 +1,19 @@ -import json, requests, types, re, getpass, sys, os +import json +import requests +import types +import re +import getpass +import sys +import os from pprint import pformat import logging import tarfile import notify_by_email from hysds.celery import app import boto3 -from urlparse import urlparse +from urllib.parse import urlparse -#TODO: Setup logger for this job here. Should log to STDOUT or STDERR as this is a job +# TODO: Setup logger for this job here. Should log to STDOUT or STDERR as this is a job logging.basicConfig(level=logging.DEBUG) logger = logging.getLogger("aws_get") @@ -19,16 +25,19 @@ def aws_get_script(dataset=None): es_url = app.conf["GRQ_ES_URL"] index = app.conf["DATASET_ALIAS"] #facetview_url = app.conf["GRQ_URL"] - print('%s/%s/_search?search_type=scan&scroll=10m&size=100' % (es_url, index)) - logging.debug('%s/%s/_search?search_type=scan&scroll=10m&size=100' % (es_url, index)) - print json.dumps(dataset) + print(('%s/%s/_search?search_type=scan&scroll=10m&size=100' % (es_url, index))) + logging.debug( + '%s/%s/_search?search_type=scan&scroll=10m&size=100' % (es_url, index)) + print(json.dumps(dataset)) logging.debug(json.dumps(dataset)) - r = requests.post('%s/%s/_search?search_type=scan&scroll=10m&size=100' % (es_url, index), json.dumps(dataset)) + r = requests.post('%s/%s/_search?search_type=scan&scroll=10m&size=100' % + (es_url, index), json.dumps(dataset)) if r.status_code != 200: - print("Failed to query ES. Got status code %d:\n%s" %(r.status_code, json.dumps(r.json(), indent=2))) - logger.debug("Failed to query ES. Got status code %d:\n%s" % - (r.status_code, json.dumps(r.json(), indent=2))) + print(("Failed to query ES. Got status code %d:\n%s" % + (r.status_code, json.dumps(r.json(), indent=2)))) + logger.debug("Failed to query ES. Got status code %d:\n%s" % + (r.status_code, json.dumps(r.json(), indent=2))) r.raise_for_status() logger.debug("result: %s" % pformat(r.json())) @@ -47,32 +56,35 @@ def stream_aws_get(scroll_id): aws_get_cmd = 'aws s3 sync {} {}\n' while True: - r = requests.post('%s/_search/scroll?scroll=10m' % es_url, data=scroll_id) + r = requests.post('%s/_search/scroll?scroll=10m' % + es_url, data=scroll_id) res = r.json() logger.debug("res: %s" % pformat(res)) scroll_id = res['_scroll_id'] - if len(res['hits']['hits']) == 0: break + if len(res['hits']['hits']) == 0: + break # Elastic Search seems like it's returning duplicate urls. Remove duplicates - unique_urls=[] - for hit in res['hits']['hits']: - [unique_urls.append(url) for url in hit['_source']['urls'] if url not in unique_urls and url.startswith("s3")] + unique_urls = [] + for hit in res['hits']['hits']: + [unique_urls.append(url) for url in hit['_source']['urls'] + if url not in unique_urls and url.startswith("s3")] for url in unique_urls: - logging.debug("urls in unique urls: %s",url) + logging.debug("urls in unique urls: %s", url) parsed_url = urlparse(url) yield 'echo "downloading %s"\n' % os.path.basename(parsed_url.path) yield aws_get_cmd.format("{}://{}".format(parsed_url.scheme, parsed_url.path[1:] if parsed_url.path.startswith('/') else parsed_url.path), os.path.basename(parsed_url.path)) - + # malarout: interate over each line of stream_aws_get response, and write to a file which is later attached to the email. 
- with open('aws_get_script.sh','w') as f: + with open('aws_get_script.sh', 'w') as f: for i in stream_aws_get(scroll_id): - f.write(i) + f.write(i) # for gzip compressed use file extension .tar.gz and modifier "w:gz" - os.rename('aws_get_script.sh','aws_get_script.bash') - tar = tarfile.open("aws_get.tar.gz", "w:gz") + os.rename('aws_get_script.sh', 'aws_get_script.bash') + tar = tarfile.open("aws_get.tar.gz", "w:gz") tar.add('aws_get_script.bash') tar.close() @@ -81,12 +93,12 @@ def stream_aws_get(scroll_id): ''' Main program of aws_get_script ''' - #encoding to a JSON object - query = {} - query = json.loads(sys.argv[1]) + # encoding to a JSON object + query = {} + query = json.loads(sys.argv[1]) emails = sys.argv[2] rule_name = sys.argv[3] - + # getting the script aws_get_script(query) # now email the query @@ -98,6 +110,7 @@ def stream_aws_get(scroll_id): body += "\n\nYou can use this AWS get script attached to download products.\n" body += "Please rename aws_get_script.bash to aws_get_script.sh before running it." if os.path.isfile('aws_get.tar.gz'): - aws_get_content = open('aws_get.tar.gz','r').read() - attachments = { 'aws_get.tar.gz':aws_get_content} - notify_by_email.send_email(getpass.getuser(), cc_recipients, bcc_recipients, subject, body, attachments=attachments) + aws_get_content = open('aws_get.tar.gz', 'r').read() + attachments = {'aws_get.tar.gz': aws_get_content} + notify_by_email.send_email(getpass.getuser( + ), cc_recipients, bcc_recipients, subject, body, attachments=attachments) diff --git a/lib/get_component_configuration.py b/lib/get_component_configuration.py index f482d72..4146dae 100644 --- a/lib/get_component_configuration.py +++ b/lib/get_component_configuration.py @@ -1,15 +1,16 @@ from hysds.celery import app + def get_component_config(component): ''' From a component get the common configuration values @param component - component ''' - if component=="mozart" or component=="figaro": + if component == "mozart" or component == "figaro": es_url = app.conf["JOBS_ES_URL"] query_idx = app.conf["STATUS_ALIAS"] facetview_url = app.conf["MOZART_URL"] - elif component=="tosca": + elif component == "tosca": es_url = app.conf["GRQ_ES_URL"] query_idx = app.conf["DATASET_ALIAS"] facetview_url = app.conf["GRQ_URL"] diff --git a/notify_by_email.py b/notify_by_email.py index 0d347d5..ff34cf0 100644 --- a/notify_by_email.py +++ b/notify_by_email.py @@ -1,5 +1,12 @@ #!/usr/bin/env python -import os, sys, getpass, requests, json, types, base64, socket +import os +import sys +import getpass +import requests +import json +import types +import base64 +import socket from smtplib import SMTP from email.MIMEMultipart import MIMEMultipart from email.MIMEText import MIMEText @@ -15,13 +22,16 @@ def get_hostname(): """Get hostname.""" # get hostname - try: return socket.getfqdn() + try: + return socket.getfqdn() except: # get IP - try: return socket.gethostbyname(socket.gethostname()) + try: + return socket.gethostbyname(socket.gethostname()) except: - raise RuntimeError("Failed to resolve hostname for full email address. Check system.") - + raise RuntimeError( + "Failed to resolve hostname for full email address. Check system.") + def send_email(sender, cc_recipients, bcc_recipients, subject, body, attachments=None): """Send an email. @@ -37,7 +47,7 @@ def send_email(sender, cc_recipients, bcc_recipients, subject, body, attachments The charset of the email will be the first one out of US-ASCII, ISO-8859-1 and UTF-8 that can represent all the characters occurring in the email. 
""" - + # combined recipients recipients = cc_recipients + bcc_recipients @@ -62,16 +72,16 @@ def send_email(sender, cc_recipients, bcc_recipients, subject, body, attachments # We must always pass Unicode strings to Header, otherwise it will # use RFC 2047 encoding even on plain ASCII strings. - sender_name = str(Header(unicode(sender_name), header_charset)) + sender_name = str(Header(str(sender_name), header_charset)) unicode_parsed_cc_recipients = [] for recipient_name, recipient_addr in parsed_cc_recipients: - recipient_name = str(Header(unicode(recipient_name), header_charset)) + recipient_name = str(Header(str(recipient_name), header_charset)) # Make sure email addresses do not contain non-ASCII characters recipient_addr = recipient_addr.encode('ascii') unicode_parsed_cc_recipients.append((recipient_name, recipient_addr)) unicode_parsed_bcc_recipients = [] for recipient_name, recipient_addr in parsed_bcc_recipients: - recipient_name = str(Header(unicode(recipient_name), header_charset)) + recipient_name = str(Header(str(recipient_name), header_charset)) # Make sure email addresses do not contain non-ASCII characters recipient_addr = recipient_addr.encode('ascii') unicode_parsed_bcc_recipients.append((recipient_name, recipient_addr)) @@ -85,25 +95,26 @@ def send_email(sender, cc_recipients, bcc_recipients, subject, body, attachments for recipient_name, recipient_addr in unicode_parsed_cc_recipients]) msg['BCC'] = COMMASPACE.join([formataddr((recipient_name, recipient_addr)) for recipient_name, recipient_addr in unicode_parsed_bcc_recipients]) - msg['Subject'] = Header(unicode(subject), header_charset) + msg['Subject'] = Header(str(subject), header_charset) msg['FROM'] = "no-reply@jpl.nasa.gov" msg.attach(MIMEText(body.encode(body_charset), 'plain', body_charset)) - + # Add attachments - if isinstance(attachments, types.DictType): + if isinstance(attachments, dict): for fname in attachments: part = MIMEBase('application', "octet-stream") part.set_payload(attachments[fname]) Encoders.encode_base64(part) - part.add_header('Content-Disposition', 'attachment; filename="%s"' % fname) + part.add_header('Content-Disposition', + 'attachment; filename="%s"' % fname) msg.attach(part) - #print "#" * 80 - #print msg.as_string() - + # print "#" * 80 + # print msg.as_string() + # Send the message via SMTP to docker host smtp_url = "smtp://%s:25" % get_container_host_ip() - print "smtp_url : %s",smtp_url + print("smtp_url : %s", smtp_url) smtp = SMTP(get_container_host_ip()) smtp.sendmail(sender, recipients, msg.as_string()) smtp.quit() @@ -124,20 +135,25 @@ def get_source(es_url, query_idx, objectid): } } } - print 'get_source debug:', '%s/%s/_search',es_url," ", query_idx,' ',json.dumps(query) - r = requests.post('%s/%s/_search' % (es_url, query_idx), data=json.dumps(query)) + print('get_source debug:', '%s/%s/_search', es_url, + " ", query_idx, ' ', json.dumps(query)) + r = requests.post('%s/%s/_search' % + (es_url, query_idx), data=json.dumps(query)) r.raise_for_status() result = r.json() - if result['hits']['total'] == 0: return None - else: return result['hits']['hits'][0]['_source'] + if result['hits']['total'] == 0: + return None + else: + return result['hits']['hits'][0]['_source'] def get_cities(src): """Return list of cities.""" cities = [] - for city in src.get('city',[]): - cities.append("%s, %s" % (city.get('name', ''), city.get('admin1_name', ''))) + for city in src.get('city', []): + cities.append("%s, %s" % + (city.get('name', ''), city.get('admin1_name', ''))) return cities @@ 
-145,10 +161,14 @@ def get_value(d, key): """Return value from source based on key.""" for k in key.split('.'): - if k in d: d = d[k] - else: return None - if isinstance(d, types.ListType): return ', '.join([str(i) for i in d]) - else: return d + if k in d: + d = d[k] + else: + return None + if isinstance(d, list): + return ', '.join([str(i) for i in d]) + else: + return d def get_metadata_snippet(src, snippet_cfg): @@ -157,9 +177,11 @@ def get_metadata_snippet(src, snippet_cfg): body = "" for k, label in snippet_cfg: val = get_value(src, k) - if val is not None: body += "%s: %s\n" % (label, val) + if val is not None: + body += "%s: %s\n" % (label, val) body += "location type: %s\n" % src.get('location', {}).get('type', None) - body += "location coordinates: %s\n" % src.get('location', {}).get('coordinates', []) + body += "location coordinates: %s\n" % src.get( + 'location', {}).get('coordinates', []) cities = get_cities(src) body += "Closest cities: %s" % "\n ".join(cities) return body @@ -169,36 +191,39 @@ def get_facetview_link(facetview_url, objectid, system_version=None): """Return link to objectid in FacetView interface.""" if system_version is None: - b64 = base64.urlsafe_b64encode('{"query":{"query_string":{"query":"_id:%s"}}}' % objectid) + b64 = base64.urlsafe_b64encode( + '{"query":{"query_string":{"query":"_id:%s"}}}' % objectid) else: - b64 = base64.urlsafe_b64encode('{"query":{"query_string":{"query":"_id:%s AND system_version:%s"}}}' % (objectid, system_version)) - if facetview_url.endswith('/'): facetview_url = facetview_url[:-1] + b64 = base64.urlsafe_b64encode( + '{"query":{"query_string":{"query":"_id:%s AND system_version:%s"}}}' % (objectid, system_version)) + if facetview_url.endswith('/'): + facetview_url = facetview_url[:-1] return '%s/?base64=%s' % (facetview_url, b64) - + if __name__ == "__main__": settings_file = os.path.normpath( - os.path.join( - os.path.dirname(os.path.realpath(__file__)), - 'settings.json') - ) + os.path.join( + os.path.dirname(os.path.realpath(__file__)), + 'settings.json') + ) settings = json.load(open(settings_file)) - + objectid = sys.argv[1] url = sys.argv[2] emails = sys.argv[3] rule_name = sys.argv[4] component = sys.argv[5] - if component=="mozart" or component=="figaro": - es_url = app.conf["JOBS_ES_URL"] - query_idx = app.conf["STATUS_ALIAS"] - facetview_url = app.conf["MOZART_URL"] - elif component=="tosca": - es_url = app.conf["GRQ_ES_URL"] - query_idx = app.conf["DATASET_ALIAS"] - #facetview_url = app.conf["TOSCA_URL"] - #updating facetview_url with updated aria-search-beta hostname + if component == "mozart" or component == "figaro": + es_url = app.conf["JOBS_ES_URL"] + query_idx = app.conf["STATUS_ALIAS"] + facetview_url = app.conf["MOZART_URL"] + elif component == "tosca": + es_url = app.conf["GRQ_ES_URL"] + query_idx = app.conf["DATASET_ALIAS"] + #facetview_url = app.conf["TOSCA_URL"] + # updating facetview_url with updated aria-search-beta hostname facetview_url = "https://aria-search-beta.jpl.nasa.gov/search" cc_recipients = [i.strip() for i in emails.split(',')] @@ -211,7 +236,7 @@ def get_facetview_link(facetview_url, objectid, system_version=None): # attach metadata json body += "\n\n%s" % get_metadata_snippet(src, settings['SNIPPET_CFG']) body += "\n\nThe entire metadata json for this product has been attached for your convenience.\n\n" - attachments = { 'metadata.json': json.dumps(src, indent=2) } + attachments = {'metadata.json': json.dumps(src, indent=2)} # attach browse images if len(src['browse_urls']) > 0: @@ 
-222,14 +247,17 @@ def get_facetview_link(facetview_url, objectid, system_version=None): small_img = i['small_img'] small_img_url = os.path.join(browse_url, small_img) r = requests.get(small_img_url) - if r.status_code != 200: continue + if r.status_code != 200: + continue attachments[small_img] = r.content - else: body += "\n\n" + else: + body += "\n\n" body += "You may access the product here:\n\n%s" % url - facet_url = get_facetview_link(facetview_url, objectid, None if src is None else src.get('system_version', None)) + facet_url = get_facetview_link( + facetview_url, objectid, None if src is None else src.get('system_version', None)) if facet_url is not None: body += "\n\nYou may view this product in FacetView here:\n\n%s" % facet_url body += "\n\nNOTE: You may have to cut and paste the FacetView link into your " body += "browser's address bar to prevent your email client from escaping the curly brackets." - send_email("%s@%s" % (getpass.getuser(), get_hostname()), cc_recipients, + send_email("%s@%s" % (getpass.getuser(), get_hostname()), cc_recipients, bcc_recipients, subject, body, attachments=attachments) diff --git a/purge.py b/purge.py index 6801d6b..2b0b36a 100644 --- a/purge.py +++ b/purge.py @@ -7,11 +7,12 @@ import osaka.main from hysds.celery import app -#TODO: Setup logger for this job here. Should log to STDOUT or STDERR as this is a job +# TODO: Setup logger for this job here. Should log to STDOUT or STDERR as this is a job logging.basicConfig(level=logging.DEBUG) logger = logging.getLogger("hysds") -def purge_products(query,component,operation): + +def purge_products(query, component, operation): ''' Iterator used to iterate across a query result and submit jobs for every hit @param es_url - ElasticSearch URL to hit with query @@ -20,99 +21,102 @@ def purge_products(query,component,operation): @param query - query to post to ElasticSearch and whose result will be iterated, JSON sting enc @param kwargs - key-word args to match to HySDS IO ''' - logger.debug("Doing %s for %s with query: %s",operation,component,query) - - if component=="mozart" or component=="figaro": + logger.debug("Doing %s for %s with query: %s", operation, component, query) + + if component == "mozart" or component == "figaro": es_url = app.conf["JOBS_ES_URL"] es_index = app.conf["STATUS_ALIAS"] facetview_url = app.conf["MOZART_URL"] - elif component=="tosca": + elif component == "tosca": es_url = app.conf["GRQ_ES_URL"] es_index = app.conf["DATASET_ALIAS"] facetview_url = app.conf["GRQ_URL"] - #Querying for products - start_url = "{0}/{1}/_search".format(es_url,es_index) - scroll_url = "{0}/_search".format(es_url,es_index) - - results = hysds_commons.request_utils.post_scrolled_json_responses(start_url,scroll_url,generator=True,data=json.dumps(query),logger=logger) - #print results - - if component=='tosca': - for result in results: - es_type = result["_type"] - ident = result["_id"] - index = result["_index"] - #find the Best URL first - best = None - for url in result["_source"]["urls"]: - if best is None or not url.startswith("http"): - best = url - - #making osaka call to delete product - print 'paramater being passed to osaka.main.rmall: ',best - if best is not None: osaka.main.rmall(best) + # Querying for products + start_url = "{0}/{1}/_search".format(es_url, es_index) + scroll_url = "{0}/_search".format(es_url, es_index) + + results = hysds_commons.request_utils.post_scrolled_json_responses( + start_url, scroll_url, generator=True, data=json.dumps(query), logger=logger) + # print results + + if 
component == 'tosca': + for result in results: + es_type = result["_type"] + ident = result["_id"] + index = result["_index"] + # find the Best URL first + best = None + for url in result["_source"]["urls"]: + if best is None or not url.startswith("http"): + best = url + + # making osaka call to delete product + print('paramater being passed to osaka.main.rmall: ', best) + if best is not None: + osaka.main.rmall(best) - #removing the metadata - hysds_commons.metadata_rest_utils.remove_metadata(es_url,index,es_type,ident,logger) + # removing the metadata + hysds_commons.metadata_rest_utils.remove_metadata( + es_url, index, es_type, ident, logger) else: - if operation=='purge': - purge = True - else: - purge = False - # purge job from index - + if operation == 'purge': + purge = True + else: + purge = False + # purge job from index + for result in results: uuid = result["_source"]['uuid'] payload_id = result["_source"]['payload_id'] - index = result["_index"] + index = result["_index"] es_type = result['_type'] - #Always grab latest state (not state from query result) + # Always grab latest state (not state from query result) task = app.AsyncResult(uuid) state = task.state - #Active states may only revoke - logger.info("Job state: %s\n",state) - if state in ["RETRY","STARTED"] or (state == "PENDING" and not purge): + # Active states may only revoke + logger.info("Job state: %s\n", state) + if state in ["RETRY", "STARTED"] or (state == "PENDING" and not purge): if not purge: - logger.info('Revoking %s\n',uuid) - app.control.revoke(uuid,terminate=True) + logger.info('Revoking %s\n', uuid) + app.control.revoke(uuid, terminate=True) else: - logger.info( 'Cannot remove active job %s\n',uuid) + logger.info('Cannot remove active job %s\n', uuid) continue elif not purge: - logger.info( 'Cannot stop inactive job: %s\n',uuid) + logger.info('Cannot stop inactive job: %s\n', uuid) continue - #Saftey net to revoke job if in PENDING state + # Saftey net to revoke job if in PENDING state if state == "PENDING": - logger.info( 'Revoking %s\n',uuid) - app.control.revoke(uuid,terminate=True) - + logger.info('Revoking %s\n', uuid) + app.control.revoke(uuid, terminate=True) + # Both associated task and job from ES - logger.info( 'Removing ES for %s:%s',es_type,payload_id) - r = hysds_commons.metadata_rest_utils.remove_metadata(es_url,index,es_type,payload_id,logger) - #r.raise_for_status() #not req - #res = r.json() #not req + logger.info('Removing ES for %s:%s', es_type, payload_id) + r = hysds_commons.metadata_rest_utils.remove_metadata( + es_url, index, es_type, payload_id, logger) + # r.raise_for_status() #not req + # res = r.json() #not req logger.info('done.\n') logger.info('Finished\n') - + if __name__ == "__main__": ''' Main program of purge_products ''' - #encoding to a JSON object + # encoding to a JSON object #decoded_string = sys.argv[1].decode('string_escape') #dec = decoded_string.replace('u""','"') #decoded_inp = dec.replace('""','"') decoded_inp = sys.argv[1] - print decoded_inp + print(decoded_inp) if decoded_inp.startswith('{"query"') or decoded_inp.startswith("{u'query'") or decoded_inp.startswith("{'query'"): - query_obj = json.loads(decoded_inp) + query_obj = json.loads(decoded_inp) else: - query_obj["query"]=json.loads(decoded_inp) + query_obj["query"] = json.loads(decoded_inp) component = sys.argv[2] operation = sys.argv[3] - purge_products(query_obj,component,operation) - + purge_products(query_obj, component, operation) diff --git a/retry.py b/retry.py index 9e3f187..9ee8d05 100644 --- 
a/retry.py +++ b/retry.py @@ -1,5 +1,9 @@ #!/usr/bin/env python -import sys, json, requests, time, traceback +import sys +import json +import requests +import time +import traceback from random import randint, uniform from datetime import datetime from celery import uuid @@ -12,21 +16,24 @@ def query_ES(job_id): # get the ES_URL es_url = app.conf["JOBS_ES_URL"] index = app.conf["STATUS_ALIAS"] - query_json = {"query":{"bool": {"must": [{"term": {"job.job_info.id": "job_id"}}]}}} + query_json = { + "query": {"bool": {"must": [{"term": {"job.job_info.id": "job_id"}}]}}} query_json["query"]["bool"]["must"][0]["term"]["job.job_info.id"] = job_id - r = requests.post('%s/%s/_search?' % (es_url, index), json.dumps(query_json)) + r = requests.post('%s/%s/_search?' % + (es_url, index), json.dumps(query_json)) return r -def rand_sleep(sleep_min=0.1, sleep_max=1): time.sleep(uniform(sleep_min, sleep_max)) +def rand_sleep(sleep_min=0.1, sleep_max=1): time.sleep( + uniform(sleep_min, sleep_max)) def get_new_job_priority(old_priority, increment_by, new_priority): if increment_by is not None: priority = int(old_priority) + int(increment_by) if priority == 0 or priority == 9: - print("Not applying {} on previous priority of {} as it needs to be in range 0 to 8".format(increment_by, - old_priority)) + print(("Not applying {} on previous priority of {} as it needs to be in range 0 to 8".format(increment_by, + old_priority))) priority = int(old_priority) else: priority = int(new_priority) @@ -37,7 +44,7 @@ def resubmit_jobs(): es_url = app.conf["JOBS_ES_URL"] # random sleep to prevent from getting ElasticSearch errors: # 429 Client Error: Too Many Requests - time.sleep(randint(1,5)) + time.sleep(randint(1, 5)) # can call submit_job # iterate through job ids and query to get the job json @@ -52,23 +59,24 @@ def resubmit_jobs(): new_priority = ctx["new_job_priority"] retry_count_max = ctx['retry_count_max'] - retry_job_ids = ctx['retry_job_id'] if isinstance(ctx['retry_job_id'], list) else [ctx['retry_job_id']] + retry_job_ids = ctx['retry_job_id'] if isinstance( + ctx['retry_job_id'], list) else [ctx['retry_job_id']] for job_id in retry_job_ids: - print("Retrying job: {}".format(job_id)) + print(("Retrying job: {}".format(job_id))) try: # get job json for ES rand_sleep() response = query_ES(job_id) if response.status_code != 200: - print("Failed to query ES. Got status code %d:\n%s" % (response.status_code, json.dumps(response.json(), - indent=2))) + print(("Failed to query ES. Got status code %d:\n%s" % (response.status_code, json.dumps(response.json(), + indent=2)))) response.raise_for_status() resp_json = response.json() job_json = resp_json["hits"]["hits"][0]["_source"]["job"] # don't retry a retry if job_json['type'].startswith('job-lw-mozart-retry'): - print "Cannot retry retry job %s. Skipping" % job_id + print("Cannot retry retry job %s. Skipping" % job_id) continue # check retry_remaining_count @@ -76,8 +84,8 @@ def resubmit_jobs(): if job_json['retry_count'] < retry_count_max: job_json['retry_count'] = int(job_json['retry_count']) + 1 else: - print "For job {}, retry_count now is {}, retry_count_max limit of {} reached. Cannot retry again."\ - .format(job_id, job_json['retry_count'], retry_count_max) + print("For job {}, retry_count now is {}, retry_count_max limit of {} reached. Cannot retry again." 
+ .format(job_id, job_json['retry_count'], retry_count_max)) continue else: job_json['retry_count'] = 1 @@ -90,7 +98,8 @@ def resubmit_jobs(): del job_json['job_info'][i] # set queue time - job_json['job_info']['time_queued'] = datetime.utcnow().isoformat() + 'Z' + job_json['job_info']['time_queued'] = datetime.utcnow( + ).isoformat() + 'Z' # reset priority old_priority = job_json['priority'] @@ -101,10 +110,11 @@ def resubmit_jobs(): rand_sleep() try: app.control.revoke(job_json['job_id'], terminate=True) - print "revoked original job: %s" % job_json['job_id'] - except Exception, e: - print "Got error issuing revoke on job %s: %s" % (job_json['job_id'], traceback.format_exc()) - print "Continuing." + print("revoked original job: %s" % job_json['job_id']) + except Exception as e: + print("Got error issuing revoke on job %s: %s" % + (job_json['job_id'], traceback.format_exc())) + print("Continuing.") # generate celery task id job_json['task_id'] = uuid() @@ -112,12 +122,14 @@ def resubmit_jobs(): # delete old job status rand_sleep() try: - r = requests.delete("%s/%s/job/_query?q=_id:%s" % (es_url, query_idx, job_json['job_id'])) + r = requests.delete("%s/%s/job/_query?q=_id:%s" % + (es_url, query_idx, job_json['job_id'])) r.raise_for_status() - print "deleted original job status: %s" % job_json['job_id'] - except Exception, e: - print "Got error deleting job status %s: %s" % (job_json['job_id'], traceback.format_exc()) - print "Continuing." + print("deleted original job status: %s" % job_json['job_id']) + except Exception as e: + print("Got error deleting job status %s: %s" % + (job_json['job_id'], traceback.format_exc())) + print("Continuing.") # log queued status rand_sleep() @@ -125,7 +137,7 @@ def resubmit_jobs(): 'job_id': job_json['job_id'], 'payload_id': job_json['job_info']['job_payload']['payload_task_id'], 'status': 'job-queued', - 'job': job_json } + 'job': job_json} log_job_status(job_status_json) # submit job @@ -136,14 +148,14 @@ def resubmit_jobs(): priority=job_json['priority'], task_id=job_json['task_id']) except Exception as ex: - print >> sys.stderr, "[ERROR] Exception occured {0}:{1} {2}".format(type(ex) ,ex,traceback.format_exc()) + print("[ERROR] Exception occured {0}:{1} {2}".format( + type(ex), ex, traceback.format_exc()), file=sys.stderr) if __name__ == "__main__": query_idx = app.conf['STATUS_ALIAS'] input_type = sys.argv[1] - if input_type != "worker": + if input_type != "worker": resubmit_jobs() else: - print "Cannot retry a worker." - + print("Cannot retry a worker.") diff --git a/wget.py b/wget.py index 7f2dd4c..e068217 100755 --- a/wget.py +++ b/wget.py @@ -1,17 +1,23 @@ -import json, requests, types, re, getpass, sys, os +import json +import requests +import types +import re +import getpass +import sys +import os from pprint import pformat import logging import tarfile import notify_by_email from hysds.celery import app import boto3 -from urlparse import urlparse +from urllib.parse import urlparse import datetime PRODUCT_TEMPLATE = "product_downloader-{0}-{1}-{2}" -#TODO: Setup logger for this job here. Should log to STDOUT or STDERR as this is a job +# TODO: Setup logger for this job here. 
Should log to STDOUT or STDERR as this is a job logging.basicConfig(level=logging.DEBUG) logger = logging.getLogger("hysds") @@ -23,16 +29,19 @@ def wget_script(dataset=None): es_url = app.conf["GRQ_ES_URL"] index = app.conf["DATASET_ALIAS"] #facetview_url = app.conf["GRQ_URL"] - print('%s/%s/_search?search_type=scan&scroll=10m&size=100' % (es_url, index)) - logging.debug('%s/%s/_search?search_type=scan&scroll=10m&size=100' % (es_url, index)) - print json.dumps(dataset) + print(('%s/%s/_search?search_type=scan&scroll=10m&size=100' % (es_url, index))) + logging.debug( + '%s/%s/_search?search_type=scan&scroll=10m&size=100' % (es_url, index)) + print(json.dumps(dataset)) logging.debug(json.dumps(dataset)) - r = requests.post('%s/%s/_search?search_type=scan&scroll=10m&size=100' % (es_url, index), json.dumps(dataset)) + r = requests.post('%s/%s/_search?search_type=scan&scroll=10m&size=100' % + (es_url, index), json.dumps(dataset)) if r.status_code != 200: - print("Failed to query ES. Got status code %d:\n%s" %(r.status_code, json.dumps(r.json(), indent=2))) - logger.debug("Failed to query ES. Got status code %d:\n%s" % - (r.status_code, json.dumps(r.json(), indent=2))) + print(("Failed to query ES. Got status code %d:\n%s" % + (r.status_code, json.dumps(r.json(), indent=2)))) + logger.debug("Failed to query ES. Got status code %d:\n%s" % + (r.status_code, json.dumps(r.json(), indent=2))) r.raise_for_status() logger.debug("result: %s" % pformat(r.json())) @@ -41,12 +50,15 @@ def wget_script(dataset=None): #size = int(math.ceil(count/10.0)) #print("SIZE : %d" %size) #scroll_id = scan_result['_scroll_id'] - logging.debug('%s/%s/_search?search_type=scan&scroll=10m&size=%s' % (es_url, index, count)) - r = requests.post('%s/%s/_search?search_type=scan&scroll=10m&size=%s' % (es_url, index, count), json.dumps(dataset)) + logging.debug('%s/%s/_search?search_type=scan&scroll=10m&size=%s' % + (es_url, index, count)) + r = requests.post('%s/%s/_search?search_type=scan&scroll=10m&size=%s' % + (es_url, index, count), json.dumps(dataset)) if r.status_code != 200: - print("Failed to query ES. Got status code %d:\n%s" %(r.status_code, json.dumps(r.json(), indent=2))) + print(("Failed to query ES. Got status code %d:\n%s" % + (r.status_code, json.dumps(r.json(), indent=2)))) logger.debug("Failed to query ES. Got status code %d:\n%s" % - (r.status_code, json.dumps(r.json(), indent=2))) + (r.status_code, json.dumps(r.json(), indent=2))) r.raise_for_status() logger.debug("result: %s" % pformat(r.json())) @@ -70,75 +82,82 @@ def stream_wget(scroll_id): wget_cmd_password = wget_cmd + ' --user=$user --password=$password' while True: - r = requests.post('%s/_search/scroll?scroll=10m' % es_url, data=scroll_id) + r = requests.post('%s/_search/scroll?scroll=10m' % + es_url, data=scroll_id) res = r.json() logger.debug("res: %s" % pformat(res)) scroll_id = res['_scroll_id'] - if len(res['hits']['hits']) == 0: break + if len(res['hits']['hits']) == 0: + break # Elastic Search seems like it's returning duplicate urls. 
Remove duplicates - unique_urls=[] - for hit in res['hits']['hits']: - [unique_urls.append(url) for url in hit['_source']['urls'] if url not in unique_urls and url.startswith("http")] + unique_urls = [] + for hit in res['hits']['hits']: + [unique_urls.append(url) for url in hit['_source']['urls'] + if url not in unique_urls and url.startswith("http")] for url in unique_urls: - logging.debug("urls in unique urls: %s",url) + logging.debug("urls in unique urls: %s", url) if '.s3-website' in url or 'amazonaws.com' in url: - parsed_url = urlparse(url) - cut_dirs = len(parsed_url.path[1:].split('/')) - 1 + parsed_url = urlparse(url) + cut_dirs = len(parsed_url.path[1:].split('/')) - 1 else: + if 's1a_ifg' in url: + cut_dirs = 3 + else: + cut_dirs = 6 + if '.s3-website' in url or 'amazonaws.com' in url: + files = get_s3_files(url) + for file in files: + yield 'echo "downloading %s"\n' % file if 's1a_ifg' in url: - cut_dirs = 3 + yield "%s --cut-dirs=%d %s\n" % (wget_cmd, cut_dirs, file) else: - cut_dirs = 6 - if '.s3-website' in url or 'amazonaws.com' in url: - files = get_s3_files(url) - for file in files: - yield 'echo "downloading %s"\n' % file - if 's1a_ifg' in url: - yield "%s --cut-dirs=%d %s\n" % (wget_cmd, cut_dirs, file) - else: - yield "%s --cut-dirs=%d %s\n" % (wget_cmd, cut_dirs, file) + yield "%s --cut-dirs=%d %s\n" % (wget_cmd, cut_dirs, file) if 'aria2-dav.jpl.nasa.gov' in url: - yield 'echo "downloading %s"\n' % url - yield "%s --cut-dirs=%d %s/\n" % (wget_cmd_password, (cut_dirs+1), url) + yield 'echo "downloading %s"\n' % url + yield "%s --cut-dirs=%d %s/\n" % (wget_cmd_password, (cut_dirs+1), url) if 'aria-csk-dav.jpl.nasa.gov' in url: - yield 'echo "downloading %s"\n' % url - yield "%s --cut-dirs=%d %s/\n" % (wget_cmd_password, (cut_dirs+1), url) + yield 'echo "downloading %s"\n' % url + yield "%s --cut-dirs=%d %s/\n" % (wget_cmd_password, (cut_dirs+1), url) if 'aria-dst-dav.jpl.nasa.gov' in url: - yield 'echo "downloading %s"\n' % url - yield "%s --cut-dirs=%d %s/\n" % (wget_cmd, cut_dirs, url) - break - + yield 'echo "downloading %s"\n' % url + yield "%s --cut-dirs=%d %s/\n" % (wget_cmd, cut_dirs, url) + break + # malarout: interate over each line of stream_wget response, and write to a file which is later attached to the email. - with open('wget_script.sh','w') as f: + with open('wget_script.sh', 'w') as f: for i in stream_wget(scroll_id): - f.write(i) + f.write(i) # for gzip compressed use file extension .tar.gz and modifier "w:gz" - #os.rename('wget_script.sh','wget_script.bash') - #tar = tarfile.open("wget.tar.gz", "w:gz") - #tar.add('wget_script.bash') - #tar.close() + # os.rename('wget_script.sh','wget_script.bash') + #tar = tarfile.open("wget.tar.gz", "w:gz") + # tar.add('wget_script.bash') + # tar.close() def get_s3_files(url): - files = [] - print("Url in the get_s3_files function: %s",url) - parsed_url = urlparse(url) - bucket = parsed_url.hostname.split('.', 1)[0] - client = boto3.client('s3') - results = client.list_objects(Bucket=bucket, Delimiter='/', Prefix=parsed_url.path[1:] + '/') - - if results.get('Contents'): - for result in results.get('Contents'): - files.append(parsed_url.scheme + "://" + parsed_url.hostname + '/' + result.get('Key')) - - if results.get('CommonPrefixes'): - for result in results.get('CommonPrefixes'): - # Prefix values have a trailing '/'. 
Let's remove it to be consistent with our dir urls - folder = parsed_url.scheme + "://" + parsed_url.hostname + '/' + result.get('Prefix')[:-1] - files.extend(get_s3_files(folder)) - return files + files = [] + print(("Url in the get_s3_files function: %s", url)) + parsed_url = urlparse(url) + bucket = parsed_url.hostname.split('.', 1)[0] + client = boto3.client('s3') + results = client.list_objects( + Bucket=bucket, Delimiter='/', Prefix=parsed_url.path[1:] + '/') + + if results.get('Contents'): + for result in results.get('Contents'): + files.append(parsed_url.scheme + "://" + + parsed_url.hostname + '/' + result.get('Key')) + + if results.get('CommonPrefixes'): + for result in results.get('CommonPrefixes'): + # Prefix values have a trailing '/'. Let's remove it to be consistent with our dir urls + folder = parsed_url.scheme + "://" + \ + parsed_url.hostname + '/' + result.get('Prefix')[:-1] + files.extend(get_s3_files(folder)) + return files + def email(query, emails, rule_name): ''' @@ -158,10 +177,11 @@ def email(query, emails, rule_name): body += "Please rename wget_script.bash to wget_script.sh before running it." if os.path.isfile('wget.tar.gz'): wget_content = open('wget.tar.gz', 'r').read() - attachments = { 'wget.tar.gz':wget_content} + attachments = {'wget.tar.gz': wget_content} notify_by_email.send_email(getpass.getuser(), cc_recipients, bcc_recipients, subject, body, attachments=attachments) + def make_product(rule_name, query): ''' Make a product out of this WGET script @@ -177,20 +197,21 @@ def make_product(rule_name, query): with open("{0}/{0}.dataset.json".format(name), "w") as fp: json.dump({"id": name, "version": "v0.1"}, fp) + if __name__ == "__main__": ''' Main program of wget_script ''' - #encoding to a JSON object - query = {} - query = json.loads(sys.argv[1]) + # encoding to a JSON object + query = {} + query = json.loads(sys.argv[1]) emails = sys.argv[2] rule_name = sys.argv[3] - + # getting the script wget_script(query) - if emails=="unused": - make_product(rule_name, query) + if emails == "unused": + make_product(rule_name, query) else: - # now email the query - email(query, emails, rule_name) + # now email the query + email(query, emails, rule_name)
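
Note on notify_by_email.py: the unchanged context lines in the hunks above still use the Python 2 email package paths (from email.MIMEMultipart import MIMEMultipart, from email.MIMEText import MIMEText) and still call Encoders.encode_base64(part), none of which exist under Python 3. A minimal sketch of the Python 3 spellings, assuming the attachment loop keeps its current shape (attach_payload is an illustrative helper name, not something in this patch):

    from email.mime.multipart import MIMEMultipart
    from email.mime.text import MIMEText
    from email.mime.base import MIMEBase
    from email import encoders  # replaces "from email import Encoders"

    def attach_payload(msg, fname, payload):
        # Mirrors the attachment loop in send_email(): wrap the raw payload,
        # base64-encode it, and attach it with a filename header.
        part = MIMEBase('application', 'octet-stream')
        part.set_payload(payload)
        encoders.encode_base64(part)  # Python 3 spelling of Encoders.encode_base64
        part.add_header('Content-Disposition', 'attachment; filename="%s"' % fname)
        msg.attach(part)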
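
Two smaller spots the conversion above leaves behind: aws_get.py and wget.py open the .tar.gz attachments with mode 'r' (text mode under Python 3, which will mangle gzip data), and get_facetview_link() passes a str to base64.urlsafe_b64encode(), which takes and returns bytes under Python 3. A sketch of the Python 3-safe versions (read_attachment and encode_facetview_query are illustrative names, not functions in this patch):

    import base64

    def read_attachment(path='aws_get.tar.gz'):
        # The tarball is binary data; open it with 'rb' so no text decoding is attempted.
        with open(path, 'rb') as f:
            return f.read()

    def encode_facetview_query(objectid):
        # urlsafe_b64encode() needs bytes in and gives bytes out, so encode the
        # query string first and decode the result before building the ?base64= URL.
        query = '{"query":{"query_string":{"query":"_id:%s"}}}' % objectid
        return base64.urlsafe_b64encode(query.encode('utf-8')).decode('ascii')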