diff --git a/.github/workflows/smoke-test.yml b/.github/workflows/smoke-test.yml
index 53816631..cc16ebeb 100644
--- a/.github/workflows/smoke-test.yml
+++ b/.github/workflows/smoke-test.yml
@@ -17,7 +17,7 @@ jobs:
     runs-on: ubuntu-latest
     strategy:
       matrix:
-        python-version: ['3.9', '3.10', '3.11']
+        python-version: ['3.9', '3.10', '3.11.8']
 
     steps:
     - uses: actions/checkout@v4
diff --git a/.github/workflows/testing-and-coverage.yml b/.github/workflows/testing-and-coverage.yml
index c59acf91..3a99ccd4 100644
--- a/.github/workflows/testing-and-coverage.yml
+++ b/.github/workflows/testing-and-coverage.yml
@@ -15,7 +15,7 @@ jobs:
     runs-on: ubuntu-latest
     strategy:
       matrix:
-        python-version: ['3.9', '3.10', '3.11']
+        python-version: ['3.9', '3.10', '3.11.8']
 
     steps:
     - uses: actions/checkout@v4
diff --git a/pyproject.toml b/pyproject.toml
index cca86e9c..9838f139 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -19,8 +19,8 @@ dependencies = [
    # Includes dask[array,dataframe,distributed,diagnostics].
    # dask distributed eases the creation of parallel dask clients.
    # dask diagnostics is required to spin up the dashboard for profiling.
-    "dask[complete]>=2024.3.0", # Includes dask expressions.
-    "hipscat>=0.3",
+    "dask[complete]<=2024.2.1",
+    "hipscat>=0.2.8",
     "pyarrow",
     "deprecated",
     "scipy", # kdtree
diff --git a/src/lsdb/catalog/dataset/dataset.py b/src/lsdb/catalog/dataset/dataset.py
index 34d7dd33..1cd7f9f6 100644
--- a/src/lsdb/catalog/dataset/dataset.py
+++ b/src/lsdb/catalog/dataset/dataset.py
@@ -11,7 +11,7 @@ class Dataset:
 
     def __init__(
         self,
-        ddf: dd.DataFrame,
+        ddf: dd.core.DataFrame,
         hc_structure: hc.catalog.Dataset,
     ):
         """Initialise a Catalog object.
diff --git a/src/lsdb/catalog/dataset/healpix_dataset.py b/src/lsdb/catalog/dataset/healpix_dataset.py
index afee0294..58c4a1c7 100644
--- a/src/lsdb/catalog/dataset/healpix_dataset.py
+++ b/src/lsdb/catalog/dataset/healpix_dataset.py
@@ -58,7 +58,7 @@ def __init__(
 
     def __getitem__(self, item):
         result = self._ddf.__getitem__(item)
-        if isinstance(result, dd.DataFrame):
+        if isinstance(result, dd.core.DataFrame):
             return self.__class__(result, self._ddf_pixel_map, self.hc_structure)
         return result
 
@@ -168,9 +168,6 @@ def _construct_search_ddf(
        Returns:
            The catalog pixel map and the respective Dask DataFrame
        """
-        filtered_partitions = (
-            filtered_partitions if len(filtered_partitions) > 0 else [delayed(self._ddf._meta)]
-        )
        divisions = get_pixels_divisions(filtered_pixels)
        search_ddf = dd.from_delayed(filtered_partitions, meta=self._ddf._meta, divisions=divisions)
        search_ddf = cast(dd.DataFrame, search_ddf)
diff --git a/src/lsdb/dask/crossmatch_catalog_data.py b/src/lsdb/dask/crossmatch_catalog_data.py
index 68f701a7..b718c252 100644
--- a/src/lsdb/dask/crossmatch_catalog_data.py
+++ b/src/lsdb/dask/crossmatch_catalog_data.py
@@ -77,7 +77,7 @@ def crossmatch_catalog_data(
         Type[AbstractCrossmatchAlgorithm] | BuiltInCrossmatchAlgorithm
     ) = BuiltInCrossmatchAlgorithm.KD_TREE,
     **kwargs,
-) -> Tuple[dd.DataFrame, DaskDFPixelMap, PixelAlignment]:
+) -> Tuple[dd.core.DataFrame, DaskDFPixelMap, PixelAlignment]:
     """Cross-matches the data from two catalogs
 
     Args:
diff --git a/src/lsdb/dask/join_catalog_data.py b/src/lsdb/dask/join_catalog_data.py
index 7bfe9ac4..241e31f5 100644
--- a/src/lsdb/dask/join_catalog_data.py
+++ b/src/lsdb/dask/join_catalog_data.py
@@ -176,7 +176,7 @@ def perform_join_through(
 
 def join_catalog_data_on(
     left: Catalog, right: Catalog, left_on: str, right_on: str, suffixes: Tuple[str, str]
-) -> Tuple[dd.DataFrame, DaskDFPixelMap, PixelAlignment]:
+) -> Tuple[dd.core.DataFrame, DaskDFPixelMap, PixelAlignment]:
     """Joins two catalogs spatially on a specified column
 
     Args:
