Skip to content

Commit

Permalink
Fix version issue in metadata cache
Browse files Browse the repository at this point in the history
  • Loading branch information
kartik4949 committed Jan 4, 2025
1 parent 00f3cd6 commit b0b84e4
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 10 deletions.
1 change: 0 additions & 1 deletion plugins/ibis/superduper_ibis/data_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,6 @@ def __init__(self, uri: str, flavour: t.Optional[str] = None):
'vector': 'superduper.components.datatype.NativeVector'
}


def _setup(self, conn):
self.dialect = getattr(conn, "name", "base")
self.db_helper = get_db_helper(self.dialect)
Expand Down
1 change: 1 addition & 0 deletions plugins/sqlalchemy/plugin_test/test_metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
@pytest.fixture
def metadata():
store = SQLAlchemyMetadata(DATABASE_URL)
store._batched = False
yield store
store.drop(force=True)

Expand Down
30 changes: 21 additions & 9 deletions plugins/sqlalchemy/superduper_sqlalchemy/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,9 @@ def __init__(self):
self._uuid2metadata: t.Dict[str, t.Dict] = {}
self._type_id_identifier2metadata = defaultdict(dict)

def replace_metadata(self, metadata, uuid=None, type_id=None, version=None, identifier=None):
def replace_metadata(
self, metadata, uuid=None, type_id=None, version=None, identifier=None
):
metadata = copy.deepcopy(metadata)
if 'dict' in metadata:
dict_ = metadata['dict']
Expand Down Expand Up @@ -107,7 +109,8 @@ def get_metadata_by_identifier(self, type_id, identifier, version):
metadata = self._type_id_identifier2metadata[(type_id, identifier)]
if not metadata:
return None
version = version or max(metadata.keys())
if version is None:
version = max(metadata.keys())
return metadata[version]

def update_metadata(self, metadata):
Expand Down Expand Up @@ -155,7 +158,7 @@ def __init__(
'parent_child': [],
'component': [],
'_artifact_relations': [],
'job': []
'job': [],
}
self._parent_relation_cache = []
self._batched = True
Expand Down Expand Up @@ -206,7 +209,6 @@ def reconnect(self):
# a reconnect.
self._init_tables()


def _init_tables(self):
# Get the DB config for the given dialect
DBConfig = get_db_config(self.dialect)
Expand Down Expand Up @@ -416,7 +418,10 @@ def component_version_has_parents(
res = self.query_results(self.parent_child_association_table, stmt, session)
return len(res) > 0

def create_component(self, info: t.Dict, ):
def create_component(
self,
info: t.Dict,
):
"""Create a component in the metadata store.
:param info: the information to create the component
Expand Down Expand Up @@ -453,7 +458,11 @@ def delete_parent_child(self, parent_id: str, child_id: str | None = None):
)
session.execute(stmt)

def create_parent_child(self, parent_id: str, child_id: str, ):
def create_parent_child(
self,
parent_id: str,
child_id: str,
):
"""Create a parent-child relationship between two components.
:param parent_id: the parent component
Expand Down Expand Up @@ -668,7 +677,12 @@ def _replace_object(
.values(**info)
)
session.execute(stmt)
self._cache.replace_metadata(type_id=type_id, identifier=identifier, version=version, metadata=info)
self._cache.replace_metadata(
type_id=type_id,
identifier=identifier,
version=version,
metadata=info,
)
else:
with self.session_context() as session:
stmt = (
Expand Down Expand Up @@ -791,8 +805,6 @@ def create_job(self, info: t.Union[t.Dict, t.List[t.Dict]]):
else:
self._insert_flush['job'].append(info)



def get_job(self, job_id: str):
"""Get the job with the given job_id.
Expand Down

0 comments on commit b0b84e4

Please sign in to comment.