Source code for hats.catalog.dataset.dataset

from __future__ import annotations

import warnings
from pathlib import Path

import pandas as pd
import pyarrow as pa
from deprecated import deprecated  # type: ignore
from upath import UPath

from hats.catalog.catalog_snapshot import CatalogSnapshot
from hats.catalog.dataset.table_properties import TableProperties
from hats.io import file_io
from hats.io.parquet_metadata import aggregate_column_statistics, per_partition_statistics


# pylint: disable=too-few-public-methods
[docs] class Dataset: """A base HATS dataset that contains a properties file and the data contained in parquet files""" def __init__( self, catalog_info: TableProperties, catalog_path: str | Path | UPath | None = None, schema: pa.Schema | None = None, snapshot: CatalogSnapshot | None = None, generate_snapshot: bool = False, ) -> None: """Initializes a Dataset Parameters ---------- catalog_info: TableProperties A TableProperties object with the catalog metadata catalog_path: str | Path | UPath | None If the catalog is stored on disk, specify the location of the catalog Does not load the catalog from this path, only store as metadata schema : pa.Schema The pyarrow schema for the catalog. May be modified e.g. based on loaded columns snapshot : CatalogSnapshot Immutable snapshot of the catalog's original on-disk schema and partition info. May NOT be modified e.g. based on loaded columns or filtered pixels. generate_snapshot : bool If True and no snapshot is provided, automatically generate one from the current schema and catalog_info. Default False. """
[docs] self.catalog_info = catalog_info
[docs] self.catalog_name = self.catalog_info.catalog_name
[docs] self.catalog_path = catalog_path
[docs] self.catalog_base_dir = file_io.get_upath(self.catalog_path)
[docs] self.schema = schema
if snapshot is None and generate_snapshot: snapshot = CatalogSnapshot(schema=schema, catalog_info=catalog_info)
[docs] self.snapshot = snapshot
@property
[docs] def original_schema(self) -> pa.Schema | None: """The original on-disk schema, before any column selection.""" return self.snapshot.schema if self.snapshot is not None else None
@property
[docs] def on_disk(self) -> bool: """Is the catalog stored on disk?""" return self.catalog_path is not None
@property
[docs] def unmodified(self) -> bool: """Has the catalog been modified from its original on disk state?""" return self.catalog_info.total_rows is not None
[docs] def aggregate_column_statistics( self, exclude_hats_columns: bool = True, exclude_columns: list[str] = None, include_columns: list[str] = None, ): """Read footer statistics in parquet metadata, and report on global min/max values. Parameters ---------- exclude_hats_columns : bool exclude HATS spatial and partitioning fields from the statistics. Defaults to True. exclude_columns : list[str] additional columns to exclude from the statistics. include_columns : list[str] if specified, only return statistics for the column names provided. Defaults to None, and returns all non-hats columns. Returns ------- Dataframe aggregated statistics. """ if not self.on_disk: warnings.warn("Calling aggregate_column_statistics on an in-memory catalog. No results.") return pd.DataFrame() if not self.unmodified: warnings.warn( "Calling aggregate_column_statistics on a modified catalog. Results may be inaccurate." ) return aggregate_column_statistics( self.catalog_base_dir / "dataset" / "_metadata", exclude_hats_columns=exclude_hats_columns, exclude_columns=exclude_columns, include_columns=include_columns, )
@deprecated( version="0.9.1", reason="`per_pixel_statistics` will be removed in the future, " "use `per_partition_statistics` instead.", )
[docs] def per_pixel_statistics( self, *, exclude_hats_columns: bool = True, exclude_columns: list[str] | None = None, include_columns: list[str] | None = None, only_numeric_columns: bool = False, include_stats: list[str] | None = None, multi_index=False, per_row_group: bool = False, ): """Read footer statistics in parquet metadata, and report on statistics about each pixel partition.""" return self.per_partition_statistics( exclude_hats_columns=exclude_hats_columns, exclude_columns=exclude_columns, include_columns=include_columns, only_numeric_columns=only_numeric_columns, include_stats=include_stats, multi_index=multi_index, per_row_group=per_row_group, )
[docs] def per_partition_statistics( self, *, exclude_hats_columns: bool = True, exclude_columns: list[str] = None, include_columns: list[str] = None, only_numeric_columns: bool = False, include_stats: list[str] = None, multi_index=False, per_row_group: bool = False, ): """Read footer statistics in parquet metadata, and report on statistics about each pixel partition. Parameters ---------- exclude_hats_columns : bool exclude HATS spatial and partitioning fields from the statistics. Defaults to True. exclude_columns : list[str] additional columns to exclude from the statistics. include_columns : list[str] if specified, only return statistics for the column names provided. Defaults to None, and returns all non-hats columns. include_stats : list[str] if specified, only return the kinds of values from list (min_value, max_value, null_count, row_count). Defaults to None, and returns all values. multi_index : bool should the returned frame be created with a multi-index, first on pixel, then on column name? Default is False, and instead indexes on pixel, with separate columns per-data-column and stat value combination. (Default value = False) Returns ------- Dataframe all statistics. """ if not self.on_disk: warnings.warn("Calling per_partition_statistics on an in-memory catalog. No results.") return pd.DataFrame() if not self.unmodified: warnings.warn( "Calling per_partition_statistics on a modified catalog. Results may be inaccurate." ) return per_partition_statistics( self.catalog_base_dir / "dataset" / "_metadata", exclude_hats_columns=exclude_hats_columns, exclude_columns=exclude_columns, include_columns=include_columns, only_numeric_columns=only_numeric_columns, include_stats=include_stats, multi_index=multi_index, per_row_group=per_row_group, )