Skip to content

Commit

Permalink
[QC-996] policy multiple_per_run can delete first and last (#1921)
Browse files Browse the repository at this point in the history
* [QC-996] policy multiple_per_run can delete first and last

* fix and preserve all versions if there are less than 4

* use a dedicated method to update the metadata. At the moment te QCCDB requires to specify the start of validity behind the scene.
  • Loading branch information
Barthelemy authored Nov 1, 2023
1 parent 2f86036 commit 85f30bc
Show file tree
Hide file tree
Showing 5 changed files with 271 additions and 4 deletions.
17 changes: 17 additions & 0 deletions Framework/script/RepoCleaner/qcrepocleaner/Ccdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,24 @@ def updateValidity(self, version: ObjectVersion, valid_from: int, valid_to: int,
except requests.exceptions.RequestException as e:
logging.error(f"Exception in updateValidity: {traceback.format_exc()}")

@dryable.Dryable()
def updateMetadata(self, version: ObjectVersion, metadata):
logger.debug(f"update metadata : {metadata}")
full_path = self.url + '/' + version.path + '/' + str(version.validFrom) + '/' + str(version.uuid) + '?'
if metadata is not None:
for key in metadata:
full_path += key + "=" + metadata[key] + "&"
if self.set_adjustable_eov:
logger.debug(f"As the parameter force is set, we add metadata adjustableEOV")
full_path += "adjustableEOV=1&"
try:
headers = {'Connection': 'close'}
r = requests.put(full_path, headers=headers)
r.raise_for_status()
except requests.exceptions.RequestException as e:
logging.error(f"Exception in updateMetadata: {traceback.format_exc()}")

@dryable.Dryable()
def putVersion(self, version: ObjectVersion, data):
'''
:param version: An ObjectVersion that describes the data to be uploaded.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ def get_run(v: ObjectVersion) -> str:
def group_versions(ccdb, object_path, period_pass, versions_buckets_dict: DefaultDict[str, List[ObjectVersion]]):
# Find all the runs and group the versions (by run or by a combination of multiple attributes)
versions = ccdb.getVersionsList(object_path)
logger.debug(f"group_versions: found {len(versions)} versions")
for v in versions:
logger.debug(f"Assigning {v} to a bucket")
run = get_run(v)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

logger = logging # default logger


def process(ccdb: Ccdb, object_path: str, delay: int, from_timestamp: int, to_timestamp: int,
extra_params: Dict[str, str]):
'''
Expand All @@ -24,12 +25,22 @@ def process(ccdb: Ccdb, object_path: str, delay: int, from_timestamp: int, to_t
- migrate_to_EOS: Migrate the object to EOS. (default: false)
- interval_between_versions: Period in minutes between the versions we will keep. (default: 90)
- period_pass: Keep 1 version for a combination of run+pass+period if true. (default: false)
- delete_first_last: delete the first and last of the run[+pass+period] before actually applying the rule.
It is implemented like this :
Map of buckets: run[+pass+period] -> list of versions
Go through all objects: Add the object to the corresponding key (run[+pass+period])
Sort the versions in the bucket
Remove the empty run from the map (we ignore objects without a run)
Go through the map: for each run (resp. run+pass+period)
if delete_first_last
Get flag cleaner_2nd from first object (if there)
if cleaner_2nd
continue # we do not want to reprocess the same run twice
flag second with `cleaner_2nd`
delete first and last versions in the bucket
Get SOR (validity of first object)
if SOR < now - delay
do
Expand Down Expand Up @@ -62,6 +73,8 @@ def process(ccdb: Ccdb, object_path: str, delay: int, from_timestamp: int, to_t
logger.debug(f"interval_between_versions : {interval_between_versions}")
migrate_to_EOS = (extra_params.get("migrate_to_EOS", False) is True)
logger.debug(f"migrate_to_EOS : {migrate_to_EOS}")
delete_first_last = (extra_params.get("delete_first_last", False) is True)
logger.debug(f"delete_first_last : {delete_first_last}")

# Find all the runs and group the versions (by run or by a combination of multiple attributes)
policies_utils.group_versions(ccdb, object_path, period_pass, versions_buckets_dict)
Expand All @@ -85,7 +98,28 @@ def process(ccdb: Ccdb, object_path: str, delay: int, from_timestamp: int, to_t
logger.debug(f" not in the allowed period, skip this bucket")
preservation_list.extend(run_versions)
else:
logger.debug(f" not in the grace period")
logger.debug(f" not in the grace period")

if delete_first_last:
logger.debug(f" delete_first_last is set")
run_versions.sort(key=lambda x: x.createdAt)
# Get flag cleaner_2nd from first object (if there)
cleaner_2nd = "cleaner_2nd" in run_versions[0].metadata
if cleaner_2nd or len(run_versions) < 4:
logger.debug(f" first version has flag cleaner_2nd or there are less than 4 version, "
f"we continue to next bucket")
preservation_list.extend(run_versions)
continue
# flag second with `cleaner_2nd`
ccdb.updateMetadata(run_versions[1], {'cleaner_2nd': 'true'})
# delete first and last versions in the bucket
logger.debug(f" delete the first and last versions")
deletion_list.append(run_versions[-1])
ccdb.deleteVersion(run_versions[-1])
del run_versions[-1]
deletion_list.append(run_versions[0])
ccdb.deleteVersion(run_versions[0])
del run_versions[0]

last_preserved: ObjectVersion = None
for v in run_versions:
Expand All @@ -98,7 +132,7 @@ def process(ccdb: Ccdb, object_path: str, delay: int, from_timestamp: int, to_t
logger.debug(f" --> preserve")
last_preserved = v
if migrate_to_EOS:
ccdb.updateValidity(v, v.validFrom, v.validTo, metadata_for_preservation)
ccdb.updateMetadata(v, metadata_for_preservation)
preservation_list.append(last_preserved)
else: # in between period --> delete
logger.debug(f" --> delete")
Expand Down
5 changes: 3 additions & 2 deletions Framework/script/RepoCleaner/tests/test_MultiplePerRun.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class TestProduction(unittest.TestCase):
one_minute = 60000

def setUp(self):
self.ccdb = Ccdb('http://ccdb-test.cern.ch:8080')
self.ccdb = Ccdb('http://137.138.47.222:8080')
self.extra = {"interval_between_versions": "90", "migrate_to_EOS": False}
self.path = "qc/TST/MO/repo/test"

Expand Down Expand Up @@ -77,7 +77,8 @@ def test_5_runs(self):

# Prepare data
test_path = self.path + "/test_5_runs"
self.prepare_data(test_path, [3, 3, 3, 3, 3], [60, 120, 190, 240, 24*60], 123)
self.prepare_data(test_path, [1*60, 2*60, 3*60+10, 4*60, 5*60],
[60, 120, 190, 240, 24*60], 123)

stats = multiple_per_run.process(self.ccdb, test_path, delay=60*24, from_timestamp=1,
to_timestamp=self.in_ten_years, extra_params=self.extra)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,214 @@
import logging
import time
import unittest
from datetime import timedelta, date, datetime
from typing import List

from qcrepocleaner.Ccdb import Ccdb, ObjectVersion
from qcrepocleaner.rules import multiple_per_run


class TestProduction(unittest.TestCase):
"""
This test pushes data to the CCDB and then run the Rule Production and then check.
It does it for several use cases.
One should truncate /qc/TST/MO/repo/test before running it.
"""

thirty_minutes = 1800000
one_hour = 3600000
in_ten_years = 1975323342000
one_minute = 60000

def setUp(self):
self.ccdb = Ccdb('http://137.138.47.222:8080')
self.extra = {"interval_between_versions": "90", "migrate_to_EOS": False, "delete_first_last": True}
self.path = "qc/TST/MO/repo/test"

def test_1_finished_run(self):
"""
1 run of 2.5 hours finished 22 hours ago.
Expected output: SOR, EOR, 1 in the middle
"""
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s',
datefmt='%d-%b-%y %H:%M:%S')
logging.getLogger().setLevel(int(10))

# Prepare data
test_path = self.path + "/test_1_finished_run"
self.prepare_data(test_path, [150], [22*60], 123)
objectsBefore = self.ccdb.getVersionsList(test_path)

stats = multiple_per_run.process(self.ccdb, test_path, delay=60*24, from_timestamp=1,
to_timestamp=self.in_ten_years, extra_params=self.extra)
objectsAfter = self.ccdb.getVersionsList(test_path)

self.assertEqual(stats["deleted"], 147)
self.assertEqual(stats["preserved"], 3)
self.assertEqual(stats["updated"], 0)

self.assertEqual(objectsAfter[0].validFrom, objectsBefore[1].validFrom)
self.assertEqual(objectsAfter[2].validFrom, objectsBefore[-2].validFrom)

def test_2_runs(self):
"""
2 runs of 2.5 hours, separated by 3 hours, second finished 20h ago.
Expected output: SOR, EOR, 1 in the middle for the first one, all for the second
"""
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s',
datefmt='%d-%b-%y %H:%M:%S')
logging.getLogger().setLevel(int(10))

# Prepare data
test_path = self.path + "/test_2_runs"
self.prepare_data(test_path, [150, 150], [3*60, 20*60], 123)

stats = multiple_per_run.process(self.ccdb, test_path, delay=60*24, from_timestamp=1,
to_timestamp=self.in_ten_years, extra_params=self.extra)

self.assertEqual(stats["deleted"], 147)
self.assertEqual(stats["preserved"], 3+150)
self.assertEqual(stats["updated"], 0)

def test_5_runs(self):
"""
1 hour Run - 1h - 2 hours Run - 2h - 3h10 run - 3h10 - 4 hours run - 4 hours - 5 hours run - 5 h
All more than 24 hours
Expected output: 2 + 3 + 4 + 4 + 5
"""
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s',
datefmt='%d-%b-%y %H:%M:%S')
logging.getLogger().setLevel(int(10))

# Prepare data
test_path = self.path + "/test_5_runs"
self.prepare_data(test_path, [1*60, 2*60, 3*60+10, 4*60, 5*60],
[60, 120, 190, 240, 24*60], 123)

stats = multiple_per_run.process(self.ccdb, test_path, delay=60*24, from_timestamp=1,
to_timestamp=self.in_ten_years, extra_params=self.extra)
self.assertEqual(stats["deleted"], 60+120+190+240+300-18)
self.assertEqual(stats["preserved"], 18)
self.assertEqual(stats["updated"], 0)

# and now re-run it to make sure we preserve the state
stats = multiple_per_run.process(self.ccdb, test_path, delay=60*24, from_timestamp=1,
to_timestamp=self.in_ten_years, extra_params=self.extra)

self.assertEqual(stats["deleted"], 0)
self.assertEqual(stats["preserved"], 18)
self.assertEqual(stats["updated"], 0)

def test_run_one_object(self):
"""
A run with a single object
Expected output: keep the object
"""
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s',
datefmt='%d-%b-%y %H:%M:%S')
logging.getLogger().setLevel(int(10))

# Prepare data
test_path = self.path + "/test_run_one_object"
self.prepare_data(test_path, [1], [25*60], 123)

stats = multiple_per_run.process(self.ccdb, test_path, delay=60*24, from_timestamp=1,
to_timestamp=self.in_ten_years, extra_params=self.extra)

self.assertEqual(stats["deleted"], 0)
self.assertEqual(stats["preserved"], 1)
self.assertEqual(stats["updated"], 0)

def test_run_two_object(self):
"""
A run with 2 objects
Expected output: keep the 2 objects
"""
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s',
datefmt='%d-%b-%y %H:%M:%S')
logging.getLogger().setLevel(int(10))

# Prepare data
test_path = self.path + "/test_run_two_object"
self.prepare_data(test_path, [2], [25*60], 123)

stats = multiple_per_run.process(self.ccdb, test_path, delay=60*24, from_timestamp=1,
to_timestamp=self.in_ten_years, extra_params=self.extra)

self.assertEqual(stats["deleted"], 0)
self.assertEqual(stats["preserved"], 2)
self.assertEqual(stats["updated"], 0)

def test_3_runs_with_period(self):
"""
3 runs more than 24h in the past but only the middle one starts in the period that is allowed.
Expected output: second run is trimmed, not the other
"""
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s',
datefmt='%d-%b-%y %H:%M:%S')
logging.getLogger().setLevel(int(10))

