Merge pull request #64 from astronomy-commons/sean/parallelize-catalog-saving

Parallelize `to_hipscat` using dask delayed
smcguire-cmu authored Nov 14, 2023
2 parents d57f770 + 61b8890 commit 49a169b
Showing 1 changed file with 57 additions and 14 deletions.
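The heart of the change is a standard dask pattern: wrap each per-partition write in `dask.delayed` so that calling it only records a lazy task, then execute the whole batch with a single `dask.compute` call. A minimal sketch of that pattern, independent of lsdb (`write_one` and its arguments are hypothetical stand-ins for the `perform_write` function added in the diff below):

import dask

# Hypothetical per-partition writer; the PR wraps lsdb's perform_write
# (see the diff below) in exactly this way.
@dask.delayed
def write_one(partition_id: int) -> int:
    # ... write this partition to disk here ...
    return partition_id  # the real function returns the row count written

# Calling a delayed function only builds the task graph; nothing runs yet.
tasks = [write_one(i) for i in range(4)]

# A single compute call executes every write, in parallel where the
# scheduler and workers allow.
sizes = dask.compute(*tasks)  # -> (0, 1, 2, 3)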
src/lsdb/io/to_hipscat.py: 71 changes (57 additions & 14 deletions)
@@ -1,18 +1,54 @@
 from __future__ import annotations
 
 import dataclasses
 from copy import copy
 from importlib.metadata import version
-from typing import Any, Dict, Union
+from typing import TYPE_CHECKING, Any, Dict, Union
 
+import dask
 import hipscat as hc
+import pandas as pd
 from hipscat.io import FilePointer
 from hipscat.pixel_math import HealpixPixel
 
 from lsdb.types import HealpixInfo
 
+if TYPE_CHECKING:
+    from lsdb.catalog.catalog import Catalog
+
+
+@dask.delayed
+def perform_write(
+    df: pd.DataFrame,
+    hp_pixel: HealpixPixel,
+    base_catalog_dir: FilePointer,
+    storage_options: dict | None = None,
+    **kwargs
+) -> int:
+    """Performs a write of a pandas dataframe to a single parquet file, following the hipscat structure.
+    To be used as a dask delayed method as part of a dask task graph.
+    Args:
+        df (pd.DataFrame): dataframe to write to file
+        hp_pixel (HealpixPixel): HEALPix pixel of file to be written
+        base_catalog_dir (FilePointer): Location of the base catalog directory to write to
+        storage_options (dict): fsspec storage options
+        **kwargs: other kwargs to pass to pd.to_parquet method
+    Returns:
+        number of rows written to disk
+    """
+    pixel_dir = hc.io.pixel_directory(base_catalog_dir, hp_pixel.order, hp_pixel.pixel)
+    hc.io.file_io.make_directory(pixel_dir, exist_ok=True, storage_options=storage_options)
+    pixel_path = hc.io.paths.pixel_catalog_file(base_catalog_dir, hp_pixel.order, hp_pixel.pixel)
+    hc.io.file_io.write_dataframe_to_parquet(df, pixel_path, storage_options, **kwargs)
+    return len(df)
 
 
 # pylint: disable=W0212
 def to_hipscat(
-    catalog,
+    catalog: Catalog,
     base_catalog_path: str,
     catalog_name: Union[str, None] = None,
     storage_options: Union[Dict[Any, Any], None] = None,
@@ -61,7 +97,10 @@ def to_hipscat(


 def write_partitions(
-    catalog, base_catalog_dir_fp: FilePointer, storage_options: Union[Dict[Any, Any], None] = None, **kwargs
+    catalog: Catalog,
+    base_catalog_dir_fp: FilePointer,
+    storage_options: Union[Dict[Any, Any], None] = None,
+    **kwargs
 ) -> Dict[HealpixPixel, int]:
     """Saves catalog partitions as parquet to disk

@@ -74,17 +113,21 @@ def write_partitions(
     Returns:
         A dictionary mapping each HEALPix pixel to the number of data points in it.
     """
-    pixel_to_partition_size_map = {}
-
-    for hp_pixel, partition_index in catalog._ddf_pixel_map.items():
-        # Create pixel directory if it does not exist
-        pixel_dir = hc.io.pixel_directory(base_catalog_dir_fp, hp_pixel.order, hp_pixel.pixel)
-        hc.io.file_io.make_directory(pixel_dir, exist_ok=True, storage_options=storage_options)
-        # Write parquet file
-        partition = catalog._ddf.partitions[partition_index].compute()
-        pixel_path = hc.io.paths.pixel_catalog_file(base_catalog_dir_fp, hp_pixel.order, hp_pixel.pixel)
-        hc.io.file_io.write_dataframe_to_parquet(partition, pixel_path, storage_options, **kwargs)
-        pixel_to_partition_size_map[hp_pixel] = len(partition)
+    results = []
+    pixel_to_result_index = {}
+
+    partitions = catalog._ddf.to_delayed()
+    for index, (pixel, partition_index) in enumerate(catalog._ddf_pixel_map.items()):
+        results.append(
+            perform_write(partitions[partition_index], pixel, base_catalog_dir_fp, storage_options, **kwargs)
+        )
+        pixel_to_result_index[pixel] = index
+
+    partition_sizes = dask.compute(*results)
+
+    pixel_to_partition_size_map = {
+        pixel: partition_sizes[index] for pixel, index in pixel_to_result_index.items()
+    }
 
     return pixel_to_partition_size_map
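Two details of the new `write_partitions` are worth noting. First, `catalog._ddf.to_delayed()` exposes each Dask DataFrame partition as a Delayed object, so a partition can be handed to `perform_write` without being computed eagerly on the client. Second, `dask.compute(*results)` returns a flat tuple ordered by submission, which is why the loop records each pixel's position in `pixel_to_result_index` and joins the sizes back to their pixels afterwards. A small sketch of that bookkeeping, with plain integers standing in for partitions (all names and values here are illustrative, not lsdb's API):

import dask

@dask.delayed
def fake_write(n_rows: int) -> int:
    return n_rows  # stands in for perform_write's row-count result

pixel_map = {"Norder=0/Npix=11": 0, "Norder=1/Npix=44": 1}  # pixel -> partition index
partition_rows = [100, 250]  # pretend per-partition row counts

results = []
pixel_to_result_index = {}
for index, (pixel, partition_index) in enumerate(pixel_map.items()):
    results.append(fake_write(partition_rows[partition_index]))
    pixel_to_result_index[pixel] = index

partition_sizes = dask.compute(*results)  # tuple, in submission order
size_map = {pixel: partition_sizes[i] for pixel, i in pixel_to_result_index.items()}
# {'Norder=0/Npix=11': 100, 'Norder=1/Npix=44': 250}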
