Skip to content

Commit

Permalink
Add wait in db apply on create events
Browse files Browse the repository at this point in the history
  • Loading branch information
kartik4949 committed Dec 21, 2024
1 parent fade47a commit 17c6648
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 31 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Remove `_Encodable` from project
- Connect to Snowflake using the incluster oauth token
- Add postprocess in apibase model.
- Add create events waiting on db apply.

#### New Features & Functionality

Expand Down
33 changes: 33 additions & 0 deletions superduper/base/apply.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import time
import typing as t

import click
Expand All @@ -15,11 +16,14 @@
from superduper.base.datalayer import Datalayer
from superduper.base.event import Job

_WAIT_TIMEOUT = 60


def apply(
db: 'Datalayer',
object: t.Union['Component', t.Sequence[t.Any], t.Any],
force: bool | None = None,
wait: bool = False,
):
"""
Add functionality in the form of components.
Expand All @@ -31,6 +35,8 @@ def apply(
:param object: Object to be stored.
:param force: List of jobs which should execute before component
initialization begins.
:param wait: Blocks execution till create events finish.
:return: Tuple containing the added object(s) and the original object(s).
"""
if force is None:
Expand Down Expand Up @@ -119,9 +125,36 @@ def apply(
):
return object
db.cluster.queue.publish(events=events)
if wait:
unique_create_events = list(create_events.values())
_wait_on_events(db, unique_create_events)
return object


def _wait_on_events(db, events):
remaining = len(events)
time_left = _WAIT_TIMEOUT
while True:
for event in events:
identifier = event.component['identifier']
type_id = event.component['type_id']
version = event.component['version']
try:
db.show(type_id=type_id, identifier=identifier, version=version)
except FileNotFoundError:
pass
else:
remaining -= 1

if remaining <= 0:
return
elif time_left == 0:
raise TimeoutError("Timeout error while waiting for create events.")
else:
time.sleep(1)
time_left -= 1


def _apply(
db: 'Datalayer',
object: 'Component',
Expand Down
32 changes: 1 addition & 31 deletions superduper/base/datalayer.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import random
import time
import typing as t
import warnings
from collections import namedtuple
Expand Down Expand Up @@ -37,8 +36,6 @@
PredictResult = t.Union[Document, t.Sequence[Document]]
ExecuteResult = t.Union[SelectResult, DeleteResult, UpdateResult, InsertResult]

_WAIT_TIMEOUT = 60


class Datalayer:
"""
Expand Down Expand Up @@ -456,34 +453,7 @@ def apply(
:param wait: Wait for apply events.
:return: Tuple containing the added object(s) and the original object(s).
"""
return apply.apply(
db=self,
object=object,
force=force,
)

def _wait_on_events(self, events):
remaining = len(events)
time_left = _WAIT_TIMEOUT
while True:
for event in events:
identifier = event.component['identifier']
type_id = event.component['type_id']
version = event.component['version']
try:
self.show(type_id=type_id, identifier=identifier, version=version)
except FileNotFoundError:
pass
else:
remaining -= 1

if remaining <= 0:
return
elif time_left == 0:
raise TimeoutError("Timeout error while waiting for create events.")
else:
time.sleep(1)
time_left -= 1
return apply.apply(db=self, object=object, force=force, wait=wait)

def remove(
self,
Expand Down

0 comments on commit 17c6648

Please sign in to comment.