DM-48288: Do not cache collection summaries by default in queries #1138

Merged 2 commits on Jan 13, 2025
45 changes: 24 additions & 21 deletions python/lsst/daf/butler/_query_all_datasets.py
@@ -113,30 +113,33 @@
         raise InvalidQueryError("Can not use wildcards in collections when find_first=True")

     dataset_type_query = list(ensure_iterable(args.name))
-    dataset_type_collections = _filter_collections_and_dataset_types(
-        butler, args.collections, dataset_type_query
-    )
-
-    limit = args.limit
-    for dt, filtered_collections in sorted(dataset_type_collections.items()):
-        _LOG.debug("Querying dataset type %s", dt)
-        results = (
-            query.datasets(dt, filtered_collections, find_first=args.find_first)
-            .where(args.data_id, args.where, args.kwargs, bind=args.bind)
-            .limit(limit)
-        )
-        if args.with_dimension_records:
-            results = results.with_dimension_records()
-
-        for page in results._iter_pages():
-            if limit is not None:
-                # Track how much of the limit has been used up by each query.
-                limit -= len(page)
-
-            yield DatasetsPage(dataset_type=dt, data=page)
-
-            if limit is not None and limit <= 0:
-                break
+    with butler.registry.caching_context():
+        dataset_type_collections = _filter_collections_and_dataset_types(
+            butler, args.collections, dataset_type_query
+        )
+
+        limit = args.limit
+        for dt, filtered_collections in sorted(dataset_type_collections.items()):
+            _LOG.debug("Querying dataset type %s", dt)
+            results = (
+                query.datasets(dt, filtered_collections, find_first=args.find_first)
+                .where(args.data_id, args.where, args.kwargs, bind=args.bind)
+                .limit(limit)
+            )
+            if args.with_dimension_records:
+                results = results.with_dimension_records()
+
+            for page in results._iter_pages():
+                if limit is not None:
+                    # Track how much of the limit has been used up by each
+                    # query.
+                    limit -= len(page)
+
+                yield DatasetsPage(dataset_type=dt, data=page)
+
+                if limit is not None and limit <= 0:
+                    break


 def _filter_collections_and_dataset_types(

Codecov annotation: added line python/lsst/daf/butler/_query_all_datasets.py#L131 is not covered by tests.
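
The pattern this hunk applies, shown in isolation: hold one `butler.registry.caching_context()` open around a batch of per-dataset-type queries so collection records are resolved once rather than once per dataset type. A minimal sketch; the repo path, dataset types, and collection name are hypothetical.

    from lsst.daf.butler import Butler

    butler = Butler.from_config("/repo/example")  # hypothetical repo
    with butler.registry.caching_context():
        with butler.query() as query:
            for dt in ("raw", "calexp"):  # hypothetical dataset types
                # Each query reuses the cached collection records instead of
                # hitting the database again for the same collection.
                refs = list(query.datasets(dt, collections="HSC/runs/example"))
                print(dt, len(refs))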
25 changes: 13 additions & 12 deletions python/lsst/daf/butler/direct_butler/_direct_butler.py
@@ -1426,18 +1426,19 @@
         names = list(names)
         refs: list[DatasetRef] = []
         all_dataset_types = [dt.name for dt in self._registry.queryDatasetTypes(...)]
-        for name in names:
-            collectionType = self._registry.getCollectionType(name)
-            if collectionType is not CollectionType.RUN:
-                raise TypeError(f"The collection type of '{name}' is {collectionType.name}, not RUN.")
-            with self.query() as query:
-                # Work out the dataset types that are relevant.
-                collections_info = self.collections.query_info(name, include_summary=True)
-                filtered_dataset_types = self.collections._filter_dataset_types(
-                    all_dataset_types, collections_info
-                )
-                for dt in filtered_dataset_types:
-                    refs.extend(query.datasets(dt, collections=name))
+        with self._caching_context():
+            for name in names:
+                collectionType = self._registry.getCollectionType(name)
+                if collectionType is not CollectionType.RUN:
+                    raise TypeError(f"The collection type of '{name}' is {collectionType.name}, not RUN.")
+                with self.query() as query:
+                    # Work out the dataset types that are relevant.
+                    collections_info = self.collections.query_info(name, include_summary=True)
+                    filtered_dataset_types = self.collections._filter_dataset_types(
+                        all_dataset_types, collections_info
+                    )
+                    for dt in filtered_dataset_types:
+                        refs.extend(query.datasets(dt, collections=name))
         with self._datastore.transaction(), self._registry.transaction():
             if unstore:
                 self._datastore.trash(refs)

Codecov annotation: added line python/lsst/daf/butler/direct_butler/_direct_butler.py#L1433 is not covered by tests.
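
A hedged usage sketch for the method patched above, assuming (as the surrounding names suggest) that this hunk sits inside DirectButler's removeRuns implementation; the repo path and run names are made up for illustration.

    from lsst.daf.butler import Butler

    butler = Butler.from_config("/repo/example", writeable=True)  # hypothetical repo
    # Remove two RUN collections; per this diff, the per-run dataset lookups
    # now happen inside a single caching context before anything is trashed.
    butler.removeRuns(["u/someone/run1", "u/someone/run2"], unstore=True)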
86 changes: 57 additions & 29 deletions python/lsst/daf/butler/registry/_caching_context.py
@@ -27,6 +27,10 @@

 from __future__ import annotations

+from collections.abc import Callable, Iterator
+from contextlib import AbstractContextManager, contextmanager
+from typing import Generic, TypeVar
+
 __all__ = ["CachingContext"]

 from ._collection_record_cache import CollectionRecordCache
@@ -48,46 +52,70 @@ class is passed to the relevant managers that can use it to query or
     """

     def __init__(self) -> None:
-        self._collection_records: CollectionRecordCache | None = None
-        self._collection_summaries: CollectionSummaryCache | None = None
-        self._depth = 0
+        self._collection_records = _CacheToggle(CollectionRecordCache)
+        self._collection_summaries = _CacheToggle(CollectionSummaryCache)

-    @property
-    def is_enabled(self) -> bool:
-        return self._collection_records is not None
-
-    def _enable(self) -> None:
-        """Enable caches.
-
-        For use only by RegistryManagerInstances, which is the single point
-        of entry for enabling and disabling the caches.
-        """
-        if self._depth == 0:
-            self._collection_records = CollectionRecordCache()
-            self._collection_summaries = CollectionSummaryCache()
-        self._depth += 1
-
-    def _disable(self) -> None:
-        """Disable caches.
-
-        For use only by RegistryManagerInstances, which is the single point
-        of entry for enabling and disabling the caches.
-        """
-        if self._depth == 1:
-            self._collection_records = None
-            self._collection_summaries = None
-            self._depth = 0
-        elif self._depth > 1:
-            self._depth -= 1
-        else:
-            raise AssertionError("Bad caching context management detected.")
+    def enable_collection_record_cache(self) -> AbstractContextManager[None]:
+        """Enable the collection record cache.
+
+        Notes
+        -----
+        When this cache is enabled, any changes made by other processes to
+        collections in the database may not be visible.
+        """
+        return self._collection_records.enable()
+
+    def enable_collection_summary_cache(self) -> AbstractContextManager[None]:
+        """Enable the collection summary cache.
+
+        Notes
+        -----
+        When this cache is enabled, changes made by other processes to
+        collections in the database may not be visible.
+
+        When the collection summary cache is enabled, the performance of
+        database lookups for summaries changes.  Summaries will be aggressively
+        fetched for all dataset types in the collections, which can cause
+        significantly more rows to be returned than when the cache is disabled.
+        This should only be enabled when you know that you will be doing many
+        summary lookups for the same collections.
+        """
+        return self._collection_summaries.enable()

     @property
     def collection_records(self) -> CollectionRecordCache | None:
         """Cache for collection records (`CollectionRecordCache`)."""
-        return self._collection_records
+        return self._collection_records.cache

     @property
     def collection_summaries(self) -> CollectionSummaryCache | None:
         """Cache for collection summary records (`CollectionSummaryCache`)."""
-        return self._collection_summaries
+        return self._collection_summaries.cache
+
+
+_T = TypeVar("_T")
+
+
+class _CacheToggle(Generic[_T]):
+    """Utility class to track nested enable/disable calls for a cache."""
+
+    def __init__(self, enable_function: Callable[[], _T]):
+        self.cache: _T | None = None
+        self._enable_function = enable_function
+        self._depth = 0
+
+    @contextmanager
+    def enable(self) -> Iterator[None]:
+        """Context manager to enable the cache.  This context may be nested
+        any number of times, and the cache will only be disabled once all
+        callers have exited the context manager.
+        """
+        self._depth += 1
+        try:
+            if self._depth == 1:
+                self.cache = self._enable_function()
+            yield
+        finally:
+            self._depth -= 1
+            if self._depth == 0:
+                self.cache = None
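
The nesting contract documented in `enable()` can be checked with a short sketch against the `CachingContext` API added in this diff; this is illustrative, not a test from the PR, and assumes the module path shown in the file header.

    from lsst.daf.butler.registry._caching_context import CachingContext

    ctx = CachingContext()
    assert ctx.collection_records is None  # disabled by default
    with ctx.enable_collection_record_cache():
        outer = ctx.collection_records
        assert outer is not None
        with ctx.enable_collection_record_cache():
            # A nested enable reuses the same live cache object.
            assert ctx.collection_records is outer
        # The inner exit does not clear the cache...
        assert ctx.collection_records is outer
    # ...only the outermost exit does.
    assert ctx.collection_records is None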
2 changes: 1 addition & 1 deletion python/lsst/daf/butler/registry/collections/_base.py
@@ -581,7 +581,7 @@ def _modify_collection_chain(
         skip_caching_check: bool = False,
         skip_cycle_check: bool = False,
     ) -> Iterator[_CollectionChainModificationContext[K]]:
-        if (not skip_caching_check) and self._caching_context.is_enabled:
+        if (not skip_caching_check) and self._caching_context.collection_records is not None:
             # Avoid having cache-maintenance code around that is unlikely to
             # ever be used.
             raise RuntimeError("Chained collection modification not permitted with active caching context.")
8 changes: 4 additions & 4 deletions python/lsst/daf/butler/registry/managers.py
@@ -362,11 +362,11 @@ def caching_context_manager(self) -> Iterator[None]:
         may even be closed out of order, with only the context manager entered
         and the last context manager exited having any effect.
         """
-        self.caching_context._enable()
-        try:
+        with (
+            self.caching_context.enable_collection_record_cache(),
+            self.caching_context.enable_collection_summary_cache(),
+        ):
             yield
-        finally:
-            self.caching_context._disable()

     @classmethod
     def initialize(
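
For readers unfamiliar with the parenthesized multi-item `with` used above (Python 3.10+): it enters the context managers left to right and exits them in reverse order, even on exceptions, which preserves the enable/disable pairing the old try/finally provided. A self-contained sketch with stand-in context managers (names hypothetical):

    from collections.abc import Iterator
    from contextlib import contextmanager

    @contextmanager
    def enable(name: str) -> Iterator[None]:
        print(f"enable {name}")
        try:
            yield
        finally:
            print(f"disable {name}")

    with (
        enable("collection_record_cache"),
        enable("collection_summary_cache"),
    ):
        print("both caches live")
    # Output order: enable record, enable summary, both caches live,
    # disable summary, disable record.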
@@ -86,7 +86,7 @@ def context(self) -> SqlQueryContext:

     def get_collection_name(self, key: Any) -> str:
         assert (
-            self._managers.caching_context.is_enabled
+            self._managers.caching_context.collection_records is not None
         ), "Collection-record caching should already have been enabled any time this is called."
         return self._managers.collections[key].name
4 changes: 3 additions & 1 deletion python/lsst/daf/butler/registry/sql_registry.py
@@ -2332,7 +2332,9 @@ def _query_driver(
         default_data_id: DataCoordinate,
     ) -> Iterator[DirectQueryDriver]:
         """Set up a `QueryDriver` instance for query execution."""
-        with self.caching_context():
+        # Query internals do repeated lookups of the same collections, so they
+        # benefit from the collection record cache.
+        with self._managers.caching_context.enable_collection_record_cache():
             driver = DirectQueryDriver(
                 self._db,
                 self.dimensions,
51 changes: 28 additions & 23 deletions python/lsst/daf/butler/script/removeRuns.py
@@ -86,29 +86,34 @@ def _getCollectionInfo(
         The dataset types and how many will be removed.
     """
     butler = Butler.from_config(repo)
-    try:
-        collections = butler.collections.query_info(
-            collection, CollectionType.RUN, include_chains=False, include_parents=True, include_summary=True
-        )
-    except MissingCollectionError:
-        # Act as if no collections matched.
-        collections = []
-    dataset_types = [dt.name for dt in butler.registry.queryDatasetTypes(...)]
-    dataset_types = list(butler.collections._filter_dataset_types(dataset_types, collections))
-
-    runs = []
-    datasets: dict[str, int] = defaultdict(int)
-    for collection_info in collections:
-        assert collection_info.type == CollectionType.RUN and collection_info.parents is not None
-        runs.append(RemoveRun(collection_info.name, list(collection_info.parents)))
-        with butler.query() as query:
-            for dt in dataset_types:
-                results = query.datasets(dt, collections=collection_info.name)
-                count = results.count(exact=False)
-                if count:
-                    datasets[dt] += count
-
-    return runs, {k: datasets[k] for k in sorted(datasets.keys())}
+    with butler.registry.caching_context():
+        try:
+            collections = butler.collections.query_info(
+                collection,
+                CollectionType.RUN,
+                include_chains=False,
+                include_parents=True,
+                include_summary=True,
+            )
+        except MissingCollectionError:
+            # Act as if no collections matched.
+            collections = []
+        dataset_types = [dt.name for dt in butler.registry.queryDatasetTypes(...)]
+        dataset_types = list(butler.collections._filter_dataset_types(dataset_types, collections))
+
+        runs = []
+        datasets: dict[str, int] = defaultdict(int)
+        for collection_info in collections:
+            assert collection_info.type == CollectionType.RUN and collection_info.parents is not None
+            runs.append(RemoveRun(collection_info.name, list(collection_info.parents)))
+            with butler.query() as query:
+                for dt in dataset_types:
+                    results = query.datasets(dt, collections=collection_info.name)
+                    count = results.count(exact=False)
+                    if count:
+                        datasets[dt] += count
+
+        return runs, {k: datasets[k] for k in sorted(datasets.keys())}


def removeRuns(