Skip to content

Commit

Permalink
feat(ingestion): Add execution request cleanup job (datahub-project#1…
Browse files Browse the repository at this point in the history
  • Loading branch information
noggi authored Oct 31, 2024
1 parent 5c0377a commit 94bcb79
Show file tree
Hide file tree
Showing 6 changed files with 291 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,13 @@
keep_last_n: {{dataprocess_cleanup.keep_last_n}}{{^dataprocess_cleanup.keep_last_n}}5{{/dataprocess_cleanup.keep_last_n}}
soft_deleted_entities_cleanup:
retention_days: {{soft_deleted_entities_cleanup.retention_days}}{{^soft_deleted_entities_cleanup.retention_days}}10{{/soft_deleted_entities_cleanup.retention_days}}
execution_request_cleanup:
keep_history_min_count: {{execution_request_cleanup.keep_history_min_count}}{{^execution_request_cleanup.keep_history_min_count}}10{{/execution_request_cleanup.keep_history_min_count}}
keep_history_max_count: {{execution_request_cleanup.keep_history_max_count}}{{^execution_request_cleanup.keep_history_max_count}}1000{{/execution_request_cleanup.keep_history_max_count}}
keep_history_max_days: {{execution_request_cleanup.keep_history_max_days}}{{^execution_request_cleanup.keep_history_max_days}}30{{/execution_request_cleanup.keep_history_max_days}}
batch_read_size: {{execution_request_cleanup.batch_read_size}}{{^execution_request_cleanup.batch_read_size}}100{{/execution_request_cleanup.batch_read_size}}
enabled: {{execution_request_cleanup.enabled}}{{^execution_request_cleanup.enabled}}false{{/execution_request_cleanup.enabled}}
extraArgs: {}
debugMode: false
executorId: default
headers: {}
headers: {}
11 changes: 11 additions & 0 deletions metadata-ingestion/docs/sources/gc/gc_recipe.dhub.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,14 @@ source:
soft_deleted_entities_cleanup:
# Delete soft deleted entities which were deleted 10 days ago
retention_days: 10
execution_request_cleanup:
# Minimum number of execution requests to keep, per ingestion source
keep_history_min_count: 10
# Maximum number of execution requests to keep, per ingestion source
keep_history_max_count: 1000
# Maximum number of days to keep execution requests for, per ingestion source
keep_history_max_days: 30
# Number of records per read operation
batch_read_size: 100
# Global switch for this cleanup task
enabled: true
25 changes: 24 additions & 1 deletion metadata-ingestion/src/datahub/ingestion/source/gc/datahub_gc.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@
DataProcessCleanupConfig,
DataProcessCleanupReport,
)
from datahub.ingestion.source.gc.execution_request_cleanup import (
DatahubExecutionRequestCleanup,
DatahubExecutionRequestCleanupConfig,
DatahubExecutionRequestCleanupReport,
)
from datahub.ingestion.source.gc.soft_deleted_entity_cleanup import (
SoftDeletedEntitiesCleanup,
SoftDeletedEntitiesCleanupConfig,
Expand Down Expand Up @@ -70,9 +75,18 @@ class DataHubGcSourceConfig(ConfigModel):
description="Configuration for soft deleted entities cleanup",
)

execution_request_cleanup: Optional[DatahubExecutionRequestCleanupConfig] = Field(
default=None,
description="Configuration for execution request cleanup",
)


@dataclass
class DataHubGcSourceReport(DataProcessCleanupReport, SoftDeletedEntitiesReport):
class DataHubGcSourceReport(
DataProcessCleanupReport,
SoftDeletedEntitiesReport,
DatahubExecutionRequestCleanupReport,
):
expired_tokens_revoked: int = 0


Expand All @@ -97,6 +111,7 @@ def __init__(self, ctx: PipelineContext, config: DataHubGcSourceConfig):
self.graph = ctx.require_graph("The DataHubGc source")
self.dataprocess_cleanup: Optional[DataProcessCleanup] = None
self.soft_deleted_entities_cleanup: Optional[SoftDeletedEntitiesCleanup] = None
self.execution_request_cleanup: Optional[DatahubExecutionRequestCleanup] = None

if self.config.dataprocess_cleanup:
self.dataprocess_cleanup = DataProcessCleanup(
Expand All @@ -109,6 +124,12 @@ def __init__(self, ctx: PipelineContext, config: DataHubGcSourceConfig):
self.report,
self.config.dry_run,
)
if self.config.execution_request_cleanup:
self.execution_request_cleanup = DatahubExecutionRequestCleanup(
config=self.config.execution_request_cleanup,
graph=self.graph,
report=self.report,
)

