"""Utility functions for handling parquet metadata files"""
# pylint: disable=too-many-lines
from __future__ import annotations
import io
import itertools
import random
from pathlib import Path
import nested_pandas as npd
import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from astropy.io.votable.tree import FieldRef, Group, Param, VOTableFile
from astropy.table import Table
from upath import UPath
from hats.io import file_io, paths
from hats.io.file_io.file_pointer import get_upath
from hats.pixel_math.healpix_pixel import HealpixPixel
from hats.pixel_math.healpix_pixel_function import get_pixel_argsort
from hats.pixel_math.spatial_index import SPATIAL_INDEX_COLUMN
# pylint: disable=too-many-locals,too-many-statements
def _nonemin(value1, value2):
"""Similar to numpy's nanmin, but excludes `None` values.
NB: If both values are `None`, this will still return `None`
"""
if value1 is None:
return value2
if value2 is None:
return value1
return min(value1, value2)
def _nonemax(value1, value2):
"""Similar to numpy's nanmax, but excludes `None` values.
NB: If both values are `None`, this will still return `None`
"""
if value1 is None:
return value2
if value2 is None:
return value1
return max(value1, value2)
def _pick_columns(
first_row_group,
exclude_hats_columns: bool = True,
exclude_columns: list[str] = None,
include_columns: list[str] = None,
only_numeric_columns: bool = False,
):
"""Convenience method to find the desired columns and their indexes, given
some conventional user preferences.
"""
if include_columns is None:
include_columns = []
if exclude_columns is None:
exclude_columns = []
if exclude_hats_columns:
exclude_columns.extend(["Norder", "Dir", "Npix", "_healpix_29"])
num_columns = first_row_group.num_columns
column_names = [
first_row_group.column(col).path_in_schema for col in range(0, first_row_group.num_columns)
]
numeric_columns = [
first_row_group.column(col).path_in_schema
for col in range(0, num_columns)
if first_row_group.column(col).physical_type in ("DOUBLE", "FLOAT", "DECIMAL")
or "INT" in first_row_group.column(col).physical_type
]
column_names = [name.removesuffix(".list.element") for name in column_names]
numeric_columns = [name.removesuffix(".list.element") for name in numeric_columns]
good_column_indexes = []
for index, name in enumerate(column_names):
base_name = name.split(".")[0]
included = len(include_columns) == 0 or name in include_columns or base_name in include_columns
excluded = len(exclude_columns) > 0 and (name in exclude_columns or base_name in exclude_columns)
numeric_ok = not only_numeric_columns or name in numeric_columns
if included and not excluded and numeric_ok:
good_column_indexes.append(index)
column_names = [column_names[i] for i in good_column_indexes]
return good_column_indexes, column_names
[docs]
def aggregate_column_statistics(
    metadata_file: str | Path | UPath,
    *,
    exclude_hats_columns: bool = True,
    exclude_columns: list[str] = None,
    include_columns: list[str] = None,
    only_numeric_columns: bool = False,
    include_pixels: list[HealpixPixel] = None,
):
    """Fold the parquet footer statistics of every row group into one global
    summary row per column.

    Parameters
    ----------
    metadata_file : str | Path | UPath
        path to `_metadata` file
    exclude_hats_columns : bool
        leave the HATS spatial and partitioning fields out of the summary.
        Defaults to True.
    exclude_columns : list[str]
        further columns to leave out of the summary.
    include_columns : list[str]
        when given, restrict the summary to these column names.
        Defaults to None, which keeps all non-hats columns.
    only_numeric_columns : bool
        restrict the summary to numeric (integer or floating point) columns.
        If True, the entire frame should be numeric.
        (Default value = False)
    include_pixels : list[HealpixPixel]
        when given, only row groups belonging to these pixels contribute.
        Defaults to None, which uses all pixels.

    Returns
    -------
    pd.DataFrame
        frame indexed by ``column_names`` with ``min_value``, ``max_value``,
        ``null_count`` and ``row_count`` columns. An empty frame is returned
        when there are no row groups, no columns are selected, or no row
        group matches ``include_pixels``.
    """
    total_metadata = file_io.read_parquet_metadata(metadata_file)
    num_row_groups = total_metadata.num_row_groups
    if num_row_groups == 0:
        return pd.DataFrame()

    # The first row group is used as the reference for the column layout.
    good_column_indexes, column_names = _pick_columns(
        first_row_group=total_metadata.row_group(0),
        exclude_hats_columns=exclude_hats_columns,
        exclude_columns=exclude_columns,
        include_columns=include_columns,
        only_numeric_columns=only_numeric_columns,
    )
    if not good_column_indexes:
        return pd.DataFrame()

    def _footer_summary(chunk):
        """(min, max, null count, value count) for a single column chunk."""
        footer_stats = chunk.statistics
        if footer_stats is None:
            return (None, None, 0, 0)
        return (footer_stats.min, footer_stats.max, footer_stats.null_count, chunk.num_values)

    extrema = None
    for row_group_index in range(num_row_groups):
        row_group = total_metadata.row_group(row_group_index)
        if include_pixels is not None:
            pixel = paths.get_healpix_from_path(row_group.column(0).file_path)
            if pixel not in include_pixels:
                continue

        row_stats = [_footer_summary(row_group.column(col)) for col in good_column_indexes]
        if extrema is None:
            # First contributing row group: nothing to merge with yet.
            extrema = row_stats
            continue

        extrema = [
            (
                _nonemin(so_far[0], latest[0]),
                _nonemax(so_far[1], latest[1]),
                so_far[2] + latest[2],
                so_far[3] + latest[3],
            )
            for so_far, latest in zip(extrema, row_stats)
        ]

    if extrema is None:
        return pd.DataFrame()

    # Transpose so that each row of the array holds one kind of statistic.
    stats_lists = np.array(extrema).T
    summary_columns = {
        "column_names": column_names,
        "min_value": stats_lists[0],
        "max_value": stats_lists[1],
        "null_count": stats_lists[2],
        "row_count": stats_lists[3],
    }
    frame = pd.DataFrame(summary_columns).set_index("column_names")
    return frame.astype({"null_count": int, "row_count": int})
