A hidden Markov model is a powerful tool for analyzing time-series data. It assumes that the system being modeled is a Markov process with hidden states: at any time the underlying system is in one of a set of possible states, and it goes through a sequence of state transitions, each producing an output. We can only observe the outputs, not the states themselves, because the states are hidden. Our goal is to model such data so that we can infer the state transitions behind unseen observations.
- S = {s1, s2, …, sN} is the set of all possible states (denoted Q in 《统计学习方法》),
- N is the number of possible states,
- V = {v1, v2, …, vM} is the set of all possible observations,
- M is the number of possible observations,
- I is a state sequence of length T, and O is the corresponding observation sequence,
- A is the state-transition probability matrix,
- B is the observation (emission) probability matrix,
- Pi is the initial state probability vector,
- Lambda = (A, B, Pi) denotes the full parameter set of the hidden Markov model.
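For concreteness, the test functions at the end of this post instantiate these parameters for a four-box, two-color ball example (observation 0 denotes red, 1 denotes white); as NumPy arrays they look roughly like this (a minimal sketch using the values from try_viterbi() below):

import numpy as np

# A[i, j]: probability of moving from state i to state j
A = np.array([[0, 1, 0, 0],
              [0.4, 0, 0.6, 0],
              [0, 0.4, 0, 0.6],
              [0, 0, 0.5, 0.5]], float)
# B[i, k]: probability of emitting observation k from state i (0 = red, 1 = white)
B = np.array([[0.5, 0.5], [0.3, 0.7], [0.6, 0.4], [0.8, 0.2]], float)
# Pi[i]: probability of starting in state i (uniform over the N = 4 states)
Pi = np.array([0.25, 0.25, 0.25, 0.25], float)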
Generally speaking, a hidden Markov model (HMM for short) involves three basic problems: the probability computation (evaluation) problem, the learning problem, and the prediction problem (also called the decoding problem). The first is handled by the forward and backward algorithms; the second and third are usually solved with the Baum-Welch algorithm and the Viterbi algorithm, respectively.
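For reference, these are the standard forward-algorithm formulas that the forward() method below implements, with $\alpha_t(i) = P(o_1, \dots, o_t, i_t = s_i \mid \lambda)$:

$$
\alpha_1(i) = \pi_i\, b_i(o_1), \qquad
\alpha_{t+1}(i) = \Big( \sum_{j=1}^{N} \alpha_t(j)\, a_{ji} \Big) b_i(o_{t+1}), \qquad
P(O \mid \lambda) = \sum_{i=1}^{N} \alpha_T(i),
$$

and the backward algorithm proceeds analogously, starting from $\beta_T(i) = 1$ and recursing backwards with $\beta_t(i) = \sum_{j=1}^{N} a_{ij}\, b_j(o_{t+1})\, \beta_{t+1}(j)$.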
Most of the time the forward and backward algorithms serve the Baum-Welch algorithm, while the Viterbi algorithm stands on its own, so we discuss Viterbi first.
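Concretely, the viterbi() method below implements the usual dynamic-programming recursion: $\delta_t(i)$ is the probability of the best partial state path ending in state $s_i$ at time $t$, and $\psi_t(i)$ records the back-pointer,

$$
\delta_1(i) = \pi_i\, b_i(o_1), \qquad
\delta_t(i) = \max_{1 \le j \le N} \big[ \delta_{t-1}(j)\, a_{ji} \big]\, b_i(o_t), \qquad
\psi_t(i) = \arg\max_{1 \le j \le N} \big[ \delta_{t-1}(j)\, a_{ji} \big],
$$

after which the best path is read off by taking $\arg\max_i \delta_T(i)$ and backtracking through $\psi$.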
"""
hmm1.py is two slow: delta_lambda = 15 ,x=8 more zhan 30k times
1. with scale
2. be modified
Pivot:
1. alpha,beta,gamma,xi are all for Baum_Welch
2. delta,psi are for viterbi
3. Baum_Welch for traning, viterbi for predict
Ideas for application:
1. predict
2. speech recognition
3. NLP
error:
1. the computing X must be array, need to np.array
2. and the cells must be float, need np.float, if initialization need decimal.
3. initialization is big proplem, every cell shouldn't be 0
"""
import numpy as np
from scipy.spatial.distance import cdist  # for computing the Chebyshev distance between matrices
class HMM:
def __init__(self, Ann, Bnm, Pi, O):
        self.A = np.array(Ann, float)   # state-transition matrix, N x N
        self.B = np.array(Bnm, float)   # observation (emission) matrix, N x M
        self.Pi = np.array(Pi, float)   # initial state distribution
        self.O = np.array(O, float)     # observation sequence (symbol indices, cast to int when indexing B)
self.N = self.A.shape[0]
self.M = self.B.shape[1]
def forward(self):
T = len(self.O)
        alpha = np.zeros((T, self.N), float)
for i in range(self.N):
alpha[0, i] = self.Pi[i] * self.B[i, int(self.O[0])]
for t in range(T - 1):
for i in range(self.N):
summation = 0 # for every i 'summation' should reset to '0'
for j in range(self.N):
summation += alpha[t, j] * self.A[j, i]
alpha[t + 1, i] = summation * self.B[i, int(self.O[t + 1])]
summation = 0.0
for i in range(self.N):
summation += alpha[T - 1, i]
Polambda = summation
return Polambda, alpha
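    # Note (standard practice): for long observation sequences the raw alpha values underflow
    # towards 0, so forward_with_scale() below normalizes alpha at every step by
    # c[t] = 1 / sum_i alpha_raw[t, i] (Rabiner-style scaling); P(O|lambda) can then be
    # recovered as log P(O|lambda) = -sum_t log(c[t]) if needed.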
def forward_with_scale(self):
T = len(self.O)
        alpha_raw = np.zeros((T, self.N), float)
        alpha = np.zeros((T, self.N), float)
        c = np.zeros(T)  # scaling factors; the initial values do not matter, every entry is overwritten
for i in range(self.N):
            alpha_raw[0, i] = self.Pi[i] * self.B[i, int(self.O[0])]
c[0] = 1.0 / sum(alpha_raw[0, i] for i in range(self.N))
for i in range(self.N):
alpha[0, i] = c[0] * alpha_raw[0, i]
for t in range(T - 1):
for i in range(self.N):
summation = 0.0
for j in range(self.N):
summation += alpha[t, j] * self.A[j, i]
alpha_raw[t + 1, i] = summation * self.B[i, int(self.O[t + 1])]
c[t + 1] = 1.0 / sum(alpha_raw[t + 1, i1] for i1 in range(self.N))
for i in range(self.N):
alpha[t + 1, i] = c[t + 1] * alpha_raw[t + 1, i]
return alpha, c
def backward(self):
T = len(self.O)
        beta = np.zeros((T, self.N), float)
for i in range(self.N):
beta[T - 1, i] = 1.0
for t in range(T - 2, -1, -1):
for i in range(self.N):
                summation = 0.0  # this must stay on this line: it has to be reset to 0 for every i
for j in range(self.N):
summation += self.A[i, j] * self.B[j, int(self.O[t + 1])] * beta[t + 1, j]
beta[t, i] = summation
Polambda = 0.0
for i in range(self.N):
Polambda += self.Pi[i] * self.B[i, int(self.O[0])] * beta[0, i]
return Polambda, beta
def backward_with_scale(self, c):
T = len(self.O)
        beta_raw = np.zeros((T, self.N), float)
        beta = np.zeros((T, self.N), float)
for i in range(self.N):
beta_raw[T - 1, i] = 1.0
beta[T - 1, i] = c[T - 1] * beta_raw[T - 1, i]
for t in range(T - 2, -1, -1):
for i in range(self.N):
summation = 0.0
for j in range(self.N):
summation += self.A[i, j] * self.B[j, int(self.O[t + 1])] * beta[t + 1, j]
beta[t, i] = c[t] * summation # summation = beta_raw[t,i]
return beta
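    # Note: gamma and xi computed from the scaled alpha and beta are identical to the unscaled
    # ones, because the common scaling factor cancels between numerator and denominator,
    # so compute_gamma() and compute_xi() can be reused unchanged.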
def viterbi(self):
        # given O and lambda, find the most likely state sequence I
T = len(self.O)
        I = np.zeros(T, float)
        delta = np.zeros((T, self.N), float)
        psi = np.zeros((T, self.N), float)
for i in range(self.N):
delta[0, i] = self.Pi[i] * self.B[i, int(self.O[0])]
psi[0, i] = 0
for t in range(1, T):
for i in range(self.N):
delta[t, i] = self.B[i, int(self.O[t])] * np.array([delta[t - 1, j] * self.A[j, i]
for j in range(self.N)]).max()
psi[t, i] = np.array([delta[t - 1, j] * self.A[j, i]
for j in range(self.N)]).argmax()
P_T = delta[T - 1, :].max()
I[T - 1] = delta[T - 1, :].argmax()
for t in range(T - 2, -1, -1):
I[t] = psi[t + 1, int(I[t + 1])]
return I
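    # Note: like the unscaled forward pass, viterbi() multiplies many probabilities together;
    # for long sequences it is common to work with log-probabilities instead
    # (sums of logs rather than products), which this implementation does not do.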
def compute_gamma(self, alpha, beta):
T = len(self.O)
        gamma = np.zeros((T, self.N), float)  # gamma[t, i]: probability of being in state i at time t, given O and lambda
for t in range(T):
for i in range(self.N):
gamma[t, i] = alpha[t, i] * beta[t, i] / sum(
alpha[t, j] * beta[t, j] for j in range(self.N))
return gamma
def compute_xi(self, alpha, beta):
T = len(self.O)
        xi = np.zeros((T - 1, self.N, self.N), float)  # note: length T - 1, not T
for t in range(T - 1): # note: not T
for i in range(self.N):
for j in range(self.N):
numerator = alpha[t, i] * self.A[i, j] * self.B[j, int(self.O[t + 1])] * beta[t + 1, j]
                    # The denominator below must not reuse 'numerator', since 'i, j' are fixed there.
                    # Separate indices 'i1, j1' are used to avoid errors and confusion.
denominator = sum(sum(
alpha[t, i1] * self.A[i1, j1] * self.B[j1, int(self.O[t + 1])] * beta[t + 1, j1]
for j1 in range(int(self.N))) # the second sum
for i1 in range(self.N)) # the first sum
xi[t, i, j] = numerator / denominator
return xi
def Baum_Welch(self):
        # Given the observation sequence O (T can be derived from it), estimate the model lambda.
        # N and M are also assumed to be given.
T = len(self.O)
V = [k for k in range(self.M)]
# initialization - lambda
        self.A = np.array([[0, 1, 0, 0], [0.4, 0, 0.6, 0], [0, 0.4, 0, 0.6], [0, 0, 0.5, 0.5]], float)
        self.B = np.array([[0.5, 0.5], [0.3, 0.7], [0.6, 0.4], [0.8, 0.2]], float)
        # a uniform (mean-value) initialization is not a good choice
        self.Pi = np.array([1.0 / self.N] * self.N, float)  # use 1.0 so the division is floating-point
        # self.A = np.array([[1.0 / self.N] * self.N] * self.N)  # must be wrapped in np.array so [i, j] indexing works
        # self.B = np.array([[1.0 / self.M] * self.M] * self.N)
        x = 1                    # convergence threshold on the parameter change
        delta_lambda = x + 1
        times = 0
        # iterate until the change in lambda drops below x
        while delta_lambda > x:
Polambda1, alpha = self.forward() # get alpha
Polambda2, beta = self.backward() # get beta
gamma = self.compute_gamma(alpha, beta) # use alpha, beta
xi = self.compute_xi(alpha, beta)
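            # Re-estimation step (standard Baum-Welch updates, implemented by the loops below):
            #   A[i, j] <- sum_{t=0}^{T-2} xi[t, i, j]      / sum_{t=0}^{T-2} gamma[t, i]
            #   B[j, k] <- sum_{t: O[t] == v_k} gamma[t, j] / sum_{t=0}^{T-1} gamma[t, j]
            #   Pi[i]   <- gamma[0, i]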
            lambda_n = [self.A.copy(), self.B.copy(), self.Pi.copy()]  # snapshot of the old parameters (copies, so the in-place updates below do not overwrite them)
for i in range(self.N):
for j in range(self.N):
numerator = sum(xi[t, i, j] for t in range(T - 1))
denominator = sum(gamma[t, i] for t in range(T - 1))
self.A[i, j] = numerator / denominator
for j in range(self.N):
for k in range(self.M):
numerator = sum(gamma[t, j] for t in range(T) if self.O[t] == V[k]) # TBD
denominator = sum(gamma[t, j] for t in range(T))
                    self.B[j, k] = numerator / denominator
for i in range(self.N):
self.Pi[i] = gamma[0, i]
            # If we summed the raw differences directly, positive and negative changes would cancel.
            # Instead, compute the Chebyshev distance between the old and new matrices.
cdist_A = cdist(lambda_n[0], self.A, 'chebyshev') # cdist_A is still a matrix
cdist_B = cdist(lambda_n[1], self.B, 'chebyshev')
cdist_Pi = cdist([lambda_n[2]], [self.Pi], 'chebyshev') # turn Pi(vector) to matrix.
delta_lambda = sum([sum(sum(cdist_A)), sum(sum(cdist_B)), sum(sum(cdist_Pi))])
times += 1
print(times)
return self.A, self.B, self.Pi
def Baum_Welch_with_scale(self):
T = len(self.O)
V = [k for k in range(self.M)]
        # initialization of lambda; the entries must be floats (note the decimal points)
self.A = np.array([[0.2, 0.2, 0.3, 0.3], [0.2, 0.1, 0.6, 0.1], [0.3, 0.4, 0.1, 0.2], [0.3, 0.2, 0.2, 0.3]])
self.B = np.array([[0.5, 0.5], [0.3, 0.7], [0.6, 0.4], [0.8, 0.2]])
        x = 3                    # convergence threshold on the parameter change
        delta_lambda = x + 1
        times = 0
        # iterate until the change in lambda drops below x
        while delta_lambda > x:
alpha, c = self.forward_with_scale()
beta = self.backward_with_scale(c)
gamma = self.compute_gamma(alpha, beta)
xi = self.compute_xi(alpha, beta)
            lambda_n = [self.A.copy(), self.B.copy(), self.Pi.copy()]  # snapshot of the old parameters (copies, so the in-place updates below do not overwrite them)
for i in range(self.N):
for j in range(self.N):
numerator = sum(xi[t, i, j] for t in range(T - 1))
denominator = sum(gamma[t, i] for t in range(T - 1))
self.A[i, j] = numerator / denominator
for j in range(self.N):
for k in range(self.M):
numerator = sum(gamma[t, j] for t in range(T) if self.O[t] == V[k]) # TBD
denominator = sum(gamma[t, j] for t in range(T))
                    self.B[j, k] = numerator / denominator
for i in range(self.N):
self.Pi[i] = gamma[0, i]
            # If we summed the raw differences directly, positive and negative changes would cancel.
            # Instead, compute the Chebyshev distance between the old and new matrices.
cdist_A = cdist(lambda_n[0], self.A, 'chebyshev')
cdist_B = cdist(lambda_n[1], self.B, 'chebyshev')
cdist_Pi = cdist([lambda_n[2]], [self.Pi], 'chebyshev')
# delta_lambda = sum([ sum(sum(cdist_A)), sum(sum(cdist_B)), sum(sum(cdist_Pi)) ])
            # tried ignoring B's error; it does not help, because lambda is still stuck in a local optimum
delta_lambda = sum([sum(sum(cdist_A)), sum(sum(cdist_Pi))])
times += 1
print(times)
return self.A, self.B, self.Pi
def try_viterbi():
A11 = [[0, 1, 0, 0], [0.4, 0, 0.6, 0], [0, 0.4, 0, 0.6], [0, 0, 0.5, 0.5]]
B11 = [[0.5, 0.5], [0.3, 0.7], [0.6, 0.4], [0.8, 0.2]]
Pi11 = [0.25] * 4
O11 = [0, 1, 0, 1, 0, 0, 1, 0, 1] # 0 denote red, 1 denote white
hmm11 = HMM(A11, B11, Pi11, O11)
I = hmm11.viterbi()
print(I)
def try_Baum_Welch():
    A21 = np.zeros((4, 4), float)
    B21 = np.zeros((4, 2), float)
Pi21 = [0.25, 0.25, 0.25, 0.25]
O21 = [0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1]
V21 = [k for k in range(2)]
Q21 = [q for q in range(4)]
# T21 = len(O21)
hmm21 = HMM(A21, B21, Pi21, O21)
A, B, Pi = hmm21.Baum_Welch()
print(A, B, Pi)
def try_Baum_Welch_with_scale():
    A22 = np.zeros((4, 4), float)
    B22 = np.zeros((4, 2), float)
Pi22 = [0.25, 0.25, 0.25, 0.25]
O22 = [0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 1, 0, 1, 0, 1, 0, 1] * 4
# V22 = [k for k in range(2)]
# Q22 = [q for q in range(4)]
# T21 = len(O21)
hmm22 = HMM(A22, B22, Pi22, O22)
A, B, Pi = hmm22.Baum_Welch_with_scale()
print(A, B, Pi)
if __name__ == '__main__':
print('HMM in python')
# try_viterbi()
try_Baum_Welch()
# try_Baum_Welch_with_scale()
Test results:
1. try_viterbi()
HMM in python
[0. 1. 0. 1. 2. 3. 2. 3. 2.]
2. try_Baum_Welch()
3. try_Baum_Welch_with_scale()
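As an extra sanity check that is not part of the original test runs above, one could also exercise forward() directly on a small three-state model (similar to the ball-drawing example used for the forward algorithm in 《统计学习方法》). The helper name try_forward is purely illustrative:

def try_forward():
    # three states, two observation symbols (0 = red, 1 = white); hypothetical test helper
    A31 = [[0.5, 0.2, 0.3], [0.3, 0.5, 0.2], [0.2, 0.3, 0.5]]
    B31 = [[0.5, 0.5], [0.4, 0.6], [0.7, 0.3]]
    Pi31 = [0.2, 0.4, 0.4]
    O31 = [0, 1, 0]
    hmm31 = HMM(A31, B31, Pi31, O31)
    Polambda, alpha = hmm31.forward()
    print(Polambda)  # approximately 0.13022 for this model and observation sequence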