{ "nbformat": 4, "nbformat_minor": 0, "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.6" }, "colab": { "name": "Gaussian_Mixture_Model.ipynb", "provenance": [] } }, "cells": [ { "cell_type": "markdown", "metadata": { "id": "nTKnq86xzpLl" }, "source": [ "# Gaussian Mixture Model and the EM algorithm\n", "\n", "Code implementing the process of clustering via the Gaussian mixture model, using the expectation maximization (EM) algorithm (see chapter 10 of [Bishop's PRML](https://www.microsoft.com/en-us/research/people/cmbishop/prml-book/)). " ] }, { "cell_type": "code", "metadata": { "collapsed": true, "id": "kMhxOfOrzpLv" }, "source": [ "import numpy as np\n", "from scipy.optimize import minimize\n", "from matplotlib import pyplot as plt\n", "\n", "\n", "# Defining epsilon for limits\n", "eps = 1e-6\n", " \n", "# Configuring matplotlib\n", "import matplotlib.pyplot as plt\n", "plt.rcParams[\"figure.figsize\"] = (8,8)\n", "plt.rcParams['axes.labelsize'] = 14\n", "plt.rcParams['xtick.labelsize'] = 12\n", "plt.rcParams['ytick.labelsize'] = 12\n", "plt.rcParams['lines.linewidth'] = 3.0\n", "plt.style.use('dark_background')\n", "plt.rcParams[\"image.cmap\"] = 'spring'\n", "\n", "# Fix np.random seed for replicability\n", "np.random.seed(0)" ], "execution_count": null, "outputs": [] }, { "cell_type": "markdown", "metadata": { "id": "K3MOPa3SzpLw" }, "source": [ "# Setting and Model\n", "\n", "## Input\n", "* $N \\in \\mathbb{N}$: number of data points, and\n", "* $d \\in \\mathbb{N}$: dimension of each data point\n", "* Input data: $x_0, x_1, \\ldots , x_{N-1} \\in \\mathbb{R}^d$. \n", "* Input data represented as matrix $X$, where $X_{n,i}$ is $i$-th component of $x_n$.\n", "\n", "## Gaussian Mixture Model\n", "\n", "The pdf of $N$ data points drawn from a Gaussian mixture model with $K \\in \\mathbb{N}$ components is given by\n", "\n", "\\begin{align}\n", " p\\left(X,Z \\middle| \\mu, \\Sigma, \\pi \\right) = \n", " \\prod_{n=0}^{N-1} \\prod_{k=0}^{K-1} \n", " \\left[\\pi_k \\mathcal{N}\\left(x_n \\middle| \\mu_k , \\Sigma_k \\right) \\right]^{z_{n,k}}\n", "\\end{align}\n", "\n", "where \n", "* $z_{n,k}\\in \\{0,1 \\}, \\sum_{k=0}^{K-1}z_{n,k}=1$ denotes the latent cluster vector for data point $n$ \n", "* $\\pi_k \\geq 0, \\sum_{k=0}^{K-1} \\pi_k = 1$ are the cluster probabilities\n", "* $\\mathcal{N}\\left(x \\middle| \\mu_k , \\Sigma_k \\right)$ is a gaussian pdf with mean $\\mu_k \\in \\mathbb{R}^d$ and covariance matrix $\\Sigma_k$\n", "\n", "Marginalizing over $Z$, we get\n", "\n", "\n", "\\begin{align}\n", " p \\left( X \\middle| \\mu, \\Sigma, \\pi \\right) =\n", " \\prod_{n=0}^{N-1} \\left[ \\sum_{k=0}^{K-1} \\pi_k \\mathcal{N}\\left(x_n \\middle| \\mu_k, \\Sigma_k \\right) \\right]\n", "\\end{align}\n", "\n" ] }, { "cell_type": "code", "metadata": { "collapsed": true, "id": "MGzEyvvzzpL6" }, "source": [ "class GaussianMixtureModel:\n", " def __init__(self, K):\n", " self.K = K\n", " self.Pi = None\n", " self.Mu = None\n", " self.Sigma = None\n", " \n", " def _init_params(self, X, random_state=None):\n", " '''\n", " Method for initializing model parameterse based on the size and variance of the input data array. \n", " \n", " Parameters\n", " ----------\n", " X : 2D numpy array\n", " 2-D numpy array representing input data, where X[n, i] represents the i-th element of n-th point in X.\n", " '''\n", " n_samples, n_features = np.shape(X)\n", " rnd = np.random.RandomState(seed=random_state)\n", " \n", " self.Pi = np.ones(self.K)/self.K\n", " self.Mu = X[rnd.choice(n_samples, size=self.K, replace=False)]\n", " self.Sigma = np.tile(np.diag(np.var(X, axis=0)), (self.K, 1, 1))\n", "\n", " \n", " def _calc_nmat(self, X):\n", " '''\n", " Method for calculating array corresponding $\\mathcal{N}(x_n | \\mu_k)$\n", " \n", " Parameters\n", " ----------\n", " X : 2D numpy array\n", " 2-D numpy array representing input data, where X[n, i] represents the i-th element of n-th point in X.\n", " \n", " Returns\n", " ----------\n", " Nmat : 2D numpy array\n", " 2-D numpy array representing probability density for each sample and each component, \n", " where Nmat[n, k] = $\\mathcal{N}(x_n | \\mu_k)$.\n", " \n", " '''\n", " n_samples, n_features = np.shape(X)\n", " \n", " Diff = np.reshape(X, (n_samples, 1, n_features) ) - np.reshape(self.Mu, (1, self.K, n_features) )\n", " L = np.linalg.inv(self.Sigma)\n", " exponent = np.einsum(\"nkj,nkj->nk\", np.einsum(\"nki,kij->nkj\", Diff, L), Diff)\n", " Nmat = np.exp(-0.5*exponent)/np.sqrt(np.linalg.det(self.Sigma)) / (2*np.pi)**(n_features/2)\n", " return Nmat\n", " \n", " def _Estep(self, X):\n", " '''\n", " Method for calculating the array corresponding to responsibility.\n", " \n", " Parameters\n", " ----------\n", " X : 2D numpy array\n", " 2-D numpy array representing input data, where X[n, i] represents the i-th element of n-th point in X.\n", " \n", " Returns\n", " ----------\n", " Gam : 2D numpy array\n", " 2-D numpy array representing responsibility of each component for each sample in X, \n", " where Gamt[n, k] = $\\gamma_{n, k}$.\n", " \n", " '''\n", " n_samples, n_features = np.shape(X)\n", " Nmat = self._calc_nmat(X)\n", " tmp = Nmat * self.Pi\n", " Gam = tmp/np.reshape(np.sum(tmp, axis=1), (n_samples, 1) )\n", " return Gam\n", " \n", " def _Mstep(self, X, Gam):\n", " '''\n", " Method for calculating the model parameters based on the responsibility gamma.\n", " \n", " Parameters\n", " ----------\n", " X : 2D numpy array\n", " 2-D numpy array representing input data, where X[n, i] represents the i-th element of n-th point in X.\n", " Gam : 2D numpy array\n", " 2-D numpy array representing responsibility of each component for each sample in X, \n", " where Gamt[n, k] = $\\gamma_{n, k}$.\n", " '''\n", " n_samples, n_features = np.shape(X)\n", " Diff = np.reshape(X, (n_samples, 1, n_features) ) - np.reshape(self.Mu, (1, self.K, n_features) )\n", " Nk = np.sum(Gam, axis=0)\n", " self.Pi = Nk/n_samples\n", " self.Mu = Gam.T @ X / np.reshape(Nk, (self.K, 1))\n", " self.Sigma = np.einsum(\"nki,nkj->kij\", np.einsum(\"nk,nki->nki\", Gam, Diff), Diff) / np.reshape(Nk, (self.K, 1, 1))\n", " \n", " def calc_prob_density(self, X):\n", " '''\n", " Method for calculating the probablity density $\\sum_k \\pi_k \\mathcal{N}(x_n | \\mu_k)$\n", " \n", " Parameters\n", " ----------\n", " X : 2D numpy array\n", " 2-D numpy array representing input data, where X[n, i] represents the i-th element of n-th point in X.\n", " \n", " Returns\n", " ----------\n", " prob_density : 2D numpy array\n", "\n", " '''\n", " prob_density = self._calc_nmat(X) @ self.Pi\n", " return prob_density\n", " \n", " \n", " def calc_log_likelihood(self, X):\n", " '''\n", " Method for calculating the log-likelihood for the input X and current model parameters.\n", " \n", " Parameters\n", " ----------\n", " X : 2D numpy array\n", " 2-D numpy array representing input data, where X[n, i] represents the i-th element of n-th point in X.\n", " Returns\n", " ----------\n", " loglikelihood : float\n", " The log-likelihood of the input data X with respect to current parameter set.\n", " \n", " '''\n", " log_likelihood = np.sum(np.log(self.calc_prob_density(X)))\n", " return log_likelihood\n", " \n", " \n", " def fit(self, X, max_iter, tol, disp_message, random_state=None):\n", " '''\n", " Method for performing learning. \n", " \n", " Parameters\n", " ----------\n", " X : 2D numpy array\n", " 2-D numpy array representing input data, where X[n, i] represents the i-th element of n-th point in X.\n", " max_iter : int\n", " Maximum number of iteration\n", " tol : float, positive\n", " Precision. If the change of parameter is below this value, the iteration is stopped\n", " disp_message : Boolean\n", " Whether or not to show the message about the number of iteration\n", " '''\n", " self._init_params(X, random_state=random_state)\n", " log_likelihood = - np.float(\"inf\")\n", " \n", " for i in range(max_iter):\n", " Gam = self._Estep(X)\n", " self._Mstep(X, Gam)\n", " log_likelihood_old = log_likelihood\n", " log_likelihood = self.calc_log_likelihood(X)\n", " \n", " if log_likelihood - log_likelihood_old < tol:\n", " break\n", " if disp_message:\n", " print(f\"n_iter : {i+1}\")\n", " print(f\"log_likelihood : {log_likelihood}\")\n", " \n", " def predict_proba(self, X):\n", " '''\n", " Method for calculating the array corresponding to responsibility. Just a different name for _Estep\n", " \n", " Parameters\n", " ----------\n", " X : 2D numpy array\n", " 2-D numpy array representing input data, where X[n, i] represents the i-th element of n-th point in X.\n", " \n", " Returns\n", " ----------\n", " Gam : 2D numpy array\n", " 2-D numpy array representing responsibility of each component for each sample in X, \n", " where Gamt[n, k] = $\\gamma_{n, k}$.\n", " \n", " '''\n", " Gam = self._Estep(X)\n", " return Gam\n", " \n", " def predict(self, X):\n", " '''\n", " Method for make prediction about which cluster input points are assigned to.\n", " \n", " Parameters\n", " ----------\n", " X : 2D numpy array\n", " 2-D numpy array representing input data, where X[n, i] represents the i-th element of n-th point in X.\n", " \n", " Returns\n", " ----------\n", " pred : 1D numpy array\n", " 1D numpy array, with dtype=int, representing which class input points are assigned to.\n", " '''\n", " pred = np.argmax(self.predict_proba(X), axis=1)\n", " return pred\n", " " ], "execution_count": null, "outputs": [] }, { "cell_type": "code", "metadata": { "collapsed": true, "id": "8Txq0agtzpMB" }, "source": [ "def get_meshgrid(x, y, nx, ny, margin=0.1):\n", " x_min, x_max = (1 + margin) * x.min() - margin * x.max(), (1 + margin) * x.max() - margin * x.min()\n", " y_min, y_max = (1 + margin) * y.min() - margin * y.max(), (1 + margin) * y.max() - margin * y.min()\n", " xx, yy = np.meshgrid(np.linspace(x_min, x_max, nx),\n", " np.linspace(y_min, y_max, ny))\n", " return xx, yy" ], "execution_count": null, "outputs": [] }, { "cell_type": "code", "metadata": { "scrolled": false, "colab": { "base_uri": "https://localhost:8080/", "height": 486 }, "id": "620KSOcQzpMB", "outputId": "bf4a3578-f084-400b-fdd7-e778c137d48a" }, "source": [ "from sklearn.datasets import make_blobs\n", "\n", "X, y = make_blobs(centers = [[0,0],[3,1],[1,3]], n_features=2, n_samples=100,cluster_std=[0.5, 0.5, 0.5], random_state=1)\n", "plt.scatter(X[:,0], X[:,1], c=y)\n", "\n", "xx, yy = get_meshgrid(X[:, 0], X[:, 1], nx=100, ny=100, margin=0.1)" ], "execution_count": null, "outputs": [ { "output_type": "display_data", "data": { "image/png": "\n", "text/plain": [ "