Skip to content

Commit

Permalink
feat(ingest/gc): add index truncation logic (#10099)
Browse files Browse the repository at this point in the history
  • Loading branch information
anshbansal authored Apr 2, 2024
1 parent a89e189 commit e0b20e1
Showing 1 changed file with 133 additions and 2 deletions.
135 changes: 133 additions & 2 deletions metadata-ingestion/src/datahub/ingestion/source/gc/datahub_gc.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
import datetime
import logging
import re
import time
from dataclasses import dataclass
from typing import Iterable
from typing import Dict, Iterable

from pydantic import Field

from datahub.configuration.common import ConfigModel
from datahub.configuration.common import ConfigModel, OperationalError
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.decorators import (
SupportStatus,
Expand All @@ -15,12 +18,30 @@
from datahub.ingestion.api.source import Source, SourceReport
from datahub.ingestion.api.workunit import MetadataWorkUnit

logger = logging.getLogger(__name__)


class DataHubGcSourceConfig(ConfigModel):
cleanup_expired_tokens: bool = Field(
default=True,
description="Whether to clean up expired tokens or not",
)
truncate_indices: bool = Field(
default=True,
description="Whether to truncate elasticsearch indices or not which can be safely truncated",
)
truncate_index_older_than_days: int = Field(
default=30,
description="Indices older than this number of days will be truncated",
)
truncation_watch_until: int = Field(
default=10000,
description="Wait for truncation of indices until this number of documents are left",
)
truncation_sleep_between_seconds: int = Field(
default=30,
description="Sleep between truncation monitoring.",
)


@dataclass
Expand Down Expand Up @@ -51,8 +72,118 @@ def get_workunits_internal(
) -> Iterable[MetadataWorkUnit]:
if self.config.cleanup_expired_tokens:
self.revoke_expired_tokens()
if self.config.truncate_indices:
self.truncate_indices()
yield from []

def truncate_indices(self) -> None:
self._truncate_timeseries_helper(aspect_name="operation", entity_type="dataset")
self._truncate_timeseries_helper(
aspect_name="datasetusagestatistics", entity_type="dataset"
)
self._truncate_timeseries_helper(
aspect_name="chartUsageStatistics", entity_type="chart"
)
self._truncate_timeseries_helper(
aspect_name="dashboardUsageStatistics", entity_type="dashboard"
)

def _truncate_timeseries_helper(self, aspect_name: str, entity_type: str) -> None:
self._truncate_timeseries_with_watch_optional(
aspect_name=aspect_name, entity_type=entity_type, watch=False
)
self._truncate_timeseries_with_watch_optional(
aspect_name=aspect_name, entity_type=entity_type, watch=True
)

def _truncate_timeseries_with_watch_optional(
self, aspect_name: str, entity_type: str, watch: bool
) -> None:
graph = self.graph
assert graph is not None
if watch:
to_delete = 1
while to_delete > 0:
response = self.truncate_timeseries_util(
aspect=aspect_name,
dry_run=watch,
days_ago=self.config.truncate_index_older_than_days,
entity_type=entity_type,
)
val = response.get("value", "")
if "This was a dry run" not in val or "out of" not in val:
return
prev_to_delete = to_delete
to_delete, total = re.findall(r"\d+", val)[:2]
to_delete = int(to_delete)
if to_delete <= 0:
logger.info("Nothing to delete.")
return
logger.info(f"to_delete {to_delete} / {total}")
if to_delete == prev_to_delete:
logger.info("Seems to be stuck. Ending the loop.")
break
elif to_delete < self.config.truncation_watch_until:
logger.info("Too small truncation. Not going to watch.")
return
else:
time.sleep(self.config.truncation_sleep_between_seconds)
else:
self.truncate_timeseries_util(
aspect=aspect_name,
dry_run=watch,
days_ago=self.config.truncate_index_older_than_days,
entity_type=entity_type,
)

def x_days_ago_millis(self, days: int) -> int:
x_days_ago_datetime = datetime.datetime.now(
datetime.timezone.utc
) - datetime.timedelta(days=days)
return int(x_days_ago_datetime.timestamp() * 1000)

def truncate_timeseries_util(
self,
aspect: str,
days_ago: int,
dry_run: bool = True,
entity_type: str = "dataset",
) -> Dict:
graph = self.graph
assert graph is not None

gms_url = graph._gms_server
if not dry_run:
logger.info(
f"Going to truncate timeseries for {aspect} for {gms_url} older than {days_ago} days"
)
days_ago_millis = self.x_days_ago_millis(days_ago)
url = f"{gms_url}/operations?action=truncateTimeseriesAspect"
try:
response = graph._post_generic(
url=url,
payload_dict={
"entityType": entity_type,
"aspect": aspect,
"endTimeMillis": days_ago_millis,
"dryRun": dry_run,
},
)
# logger.info(f"Response: {response}")
except OperationalError:
response = graph._post_generic(
url=url,
payload_dict={
"entityType": entity_type,
"aspect": aspect,
"endTimeMillis": days_ago_millis,
"dryRun": dry_run,
"forceDeleteByQuery": True,
},
)
# logger.info(f"Response: {response}")
return response

def revoke_expired_tokens(self) -> None:
total = 1
while total > 0:
Expand Down

0 comments on commit e0b20e1

Please sign in to comment.