本次做了个人脸识别项目,分为编码阶段和检测阶段。编码阶段将输入人脸进行编码,存到Mysql
数据库中。检测阶段读取数据库全量数据,将输入的检测图像进行人脸编码与数据库中编码好的数据进行比对。
由于编码数据可能有增删改的操作,且批量较大,所以在本地做了数据缓存用于编码对比。并通过Canal
对Mysql
数据库进行行为监测,将本地缓存数据与数据库操作行为数据进行合并,构成新的本地缓存数据。
Canal
客户端在服务中单独启了一个Canal
客户端线程,每秒捕获Mysql
行为。在根据增删改,构造不同的字典用于做缓存数据合并。
from canal.client import Client from canal.protocol import EntryProtocol_pb2 from utils.data_buffer import DataBuffer df_buffer = DataBuffer() class CanalUtils: def __init__(self): # 建立与canal服务端的连接 self.client = Client() self.client.connect(host='127.0.0.1', port=11111) # canal服务端部署的主机IP与端口 self.client.check_valid(username=b'', password=b'') # 自行填写配置的数据库账户密码 self.client.subscribe(client_id=b'1001', destination=b'example', filter='test.va_face_feature_copy1') def get_canal_change(self): while True: message = self.client.get(100) # entries是每个循环周期内获取到数据集 entries = message['entries'] for entry in entries: entry_type = entry.entryType if entry_type in [EntryProtocol_pb2.EntryType.TRANSACTIONBEGIN, EntryProtocol_pb2.EntryType.TRANSACTIONEND]: continue row_change = EntryProtocol_pb2.RowChange() row_change.MergeFromString(entry.storeValue) header = entry.header event_type = header.eventType # row是binlog解析出来的行变化记录,一般有三种格式,对应增删改 for row in row_change.rowDatas: new_data_dict = {} # 删除行为 if event_type == EntryProtocol_pb2.EventType.DELETE: for column in row.beforeColumns: new_data_dict[column.name] = column.value # 增加行为 elif event_type == EntryProtocol_pb2.EventType.INSERT: for column in row.afterColumns: new_data_dict[column.name] = column.value # 更改行为 else: for column in row.afterColumns: new_data_dict[column.name] = column.value df_buffer.write_data(event_type, new_data_dict) time.sleep(1)
其中DataBuffer
是数据缓存类,后面再说。
单独写了个线程类,用于启动Canal
客户端。
class CanalThread(threading.Thread): def run(self) -> None: canal_util = CanalUtils() canal_util.get_canal_change()
DataBuffer
DataBuffer
其实就两个行为,一个是拉取Canal
客户端去写数据。一个是读数据。这个服务是个并发读取的服务,可能多个接口同时去读。
在架构的带领下我写了两级缓存。第一级是有个face_feature_change
去记录行为。第二级是face_feature
,就是最大的全量数据。
数据一共分为三个部分:
Canal
拉取的行为数据:记录数据库操作行为,同时相当于做了一级缓存。读数据的过程很简单,以防写数据的时候同时去读,设置了全量数据缓存。
在处理数据后全局缓存的数据指针指到新的全量数据上。
为了防止指针切换时读取数据为None,使用了重试机制。
def get_data(self): # 重试机制 for i in range(3): if self.data is None: print('数据正在写入中,进行重试') time.sleep(1) else: return self.data return None
写数据的流程复杂了一些
face_feature_change
数据face_feature_change
进行抵消处理,减少数据量DataBuffer
指针,指向合并后的数据def write_data(self, event_type, new_data_dict): # 读取本地缓存数据 df_change = pd.read_pickle(r'utils/face_feature_change.pkl') new_dict = new_data_dict new_dict['change_status'] = event_type # 增加本次数据 df_change = df_change.append(new_data_dict, ignore_index=True) # 修改数据筛选 df_temp = self.get_change(df_change) # 修改数据和原数据合并 df = self.get_df_union(df_temp) # 落盘 if df_change.shape[0] > 1000: df.to_pickle('utils/face_feature.pkl') df_change = pd.DataFrame(columns=df.columns) df_change.to_pickle('utils/face_feature_change.pkl') # 指针转移 self.data = df
face_feature_change
数据根据Canal
对行为的定义,我也通过变量change_status
定义了三个行为,INSERT=1;UPDATE=2;DELETE=3
。
每次使用的时候,先把行为数据放到face_feature_change
中,face_feature_change
对这些数据进行行为抵消处理。
比如:先增加了一条id=123
的数据,后面又给做了修改。那就可以认为新增了一条内容为修改后的数据。
比如:先增加了一条id=123
的数据,后面又给删了,那就可以不理会这个id
数据的操作。
处理这些抵消行为后,在与全量数据做合并,就能节约很多效率。不然的话每有一条行为就要去动全量数据,修改效率很慢的。
def get_change(self, df_change): df_new = pd.DataFrame(columns=df_change.columns) unique_id_list = np.unique(df_change['face_feature_id']) for id in unique_id_list: df = df_change[df_change['face_feature_id'] == id] # 只有一条,该咋干就咋干 if df.shape[0] == 1: df_new = df_new.append(df.loc[df.index.values[0]]) # 有多条就得判断状态了 else: start_status = int(df.loc[df.index.values[0]]['change_status']) end_status = int(df.loc[df.index.values[-1]]['change_status']) # 如果最后的状态是删除,判断是删又增又删,还是增/改又删 if end_status == 3: # 1.增/改了又删,就当它没发生过 # 2.先删最后又删,就是删之前的 if start_status == end_status: df_new = df_new.append(df.loc[df.index.values[-1]]) # 如果最后的状态是修改 elif end_status == 2: # 1.修改了新增加的数据,相当于增加了修改后的值. # 2.无论是删了又增又修改还是修改又修改,都是修改之前的数据 if start_status == 1: df['change_status'].loc[df.index.values[-1]] = start_status df_new = df_new.append(df.loc[df.index.values[-1]]) # 如果最后的状态是新增 else: # 1.可能是改/删了增了,相当于修改之前的数据 # 2.可能是增了又删了又增,就是增加之前没有的数据 if start_status != end_status: df['change_status'].loc[df.index.values[-1]] = 2 df_new = df_new.append(df.loc[df.index.values[-1]]) return df_new
face_feature
和face_feature_change
数据读取本地的face_feature
数据,与face_feature_change
数据合并,构成新的全量数据。
由于face_feature_change
已经对新增加的行为数据进行了抵消处理,最后的行为状态只有三种:INSERT=1;UPDATE=2;DELETE=3
。
那么对于face_feature
,考虑一些效率问题,使用先删除,再修改,最后新增的方式。
def get_df_union(self, df_temp): df_old = pd.read_pickle(r'utils/face_feature.pkl') # 删除 if df_temp[df_temp['change_status'] == 3].shape[0] > 0: df_old = df_old.drop( df_old[df_old['face_feature_id'].isin( df_temp[df_temp['change_status'] == 3]['face_feature_id'].values)].index) # 修改 df_update = df_temp[df_temp['change_status'] == 2] if df_update.shape[0] > 0: # df_update = df_update.drop(['change_status'], axis=1) for face_feature_id in df_update['face_feature_id']: index = df_old[df_old['face_feature_id'] == face_feature_id].index for column in df_old.columns: df_old.loc[index, column] = df_update[df_update['face_feature_id'] == face_feature_id][column] # 新增 df_insert = df_temp[df_temp['change_status'] == 1] if df_insert.shape[0] > 0: df_insert = df_insert.drop(['change_status'], axis=1) df_old = df_old.append(df_insert, ignore_index=True) return df_old
pandas
的pickle
数据格式pickle
是python
的一种序列化本地格式。由于是序列化方式,df.to_pickle()
中是没有index
这个参数的,它必须要带序列化。
序列化的好处主要在于写入效率上,由于有了序列化,可以在落盘写入时只写入改动信息,效率显著提高。
在读取效率方面,pickle
也是优于csv
的。
但是pickle
落盘更占磁盘空间。
在pandas
中使用pickle
好像只能在python3.8
以上版本才可以使用。为此在那个网络有问题的服务器上重新搞了个python3.8
的虚拟环境还废了一些力气。