密度峰值聚类(Density peaks clustering, DPC)来自Science上Clustering by fast search and find of density peaks. 2014.数据挖掘课大作业中读到了它。再整理自大作业的研究实验报告,分享到博客。
分为三个部分,先是基本原理,然后写代码实现,然后是浅浅写一些问题和优化。
这个算法的核心是基于两个假设:①簇心的密度比其周围的点高②簇心距离其他密度大的数据点相对更远。
于是我们只需要基于以上的假设找到簇心,再把其余的点分配到最近的簇心即可。
要达到这个目的,我们需要计算以下几个量:
有了以上的量,就能做出一个以ρ为横轴,δ为纵轴的图(决策图,desision graph)。簇心的密度比其周围的点高,ρ大;簇心距离其他密度大的数据点相对更远,δ大。因此都大的就被看做簇心了。图中右上角两个点。
也可以看出这个聚类方法的一些问题。分为四个方面,局部密度、多密度峰、分配策略、复杂度。这里先写局部密度的问题,以便对局部密度ρ有个更直觉上的认识。局部密度的度量方式没有统一的计算方式,需要根据数据集的大小选择不同的函数,而数据集大小的判断没有具体的标准。在这里,局部密度的计算依赖于dc的选择,而dc的选择只考虑了数据的全局分布,没有考虑数据集的局部性质。因此DPC在处理交叉缠绕或密度不均匀的数据集上效果不理想。
先放上完整代码,之后再解说,我这个是我大作业交的代码,里面英语基本没有用缩写,应该可读性拉满了。算法部分计算上面那些量都认真实现了,不过里面也有些地方有点混,比如要聚成几类我是手动指定的,画图部分是弄了三堆正太分布的点。
import numpy as np import matplotlib.pyplot as plt def get_distance_matrix(datas): n = np.shape(datas)[0] distance_matrix = np.zeros((n, n)) for i in range(n): for j in range(n): v_i = datas[i, :] v_j = datas[j, :] distance_matrix[i, j] = np.sqrt(np.dot((v_i - v_j), (v_i - v_j))) return distance_matrix def select_dc(distance_matrix): n = np.shape(distance_matrix)[0] distance_array = np.reshape(distance_matrix, n * n) percent = 2.0 / 100 position = int(n * (n - 1) * percent) dc = np.sort(distance_array)[position + n] return dc def get_local_density(distance_matrix, dc, method=None): n = np.shape(distance_matrix)[0] rhos = np.zeros(n) for i in range(n): if method is None: rhos[i] = np.where(distance_matrix[i, :] < dc)[0].shape[0] - 1 else: pass return rhos def get_deltas(distance_matrix, rhos): n = np.shape(distance_matrix)[0] deltas = np.zeros(n) nearest_neighbor = np.zeros(n) rhos_index = np.argsort(-rhos) for i, index in enumerate(rhos_index): if i == 0: continue higher_rhos_index = rhos_index[:i] deltas[index] = np.min(distance_matrix[index, higher_rhos_index]) nearest_neighbors_index = np.argmin(distance_matrix[index, higher_rhos_index]) nearest_neighbor[index] = higher_rhos_index[nearest_neighbors_index].astype(int) deltas[rhos_index[0]] = np.max(deltas) return deltas, nearest_neighbor def find_k_centers(rhos, deltas, k): rho_and_delta = rhos * deltas centers = np.argsort(-rho_and_delta) return centers[:k] def density_peal_cluster(rhos, centers, nearest_neighbor): k = np.shape(centers)[0] if k == 0: print("Can't find any center") return n = np.shape(rhos)[0] labels = -1 * np.ones(n).astype(int) for i, center in enumerate(centers): labels[center] = i rhos_index = np.argsort(-rhos) for i, index in enumerate(rhos_index): if labels[index] == -1: labels[index] = labels[int(nearest_neighbor[index])] return labels def generate_gauss_datas(): first_group = np.random.normal(20, 1.2, (100, 2)) second_group = np.random.normal(10, 1.2, (100, 2)) third_group = np.random.normal(15, 1.2, (100, 2)) datas = [] for i in range(100): datas.append(first_group[i]) datas.append(second_group[i]) datas.append(third_group[i]) datas = np.array(datas) return datas def draw_decision(datas, rhos, deltas): n = np.shape(datas)[0] for i in range(n): plt.scatter(rhos[i], deltas[i], s=16, color=(0, 0, 0)) plt.annotate(str(i), xy=(rhos[i], deltas[i]), xytext=(rhos[i], deltas[i])) plt.xlabel('local density-ρ') plt.ylabel('minimum distance to higher density points-δ') plt.show() def main(): datas = generate_gauss_datas() distance_matrix = get_distance_matrix(datas) dc = select_dc(distance_matrix) rhos = get_local_density(distance_matrix, dc) deltas, nearest_neighbor = get_deltas(distance_matrix, rhos) centers = find_k_centers(rhos, deltas, 3) labels = density_peal_cluster(rhos, centers, nearest_neighbor) draw_decision(datas, rhos, deltas) plt.cla() fig, ax = plt.subplots() for i in range(300): if labels[i] == 0: ax.scatter(datas[i, 0], datas[i, 1], facecolor='C0', edgecolors='k') elif labels[i] == 1: ax.scatter(datas[i, 0], datas[i, 1], facecolor='yellow', edgecolors='k') elif labels[i] == 2: ax.scatter(datas[i, 0], datas[i, 1], facecolor='red', edgecolors='k') plt.show() if __name__ == '__main__': main()
1、计算任意两点之间的欧式距离
通过i、j的双重的for循环遍历求出每个点i到其他每个点j的欧式距离。
def get_distance_matrix(datas): n = np.shape(datas)[0] distance_matrix = np.zeros((n, n)) for i in range(n): for j in range(n): v_i = datas[i, :] # 数据点i v_j = datas[j, :] # 数据点j distance_matrix[i, j] = np.sqrt(np.dot((v_i - v_j), (v_i - v_j))) # sqrt开根号,dot点乘 return distance_matrix
2、算dc
传入参数的percent指平均每个点周围距离小于dc的点的数目占所有点的数目的百分比,取1%-2%,于是根据这个参数可求出相应dc:
def select_dc(distance_matrix): n = np.shape(distance_matrix)[0] distance_array = np.reshape(distance_matrix, n * n) # 把距离矩阵排成(像list一样的)一排,便于后面直接选出dc的位置 percent = 1.8 / 100 position = int(n * (n - 1) * percent) # -1是减掉它自己 dc = np.sort(distance_array)[position + n] # 这里选出dc return dc
3、算局部密度(local density),这里是直接对每个点周围距离小于dc的点进行计数。
def get_local_density(distance_matrix, dc, method=None): n = np.shape(distance_matrix)[0] rhos = np.zeros(n) for i in range(n): if method is None: rhos[i] = np.where(distance_matrix[i, :] < dc)[0].shape[0] - 1 else: pass return rhos
传入的是二维数组满足一个条件时,返回的是数组中满足这个条件的元素的行索引和列索引。
4、算相对距离
def get_deltas(distance_matrix, rhos): n = np.shape(distance_matrix)[0] deltas = np.zeros(n) nearest_neighbor = np.zeros(n) rhos_index = np.argsort(-rhos) # 得到密度ρ从大到小的排序的索引 for i, index in enumerate(rhos_index): # i是序号,index是rhos_index[i],是第i大的ρ的索引,这里第0大是最大的。 # index是点的索引,rhos[index]和deltas[index]是第index个点的ρ和δ if i == 0: continue higher_rhos_index = rhos_index[:i] # 对于i,比这个点密度更大的点的索引号是rhos_index[:i] deltas[index] = np.min(distance_matrix[index, higher_rhos_index]) # 在index这一行比它ρ大的点中选最小的距离 nearest_neighbors_index = np.argmin(distance_matrix[index, higher_rhos_index]) # distance_matrix第index行里面,higher_rhos_index这些列,中的值里面最小的,的索引(在higher_rhos_index中的索引) nearest_neighbor[index] = higher_rhos_index[nearest_neighbors_index].astype(int) deltas[rhos_index[0]] = np.max(deltas) return deltas, nearest_neighbor
5、找聚类中心。这里是取ρ和δ乘积最大的前k个,偷懒。也可以把决策图做出来,右上角的点就是簇心了。
def find_k_centers(rhos, deltas, k): rho_and_delta = rhos * deltas centers = np.argsort(-rho_and_delta) return centers[:k]
6、聚类
def density_peal_cluster(rhos, centers, nearest_neighbor): k = np.shape(centers)[0] if k == 0: print("Can't find any center") return n = np.shape(rhos)[0] labels = -1 * np.ones(n).astype(int) # 给刚刚找出来的簇心编号0, 1, 2, 3 ...... for i, center in enumerate(centers): labels[center] = i # 再将每个点编上与其最近的高密度点相同的编号 rhos_index = np.argsort(-rhos) for i, index in enumerate(rhos_index): if labels[index] == -1: labels[index] = labels[int(nearest_neighbor[index])] return labels
这是绘制出来的决策图,和三个正太分布的聚类结果。
上文说过了。上面那个正太分布的簇的“平均每个点周围距离小于的点的数目占数据集所有点的数目的比例”(求dc的函数里面的percent)是2%,而如果变成1.8%,就变成下图了,聚类结果不理想。
如果可以在一个簇中找到两个具有较高局部密度和相对距离较大的数据点,这两个点就会被识别成两个簇心。比如下左图被聚成了下右图。优化是,改进该算法中“一个簇只有一个密度峰”的假设。比如,很多改进算法是先以各种算法进行预聚类,再通过各种方法拆分聚类结果,再通过各种方法合并拆分的结果;或者是先拆分后合并。
完成簇心的识别后,其余每个点分配到与其最近的高密度点的同一个簇。于是可能会出现下图的错误:a点周围的点的密度都不超过a点。距离a点最近的高密度点是b点,但是a点实际上不应该与b归为一簇,而应该与c点归为同一簇。而分配错了这个点,将导致更多与之相连的点被错误聚类。优化是,边分配边调错;将一步完成分配改为两步或多步;DPC算法只计算了通过点之间关系计算距离和密度,可加入更多的量来协助点的分配。
时间空间都是n²复杂度,需要很高计算成本,限制了该算法在大规模数据集上的应用。优化是,比如,用网格对象代替数据对象,减少距离计算。或者,对数据进行预划分,同时除去一些小密度的数据对象;或者通过某些方法判定除去更多的不需要参与计算的数据对象,不过需要保证损失聚类精度在可控的范围内。
随便检索都能检索到大量基于这个算法的改进算法,我写研究报告的时候主要参考了:
徐晓,丁世飞,丁玲.密度峰值聚类算法研究进展[J].软件学报,2022,33(05):1800-1816.DOI:10.13328/j.cnki.jos.006122.