"""Container class to hold primary-to-join partition metadata"""
from __future__ import annotations
import warnings
from typing import Dict, List
import numpy as np
import pandas as pd
import pyarrow as pa
from hipscat.catalog.partition_info import PartitionInfo
from hipscat.io import FilePointer, file_io, paths
from hipscat.io.parquet_metadata import (
read_row_group_fragments,
row_group_stat_single_value,
write_parquet_metadata_for_batches,
)
from hipscat.pixel_math.healpix_pixel import HealpixPixel
class PartitionJoinInfo:
    """Association catalog metadata with which partitions matches occur in the join"""

    # Column names for the primary (left) catalog's healpix order/pixel.
    PRIMARY_ORDER_COLUMN_NAME = "Norder"
    PRIMARY_PIXEL_COLUMN_NAME = "Npix"
    # Column names for the join (right) catalog's healpix order/pixel.
    JOIN_ORDER_COLUMN_NAME = "join_Norder"
    JOIN_PIXEL_COLUMN_NAME = "join_Npix"

    # All columns required in the partition join info dataframe.
    COLUMN_NAMES = [
        PRIMARY_ORDER_COLUMN_NAME,
        PRIMARY_PIXEL_COLUMN_NAME,
        JOIN_ORDER_COLUMN_NAME,
        JOIN_PIXEL_COLUMN_NAME,
    ]

    def __init__(self, join_info_df: pd.DataFrame, catalog_base_dir: str = None) -> None:
        """Create a PartitionJoinInfo from a dataframe of primary/join pixel pairs.

        Args:
            join_info_df (pd.DataFrame): dataframe containing all of `COLUMN_NAMES`,
                one row per (primary pixel, join pixel) relationship
            catalog_base_dir (str): optional directory the info was loaded from, used
                later as a default output location by `write_to_csv`

        Raises:
            ValueError: if a required column is missing from `join_info_df`
        """
        self.data_frame = join_info_df
        self.catalog_base_dir = catalog_base_dir
        self._check_column_names()

    def _check_column_names(self):
        """Verify that `data_frame` contains every column in `COLUMN_NAMES`.

        Raises:
            ValueError: if any required column is missing
        """
        for column in self.COLUMN_NAMES:
            if column not in self.data_frame.columns:
                raise ValueError(f"join_info_df does not contain column {column}")
