用python建立kd树,然后实现knn算法,数据集为白酒品质
机器学习课程第二次作业part1,程序借鉴了很多别人的内容,自己目前还是个菜鸡,入门机器学习,道阻且长!
白酒数据集下载地址
(不知道为什么正确率很低,无奈~~)
import numpy as np
import pandas as pd
from random import *
from sklearn.model_selection import train_test_split
import time
class Node(object):
def __init__(self, values=None, label=None, split=None, parent=None, left_child=None, right_child=None):
self.values = values # 结点值
self.label = label # 结点标签
self.split = split # 进行分割的维度
self.parent = parent # 父节点
self.left_child = left_child # 左子树
self.right_child = right_child # 右子树
class Kd_Tree(object):
def __init__(self, values, labels):
self.__length = 0 #kd树结点的数目
self.root = self.__create(values, labels) # kd树的结点
def __create(self, values, labels, parent = None): # 建树
m, n = values.shape #m为行数(样本数目),n为列数(特征数目)
labels = labels.reshape(m, 1)
if m == 0:
return None # 空(子)树
var_list = [np.var(values[:, column]) for column in range(n)] # 计算每一列的方差
pos = var_list.index(max(var_list)) # 找到方差最大的列的下标,作为分割的维度
index_list = values[:, pos].argsort() # 获得将该列的特征值升序排列的下标序列
median = index_list[m//2] # 找到中位数的下标
if m ==1:
self.__length += 1
return Node(values=values[median], label=labels[median], split=pos,
parent=parent, left_child=None, right_child=None) # 样本数为1时,返回自身
node = Node(values=values[median], label=labels[median], split=pos,
parent=parent, left_child=None, right_child=None) #生成一个结点
# 建立有序的子树
left_tree_values = values[index_list[:m//2]] # 左子树的所有值
left_tree_labels = labels[index_list[:m//2]] # 左子树的所有标签
left_child = self.__create(left_tree_values, left_tree_labels, node)
if m == 2:
right_child = None #只有左子树,没有右子树
else:
right_tree_values = values[index_list[m//2+1:]] # 右子树的所有值
right_tree_labels = labels[index_list[m//2+1:]] # 右子树的所有标签
right_child = self.__create(right_tree_values, right_tree_labels, node)
# 左右子树递归调用自己,返回子树的根结点
node.left_child = left_child
node.right_child = right_child
self.__length += 1
return node
def transfer_list(self, root_node, kd_list=[]):
# 将kd树存储的信息放入一个列表中嵌套字典的列表中
if root_node is None:
return None # 空树没有信息
element_dict = {
'values': tuple(root_node.values), 'label': root_node.label[0], 'split': root_node.split,
'parent': tuple(root_node.parent.values) if root_node.parent else None,
'left_child': tuple(root_node.left_child.values) if root_node.left_child else None,
'right_child': tuple(root_node.right_child.values) if root_node.right_child else None
} # 每个字典中存放的信息
kd_list.append(element_dict)
# 递归放入每个结点的信息
self.transfer_list(root_node.left_child, kd_list)
self.transfer_list(root_node.right_child, kd_list)
return kd_list
def find_nearest_neighbour(self, values): # 在kd树中找到离样本点最近的点
values = np.array(values)
if self.__length == 0:
return None # 空树
node = self.root # 从根结点开始
if self.__length == 1:
return node # 只有一个结点的kd树
while True:
cur_split = node.split # 建树时进行划分的维度
# 待判样本在这一维度的值等于当前结点在这一维度的值
if values[cur_split] == node.values[cur_split]:
return node
# 待判样本在这一维度的值小于当前结点在这一维度的值
elif values[cur_split] < node.values[cur_split]: #
if node.left_child == None:
return node # 左子树为空,则返回当前结点
node = node.left_child # 左子树不空则进入左子树继续寻找
# 待判样本在这一维度的值大于当前结点在这一维度的值
else:
if node.right_child == None:
return node # 右子树为空,则返回当前结点
node = node.right_child # 右子树不空则进入右子树继续寻找
def knn(self, values, k):
# 找到距离测试样本最近的前k个样本
# k: knn算法参数,定义需要参考的最近点数量
# values: 待判样本的特征值
# return: 返回前k个样本的最大分类标签
if self.__length <= k:
label_dict = {
}
# 将各个标签值出现的次数存入字典
for item in self.transfer_list(self.root):
if item['label'] in label_dict:
label_dict[item['label']] += 1
else:
label_dict[item['label']] = 1
# 将标签按出现的次数降序排列
sorted_label = sorted(label_dict.items(), key=lambda item: item[1], reverse=True) # 按次数降序排序
return sorted_label[0][0] # 返回前k个样本中出现次数最多的标签
values = np.array(values)
node = self.find_nearest_neighbour(values) # 找到距离待判样本最近的结点
if node == None:
return None # 空树
node_list = [] # 用来存放距离待判样本最近的k个点的信息,会不断更新
distance = np.sqrt(sum((values - node.values) ** 2)) # 测试点与最近点之间的距离(采用欧氏距离)
largest_dis = distance # 更新当前node_list中距离待判样本最远的距离
# 将选入结点和待判样本的距离、结点的值和标签信息存入node_list
node_list.append([distance, tuple(node.values), node.label[0]])
# 如果当前结点不是叶节点,则说明它还有左子树
if node.left_child != None:
left_child = node.left_child
left_dis = np.sqrt(sum((values - left_child.values) ** 2)) # 计算距离
# 结点列表还有剩余或者新的距离小于节点列表中最大的距离
if k > len(node_list) or left_dis < largest_dis:
# 将结点存入节点列表篇
node_list.append([left_dis, tuple(left_child.values), left_child.label[0]])
node_list.sort() # 将结点列表按距离升序排列
# 更新节点列表中的最大距离
largest_dis = node_list[-1][0] if k >= len(node_list) else node_list[k - 1][0]
# 不停回到上一层结点
while True:
if node == self.root:
break #如果回到根结点则循环结束
parent = node.parent
par_dis = np.sqrt(sum((values - parent.values) ** 2))
# 计算父节点和测试点的距离,和节点列表中最大距离进行比较
if k > len(node_list) or par_dis < largest_dis: # 结点列表有剩余或者新的距离小于最大距离
node_list.append([par_dis, tuple(parent.values), parent.label[0]])
node_list.sort()
# 更新最大距离
largest_dis = node_list[-1][0] if k >= len(node_list) else node_list[k - 1][0]
# 判断待判样本到当前结点分割线的距离与当前节点列表中的最大距离的大小
if k > len(node_list) or abs(values[parent.split] - parent.values[parent.split]) < largest_dis:
# 当前结点有其他分支
other_child = parent.left_child if parent.left_child != node else parent.right_child
if other_child != None:
if values[parent.split] - parent.values[parent.split] <= 0:
# 测试点在该子结点超平面的左侧
self.left_search(values, other_child, node_list, k)
else:
# 测试点在该子结点超平面的右侧
self.right_search(values, other_child, node_list, k)
# 否则返回上一层
node = parent
label_dict = {
}
node_list = node_list[:k]
# 将各个标签出现的次数存入字典
for item in node_list:
if item[2] in label_dict:
label_dict[item[2]] += 1
else:
label_dict[item[2]] = 1
# 将标签按出现的次数降序排列
sorted_label = sorted(label_dict.items(), key=lambda item: item[1], reverse=True)
# 返回出现次数最多的标签和距离待判样本最近额k个结点的列表
return sorted_label[0][0], node_list
def left_search(self, values, node, nodeList, k):
nodeList.sort() # 对结点列表按距离排序
largest_dis = nodeList[-1][0] if k >= len(nodeList) else nodeList[k - 1][0]
if node.left_child == None and node.right_child == None: # 叶结点
dis = np.sqrt(sum((values - node.values) ** 2))
if k > len(nodeList) or dis < largest_dis:
nodeList.append([dis, tuple(node.values), node.label[0]])
return
self.left_search(values, node.left_child, nodeList, k)
# 每次进行比较前都更新nodelist数据
nodeList.sort() # 对结点列表按距离排序
largest_dis = nodeList[-1][0] if k >= len(nodeList) else nodeList[k - 1][0]
# 比较根结点
dis = np.sqrt(sum((values - node.values) ** 2))
if k > len(nodeList) or dis < largest_dis:
nodeList.append([dis, tuple(node.values), node.label[0]])
# 右子树
if k > len(nodeList) or abs(values[node.split] - node.values[node.split]) < largest_dis: # 需要搜索右子树
if node.right_child != None:
self.left_search(values, node.right_child, nodeList, k)
return nodeList
def right_search(self, values, node, nodeList, k):
nodeList.sort() # 对结点列表按距离排序
largest_dis = nodeList[-1][0] if k >= len(nodeList) else nodeList[k - 1][0]
if node.left_child == None and node.right_child == None: # 叶结点
dis = np.sqrt(sum((values - node.values) ** 2))
if k > len(nodeList) or dis < largest_dis:
nodeList.append([dis, tuple(node.values), node.label[0]])
return
if node.right_child != None:
self.right_search(values, node.right_child, nodeList, k)
nodeList.sort() # 对结点列表按距离排序
largest_dis = nodeList[-1][0] if k >= len(nodeList) else nodeList[k - 1][0]
# 比较根结点
dis = np.sqrt(sum((values - node.values) ** 2))
if k > len(nodeList) or dis < largest_dis:
nodeList.append([dis, tuple(node.values), node.label[0]])
# 左子树
if k > len(nodeList) or abs(values[node.split] - node.values[node.split]) < largest_dis: # 需要搜索左子树
self.right_search(values, node.left_child, nodeList, k)
return nodeList
# 获取待判样本的预测标签
def pre_label(neighbors):
# 按照离待判样本最近的k个结点中出现的次数最多的标签作为待判样本的预测标签
classVotes = {
}
for x in range(len(neighbors)):
response = neighbors[x][-1]
if response in classVotes:
classVotes[response] += 1
else:
classVotes[response] = 1
sortedVotes = sorted(classVotes.items(), key=lambda item: item[1], reverse=True)
return sortedVotes[0][0]
def pre_accuracy(dataArray, labelArray, k):
X_train, X_test, y_train, y_test = train_test_split(dataArray, labelArray, test_size=0.3, random_state=42)
# print(X_train)
# print(y_train)
# print(X_test)
# print(y_test)
# 利用训练集的数据建kd树
kd_tree_part = Kd_Tree(X_train, y_train)
n = 0
num = len(y_test)
# 用建好的kd树和knn算法进行预测,打印预测的准确率
for i in range(len(y_test)):
label, node_list = kd_tree_part.knn(X_test[i], k)
label_pre = pre_label(node_list)
if label_pre == y_test[i]:
n += 1
print("预测的准确率为:{:.2f}%".format((n / num) * 100))
if __name__ == "__main__":
wine_data = pd.read_csv("D:\\学习\\机器学习\\作业\\第二次作业\\winequality-white.csv", sep=';') #数据集的存储位置
wine_data = np.array(wine_data)
dataArray = wine_data[:, 0:11] # 第0到第10列作为特征值
labelArray = wine_data[:, 11] # 第11列为酒的品质,作为标签值
t1 = time.perf_counter()
kd_tree_all = Kd_Tree(dataArray, labelArray)
# kd_list = kd_tree.transfer_list(kd_tree.root)
t2 = time.perf_counter()
label, node_list = kd_tree_all.knn(dataArray[1024], k=3)
print('点%s的最接近的前3个点为:%s' % (dataArray[1024], node_list))
label_pre = pre_label(node_list)
t3 = time.perf_counter()
print('预测点%s的标签为:%s' % (dataArray[1024], label_pre))
print('建立kd树耗时:{:.4f}s'.format(t2 - t1))
print('knn算法耗时:{:.4f}s'.format(t3 - t2))
pre_accuracy(dataArray, labelArray, 3)
P.S.不能保证程序的正确性,希望不会误人子弟。