[docs]
def aggregate_column_statistics_from_cache(
    metadata_file: str | Path | UPath,
    *,
    exclude_hats_columns: bool = True,
    exclude_columns: list[str] = None,
    include_columns: list[str] = None,
    only_numeric_columns: bool = False,
    include_pixels: list[HealpixPixel] = None,
):
    """Using cached footer statistics in parquet metadata, report on global min/max values.

    The cache is read as a table with ``Norder`` and ``Npix`` columns, plus
    ``<column>::min_value``, ``<column>::max_value``, ``<column>::null_count``
    and ``<column>::row_count`` columns for each data column.

    Parameters
    ----------
    metadata_file : str | Path | UPath
        path to the parquet file holding the cached statistics
    exclude_hats_columns : bool
        exclude HATS spatial and partitioning fields
        from the statistics. Defaults to True.
    exclude_columns : list[str]
        additional columns to exclude from the statistics. The caller's list
        is never modified.
    include_columns : list[str]
        if specified, only return statistics for the column
        names provided. Defaults to None, and returns all non-hats columns.
    only_numeric_columns : bool
        only include columns that are numeric. The cache carries no type
        information about the original columns, so setting this to True
        currently selects no columns and yields an empty frame.
        (Default value = False)
    include_pixels : list[HealpixPixel]
        if specified and non-empty, only return statistics
        for the pixels indicated. Defaults to None, and returns all pixels.

    Returns
    -------
    pd.DataFrame
        Pandas dataframe with global summary statistics, indexed by
        ``column_names``. Empty when the cache has no rows, no columns are
        selected, or none of the requested pixels are present.
    """
    frame = npd.read_parquet(metadata_file)
    if len(frame) == 0:
        return pd.DataFrame()

    if include_columns is None:
        include_columns = []
    # Work on a private copy: extending the caller's list in place would leak the
    # HATS column names into it (and pile them up across repeated calls).
    exclude_columns = [] if exclude_columns is None else list(exclude_columns)
    if exclude_hats_columns:
        exclude_columns.extend(["Norder", "Dir", "Npix", "_healpix_29"])

    # Pick one stat to extract column names.
    stat_suffix = "::min_value"
    column_names = [
        name.removesuffix(stat_suffix).removesuffix(".list.element")
        for name in frame.columns
        if name.endswith(stat_suffix)
    ]

    good_column_indexes = []
    for index, name in enumerate(column_names):
        base_name = name.split(".")[0]
        included = len(include_columns) == 0 or name in include_columns or base_name in include_columns
        excluded = name in exclude_columns or base_name in exclude_columns
        ## TODO - we don't have types on the original columns, just types on the new stat columns.
        numeric_ok = not only_numeric_columns
        if included and not excluded and numeric_ok:
            good_column_indexes.append(index)
    column_names = [column_names[i] for i in good_column_indexes]
    if not good_column_indexes:
        return pd.DataFrame()

    # Index the cache by pixel so that rows can be filtered on ``include_pixels``.
    frame["_healpix_pixel"] = frame[["Norder", "Npix"]].apply(
        lambda mini_frame: HealpixPixel(mini_frame["Norder"], mini_frame["Npix"]), axis=1
    )
    frame = frame.set_index("_healpix_pixel")

    if include_pixels is not None and len(include_pixels) > 0:
        good_pixels = frame.index.intersection(include_pixels)
        if len(good_pixels) == 0:
            return pd.DataFrame()
        frame = frame.loc[good_pixels]

    all_min = []
    all_max = []
    all_null_count = []
    all_row_count = []
    for column in column_names:
        all_min.append(frame[f"{column}::min_value"].min())
        all_max.append(frame[f"{column}::max_value"].max())
        all_null_count.append(frame[f"{column}::null_count"].sum())
        all_row_count.append(frame[f"{column}::row_count"].sum())

    result_frame = (
        pd.DataFrame(
            {
                "column_names": column_names,
                "min_value": all_min,
                "max_value": all_max,
                "null_count": all_null_count,
                "row_count": all_row_count,
            }
        )
        .set_index("column_names")
        .astype({"null_count": int, "row_count": int})
    )
    return result_frame
