"""
HistFactory Distribution implementation.
Provides the HistFactoryDist class for handling binned statistical models
with samples and modifiers as defined in the HS3 specification.
"""
from __future__ import annotations
from collections.abc import Iterator
from typing import Any, Literal, cast
import hist
import numpy as np
import pytensor.tensor as pt
from pydantic import Field, PrivateAttr, model_validator
from pyhs3.axes import BinnedAxes
from pyhs3.context import Context
# Import existing distributions for constraint terms
from pyhs3.distributions.basic import GaussianDist, PoissonDist
from pyhs3.distributions.core import Distribution
from pyhs3.distributions.histfactory.data import SampleData
from pyhs3.distributions.histfactory.modifiers import (
HasConstraint,
Modifier,
ParameterModifier,
StatErrorModifier,
)
from pyhs3.distributions.histfactory.samples import Sample, Samples
from pyhs3.networks import HasDependencies, HasInternalNodes
from pyhs3.typing.aliases import TensorVar
class ModifierNode(HasDependencies):
"""Wrapper giving a modifier a globally unique name for the dependency graph.
Modifiers can share the same name across samples/types (e.g., a ``Lumi``
modifier appearing in several places), but the dependency graph requires
unique node identifiers. This lightweight wrapper provides the unique name
while delegating all functionality to the wrapped modifier.
"""
def __init__(self, name: str, modifier: Modifier):
"""Store the unique node name and the wrapped modifier."""
self.name = name
self._modifier = modifier
@property
def dependencies(self) -> set[str]:
"""Parameter names the wrapped modifier depends on."""
return self._modifier.dependencies
def expression(self, context: Context) -> TensorVar:
"""Delegate expression building to the wrapped modifier."""
return self._modifier.expression(context)
[docs]
class HistFactoryDistChannel(Distribution, HasInternalNodes):
r"""
HistFactory probability distribution for a single channel/region.
Implements binned statistical models consisting of histograms (step functions)
with various modifiers as defined in the HS3 specification. Each HistFactoryDistChannel
represents one independent measurement channel/region with its own observed data.
Multiple channels can be combined in a workspace to form a complete HistFactory model.
The total likelihood consists of:
1. **Main likelihood**: Poisson likelihood for observed bin counts vs expected rates
2. **Constraint likelihoods**: Constraint terms for nuisance parameters
The prediction for a binned region is given as:
.. math::
\lambda(x) = \sum_{s \in \text{samples}} \left[
\left( d_s(x) + \sum_{\delta \in M_\delta} \delta(x,\theta_\delta) \right)
\prod_{\kappa \in M_\kappa} \kappa(x,\theta_\kappa)
\right]
where:
- :math:`d_s(x)` is the nominal prediction for sample :math:`s`
- :math:`M_\delta` are additive modifiers (histosys)
- :math:`M_\kappa` are multiplicative modifiers (normfactor, normsys, shapefactor, etc.)
**Observed Data Convention:**
Observed data must be provided in the context as ``{name}_observed`` where
``name`` is the HistFactory distribution name. This is required for likelihood
evaluation.
**Constraint Types:**
- **Gaussian constraints** (default): histosys, normsys, staterror
- **Poisson constraints** (default): shapesys
- All constraint types can be overridden via the ``constraint`` field
Parameters:
axes (list): Array of axis definitions with binning information
samples (list): Array of sample definitions with data and modifiers
Supported Modifiers:
- normfactor: Multiplicative scaling by parameter value
- normsys: Multiplicative systematic with hi/lo interpolation
- histosys: Additive correlated shape systematic
- shapefactor: Uncorrelated multiplicative bin-by-bin scaling
- shapesys: Uncorrelated shape systematic with Poisson constraints
- staterror: Statistical uncertainty via Barlow-Beeston method
Modifier Naming in Dependency Graph:
Modifiers have simple names (e.g., "lumi") in the HS3 specification, but are
given unique identifiers in the dependency graph by prepending the full context:
``{dist_name}/{sample_name}/{modifier_type}/{modifier_name}``
This design distinguishes individual modifier instances while allowing parameters
to indicate correlation - modifiers sharing the same parameter name are correlated.
Example: Two modifiers both named "lumi" in different samples will have unique
graph nodes like "SR/signal/normsys/lumi" and "CR/background/normsys/lumi", but
if they share the same parameter name, they are correlated.
HS3 Reference:
:hs3:label:`histfactory_dist <hs3.histfactory-distribution>`
"""
type: Literal["histfactory_dist"] = "histfactory_dist"
axes: BinnedAxes = Field(..., json_schema_extra={"preprocess": False})
samples: Samples = Field(..., json_schema_extra={"preprocess": False})
barlow_beeston_method: Literal["full", "lite"] = Field(
default="lite",
json_schema_extra={"preprocess": False},
)
_normalizable: bool = PrivateAttr(default=False)
@model_validator(mode="after")
def _validate_staterror(self) -> HistFactoryDistChannel:
total_bins = self.axes.get_total_bins()
for sample in self.samples:
for mod in sample.modifiers:
if not isinstance(mod, StatErrorModifier):
continue
if self.barlow_beeston_method == "full" and mod.data is None:
msg = (
f"StatErrorModifier '{mod.name}' in channel '{self.name}' "
f"requires 'data' (uncertainties) in BB-full mode"
)
raise ValueError(msg)
if self.barlow_beeston_method == "lite" and mod.data is not None:
msg = (
f"StatErrorModifier '{mod.name}' in channel '{self.name}' "
f"must not specify 'data' in BB-lite mode; per-bin errors "
f"come from the sample data"
)
raise ValueError(msg)
if not mod.parameters:
mod.parameters = [
f"staterror_{self.name}_bin{i}" for i in range(total_bins)
]
return self
def get_internal_nodes(self) -> list[Any]:
"""
Return all internal nodes that need to be in the dependency graph.
Modifiers can have the same name across different samples/types (e.g., "Lumi"
appearing in multiple places), but the dependency graph requires unique node
identifiers. We create wrapper objects that provide unique names while
delegating to the original modifier's functionality.
"""
nodes = []
for sample in self.samples:
for modifier in sample.modifiers:
# Create unique node name: {dist_name}/{sample_name}/{modifier_type}/{modifier_name}
# This ensures uniqueness even when modifier names are reused across samples
node_name = f"{self.name}/{sample.name}/{modifier.type}/{modifier.name}"
nodes.append(ModifierNode(node_name, modifier))
return nodes
@property
def parameters(self) -> set[str]:
"""Return all parameters used by this HistFactory distribution."""
params = set()
# HistFactory distribution requires observed data parameter
params.add(f"{self.name}_observed")
# Collect parameters from all modifiers
for sample in self.samples:
for modifier in sample.modifiers:
params.update(modifier.dependencies)
return params
def likelihood(self, context: Context) -> TensorVar:
"""
Build the HistFactory main Poisson likelihood.
Returns the Poisson probability for observed bin counts vs expected rates.
Does NOT include constraint terms - those are added via extended_likelihood().
Args:
context: Mapping of parameter names to PyTensor variables
Returns:
PyTensor expression for the main Poisson model probability
"""
# Extract binning information
total_bins = self._get_total_bins()
# Process all samples and compute expected rates
expected_rates = self._compute_expected_rates(context, total_bins)
# Build main Poisson model for observed data
return self._build_main_model(context, expected_rates)
def constraint_specs(
self,
) -> Iterator[tuple[str | None, HasConstraint, SampleData]]:
"""Yield ``(dedup_key, modifier, sample_data)`` for each constraint modifier.
``dedup_key`` is the modifier's parameter name for single-parameter
modifiers (``normsys``, ``histosys``); callers may use it to dedup
constraints when multiple modifier instances reference the same nuisance
parameter — within a channel or across channels in a joint fit. For
multi-parameter modifiers (``shapesys``, ``staterror``) ``dedup_key``
is ``None`` — these constraints are channel-local by workspace validation
and are always emitted as-is.
"""
for sample in self.samples:
for modifier in sample.modifiers:
if not isinstance(modifier, HasConstraint):
continue
if isinstance(modifier, ParameterModifier):
yield modifier.parameter, modifier, sample.data
else:
yield None, modifier, sample.data
def extended_likelihood(
self, context: Context, _data: TensorVar | None = None
) -> TensorVar:
"""Build constraint product for this channel.
Constraints are deduped by parameter — multiple ``ParameterModifier``
instances sharing one nuisance parameter (e.g., two ``normsys`` on
different samples both pointing at ``alpha_lumi``) emit a single
constraint factor, not one per modifier. ``ParametersModifier``
constraints (``shapesys``, ``staterror``) carry per-bin nominal yields
and are always emitted per-modifier.
"""
seen: set[str] = set()
constraint_probs: list[TensorVar] = []
for dedup_key, modifier, sample_data in self.constraint_specs():
# Skip StatErrorModifier in lite mode - constraint built at channel level
if self.barlow_beeston_method == "lite" and isinstance(
modifier, StatErrorModifier
):
continue
if dedup_key is not None:
if dedup_key in seen:
continue
seen.add(dedup_key)
constraint_probs.append(modifier.make_constraint(context, sample_data))
# Add channel-level BB-lite constraint if in lite mode
if self.barlow_beeston_method == "lite":
lite_constraint = self._make_barlow_beeston_lite_constraint(context)
if lite_constraint is not None:
constraint_probs.append(lite_constraint)
if not constraint_probs:
return pt.constant(1.0)
return cast(TensorVar, pt.prod(pt.stack(constraint_probs))) # type: ignore[no-untyped-call]
def _get_total_bins(self) -> int:
"""Calculate total number of bins across all axes."""
return self.axes.get_total_bins()
def _find_staterror_modifier(self) -> StatErrorModifier | None:
"""Find the first staterror modifier from any sample.
In BB-lite mode, staterror modifiers share gamma parameters across samples,
so we only need one modifier to get the parameter names and constraint type.
"""
for sample in self.samples:
for modifier in sample.modifiers:
if isinstance(modifier, StatErrorModifier):
return modifier
return None
def _make_barlow_beeston_lite_constraint(
self, context: Context
) -> TensorVar | None:
"""Build BB-lite constraint from combined sample uncertainties.
BB-lite uses shared gamma parameters across samples with a channel-level
constraint built from combined MC statistical uncertainties.
The constraint can be either Poisson or Gaussian:
- Poisson: Poisson(tau | gamma * tau) where tau = (nu/sigma)^2
- Gaussian: N(1.0 | gamma, relerr) where relerr = sigma/nu
Combined uncertainties from samples:
- total_nominal = sum(sample.data.contents)
- total_sigma = sqrt(sum(sample.data.errors^2))
"""
total_bins = self._get_total_bins()
# Find staterror modifier to get gamma params and constraint type
staterror_mod = self._find_staterror_modifier()
if staterror_mod is None:
return None
gamma_params = staterror_mod.parameters
constraint_type = staterror_mod.constraint
# Compute combined uncertainties from sample.data.errors
total_nominal = np.zeros(total_bins)
total_variance = np.zeros(total_bins)
for sample in self.samples:
# Only include samples that have the staterror modifier
has_staterror = any(
isinstance(mod, StatErrorModifier) for mod in sample.modifiers
)
if has_staterror:
total_nominal += np.array(sample.data.contents)
total_variance += np.square(sample.data.errors)
total_sigma = np.sqrt(total_variance)
augmented_context = dict(context)
dists: list[GaussianDist | PoissonDist] = []
for i, param_name in enumerate(gamma_params):
nu, sigma = total_nominal[i], total_sigma[i]
# Skip bins with zero nominal yield; sigma=0 is caught by the parsing layer
if nu <= 0:
continue
if constraint_type == "Poisson":
# Poisson: Poisson(tau | gamma * tau) where tau = (nu/sigma)^2
tau = (nu / sigma) ** 2
scaled_name = f"{param_name}_scaled"
augmented_context[scaled_name] = context[param_name] * tau
dists.append(
PoissonDist(
name=f"constraint_bblite_{self.name}_{i}",
x=float(tau),
mean=scaled_name,
)
)
else: # "Gauss"
# Gaussian: N(1.0 | gamma, relerr) where relerr = sigma / nu
relerr = sigma / nu
sigma_name = f"{param_name}_sigma"
augmented_context[sigma_name] = pt.constant(relerr)
dists.append(
GaussianDist(
name=f"constraint_bblite_{self.name}_{i}",
x=1.0,
mean=param_name,
sigma=sigma_name,
)
)
if not dists:
return None
# Evaluate all distributions with augmented context and multiply
factors = []
for dist in dists:
dist_ctx = Context({**augmented_context, **dist.constants})
factors.append(dist.expression(dist_ctx))
return cast(TensorVar, pt.prod(pt.stack(factors), axis=0)) # type: ignore[no-untyped-call]
def _compute_expected_rates(self, context: Context, total_bins: int) -> TensorVar:
"""
Compute expected event rates for all bins.
Applies all modifiers to sample predictions to get final rates.
"""
# Start with zeros for total prediction
total_rates = pt.zeros(total_bins)
# Process each sample
for sample in self.samples:
sample_rates = self._process_sample(context, sample, total_bins)
total_rates = total_rates + sample_rates
return cast(TensorVar, total_rates)
def _process_sample(
self, context: Context, sample: Sample, total_bins: int
) -> TensorVar:
"""Process a single sample with its modifiers.
The HistFactory formula is lambda = (N + sum(delta_histosys(N))) * prod(kappa_multiplicative).
Additive variations (histosys) must each be computed against the sample
nominal N, then summed; multiplicative modifiers apply to the combined
result. Applying modifiers sequentially against accumulating rates would
cause histosys to compute its variation against already-scaled rates,
violating the formula when a multiplicative modifier precedes it.
"""
contents = sample.data.contents
if len(contents) != total_bins:
msg = (
f"Sample {sample.name} has {len(contents)} bins, expected {total_bins}"
)
raise ValueError(msg)
nominal_rates = pt.as_tensor_variable(contents)
# Pass 1: accumulate additive variations against the nominal.
# modifier.apply(ctx, nominal) returns nominal + variation; subtract to
# isolate the variation so multiple histosys modifiers each reference
# the same nominal rather than each other's output.
additive_sum = pt.zeros(total_bins)
for modifier in sample.modifiers:
if modifier.is_additive:
additive_sum = additive_sum + (
modifier.apply(context, nominal_rates) - nominal_rates
)
rates = nominal_rates + additive_sum
# Pass 2: apply multiplicative modifiers to (nominal + Σ additive).
for modifier in sample.modifiers:
if modifier.is_multiplicative:
rates = modifier.apply(context, rates)
return cast(TensorVar, rates)
def _build_main_model(
self, context: Context, expected_rates: TensorVar
) -> TensorVar:
"""
Build the main Poisson model for observed data.
Observed data must be provided in the context as '{name}_observed' where
name is the HistFactory distribution name. This is a required parameter
for likelihood evaluation.
Returns:
PyTensor expression for the Poisson probability (not log probability)
"""
# Create a Poisson likelihood for the observed bin counts
# Observed data is required - no defensive programming needed
observed_data_param = f"{self.name}_observed"
observed_data = context[observed_data_param]
# Observables are reshaped to (N, 1) by the model builder for broadcasting.
# Flatten to 1-D here so element-wise Poisson matches the (N,) expected_rates.
if observed_data.ndim == 2:
observed_data = observed_data[:, 0]
# Build product of individual Poisson probabilities for each bin
# P(observed_i | expected_i) = exp(observed_i * log(expected_i) - expected_i - log(observed_i!))
log_probs = (
observed_data * pt.log(expected_rates)
- expected_rates
- pt.gammaln(observed_data + 1)
)
# Convert from log probabilities to probabilities
probs = pt.exp(log_probs)
main_prob = pt.prod(probs) # type: ignore[no-untyped-call]
return cast(TensorVar, main_prob)
def to_hist(self) -> Any:
"""
Convert HistFactory channel to hist.Hist with categorical process axis.
Creates a single histogram combining all samples using a categorical axis.
The first axis is a categorical axis with sample names (labeled "process"),
followed by the original binning axes.
Returns:
hist.Hist: Histogram with shape (n_samples, *binning_shape) where:
- Axis 0: Categorical axis "process" with sample names
- Remaining axes: Original binning axes from self.axes
- Values: Sample contents (with errors as variances)
Examples:
>>> channel = HistFactoryDistChannel(
... name="SR",
... axes=[{"name": "mass", "min": 100, "max": 150, "nbins": 5}],
... samples=[
... {"name": "signal", "data": {"contents": [105, 106, 107, 108, 109], "errors": [0.5, 0.6, 0.7, 0.8, 0.9]}},
... {"name": "background", "data": {"contents": [110, 111, 112, 113, 114], "errors": [0.05, 0.06, 0.07, 0.08, 0.09]}}
... ]
... )
>>> h = channel.to_hist()
>>> h
Hist(
StrCategory(['signal', 'background'], name='process'),
Regular(5, 100, 150, name='mass'),
storage=Weight()) # Sum: WeightedSum(value=1095, variance=2.5755)
>>> h.axes[0]
StrCategory(['signal', 'background'], name='process')
>>> h["signal", :] # Get all mass bins for signal sample
Hist(Regular(5, 100, 150, name='mass'), storage=Weight()) # Sum: WeightedSum(value=535, variance=2.55)
"""
# First axis: categorical sample axis (use "process" since "sample" is a protected keyword in hist)
sample_names = [sample.name for sample in self.samples]
process_axis = hist.axis.StrCategory(sample_names, name="process")
# Convert remaining axes to hist.axis objects
binning_axes = [axis.to_hist() for axis in self.axes]
# Create histogram with all axes (categorical first, then binning axes)
h = hist.Hist(process_axis, *binning_axes, storage=hist.storage.Weight())
# Calculate shape from axes (excluding the sample axis)
binning_shape = tuple(axis.nbins for axis in self.axes)
# Fill histogram by iterating over samples
for i, sample in enumerate(self.samples):
# Reshape contents and variances for this sample
contents_nd = np.array(sample.data.contents).reshape(binning_shape)
variances_nd = np.square(sample.data.errors).reshape(binning_shape)
# Set values for this sample using integer indexing
# The sample order matches the categorical axis order, so h[i, ...] corresponds to sample.name
stacked = np.stack([contents_nd, variances_nd], axis=-1)
h[i, ...] = stacked
return h
# Registry of histfactory distributions
distributions: dict[str, type[Distribution]] = {
"histfactory_dist": HistFactoryDistChannel,
}
# Define what should be exported from this module
__all__ = [
"HistFactoryDistChannel",
"distributions",
]