# Prepare data
test_path = self.path + "/test_3_runs_with_period"
self.prepare_data(test_path, [30,30, 30], [120,120,25*60], 123)

current_timestamp = int(time.time() * 1000)
stats = multiple_per_run.process(self.ccdb, test_path, delay=60*24, from_timestamp=current_timestamp-29*60*60*1000,
to_timestamp=current_timestamp-26*60*60*1000, extra_params=self.extra)

self.assertEqual(stats["deleted"], 28)
self.assertEqual(stats["preserved"], 90-28)
self.assertEqual(stats["updated"], 0)

def test_asdf(self):
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s',
datefmt='%d-%b-%y %H:%M:%S')
logging.getLogger().setLevel(int(10))
test_path = self.path + "/asdf"
self.prepare_data(test_path, [70, 70, 70], [6*60, 6*60, 25*60], 55555)

def prepare_data(self, path, run_durations: List[int], time_till_next_run: List[int], first_run_number: int):
"""
Prepare a data set populated with a number of runs.
run_durations contains the duration of each of these runs in minutes
time_till_next_run is the time between two runs in minutes.
The first element of time_till_next_run is used to separate the first two runs.
Both lists must have the same number of elements.
"""

if len(run_durations) != len(time_till_next_run):
logging.error(f"run_durations and time_till_next_run must have the same length")
exit(1)

total_duration = 0
for a, b in zip(run_durations, time_till_next_run):
total_duration += a + b
logging.info(f"Total duration : {total_duration}")

current_timestamp = int(time.time() * 1000)
cursor = current_timestamp - total_duration * 60 * 1000
first_ts = cursor
data = {'part': 'part'}
run = first_run_number

for run_duration, time_till_next in zip(run_durations, time_till_next_run):
metadata = {'RunNumber': str(run)}
logging.debug(f"cursor: {cursor}")
logging.debug(f"time_till_next: {time_till_next}")

for i in range(run_duration):
to_ts = cursor + 24 * 60 * 60 * 1000 # a day
metadata2 = {**metadata, 'Created': str(cursor)}
version_info = ObjectVersion(path=path, validFrom=cursor, validTo=to_ts, metadata=metadata2,
createdAt=cursor)
self.ccdb.putVersion(version=version_info, data=data)
cursor += 1 * 60 * 1000

run += 1
cursor += time_till_next * 60 * 1000

return first_ts


if __name__ == '__main__':
unittest.main()

0 comments on commit 85f30bc

Please sign in to comment.