Python教程

python通过Canal进行数据监控后的数据缓存设计

本文主要是介绍python通过Canal进行数据监控后的数据缓存设计,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

python通过Canal进行数据监控后的数据缓存设计

使用场景

本次做了个人脸识别项目,分为编码阶段和检测阶段。编码阶段将输入人脸进行编码,存到Mysql数据库中。检测阶段读取数据库全量数据,将输入的检测图像进行人脸编码与数据库中编码好的数据进行比对。

由于编码数据可能有增删改的操作,且批量较大,所以在本地做了数据缓存用于编码对比。并通过CanalMysql数据库进行行为监测,将本地缓存数据与数据库操作行为数据进行合并,构成新的本地缓存数据。

Canal客户端

在服务中单独启了一个Canal客户端线程,每秒捕获Mysql行为。在根据增删改,构造不同的字典用于做缓存数据合并。

from canal.client import Client
from canal.protocol import EntryProtocol_pb2

from utils.data_buffer import DataBuffer

df_buffer = DataBuffer()


class CanalUtils:
    def __init__(self):
        # 建立与canal服务端的连接
        self.client = Client()
        self.client.connect(host='127.0.0.1', port=11111)  # canal服务端部署的主机IP与端口
        self.client.check_valid(username=b'', password=b'')  # 自行填写配置的数据库账户密码
        self.client.subscribe(client_id=b'1001', destination=b'example', filter='test.va_face_feature_copy1')

    def get_canal_change(self):
        while True:
            message = self.client.get(100)
            # entries是每个循环周期内获取到数据集
            entries = message['entries']
            for entry in entries:
                entry_type = entry.entryType
                if entry_type in [EntryProtocol_pb2.EntryType.TRANSACTIONBEGIN,
                                  EntryProtocol_pb2.EntryType.TRANSACTIONEND]:
                    continue
                row_change = EntryProtocol_pb2.RowChange()
                row_change.MergeFromString(entry.storeValue)
                header = entry.header

                event_type = header.eventType
                # row是binlog解析出来的行变化记录,一般有三种格式,对应增删改
                for row in row_change.rowDatas:
                    new_data_dict = {}
                    # 删除行为
                    if event_type == EntryProtocol_pb2.EventType.DELETE:
                        for column in row.beforeColumns:
                            new_data_dict[column.name] = column.value
                    # 增加行为
                    elif event_type == EntryProtocol_pb2.EventType.INSERT:
                        for column in row.afterColumns:
                            new_data_dict[column.name] = column.value
                    # 更改行为
                    else:
                        for column in row.afterColumns:
                            new_data_dict[column.name] = column.value
                    df_buffer.write_data(event_type, new_data_dict)
            time.sleep(1)

其中DataBuffer是数据缓存类,后面再说。

单独写了个线程类,用于启动Canal客户端。

class CanalThread(threading.Thread):
    def run(self) -> None:
        canal_util = CanalUtils()
        canal_util.get_canal_change()

数据缓存类DataBuffer

DataBuffer其实就两个行为,一个是拉取Canal客户端去写数据。一个是读数据。这个服务是个并发读取的服务,可能多个接口同时去读。

在架构的带领下我写了两级缓存。第一级是有个face_feature_change去记录行为。第二级是face_feature,就是最大的全量数据。

数据一共分为三个部分:

  1. 本地落盘的数据:用于服务重启时防止数据丢失。
  2. Canal拉取的行为数据:记录数据库操作行为,同时相当于做了一级缓存。
  3. 全量数据:每次编码对比时用的都是这个数据。

读数据

读数据的过程很简单,以防写数据的时候同时去读,设置了全量数据缓存。

在处理数据后全局缓存的数据指针指到新的全量数据上。

为了防止指针切换时读取数据为None,使用了重试机制。

def get_data(self):
    # 重试机制
    for i in range(3):
        if self.data is None:
            print('数据正在写入中,进行重试')
            time.sleep(1)
        else:
            return self.data
    return None

写数据

写数据的流程复杂了一些

  1. 读取face_feature_change数据
  2. face_feature_change进行抵消处理,减少数据量
  3. 将抵消后的数据与原数据进行合并处理
  4. 数据落盘
  5. 切换DataBuffer指针,指向合并后的数据
def write_data(self, event_type, new_data_dict):
    # 读取本地缓存数据
    df_change = pd.read_pickle(r'utils/face_feature_change.pkl')
    new_dict = new_data_dict
    new_dict['change_status'] = event_type
    # 增加本次数据
    df_change = df_change.append(new_data_dict, ignore_index=True)
    # 修改数据筛选
    df_temp = self.get_change(df_change)
    # 修改数据和原数据合并
    df = self.get_df_union(df_temp)
    # 落盘
    if df_change.shape[0] > 1000:
        df.to_pickle('utils/face_feature.pkl')
        df_change = pd.DataFrame(columns=df.columns)
    df_change.to_pickle('utils/face_feature_change.pkl')

    # 指针转移
    self.data = df

