密度峰值聚类(Density peaks clustering, DPC)来自Science上Clustering by fast search and find of density peaks. 2014.数据挖掘课大作业中读到了它。再整理自大作业的研究实验报告,分享到博客。
有了以上的量,就能做出一个以ρ为横轴,δ为纵轴的图(决策图,desision graph)。簇心的密度比其周围的点高,ρ大;簇心距离其他密度大的数据点相对更远,δ大。因此都大的就被看做簇心了。图中右上角两个点。
import numpy as np import matplotlib.pyplot as plt def get_distance_matrix(datas): n = np.shape(datas)[0] distance_matrix = np.zeros((n, n)) for i in range(n): for j in range(n): v_i = datas[i, :] v_j = datas[j, :] distance_matrix[i, j] = np.sqrt(np.dot((v_i - v_j), (v_i - v_j))) return distance_matrix def select_dc(distance_matrix): n = np.shape(distance_matrix)[0] distance_array = np.reshape(distance_matrix, n * n) percent = 2.0 / 100 position = int(n * (n - 1) * percent) dc = np.sort(distance_array)[position + n] return dc def get_local_density(distance_matrix, dc, method=None): n = np.shape(distance_matrix)[0] rhos = np.zeros(n) for i in range(n): if method is None: rhos[i] = np.where(distance_matrix[i, :] < dc)[0].shape[0] - 1 else: pass return rhos def get_deltas(distance_matrix, rhos): n = np.shape(distance_matrix)[0] deltas = np.zeros(n) nearest_neighbor = np.zeros(n) rhos_index = np.argsort(-rhos) for i, index in enumerate(rhos_index): if i == 0: continue higher_rhos_index = rhos_index[:i] deltas[index] = np.min(distance_matrix[index, higher_rhos_index]) nearest_neighbors_index = np.argmin(distance_matrix[index, higher_rhos_index]) nearest_neighbor[index] = higher_rhos_index[nearest_neighbors_index].astype(int) deltas[rhos_index[0]] = np.max(deltas) return deltas, nearest_neighbor def find_k_centers(rhos, deltas, k): rho_and_delta = rhos * deltas centers = np.argsort(-rho_and_delta) return centers[:k] def density_peal_cluster(rhos, centers, nearest_neighbor): k = np.shape(centers)[0] if k == 0: print("Can't find any center") return n = np.shape(rhos)[0] labels = -1 * np.ones(n).astype(int) for i, center in enumerate(centers): labels[center] = i rhos_index = np.argsort(-rhos) for i, index in enumerate(rhos_index): if labels[index] == -1: labels[index] = labels[int(nearest_neighbor[index])] return labels def generate_gauss_datas(): first_group = np.random.normal(20, 1.2, (100, 2)) second_group = np.random.normal(10, 1.2, (100, 2)) third_group = np.random.normal(15, 1.2, (100, 2)) datas = [] for i in range(100): datas.append(first_group[i]) datas.append(second_group[i]) datas.append(third_group[i]) datas = np.array(datas) return datas def draw_decision(datas, rhos, deltas): n = np.shape(datas)[0] for i in range(n): plt.scatter(rhos[i], deltas[i], s=16, color=(0, 0, 0)) plt.annotate(str(i), xy=(rhos[i], deltas[i]), xytext=(rhos[i], deltas[i])) plt.xlabel('local density-ρ') plt.ylabel('minimum distance to higher density points-δ') plt.show() def main(): datas = generate_gauss_datas() distance_matrix = get_distance_matrix(datas) dc = select_dc(distance_matrix) rhos = get_local_density(distance_matrix, dc) deltas, nearest_neighbor = get_deltas(distance_matrix, rhos) centers = find_k_centers(rhos, deltas, 3) labels = density_peal_cluster(rhos, centers, nearest_neighbor) draw_decision(datas, rhos, deltas) plt.cla() fig, ax = plt.subplots() for i in range(300): if labels[i] == 0: ax.scatter(datas[i, 0], datas[i, 1], facecolor='C0', edgecolors='k') elif labels[i] == 1: ax.scatter(datas[i, 0], datas[i, 1], facecolor='yellow', edgecolors='k') elif labels[i] == 2: ax.scatter(datas[i, 0], datas[i, 1], facecolor='red', edgecolors='k') plt.show() if __name__ == '__main__': main()
def get_distance_matrix(datas): n = np.shape(datas)[0] distance_matrix = np.zeros((n, n)) for i in range(n): for j in range(n): v_i = datas[i, :] # 数据点i v_j = datas[j, :] # 数据点j distance_matrix[i, j] = np.sqrt(np.dot((v_i - v_j), (v_i - v_j))) # sqrt开根号,dot点乘 return distance_matrix
def select_dc(distance_matrix): n = np.shape(distance_matrix)[0] distance_array = np.reshape(distance_matrix, n * n) # 把距离矩阵排成(像list一样的)一排,便于后面直接选出dc的位置 percent = 1.8 / 100 position = int(n * (n - 1) * percent) # -1是减掉它自己 dc = np.sort(distance_array)[position + n] # 这里选出dc return dc
3、算局部密度(local density),这里是直接对每个点周围距离小于dc的点进行计数。
def get_local_density(distance_matrix, dc, method=None): n = np.shape(distance_matrix)[0] rhos = np.zeros(n) for i in range(n): if method is None: rhos[i] = np.where(distance_matrix[i, :] < dc)[0].shape[0] - 1 else: pass return rhos
def get_deltas(distance_matrix, rhos): n = np.shape(distance_matrix)[0] deltas = np.zeros(n) nearest_neighbor = np.zeros(n) rhos_index = np.argsort(-rhos) # 得到密度ρ从大到小的排序的索引 for i, index in enumerate(rhos_index): # i是序号,index是rhos_index[i],是第i大的ρ的索引,这里第0大是最大的。 # index是点的索引,rhos[index]和deltas[index]是第index个点的ρ和δ if i == 0: continue higher_rhos_index = rhos_index[:i] # 对于i,比这个点密度更大的点的索引号是rhos_index[:i] deltas[index] = np.min(distance_matrix[index, higher_rhos_index]) # 在index这一行比它ρ大的点中选最小的距离 nearest_neighbors_index = np.argmin(distance_matrix[index, higher_rhos_index]) # distance_matrix第index行里面,higher_rhos_index这些列,中的值里面最小的,的索引(在higher_rhos_index中的索引) nearest_neighbor[index] = higher_rhos_index[nearest_neighbors_index].astype(int) deltas[rhos_index[0]] = np.max(deltas) return deltas, nearest_neighbor
def find_k_centers(rhos, deltas, k): rho_and_delta = rhos * deltas centers = np.argsort(-rho_and_delta) return centers[:k]
def density_peal_cluster(rhos, centers, nearest_neighbor): k = np.shape(centers)[0] if k == 0: print("Can't find any center") return n = np.shape(rhos)[0] labels = -1 * np.ones(n).astype(int) # 给刚刚找出来的簇心编号0, 1, 2, 3 ...... for i, center in enumerate(centers): labels[center] = i # 再将每个点编上与其最近的高密度点相同的编号 rhos_index = np.argsort(-rhos) for i, index in enumerate(rhos_index): if labels[index] == -1: labels[index] = labels[int(nearest_neighbor[index])] return labels