HC-439: Purge datasets v1.2.0 job time out after new pcm deployment (#26)

* using the bulk API to purge datasets (see the sketch after this list)

* removed multiprocessing import

* edited msg details

* pulling only necessary fields in purge

* re-added multiprocessing for s3 deletion only

* debug info, debug is too verbose

* fixed bug, debug ERROR

* debug INFO

* added deleted datasets stats

* re-did some stuff

* re-did some stuff

* re-did some stuff

* removing objects from ES first before deleting from s3

* moving log line to after we delete from es
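
The first bullet is the heart of the change: rather than issuing one delete_by_id request per dataset, the job now removes all matching documents with a single Elasticsearch bulk request and trims the response with filter_path. A minimal sketch of that pattern, assuming a plain elasticsearch-py client and a hypothetical bulk_delete helper rather than the hysds es_util wrapper the job actually calls (es.es.bulk):

from elasticsearch import Elasticsearch  # assumed client; the job reaches it through hysds.es_util

def bulk_delete(es: Elasticsearch, index: str, docs: list) -> dict:
    # one "delete" action per document, sent as a single bulk request
    actions = [{"delete": {"_index": d["_index"], "_id": d["_id"]}} for d in docs]
    # filter_path keeps only the fields needed to report success/failure per doc
    filter_path = [
        "items.delete._id",
        "items.delete._index",
        "items.delete.result",
        "items.delete.error.reason",
    ]
    return es.bulk(body=actions, index=index, filter_path=filter_path)

The response then carries one items.delete entry per document, which is what the reworked purge_products tallies in the diff below.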

Co-authored-by: dustinlo <[email protected]>
DustinKLo and dustinlo authored Oct 26, 2022
1 parent b2de775 commit 0947fe0
Showing 1 changed file with 61 additions and 27 deletions.
purge.py: 88 changes (61 additions & 27 deletions)
@@ -1,16 +1,17 @@
 #!/bin/env python
 import json
 import logging

 import psutil
+from multiprocessing import Pool

 import osaka.main
 from hysds.celery import app
 from hysds.es_util import get_mozart_es, get_grq_es
 from utils import revoke, create_info_message_files
-from multiprocessing import Pool

 LOG_FILE_NAME = 'purge.log'
-logging.basicConfig(filename=LOG_FILE_NAME, filemode='a', level=logging.DEBUG)
+logging.basicConfig(filename=LOG_FILE_NAME, filemode='a', level=logging.INFO)
 logger = logging

 tosca_es = get_grq_es()
@@ -22,23 +23,21 @@ def read_context():
     return cxt


-def delete_dataset(es_result):
-    ident = es_result["_id"]
-    index = es_result["_index"]
+def delete_from_object_store(es_result):
+    _id = es_result["_id"]
     dataset = es_result["_source"]["dataset"]
-    # find the Best URL first
-    best = None
+    best = None  # find the Best URL first
     for url in es_result["_source"]["urls"]:
         if not url.startswith("http"):
             best = url

-    print('paramater being passed to osaka.main.rmall: ', best)  # making osaka call to delete product
-    osaka.main.rmall(best)
-
-    tosca_es.delete_by_id(index=index, id=ident, ignore=404)  # removing the metadata
-    logger.info('Purged %s' % ident)
+    if best is not None:
+        print('paramater being passed to osaka.main.rmall:', best)  # making osaka call to delete product
+        osaka.main.rmall(best)
+        logger.info('Purged from object store %s' % _id)
+    else:
+        logger.warning("url not found for: %s" % _id)
     return dataset


@@ -55,27 +54,62 @@ def purge_products(query, component, operation):
     if component == "mozart" or component == "figaro":
         es = get_mozart_es()
         es_index = app.conf["STATUS_ALIAS"]
+        _source = ["uuid", "payload_id"]
     else:  # "tosca"
         es = get_grq_es()
         es_index = app.conf["DATASET_ALIAS"]
+        _source = ["dataset", "urls"]

-    results = es.query(index=es_index, body=query)  # Querying for products
-    num_processes = psutil.cpu_count() - 2
-    p = Pool(processes=num_processes)
+    results = es.query(index=es_index, body=query, _source=_source)  # Querying for products
+
+    # filter fields returned with the bulk API
+    filter_path = [
+        "items.delete.error.reason",
+        "items.delete._id",
+        "items.delete._index",
+        "items.delete.result"
+    ]

     if component == 'tosca':
-        deleted_datasets = dict()
-        updated_datasets = p.map(delete_dataset, results)
-        for dataset in updated_datasets:
-            if dataset in deleted_datasets:
-                count = deleted_datasets[dataset]
-                deleted_datasets[dataset] = count + 1
+        num_processes = psutil.cpu_count() - 2
+        p = Pool(processes=num_processes)
+
+        body = [{
+            "delete": {"_index": row["_index"], "_id": row["_id"]}
+        } for row in results]
+        bulk_res = es.es.bulk(index=es_index, body=body, filter_path=filter_path)
+        logger.info(json.dumps(bulk_res, indent=2))
+
+        logger.info("purging datasets from object store: ")
+        p.map(delete_from_object_store, results)  # deleting objects from storage (s3, etc.)
+
+        dataset_purge_stats = {}
+        deleted_docs_count = 0
+        failed_deletions = []
+        for row in bulk_res["items"]:
+            if row["delete"].get("result", None) == "deleted":
+                deleted_docs_count += 1
+                _index = row["delete"]["_index"]
+                logger.info("deleted from ES: %s" % row["delete"]["_id"])
+                if _index not in dataset_purge_stats:
+                    dataset_purge_stats[_index] = 1
+                else:
+                    dataset_purge_stats[_index] += 1
             else:
-                deleted_datasets[dataset] = 1
-        if len(deleted_datasets) != 0:
-            msg_details = "Datasets purged by type:\n\n"
-            for ds in deleted_datasets.keys():
-                msg_details += "{} - {}\n".format(deleted_datasets[ds], ds)
+                failed_deletions.append(row["delete"])
+
+        if deleted_docs_count or failed_deletions:
+            msg_details = ""
+            if deleted_docs_count > 0:
+                msg_details += "Datasets purged from ES:\n"
+                for k, v in dataset_purge_stats.items():
+                    msg_details += "{} - {}\n".format(k, v)
+            if len(failed_deletions) > 0:
+                msg_details += "\n\n"
+                msg_details += "Datasets failed to purge from ES:\n"
+                msg_details += json.dumps(failed_deletions)
+                logger.warning("datasets failed to delete: ")
+                logger.warning(json.dumps(failed_deletions))
             create_info_message_files(msg_details=msg_details)
     else:
         purge = True if operation == 'purge' else False  # purge job from index
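Taken together, the diff reorders the purge so that Elasticsearch metadata is removed first (via the bulk call) and the backing objects are deleted from the object store afterwards, in parallel. A condensed, self-contained sketch of that flow, with hypothetical stand-ins (delete_object, summarize_bulk, purge) for the real delete_from_object_store and reporting code:

import psutil
from multiprocessing import Pool

def delete_object(doc):
    # stand-in for osaka.main.rmall on the dataset's non-http URL
    print("would delete object for", doc["_id"])

def summarize_bulk(bulk_res):
    # per-index success counts plus the raw entries that failed to delete
    stats, failed = {}, []
    for item in bulk_res["items"]:
        delete = item["delete"]
        if delete.get("result") == "deleted":
            stats[delete["_index"]] = stats.get(delete["_index"], 0) + 1
        else:
            failed.append(delete)
    return stats, failed

def purge(es, index, results):
    actions = [{"delete": {"_index": r["_index"], "_id": r["_id"]}} for r in results]
    bulk_res = es.bulk(body=actions, index=index)      # remove ES metadata first
    with Pool(processes=max((psutil.cpu_count() or 2) - 2, 1)) as pool:
        pool.map(delete_object, results)               # then purge the objects (s3, etc.)
    return summarize_bulk(bulk_res)

Running the bulk delete before Pool.map mirrors the last two bullets of the commit message: documents are dropped from ES before their files are removed from s3, and the log line moves accordingly.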