读取face_feature_change数据

根据Canal对行为的定义,我也通过变量change_status定义了三个行为,INSERT=1;UPDATE=2;DELETE=3

每次使用的时候,先把行为数据放到face_feature_change中,face_feature_change对这些数据进行行为抵消处理。

比如:先增加了一条id=123的数据,后面又给做了修改。那就可以认为新增了一条内容为修改后的数据。

比如:先增加了一条id=123的数据,后面又给删了,那就可以不理会这个id数据的操作。

处理这些抵消行为后,在与全量数据做合并,就能节约很多效率。不然的话每有一条行为就要去动全量数据,修改效率很慢的。

def get_change(self, df_change):
    df_new = pd.DataFrame(columns=df_change.columns)
    unique_id_list = np.unique(df_change['face_feature_id'])
    for id in unique_id_list:
        df = df_change[df_change['face_feature_id'] == id]
        # 只有一条,该咋干就咋干
        if df.shape[0] == 1:
            df_new = df_new.append(df.loc[df.index.values[0]])
        # 有多条就得判断状态了
        else:
            start_status = int(df.loc[df.index.values[0]]['change_status'])
            end_status = int(df.loc[df.index.values[-1]]['change_status'])
            # 如果最后的状态是删除,判断是删又增又删,还是增/改又删
            if end_status == 3:
                # 1.增/改了又删,就当它没发生过
                # 2.先删最后又删,就是删之前的
                if start_status == end_status:
                    df_new = df_new.append(df.loc[df.index.values[-1]])
            # 如果最后的状态是修改
            elif end_status == 2:
                # 1.修改了新增加的数据,相当于增加了修改后的值.
                # 2.无论是删了又增又修改还是修改又修改,都是修改之前的数据
                if start_status == 1:
                    df['change_status'].loc[df.index.values[-1]] = start_status
                df_new = df_new.append(df.loc[df.index.values[-1]])
            # 如果最后的状态是新增
            else:
                # 1.可能是改/删了增了,相当于修改之前的数据
                # 2.可能是增了又删了又增,就是增加之前没有的数据
                if start_status != end_status:
                    df['change_status'].loc[df.index.values[-1]] = 2
                df_new = df_new.append(df.loc[df.index.values[-1]])
    return df_new

合并face_featureface_feature_change数据

读取本地的face_feature数据,与face_feature_change数据合并,构成新的全量数据。

由于face_feature_change已经对新增加的行为数据进行了抵消处理,最后的行为状态只有三种:INSERT=1;UPDATE=2;DELETE=3

那么对于face_feature,考虑一些效率问题,使用先删除,再修改,最后新增的方式。

def get_df_union(self, df_temp):
    df_old = pd.read_pickle(r'utils/face_feature.pkl')
    # 删除
    if df_temp[df_temp['change_status'] == 3].shape[0] > 0:
        df_old = df_old.drop(
            df_old[df_old['face_feature_id'].isin(
                df_temp[df_temp['change_status'] == 3]['face_feature_id'].values)].index)
    # 修改
    df_update = df_temp[df_temp['change_status'] == 2]
    if df_update.shape[0] > 0:
        # df_update = df_update.drop(['change_status'], axis=1)
        for face_feature_id in df_update['face_feature_id']:
            index = df_old[df_old['face_feature_id'] == face_feature_id].index
            for column in df_old.columns:
                df_old.loc[index, column] = df_update[df_update['face_feature_id'] == face_feature_id][column]
    # 新增
    df_insert = df_temp[df_temp['change_status'] == 1]
    if df_insert.shape[0] > 0:
        df_insert = df_insert.drop(['change_status'], axis=1)
        df_old = df_old.append(df_insert, ignore_index=True)

    return df_old

pandaspickle数据格式

picklepython的一种序列化本地格式。由于是序列化方式,df.to_pickle()中是没有index这个参数的,它必须要带序列化。

序列化的好处主要在于写入效率上,由于有了序列化,可以在落盘写入时只写入改动信息,效率显著提高。

在读取效率方面,pickle也是优于csv的。

但是pickle落盘更占磁盘空间。

pandas中使用pickle好像只能在python3.8以上版本才可以使用。为此在那个网络有问题的服务器上重新搞了个python3.8的虚拟环境还废了一些力气。

这篇关于python通过Canal进行数据监控后的数据缓存设计的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!