目录
实现基于条件随机场模型的中文分词
在中文分词领域,基于字标注的方法得到广泛应用,通过字标注分词问题可转换为序列标注问题,现在分词效果较好且常用的是基于条件随机场(CRFs)的标注模型。其模型思想是,条件随机场模型对应一个无向图 ,,Y中的元素与无向图的顶点一一对应。在条件X下,随机变量 的条件概率分布符合图的马尔科夫性质,即可称当前是一个条件随机场。条件随机场模型用于在给定需要标记的观察序列的条件下,计算整个编辑序列的联合概率分布,求解出句子中字的标记序列的联合概率分布,从而实现分词。
基于条件随机场模型的中文分词改进:https://blog.csdn.net/admiz/article/details/109882968
词语特征学习
一、读取语料库
采用Python的open( ) 函数读取ic文件,设置读取编码为'utf-8-sig'后打开名为"msr_training.utf8.ic"的语料库,再用readlines( )函数将语料库的每一行作为wordFeed列表的一个元素,通过len( )函数求得语料库的长度wordFeedLen。其中wordFeed列表内每个元素的格式为:[A|B], “A”为一个字,“B”为该字所处的状态。数据格式如下图,数据下载链接:https://download.csdn.net/download/admiz/13132232
二、求出特征二(R)
为求出特征二,先针对的某个字,统计出所它在语料库中出现的总次数,再计算它的状态为词头(B)、词中(M)、词尾(E)、单字成词(S)的概率。
在进行针对某个字的统计前,先设储存结果的数据结构为字典,其具体结构如{A:[B,M,E,S]},其中“A” 为所针对的字,“B”为该字在词头的概率,“M”为该字在词中的概率,“E”为该字在词尾的概率,“S”为该字单字成词的概率。
该步骤部分伪代码如下:
i为要求特征二的某个字
初始化BDict为空字典
初始化B、M、E、S、BMESsum为0
for j in range(wordFeedLen):
if wordFeed[j][0] == i:
if wordFeed[j][2] == 'B':
B = B+1
elif wordFeed[j][2] == 'M':
M = M+1
elif wordFeed[j][2] == 'E':
E = E+1
elif wordFeed[j][2] == 'S':
S = S+1
BMESsum = B+M+E+S
BDict[i] = [B/BMESsum,M/BMESsum,E/BMESsum,S/BMESsum]
三、求出特征三(P)
求特征三是针对某个字,分别计算出它在当前状态B、M、E、S时,它转移到下一个字的状态为B、M、E、S的概率。 统计所针的某个字在全文中出现的总次数,并计算它的状态为词头(B)、词中(M)、词尾(E)、单字成词(S)的概率。针对4种状态,会形成一个4x4的矩阵,矩阵中的值是它们相互之间的转移概率。
在进行针对某个字的统计前,先设储存结果的数据结构为字典,其具体结构如:
{ A:[[0,0,0,0],[0,0,0,0],[0,0,0,0],[0,0,0,0]]} ,其中“A”为所针对的字,列表内的第一个元素(即所加粗列表)表示:为当“A”为状态B时,下一个字的状态为B、M、E、S的概率与该元素内的[0,0,0,0]一一对应,以此类推。
该步骤部分代码如下:
(1)创建及初始化字的4x4的矩阵(字典)
def setDict2(testWord): #testWord为句子每个字的列表
testDict = { }
for i in testWord:
testDict[i]=[[0,0,0,0],[0,0,0,0],[0,0,0,0],[0,0,0,0]] #[B,M,E,S]的四阶矩阵
return testDict
(2)特征三内的判断判断函数
def charCJudge(i,mark1): #i为要判断的字符 mark1为i的状态
B=0
M=0
E=0
S=0
BMESsum=0
for j in range(wordFeedLen):
if i == wordFeed[j][0] and wordFeed[j][2]==mark1 and j+1<len(wordFeed):
if wordFeed[j+1][2] == 'B':
B=B+1
elif wordFeed[j+1][2] == 'M':
M=M+1
elif wordFeed[j+1][2] == 'E' :
E=E+1
elif wordFeed[j+1][2] == 'S' :
S=S+1
else:
pass
BMESsum=B+M+E+S
if BMESsum>0 :
return [B/BMESsum,M/BMESsum,E/BMESsum,S/BMESsum]
elif BMESsum==0 :
return [0,0,0,0]
(3) 计算转移频率矩阵值
设testWord所要分词的句词列表
testDict=setDict2(testWord)
for i in testWord:
j=0
for mark1 in ['B','M','E','S']:
testDict[i][j]=charCJudge(i,mark1)
j=j+1
四、求出特征四(W)
特征四是针对需要分词的字,计算该字在B、M、E、S状态下,下一个字出现的内容,并计算两个字同时出现的概率,即记录上一个字和下一个的上下文关系。
在进行针对某个字的统计前,先设储存结果的数据结构为字典,其具体结构如:
{ A:{'B':{C:[B0,B1]},'M':{},'E':{},'S':{}}} ,其中“A”为所针对的字,“B”为该字所处的状态,“B”内的字典意为“A”字在“B”状态下,“C”字出现在“A”字前后的概率分别为B0,B1,以此类推。
该步骤部分伪代码如下:
(1)创建及初始化上下文字典
def setDict3(testWord): #testWord为句子每个字的列表
testDict = { }
for i in testWord:
testDict[i]={'B':{},'M':{},'E':{},'S':{}} #[B,M,E,S]的四阶矩阵
return testDict
(2)特征四内的上下文关系概率函数
def charDJudge(i,mark1): #i为要判断的字符 mark1为i的状态
testDict = { }
lastSum=0
nextSum=0
for j in range(wordFeedLen):
#print(wordFeed[j][0])
if i == wordFeed[j][0] and wordFeed[j][2]==mark1 and j+1<len(wordFeed):
#特征四内的上文关系概率函数
if wordFeed[j-1][0] not in testDict:
testDict[wordFeed[j-1][0]]=[1,0]
lastSum=lastSum+1
elif wordFeed[j-1][0] in testDict:
testDict[wordFeed[j-1][0]][0]=testDict[wordFeed[j-1][0]][0]+1
lastSum=lastSum+1
#特征四内的下文关系概率函数
if wordFeed[j+1][0] not in testDict:
testDict[wordFeed[j+1][0]]=[0,1]
nextSum=nextSum+1
elif wordFeed[j+1][0] in testDict:
testDict[wordFeed[j+1][0]][1]=testDict[wordFeed[j+1][0]][1]+1
nextSum=nextSum+1
for key, value in testDict.items():
testDict[key][0]=testDict[key][0]/lastSum
testDict[key][1]=testDict[key][1]/nextSum
return testDict
开始分词
完成词语特征学习后,就可以通过已经训练好的参数值实现中文分词了。例如,对用户输入的句子“希腊的经济结构较特殊”进行中文分词处理。
第一步,将用户输入的“希腊的经济结构较特殊”变成字符列表: ['希', '腊', '的', '经', '济', '结', '构', '较', '特', '殊']。
第二步,求出每个字对应的特征,并根据状态信息回执字与状态的初始矩阵映射关系表。计算字符列表内所有字符在B、M、E、S的状态概率,后面的问题即可迎刃而解。根据维特比公式:
(备注:S为矩阵中的值,R是特征二,P是特征三,“W前”是特征四的上文部分, “W后”是特征四的下文部分。)
最后,根据公式可求得字与状态对应关系计算后的矩阵。
该步骤部分代码如下:
(1)先初始化字符串列表
testList=['希', '腊', '的', '经', '济', '结', '构', '较', '特', '殊']
(2)通过上述3.1特征学习内的步骤求出特征二、特征三、特征四
testCharaB=charaB(testList) #特征二 字符BMES矩阵
testCharaC=charaC(testList) #特征三 转移频率
testCharaD=charaD(testList) #特征四 上下文关联关系
(3)求出字与状态对应关系矩阵及回溯路径
column=['B','M','E','S']
relaDict=setDict(testList)
wayList=[] #保存回溯路径
a=0
b=0
c=0
d=0
for i in range(len(testList)):
oneWaylist=[] #临时保存回溯路径
(4)实现维特比公式
(e为所针对字的特征四上文概率,e为该字的特征四下文概率)
for j in range(len(column)):
if i==0 :
if testList[i+1] not in testCharaD[testList[i]][column[j]] :
e=0
else:
e=testCharaD[testList[i]][column[j]][testList[i+1]][0]
relaDict[testList[i]][j]= e + testCharaB[testList[i]][j]
elif i>0 and i<len(testList)-1:
a=testCharaC[testList[i-1]][0][j] * relaDict[testList[i-1]][0]
b=testCharaC[testList[i-1]][1][j] * relaDict[testList[i-1]][1]
c=testCharaC[testList[i-1]][2][j] * relaDict[testList[i-1]][2]
d=testCharaC[testList[i-1]][3][j] * relaDict[testList[i-1]][3]
if testList[i-1] not in testCharaD[testList[i]][column[j]] : #特征四上文
e=0
else:
e=testCharaD[testList[i]][column[j]][testList[i-1]][0]
if testList[i+1] not in testCharaD[testList[i]][column[j]] : #特征四下文
f=0
else:
f=testCharaD[testList[i]][column[j]][testList[i+1]][1]
relaDict[testList[i]][j]=max(a,b,c,d) + testCharaB[testList[i]][j] + e + f
elif i==len(testList)-1:
a=testCharaC[testList[i-1]][0][j] * relaDict[testList[i-1]][0]
b=testCharaC[testList[i-1]][1][j] * relaDict[testList[i-1]][1]
c=testCharaC[testList[i-1]][2][j] * relaDict[testList[i-1]][2]
d=testCharaC[testList[i-1]][3][j] * relaDict[testList[i-1]][3]
if testList[i-1] not in testCharaD[testList[i]][column[j]] : #特征四上文
e=0
else:
e=testCharaD[testList[i]][column[j]][testList[i-1]][0]
relaDict[testList[i]][j]= max(a,b,c,d) + testCharaB[testList[i]][j] +e
经过上述代码即可求得字与状态对应关系矩阵与回溯路径,得知每个字标注结果,从而实现分词。
项目完整代码和截图
项目代码
# -*- coding: utf-8 -*-
"""
Created on Fri Oct 18 08:36:46 2019
@author: JoeLiao‘s ASUS
"""
#全局变量
#读取语料库
wordFile=open("msr_training.utf8.ic",'r',encoding='utf-8')
wordFeed=wordFile.readlines() #[0]=字 [2]=标注
wordFile.close
wordFeedLen=len(wordFeed)
#print(wordFeed)
#创建字的列表
def setList(a): #a为要分词的句子
testWord = [ ]
for i in a:
testWord.append(i)
return testWord
#创建及初始化矩阵(字典)
def setDict(testWord): #testWord为句子每个字的列表
testDict = { }
for i in testWord:
testDict[i]=[0,0,0,0] #[B,M,E,S]
return testDict
####### 特征二 #######
#特征二 计算状态频率矩阵值
def charaB(testWord): #testDict为每个字的字典
testDict=setDict(testWord)
for i in testWord:
B=0
M=0
E=0
S=0
BMESsum=0
for j in range(wordFeedLen):
#print(wordFeed[j][0])
if i == wordFeed[j][0]:
if wordFeed[j][2] == 'B':
B=B+1
elif wordFeed[j][2] == 'M':
M=M+1
elif wordFeed[j][2] == 'E':
E=E+1
elif wordFeed[j][2] == 'S':
S=S+1
BMESsum=B+M+E+S
testDict[i]=[B/BMESsum,M/BMESsum,E/BMESsum,S/BMESsum]
return testDict
####### 特征三 #######
#创建及初始化字的4x4的矩阵(字典)
def setDict2(testWord): #testWord为句子每个字的列表
testDict = { }
for i in testWord:
testDict[i]=[[0,0,0,0],[0,0,0,0],[0,0,0,0],[0,0,0,0]] #[B,M,E,S]的四阶矩阵
return testDict
#特征三内的判断判断函数
def charCJudge(i,mark1): #i为要判断的字符 mark1为i的状态
B=0
M=0
E=0
S=0
BMESsum=0
for j in range(wordFeedLen):
#print(wordFeed[j][0])
if i == wordFeed[j][0] and wordFeed[j][2]==mark1 and j+1<len(wordFeed):
if wordFeed[j+1][2] == 'B':
B=B+1
elif wordFeed[j+1][2] == 'M':
M=M+1
elif wordFeed[j+1][2] == 'E' :
E=E+1
elif wordFeed[j+1][2] == 'S' :
S=S+1
else:
pass
BMESsum=B+M+E+S
if BMESsum>0 :
return [B/BMESsum,M/BMESsum,E/BMESsum,S/BMESsum]
elif BMESsum==0 :
return [0,0,0,0]
#特征三 计算转移频率矩阵值
def charaC(testWord):
testDict=setDict2(testWord)
for i in testWord:
j=0
for mark1 in ['B','M','E','S']:
testDict[i][j]=charCJudge(i,mark1)
j=j+1
#print(i,j)
#print(testDict)
return testDict
####### 特征四 #######
#创建及初始化上下文字典
def setDict3(testWord): #testWord为句子每个字的列表
testDict = { }
for i in testWord:
testDict[i]={'B':{},'M':{},'E':{},'S':{}} #[B,M,E,S]的四阶矩阵
return testDict
#特征四内的上下文关系概率函数
def charDJudge(i,mark1): #i为要判断的字符 mark1为i的状态
testDict = { }
lastSum=0
nextSum=0
for j in range(wordFeedLen):
#print(wordFeed[j][0])
if i == wordFeed[j][0] and wordFeed[j][2]==mark1 and j+1<len(wordFeed):
#特征四内的上文关系概率函数
if wordFeed[j-1][0] not in testDict:
testDict[wordFeed[j-1][0]]=[1,0]
lastSum=lastSum+1
elif wordFeed[j-1][0] in testDict:
testDict[wordFeed[j-1][0]][0]=testDict[wordFeed[j-1][0]][0]+1
lastSum=lastSum+1
#特征四内的下文关系概率函数
if wordFeed[j+1][0] not in testDict:
testDict[wordFeed[j+1][0]]=[0,1]
nextSum=nextSum+1
elif wordFeed[j+1][0] in testDict:
testDict[wordFeed[j+1][0]][1]=testDict[wordFeed[j+1][0]][1]+1
nextSum=nextSum+1
for key, value in testDict.items():
testDict[key][0]=testDict[key][0]/lastSum
testDict[key][1]=testDict[key][1]/nextSum
return testDict
#特征四计算特定的字与上下文关系
def charaD(testWord):
testDict=setDict3(testWord)
for i in testWord:
for mark1 in ['B','M','E','S']:
testDict[i][mark1]=charDJudge(i,mark1)
return testDict
####### 实现分词 #######
#返回分词结果
def getResult(signList):
resultString=''
for iList in signList:
if iList[1]=='B':
resultString=resultString+' '+iList[0]
elif iList[1]=='M':
resultString=resultString+iList[0]
elif iList[1]=='E':
resultString=resultString+iList[0]+' '
elif iList[1]=='S':
resultString=resultString+' '+iList[0]+' '
return resultString
#返回分词标记转换
def trans(num):
if num==0:
return 'B'
elif num==1:
return 'M'
elif num==2:
return 'E'
elif num==3:
return 'S'
#字与状态对应关系计算
def separateWords(testString):
testString=str1 #要测试的句子
testList=setList(testString) #字符串列表
#计算 特征二 特征三 特征四
print('字符串列表:',testList,'\n')
testCharaB=charaB(testList) #特征二 字符BMES矩阵
#print('特征二:',testCharaB,'\n')
testCharaC=charaC(testList) #特征三 转移频率
#print('特征三',testCharaC,'\n')
testCharaD=charaD(testList) #特征四 上下关联关系
print('特征四',testCharaD,'\n')
#生成字与状态对应关系矩阵值(字典)
column=['B','M','E','S']
relaDict=setDict(testList)
wayList=[]
a=0
b=0
c=0
d=0
for i in range(len(testList)):
oneWaylist=[]
for j in range(len(column)):
#print(testList[i],column[j])
if i==0 :
if testList[i+1] not in testCharaD[testList[i]][column[j]] :
e=0
else:
e=testCharaD[testList[i]][column[j]][testList[i+1]][0]
relaDict[testList[i]][j]= e + testCharaB[testList[i]][j]
elif i>0 and i<len(testList)-1:
a=testCharaC[testList[i-1]][0][j] * relaDict[testList[i-1]][0]
b=testCharaC[testList[i-1]][1][j] * relaDict[testList[i-1]][1]
c=testCharaC[testList[i-1]][2][j] * relaDict[testList[i-1]][2]
d=testCharaC[testList[i-1]][3][j] * relaDict[testList[i-1]][3]
if testList[i-1] not in testCharaD[testList[i]][column[j]] : #特征四上文
e=0
else:
e=testCharaD[testList[i]][column[j]][testList[i-1]][0]
if testList[i+1] not in testCharaD[testList[i]][column[j]] : #特征四下文
f=0
else:
f=testCharaD[testList[i]][column[j]][testList[i+1]][1]
relaDict[testList[i]][j]=max(a,b,c,d) + testCharaB[testList[i]][j] + e + f
elif i==len(testList)-1:
a=testCharaC[testList[i-1]][0][j] * relaDict[testList[i-1]][0]
b=testCharaC[testList[i-1]][1][j] * relaDict[testList[i-1]][1]
c=testCharaC[testList[i-1]][2][j] * relaDict[testList[i-1]][2]
d=testCharaC[testList[i-1]][3][j] * relaDict[testList[i-1]][3]
if testList[i-1] not in testCharaD[testList[i]][column[j]] : #特征四上文
e=0
else:
e=testCharaD[testList[i]][column[j]][testList[i-1]][0]
relaDict[testList[i]][j]= max(a,b,c,d) + testCharaB[testList[i]][j] +e
#wayDict={'a':a,'b':b,'c':c,'d':d}
findMax=[a,b,c,d]
oneWaylist.append(findMax.index(max(findMax)))
#print(oneWaylist)
wayList.append(oneWaylist)
print('\n关系矩阵:',relaDict,'\n\n回溯路径:',wayList)
signList=[]
lenList=[]
#lenList=list(range(len(testList))).reverse()
for i in range(len(testList)):
lenList.append(i)
lenList.reverse()
for i in lenList:
testWord=testList[i]
if i == len(testList)-1:
indexNum=relaDict[testWord].index(max(relaDict[testWord]))
sign=trans(indexNum)
signList.append([testWord,sign])
nextIndexNum=wayList[i][indexNum]
else:
sign=trans(nextIndexNum)
signList.append([testWord,sign])
indexNum=relaDict[testWord].index(max(relaDict[testWord]))
nextIndexNum=wayList[i][indexNum]
signList.reverse()
print("\n分词标记:",signList,'\n')
print("分词原句:",testString,'\n')
print("分词结果:",getResult(signList))
################主函数################
import time
#from multiprocessing.dummy import Pool as ThreadPool
#pool = ThreadPool(processes=8)
str1=input("请输入要分词的字符串:")
print('\n')
start = time.clock()
separateWords(str1)
#results2 = pool.map(separateWords, str1)
#pool.close()
#pool.join()
print('\n')
elapsed = (time.clock() - start)
print("分词用时:",elapsed,'秒')
#input('******回车后可退出界面******')
结果截图
不打印特征二、三、四的截图:
打印特征后的截图:
特征四(部分截图)