@@ -214,7 +214,7 @@ def join_catalog_data_on(
 
 def join_catalog_data_through(
     left: Catalog, right: Catalog, association: AssociationCatalog, suffixes: Tuple[str, str]
-) -> Tuple[dd.DataFrame, DaskDFPixelMap, PixelAlignment]:
+) -> Tuple[dd.core.DataFrame, DaskDFPixelMap, PixelAlignment]:
     """Joins two catalogs with an association table
 
     Args:
diff --git a/src/lsdb/dask/merge_catalog_functions.py b/src/lsdb/dask/merge_catalog_functions.py
index 3f8ab0c9..bd828816 100644
--- a/src/lsdb/dask/merge_catalog_functions.py
+++ b/src/lsdb/dask/merge_catalog_functions.py
@@ -6,7 +6,7 @@
 import numpy as np
 import numpy.typing as npt
 import pandas as pd
-from dask.delayed import Delayed, delayed
+from dask.delayed import Delayed
 from hipscat.catalog import PartitionInfo
 from hipscat.pixel_math import HealpixPixel
 from hipscat.pixel_math.hipscat_id import HIPSCAT_ID_COLUMN, healpix_to_hipscat_id
@@ -146,7 +146,7 @@ def filter_by_hipscat_index_to_pixel(dataframe: pd.DataFrame, order: int, pixel:
 
 def construct_catalog_args(
     partitions: List[Delayed], meta_df: pd.DataFrame, alignment: PixelAlignment
-) -> Tuple[dd.DataFrame, DaskDFPixelMap, PixelAlignment]:
+) -> Tuple[dd.core.DataFrame, DaskDFPixelMap, PixelAlignment]:
     """Constructs the arguments needed to create a catalog from a list of delayed partitions
 
     Args:
