1、KMeans算法分类原理

KMeans算法可以对无标签的样本做聚类(按特征空间分布):

- 1、假设样本聚为3簇,随机从样本空间选择3个样本点作为聚类中心
- 2、计算所有样本点到这3个聚类中心的距离,根据距离大小划分属于哪个聚类
- 3、根据得到的聚类重新计算聚类中心
- 4、如果#3计算的聚类中心与当前聚类中心的误差值大于阈值(自己设定的误差),则更新聚类中心
- 5、重复2、3、4步,当迭代次数超限或者聚类中心误差小于阈值,则停止迭代,便得到最终的聚类
如果训练样本是有标签的,可以按标签聚类:

- 1、按标签聚类,有多少个标签就分成几个聚类
- 2、计算每个聚类的中心(或密度重心)
- 3、新样本直接和每个聚类中心计算距离,离哪个中心近就分成哪个类别(这就是KMeans用于带标签数据的分类)
2、实践
KMean分类(样本有标签)
"""
Author:wucng
Time: 20200108
Summary: 使用Kmean算法对iris数据分类 (数据有标签)
数据下载:https://archive.ics.uci.edu/ml/datasets.php
源代码: https://github.com/wucng/MLAndDL
"""
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler,MinMaxScaler
# from sklearn.neighbors import KNeighborsRegressor,KNeighborsClassifier
from sklearn.metrics import accuracy_score,auc
import pandas as pd
import numpy as np
import os
import time
import pickle
# 1. Load the dataset (with preprocessing)
def loadData(dataPath: str) -> tuple:
    """Load the iris CSV file and return ``(X, y)`` as numpy arrays.

    The file has no header row; columns are the four iris features
    followed by the class label.  Labels are mapped to integers
    (Iris-setosa -> 0, Iris-versicolor -> 1, Iris-virginica -> 2)
    and features are min-max normalized to [0, 1].  ``sep`` is the
    field separator of the file.
    """
    # header=None: the file has no title row.  (header=-1 was removed
    # from pandas and raises an error on modern versions.)
    df = pd.read_csv(dataPath, sep=",", header=None,
                     names=["sepal_length", "sepal_width", "petal_length", "petal_width", "label"])
    # Fill missing values
    df = df.fillna(0)
    # Quantize the text labels to integers
    df.replace("Iris-setosa", 0, inplace=True)
    df.replace("Iris-versicolor", 1, inplace=True)
    df.replace("Iris-virginica", 2, inplace=True)
    # Split into feature data and label data
    X = df.drop("label", axis=1)  # feature data
    y = df.label  # or df["label"]  # label data
    # Column-wise min-max normalization to [0, 1]
    X = (X - np.min(X, axis=0)) / (np.max(X, axis=0) - np.min(X, axis=0))
    # sklearn equivalent:
    # X = MinMaxScaler().fit_transform(X)
    # Inspect the frame if needed:
    # df.info()
    # df.describe()
    return (X.to_numpy(), y.to_numpy())
