Source code for pyphase.phase

# -*- coding: utf-8 -*-
r"""Module pyphase.phase.

This module allows you to work with Phase-Type distributions.

The CDF for PH variable is given by

.. math:: F(t) = 1 - \boldsymbol{\alpha} e^{\boldsymbol{A} t} \boldsymbol{1} \qquad x \geq 0,

where :math:`\boldsymbol{\alpha}` is a probability vector and the matrix :math:`\boldsymbol{A}` has
the transition rates between the phases.

To build a PH variable you need the vectoe ``alpha`` and matrix ``A``

.. doctest::

    >>> from pyphase import *
    >>> alpha = [0.1 , 0.9]
    >>> A = [[-2.0 , 1.0], [1.5, -3.0]]
    >>> v = phase(alpha, A)
    >>> print(v) #doctest: +REPORT_NDIFF
    PhaseType:
      alpha = [[0.1 0.9]]
      A     = [[-2.   1. ]
               [ 1.5 -3. ]]

After you have the variable you can do all operations that are typical with random variables in scipy.stats::

    >>> print(f"The mean is {v.mean():.3f}")
    The mean is 0.789
    >>> print(f"v.cdf(1.0)={v.cdf(1.0):.3f}")
    v.cdf(1.0)=0.721"""

import numpy as np
from numpy import matlib as ml
from scipy import linalg
from scipy.special import binom
from scipy.stats import rv_continuous
from scipy.stats._distn_infrastructure import rv_frozen


def _solve_vector_A(vector, A):
    # Computes vector * A^(1), without actually inverting the matrix, by solving x * A = vector
    return np.linalg.solve(A.T, vector.T).T


class _PhaseTypeGen(rv_continuous):
    def _set_pars(self, alpha, A):
        alpha = ml.mat(alpha)
        A = ml.mat(A)
        self.alpha = alpha
        self.A = A
        self.alphaAi = None  # alpha * A^(-1), computed lazily
        self.erd = None  # Equilibrium residual distribution

    def _cdf1(self, x):
        return 1.0 - np.sum(self.alpha @ linalg.expm(self.A.A * x), axis=1) if x > 0 else 0.0

    def _cdf(self, x):
        return np.vectorize(self._cdf1)(x)

    def _pdf1(self, x):
        res = self.alpha @ linalg.expm(self.A.A * x) @ self._get_a() if x >= 0 else 0.0
        return res

    def _pdf(self, x):
        return np.vectorize(self._pdf1)(x)

    def _get_a(self):
        return -np.sum(self.A, axis=1)

    def _get_alpha_Ai(self):
        # Computes vector * A^(1), without actually inverting the matrix, by solving x * A = vector
        if self.alphaAi is None:
            self.alphaAi = _solve_vector_A(self.alpha, self.A)
        return self.alphaAi

    def _get_erd(self):
        if self.erd is None:
            gamma = 1 / self.mean()
            self.erd = phase(-gamma * self._get_alpha_Ai(), self.A)
        return self.erd

    def _moments(self, k=None):
        if k is None:
            k = 2
        moms = list()
        left = self.alpha
        for i in range(1, k + 1):
            left = -i * _solve_vector_A(left, self.A)
            moms.append(np.sum(left.flatten()))
        return moms

    def _stats(self):
        # Mean(‘m’), variance(‘v’), skew(‘s’), and/or kurtosis(‘k’).
        moments = self._moments(4)
        moments = np.concatenate(([1.0], moments))
        mean = moments[1]
        centered_moments = {
            n: sum([binom(n, k) * moments[k] * (-mean) ** (n - k) for k in range(0, n + 1)]) for n in range(1, 5)
        }
        var = centered_moments[2]
        sd = np.sqrt(var)
        skew = centered_moments[3] / sd ** 3
        # kurtosis is fourth central moment / variance**2 - 3
        kurt = centered_moments[4] / (var * var) - 3.0
        return mean, var, skew, kurt