@@ -160,9 +160,9 @@ def construct_catalog_args(
     """
     # generate dask df partition map from alignment
     partition_map = get_partition_map_from_alignment_pixels(alignment.pixel_mapping)
+
     # create dask df from delayed partitions
     divisions = get_pixels_divisions(list(partition_map.keys()))
-    partitions = partitions if len(partitions) > 0 else [delayed(pd.DataFrame([]))]
     ddf = dd.from_delayed(partitions, meta=meta_df, divisions=divisions)
     ddf = cast(dd.DataFrame, ddf)
     return ddf, partition_map, alignment
diff --git a/src/lsdb/loaders/dataframe/from_dataframe_utils.py b/src/lsdb/loaders/dataframe/from_dataframe_utils.py
index abcdf434..b0135c5d 100644
--- a/src/lsdb/loaders/dataframe/from_dataframe_utils.py
+++ b/src/lsdb/loaders/dataframe/from_dataframe_utils.py
@@ -23,9 +23,7 @@ def _generate_dask_dataframe(
     Returns:
         The catalog's Dask Dataframe and its total number of rows.
     """
-    # Get one partition to find how the df schema
-    one_partition = pixel_dfs[0] if len(pixel_dfs) > 0 else pd.DataFrame([])
-    schema = one_partition.iloc[:0, :].copy()
+    schema = pixel_dfs[0].iloc[:0, :].copy() if len(pixels) > 0 else []
     divisions = get_pixels_divisions(pixels)
     delayed_dfs = [delayed(df) for df in pixel_dfs]
     ddf = dd.from_delayed(delayed_dfs, meta=schema, divisions=divisions)
diff --git a/src/lsdb/loaders/dataframe/margin_catalog_generator.py b/src/lsdb/loaders/dataframe/margin_catalog_generator.py
index 6cbd0493..c2e565cd 100644
--- a/src/lsdb/loaders/dataframe/margin_catalog_generator.py
+++ b/src/lsdb/loaders/dataframe/margin_catalog_generator.py
@@ -70,27 +70,15 @@ def create_catalog(self) -> MarginCatalog | None:
         Returns:
             Margin catalog object, or None if the margin is empty.
         """
-        pixels, partitions = self._get_margins()
-        if len(pixels) == 0:
-            return None
-        ddf, ddf_pixel_map, total_rows = self._generate_dask_df_and_map(pixels, partitions)
+        ddf, ddf_pixel_map, total_rows = self._generate_dask_df_and_map()
         margin_pixels = list(ddf_pixel_map.keys())
+        if total_rows == 0:
+            return None
         margin_catalog_info = self._create_catalog_info(total_rows)
         margin_structure = hc.catalog.MarginCatalog(margin_catalog_info, margin_pixels)
         return MarginCatalog(ddf, ddf_pixel_map, margin_structure)
 
-    def _get_margins(self):
-        combined_pixels = (
-            self.hc_structure.get_healpix_pixels() + self.hc_structure.generate_negative_tree_pixels()
-        )
-        margin_pairs_df = self._find_margin_pixel_pairs(combined_pixels)
-        margins_pixel_df = self._create_margins(margin_pairs_df)
-        pixels, partitions = list(margins_pixel_df.keys()), list(margins_pixel_df.values())
-        return pixels, partitions
-
-    def _generate_dask_df_and_map(
-        self, pixels, partitions
-    ) -> Tuple[dd.DataFrame, Dict[HealpixPixel, int], int]:
+    def _generate_dask_df_and_map(self) -> Tuple[dd.DataFrame, Dict[HealpixPixel, int], int]:
         """Create the Dask Dataframe containing the data points in the margins
         for the catalog as well as the mapping of those HEALPix to Dataframes
 
@@ -98,11 +86,21 @@ def _generate_dask_df_and_map(self) -> Tuple[dd.DataFrame, Dict[HealpixPixel, int], int]:
             Tuple containing the Dask Dataframe, the mapping of margin HEALPix
             to the respective partitions and the total number of rows.
         """
+        healpix_pixels = self.hc_structure.get_healpix_pixels()
+        negative_pixels = self.hc_structure.generate_negative_tree_pixels()
+        combined_pixels = healpix_pixels + negative_pixels
+        margin_pairs_df = self._find_margin_pixel_pairs(combined_pixels)
+
+        # Compute points for each margin pixels
+        margins_pixel_df = self._create_margins(margin_pairs_df)
+        pixels, partitions = list(margins_pixel_df.keys()), list(margins_pixel_df.values())
+
         # Generate pixel map ordered by _hipscat_index
         pixel_order = get_pixel_argsort(pixels)
         ordered_pixels = np.asarray(pixels)[pixel_order]
         ordered_partitions = [partitions[i] for i in pixel_order]
         ddf_pixel_map = {pixel: index for index, pixel in enumerate(ordered_pixels)}
+
         # Generate the dask dataframe with the pixels and partitions
         ddf, total_rows = _generate_dask_dataframe(ordered_partitions, ordered_pixels)
         return ddf, ddf_pixel_map, total_rows
diff --git a/src/lsdb/loaders/hipscat/association_catalog_loader.py b/src/lsdb/loaders/hipscat/association_catalog_loader.py
index ee48badd..06790ca5 100644
--- a/src/lsdb/loaders/hipscat/association_catalog_loader.py
+++ b/src/lsdb/loaders/hipscat/association_catalog_loader.py
@@ -24,5 +24,5 @@ def load_catalog(self) -> AssociationCatalog:
     def _load_empty_dask_df_and_map(self, hc_catalog):
         metadata_schema = self._load_parquet_metadata_schema(hc_catalog)
         dask_meta_schema = metadata_schema.empty_table().to_pandas()
-        ddf = dd.from_pandas(dask_meta_schema, npartitions=1)
+        ddf = dd.from_pandas(dask_meta_schema, npartitions=0)
         return ddf, {}
diff --git a/tests/conftest.py b/tests/conftest.py
index d9e3a944..c8694d21 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -188,10 +188,7 @@ def small_sky_order1_df(small_sky_order1_dir):
 
 @pytest.fixture
 def small_sky_source_df(test_data_dir):
-    return pd.read_csv(
-        os.path.join(test_data_dir, "raw", "small_sky_source", "small_sky_source.csv"),
-        dtype={"band": "string[pyarrow]"},
-    )
+    return pd.read_csv(os.path.join(test_data_dir, "raw", "small_sky_source", "small_sky_source.csv"))
 
 
 @pytest.fixture