python代码实现(带有核函数的二分类非线性分类器):
import numpy as np
import random
import matplotlib.pyplot as plt
# 核函数
def keanel_func(X, A, kTup):
m, n = np.shape(X)
K = np.mat(np.zeros((m, 1)))
if(kTup[0] == 'lin'):
K = X * A.T
elif(kTup[0] == 'rbf'):
for j in range(m):
divider = X[j] - A
divider_up = divider * divider.T
K[j] = np.exp(divider_up/(-1 * (kTup[1]**2)))
else:
raise NameError("That Kernel is not recognized")
return K
# 从txt文件中读入原始数据,返回输入特征和标签的列表集
def loadDataset(filename):
dataList = []
labelList = []
fr = open(filename)
for row in fr.readlines():
row = row.strip().split("\t")
dataList.append([float(row[0]), float(row[1])])
labelList.append(float(row[2]))
return dataList, labelList
# 创建一个对象,包含程序需要的多种属性参数
class opstruct:
def __init__(self, dataMat, labelMat, C, toler, KTup):
self.dataMat = dataMat
self.labelMat = labelMat
self.C = C
self.tol = toler
self.m = np.shape(dataMat)[0]
self.alphas = np.mat(np.zeros((self.m, 1)))
self.b = 0
# Ei的缓存变量
self.eicache = np.mat(np.zeros((self.m, 2)))
self.K = np.mat(np.zeros((self.m, self.m)))
for i in range(self.m):
self.K[:, i] = keanel_func(self.dataMat, self.dataMat[i], KTup)
# 随机选择一个和i不同的j
def selectjrand(i, m):
j = i
while(j == i):
j = int(random.uniform(0, m))
return j
# 对alphas[j]的值进行调整
def clipAlphs(alphaj, H, L):
if(alphaj > H):
alphaj = H
if(alphaj < L):
alphaj = L
return alphaj
# 计算Ek
def calculate_Ek(os, k):
fxk = float(np.multiply(os.alphas, os.labelMat).T * os.K[:, k]) + os.b
Ek = fxk - float(os.labelMat[k])
return Ek
# 启发式内循环,选择alphaj
def selectj(i, os, Ei):
maxk = -1
maxDeltaE = 0
Ej = 0
os.eicache[i] = [1, Ei]
# validEieacheList = np.nonzero(os.eicache[:, 0].A)[0]
validEieacheList = np.where(os.eicache[:, 0].A == 1)[0]
if (len(validEieacheList > 1)):
for k in validEieacheList:
if (k == i):
continue
Ek = calculate_Ek(os, k)
deltaE = abs(Ek - Ei)
if(deltaE > maxDeltaE):
maxk = k
maxDeltaE = deltaE
Ej = Ek
return maxk, Ej
else:
j = selectj(i, os.m)
Ej = calculate_Ek(os, j)
return j, Ej
# 计算误差Ek放入eieache缓存
def updateEk(os, k):
Ek = calculate_Ek(os, k)
os.eicache[k] = [1, Ek]
# 更新每次启发式选择的两个alpha的值
def inner(i, os):
# 计算Ei
Ei = calculate_Ek(os, i)
# 如果alphas[i]不满足kkt条件就进入下面的优化
if (((os.labelMat[i]*Ei < -os.tol) and (os.alphas[i] < os.C)) or
((os.labelMat[i]*Ei > os.tol) and (os.alphas[i] > 0))):
# 由内循环选择的j
j, Ej = selectj(i, os, Ei)
# 保存alphai和alphaj的值
alphai_old = os.alphas[i].copy()
alphaj_old = os.alphas[j].copy()
# L和H用于将alphas[j]调整到0-C之间。如果L==H,就不做任何改变,直接return 0
if (os.labelMat[i] != os.labelMat[j]):
L = max(0, os.alphas[j] - os.alphas[i])
H = min(os.C, os.C + os.alphas[j] - os.alphas[i])
else:
L = max(0, os.alphas[j] + os.alphas[i] - os.C)
H = min(os.C, os.alphas[j] + os.alphas[i])
if L == H:
print("L==H")
return 0
# 计算alphaj的值
#eta = os.dataMat[i] * os.dataMat[i].T + os.dataMat[j] * os.dataMat[j].T - 2 * os.dataMat[i] * os.dataMat[j].T
eta = os.K[i, i] + os.K[j, j] - 2 * os.K[i, j]
if(eta <= 0):
print("eta <= 0")
return 0
os.alphas[j] += os.labelMat[j]*(Ei-Ej)/eta
os.alphas[j] = clipAlphs(os.alphas[j], H, L)
updateEk(os, j)
if (os.alphas[j]-alphaj_old <= 0.00001):
print("j not changing enough")
return 0
os.alphas[i] += os.labelMat[i]*os.labelMat[j]*(alphaj_old-os.alphas[j])
updateEk(os, i)
# 计算b
b1 = (os.b - Ei - os.labelMat[i] * os.K[i, i] * (os.alphas[i] - alphai_old) - os.labelMat[i] * os.K[i, j] *
(os.alphas[j] - alphaj_old))
b2 = (os.b - Ej - os.labelMat[i] * os.K[i, j] * (os.alphas[i] - alphai_old) - os.labelMat[i] * os.K[j, j] *
(os.alphas[j] - alphaj_old))
if (0 < os.alphas[i]) and (os.alphas[i] < os.C):
os.b = b1
elif (0 < os.alphas[j]) and (os.alphas[j] < os.C):
os.b = b2
else:
os.b = (b1 + b2) / 2.0
return 1
else:
return 0
# 完整的smo算法
def smo_full(dataMat, labelMat, C, toler, maxIter, kTup):
# 创建一个opstruct对象
os = opstruct(dataMat, labelMat, C, toler, kTup)
iter = 0
entireset = True
alphaPairsChanged = 0
while((iter < maxIter) and ((entireset) or (alphaPairsChanged > 0))):
alphaPairsChanged = 0
if(entireset):
for i in range(os.m):
alphaPairsChanged += inner(i, os)
print("fullSet, iter: %d i:%d, pairs changed %d" % (iter, i, alphaPairsChanged))
iter += 1
else:
# 遍历所有的非边界alpha值,也就是不在边界0或C上的值。
nonBoundIs = np.nonzero((os.alphas.A > 0) * (os.alphas.A < C))[0]
for i in nonBoundIs:
alphaPairsChanged += inner(i, os)
print("non-bound, iter: %d i:%d, pairs changed %d" % (iter, i, alphaPairsChanged))
iter += 1
if(entireset):
entireset = False
elif(alphaPairsChanged == 0):
entireset = True
print("iteration number: %d" % iter)
print("entireset:", entireset, "alphaPairsChanged:", alphaPairsChanged)
return os.b, os.alphas
# 基于上面的alphas求w
def calculate_w(dataMat,labelMat,alphas):
print("dataMat: ", dataMat.shape)
print("labelMat: ", labelMat.shape)
m, n = dataMat.shape
w = np.zeros((n, 1))
for i in range(m):
w += np.multiply(alphas[i]*labelMat[i], dataMat[i].T)
return w
# 测试函数
def Test(w, b, datamat, labelmat, alphas, kTup):
print("testing:")
m, n = np.shape(datamat)
svInd = np.nonzero(alphas.A > 0)[0]
print(svInd)
sVs = datamat[svInd]
labelsVs = labelmat[svInd]
print("sVs_shape:", sVs.shape)
print("there are %d Support Vectors" %(len(svInd)))
errorcount = 0
for i in range(m):
kernelEval = keanel_func(sVs, datamat[i], kTup)
#print(kernelEval)
predict = kernelEval.T * np.multiply(labelsVs, alphas[svInd]) + b
print(np.sign(predict), labelmat[i])
if(np.sign(predict) != labelmat[i]):
errorcount += 1
print("the training error rate is:%f" %(float(errorcount)/m))
# 定义一个 plotData 函数,输入参数是 数据 X 和标志 flag: y,返回作图操作 plt, p1, p2 , 目的是为了画图
def plotData(X, y):
# 找到标签为1和0的索引组成数组赋给pos 和 neg
pos = np.where(y == 1)
neg = np.where(y == -1)
p1 = plt.scatter(X[pos, 0].tolist(), X[pos, 1].tolist(), marker='s', color='red')
p2 = plt.scatter(X[neg, 0].tolist(), X[neg, 1].tolist(), marker='o', color='green')
return p1,p2
# 画出决策边界
def plotfig_svm(xmat, ymat, kTup, alphas):
x_min = xmat.min() - 0.5
x_max = xmat.max() + 0.5
y_min = xmat.min() - 0.5
y_max = ymat.max() + 0.5
# 定义步长
stride = 0.01
# 由np.arrange生成一维数组作为np.meshgrid的参数,返回xx矩阵,yy矩阵
x_med = np.arange(x_min, x_max, stride)
y_med = np.arange(y_min, y_max, stride)
xx, yy = np.meshgrid(x_med, y_med)
# .ravel()方法将xx,yy矩阵压缩为一维向量;np.c_:是按行连接两个矩阵,就是把两矩阵左右相加,要求行数相等
# 合成的矩阵作为pred_func的输入,返回预测值
mat = np.c_[xx.ravel(), yy.ravel()]
print(mat.shape)
m1 = np.shape(mat)[0]
Z = np.mat(np.zeros((m1, 1)))
svInd = np.nonzero(alphas.A > 0)[0]
sVs = xmat[svInd]
labelsVs = ymat[svInd]
for i in range(m1):
kernelEval = keanel_func(sVs, mat[i], kTup)
# print(kernelEval)
predict = kernelEval.T * np.multiply(labelsVs, alphas[svInd]) + b
Z[i] = np.sign(predict)
Z = (Z.T).reshape(xx.shape)
p1, p2 = plotData(xmat, ymat)
p3 = plt.contour(xx, yy, Z, levels=0, linewidths=2)
plt.savefig("kernel_svm.png")
plt.show()
if __name__ == "__main__":
dataList, labelList = loadDataset("testSetRBF.txt")
dataMat = np.mat(dataList)
labelMat = np.mat(labelList).T
print(labelMat.shape)
# 设置核函数有关参数
k1 = 0.3
ktup = ('rbf', k1)
b, alphas = smo_full(dataMat, labelMat, 200, 0.0001, 10000, ktup)
w = calculate_w(dataMat, labelMat, alphas)
Test(w, b, dataMat, labelMat, alphas, ktup)
plotfig_svm(dataMat, labelMat, ktup, alphas)
运行代码:
可以看到,我们找到了一个非线性边界成功划分了正负样本。