Skip to content

Commit

Permalink
Add t-distribution
Browse files Browse the repository at this point in the history
  • Loading branch information
Alex-JG3 committed Aug 27, 2023
1 parent 7fa2aa6 commit 9f20201
Showing 1 changed file with 166 additions and 0 deletions.
166 changes: 166 additions & 0 deletions skpro/distributions/t.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
# copyright: sktime developers, BSD-3-Clause License (see LICENSE file)
"""Student's t-distribution."""

__author__ = ["Alex-JG3"]

import numpy as np
import pandas as pd
from scipy.special import betaincinv, gamma, hyp2f1, loggamma

from skpro.distributions.base import BaseDistribution


class TDistribution(BaseDistribution):
"""Student's t-distribution (sktime native).
Parameters
----------
mean : float or array of float (1D or 2D)
mean of the t-distribution distribution
sd : float or array of float (1D or 2D), must be positive
standard deviation of the t-distribution distribution
df : float or array of float (1D or 2D), must be positive
Degrees of freedom of the t-distribution
index : pd.Index, optional, default = RangeIndex
columns : pd.Index, optional, default = RangeIndex
Example
-------
>>> from sktime.proba.t import TDistribution
>>> n = TDistribution(mu=[[0, 1], [2, 3], [4, 5]], sigma=1, df=10)
"""

_tags = {
"capabilities:approx": ["pdfnorm", "energy"],
"capabilities:exact": ["mean", "var", "pdf", "log_pdf", "cdf", "ppf"],
"distr:measuretype": "continuous",
}

def __init__(self, mu, sigma, df=1, index=None, columns=None):
self.mu = mu
self.sigma = sigma
self.df = df
self.index = index
self.columns = columns

self._mu, self._sigma, self._df = self._get_bc_params()
shape = self._mu.shape

if index is None:
index = pd.RangeIndex(shape[0])

if columns is None:
columns = pd.RangeIndex(shape[1])

super().__init__(index=index, columns=columns)

def _get_bc_params(self):
"""Fully broadcast parameters of self, given param shapes and index, columns."""
to_broadcast = [self.mu, self.sigma, self.df]
if hasattr(self, "index") and self.index is not None:
to_broadcast += [self.index.to_numpy().reshape(-1, 1)]
if hasattr(self, "columns") and self.columns is not None:
to_broadcast += [self.columns.to_numpy()]
bc = np.broadcast_arrays(*to_broadcast)
return bc[0], bc[1], bc[2]

def mean(self):
r"""Return expected value of the distribution.
Let :math:`X` be a random variable with the distribution of `self`.
Returns the expectation :math:`\mathbb{E}[X]`. The expectation,
:math:`\mathbb{E}[X]`, as infinite if :math:`\nu \le 1`.
Returns
-------
pd.DataFrame with same rows, columns as `self`
expected value of distribution (entry-wise)
"""
mean_arr = self._mu.copy()
if (self._df <= 1).any():
mean_arr = mean_arr.astype(np.float32)
mean_arr[self._df <= 1] = np.inf
return pd.DataFrame(mean_arr, index=self.index, columns=self.columns)

def var(self):
r"""Return element/entry-wise variance of the distribution.
Let :math:`X` be a random variable with the distribution of `self`.
Returns,
.. math::
\mathbb{V}[X] = \begin{cases}
\frac{\nu}{\nu - 2} & \text{if} \nu > 2, \\
\infty & \text{if} \nu \le 2, \\
\begin{cases}
Where :math:`\nu` is the degrees of freedom of the t-distribution.
Returns
-------
pd.DataFrame with same rows, columns as `self`
variance of distribution (entry-wise)
"""
df_arr = self._df.copy()
df_arr = df_arr.astype(np.float32)
df_arr[df_arr <= 2] = np.inf
mask = (df_arr > 2) & (df_arr != np.inf)
df_arr[mask] = df_arr[mask] / (df_arr[mask] - 2)
return pd.DataFrame(df_arr, index=self.index, columns=self.columns)

def pdf(self, x):
"""Probability density function."""
d = self.loc[x.index, x.columns]
pdf_arr = gamma((d._df + 1) / 2)
pdf_arr = pdf_arr / (np.sqrt(np.pi * d._df) * gamma(d._df / 2))
pdf_arr = pdf_arr * (1 + x**2 / d._df) ** (-(d._df + 1) / 2)
return pd.DataFrame(pdf_arr, index=x.index, columns=x.columns)

def log_pdf(self, x):
"""Logarithmic probability density function."""
d = self.loc[x.index, x.columns]
lpdf_arr = loggamma((d._df + 1) / 2)
lpdf_arr = lpdf_arr - 0.5 * np.log(d._df * np.pi)
lpdf_arr = lpdf_arr - loggamma(d._df / 2)
lpdf_arr = lpdf_arr - ((d._df + 1) / 2) * np.log(1 + x**2 / d._df)
return pd.DataFrame(lpdf_arr, index=x.index, columns=x.columns)

def cdf(self, x):
"""Cumulative distribution function."""
d = self.loc[x.index, x.columns]
cdf_arr = x * gamma((d._df + 1) / 2)
cdf_arr = cdf_arr * hyp2f1(0.5, (d._df + 1) / 2, 3 / 2, -(x**2) / d._df)
cdf_arr = 0.5 + cdf_arr / (np.sqrt(np.pi * d._df) * gamma(d._df / 2))
return pd.DataFrame(cdf_arr, index=x.index, columns=x.columns)

def ppf(self, p):
"""Quantile function = percent point function = inverse cdf."""
d = self.loc[p.index, p.columns]
ppf_arr = p.to_numpy(copy=True)
ppf_arr[p.values == 0.5] = 0.0
ppf_arr[p.values <= 0] = -np.inf
ppf_arr[p.values >= 1] = np.inf

mask1 = (p.values < 0.5) & (p.values > 0)
mask2 = (p.values < 1) & (p.values > 0.5)
ppf_arr[mask1] = 1 / betaincinv(0.5 * d._df[mask1], 0.5, 2 * ppf_arr[mask1])
ppf_arr[mask2] = 1 / betaincinv(
0.5 * d._df[mask2], 0.5, 2 * (1 - ppf_arr[mask2])
)
ppf_arr[mask1 | mask2] = np.sqrt(ppf_arr[mask1 | mask2] - 1)
ppf_arr[mask1 | mask2] = np.sqrt(d._df[mask1 | mask2]) * ppf_arr[mask1 | mask2]
ppf_arr[mask1] = -ppf_arr[mask1]
return pd.DataFrame(ppf_arr, index=p.index, columns=p.columns)

@classmethod
def get_test_params(cls, parameter_set="default"):
"""Return testing parameter settings for the estimator."""
params1 = {"mu": [[0, 1], [2, 3], [4, 5]], "sigma": 1}
params2 = {
"mu": 0,
"sigma": 1,
"index": pd.Index([1, 2, 5]),
"columns": pd.Index(["a", "b"]),
}
return [params1, params2]

0 comments on commit 9f20201

Please sign in to comment.