From 85f30bc2b7f0e6ab86ee72b656b1aac0de9d4e34 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Barth=C3=A9l=C3=A9my=20von=20Haller?= Date: Wed, 1 Nov 2023 13:58:21 +0100 Subject: [PATCH] [QC-996] policy multiple_per_run can delete first and last (#1921) * [QC-996] policy multiple_per_run can delete first and last * fix and preserve all versions if there are less than 4 * use a dedicated method to update the metadata. At the moment te QCCDB requires to specify the start of validity behind the scene. --- .../script/RepoCleaner/qcrepocleaner/Ccdb.py | 17 ++ .../qcrepocleaner/policies_utils.py | 1 + .../qcrepocleaner/rules/multiple_per_run.py | 38 +++- .../RepoCleaner/tests/test_MultiplePerRun.py | 5 +- .../test_MultiplePerRun_deleteFirstLast.py | 214 ++++++++++++++++++ 5 files changed, 271 insertions(+), 4 deletions(-) create mode 100644 Framework/script/RepoCleaner/tests/test_MultiplePerRun_deleteFirstLast.py diff --git a/Framework/script/RepoCleaner/qcrepocleaner/Ccdb.py b/Framework/script/RepoCleaner/qcrepocleaner/Ccdb.py index d4a55be463..86001b308a 100644 --- a/Framework/script/RepoCleaner/qcrepocleaner/Ccdb.py +++ b/Framework/script/RepoCleaner/qcrepocleaner/Ccdb.py @@ -200,7 +200,24 @@ def updateValidity(self, version: ObjectVersion, valid_from: int, valid_to: int, except requests.exceptions.RequestException as e: logging.error(f"Exception in updateValidity: {traceback.format_exc()}") + @dryable.Dryable() + def updateMetadata(self, version: ObjectVersion, metadata): + logger.debug(f"update metadata : {metadata}") + full_path = self.url + '/' + version.path + '/' + str(version.validFrom) + '/' + str(version.uuid) + '?' + if metadata is not None: + for key in metadata: + full_path += key + "=" + metadata[key] + "&" + if self.set_adjustable_eov: + logger.debug(f"As the parameter force is set, we add metadata adjustableEOV") + full_path += "adjustableEOV=1&" + try: + headers = {'Connection': 'close'} + r = requests.put(full_path, headers=headers) + r.raise_for_status() + except requests.exceptions.RequestException as e: + logging.error(f"Exception in updateMetadata: {traceback.format_exc()}") + @dryable.Dryable() def putVersion(self, version: ObjectVersion, data): ''' :param version: An ObjectVersion that describes the data to be uploaded. diff --git a/Framework/script/RepoCleaner/qcrepocleaner/policies_utils.py b/Framework/script/RepoCleaner/qcrepocleaner/policies_utils.py index 29026e19a9..5a7ff0b09e 100644 --- a/Framework/script/RepoCleaner/qcrepocleaner/policies_utils.py +++ b/Framework/script/RepoCleaner/qcrepocleaner/policies_utils.py @@ -24,6 +24,7 @@ def get_run(v: ObjectVersion) -> str: def group_versions(ccdb, object_path, period_pass, versions_buckets_dict: DefaultDict[str, List[ObjectVersion]]): # Find all the runs and group the versions (by run or by a combination of multiple attributes) versions = ccdb.getVersionsList(object_path) + logger.debug(f"group_versions: found {len(versions)} versions") for v in versions: logger.debug(f"Assigning {v} to a bucket") run = get_run(v) diff --git a/Framework/script/RepoCleaner/qcrepocleaner/rules/multiple_per_run.py b/Framework/script/RepoCleaner/qcrepocleaner/rules/multiple_per_run.py index a9e9c4af5a..0915915a37 100644 --- a/Framework/script/RepoCleaner/qcrepocleaner/rules/multiple_per_run.py +++ b/Framework/script/RepoCleaner/qcrepocleaner/rules/multiple_per_run.py @@ -11,6 +11,7 @@ logger = logging # default logger + def process(ccdb: Ccdb, object_path: str, delay: int, from_timestamp: int, to_timestamp: int, extra_params: Dict[str, str]): ''' @@ -24,12 +25,22 @@ def process(ccdb: Ccdb, object_path: str, delay: int, from_timestamp: int, to_t - migrate_to_EOS: Migrate the object to EOS. (default: false) - interval_between_versions: Period in minutes between the versions we will keep. (default: 90) - period_pass: Keep 1 version for a combination of run+pass+period if true. (default: false) + - delete_first_last: delete the first and last of the run[+pass+period] before actually applying the rule. It is implemented like this : Map of buckets: run[+pass+period] -> list of versions Go through all objects: Add the object to the corresponding key (run[+pass+period]) + Sort the versions in the bucket Remove the empty run from the map (we ignore objects without a run) Go through the map: for each run (resp. run+pass+period) + + if delete_first_last + Get flag cleaner_2nd from first object (if there) + if cleaner_2nd + continue # we do not want to reprocess the same run twice + flag second with `cleaner_2nd` + delete first and last versions in the bucket + Get SOR (validity of first object) if SOR < now - delay do @@ -62,6 +73,8 @@ def process(ccdb: Ccdb, object_path: str, delay: int, from_timestamp: int, to_t logger.debug(f"interval_between_versions : {interval_between_versions}") migrate_to_EOS = (extra_params.get("migrate_to_EOS", False) is True) logger.debug(f"migrate_to_EOS : {migrate_to_EOS}") + delete_first_last = (extra_params.get("delete_first_last", False) is True) + logger.debug(f"delete_first_last : {delete_first_last}") # Find all the runs and group the versions (by run or by a combination of multiple attributes) policies_utils.group_versions(ccdb, object_path, period_pass, versions_buckets_dict) @@ -85,7 +98,28 @@ def process(ccdb: Ccdb, object_path: str, delay: int, from_timestamp: int, to_t logger.debug(f" not in the allowed period, skip this bucket") preservation_list.extend(run_versions) else: - logger.debug(f" not in the grace period") + logger.debug(f" not in the grace period") + + if delete_first_last: + logger.debug(f" delete_first_last is set") + run_versions.sort(key=lambda x: x.createdAt) + # Get flag cleaner_2nd from first object (if there) + cleaner_2nd = "cleaner_2nd" in run_versions[0].metadata + if cleaner_2nd or len(run_versions) < 4: + logger.debug(f" first version has flag cleaner_2nd or there are less than 4 version, " + f"we continue to next bucket") + preservation_list.extend(run_versions) + continue + # flag second with `cleaner_2nd` + ccdb.updateMetadata(run_versions[1], {'cleaner_2nd': 'true'}) + # delete first and last versions in the bucket + logger.debug(f" delete the first and last versions") + deletion_list.append(run_versions[-1]) + ccdb.deleteVersion(run_versions[-1]) + del run_versions[-1] + deletion_list.append(run_versions[0]) + ccdb.deleteVersion(run_versions[0]) + del run_versions[0] last_preserved: ObjectVersion = None for v in run_versions: @@ -98,7 +132,7 @@ def process(ccdb: Ccdb, object_path: str, delay: int, from_timestamp: int, to_t logger.debug(f" --> preserve") last_preserved = v if migrate_to_EOS: - ccdb.updateValidity(v, v.validFrom, v.validTo, metadata_for_preservation) + ccdb.updateMetadata(v, metadata_for_preservation) preservation_list.append(last_preserved) else: # in between period --> delete logger.debug(f" --> delete") diff --git a/Framework/script/RepoCleaner/tests/test_MultiplePerRun.py b/Framework/script/RepoCleaner/tests/test_MultiplePerRun.py index f39acb7e43..8a3f53ce0f 100644 --- a/Framework/script/RepoCleaner/tests/test_MultiplePerRun.py +++ b/Framework/script/RepoCleaner/tests/test_MultiplePerRun.py @@ -21,7 +21,7 @@ class TestProduction(unittest.TestCase): one_minute = 60000 def setUp(self): - self.ccdb = Ccdb('http://ccdb-test.cern.ch:8080') + self.ccdb = Ccdb('http://137.138.47.222:8080') self.extra = {"interval_between_versions": "90", "migrate_to_EOS": False} self.path = "qc/TST/MO/repo/test" @@ -77,7 +77,8 @@ def test_5_runs(self): # Prepare data test_path = self.path + "/test_5_runs" - self.prepare_data(test_path, [3, 3, 3, 3, 3], [60, 120, 190, 240, 24*60], 123) + self.prepare_data(test_path, [1*60, 2*60, 3*60+10, 4*60, 5*60], + [60, 120, 190, 240, 24*60], 123) stats = multiple_per_run.process(self.ccdb, test_path, delay=60*24, from_timestamp=1, to_timestamp=self.in_ten_years, extra_params=self.extra) diff --git a/Framework/script/RepoCleaner/tests/test_MultiplePerRun_deleteFirstLast.py b/Framework/script/RepoCleaner/tests/test_MultiplePerRun_deleteFirstLast.py new file mode 100644 index 0000000000..d636c1e370 --- /dev/null +++ b/Framework/script/RepoCleaner/tests/test_MultiplePerRun_deleteFirstLast.py @@ -0,0 +1,214 @@ +import logging +import time +import unittest +from datetime import timedelta, date, datetime +from typing import List + +from qcrepocleaner.Ccdb import Ccdb, ObjectVersion +from qcrepocleaner.rules import multiple_per_run + + +class TestProduction(unittest.TestCase): + """ + This test pushes data to the CCDB and then run the Rule Production and then check. + It does it for several use cases. + One should truncate /qc/TST/MO/repo/test before running it. + """ + + thirty_minutes = 1800000 + one_hour = 3600000 + in_ten_years = 1975323342000 + one_minute = 60000 + + def setUp(self): + self.ccdb = Ccdb('http://137.138.47.222:8080') + self.extra = {"interval_between_versions": "90", "migrate_to_EOS": False, "delete_first_last": True} + self.path = "qc/TST/MO/repo/test" + + def test_1_finished_run(self): + """ + 1 run of 2.5 hours finished 22 hours ago. + Expected output: SOR, EOR, 1 in the middle + """ + logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', + datefmt='%d-%b-%y %H:%M:%S') + logging.getLogger().setLevel(int(10)) + + # Prepare data + test_path = self.path + "/test_1_finished_run" + self.prepare_data(test_path, [150], [22*60], 123) + objectsBefore = self.ccdb.getVersionsList(test_path) + + stats = multiple_per_run.process(self.ccdb, test_path, delay=60*24, from_timestamp=1, + to_timestamp=self.in_ten_years, extra_params=self.extra) + objectsAfter = self.ccdb.getVersionsList(test_path) + + self.assertEqual(stats["deleted"], 147) + self.assertEqual(stats["preserved"], 3) + self.assertEqual(stats["updated"], 0) + + self.assertEqual(objectsAfter[0].validFrom, objectsBefore[1].validFrom) + self.assertEqual(objectsAfter[2].validFrom, objectsBefore[-2].validFrom) + + def test_2_runs(self): + """ + 2 runs of 2.5 hours, separated by 3 hours, second finished 20h ago. + Expected output: SOR, EOR, 1 in the middle for the first one, all for the second + """ + logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', + datefmt='%d-%b-%y %H:%M:%S') + logging.getLogger().setLevel(int(10)) + + # Prepare data + test_path = self.path + "/test_2_runs" + self.prepare_data(test_path, [150, 150], [3*60, 20*60], 123) + + stats = multiple_per_run.process(self.ccdb, test_path, delay=60*24, from_timestamp=1, + to_timestamp=self.in_ten_years, extra_params=self.extra) + + self.assertEqual(stats["deleted"], 147) + self.assertEqual(stats["preserved"], 3+150) + self.assertEqual(stats["updated"], 0) + + def test_5_runs(self): + """ + 1 hour Run - 1h - 2 hours Run - 2h - 3h10 run - 3h10 - 4 hours run - 4 hours - 5 hours run - 5 h + All more than 24 hours + Expected output: 2 + 3 + 4 + 4 + 5 + """ + logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', + datefmt='%d-%b-%y %H:%M:%S') + logging.getLogger().setLevel(int(10)) + + # Prepare data + test_path = self.path + "/test_5_runs" + self.prepare_data(test_path, [1*60, 2*60, 3*60+10, 4*60, 5*60], + [60, 120, 190, 240, 24*60], 123) + + stats = multiple_per_run.process(self.ccdb, test_path, delay=60*24, from_timestamp=1, + to_timestamp=self.in_ten_years, extra_params=self.extra) + self.assertEqual(stats["deleted"], 60+120+190+240+300-18) + self.assertEqual(stats["preserved"], 18) + self.assertEqual(stats["updated"], 0) + + # and now re-run it to make sure we preserve the state + stats = multiple_per_run.process(self.ccdb, test_path, delay=60*24, from_timestamp=1, + to_timestamp=self.in_ten_years, extra_params=self.extra) + + self.assertEqual(stats["deleted"], 0) + self.assertEqual(stats["preserved"], 18) + self.assertEqual(stats["updated"], 0) + + def test_run_one_object(self): + """ + A run with a single object + Expected output: keep the object + """ + logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', + datefmt='%d-%b-%y %H:%M:%S') + logging.getLogger().setLevel(int(10)) + + # Prepare data + test_path = self.path + "/test_run_one_object" + self.prepare_data(test_path, [1], [25*60], 123) + + stats = multiple_per_run.process(self.ccdb, test_path, delay=60*24, from_timestamp=1, + to_timestamp=self.in_ten_years, extra_params=self.extra) + + self.assertEqual(stats["deleted"], 0) + self.assertEqual(stats["preserved"], 1) + self.assertEqual(stats["updated"], 0) + + def test_run_two_object(self): + """ + A run with 2 objects + Expected output: keep the 2 objects + """ + logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', + datefmt='%d-%b-%y %H:%M:%S') + logging.getLogger().setLevel(int(10)) + + # Prepare data + test_path = self.path + "/test_run_two_object" + self.prepare_data(test_path, [2], [25*60], 123) + + stats = multiple_per_run.process(self.ccdb, test_path, delay=60*24, from_timestamp=1, + to_timestamp=self.in_ten_years, extra_params=self.extra) + + self.assertEqual(stats["deleted"], 0) + self.assertEqual(stats["preserved"], 2) + self.assertEqual(stats["updated"], 0) + + def test_3_runs_with_period(self): + """ + 3 runs more than 24h in the past but only the middle one starts in the period that is allowed. + Expected output: second run is trimmed, not the other + """ + logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', + datefmt='%d-%b-%y %H:%M:%S') + logging.getLogger().setLevel(int(10)) + + # Prepare data + test_path = self.path + "/test_3_runs_with_period" + self.prepare_data(test_path, [30,30, 30], [120,120,25*60], 123) + + current_timestamp = int(time.time() * 1000) + stats = multiple_per_run.process(self.ccdb, test_path, delay=60*24, from_timestamp=current_timestamp-29*60*60*1000, + to_timestamp=current_timestamp-26*60*60*1000, extra_params=self.extra) + + self.assertEqual(stats["deleted"], 28) + self.assertEqual(stats["preserved"], 90-28) + self.assertEqual(stats["updated"], 0) + + def test_asdf(self): + logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', + datefmt='%d-%b-%y %H:%M:%S') + logging.getLogger().setLevel(int(10)) + test_path = self.path + "/asdf" + self.prepare_data(test_path, [70, 70, 70], [6*60, 6*60, 25*60], 55555) + + def prepare_data(self, path, run_durations: List[int], time_till_next_run: List[int], first_run_number: int): + """ + Prepare a data set populated with a number of runs. + run_durations contains the duration of each of these runs in minutes + time_till_next_run is the time between two runs in minutes. + The first element of time_till_next_run is used to separate the first two runs. + Both lists must have the same number of elements. + """ + + if len(run_durations) != len(time_till_next_run): + logging.error(f"run_durations and time_till_next_run must have the same length") + exit(1) + + total_duration = 0 + for a, b in zip(run_durations, time_till_next_run): + total_duration += a + b + logging.info(f"Total duration : {total_duration}") + + current_timestamp = int(time.time() * 1000) + cursor = current_timestamp - total_duration * 60 * 1000 + first_ts = cursor + data = {'part': 'part'} + run = first_run_number + + for run_duration, time_till_next in zip(run_durations, time_till_next_run): + metadata = {'RunNumber': str(run)} + logging.debug(f"cursor: {cursor}") + logging.debug(f"time_till_next: {time_till_next}") + + for i in range(run_duration): + to_ts = cursor + 24 * 60 * 60 * 1000 # a day + metadata2 = {**metadata, 'Created': str(cursor)} + version_info = ObjectVersion(path=path, validFrom=cursor, validTo=to_ts, metadata=metadata2, + createdAt=cursor) + self.ccdb.putVersion(version=version_info, data=data) + cursor += 1 * 60 * 1000 + + run += 1 + cursor += time_till_next * 60 * 1000 + + return first_ts + + +if __name__ == '__main__': + unittest.main()