from __future__ import annotations
import logging
import warnings
from pathlib import Path
import numpy as np
import pyarrow.dataset as pds
from upath import UPath
from hats.catalog.catalog import Catalog
from hats.catalog.dataset.collection_properties import CollectionProperties
from hats.catalog.dataset.table_properties import TableProperties
from hats.catalog.healpix_dataset.healpix_dataset import HealpixDataset
from hats.catalog.index.index_catalog import IndexCatalog
from hats.catalog.margin_cache.margin_catalog import MarginCatalog
from hats.catalog.partition_info import PartitionInfo
from hats.io import get_common_metadata_pointer, get_parquet_metadata_pointer, get_partition_info_pointer
from hats.io.file_io import get_upath
from hats.io.paths import get_healpix_from_path
from hats.loaders import read_hats
from hats.pixel_math.healpix_pixel import INVALID_PIXEL
from hats.pixel_math.healpix_pixel_function import sort_pixels
# Allow f-strings here - this is all very logging-forward functionality.
# pylint: disable=logging-fstring-interpolation
[docs]
def is_valid_catalog(
    pointer: str | Path | UPath,
    strict: bool = False,
    fail_fast: bool = False,
    verbose: bool = True,
) -> bool:
    """Validate the catalog rooted at ``pointer``.

    If ``pointer`` actually holds a collection (it has a valid collection
    properties file), a warning is logged and validation is delegated to
    ``is_valid_collection`` with the same ``strict`` setting.

    NB: Diagnostics are reported through the ``logging`` module (INFO and
    WARNING messages). Configure your logging environment to route them.

    Parameters
    ----------
    pointer : str | Path | UPath
        pointer to base catalog directory
    strict : bool
        When False (the default), only require a readable properties file plus
        either a partition_info file or a _metadata file. When True, run the
        strict checks: metadata files, partition listings, and the data
        partitions found on disk must all be consistent with each other.
    fail_fast : bool
        DEPRECATED - ignored; will be removed in a future version.
    verbose : bool
        DEPRECATED - ignored; will be removed in a future version.

    Returns
    -------
    bool
        True if the catalog (or collection) passes validation, False otherwise.
    """
    # Deprecated flags are accepted only for backwards compatibility.
    del fail_fast, verbose
    catalog_path = get_upath(pointer)

    if is_collection_info_valid(catalog_path):
        logging.warning("Looking for catalog - found collection.")
        return is_valid_collection(catalog_path, strict=strict)

    if strict:
        return _is_valid_catalog_strict(catalog_path)[0]

    # Lenient mode: properties must be readable, and at least one of the two
    # partition listings must be present.
    if not _is_catalog_info_valid(catalog_path):
        return False
    return _is_partition_info_valid(catalog_path) or _is_metadata_valid(catalog_path)
[docs]
def is_valid_collection(
    pointer: str | Path | UPath,
    strict: bool = False,
    fail_fast: bool = False,
    verbose: bool = True,
) -> bool:
    """Checks if a COLLECTION is valid for a given base catalog pointer

    NB: This method uses logging to issue INFO and WARNING messages.
    Configure your logging environment to appropriately channel messages.

    Parameters
    ----------
    pointer : str | Path | UPath
        pointer to base catalog collection directory
    strict : bool
        When False (the default), only check that the collection properties
        are readable and that the primary catalog passes non-strict
        ``is_valid_catalog``. When True, run strict checks on every
        sub-catalog (primary, margins, and indexes). Also verify that each
        sub-catalog is of the expected type and that each index catalog's
        indexing column matches the field it is registered under.
    fail_fast : bool
        DEPRECATED - ignored; will be removed in a future version.
    verbose : bool
        DEPRECATED - ignored; will be removed in a future version.

    Returns
    -------
    bool
        True if the collection properties are valid, and all sub-catalogs pass
        validation.
    """
    del fail_fast
    del verbose
    pointer = get_upath(pointer)
    if not is_collection_info_valid(pointer):
        return False
    if not strict:
        collection_properties = CollectionProperties.read_from_dir(pointer)
        return is_valid_catalog(pointer / collection_properties.hats_primary_table_url)

    # For catalog collections, we will confirm that all the member catalogs listed in the
    # collection properties exist and are valid, according to their expected types.
    logging.info(f"Validating collection at path {pointer} ... ")
    is_valid = True
    collection_properties = CollectionProperties.read_from_dir(pointer)

    # `_is_valid_catalog_strict` returns None in place of the catalog when it could not
    # be loaded. Test for that sentinel explicitly: if a loaded catalog object defines
    # __len__, plain truthiness could be False (empty catalog) or raise, which would
    # silently skip (or crash) the type checks below.
    subcatalog_valid, sub_catalog = _is_valid_catalog_strict(
        pointer / collection_properties.hats_primary_table_url
    )
    is_valid &= subcatalog_valid
    if sub_catalog is not None and not isinstance(sub_catalog, Catalog):
        logging.warning(
            "Primary catalog is the wrong type (expected Catalog, "
            f"found {sub_catalog.catalog_info.catalog_type})."
        )
        is_valid = False

    # all_margins / all_indexes may be unset (falsy); treat that as "none listed".
    for margin in collection_properties.all_margins or []:
        subcatalog_valid, sub_catalog = _is_valid_catalog_strict(pointer / margin)
        is_valid &= subcatalog_valid
        if sub_catalog is not None and not isinstance(sub_catalog, MarginCatalog):
            logging.warning(
                "Margin catalog is the wrong type (expected margin, "
                f"found {sub_catalog.catalog_info.catalog_type})."
            )
            is_valid = False

    for index_field, index_dir in (collection_properties.all_indexes or {}).items():
        subcatalog_valid, sub_catalog = _is_valid_catalog_strict(pointer / index_dir)
        is_valid &= subcatalog_valid
        if sub_catalog is None:
            continue
        if not isinstance(sub_catalog, IndexCatalog):
            logging.warning(
                "Index catalog is the wrong type (expected index, "
                f"found {sub_catalog.catalog_info.catalog_type})."
            )
            is_valid = False
        # The index catalog must be built on the column the collection registers it under.
        if sub_catalog.catalog_info.indexing_column != index_field:
            logging.warning(
                f"Index catalog index columns don't match (expected {index_field}, "
                f"found {sub_catalog.catalog_info.indexing_column})."
            )
            is_valid = False
    return is_valid