# pylint: disable=too-many-positional-arguments,too-many-statements
[docs]
def per_partition_statistics_from_cache(
    metadata_file: str | Path | UPath,
    *,
    exclude_hats_columns: bool = True,
    exclude_columns: list[str] = None,
    include_columns: list[str] = None,
    only_numeric_columns: bool = False,
    include_stats: list[str] = None,
    multi_index: bool = False,
    include_pixels: list[HealpixPixel] = None,
    per_row_group: bool = False,
):
    """Using cached footer statistics in parquet metadata, report on statistics
    about each pixel partition.

    The cache is read as a table with ``Norder``, ``Npix`` and
    ``row_group_index`` columns, plus one ``<column>::<stat>`` column per data
    column and statistic.

    The statistics gathered are a subset of the available attributes in the
    ``pyarrow.parquet.ColumnChunkMetaData``:

    - ``min_value`` - minimum value seen in a single data partition
    - ``max_value`` - maximum value seen in a single data partition
    - ``null_count`` - number of null values
    - ``row_count`` - total number of values. note that this will only vary by column
      if you have some nested columns in your dataset
    - ``disk_bytes`` - Compressed size of the data in the parquet file, in bytes
    - ``memory_bytes`` - Uncompressed size, in bytes

    Parameters
    ----------
    metadata_file : str | Path | UPath
        path to the parquet file holding the cached statistics
    exclude_hats_columns : bool
        exclude HATS spatial and partitioning fields
        from the statistics. Defaults to True.
    exclude_columns : list[str]
        additional columns to exclude from the statistics. The caller's list
        is never modified.
    include_columns : list[str]
        if specified, only return statistics for the column
        names provided. Defaults to None, and returns all non-hats columns.
    only_numeric_columns : bool
        only include columns that are numeric. The cache carries no type
        information about the original columns, so setting this to True
        currently selects no columns.
        (Default value = False)
    include_stats : list[str]
        if specified, only return the kinds of values from list
        (min_value, max_value, null_count, row_count, disk_bytes, memory_bytes).
        Defaults to None, and returns all values.
    multi_index : bool
        should the returned frame be created with a multi-index, first on
        pixel, then on column name? Default is False, and instead indexes on pixel, with
        separate columns per-data-column and stat value combination.
        Currently ignored when ``per_row_group`` is True.
        (Default value = False)
    include_pixels : list[HealpixPixel]
        if specified and non-empty, only return statistics
        for the pixels indicated. Defaults to None, and returns all pixels.
    per_row_group : bool
        should the returned data be even more fine-grained and provide
        per row group (within each pixel) level statistics? Default is currently False.

    Returns
    -------
    pd.DataFrame
        Pandas dataframe with granular per-pixel statistics

    Raises
    ------
    ValueError
        if ``include_stats`` names a statistic outside the supported list.
    """
    frame = npd.read_parquet(metadata_file)
    if len(frame) == 0:
        return pd.DataFrame()

    if include_columns is None:
        include_columns = []
    # Work on a private copy: extending the caller's list in place would leak the
    # HATS column names into it (and pile them up across repeated calls).
    exclude_columns = [] if exclude_columns is None else list(exclude_columns)
    if exclude_hats_columns:
        exclude_columns.extend(["Norder", "Dir", "Npix", "_healpix_29"])

    # Pick one stat to extract column names.
    stat_suffix = "::min_value"
    column_names = [
        name.removesuffix(stat_suffix).removesuffix(".list.element")
        for name in frame.columns
        if name.endswith(stat_suffix)
    ]

    good_column_indexes = []
    for index, name in enumerate(column_names):
        base_name = name.split(".")[0]
        included = len(include_columns) == 0 or name in include_columns or base_name in include_columns
        excluded = name in exclude_columns or base_name in exclude_columns
        ## TODO - we don't have types on the original columns, just types on the new stat columns.
        numeric_ok = not only_numeric_columns
        if included and not excluded and numeric_ok:
            good_column_indexes.append(index)
    column_names = [column_names[i] for i in good_column_indexes]
    # NOTE(review): the sibling statistics functions return an empty frame when no
    # columns are selected; this one carries on - confirm whether that is intended.

    all_stats = ["min_value", "max_value", "null_count", "row_count", "disk_bytes", "memory_bytes"]
    if include_stats is None or len(include_stats) == 0:
        include_stats = all_stats
    else:
        for stat in include_stats:
            if stat not in all_stats:
                raise ValueError(f"include_stats must be from list {all_stats} (found {stat})")
    # Normalize to the canonical ordering of ``all_stats``.
    include_stats = [stat for stat in all_stats if stat in include_stats]

    # Cached columns are named "<column>::<stat>"; the returned frame uses
    # "<column>: <stat>", so build the mapping between the two spellings.
    table_cols = ["Norder", "Npix", "row_group_index"] + list(
        itertools.chain.from_iterable(
            [[f"{col_name}::{stat}" for stat in include_stats] for col_name in column_names]
        )
    )
    stat_col_names = ["row_group_index"] + list(
        itertools.chain.from_iterable(
            [[f"{col_name}: {stat}" for stat in include_stats] for col_name in column_names]
        )
    )
    mod_col_names = ["Norder", "Npix"] + stat_col_names
    renamer = dict(zip(table_cols, mod_col_names))

    frame["_healpix_pixel"] = frame[["Norder", "Npix"]].apply(
        lambda mini_frame: HealpixPixel(mini_frame["Norder"], mini_frame["Npix"]), axis=1
    )
    frame = frame.set_index(frame["_healpix_pixel"])
    frame = frame.rename(columns=renamer)

    if include_pixels is not None and len(include_pixels) > 0:
        good_pixels = frame.index.intersection(include_pixels)
        if len(good_pixels) == 0:
            return pd.DataFrame()
        frame = frame.loc[good_pixels]

    if not per_row_group:

        def aggregator(row):
            """Collapse the nested per-row-group values of one pixel into single values."""
            ## TODO - This sucks but it works.
            returns = {}
            for col_name in stat_col_names:
                if col_name == "row_group_index":
                    continue
                # Match on the stat suffix rather than a substring, so that a data
                # column whose own name contains "min_value"/"max_value" still
                # gets the right reduction for each of its statistics.
                if col_name.endswith(": min_value"):
                    single_value = np.min(row[f"stats.{col_name}"])
                elif col_name.endswith(": max_value"):
                    single_value = np.max(row[f"stats.{col_name}"])
                else:
                    single_value = np.sum(row[f"stats.{col_name}"])
                returns[col_name] = single_value
            return returns

        frame = npd.NestedFrame.from_flat(
            frame,
            on="_healpix_pixel",
            base_columns=["Norder", "Npix"],
            nested_columns=stat_col_names,
            name="stats",
        )
        frame = frame.map_rows(aggregator)
        frame = frame.rename_axis(None)

        if multi_index:
            stats_lists = frame.to_numpy().reshape(len(frame) * len(column_names), len(include_stats))
            actual_pixels = np.unique(frame.index)
            # NOTE: unlike per_partition_statistics, integer stats are not cast to int here.
            frame = pd.DataFrame(
                stats_lists,
                index=pd.MultiIndex.from_product([actual_pixels, column_names], names=["pixel", "column"]),
                columns=include_stats,
            )
    elif multi_index:
        ## TODO - should this include the row group index in the multi-index? probably yes?
        pass
    return frame
