目录
学习完机器学习实战的分类回归树,简单的做个笔记。文中部分描述属于个人消化后的理解,仅供参考。
所有代码和数据可以访问 我的 github
扫描二维码关注公众号,回复: 3898548 查看本文章如果这篇文章对你有一点小小的帮助,请给个关注喔~我会非常开心的~
0. 前言
分类回归树(Classification And Regression Tree)主要用于复杂数据的回归。
- 优点:可以对复杂和非线性的数据建模
- 缺点:结果不易理解
- 适用数据类型:数值型和标称型在数据(标称型数据需映射成二值型)
在 算法中,每次选择最佳的特征分割数据,特征有几种取值,树的结点就有几棵子树,而且连续型特征需转换为离散型。选择过的特征会被筛除,不会再次选择。
在 算法中,每次选择最佳的特征分割数据,但是只进行二元切分,产生两棵子树。选择过的特征,不会被筛除,仍有可能被选择。
- 回归树:叶子结点为常数,即预测值
- 模型树:叶子结点为线性方程
1. 回归树
在 算法中,根据信息增益定义数据的混乱度。
在 算法中,根据总方差(方差乘以样本大小)定义数据的混乱度:
在分类中,叶子节点表示的是对应的类别。
在回归中,叶子节点表示的是对应的预测值,在训练模型的时候,使用数据结果的均值作为叶子节点。
创建树的伪代码如下表示:
每次选择最佳特征和特征值时,采用误差作为衡量标准,伪代码如下表示:
注:训练数据结果相同、划分后最小误差和划分前误差相差不大、划分后数据集很小,这几种情况都直接返回叶子结点,不进行划分。
2. 模型树
模型树在训练的时候,当满足返回叶子结点的条件的时候,对剩余数据进行线性拟合,返回拟合参数,所以叶子结点是线性拟合的参数。
模型树的可解释性是它优于回归树的特点,当数据由分段函数组成的时候,模型树可以更好的发挥它的作用。
在模型树中,误差的计算采用的是误差平方和:
3. 剪枝(pruning)
如果一棵树的结点过多,可能会造成过拟合,需要对树进行剪枝,去掉不必要的枝条,以降低复杂度。
- 预剪枝(prepruning):在创建树的时候,预先判断,如果会造成过于复杂,则不扩展这个枝条
- 后剪枝(postpruning):在树创建了之后,对其进行测试,如果会造成过于复杂,则剪去这个枝条
一般地,为了达到更好的剪枝效果,会同时采用两种剪枝方法。
3.1. 预剪枝
在选择最佳特征的伪代码中,划分后最小误差和划分前误差相差不大、划分后数据集很小,就直接返回叶子结点,不划分数据扩展枝条,这就是预剪枝。
预剪枝对人为设定的参数比较敏感,例如最小误差和划分前误差相差的阈值、数据集大小的阈值。
3.2. 后剪枝
在后剪枝中,将数据集分成训练集和测试集,训练集用于训练树,测试集用于剪枝。
后剪枝的思路是,从树根进行递归,直到找到左结点和右结点都为叶子结点的时候,计算误差平方和,再将两个结点合并,计算误差平方和,如果合并后误差降低,则进行剪枝。
后剪枝是从叶子结点从下往上合并,伪代码如下表示:
注:因测试数据和训练数据的不同,可能会造成还未递归到叶子结点,测试数据就无法继续划分的情况。此时采用塌陷处理,即不断递归返回结点的平均值,任一结点的平均值等于其左结点和右结点的平均。
4. 实战案例
以下将展示书中案例的代码段,所有代码和数据可以在github中下载:
4.1. 回归树
# coding:utf-8
from numpy import *
"""
回归树
"""
# 加载数据集
def loadDataSet(fileName):
dataMat = []
fr = open(fileName)
for line in fr.readlines():
curLine = line.strip().split('\t')
# 将数据映射为浮点型
fltLine = list(map(float, curLine))
dataMat.append(fltLine)
return dataMat
# 根据特征和特征值,二元分割一个数据集
def binSplitDataSet(dataSet, feature, value):
mat0 = dataSet[nonzero(dataSet[:, feature] > value)[0], :]
mat1 = dataSet[nonzero(dataSet[:, feature] <= value)[0], :]
return mat0, mat1
# 创建叶子结点时,采取所有剩余数据的标签的均值
def regLeaf(dataSet):
return mean(dataSet[:, -1])
# 计算总方差
def regErr(dataSet):
return var(dataSet[:, -1]) * shape(dataSet)[0]
# CART算法选择最佳划分点
def chooseBestSplit(dataSet, leafType=regLeaf, errType=regErr, ops=(1, 4)):
# 误差改善的最小要求
tolS = ops[0]
# 数据集大小的最小要求
tolN = ops[1]
# 如果当前数据集结果标签都是同一个值,则直接返回叶子节点
if len(set(dataSet[:, -1].T.tolist()[0])) == 1:
return None, leafType(dataSet)
m, n = shape(dataSet)
# 获取当前数据集的误差
S = errType(dataSet)
bestS = inf
bestIndex = 0
bestValue = 0
# 一重循环遍历所有特征
for featIndex in range(n - 1):
# 二重循环遍历所有特征值
for splitVal in set((dataSet[:, featIndex].T.A.tolist())[0]):
# 划分数据
mat0, mat1 = binSplitDataSet(dataSet, featIndex, splitVal)
# 如果划分后数据集太小则返回
if (shape(mat0)[0] < tolN) or (shape(mat1)[0] < tolN):
continue
# 计算新的误差
newS = errType(mat0) + errType(mat1)
# 如果新的误差小于当前最好的误差,则替换
if newS < bestS:
bestIndex = featIndex
bestValue = splitVal
bestS = newS
# 遍历结束后,如果最佳的误差与遍历之前数据集的误差改善不大,则直接返回
if (S - bestS) < tolS:
return None, leafType(dataSet)
# 划分两个数据子集
mat0, mat1 = binSplitDataSet(dataSet, bestIndex, bestValue)
# 如果两个子集太小,则直接返回
if (shape(mat0)[0] < tolN) or (shape(mat1)[0] < tolN):
return None, leafType(dataSet)
return bestIndex, bestValue
# 递归创建树
# dataSet: 数据集
# leafType: 返回叶子节点的时候引用的函数
# errType: 误差计算引用的函数
# ops: 用户定义的标准值
def createTree(dataSet, leafType=regLeaf, errType=regErr, ops=(1, 4)):
# 选择最佳的划分点
feat, val = chooseBestSplit(dataSet, leafType, errType, ops)
# 当前为叶子节点
if feat == None:
return val
# 记录当前的划分的特征和特征值
retTree = {}
retTree['spInd'] = feat
retTree['spVal'] = val
# 划分两个数据集
lSet, rSet = binSplitDataSet(dataSet, feat, val)
# 递归对两个子集创建子树
retTree['left'] = createTree(lSet, leafType, errType, ops)
retTree['right'] = createTree(rSet, leafType, errType, ops)
return retTree
# 对树进行后剪枝
# 判断是否是子树
def isTree(obj):
return (type(obj).__name__ == 'dict')
# 对树进行后剪枝
# 递归获取当前节点的均值
# 在没有测试数据的时候,对节点进行塌陷处理
def getMean(tree):
if isTree(tree['right']): tree['right'] = getMean(tree['right'])
if isTree(tree['left']): tree['left'] = getMean(tree['left'])
return (tree['left'] + tree['right']) / 2.0
# 对树进行后剪枝,算法
def prune(tree, testData):
# 如果没有测试数据了,则对树进行塌陷处理
if shape(testData)[0] == 0:
return getMean(tree)
# 如果左节点或者右节点是树,则划分测试数据集
if (isTree(tree['right']) or isTree(tree['left'])):
lSet, rSet = binSplitDataSet(testData, tree['spInd'], tree['spVal'])
# 如果左节点或者右节点是树,则递归后剪枝,直到叶子节点
if isTree(tree['left']): tree['left'] = prune(tree['left'], lSet)
if isTree(tree['right']): tree['right'] = prune(tree['right'], rSet)
# 当前左节点和右节点都为叶子节点
if not isTree(tree['left']) and not isTree(tree['right']):
# 划分测试数据集
lSet, rSet = binSplitDataSet(testData, tree['spInd'], tree['spVal'])
# 计算不合的误差
errorNoMerge = sum(power(lSet[:, -1] - tree['left'], 2)) + \
sum(power(rSet[:, -1] - tree['right'], 2))
# 计算合并的误差
treeMean = (tree['left'] + tree['right']) / 2.0
errorMerge = sum(power(testData[:, -1] - treeMean, 2))
# 如果合并后误差小,则合并
if errorMerge < errorNoMerge:
print("merging")
return treeMean
else:
return tree
else:
return tree
# 测试函数
# 返回叶子节点浮点类型值
def regTreeEval(model, inDat):
return float(model)
# 预测函数
# inData是一条数据向量矩阵
def treeForeCast(tree, inData, modelEval=regTreeEval):
# 叶子节点
if not isTree(tree):
return modelEval(tree, inData)
# 选择左子树还是右子树
if inData[tree['spInd']] > tree['spVal']:
# 判断是否是树
if isTree(tree['left']):
return treeForeCast(tree['left'], inData, modelEval)
else:
return modelEval(tree['left'], inData)
else:
if isTree(tree['right']):
return treeForeCast(tree['right'], inData, modelEval)
else:
return modelEval(tree['right'], inData)
# 预测函数测试
def createForeCast(tree, testData, modelEval=regTreeEval):
m = len(testData)
yHat = mat(zeros((m, 1)))
for i in range(m):
yHat[i, 0] = treeForeCast(tree, mat(testData[i]), modelEval)
return yHat
if __name__ == '__main__':
# myDat1 = loadDataSet('ex0.txt')
# myMat1 = mat(myDat1)
# tree1 = createTree(myMat1)
# print(tree1)
# myDat2 = loadDataSet('ex2.txt')
# myMat2 = mat(myDat2)
# tree2 = createTree(myMat2, ops=(0, 1))
# myDat2Test = loadDataSet('ex2test.txt')
# myMat2Test = mat(myDat2Test)
# tree2 = prune(tree2, myMat2Test)
# print(tree2)
trainMat = mat(loadDataSet('bikeSpeedVsIq_train.txt'))
testMat = mat(loadDataSet('bikeSpeedVsIq_test.txt'))
myTree = createTree(trainMat, ops=(1, 20))
yHat = createForeCast(myTree, testMat[:, 0])
print(corrcoef(yHat, testMat[:, 1], rowvar=0)[0, 1])
4.2. 模型树
# coding:utf-8
from numpy import *
"""
模型树
"""
# 加载数据集
def loadDataSet(fileName):
dataMat = []
fr = open(fileName)
for line in fr.readlines():
curLine = line.strip().split('\t')
# 将数据映射为浮点型
fltLine = list(map(float, curLine))
dataMat.append(fltLine)
return dataMat
# 根据特征和特征值,二元分割一个数据集
def binSplitDataSet(dataSet, feature, value):
mat0 = dataSet[nonzero(dataSet[:, feature] > value)[0], :]
mat1 = dataSet[nonzero(dataSet[:, feature] <= value)[0], :]
return mat0, mat1
# 对数据进行线性回归
def linearSolve(dataSet):
m, n = shape(dataSet)
X = mat(ones((m, n)))
Y = mat(ones((m, 1)))
# 需要x_0=1
X[:, 1:n] = dataSet[:, 0:n - 1]
Y = dataSet[:, -1]
# 正规方程
xTx = X.T * X
if linalg.det(xTx) == 0.0:
raise NameError('This matrix is singular, cannot do inverse,\n\
try increasing the second value of ops')
ws = xTx.I * (X.T * Y)
return ws, X, Y
# 创建叶子结点时,采用线性函数,即权重ws
def modelLeaf(dataSet):
ws, X, Y = linearSolve(dataSet)
return ws
# 采用误差平方和计算误差
def modelErr(dataSet):
ws, X, Y = linearSolve(dataSet)
yHat = X * ws
return sum(power(Y - yHat, 2))
# CART算法选择最佳划分点
def chooseBestSplit(dataSet, leafType=modelLeaf, errType=modelErr, ops=(1, 10)):
# 误差改善的最小要求
tolS = ops[0]
# 数据集大小的最小要求
tolN = ops[1]
# 如果当前数据集结果标签都是同一个值,则直接返回叶子节点
if len(set(dataSet[:, -1].T.tolist()[0])) == 1:
return None, leafType(dataSet)
m, n = shape(dataSet)
# 获取当前数据集的误差
S = errType(dataSet)
bestS = inf
bestIndex = 0
bestValue = 0
# 一重循环遍历所有特征
for featIndex in range(n - 1):
# 二重循环遍历所有特征值
for splitVal in set((dataSet[:, featIndex].T.A.tolist())[0]):
# 划分数据
mat0, mat1 = binSplitDataSet(dataSet, featIndex, splitVal)
# 如果划分后数据集太小则返回
if (shape(mat0)[0] < tolN) or (shape(mat1)[0] < tolN):
continue
# 计算新的误差
newS = errType(mat0) + errType(mat1)
# 如果新的误差小于当前最好的误差,则替换
if newS < bestS:
bestIndex = featIndex
bestValue = splitVal
bestS = newS
# 遍历结束后,如果最佳的误差与遍历之前数据集的误差改善不大,则直接返回
if (S - bestS) < tolS:
return None, leafType(dataSet)
# 划分两个数据子集
mat0, mat1 = binSplitDataSet(dataSet, bestIndex, bestValue)
# 如果两个子集太小,则直接返回
if (shape(mat0)[0] < tolN) or (shape(mat1)[0] < tolN):
return None, leafType(dataSet)
return bestIndex, bestValue
# 递归创建树
# dataSet: 数据集
# leafType: 返回叶子节点的时候引用的函数
# errType: 误差计算引用的函数
# ops: 用户定义的标准值
def createTree(dataSet, leafType=modelLeaf, errType=modelErr, ops=(1, 4)):
# 选择最佳的划分点
feat, val = chooseBestSplit(dataSet, leafType, errType, ops)
# 当前为叶子节点
if feat == None:
return val
# 记录当前的划分的特征和特征值
retTree = {}
retTree['spInd'] = feat
retTree['spVal'] = val
# 划分两个数据集
lSet, rSet = binSplitDataSet(dataSet, feat, val)
# 递归对两个子集创建子树
retTree['left'] = createTree(lSet, leafType, errType, ops)
retTree['right'] = createTree(rSet, leafType, errType, ops)
return retTree
# 判断是否是子树
def isTree(obj):
return (type(obj).__name__ == 'dict')
# 测试函数
# 返回叶子节点参数和数据相乘后的拟合结果
def modelTreeEval(model, inDat):
n = shape(inDat)[1]
# 因为存在x_0=1
X = mat(ones((1, n + 1)))
X[:, 1:n + 1] = inDat
return float(X * model)
# 预测函数
# inData是一条数据向量矩阵
def treeForeCast(tree, inData, modelEval=modelTreeEval):
# 叶子节点
if not isTree(tree):
return modelEval(tree, inData)
# 选择左子树还是右子树
if inData[tree['spInd']] > tree['spVal']:
# 判断是否是树
if isTree(tree['left']):
return treeForeCast(tree['left'], inData, modelEval)
else:
return modelEval(tree['left'], inData)
else:
if isTree(tree['right']):
return treeForeCast(tree['right'], inData, modelEval)
else:
return modelEval(tree['right'], inData)
# 预测函数测试
def createForeCast(tree, testData, modelEval=modelTreeEval):
m = len(testData)
yHat = mat(zeros((m, 1)))
for i in range(m):
yHat[i, 0] = treeForeCast(tree, mat(testData[i]), modelEval)
return yHat
if __name__ == '__main__':
# myMat = mat(loadDataSet('exp2.txt'))
# tree = createTree(myMat)
# print(tree)
trainMat = mat(loadDataSet('bikeSpeedVsIq_train.txt'))
testMat = mat(loadDataSet('bikeSpeedVsIq_test.txt'))
myTree = createTree(trainMat, ops=(1, 20))
yHat = createForeCast(myTree, testMat[:, 0])
print(corrcoef(yHat, testMat[:, 1], rowvar=0)[0, 1])
如果这篇文章对你有一点小小的帮助,请给个关注喔~我会非常开心的~