class KMeanClassifier():
    """Nearest-class-center classifier (Euclidean distance by default).

    Training samples are grouped by their labels, the mean of each group
    is taken as the class center, and a new sample is assigned to the
    class whose center is closest.
    """
    def __init__(self, X_train: np.ndarray, y_train: np.ndarray,
                 savefile="./model.ckpt"):
        self.X_train = X_train
        self.y_train = y_train
        self.savefile = savefile
        # Compute the class centers only once; later runs reuse the pickled
        # result to save time.  NOTE(review): a stale file from a different
        # dataset is silently reused -- delete it when the data changes.
        if not os.path.exists(savefile):
            self.__calClassCenter()
        # Use a context manager so the file handle is always closed.
        with open(self.savefile, "rb") as fp:
            self.data = pickle.load(fp)

    # 2. Group the training samples by label and compute each class center
    def __calClassCenter(self):
        # Build a dict: label -> list of samples of that class
        dataset = {}
        for x, y in zip(self.X_train, self.y_train):
            dataset.setdefault(y, []).append(x)
        # The mean of each class's samples is its center
        labels = []
        center = []
        for label in dataset:
            labels.append(label)
            center.append(np.mean(np.asarray(dataset[label]), 0))
            # center.append(np.median(np.asarray(dataset[label]), 0))
        data = {"label": labels, "center": center}
        # Persist the centers so the next run can skip this step
        with open(self.savefile, "wb") as fp:
            pickle.dump(data, fp)

    # 3. Predict new samples
    def predict(self, X_test: np.ndarray) -> np.ndarray:
        """Return the label of the nearest class center for every row of X_test."""
        labels = np.asarray(self.data["label"])
        center = np.asarray(self.data["center"])
        # Pairwise Euclidean distances via broadcasting:
        # (n, 1, d) - (1, k, d) -> (n, k, d) -> summed to (n, k)
        dist = np.sqrt(np.sum((X_test[:, None, :] - center[None, :, :]) ** 2, -1))
        # argmin is enough here (cheaper than a full argsort)
        return labels[np.argmin(dist, -1)]

    # 4. Accuracy metric
    def accuracy(self, y_true, y_pred) -> float:
        return round(np.sum(y_pred == y_true) / len(y_pred), 5)
if __name__ == "__main__":
    # Train and evaluate the label-based KMean classifier on iris.
    iris_path = "../../dataset/iris.data"
    features, labels = loadData(iris_path)
    # print(features.shape, labels.shape)  # (150, 4) (150,)
    # Hold out 20% of the samples for evaluation.
    X_train, X_test, y_train, y_test = train_test_split(
        features, labels, test_size=0.2, random_state=42)
    tic = time.time()
    model = KMeanClassifier(X_train, y_train)
    predictions = model.predict(X_test)
    score = model.accuracy(y_test, predictions)
    print("cost time:%.6f(s) acc:%.3f" % (time.time() - tic, score))
    # cost time:0.000984(s) acc:0.967
KMean聚类(样本无标签)
"""
Author:wucng
Time: 20200108
Summary: 使用Kmean算法对iris数据聚类
数据下载:https://archive.ics.uci.edu/ml/datasets.php
源代码: https://github.com/wucng/MLAndDL
"""
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler,MinMaxScaler
# from sklearn.neighbors import KNeighborsRegressor,KNeighborsClassifier
from sklearn.cluster import KMeans
from sklearn.metrics import accuracy_score,auc
import pandas as pd
import numpy as np
import os
import time
import pickle
import random
from tqdm import tqdm
import matplotlib.pyplot as plt
# 1. Load the dataset (with preprocessing)
def loadData(dataPath: str) -> tuple:
    """Load the iris CSV file and return ``(X, y)`` as numpy arrays.

    The file has no header row; columns are the four iris features
    followed by the class label.  Labels are mapped to integers
    (Iris-setosa -> 0, Iris-versicolor -> 1, Iris-virginica -> 2)
    and features are min-max normalized to [0, 1].  ``sep`` is the
    field separator of the file.
    """
    # header=None: the file has no title row.  (header=-1 was removed
    # from pandas and raises an error on modern versions.)
    df = pd.read_csv(dataPath, sep=",", header=None,
                     names=["sepal_length", "sepal_width", "petal_length", "petal_width", "label"])
    # Fill missing values
    df = df.fillna(0)
    # Quantize the text labels to integers
    df.replace("Iris-setosa", 0, inplace=True)
    df.replace("Iris-versicolor", 1, inplace=True)
    df.replace("Iris-virginica", 2, inplace=True)
    # Split into feature data and label data
    X = df.drop("label", axis=1)  # feature data
    y = df.label  # or df["label"]  # label data
    # Column-wise min-max normalization to [0, 1]
    X = (X - np.min(X, axis=0)) / (np.max(X, axis=0) - np.min(X, axis=0))
    # sklearn equivalent:
    # X = MinMaxScaler().fit_transform(X)
    # Inspect the frame if needed:
    # df.info()
    # df.describe()
    return (X.to_numpy(), y.to_numpy())
