Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix job status update #2699

Merged
merged 3 commits into from
Dec 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Fix the bug in the update diff check that replaces uuids
- Fix snowflake vector search issues.
- Fix crontab drop method.
- Fix job status update.
- Fix a random bug caused by queue thread-safety issues when OSS used crontab.

## [0.4.0](https://github.com/superduper-io/superduper/compare/0.4.0...0.3.0) (2024-Nov-02)

Expand Down
2 changes: 1 addition & 1 deletion plugins/mongodb/superduper_mongodb/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from .query import MongoQuery
from .vector_search import MongoAtlasVectorSearcher as VectorSearcher

__version__ = "0.4.4"
__version__ = "0.4.5"

__all__ = [
"ArtifactStore",
Expand Down
5 changes: 5 additions & 0 deletions plugins/mongodb/superduper_mongodb/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -424,3 +424,8 @@ def set_component_status(self, uuid, status: Status):
return self.component_collection.update_one(
{'uuid': uuid}, {'$set': {'status': status}}
)

def _get_component_status(self, uuid):
    """Get status of component.

    :param uuid: ``Component.uuid`` of the component to look up.
    :return: The stored ``status`` value, or ``None`` when no document
             matches ``uuid`` or the matching document has no ``status``
             field.
    """
    # Project only ``status``; nothing else from the document is used here.
    data = self.component_collection.find_one({'uuid': uuid}, {'status': 1})
    # ``.get`` keeps a document stored without a status from raising KeyError;
    # it is reported the same way as a missing component.
    return data.get('status') if data else None
2 changes: 1 addition & 1 deletion plugins/sqlalchemy/superduper_sqlalchemy/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from .metadata import SQLAlchemyMetadata as MetaDataStore

__version__ = "0.4.3"
__version__ = "0.4.4"

__all__ = ['MetaDataStore']
17 changes: 14 additions & 3 deletions plugins/sqlalchemy/superduper_sqlalchemy/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -553,6 +553,19 @@ def set_component_status(self, uuid, status: Status):
)
session.execute(stmt)

def _get_component_status(self, uuid):
    """Get status of component.

    Queries the component table for at most one row whose ``uuid`` column
    equals ``uuid``.

    :param uuid: ``Component.uuid``
    :return: The ``status`` entry of the first result, or ``None`` when the
             query yields no results.
    """
    with self.session_context() as session:
        table = self.component_table
        query = select(table).where(table.c.uuid == uuid).limit(1)
        rows = self.query_results(table, query, session)
        return rows[0]['status'] if rows else None

def _show_cdcs(self, table):
"""Show all triggers in the database.

Expand Down Expand Up @@ -634,9 +647,7 @@ def get_job(self, job_id: str):
"""
with self.session_context() as session:
stmt = (
select(self.job_table)
.where(self.job_table.c.identifier == job_id)
.limit(1)
select(self.job_table).where(self.job_table.c.job_id == job_id).limit(1)
)
res = self.query_results(self.job_table, stmt, session)
return res[0] if res else None
Expand Down
12 changes: 10 additions & 2 deletions superduper/backends/base/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

if t.TYPE_CHECKING:
from superduper.backends.base.query import Select
from superduper.components.component import Status


class NonExistentMetadataError(Exception):
Expand Down Expand Up @@ -158,7 +157,7 @@ def drop(self, force: bool = False):
pass

@abstractmethod
def set_component_status(self, uuid: str, status: 'Status'):
def set_component_status(self, uuid: str, status: str):
"""
Set the status of a component.

Expand All @@ -167,6 +166,15 @@ def set_component_status(self, uuid: str, status: 'Status'):
"""
pass

def get_component_status(self, uuid: str):
    """
    Get the status of a component.

    Delegates to ``self._get_component_status`` and returns its result
    unchanged.

    :param uuid: ``Component.uuid``
    """
    return self._get_component_status(uuid)

# ------------------ JOBS ------------------

@abstractmethod
Expand Down
3 changes: 2 additions & 1 deletion superduper/backends/base/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ def _consume_event_type(event_type, ids, table, db: 'Datalayer'):
context = str(uuid.uuid4())
jobs: t.List[Job] = []
job_lookup: t.DefaultDict = defaultdict(dict)
logging.info(f'Consuming {event_type} events on {table}')

from superduper.components.cdc import build_streaming_graph

Expand Down Expand Up @@ -186,7 +187,7 @@ def _consume_event_type(event_type, ids, table, db: 'Datalayer'):
for job in sub_jobs:
job_lookup[component.uuid][job.method] = job.job_id
jobs += sub_jobs
print(f'Streaming with {component.type_id}:{component.identifier}')
logging.info(f'Streaming with {component.type_id}:{component.identifier}')

for job in jobs:
job.execute(db)
Expand Down
7 changes: 4 additions & 3 deletions superduper/backends/local/compute.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,9 @@ def submit(self, job: Job) -> str:
"""
args, kwargs = job.get_args_kwargs(self.futures[job.context])

assert job.job_id is not None
component = self.db.load(uuid=job.uuid)
self.db.metadata.update_job(job.identifier, 'status', 'running')
self.db.metadata.update_job(job.job_id, 'status', 'running')

try:
logging.debug(
Expand All @@ -73,10 +74,10 @@ def submit(self, job: Job) -> str:
method = getattr(component, job.method)
output = method(*args, **kwargs)
except Exception as e:
self.db.metadata.update_job(job.identifier, 'status', 'failed')
self.db.metadata.update_job(job.job_id, 'status', 'failed')
raise e

self.db.metadata.update_job(job.identifier, 'status', 'success')
self.db.metadata.update_job(job.job_id, 'status', 'success')
self.futures[job.context][job.job_id] = output
assert job.job_id is not None
return job.job_id
Expand Down
12 changes: 8 additions & 4 deletions superduper/backends/local/queue.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import threading
import typing as t

from superduper import logging
Expand Down Expand Up @@ -27,6 +28,7 @@ def __init__(self, uri: t.Optional[str] = None):
super().__init__(uri=uri)
self.consumer = self.build_consumer()
self._component_uuid_mapping: t.Dict = {}
self.lock = threading.Lock()

def list(self):
"""List all components."""
Expand All @@ -46,7 +48,8 @@ def initialize(self):
identifier = component_data['identifier']
r = self.db.show(type_id=type_id, identifier=identifier, version=-1)
if r.get('trigger'):
self.queue[type_id, identifier] = []
with self.lock:
self.queue[type_id, identifier] = []

def _put(self, component):
msg = 'Table name "_apply" collides with Superduper namespace'
Expand Down Expand Up @@ -77,9 +80,10 @@ def publish(self, events: t.List[Event]):

:param events: list of events
"""
for event in events:
self.queue[event.queue].append(event)
self.consumer.consume(db=self.db, queue=self.queue)
with self.lock:
for event in events:
self.queue[event.queue].append(event)
self.consumer.consume(db=self.db, queue=self.queue)


class LocalQueueConsumer(BaseQueueConsumer):
Expand Down
4 changes: 2 additions & 2 deletions superduper/components/component.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,12 +199,12 @@ def handle_update_or_same(self, other):
other.uuid = self.uuid
other.version = self.version

def set_db(self, value):
def set_db(self, value: "Datalayer"):
"""Set the db for the component.

:param value: The value to set the db to.
"""
self.db = value
self.db: "Datalayer" = value
for child in self.get_children():
child.set_db(value)

Expand Down
Loading