# pylint: disable=too-many-statements
def _is_valid_catalog_strict(pointer):
    """Determine if this is a valid catalog, using strict criteria.

    If a catalog object can be loaded (even if it's not strictly valid),
    return it as well, for type-specific checks.

    Parameters
    ----------
    pointer : UPath
        pointer to base catalog directory (callers in this module pass the
        result of ``get_upath``).

    Returns
    -------
    tuple
        ``(is_valid, catalog)``. ``catalog`` is the object returned by
        ``read_hats``, or ``None`` when the properties or ``_common_metadata``
        files are missing/invalid and the catalog therefore cannot be loaded.
    """
    logging.info(f"Validating catalog at path {pointer} ... ")
    is_valid = True
    if not _is_catalog_info_valid(pointer):
        logging.warning("properties file does not exist or is invalid.")
        is_valid = False
    if not _is_common_metadata_valid(pointer):
        logging.warning("_common_metadata file does not exist.")
        is_valid = False
    if not is_valid:
        # Without these metadata files the catalog cannot be loaded, so there is
        # nothing further to check.
        return (False, None)

    # Load as a catalog object. Confirms that the catalog info matches type.
    catalog = read_hats(pointer)
    metadata_file = get_parquet_metadata_pointer(pointer)

    if not isinstance(catalog, HealpixDataset):
        # Non-HEALPix datasets have no partition listing to cross-check; only
        # require a _metadata file that pyarrow can open.
        if not _is_metadata_valid(pointer):
            logging.warning("_metadata file does not exist.")
            return (False, catalog)
        # Loading confirms the directory is a valid parquet dataset; the
        # returned dataset object itself is not needed.
        pds.parquet_dataset(metadata_file.path, filesystem=metadata_file.fs)
        return (is_valid, catalog)

    if not _is_partition_info_valid(pointer):
        logging.warning("partition_info.csv file does not exist.")
        return (False, catalog)

    expected_pixels = sort_pixels(catalog.get_healpix_pixels())
    num_partitions = len(expected_pixels)
    logging.info(f"Found {num_partitions} partitions.")

    if _is_metadata_valid(pointer):
        ## Compare the pixels in _metadata with partition_info.csv
        # We typically warn when reading from _metadata, but it's expected right now.
        with warnings.catch_warnings():
            warnings.simplefilter("ignore")
            metadata_pixels = sort_pixels(PartitionInfo.read_from_file(metadata_file).get_healpix_pixels())
        if not np.array_equal(expected_pixels, metadata_pixels):
            logging.warning("Partition pixels differ between catalog and _metadata file")
            is_valid = False

        ## Load as parquet dataset, and check the pixels encoded in its file paths.
        # As a side effect, this confirms that we can load the directory as a valid dataset.
        dataset = pds.parquet_dataset(metadata_file.path, filesystem=metadata_file.fs)
        # NOTE(review): a pixel stored as several files under one Npix directory would
        # presumably appear once per file here; confirm whether duplicates are expected.
        parquet_path_pixels = []
        for hats_file in dataset.files:
            healpix_pixel = get_healpix_from_path(hats_file)
            if healpix_pixel == INVALID_PIXEL:
                # Only build the full (protocol-qualified) path when we need to report it.
                hats_fp = UPath(hats_file, protocol=metadata_file.protocol, **metadata_file.storage_options)
                logging.warning(f"Could not derive partition pixel from parquet path: {str(hats_fp)}")
                is_valid = False
            parquet_path_pixels.append(healpix_pixel)
        parquet_path_pixels = sort_pixels(parquet_path_pixels)

        if not np.array_equal(expected_pixels, parquet_path_pixels):
            extra, missing = _pixel_set_differences(parquet_path_pixels, expected_pixels)
            logging.warning(
                "Partition pixels differ between _metadata and partition_info\n"
                f"Extra: {extra} \n"
                f"Missing: {missing}"
            )
            is_valid = False
    else:
        logging.warning("_metadata file does not exist.")

    ## Compare partition_info.csv with the partition paths actually present on disk.
    if num_partitions > 10_000:
        logging.info(
            f"Checking file existence for {num_partitions} data partitions."
            " This might take a while."
        )
    parquet_path_pixels = []
    for pixel_path in pointer.rglob("Norder*/Dir*/Npix*"):
        healpix_pixel = get_healpix_from_path(pixel_path)
        if healpix_pixel == INVALID_PIXEL:
            logging.warning(f"Could not derive partition pixel from parquet path: {str(pixel_path)}")
            is_valid = False
        parquet_path_pixels.append(healpix_pixel)
    parquet_path_pixels = sort_pixels(parquet_path_pixels)

    if not np.array_equal(expected_pixels, parquet_path_pixels):
        extra, missing = _pixel_set_differences(parquet_path_pixels, expected_pixels)
        logging.warning(
            "Partition pixels differ between partition_info and parquet paths\n"
            f"Extra: {extra} \n"
            f"Missing: {missing}"
        )
        is_valid = False

    # Print a few more stats
    logging.info(
        "Approximate coverage is "
        f"{catalog.partition_info.calculate_fractional_coverage()*100:0.2f} % of the sky."
    )
    return (is_valid, catalog)


