Skip to content

Commit

Permalink
Refactor basebackends cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
kartik4949 committed Jan 2, 2025
1 parent 504f2ad commit 04e77ca
Show file tree
Hide file tree
Showing 15 changed files with 88 additions and 25 deletions.
14 changes: 11 additions & 3 deletions superduper/backends/base/backends.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@ def __init__(self):
self._db = None

@abstractmethod
def drop(self):
"""Drop the backend."""
pass
def drop(self, component: t.Optional['Component'] = None):
"""Drop the backend.
:param component: Component to Drop.
"""

@abstractmethod
def list_uuids(self):
Expand Down Expand Up @@ -63,6 +65,12 @@ def put(self, component: 'Component', **kwargs):
del self[component.identifier]
self._put(component, **kwargs)

def drop_component(self, identifier: str):
"""Drop the component from backend.
:param identifier: Component identifier
"""

@property
def db(self) -> 'Datalayer':
"""Get the ``db``."""
Expand Down
7 changes: 5 additions & 2 deletions superduper/backends/local/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,11 @@ def __delitem__(self, item):
def initialize(self):
"""Initialize the cache."""

def drop(self):
"""Drop the cache."""
def drop(self, component: t.Optional['Component'] = None):
"""Drop the cache.
:param component: Component to drop.
"""
self._cache = {}
self._component_to_uuid = {}

Expand Down
12 changes: 10 additions & 2 deletions superduper/backends/local/cdc.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
import typing as t

from superduper.backends.base.cdc import CDCBackend
from superduper.components.cdc import CDC

if t.TYPE_CHECKING:
from superduper import Component


class LocalCDCBackend(CDCBackend):
"""Local CDC backend."""
Expand Down Expand Up @@ -44,7 +49,10 @@ def initialize(self):
self.put(self.db.load(type_id=type_id, identifier=identifier))
# TODO consider re-initialzing CDC jobs since potentially failure

def drop(self):
"""Drop the CDC."""
def drop(self, component: t.Optional['Component'] = None):
"""Drop the CDC.
:param component: Component to remove.
"""
self.triggers = set()
self._trigger_uuid_mapping = {}
11 changes: 8 additions & 3 deletions superduper/backends/local/compute.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
from superduper.backends.base.compute import ComputeBackend
from superduper.base.event import Job

if t.TYPE_CHECKING:
from superduper import Component


class LocalComputeBackend(ComputeBackend):
"""
Expand Down Expand Up @@ -101,9 +104,11 @@ def initialize(self):
"""Initialize the compute."""
pass

def drop(self):
"""Drop the compute."""
pass
def drop(self, component: t.Optional['Component'] = None):
"""Drop the compute.
:param component: Component to remove.
"""

@property
def tasks(self) -> t.Dict[str, t.Any]:
Expand Down
26 changes: 22 additions & 4 deletions superduper/backends/local/crontab.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
import typing as t

from apscheduler.executors.pool import ThreadPoolExecutor
from apscheduler.jobstores.memory import MemoryJobStore
from apscheduler.schedulers.background import BackgroundScheduler

from superduper import logging
from superduper.backends.base.crontab import CrontabBackend

if t.TYPE_CHECKING:
from superduper import Component


class LocalCrontabBackend(CrontabBackend):
"""Local crontab backend."""
Expand Down Expand Up @@ -58,10 +63,23 @@ def list_uuids(self):
"""List UUIDs of components."""
return list(self._job_uuids)

def drop(self):
"""Drop the crontab."""
for job_id in self._job_uuids:
self.scheduler.remove_job(job_id)
def drop_component(self, uuid: str):
"""Drop the crontab.
:param uuid: Component uuid to remove.
"""
self.scheduler.remove_job(uuid)

def drop(self, component: t.Optional['Component'] = None):
"""Drop the crontab.
:param component: Component to remove.
"""
if component:
self.scheduler.remove_job(component.uuid)
else:
for job_id in self._job_uuids:
self.scheduler.remove_job(job_id)

def initialize(self):
"""Initialize the crontab."""
Expand Down
16 changes: 11 additions & 5 deletions superduper/backends/local/vector_search.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@
measures,
)

if t.TYPE_CHECKING:
from superduper import Component


class LocalVectorSearchBackend(VectorSearchBackend):
"""Local vector search backend.
Expand Down Expand Up @@ -67,14 +70,17 @@ def __delitem__(self, identifier):
def __getitem__(self, identifier):
return self._cache[identifier]