@classmethod
def create(cls, config_dict, ctx):
Expand All @@ -130,6 +151,8 @@ def get_workunits_internal(
yield from self.dataprocess_cleanup.get_workunits_internal()
if self.soft_deleted_entities_cleanup:
self.soft_deleted_entities_cleanup.cleanup_soft_deleted_entities()
if self.execution_request_cleanup:
self.execution_request_cleanup.run()
yield from []

def truncate_indices(self) -> None:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,240 @@
import logging
import time
from typing import Any, Dict, Iterator, Optional

from pydantic import BaseModel, Field

from datahub.configuration.common import ConfigModel
from datahub.ingestion.api.source import SourceReport
from datahub.ingestion.graph.client import DataHubGraph

logger = logging.getLogger(__name__)

DATAHUB_EXECUTION_REQUEST_ENTITY_NAME = "dataHubExecutionRequest"
DATAHUB_EXECUTION_REQUEST_KEY_ASPECT_NAME = "dataHubExecutionRequestKey"
DATAHUB_EXECUTION_REQUEST_INPUT_ASPECT_NAME = "dataHubExecutionRequestInput"
DATAHUB_EXECUTION_REQUEST_RESULT_ASPECT_NAME = "dataHubExecutionRequestResult"


class DatahubExecutionRequestCleanupConfig(ConfigModel):
keep_history_min_count: int = Field(
10,
description="Minimum number of execution requests to keep, per ingestion source",
)

keep_history_max_count: int = Field(
1000,
description="Maximum number of execution requests to keep, per ingestion source",
)

keep_history_max_days: int = Field(
30,
description="Maximum number of days to keep execution requests for, per ingestion source",
)

batch_read_size: int = Field(
100,
description="Number of records per read operation",
)

enabled: bool = Field(
True,
description="Global switch for this cleanup task",
)

def keep_history_max_milliseconds(self):
return self.keep_history_max_days * 24 * 3600 * 1000


class DatahubExecutionRequestCleanupReport(SourceReport):
execution_request_cleanup_records_read: int = 0
execution_request_cleanup_records_preserved: int = 0
execution_request_cleanup_records_deleted: int = 0
execution_request_cleanup_read_errors: int = 0
execution_request_cleanup_delete_errors: int = 0


class CleanupRecord(BaseModel):
urn: str
request_id: str
status: str
ingestion_source: str
requested_at: int


class DatahubExecutionRequestCleanup:
def __init__(
self,
graph: DataHubGraph,
report: DatahubExecutionRequestCleanupReport,
config: Optional[DatahubExecutionRequestCleanupConfig] = None,
) -> None:

self.graph = graph
self.report = report
self.instance_id = int(time.time())

if config is not None:
self.config = config
else:
self.config = DatahubExecutionRequestCleanupConfig()

def _to_cleanup_record(self, entry: Dict) -> CleanupRecord:
input_aspect = (
entry.get("aspects", {})
.get(DATAHUB_EXECUTION_REQUEST_INPUT_ASPECT_NAME, {})
.get("value", {})
)
result_aspect = (
entry.get("aspects", {})
.get(DATAHUB_EXECUTION_REQUEST_RESULT_ASPECT_NAME, {})
.get("value", {})
)
key_aspect = (
entry.get("aspects", {})
.get(DATAHUB_EXECUTION_REQUEST_KEY_ASPECT_NAME, {})
.get("value", {})
)
return CleanupRecord(
urn=entry.get("urn"),
request_id=key_aspect.get("id"),
requested_at=input_aspect.get("requestedAt", 0),
status=result_aspect.get("status", "PENDING"),
ingestion_source=input_aspect.get("source", {}).get("ingestionSource", ""),
)

def _scroll_execution_requests(
self, overrides: Dict[str, Any] = {}
) -> Iterator[CleanupRecord]:
headers: Dict[str, Any] = {
"Accept": "application/json",
"Content-Type": "application/json",
}
params = {
"aspectNames": [
DATAHUB_EXECUTION_REQUEST_KEY_ASPECT_NAME,
DATAHUB_EXECUTION_REQUEST_INPUT_ASPECT_NAME,
DATAHUB_EXECUTION_REQUEST_RESULT_ASPECT_NAME,
],
"count": str(self.config.batch_read_size),
"sort": "requestTimeMs",
"sortOrder": "DESCENDING",
"systemMetadata": "false",
"skipCache": "true",
}
params.update(overrides)

while True:
try:
url = f"{self.graph.config.server}/openapi/v2/entity/{DATAHUB_EXECUTION_REQUEST_ENTITY_NAME}"
response = self.graph._session.get(url, headers=headers, params=params)
response.raise_for_status()
document = response.json()

entries = document.get("results", [])
for entry in entries:
yield self._to_cleanup_record(entry)

if "scrollId" not in document:
break
params["scrollId"] = document["scrollId"]
except Exception as e:
logger.error(
f"ergc({self.instance_id}): failed to fetch next batch of execution requests: {e}"
)
self.report.execution_request_cleanup_read_errors += 1

def _scroll_garbage_records(self):
state: Dict[str, Dict] = {}

now_ms = int(time.time()) * 1000
running_guard_timeout = now_ms - 30 * 24 * 3600 * 1000

for entry in self._scroll_execution_requests():
self.report.execution_request_cleanup_records_read += 1
key = entry.ingestion_source

# Always delete corrupted records
if not key:
logger.warning(
f"ergc({self.instance_id}): will delete corrupted entry with missing source key: {entry}"
)
yield entry
continue

if key not in state:
state[key] = {}
state[key]["cutoffTimestamp"] = (
entry.requested_at - self.config.keep_history_max_milliseconds()
)

state[key]["count"] = state[key].get("count", 0) + 1

# Do not delete if number of requests is below minimum
if state[key]["count"] < self.config.keep_history_min_count:
self.report.execution_request_cleanup_records_preserved += 1
continue

# Do not delete if number of requests do not exceed allowed maximum,
# or the cutoff date.
if (state[key]["count"] < self.config.keep_history_max_count) and (
entry.requested_at > state[key]["cutoffTimestamp"]
):
self.report.execution_request_cleanup_records_preserved += 1
continue

# Do not delete if status is RUNNING or PENDING and created within last month. If the record is >month old and it did not
# transition to a final state within that timeframe, it likely has no value.
if entry.requested_at > running_guard_timeout and entry.status in [
"RUNNING",
"PENDING",
]:
self.report.execution_request_cleanup_records_preserved += 1
continue

# Otherwise delete current record
logger.info(
(
f"ergc({self.instance_id}): going to delete {entry.request_id} in source {key}; "
f"source count: {state[key]['count']}; "
f"source cutoff: {state[key]['cutoffTimestamp']}; "
f"record timestamp: {entry.requested_at}."
)
)
self.report.execution_request_cleanup_records_deleted += 1
yield entry

def _delete_entry(self, entry: CleanupRecord) -> None:
try:
logger.info(
f"ergc({self.instance_id}): going to delete ExecutionRequest {entry.request_id}"
)
self.graph.delete_entity(entry.urn, True)
except Exception as e:
self.report.execution_request_cleanup_delete_errors += 1
logger.error(
f"ergc({self.instance_id}): failed to delete ExecutionRequest {entry.request_id}: {e}"
)

def run(self) -> None:
if not self.config.enabled:
logger.info(
f"ergc({self.instance_id}): ExecutionRequest cleaner is disabled."
)
return

logger.info(
(
f"ergc({self.instance_id}): Starting cleanup of ExecutionRequest records; "
f"max days: {self.config.keep_history_max_days}, "
f"min records: {self.config.keep_history_min_count}, "
f"max records: {self.config.keep_history_max_count}."
)
)

for entry in self._scroll_garbage_records():
self._delete_entry(entry)

logger.info(
f"ergc({self.instance_id}): Finished cleanup of ExecutionRequest records."
)
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ bootstrap:

# Ingestion Recipes
- name: ingestion-datahub-gc
version: v1
version: v2
optional: true
mcps_location: "bootstrap_mcps/ingestion-datahub-gc.yaml"
values_env: "DATAHUB_GC_BOOTSTRAP_VALUES"
values_env: "DATAHUB_GC_BOOTSTRAP_VALUES"
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,13 @@
keep_last_n: {{dataprocess_cleanup.keep_last_n}}{{^dataprocess_cleanup.keep_last_n}}5{{/dataprocess_cleanup.keep_last_n}}
soft_deleted_entities_cleanup:
retention_days: {{soft_deleted_entities_cleanup.retention_days}}{{^soft_deleted_entities_cleanup.retention_days}}10{{/soft_deleted_entities_cleanup.retention_days}}
extraArgs: {}
execution_request_cleanup:
keep_history_min_count: {{execution_request_cleanup.keep_history_min_count}}{{^execution_request_cleanup.keep_history_min_count}}10{{/execution_request_cleanup.keep_history_min_count}}
keep_history_max_count: {{execution_request_cleanup.keep_history_max_count}}{{^execution_request_cleanup.keep_history_max_count}}1000{{/execution_request_cleanup.keep_history_max_count}}
keep_history_max_days: {{execution_request_cleanup.keep_history_max_days}}{{^execution_request_cleanup.keep_history_max_days}}30{{/execution_request_cleanup.keep_history_max_days}}
batch_read_size: {{execution_request_cleanup.batch_read_size}}{{^execution_request_cleanup.batch_read_size}}100{{/execution_request_cleanup.batch_read_size}}
enabled: {{execution_request_cleanup.enabled}}{{^execution_request_cleanup.enabled}}false{{/execution_request_cleanup.enabled}}
extraArgs: {}
debugMode: false
executorId: default
source:
Expand Down

0 comments on commit 94bcb79

Please sign in to comment.