def _pixel_set_differences(found_pixels, expected_pixels):
    """Return ``(extra, missing)`` pixel lists for mismatch reports.

    ``extra`` holds pixels present in ``found_pixels`` but not in
    ``expected_pixels``; ``missing`` holds the reverse. Both are sorted by an
    explicit ``(order, pixel)`` key so warning output is deterministic and does
    not depend on the pixel type defining ``<``.
    """
    found = set(found_pixels)
    expected = set(expected_pixels)

    def _order_then_pixel(healpix_pixel):
        return (healpix_pixel.order, healpix_pixel.pixel)

    return (
        sorted(found - expected, key=_order_then_pixel),
        sorted(expected - found, key=_order_then_pixel),
    )
def _is_catalog_info_valid(pointer: str | Path | UPath) -> bool:
    """Report whether the table properties under ``pointer`` can be read.

    A FileNotFoundError, ValueError, or NotImplementedError raised while
    reading the properties is treated as "invalid" (False) rather than
    propagated; any other exception propagates to the caller.
    """
    tolerated_errors = (FileNotFoundError, ValueError, NotImplementedError)
    try:
        TableProperties.read_from_dir(pointer)
    except tolerated_errors:
        return False
    else:
        return True
[docs]
def is_collection_info_valid(pointer: str | Path | UPath) -> bool:
    """Report whether collection properties can be read from ``pointer``.

    Returns False when reading the collection properties raises
    FileNotFoundError, ValueError, or NotImplementedError; True when reading
    succeeds. Other exceptions propagate.
    """
    try:
        CollectionProperties.read_from_dir(pointer)
        readable = True
    except (FileNotFoundError, ValueError, NotImplementedError):
        readable = False
    return readable
def _is_partition_info_valid(pointer: UPath) -> bool:
    """Report whether the partition info file (as located by
    ``get_partition_info_pointer``) exists for the catalog at ``pointer``."""
    partition_info_file = get_partition_info_pointer(pointer)
    return partition_info_file.exists()
def _is_metadata_valid(pointer: UPath) -> bool:
    """Report whether the parquet ``_metadata`` file (as located by
    ``get_parquet_metadata_pointer``) exists for the catalog at ``pointer``."""
    metadata_file = get_parquet_metadata_pointer(pointer)
    return metadata_file.exists()
def _is_common_metadata_valid(pointer: UPath) -> bool:
    """Report whether the parquet ``_common_metadata`` file (as located by
    ``get_common_metadata_pointer``) exists for the catalog at ``pointer``."""
    common_metadata_file = get_common_metadata_pointer(pointer)
    return common_metadata_file.exists()