Source code for hipscat.catalog.association_catalog.association_catalog

from __future__ import annotations

from typing import Any, Dict, Tuple, Union

import pandas as pd
from mocpy import MOC
from typing_extensions import TypeAlias

from hipscat.catalog.association_catalog.association_catalog_info import AssociationCatalogInfo
from hipscat.catalog.association_catalog.partition_join_info import PartitionJoinInfo
from hipscat.catalog.catalog_type import CatalogType
from hipscat.catalog.healpix_dataset.healpix_dataset import HealpixDataset, PixelInputTypes
from hipscat.io import FilePointer, file_io, paths


[docs] class AssociationCatalog(HealpixDataset): """A HiPSCat Catalog for enabling fast joins between two HiPSCat catalogs Catalogs of this type are partitioned based on the partitioning of the left catalog. The `partition_join_info` metadata file specifies all pairs of pixels in the Association Catalog, corresponding to each pair of partitions in each catalog that contain rows to join. """ # Update CatalogInfoClass, used to check if the catalog_info is the correct type, and # set the catalog info to the correct type
[docs] CatalogInfoClass: TypeAlias = AssociationCatalogInfo
[docs] catalog_info: CatalogInfoClass
[docs] JoinPixelInputTypes = Union[list, pd.DataFrame, PartitionJoinInfo]
def __init__( self, catalog_info: CatalogInfoClass, pixels: PixelInputTypes, join_pixels: JoinPixelInputTypes, catalog_path=None, moc: MOC | None = None, storage_options: Union[Dict[Any, Any], None] = None, ) -> None: if not catalog_info.catalog_type == CatalogType.ASSOCIATION: raise ValueError("Catalog info `catalog_type` must be 'association'") super().__init__(catalog_info, pixels, catalog_path, moc=moc, storage_options=storage_options) self.join_info = self._get_partition_join_info_from_pixels(join_pixels)
[docs] def get_join_pixels(self) -> pd.DataFrame: """Get join pixels listing all pairs of pixels from left and right catalogs that contain matching association rows Returns: pd.DataFrame with each row being a pair of pixels from the primary and join catalogs """ return self.join_info.data_frame
@staticmethod
[docs] def _get_partition_join_info_from_pixels( join_pixels: JoinPixelInputTypes, ) -> PartitionJoinInfo: if isinstance(join_pixels, PartitionJoinInfo): return join_pixels if isinstance(join_pixels, pd.DataFrame): return PartitionJoinInfo(join_pixels) raise TypeError("join_pixels must be of type PartitionJoinInfo or DataFrame")
@classmethod
[docs] def _read_args( cls, catalog_base_dir: FilePointer, storage_options: Union[Dict[Any, Any], None] = None ) -> Tuple[CatalogInfoClass, PixelInputTypes, JoinPixelInputTypes]: # type: ignore[override] args = super()._read_args(catalog_base_dir, storage_options=storage_options) partition_join_info = PartitionJoinInfo.read_from_dir( catalog_base_dir, storage_options=storage_options ) return args + (partition_join_info,)
@classmethod
[docs] def _check_files_exist(cls, catalog_base_dir: FilePointer, storage_options: dict = None): super()._check_files_exist(catalog_base_dir, storage_options=storage_options) partition_join_info_file = paths.get_partition_join_info_pointer(catalog_base_dir) metadata_file = paths.get_parquet_metadata_pointer(catalog_base_dir) if not ( file_io.does_file_or_directory_exist(partition_join_info_file, storage_options=storage_options) or file_io.does_file_or_directory_exist(metadata_file, storage_options=storage_options) ): raise FileNotFoundError( f"_metadata or partition join info file is required in catalog directory {catalog_base_dir}" )