# pylint: disable=too-many-positional-arguments,too-many-statements
[docs]
def per_partition_statistics(
    metadata_file: str | Path | UPath,
    *,
    exclude_hats_columns: bool = True,
    exclude_columns: list[str] = None,
    include_columns: list[str] = None,
    only_numeric_columns: bool = False,
    include_stats: list[str] = None,
    multi_index: bool = False,
    include_pixels: list[HealpixPixel] = None,
    per_row_group: bool = False,
):
    """Collect parquet footer statistics and report them separately for each
    pixel partition (or each row group).

    The statistics gathered are a subset of the available attributes in the
    ``pyarrow.parquet.ColumnChunkMetaData``:

    - ``min_value`` - minimum value seen in a single data partition
    - ``max_value`` - maximum value seen in a single data partition
    - ``null_count`` - number of null values
    - ``row_count`` - total number of values. note that this will only vary by column
      if you have some nested columns in your dataset
    - ``disk_bytes`` - Compressed size of the data in the parquet file, in bytes
    - ``memory_bytes`` - Uncompressed size, in bytes

    Parameters
    ----------
    metadata_file : str | Path | UPath
        path to `_metadata` file
    exclude_hats_columns : bool
        leave the HATS spatial and partitioning fields out of the statistics.
        Defaults to True.
    exclude_columns : list[str]
        further columns to leave out of the statistics.
    include_columns : list[str]
        when given, restrict the statistics to these column names.
        Defaults to None, which keeps all non-hats columns.
    only_numeric_columns : bool
        restrict the statistics to numeric (integer or floating point) columns.
        If True, the entire frame should be numeric.
        (Default value = False)
    include_stats : list[str]
        when given, only report these kinds of values, chosen from
        (min_value, max_value, null_count, row_count, disk_bytes, memory_bytes).
        Defaults to None, which reports all of them.
    multi_index : bool
        build the result with a multi-index, first on pixel, then on column
        name. When False, the result is indexed on pixel only, with one
        column per data-column and stat combination.
        (Default value = False)
    include_pixels : list[HealpixPixel]
        when given, only row groups belonging to these pixels are reported.
        Defaults to None, which reports all pixels.
    per_row_group : bool
        report one entry per row group instead of merging the row groups of a
        pixel together. Default is currently False.

    Returns
    -------
    pd.DataFrame
        Pandas dataframe with granular per-pixel statistics

    Raises
    ------
    ValueError
        if ``include_stats`` names a statistic outside the supported list.
    """
    total_metadata = file_io.read_parquet_metadata(metadata_file)
    num_row_groups = total_metadata.num_row_groups
    if num_row_groups == 0:
        return pd.DataFrame()

    # The first row group is used as the reference for the column layout.
    good_column_indexes, column_names = _pick_columns(
        first_row_group=total_metadata.row_group(0),
        exclude_hats_columns=exclude_hats_columns,
        exclude_columns=exclude_columns,
        include_columns=include_columns,
        only_numeric_columns=only_numeric_columns,
    )
    if not good_column_indexes:
        return pd.DataFrame()

    all_stats = ["min_value", "max_value", "null_count", "row_count", "disk_bytes", "memory_bytes"]
    int_stats = ["null_count", "row_count"]
    if include_stats is not None and len(include_stats) > 0:
        unknown_stats = [stat for stat in include_stats if stat not in all_stats]
        if unknown_stats:
            stat = unknown_stats[0]
            raise ValueError(f"include_stats must be from list {all_stats} (found {stat})")
    else:
        include_stats = all_stats
    # Normalize the request to the canonical ordering of ``all_stats``.
    include_stats = [stat for stat in all_stats if stat in include_stats]
    int_stats = [stat for stat in int_stats if stat in include_stats]
    stat_mask = np.array([ind for ind, stat in enumerate(all_stats) if stat in include_stats])

    def _footer_summary(chunk):
        """All six statistics for a single column chunk, in ``all_stats`` order."""
        footer_stats = chunk.statistics
        if footer_stats is None:
            return [None, None, 0, 0, 0, 0]
        return [
            footer_stats.min,
            footer_stats.max,
            footer_stats.null_count,
            chunk.num_values,
            chunk.total_compressed_size,
            chunk.total_uncompressed_size,
        ]

    combined_stats = {}
    pixels = []
    leaf_stats = []
    for row_group_index in range(num_row_groups):
        row_group = total_metadata.row_group(row_group_index)
        pixel = paths.get_healpix_from_path(row_group.column(0).file_path)
        if include_pixels is not None and pixel not in include_pixels:
            continue

        row_stats = [_footer_summary(row_group.column(col)) for col in good_column_indexes]

        if per_row_group:
            # Keep every row group as its own entry, already narrowed to the
            # requested statistics.
            pixels.append(pixel)
            leaf_stats.append(np.take(row_stats, stat_mask, axis=1))
            continue

        current_stats = combined_stats.get(pixel)
        if current_stats is None:
            combined_stats[pixel] = row_stats
            continue
        # Merge this row group into what was already seen for the pixel.
        combined_stats[pixel] = [
            (
                _nonemin(so_far[0], latest[0]),
                _nonemax(so_far[1], latest[1]),
                so_far[2] + latest[2],
                so_far[3] + latest[3],
                so_far[4] + latest[4],
                so_far[5] + latest[5],
            )
            for so_far, latest in zip(current_stats, row_stats)
        ]

    if per_row_group:
        stats_lists = np.array(leaf_stats)
    else:
        pixels = list(combined_stats.keys())
        stats_lists = np.array(
            [np.take(pixel_stats, stat_mask, axis=1) for pixel_stats in combined_stats.values()]
        )

    if len(stats_lists) == 0:
        return pd.DataFrame()
    # Axes are (entry, data column, statistic).
    original_shape = stats_lists.shape

    if multi_index:
        flat_stats = stats_lists.reshape((original_shape[0] * original_shape[1], original_shape[2]))
        frame = pd.DataFrame(
            flat_stats,
            index=pd.MultiIndex.from_product([pixels, column_names], names=["pixel", "column"]),
            columns=include_stats,
        )
        return frame.astype({stat_name: int for stat_name in int_stats})

    wide_stats = stats_lists.reshape((original_shape[0], original_shape[1] * original_shape[2]))
    mod_col_names = np.array(
        [[f"{col_name}: {stat}" for stat in include_stats] for col_name in column_names]
    ).flatten()
    int_col_names = np.array(
        [[f"{col_name}: {stat}" for stat in int_stats] for col_name in column_names]
    ).flatten()
    frame = pd.DataFrame(wide_stats, index=pixels, columns=mod_col_names)
    return frame.astype({stat_name: int for stat_name in int_col_names})