class KMeanCluster():
    """Plain KMeans clustering (Euclidean distance by default).

    Centers are initialized from random training samples and refined by
    alternating assignment / center-update steps until the centers move
    less than ``error`` or ``max_iter`` iterations have run.  The fitted
    centers are cached in ``savefile`` across runs.
    """
    def __init__(self, n_clusters=3, max_iter=300, error=1e-4, random_state=None,
                 savefile="./center.npy"):
        # n_clusters: number of clusters k
        # max_iter:   hard cap on refinement iterations
        # error:      stop once the mean squared center displacement <= error
        # savefile:   path where the fitted centers are cached (np.save)
        self.n_clusters = n_clusters
        self.max_iter = max_iter
        self.error = error
        self.random_state = random_state
        self.savefile = savefile

    def __calClassCenter(self, centers: np.ndarray, X: np.ndarray,
                         isReturnPred: bool = False) -> np.ndarray:
        """Assign every sample to its nearest center.

        Returns the predicted cluster ids when ``isReturnPred`` is True,
        otherwise returns the recomputed centers.
        """
        # Pairwise Euclidean distances via broadcasting:
        # (n, 1, d) - (1, k, d) -> (n, k, d) -> summed to (n, k)
        dist = np.sqrt(np.sum((X[:, None, :] - centers[None, :, :]) ** 2, -1))
        # argmin is enough here (cheaper than a full argsort)
        y_pred = np.argmin(dist, -1)
        if isReturnPred:
            return y_pred
        # Recompute each center as the mean of its assigned samples.
        new_centers = []
        for label in range(len(centers)):
            members = X[y_pred == label]
            if len(members) == 0:
                # Empty cluster: keep the old center instead of crashing
                # (the original code raised KeyError in this case).
                new_centers.append(centers[label])
            else:
                new_centers.append(np.mean(members, 0))
        return np.asarray(new_centers)

    # Build the clustering
    def __fit_transform(self, X, y=None, sample_weight=None):
        # 1. Pick k DISTINCT samples as the initial centers.
        #    random.sample avoids the duplicate centers that random.choices
        #    (sampling WITH replacement) could produce, which would leave
        #    some clusters permanently empty.
        random.seed(self.random_state)
        idx = random.sample(range(len(X)), self.n_clusters)
        centers = np.asarray(X)[idx]
        for i in range(self.max_iter):
            # 2. Assign samples to the nearest center, then recompute centers.
            new_centers = self.__calClassCenter(centers, X)
            # Mean squared displacement of the centers this step.
            error = np.sum((new_centers - centers) ** 2) / len(centers)
            print("step:%s\terror:%f\tmin_error:%f" % (i, error, self.error))
            if error > self.error:
                # Not converged yet: adopt the new centers.
                centers = new_centers
            else:  # converged -- stop iterating
                break
        # Cache the fitted centers for later runs.
        np.save(self.savefile, centers)

    def fit_transform(self, X, y=None, sample_weight=None):
        # Fit only when no cached centers exist yet, then load them.
        if not os.path.exists(self.savefile):
            self.__fit_transform(X)
        self.centers = np.load(self.savefile)

    def predict(self, X, sample_weight=None):
        """Return the nearest-center cluster id for every row of X."""
        return self.__calClassCenter(self.centers, X, True)
