之前写过一篇博客讲述极大似然方法, 这一方法通常适用于知道观测数据
,求解模型参数
的场合,即
。
但是,在更多场合除了模型参数是未知的外,还有隐变量
也是未知的,即
。多个隐藏模型的混合,会使得普通的极大似然方法用起来不是那么方便,比如求解高斯混合模型(GMM), 隐马尔可夫模型(HMM),等。这种时候,就会发觉EM算法是机器学习领域中绕不过去的一道坎。
这里,主要基于《统计学习方法》里对于高斯混合模型的讲解,实现EM算法,并给出代码和注释。
在实现模型之前,观察过其他人写的代码,包括sklearn内部的代码,相比较而言,这次的实现封装的更好,使用会更方便,结果也不错。
先描述一下实现的原理,再说代码
一、EM算法解高斯混合模型
假设观测数据
由3高斯混合模型生成,即
其中
, 我们用EM算法估计高斯混合模型的参数
1.0 EM算法:初始化模型参数
1.1 EM算法的E步:确定Q函数
混合高斯模型的Q函数为:
这里需要计算
,记为
是当前模型参数下第
个观测数据来自第
个分模型的概率,称为分模型
对观测数据
的影响度
1.2EM算法的M步:更新模型参数
M步极大化Q函数,更新模型参数如下:
重复E步和M步直到模型收敛
二、EM算法求解高斯混合模型
from sklearn.datasets import load_digits, load_iris
from scipy.stats import multivariate_normal
from sklearn import preprocessing
from sklearn.metrics import accuracy_score
import numpy as np
class GMM_EM():
def __init__(self,n_components,max_iter = 1000,error = 1e-6):
self.n_components = n_components #混合模型由几个gauss模型组成
self.max_iter = max_iter #最大迭代次数
self.error = error #收敛误差
self.samples = 0
self.features = 0
self.alpha = [] #存储模型权重
self.mu = [] #存储均值
self.sigma = [] #存储标准差
def _init(self,data): #初始化参数
np.random.seed(7)
self.mu = np.array(np.random.rand(self.n_components, self.features))
self.sigma = np.array([np.eye(self.features)/self.features] * self.n_components)
self.alpha = np.array([1.0 / self.n_components] * self.n_components)
print(self.alpha.shape,self.mu.shape,self.sigma.shape)
def gauss(self, Y, mu, sigma): #根据模型参数和数据输出概率向量
return multivariate_normal(mean=mu,cov=sigma+1e-7*np.eye(self.features)).pdf(Y)
def preprocess(self,data): # 数据预处理
self.samples = data.shape[0]
self.features = data.shape[1]
pre = preprocessing.MinMaxScaler()
return pre.fit_transform(data)
def fit_predict(self,data): #拟合数据
data = self.preprocess(data)
self._init(data)
weighted_probs = np.zeros((self.samples,self.n_components))
for i in range(self.max_iter):
prev_weighted_probs = weighted_probs
weighted_probs = self._e_step(data)
change = np.linalg.norm(weighted_probs - prev_weighted_probs)
if change < self.error:
break
self._m_step(data,weighted_probs)
return weighted_probs.argmax(axis = 1)
def _e_step(self,data): # E步
probs = np.zeros((self.samples,self.n_components))
for i in range(self.n_components):
probs[:,i] = self.gauss(data, self.mu[i,:], self.sigma[i,:,:])
weighted_probs = np.zeros(probs.shape)
for i in range(self.n_components):
weighted_probs[:,i] = self.alpha[i]*probs[:,i]
for i in range(self.samples):
weighted_probs[i,:] /= np.sum(weighted_probs[i,:])
return weighted_probs
def _m_step(self,data,weighted_probs): #M步
for i in range(self.n_components):
sum_probs_i = np.sum(weighted_probs[:,i])
self.mu[i,:] = np.sum(np.multiply(data, np.mat(weighted_probs[:, i]).T), axis=0) / sum_probs_i
self.sigma[i,:,:] = (data - self.mu[i,:]).T * np.multiply((data - self.mu[i,:]), np.mat(weighted_probs[:, i]).T) / sum_probs_i
self.alpha[i] = sum_probs_i/data.shape[0]
def predict_prob(self,data): #预测概率矩阵
return self._e_step(data)
def predict(self,data): #输出类别
return self._e_step(data).argmax(axis = 1)
dataset = load_iris()
data = dataset['data']
label = dataset['target']
gmm = GMM_EM(3)
pre_label = gmm.fit_predict(data)
print(accuracy_score(pre_label,label))
# EM算法是对初始值敏感的,修改初始化的值会发现模型性能变化很大