Source code for hipscat.inspection.almanac_info
from __future__ import annotations
import dataclasses
import os
from dataclasses import dataclass, field
from typing import Any, Dict, List, Union
import yaml
from typing_extensions import Self
from hipscat.catalog.dataset import catalog_info_factory
from hipscat.catalog.dataset.base_catalog_info import BaseCatalogInfo
from hipscat.io import file_io
@dataclass
[docs]
class AlmanacInfo:
"""Container for parsed almanac information."""
[docs]
storage_options: Union[Dict[Any, Any], None] = field(default_factory=dict)
[docs]
primary: str | None = None
[docs]
join: str | None = None
[docs]
primary_link: Self | None = None
[docs]
join_link: Self | None = None
[docs]
sources: List[Self] = field(default_factory=list)
[docs]
objects: List[Self] = field(default_factory=list)
[docs]
margins: List[Self] = field(default_factory=list)
[docs]
associations: List[Self] = field(default_factory=list)
[docs]
associations_right: List[Self] = field(default_factory=list)
[docs]
indexes: List[Self] = field(default_factory=list)
[docs]
creators: List[str] = field(default_factory=list)
[docs]
catalog_info: dict = field(default_factory=dict)
[docs]
catalog_info_object: BaseCatalogInfo | None = None
[docs]
def __post_init__(self):
if len(self.catalog_info):
self.catalog_info_object = catalog_info_factory.create_catalog_info(self.catalog_info)
if self.catalog_info and "primary_catalog" in self.catalog_info and not self.primary:
self.primary = self.catalog_info["primary_catalog"]
if self.catalog_info and "join_catalog" in self.catalog_info and not self.join:
self.join = self.catalog_info["join_catalog"]
## Allows use of $HIPSCAT_DEFAULT_DIR in paths
self.catalog_path = os.path.expandvars(self.catalog_path)
@staticmethod
[docs]
def get_default_dir() -> str:
"""Fetch the default directory for environment variables.
This is set via the environment variable: HIPSCAT_ALMANAC_DIR
To set this in a linux-like environment, use a command like::
export HIPSCAT_ALMANAC_DIR=/data/path/to/almanacs
This will also attempt to expand any environment variables WITHIN the default
directory environment variable. This can be useful in cases where::
$HIPSCAT_ALMANAC_DIR=$HIPSCAT_DEFAULT_DIR/almanacs/
"""
default_dir = os.environ.get("HIPSCAT_ALMANAC_DIR", "")
if default_dir:
default_dir = os.path.expandvars(default_dir)
return default_dir
@classmethod
[docs]
def from_catalog_dir(
cls, catalog_base_dir: str, storage_options: Union[Dict[Any, Any], None] = None
) -> Self:
"""Create almanac information from the catalog information found at the target directory"""
catalog_info = catalog_info_factory.from_catalog_dir(
catalog_base_dir=file_io.get_file_pointer_from_path(catalog_base_dir),
storage_options=storage_options,
)
args = {
"catalog_path": catalog_base_dir,
"catalog_name": catalog_info.catalog_name,
"catalog_type": catalog_info.catalog_type,
"catalog_info_object": catalog_info,
"catalog_info": dataclasses.asdict(catalog_info),
}
return cls(**args)
@classmethod
[docs]
def from_file(cls, file: str, storage_options: Union[Dict[Any, Any], None] = None) -> Self:
"""Create almanac information from an almanac file."""
_, fmt = os.path.splitext(file)
if fmt != ".yml":
raise ValueError(f"Unsupported file format {fmt}")
metadata = file_io.file_io.read_yaml(file, storage_options=storage_options)
return cls(**metadata)
[docs]
def write_to_file(
self,
directory=None,
default_dir=True,
fmt="yml",
storage_options: Union[Dict[Any, Any], None] = None,
):
"""Write the almanac to an almanac file"""
if default_dir and directory:
raise ValueError("Use only one of dir and default_dir")
if default_dir:
directory = AlmanacInfo.get_default_dir()
file_path = file_io.append_paths_to_pointer(
file_io.get_file_pointer_from_path(directory), f"{self.catalog_name}.{fmt}"
)
if file_io.does_file_or_directory_exist(file_path, storage_options=storage_options):
raise ValueError(f"File already exists at path {str(file_path)}")
args = {
"catalog_path": self.catalog_path,
"catalog_name": self.catalog_name,
"catalog_type": self.catalog_type,
"creators": self.creators,
"description": self.description,
"catalog_info": self.catalog_info,
}
if self.primary:
args["primary"] = self.primary
if self.join:
args["join"] = self.join
if self.version:
args["version"] = self.version
if self.deprecated:
args["deprecated"] = self.deprecated
if fmt == "yml":
encoded_string = yaml.dump(args, sort_keys=False)
else:
raise ValueError(f"Unsupported file format {fmt}")
file_io.write_string_to_file(file_path, encoded_string, storage_options=storage_options)