Source code for hipscat.inspection.almanac

from __future__ import annotations

import os
import warnings
from typing import Any, Dict, List, Union

import pandas as pd

from hipscat.catalog.catalog import CatalogType
from hipscat.catalog.dataset.dataset import Dataset
from hipscat.inspection.almanac_info import AlmanacInfo
from hipscat.io.file_io import file_pointer
from hipscat.loaders import read_from_hipscat


[docs] class Almanac: """Single instance of an almanac, and available catalogs within namespaces Params: include_default_dir: include directory indicated in HIPSCAT_ALMANAC_DIR environment variable. see AlmanacInfo.get_default_dir dirs: additional directories or files to look for almanac files in. we support a few types of input, with different behaviors: - ``str`` - a single directory (or file) - ``list[str]`` - multiple directories (or files) - ``dict[str:str]`` / ``dict[str:list[str]]`` - namespace dictionary. for each key in the dictionary, we put all almanac entries under a namespace. this is useful if you have name collisions e.g. between multiple surveys or user-provided catalogs. """ def __init__( self, include_default_dir=True, dirs=None, storage_options: Union[Dict[Any, Any], None] = None ): """Create new almanac.""" self.files = {} self.entries = {} self.dir_to_catalog_name = {} self.storage_options = storage_options self._init_files(include_default_dir=include_default_dir, dirs=dirs) self._init_catalog_objects() self._init_catalog_links()
[docs] def _init_files(self, include_default_dir=True, dirs=None): """Create a list of all the almanac files we want to add to this instance. Each almanac file corresponds to a single catalog. Args: include_default_dir: include directory indicated in HIPSCAT_ALMANAC_DIR environment variable. see AlmanacInfo.get_default_dir dirs: additional directories to look for almanac files in """ if include_default_dir: default_dir = AlmanacInfo.get_default_dir() if default_dir: self._add_files_to_namespace(default_dir) if pd.api.types.is_dict_like(dirs): for key, value in dirs.items(): self._add_files_to_namespace(value, key) elif pd.api.types.is_list_like(dirs): self._add_files_to_namespace(dirs) elif dirs is not None: self._add_files_to_namespace(dirs)
[docs] def _add_files_to_namespace(self, directory, namespace=""): """Get almanac files within a directory or list of directories. Args: directory: directory to scan namespace: if provided, files in this directory will be in their own namespace in the almanac """ if not pd.api.types.is_list_like(directory): directory = [directory] files = [] for input_path in directory: if os.path.isfile(input_path): files.append(input_path) continue path_contents = file_pointer.get_directory_contents( input_path, include_protocol=True, storage_options=self.storage_options ) input_paths = [x for x in path_contents if str(x).endswith(".yml")] input_paths.sort() files.extend(input_paths) if namespace in self.files: self.files[namespace].extend(files) else: self.files[namespace] = files
[docs] def _init_catalog_objects(self): """Create (unlinked) almanac info objects for all the files found in the previous steps.""" for namespace, files in self.files.items(): for file in files: catalog_info = AlmanacInfo.from_file(file, storage_options=self.storage_options) catalog_info.namespace = namespace if namespace: full_name = f"{namespace}:{catalog_info.catalog_name}" else: full_name = catalog_info.catalog_name if full_name in self.entries: warnings.warn(f"Duplicate catalog name ({full_name}). Try using namespaces.") else: self.entries[full_name] = catalog_info self.dir_to_catalog_name[catalog_info.catalog_path] = full_name
[docs] def _get_linked_catalog(self, linked_text, namespace) -> AlmanacInfo | None: """Find a catalog to be used for linking catalogs within the almanac. e.g. for an association table, we will have a primary and join catalog. the association catalog is "receiving" the link of primary catalog info, and a link of join catalog info. Args: linked_text: text provided for the linked catalog. this could take a few different forms: - empty or None (returns None) - short name of a catalog - namespaced name of a catalog - full path to a catalog base directory - path to a catalog base directory, with environment variables namespace: the namespace in the catalog **receiving** the link. this is used to resolve the linked_text argument, so if you're relying on namespaces, the receiving and linking catalog should be in the same namespace Returns: almanac info for the linked catalog, if found """ resolved_path = os.path.expandvars(linked_text) if linked_text in self.dir_to_catalog_name: # pragma: no cover linked_text = self.dir_to_catalog_name[linked_text] elif resolved_path in self.dir_to_catalog_name: linked_text = self.dir_to_catalog_name[resolved_path] resolved_name = linked_text if not resolved_name in self.entries: resolved_name = f"{namespace}:{linked_text}" if not resolved_name in self.entries: return None return self.entries[resolved_name]
[docs] def catalogs(self, include_deprecated=False, types: List[str] | None = None): """Get names of catalogs in the almanac, matching the provided conditions. Catalogs must meet all criteria provided in order to be returned (e.g. the criteria are ANDED together). Args: include_deprecated: include catalogs which contain some text in their ``deprecated`` field. types: include ONLY catalogs within the list of provided types. """ selected = [] for full_name, catalog_info in self.entries.items(): include = True if not include_deprecated and catalog_info.deprecated: include = False if types and catalog_info.catalog_type not in types: include = False if include: selected.append(full_name) return selected
[docs] def get_almanac_info(self, catalog_name: str) -> AlmanacInfo: """Fetch the almanac info for a single catalog.""" return self.entries[catalog_name]
[docs] def get_catalog(self, catalog_name: str) -> Dataset: """Fetch the fully-populated hipscat metadata for the catalog name. This will load the ``catalog_info.join`` and other relevant metadata files from disk.""" return read_from_hipscat( self.get_almanac_info(catalog_name=catalog_name).catalog_path, storage_options=self.storage_options, )