diff --git a/purge.py b/purge.py index 45ac6f1..1748934 100644 --- a/purge.py +++ b/purge.py @@ -1,16 +1,17 @@ #!/bin/env python import json import logging + import psutil +from multiprocessing import Pool import osaka.main from hysds.celery import app from hysds.es_util import get_mozart_es, get_grq_es from utils import revoke, create_info_message_files -from multiprocessing import Pool LOG_FILE_NAME = 'purge.log' -logging.basicConfig(filename=LOG_FILE_NAME, filemode='a', level=logging.DEBUG) +logging.basicConfig(filename=LOG_FILE_NAME, filemode='a', level=logging.INFO) logger = logging tosca_es = get_grq_es() @@ -22,23 +23,21 @@ def read_context(): return cxt -def delete_dataset(es_result): - ident = es_result["_id"] - index = es_result["_index"] +def delete_from_object_store(es_result): + _id = es_result["_id"] dataset = es_result["_source"]["dataset"] - # find the Best URL first - best = None + + best = None # find the Best URL first for url in es_result["_source"]["urls"]: if not url.startswith("http"): best = url - print('paramater being passed to osaka.main.rmall: ', best) # making osaka call to delete product if best is not None: + print('paramater being passed to osaka.main.rmall:', best) # making osaka call to delete product osaka.main.rmall(best) - - tosca_es.delete_by_id(index=index, id=ident, ignore=404) # removing the metadata - logger.info('Purged %s' % ident) - + logger.info('Purged from object store %s' % _id) + else: + logger.warning("url not found for: %s" % _id) return dataset @@ -55,27 +54,62 @@ def purge_products(query, component, operation): if component == "mozart" or component == "figaro": es = get_mozart_es() es_index = app.conf["STATUS_ALIAS"] + _source = ["uuid", "payload_id"] else: # "tosca" es = get_grq_es() es_index = app.conf["DATASET_ALIAS"] + _source = ["dataset", "urls"] + + results = es.query(index=es_index, body=query, _source=_source) # Querying for products + + # filter fields returned with the bulk API + filter_path = [ + "items.delete.error.reason", + "items.delete._id", + "items.delete._index", + "items.delete.result" + ] - results = es.query(index=es_index, body=query) # Querying for products - num_processes = psutil.cpu_count() - 2 - p = Pool(processes=num_processes) if component == 'tosca': - deleted_datasets = dict() - updated_datasets = p.map(delete_dataset, results) - for dataset in updated_datasets: - if dataset in deleted_datasets: - count = deleted_datasets[dataset] - deleted_datasets[dataset] = count + 1 + num_processes = psutil.cpu_count() - 2 + p = Pool(processes=num_processes) + + body = [{ + "delete": {"_index": row["_index"], "_id": row["_id"]} + } for row in results] + bulk_res = es.es.bulk(index=es_index, body=body, filter_path=filter_path) + logger.info(json.dumps(bulk_res, indent=2)) + + logger.info("purging datasets from object store: ") + p.map(delete_from_object_store, results) # deleting objects from storage (s3, etc.) + + dataset_purge_stats = {} + deleted_docs_count = 0 + failed_deletions = [] + for row in bulk_res["items"]: + if row["delete"].get("result", None) == "deleted": + deleted_docs_count += 1 + _index = row["delete"]["_index"] + logger.info("deleted from ES: %s" % row["delete"]["_id"]) + if _index not in dataset_purge_stats: + dataset_purge_stats[_index] = 1 + else: + dataset_purge_stats[_index] += 1 else: - deleted_datasets[dataset] = 1 - if len(deleted_datasets) != 0: - msg_details = "Datasets purged by type:\n\n" - for ds in deleted_datasets.keys(): - msg_details += "{} - {}\n".format(deleted_datasets[ds], ds) - + failed_deletions.append(row["delete"]) + + if deleted_docs_count or failed_deletions: + msg_details = "" + if deleted_docs_count > 0: + msg_details += "Datasets purged from ES:\n" + for k, v in dataset_purge_stats.items(): + msg_details += "{} - {}\n".format(k, v) + if len(failed_deletions) > 0: + msg_details += "\n\n" + msg_details += "Datasets failed to purge from ES:\n" + msg_details += json.dumps(failed_deletions) + logger.warning("datasets failed to delete: ") + logger.warning(json.dumps(failed_deletions)) create_info_message_files(msg_details=msg_details) else: purge = True if operation == 'purge' else False # purge job from index