"""Methods for creating partitioned data paths"""
from __future__ import annotations
import re
from typing import Dict, List
from hipscat.io.file_io.file_pointer import FilePointer, append_paths_to_pointer, get_fs
from hipscat.pixel_math.healpix_pixel import INVALID_PIXEL, HealpixPixel
[docs]
# Hive-style partition key names used when building catalog pixel paths of the
# form "Norder=<order>/Dir=<directory>/Npix=<pixel>.parquet".
# Key for the healpix order level of the path.
ORDER_DIRECTORY_PREFIX = "Norder"
[docs]
# Key for the directory-number level of the path.
DIR_DIRECTORY_PREFIX = "Dir"
[docs]
# Key for the pixel-number level of the path (the parquet file itself).
PIXEL_DIRECTORY_PREFIX = "Npix"
[docs]
# "join_"-prefixed variants of the three keys above. They are not referenced in
# this part of the file — presumably used for the join side of association
# catalogs; confirm against callers.
JOIN_ORDER_DIRECTORY_PREFIX = "join_Norder"
[docs]
JOIN_DIR_DIRECTORY_PREFIX = "join_Dir"
[docs]
JOIN_PIXEL_DIRECTORY_PREFIX = "join_Npix"
[docs]
# File names of the metadata files that sit directly under a catalog's base
# directory; each has a matching `get_*_pointer` helper in this module.
CATALOG_INFO_FILENAME = "catalog_info.json"
[docs]
PARTITION_INFO_FILENAME = "partition_info.csv"
[docs]
PARTITION_JOIN_INFO_FILENAME = "partition_join_info.csv"
[docs]
PROVENANCE_INFO_FILENAME = "provenance_info.json"
[docs]
POINT_MAP_FILENAME = "point_map.fits"
[docs]
def pixel_directory(
    catalog_base_dir: FilePointer,
    pixel_order: int,
    pixel_number: int | None = None,
    directory_number: int | None = None,
) -> FilePointer:
    """Create path pointer for a pixel directory. This will not create the directory.

    One of pixel_number or directory_number is required. If both are given,
    directory_number is used as-is and pixel_number is ignored. The directory
    name will take the HiPS standard form of::

        <catalog_base_dir>/Norder=<pixel_order>/Dir=<directory number>

    Where the directory number is the pixel number truncated to a multiple of
    10,000 using exact integer division::

        (pixel_number // 10000) * 10000

    Args:
        catalog_base_dir (FilePointer): base directory of the catalog (includes catalog name)
        pixel_order (int): the healpix order of the pixel
        pixel_number (int): the healpix pixel
        directory_number (int): directory number

    Returns:
        FilePointer directory name

    Raises:
        ValueError: if neither pixel_number nor directory_number is provided
    """
    norder = int(pixel_order)
    if directory_number is not None:
        ndir = directory_number
    elif pixel_number is not None:
        npix = int(pixel_number)
        # Stay in integer arithmetic: true division (`/`) goes through a float,
        # which cannot represent every integer above 2**53 and would yield the
        # wrong directory for very large pixel numbers. Working on the magnitude
        # keeps the previous truncate-toward-zero result for negative inputs.
        truncated = abs(npix) // 10_000 * 10_000
        ndir = truncated if npix >= 0 else -truncated
    else:
        raise ValueError("One of pixel_number or directory_number is required to create pixel directory")
    return create_hive_directory_name(
        catalog_base_dir,
        [ORDER_DIRECTORY_PREFIX, DIR_DIRECTORY_PREFIX],
        [norder, ndir],
    )