if __name__ == "__main__":
    # Cluster the iris data and compare against sklearn's KMeans visually.
    iris_path = "../../dataset/iris.data"
    features, labels = loadData(iris_path)
    # print(features.shape, labels.shape)  # (150, 4) (150,)
    # Train/test split (the clustering below actually uses the full set).
    X_train, X_test, y_train, y_test = train_test_split(
        features, labels, test_size=0.2, random_state=42)
    model = KMeanCluster(n_clusters=3, random_state=9, error=1e-7)
    model.fit_transform(features)
    cluster_ids = model.predict(features)
    # Left panel: ground-truth labels.
    plt.subplot(131)
    plt.scatter(features[:, 0], features[:, 2], c=labels)
    plt.title("origin")
    # Middle panel: our KMeans clustering.
    plt.subplot(132)
    plt.scatter(features[:, 0], features[:, 2], c=cluster_ids)
    plt.title("custom kmean")
    # -------------------------------------------------------------
    # Right panel: sklearn's KMeans for comparison.
    cluster_ids = KMeans(n_clusters=3, random_state=9, tol=1e-7).fit_predict(features)
    plt.subplot(133)
    plt.scatter(features[:, 0], features[:, 2], c=cluster_ids)
    plt.title("sklearn kmean")
    plt.show()
3、KMeans实现图像压缩

思路:

- 1、假设图像的shape为[h,w,c],reshape成[h*w,c],记为img_arr
- 2、从img_arr中随机选取m条数据[m,c]做KMeans聚类
- 3、使用#2得到的KMeans模型对img_arr预测每条数据的类别
- 4、重构图像:根据#3得到的类别,选择对应类别的center(#2的KMeans聚类中心)作为该像素值
"""
Author:wucng
Time: 20200109
Summary: 使用Kmean算法实现图像压缩
源代码: https://github.com/wucng/MLAndDL
"""
from sklearn.cluster import KMeans
from sklearn.preprocessing import StandardScaler,MinMaxScaler
import matplotlib.pyplot as plt
from PIL import Image
import numpy as np
import os
import time
class KMeanCompress(object):
    """Compress an image by vector-quantizing its pixels with KMeans.

    A KMeans model is fitted on a random subset of the pixels; every
    pixel is then replaced by its nearest cluster center, so the result
    contains only ``n_clusters`` distinct colors.
    """
    def __init__(self, num_samples, n_clusters=8, random_state=None):
        self.num_samples = num_samples  # number of pixels sampled to fit KMeans
        self.n_clusters = n_clusters    # number of colors in the output image
        self.random_state = random_state

    def fit(self, X: np.ndarray):
        """Fit KMeans on ``num_samples`` randomly chosen rows of X ([h*w, c])."""
        np.random.seed(self.random_state)
        data = X.copy()
        np.random.shuffle(data)
        # BUGFIX: slice ROWS, not the channel axis.  The original
        # `data[..., :num_samples]` sliced the last axis (channels), so
        # whenever num_samples > c the whole image was used and the
        # subsampling did nothing.
        data = data[:self.num_samples]
        # Build the KMeans model on the sampled pixels.
        self.kmean = KMeans(n_clusters=self.n_clusters,
                            random_state=self.random_state).fit(data)

    def predict(self, X: np.ndarray):
        # Store each pixel's cluster id for the later compress() call.
        self.labels = self.kmean.predict(X)

    def compress(self, h: int, w: int, c: int) -> np.ndarray:
        """Rebuild the [h, w, c] image from the cluster centers.

        Fancy indexing maps every pixel's cluster id to its center color
        in one vectorized step (replaces the original per-pixel loop;
        row-major order matches index = j + i*w).
        """
        return self.kmean.cluster_centers_[self.labels].reshape(h, w, c)
if __name__ == "__main__":
    # Load, show, and normalize the source image to [0, 1] floats.
    image = Image.open("../../dataset/test.jpg").convert("RGB").resize((224, 224))
    image.show()
    pixels = np.array(image) / 255.
    h, w, c = pixels.shape
    flat = np.reshape(pixels, (h * w, c))
    # Fit on 500 random pixels, quantize the image to 64 colors.
    compressor = KMeanCompress(500, 64, 9)
    compressor.fit(flat)
    compressor.predict(flat)
    result = compressor.compress(h, w, c)
    # Back to uint8 for display.
    Image.fromarray(np.clip(result * 255, 0, 255).astype(np.uint8)).show()