[docs]class PhaseType(rv_frozen): """ Represent a Phase-Type distribution. Users should not call it directly but rather call ``phase(alpha, A)`` """ def __init__(self, alpha, A, dist, *args, **kwds): super(PhaseType, self).__init__(dist, *args, **kwds) self.alpha = ml.mat(alpha) self.A = ml.mat(A) self.dist._set_pars(alpha, A) def _pre_append(self, stg, pre): lines = stg.splitlines() result = [lines[0]] for line in lines[1:]: result.append(pre + line) return "\n".join(result) def __str__(self): options = {"precision": 2} alpha_stg = np.array2string(self.alpha, **options) A_stg = np.array2string(self.A, **options) return "PhaseType:\n" + f" alpha = {alpha_stg}\n" + f" A = {self._pre_append(A_stg,' ')}" def __repr__(self): return f"PH({self.alpha.__repr__()}, {self.A.__repr__()})"
[docs] def params(self): r""" Returns the parameters of this PH distribution. Returns ------- tuple (:math:`\pmb\alpha, \pmb A, \pmb a=- \pmb A \pmb 1`, dimension) """ return self.alpha, self.A, self.dist._get_a(), self.A.shape[0]
[docs] def loss1(self, x): r"""First order loss function. For a random variable C teh first order loss function is defined as .. math:: L(x) = [E(X-x)^+] Parameters ---------- x: float The value at which the first order distribution is evaluated Returns ------- float Value of the loss function at x. """ return self.mean() * self.dist._get_erd().sf(x)
[docs] def equilibrium_pi(self): r""" Return the equilibrium distribution of the associated PH distribution. In other word, it finds the vector \pi that solves .. math :: \pmb \pi = \pmb\pi(\pmb A + \pmb \alpha \pmb a), \qquad \pmb\pi\pmb 1=1 Returns ------- array vector :math"`\pmb\pi` that solves the equilibrium equations. """ n = self.A.shape[0] a = self.dist._get_a() matrix = self.A + a @ self.alpha + np.ones((n, n)) return np.linalg.solve(matrix.T, np.ones(n).T).T
[docs]def phase(alpha, A): r""" Creates a new PH variable. This method builds a PhaseType object by given the array :math:`\pmb\alpha` and matrix :math:`\pmb A`. The CDF for PH variable is given .. math:: F(t) = 1 - \boldsymbol{\alpha} e^{\boldsymbol{A} t} \boldsymbol{1} \qquad x \geq 0 Parameters ---------- alpha: array of float The initial probabilities vector. It must satisfy :math:`\sum_i \alpha_i \leq 1`. A: matrix of float The transition matrix between phases. Returns ------- PH An object representing this distribution. """ generator = _PhaseTypeGen(name="phase") frozen = generator.__call__() return PhaseType(alpha, A, frozen.dist)
[docs]def ph_expon(lambd=None, mean=None): """ Builds an exponential distribution represented as PH. If mean is provided then lambda = 1/mean. Parameters ---------- lambd : float, optional Rate value. One between lambd an mean must be given. mean : float, optional Mean value. One between lambd an mean must be given. Returns ------- PH PH object. """ return ph_erlang(1, lambd, mean)
[docs]def ph_erlang(n, lambd=None, mean=None): """ Builds an Erlang(n,lmbda). If mean is provided then lambda = n/mean. Parameters ---------- n : int The order of this Erlang lambd : float, optional Provide only one between lambd or mean mean : float, optional Provide only one between lambd or mean Returns ------- PH PH representation for an Erlang """ if lambd is None and mean is None: raise ValueError('"You must provide one between mean and lambda') lambd = lambd if lambd is not None else n / mean A = -lambd * np.eye(n) for i in range(n - 1): A[i, i + 1] = lambd alpha = np.zeros(n) alpha[0] = 1 return phase(alpha, A)
def _z(n1, n2): # quiclky generates zeros return np.zeros((n1, n2))
[docs]def ph_sum(ph1, ph2): """ Produces a new PH that is the sum of the given PH variables. Parameters ---------- ph1: PH The first variable. ph2: PH The second variable. Returns ------- PH The resulting PH variable. """ alpha1, A1, a1, n1 = ph1.params() alpha2, A2, a2, n2 = ph2.params() alpha = np.block([alpha1, _z(1, n2)]) A = np.block([[A1, a1 * alpha2], [_z(n2, n1), A2]]) return phase(alpha, A)
[docs]def ph_mix(ph1, ph2, p1): """ Produces a PH variable thet is a mixture of the two given PH variables. Parameters ---------- ph1: PH The first variable. ph2: PH The second variable. p1: float Probability of choosing the first variable. 0 <= p1 <= 1 Returns ------- PH The resulting PH variable. """ alpha1, A1, a1, n1 = ph1.params() alpha2, A2, a2, n2 = ph2.params() alpha = np.block([p1 * alpha1, (1 - p1) * alpha2]) A = np.block([[A1, _z(n1, n2)], [_z(n2, n1), A2]]) return phase(alpha, A)