[docs]
def get_healpix_from_path(path: str) -> HealpixPixel:
    """Find the `pixel_order` and `pixel_number` from a string like the
    following::

        Norder=<pixel_order>/Dir=<directory number>/Npix=<pixel_number>.parquet

    NB: This expects the format generated by the `pixel_catalog_file` method

    Args:
        path (str): path to parse

    Returns:
        Constructed HealpixPixel object representing the pixel in the path.
        `INVALID_PIXEL` if the path doesn't match the expected pattern, including
        when either the `Norder=` or `Npix=` key has no digits after it.
    """
    # `\d+` (not `\d*`) requires at least one digit after each key. With `\d*` a
    # path such as "Norder=/Npix=" matched with empty groups and then failed in
    # int("") with ValueError instead of returning INVALID_PIXEL.
    match = re.match(r".*Norder=(\d+).*Npix=(\d+).*", path)
    if not match:
        return INVALID_PIXEL
    order, pixel = match.groups()
    return HealpixPixel(int(order), int(pixel))
[docs]
def pixel_catalog_files(
    catalog_base_dir: FilePointer, pixels: List[HealpixPixel], storage_options: Dict | None = None
) -> List[FilePointer]:
    """Build path *pointers* for the catalog files of several pixels at once.

    No directory or file is created. Each path has the HiPS standard form::

        <catalog_base_dir>/Norder=<pixel_order>/Dir=<directory number>/Npix=<pixel_number>.parquet

    The order, directory number and pixel number are read from each pixel's
    `order`, `dir` and `pixel` attributes. Path segments are joined with the
    separator of the file system resolved for `catalog_base_dir`, after removing
    one trailing separator from the base directory (if present).

    Args:
        catalog_base_dir (FilePointer): base directory of the catalog (includes catalog name)
        pixels (List[HealpixPixel]): the healpix pixels to create pointers to
        storage_options (dict): the storage options for the file system to target
            when generating the paths

    Returns:
        List[FilePointer]: one path per pixel, in the same order as the input list.
    """
    file_system, _ = get_fs(catalog_base_dir, storage_options)
    separator = file_system.sep
    catalog_root = catalog_base_dir.removesuffix(separator)

    pointers = []
    for healpix_pixel in pixels:
        segments = (
            catalog_root,
            f"{ORDER_DIRECTORY_PREFIX}={healpix_pixel.order}",
            f"{DIR_DIRECTORY_PREFIX}={healpix_pixel.dir}",
            f"{PIXEL_DIRECTORY_PREFIX}={healpix_pixel.pixel}.parquet",
        )
        pointers.append(separator.join(segments))
    return pointers
[docs]
def pixel_catalog_file(catalog_base_dir: FilePointer, pixel_order: int, pixel_number: int) -> FilePointer:
    """Create path *pointer* for a pixel catalog file. This will not create the directory
    or file.

    The catalog file name will take the HiPS standard form of::

        <catalog_base_dir>/Norder=<pixel_order>/Dir=<directory number>/Npix=<pixel_number>.parquet

    Where the directory number is the pixel number truncated to a multiple of
    10,000 using exact integer division::

        (pixel_number // 10000) * 10000

    Args:
        catalog_base_dir (FilePointer): base directory of the catalog (includes catalog name)
        pixel_order (int): the healpix order of the pixel
        pixel_number (int): the healpix pixel

    Returns:
        string catalog file name
    """
    npix = int(pixel_number)
    # Stay in integer arithmetic: true division (`/`) goes through a float, which
    # cannot represent every integer above 2**53 and would yield the wrong
    # directory for very large pixel numbers. Working on the magnitude keeps the
    # previous truncate-toward-zero result for negative inputs.
    truncated = abs(npix) // 10_000 * 10_000
    ndir = truncated if npix >= 0 else -truncated
    return create_hive_parquet_file_name(
        catalog_base_dir,
        [ORDER_DIRECTORY_PREFIX, DIR_DIRECTORY_PREFIX, PIXEL_DIRECTORY_PREFIX],
        [int(pixel_order), ndir, npix],
    )