def drop(self, identifier=None):
"""Drop the vector search."""
def drop(self, component: t.Optional['Component'] = None):
"""Drop the CDC.
:param component: Component to remove.
"""
# TODO: drop actual vector search not the cache
if identifier is None:
if component is None:
self._cache = {}
else:
del self._cache[identifier]
del self._identifier_uuid_map[identifier]
del self._cache[component.identifier]
del self._identifier_uuid_map[component.identifier]


class InMemoryVectorSearcher(BaseVectorSearcher):
Expand Down
7 changes: 7 additions & 0 deletions superduper/base/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,13 @@ class Create(Event):
def execute(self, db: 'Datalayer'):
"""Execute the create event."""
# TODO decide where to assign version
if (
self.component['type_id'] == 'table'
and self.component['identifier'] == 'IRS'
):
import time

time.sleep(10)
db.metadata.create_component(self.component)
# TODO - do we really need to load the whole component?
component = db.load(uuid=self.component['uuid'])
Expand Down
2 changes: 1 addition & 1 deletion superduper/cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ def drop(data: bool = False, force: bool = False, data_backend: str | None = Non
:param force: Force the drop.
"""
data_backend = data_backend or CFG.data_backend
db = superduper(data_backend)
db = superduper(data_backend, initialize_cluster=False)
db.drop(force=force, data=data)
db.disconnect()

Expand Down
2 changes: 1 addition & 1 deletion superduper/components/component.py
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ def _find_refs(r):

def get_children(self, deep: bool = False) -> t.List["Component"]:
"""Get all the children of the component."""
r = self.dict().encode(leaves_to_keep=Component)
r = self.dict().encode(leaves_to_keep=Component) # type: ignore[arg-type]
out = [v for v in r['_builds'].values() if isinstance(v, Component)]
lookup = {}
for v in out:
Expand Down
7 changes: 7 additions & 0 deletions superduper/components/cron_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,13 @@ def run(self):
"""Run the job."""
raise NotImplementedError

def cleanup(self, db):
"""Cleanup crontab service.
:param db: Database instance.
"""
db.cluster.crontab.drop(self)


class FunctionCronJob(CronJob):
"""
Expand Down
1 change: 1 addition & 0 deletions superduper/components/listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -314,3 +314,4 @@ def cleanup(self, db: "Datalayer") -> None:
"""
if self.select is not None:
db[self.select.table].drop_outputs(self.predict_id)
db.cluster.cdc.drop(self)
2 changes: 1 addition & 1 deletion superduper/components/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,7 @@ def cleanup(self, db: "Datalayer") -> None:
:param db: Data layer instance to process.
"""
db.cluster.compute.drop_component(self.uuid)
db.cluster.compute.drop(self)

@property
def inputs(self) -> Inputs:
Expand Down
2 changes: 1 addition & 1 deletion superduper/components/vector_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ def cleanup(self, db: Datalayer):
:param db: The datalayer to cleanup
"""
db.cluster.vector_search.drop(self.identifier)
db.cluster.vector_search.drop(self)

@property
def models_keys(self) -> t.Tuple[t.List[str], t.List[ModelInputType]]:
Expand Down
2 changes: 1 addition & 1 deletion superduper/rest/base.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import os
import sys
import threading
import time
import os
import typing as t
from functools import cached_property
from traceback import format_exc
Expand Down
2 changes: 1 addition & 1 deletion test/unittest/test_quality.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
ALLOWABLE_DEFECTS = {
'cast': 1, # Try to keep this down
'noqa': 3, # This should never change
'type_ignore': 7, # This should only ever increase in obscure edge cases
'type_ignore': 8, # This should only ever increase in obscure edge cases
}


Expand Down

0 comments on commit 04e77ca

Please sign in to comment.