# pylint: disable=protected-access
[docs]
def nested_frame_to_vo_schema(
    nested_frame: npd.NestedFrame,
    *,
    verbose: bool = False,
    field_units: dict | None = None,
    field_ucds: dict | None = None,
    field_descriptions: dict | None = None,
    field_utypes: dict | None = None,
):
    """Build VOTableFile metadata from the column names and types of a NestedFrame.

    Optional dictionaries attach ancillary attributes to the fields they name.

    Note on field names with nested columns: to attach ancillary attributes (units, ucds, etc)
    to a nested sub-column, use dot notation (e.g. ``"lightcurve.band"``). A description for the
    entire nested column group can be given under the nested column name (e.g. ``"lightcurve"``).

    Parameters
    ----------
    nested_frame : npd.NestedFrame
        nested frame representing catalog data. this can be empty, as only the
        column names and types are used.
    verbose: bool
        print the unit and description keys that do not match any field name.
    field_units: dict | None
        dictionary mapping column names to astropy units (or string representation of units)
    field_ucds: dict | None
        dictionary mapping column names to UCDs (Uniform Content Descriptors)
    field_descriptions: dict | None
        dictionary mapping column names to free-text descriptions
    field_utypes: dict | None
        dictionary mapping column names to utypes

    Returns
    -------
    VOTableFile
        VO object containing all relevant metadata (but no data)
    """
    field_units = field_units or {}
    field_ucds = field_ucds or {}
    field_descriptions = field_descriptions or {}
    field_utypes = field_utypes or {}

    def _tidy_dtype(type_string):
        """astropy.Table uses numpy-style dtypes, and this cleans up type strings."""
        if "string" in type_string:
            return "U"
        return type_string.removesuffix("[pyarrow]")

    # Collate the column names and raw data types: base columns first, then
    # every sub-column of each nested column under its dotted name.
    df_types = nested_frame.to_pandas().dtypes
    typed_fields = [(col, str(df_types[col])) for col in nested_frame.base_columns]
    for col in nested_frame.nested_columns:
        typed_fields.extend(
            (f"{col}.{key}", str(val)) for key, val in nested_frame[col].dtype.column_dtypes.items()
        )
    names = [field_name for field_name, _ in typed_fields]
    data_types = [_tidy_dtype(type_string) for _, type_string in typed_fields]

    # Might have extra content for nested columns.
    named_descriptions = {key: text for key, text in field_descriptions.items() if key in names}
    named_units = {key: unit for key, unit in field_units.items() if key in names}

    if verbose:
        dropped_keys_units = set(field_units.keys()) - set(named_units.keys())
        dropped_keys_desc = set(field_descriptions.keys()) - set(named_descriptions.keys())
        if dropped_keys_units or dropped_keys_desc:
            print("================== Extra Fields ==================")
        if dropped_keys_units:
            print(f"warning - dropping some units ({len(dropped_keys_units)}):")
            print(dropped_keys_units)
        if dropped_keys_desc:
            print(f"warning - dropping some descriptions ({len(dropped_keys_desc)}):")
            print(dropped_keys_desc)

    schema_table = Table(names=names, dtype=data_types, units=named_units, descriptions=named_descriptions)
    votablefile = VOTableFile().from_table(schema_table)

    ## TODO - add info to root resource, e.g. obsregime.

    ## Add groups for nested columns
    vo_table = votablefile.get_first_table()
    for col in nested_frame.nested_columns:
        nested_group = Group(vo_table, name=col, config=vo_table._config, pos=vo_table._pos)
        nested_group.description = field_descriptions.get(col, "multi-column nested format")
        vo_table.groups.append(nested_group)

        # Flag the group, then reference each of its sub-column fields.
        nested_group.entries.append(
            Param(vo_table, name="is_nested_column", datatype="boolean", value="t")
        )
        for key in nested_frame[col].columns:
            nested_group.entries.append(FieldRef(vo_table, ref=f"{col}.{key}"))

    ## Go back and add UCD/utypes to fields
    for vo_field in vo_table.iter_fields_and_params():
        vo_name = vo_field.name
        if vo_name in field_ucds:
            vo_field.ucd = field_ucds[vo_name]
        if vo_name in field_utypes:
            vo_field.utype = field_utypes[vo_name]

    return votablefile