[docs]
def create_hive_directory_name(base_dir, partition_token_names, partition_token_values):
    """Build a path *pointer* for a directory named with hive partitioning.

    This will not create the directory. The directory name has the form::

        <base_dir>/<name_1>=<value_1>/.../<name_n>=<value_n>

    Names and values are paired positionally with `zip`, so if the two lists
    differ in length the surplus entries of the longer one are ignored.

    Args:
        base_dir (FilePointer): base directory the partition levels are appended to
        partition_token_names (list[string]): list of partition name parts.
        partition_token_values (list[string]): list of partition values that
            correspond to the token name parts.

    Returns:
        The result of `append_paths_to_pointer` for `base_dir` followed by one
        `<name>=<value>` segment per pair.
    """
    hive_segments = []
    for token_name, token_value in zip(partition_token_names, partition_token_values):
        hive_segments.append(f"{token_name}={token_value}")
    return append_paths_to_pointer(base_dir, *hive_segments)
[docs]
def create_hive_parquet_file_name(base_dir, partition_token_names, partition_token_values):
    """Build a path *pointer* for a single parquet file named with hive partitioning.

    The file name has the form::

        <base_dir>/<name_1>=<value_1>/.../<name_n>=<value_n>.parquet

    Names and values are paired positionally with `zip`, so if the two lists
    differ in length the surplus entries of the longer one are ignored.

    Args:
        base_dir (FilePointer): base directory the partition levels are appended to
        partition_token_names (list[string]): list of partition name parts.
        partition_token_values (list[string]): list of partition values that
            correspond to the token name parts.

    Returns:
        A string: the pointer built from `base_dir` and the `<name>=<value>`
        segments, with `.parquet` appended.
    """
    hive_segments = []
    for token_name, token_value in zip(partition_token_names, partition_token_values):
        hive_segments.append(f"{token_name}={token_value}")
    partition_pointer = append_paths_to_pointer(base_dir, *hive_segments)
    return f"{partition_pointer}.parquet"
[docs]
def get_catalog_info_pointer(catalog_base_dir: FilePointer) -> FilePointer:
    """Build the file pointer to a catalog's `catalog_info.json` metadata file.

    Args:
        catalog_base_dir: pointer to base catalog directory

    Returns:
        File Pointer formed by appending `catalog_info.json` to the base directory.
    """
    catalog_info_pointer = append_paths_to_pointer(catalog_base_dir, CATALOG_INFO_FILENAME)
    return catalog_info_pointer
[docs]
def get_partition_info_pointer(catalog_base_dir: FilePointer) -> FilePointer:
    """Build the file pointer to a catalog's `partition_info.csv` metadata file.

    Args:
        catalog_base_dir: pointer to base catalog directory

    Returns:
        File Pointer formed by appending `partition_info.csv` to the base directory.
    """
    partition_info_pointer = append_paths_to_pointer(catalog_base_dir, PARTITION_INFO_FILENAME)
    return partition_info_pointer
[docs]
def get_provenance_pointer(catalog_base_dir: FilePointer) -> FilePointer:
    """Build the file pointer to a catalog's `provenance_info.json` metadata file.

    Args:
        catalog_base_dir: pointer to base catalog directory

    Returns:
        File Pointer formed by appending `provenance_info.json` to the base directory.
    """
    provenance_pointer = append_paths_to_pointer(catalog_base_dir, PROVENANCE_INFO_FILENAME)
    return provenance_pointer
[docs]
def get_point_map_file_pointer(catalog_base_dir: FilePointer) -> FilePointer:
    """Build the file pointer to a catalog's `point_map.fits` FITS image file.

    Args:
        catalog_base_dir: pointer to base catalog directory

    Returns:
        File Pointer formed by appending `point_map.fits` to the base directory.
    """
    point_map_pointer = append_paths_to_pointer(catalog_base_dir, POINT_MAP_FILENAME)
    return point_map_pointer
[docs]
def get_partition_join_info_pointer(catalog_base_dir: FilePointer) -> FilePointer:
    """Build the file pointer to a catalog's `partition_join_info.csv` association
    metadata file.

    Args:
        catalog_base_dir: pointer to base catalog directory

    Returns:
        File Pointer formed by appending `partition_join_info.csv` to the base directory.
    """
    join_info_pointer = append_paths_to_pointer(catalog_base_dir, PARTITION_JOIN_INFO_FILENAME)
    return join_info_pointer