[docs]
def primary_to_join_map(self) -> Dict[HealpixPixel, List[HealpixPixel]]:
"""Generate a map from a single primary pixel to one or more pixels in the join catalog.
Lots of cute comprehension is happening here, so watch out!
We create tuple of (primary order/pixel) and [array of tuples of (join order/pixel)]
Returns:
dictionary mapping (primary order/pixel) to [array of (join order/pixel)]
"""
primary_map = self.data_frame.groupby(
[self.PRIMARY_ORDER_COLUMN_NAME, self.PRIMARY_PIXEL_COLUMN_NAME], group_keys=True
)
primary_to_join = [
(
HealpixPixel(int(primary_pixel[0]), int(primary_pixel[1])),
[
HealpixPixel(int(object_elem[0]), int(object_elem[1]))
for object_elem in join_group.dropna().to_numpy().T[2:4].T
],
)
for primary_pixel, join_group in primary_map
]
## Treat the array of tuples as a dictionary.
primary_to_join = dict(primary_to_join)
return primary_to_join
[docs]
def write_to_csv(self, catalog_path: FilePointer = None, storage_options: dict = None):
"""Write all partition data to CSV files.
Two files will be written:
- partition_info.csv - covers all primary catalog pixels, and should match the file structure
- partition_join_info.csv - covers all pairwise relationships between primary and
join catalogs.
Args:
catalog_path: FilePointer to the directory where the
`partition_join_info.csv` file will be written
storage_options (dict): dictionary that contains abstract filesystem credentials
Raises:
ValueError: if no path is provided, and could not be inferred.
"""
if catalog_path is None:
if self.catalog_base_dir is None:
raise ValueError("catalog_path is required if info was not loaded from a directory")
catalog_path = self.catalog_base_dir
partition_join_info_file = paths.get_partition_join_info_pointer(catalog_path)
file_io.write_dataframe_to_csv(
self.data_frame, partition_join_info_file, index=False, storage_options=storage_options
)
primary_pixels = self.primary_to_join_map().keys()
partition_info_pointer = paths.get_partition_info_pointer(catalog_path)
partition_info = PartitionInfo.from_healpix(primary_pixels)
partition_info.write_to_file(
partition_info_file=partition_info_pointer, storage_options=storage_options
)
@classmethod
[docs]
def read_from_dir(cls, catalog_base_dir: FilePointer, storage_options: dict = None) -> PartitionJoinInfo:
"""Read partition join info from a file within a hipscat directory.
This will look for a `partition_join_info.csv` file, and if not found, will look for
a `_metadata` file. The second approach is typically slower for large catalogs
therefore a warning is issued to the user. In internal testing with large catalogs,
the first approach takes less than a second, while the second can take 10-20 seconds.
Args:
catalog_base_dir: path to the root directory of the catalog
storage_options (dict): dictionary that contains abstract filesystem credentials
Returns:
A `PartitionJoinInfo` object with the data from the file
Raises:
FileNotFoundError: if neither desired file is found in the catalog_base_dir
"""
metadata_file = paths.get_parquet_metadata_pointer(catalog_base_dir)
partition_join_info_file = paths.get_partition_join_info_pointer(catalog_base_dir)
if file_io.does_file_or_directory_exist(partition_join_info_file, storage_options=storage_options):
pixel_frame = PartitionJoinInfo._read_from_csv(
partition_join_info_file, storage_options=storage_options
)
elif file_io.does_file_or_directory_exist(metadata_file, storage_options=storage_options):
warnings.warn("Reading partitions from parquet metadata. This is typically slow.")
pixel_frame = PartitionJoinInfo._read_from_metadata_file(
metadata_file, storage_options=storage_options
)
else:
raise FileNotFoundError(
f"_metadata or partition join info file is required in catalog directory {catalog_base_dir}"
)
return cls(pixel_frame, catalog_base_dir)
@classmethod
[docs]
def read_from_file(
cls, metadata_file: FilePointer, strict: bool = False, storage_options: dict = None
) -> PartitionJoinInfo:
"""Read partition join info from a `_metadata` file to create an object
Args:
metadata_file (FilePointer): FilePointer to the `_metadata` file
storage_options (dict): dictionary that contains abstract filesystem credentials
strict (bool): use strict parsing of _metadata file. this is slower, but
gives more helpful error messages in the case of invalid data.
Returns:
A `PartitionJoinInfo` object with the data from the file
"""
return cls(cls._read_from_metadata_file(metadata_file, strict, storage_options))
@classmethod
@classmethod
[docs]
def read_from_csv(
cls, partition_join_info_file: FilePointer, storage_options: dict = None
) -> PartitionJoinInfo:
"""Read partition join info from a `partition_join_info.csv` file to create an object
Args:
partition_join_info_file (FilePointer): FilePointer to the `partition_join_info.csv` file
storage_options (dict): dictionary that contains abstract filesystem credentials
Returns:
A `PartitionJoinInfo` object with the data from the file
"""
return cls(cls._read_from_csv(partition_join_info_file, storage_options))
@classmethod
[docs]
def _read_from_csv(
cls, partition_join_info_file: FilePointer, storage_options: dict = None
) -> pd.DataFrame:
"""Read partition join info from a `partition_join_info.csv` file to create an object
Args:
partition_join_info_file (FilePointer): FilePointer to the `partition_join_info.csv` file
storage_options (dict): dictionary that contains abstract filesystem credentials
Returns:
A `PartitionJoinInfo` object with the data from the file
"""
if not file_io.does_file_or_directory_exist(
partition_join_info_file, storage_options=storage_options
):
raise FileNotFoundError(
f"No partition join info found where expected: {str(partition_join_info_file)}"
)
return file_io.load_csv_to_pandas(partition_join_info_file, storage_options=storage_options)