From 17c6648bcabca493d9730b823e4de32e876cfdc7 Mon Sep 17 00:00:00 2001 From: TheDude Date: Mon, 16 Dec 2024 16:26:44 +0530 Subject: [PATCH] Add wait in db apply on create events --- CHANGELOG.md | 1 + superduper/base/apply.py | 33 +++++++++++++++++++++++++++++++++ superduper/base/datalayer.py | 32 +------------------------------- 3 files changed, 35 insertions(+), 31 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 102aaa8e3..9040a7f01 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Remove `_Encodable` from project - Connect to Snowflake using the incluster oauth token - Add postprocess in apibase model. +- Add create events waiting on db apply. #### New Features & Functionality diff --git a/superduper/base/apply.py b/superduper/base/apply.py index 2cc7fd505..c8e7b32d7 100644 --- a/superduper/base/apply.py +++ b/superduper/base/apply.py @@ -1,3 +1,4 @@ +import time import typing as t import click @@ -15,11 +16,14 @@ from superduper.base.datalayer import Datalayer from superduper.base.event import Job +_WAIT_TIMEOUT = 60 + def apply( db: 'Datalayer', object: t.Union['Component', t.Sequence[t.Any], t.Any], force: bool | None = None, + wait: bool = False, ): """ Add functionality in the form of components. @@ -31,6 +35,8 @@ def apply( :param object: Object to be stored. :param force: List of jobs which should execute before component initialization begins. + :param wait: Blocks execution till create events finish. + :return: Tuple containing the added object(s) and the original object(s). """ if force is None: @@ -119,9 +125,36 @@ def apply( ): return object db.cluster.queue.publish(events=events) + if wait: + unique_create_events = list(create_events.values()) + _wait_on_events(db, unique_create_events) return object +def _wait_on_events(db, events): + remaining = len(events) + time_left = _WAIT_TIMEOUT + while True: + for event in events: + identifier = event.component['identifier'] + type_id = event.component['type_id'] + version = event.component['version'] + try: + db.show(type_id=type_id, identifier=identifier, version=version) + except FileNotFoundError: + pass + else: + remaining -= 1 + + if remaining <= 0: + return + elif time_left == 0: + raise TimeoutError("Timeout error while waiting for create events.") + else: + time.sleep(1) + time_left -= 1 + + def _apply( db: 'Datalayer', object: 'Component', diff --git a/superduper/base/datalayer.py b/superduper/base/datalayer.py index dceaff887..35d4e36db 100644 --- a/superduper/base/datalayer.py +++ b/superduper/base/datalayer.py @@ -1,5 +1,4 @@ import random -import time import typing as t import warnings from collections import namedtuple @@ -37,8 +36,6 @@ PredictResult = t.Union[Document, t.Sequence[Document]] ExecuteResult = t.Union[SelectResult, DeleteResult, UpdateResult, InsertResult] -_WAIT_TIMEOUT = 60 - class Datalayer: """ @@ -456,34 +453,7 @@ def apply( :param wait: Wait for apply events. :return: Tuple containing the added object(s) and the original object(s). """ - return apply.apply( - db=self, - object=object, - force=force, - ) - - def _wait_on_events(self, events): - remaining = len(events) - time_left = _WAIT_TIMEOUT - while True: - for event in events: - identifier = event.component['identifier'] - type_id = event.component['type_id'] - version = event.component['version'] - try: - self.show(type_id=type_id, identifier=identifier, version=version) - except FileNotFoundError: - pass - else: - remaining -= 1 - - if remaining <= 0: - return - elif time_left == 0: - raise TimeoutError("Timeout error while waiting for create events.") - else: - time.sleep(1) - time_left -= 1 + return apply.apply(db=self, object=object, force=force, wait=wait) def remove( self,