Source code for ugtm.ugtm_core

"""Core linear algebra operations for GTM and kGTM
"""
# Authors: Helena A. Gaspar <hagax8@gmail.com>
# License: MIT

from __future__ import print_function
import numpy as np
from scipy.spatial import distance
from sklearn.metrics.pairwise import euclidean_distances
from sklearn.preprocessing import scale


[docs] def createYMatrixInit(data, matW, matPhiMPlusOne): r"""Creates initial manifold matrix (Y). Parameters ========== data : array of shape (n_individuals, n_dimensions) Data matrix. matW : array of shape (n_dimensions, n_rbf_centers+1) Parameter matrix (PCA-initialized). matPhiMPlusOne : array of shape (n_nodes, n_rbf_centers+1) RBF matrix plus one dimension to include a term for bias. Returns ======= array of shape (n_dimensions, n_nodes) Manifold in n-dimensional space (projection of matX in data space); A point matY[:,i] is a center of Gaussian component in data space. :math:`\mathbf{Y}=\mathbf{W}\mathbf{\Phi}^T` """ shap1 = matW.shape[0] shap2 = matPhiMPlusOne.shape[0] TheMeans = data.mean(0) DMmeanMatrix = np.zeros([shap1, shap2]) for i in range(shap1): for j in range(shap2): DMmeanMatrix[i, j] = TheMeans[i] MatY = np.dot(matW, np.transpose(matPhiMPlusOne)) MatY = MatY + DMmeanMatrix return(MatY)
[docs] def createPhiMatrix(matX, matM, numX, numM, sigma): r"""Creates matrix of RBF functions. Parameters ========== matX : array of shape (n_nodes, 2) Coordinates of nodes defining a grid in the 2D space. matM : array of shape (n_rbf_centers, 2) Coordinates of radial basis function (RBF) centers, defining a grid in the 2D space. numX : int Number of nodes (n_nodes). numM : int Number of RBF centers (n_rbf_centers) sigma : float RBF width factor. Returns ======= array of shape (n_nodes, n_rbf_centers+1) RBF matrix plus one dimension to include a term for bias. """ Result = np.zeros([numX, numM + 1]) for i in range(numX): for j in range(numM): Coo1 = (matX[i][0] - matM[j][0]) * (matX[i][0] - matM[j][0]) Coo2 = (matX[i][1] - matM[j][1]) * (matX[i][1] - matM[j][1]) Distance = Coo1 + Coo2 Result[i, j] = np.exp(-(Distance) / (2 * sigma)) for i in range(numX): Result[i][numM] = 1 return(Result)
[docs] def computeWidth(matM, numM, sigma): r"""Initializes radial basis function width using hyperparameter sigma. Parameters ========== matM : array of shape (n_rbf_centers, 2) Coordinates of radial basis function (RBF) centers, defining a grid in the 2D space. numM : int Number of RBF centers (n_rbf_centers) sigma : float RBF width factor. Returns ======= float Initial radial basis function (RBF) width. """ Result = 0.0 if numM <= 1: return(sigma) else: Distances = np.zeros([numM, numM]) mins = np.zeros([numM, 1]) maxs = np.zeros([numM, 1]) for i in range(numM): for j in range(numM): Coo1 = (matM[i][0] - matM[j][0]) * (matM[i][0] - matM[j][0]) Coo2 = (matM[i][1] - matM[j][1]) * (matM[i][1] - matM[j][1]) Distances[i, j] = Coo1 + Coo2 for i in range(numM): mins[i] = np.min(Distances[i][np.nonzero(Distances[i])]) for i in range(numM): maxs[i] = np.max(Distances[i][np.nonzero(Distances[i])]) if (sigma > 0.0): Result = sigma * np.mean(mins) else: Result = np.max(maxs) return(Result)
[docs] def createWMatrix(matX, matPhiMPlusOne, matU, n_dimensions, n_rbf_centers): r"""Creates PCA-initialized parameter matrix W. Parameters ========== matX : array of shape (n_nodes, 2) Coordinates of nodes defining a grid in the 2D space. matPhiMPlusOne: array of shape (n_nodes, n_rbf_centers+1) RBF matrix plus one dimension to include a term for bias. matU : array of shape (n_dimensions, 2) 2 first principal axes of data covariance matrix. n_dimensions: int Data space dimensionality (number of variables). n_rbf_centers : int Number of RBF centers. sigma : float RBF width factor. Returns ======= array of shape (n_dimensions, n_rbf_centers+1) Parameter matrix W (PCA-initialized). """ NormX = scale(matX, axis=0, with_mean=True, with_std=True) myProd = np.dot(matU, np.transpose(NormX)) tinv = np.linalg.solve(matPhiMPlusOne.T.dot( matPhiMPlusOne), matPhiMPlusOne.T) Result = np.dot(myProd, np.transpose(tinv)) return(Result)
[docs] def createYMatrix(matW, matPhiMPlusOne): r"""Updates manifold matrix (Y) using new parameter matrix (W). Parameters ========== matW : array of shape (n_dimensions, n_rbf_centers+1) Parameter matrix (PCA-initialized). matPhiMPlusOne : array of shape (n_nodes, n_rbf_centers+1) RBF matrix plus one dimension to include a term for bias. Returns ======= array of shape (n_dimensions, n_nodes) Manifold in n-dimensional space (projection of matX in data space); A point matY[:,i] is a center of Gaussian component in data space. :math:`\mathbf{Y}=\mathbf{W}\mathbf{\Phi}^T` """ Result = np.dot(matW, np.transpose(matPhiMPlusOne)) return(Result)
[docs] def createDistanceMatrix(matY, data): r"""Computes distances between manifold centers and data vectors. Parameters ========== matY : array of shape (n_dimensions, n_nodes) Manifold in n-dimensional space (projection of matX in data space); data : array of shape (n_individuals, n_dimensions) Data matrix. Returns ======= array of shape (n_nodes, n_individuals) Matrix of squared Euclidean distances between manifold and data. """ Result = distance.cdist(matY.T, data, metric='sqeuclidean') return(Result)
[docs] def KERNELcreateDistanceMatrix(data, matL, matPhiMPlusOne): r"""Computes distances between data and manifold for kernel algorithm. Parameters ========== data : array of shape (n_individuals, n_dimensions) Data matrix. matL : array of shape (n_individuals, n_rbf_centers+1) Parameter matrix (regul). matPhiMPlusOne: array of shape (n_nodes, n_rbf_centers+1) RBF matrix plus one dimension to include a term for bias. Returns ======= array of shape (n_nodes, n_individuals) Matrix of distances between manifold and data. """ n_nodes = matPhiMPlusOne.shape[0] n_individuals = data.shape[0] Result = np.zeros([n_nodes, n_individuals]) thefloat = 0.0 for i in range(n_nodes): LPhim = np.dot(matL, matPhiMPlusOne[i]) thefloat = np.dot(np.dot(LPhim, data), LPhim) for j in range(n_individuals): Result[i, j] = data[j, j] + thefloat - 2*(np.dot(data[j], LPhim)) return(Result)
[docs] def exp_normalize(x): r"""Exp-normalize trick: compute exp(x-max(x)) Parameters ========== 2D array An array x Returns ======= 2D array y = exp(x-max(x)) """ y = np.array([], dtype=np.longdouble) y = x - np.expand_dims(np.max(x, axis=0), 0) y = np.exp(y) return(y)
[docs] def createPMatrix(matD, betaInv, n_dimensions): r"""Computes data distribution matrix = exp(-(parameter)*distances). Parameters ========== matD : array of shape (n_nodes, n_individuals) Matrix of squared Euclidean distances between manifold and data. betaInv : float Noise variance parameter for the data distribution. Written as :math:`\beta^{-1}` in the original paper. n_dimensions : int Data space dimensionality (number of variables). Returns ======= array of shape (n_nodes, n_individuals) Data distribution with variance betaInv (transformed: exp(x-max(x))) Notes ===== Important: this data distribution is not exact per se and is to be used as input for createRMatrix (responsibilities). """ matP = np.array([], dtype=np.longdouble) beta = 1/betaInv # exp_normalize computes exp(x-max(x)) to avoid overflow for createRMatrix matP = exp_normalize(-(beta/2)*matD) return(matP)
[docs] def createRMatrix(matP): r"""Computes responsibilities (posterior probabilities). Parameters ========== matP : array of shape (n_nodes, n_individuals) Data distribution with variance betaInv (transformed: exp(x-max(x))) Returns ======= array of shape (n_nodes, n_individuals) Posterior probabilities (responsibilities). """ matR = np.array([], dtype=np.longdouble) sums = np.sum(matP, axis=0) matR = (matP) / (sums[None, :]) return(matR)
[docs] def createGMatrix(matR): r"""Creates the G diagonal matrix from responsibilities (R) Parameters ========== matR : array of shape (n_nodes, n_individuals) Posterior probabilities (responsibilities). Returns ======= array of shape (n_nodes, n_nodes) Diagonal matrix with elements :math:`G_{ii}=\sum_{n}^{n\_individuals} R_{in}`. """ sums = np.sum(matR, axis=1) matG = np.diag(sums) return(matG)
[docs] def optimWMatrix(matR, matPhiMPlusOne, matG, data, betaInv, regul): r"""Updates parameter matrix W. Parameters ========== matR : array of shape (n_nodes, n_individuals) Posterior probabilities (responsibilities). matPhiMPlusOne: array of shape (n_nodes, n_rbf_centers+1) RBF matrix plus one dimension to include a term for bias. matG : array of shape (n_nodes, n_nodes) Diagonal matrix with elements :math:`G_{ii}=\sum_{n}^{n\_individuals} R_{in}`. data : array of shape (n_individuals, n_dimensions) Data matrix. betaInv : float Noise variance parameter for the data distribution. Written as :math:`\beta^{-1}` in the original paper. regul : float Regularization coefficient. Returns ======= array of shape (n_dimensions, n_rbf_centers+1) Updated parameter matrix W. """ n_rbf_centersP = matPhiMPlusOne.shape[1] LBmat = np.zeros([n_rbf_centersP, n_rbf_centersP]) PhiGPhi = np.dot( np.dot(np.transpose(matPhiMPlusOne), matG), matPhiMPlusOne) for i in range(n_rbf_centersP): LBmat[i][i] = regul * betaInv PhiGPhiLB = PhiGPhi + LBmat Ginv = np.linalg.inv(PhiGPhiLB) matW = np.transpose( np.dot(np.dot(np.dot(Ginv, np.transpose(matPhiMPlusOne)), matR), data)) return(matW)
[docs] def optimWMatrixAcc(g_vec, RT_acc, matPhiMPlusOne, betaInv, regul): r"""Updates W from block-accumulated sufficient statistics (iGTM M-step). Parameters ========== g_vec : array of shape (n_nodes,) Accumulated row sums of R across all blocks: :math:`\sum_b \mathbf{R}_b \mathbf{1}`. RT_acc : array of shape (n_nodes, n_dimensions) Accumulated R @ X across all blocks: :math:`\sum_b \mathbf{R}_b \mathbf{X}_b`. matPhiMPlusOne : array of shape (n_nodes, n_rbf_centers+1) RBF matrix plus bias column. betaInv : float Noise variance parameter. regul : float Regularization coefficient. Returns ======= array of shape (n_dimensions, n_rbf_centers+1) Updated parameter matrix W. Notes ===== Equivalent to :func:`~ugtm.ugtm_core.optimWMatrix` but avoids forming the full N×N responsibility matrix by using pre-accumulated statistics. :math:`\mathbf{\Phi}^T \mathbf{G} \mathbf{\Phi}` is computed as :math:`(\mathbf{\Phi} \odot \mathbf{g})^T \mathbf{\Phi}` to avoid an explicit n_nodes×n_nodes diagonal matrix. """ n_rbf_centersP = matPhiMPlusOne.shape[1] PhiGPhi = (matPhiMPlusOne * g_vec[:, None]).T @ matPhiMPlusOne LBmat = np.eye(n_rbf_centersP) * (regul * betaInv) Ginv = np.linalg.inv(PhiGPhi + LBmat) matW = (Ginv @ matPhiMPlusOne.T @ RT_acc).T return matW
[docs] def optimLMatrix(matR, matPhiMPlusOne, matG, betaInv, regul): r"""Updates parameter matrix regul for kernel GTM. Parameters ========== matR : array of shape (n_nodes, n_individuals) Posterior probabilities (responsibilities). matPhiMPlusOne: array of shape (n_nodes, n_rbf_centers+1) RBF matrix plus one dimension to include a term for bias. matG : array of shape (n_nodes, n_nodes) Diagonal matrix with elements :math:`G_{ii}=\sum_{n}^{n\_individuals} R_{in}`. betaInv : float Noise variance parameter for the data distribution. Written as :math:`\beta^{-1}` in the original paper. regul : float Regularization coefficient. Returns ======= array of shape (n_individuals, n_rbf_centers+1) Updated parameter matrix regul. """ n_rbf_centersP = matPhiMPlusOne.shape[1] LBmat = np.zeros([n_rbf_centersP, n_rbf_centersP]) PhiGPhi = np.dot( np.dot(np.transpose(matPhiMPlusOne), matG), matPhiMPlusOne) for i in range(n_rbf_centersP): LBmat[i][i] = regul * betaInv PhiGPhiLB = PhiGPhi + LBmat Ginv = np.linalg.inv(PhiGPhiLB) matW = np.transpose( np.dot(np.dot(Ginv, np.transpose(matPhiMPlusOne)), matR)) return(matW)
[docs] def optimBetaInv(matR, matD, n_dimensions): r"""Updates noise variance parameter. Parameters ========== matR : array of shape (n_nodes, n_individuals) Posterior probabilities (responsibilities). matD : array of shape (n_nodes, n_individuals) Matrix of squared Euclidean distances between manifold and data. Returns ======= float Updated noise variance parameter (:math:`\beta^{-1}`). """ n_individuals = matR.shape[1] betaInv = np.sum(np.multiply(matR, matD))/(n_individuals*n_dimensions) return(betaInv)
[docs] def meanPoint(matR, matX): r"""Computes mean positions for data points (usual GTM output). Parameters ========== matR : array of shape (n_nodes, n_individuals) Posterior probabilities (responsibilities). matX : array of shape (n_nodes, 2) Coordinates of nodes defining a grid in the 2D space. Returns ======= array of shape (n_individuals, 2) Data representation in 2D space: mean positions (usual GTM output). """ matMeans = np.dot(np.transpose(matR), matX) return(matMeans)
[docs] def modePoint(matR, matX): r"""Computes modes (nodes with maximum responsibility for each data point). Parameters ========== matR : array of shape (n_nodes, n_individuals) Posterior probabilities (responsibilities). matX : array of shape (n_nodes, 2) Coordinates of nodes defining a grid in the 2D space. Returns ======= array of shape (n_individuals, 2) Data representation in 2D space: modes (nodes with max responsibility). """ matModes = matX[np.argmax(matR, axis=0), :] return(matModes)
[docs] def computelogLikelihood(matP, betaInv, n_dimensions): r"""Computes log likelihood = GTM objective function Parameters ========== matP : array of shape (n_nodes, n_individuals) Data distribution with variance betaInv (transformed: exp(x-max(x))) betaInv : float Noise variance parameter for the data distribution. Written as :math:`\beta^{-1}` in the original paper. n_dimensions: int Data space dimensionality (number of variables). Returns ======= float Log likelihood. """ LogLikelihood = np.longdouble() n_nodes = matP.shape[0] n_individuals = matP.shape[1] LogLikelihood = 0.0 prior = 1.0/n_nodes placeholder = 50 # this constant was not introduced in matP calculation # and is re-introduced here constante = np.longdouble(np.power(((1/betaInv)/(2*np.pi)), np.minimum( n_dimensions/2, placeholder))) LogLikelihood = np.sum(np.log( np.maximum(np.sum(constante*matP, axis=0)*prior, np.finfo(np.longdouble).tiny))) LogLikelihood /= n_individuals return(-LogLikelihood)
[docs] def evalBetaInv(matY, betaInv, random_state=1234): r"""Decides which value to use for initial noise variance parameter. Parameters ========== matY : array of shape (n_dimensions, n_nodes) Manifold in n-dimensional space (projection of matX in data space); betaInv : float The 3rd eigenvalue of the data covariance matrix. random_state : int, optional Random state used to initialize BetaInv randomly in case of bad initialization. Returns ======= float Noise variance parameter for the data distribution (betaInv). Written as :math:`\beta^{-1}` in the original paper. Initialized to be the larger between: (1) the 3rd eigenvalue of the data covariance matrix (function parameter), (2) half the average distance between centers of Gaussian components. In case of bad initialization (betaInv = 0), betaInv is set to a random value (a message would then be displayed on screen). """ r = np.random.RandomState(random_state) Distances = euclidean_distances(np.transpose(matY)) myMin = np.mean(Distances)/2 myMin *= myMin if((myMin < betaInv) or (betaInv == 0)): betaInv = myMin if betaInv == 0.0: print("bad initialization (0 variance), " "setting variance to random number...") betaInv = abs(r.uniform(-1, 1, size=1)) return(betaInv)
[docs] def initBetaInvRandom(matD, n_nodes, n_individuals, n_dimensions): r"""Computes initial noise variance parameter for kernel GTM. Parameters ========== matD : array of shape (n_nodes, n_individuals) Matrix of squared Euclidean distances between manifold and data. n_nodes : int The number of nodes defining a grid in the 2D space. n_individuals : int The number of data instances. n_dimensions : int Data space dimensionality (number of variables). Returns ======= float Noise variance parameter for the data distribution (betaInv). Written as :math:`\beta^{-1}` in the original paper. """ betaInv = np.sum(matD*1/n_nodes)/(n_individuals*n_dimensions) return(betaInv)