之前参与了一个横向项目,对方要求和他们的服务端对接时,我们开发的客户端必须一直保持连接,即维护一个长连接,这样服务端可以随时对我们下发控制命令。
本文主要介绍如何实现TCP的长连接维护,主要通过Python的socket模块来实现,采用的实现方式为心跳保活策略,即定期发送约定好的心跳包以维持连接不断开。
短连接指的是开启一个socket连接,收发完数据后,立刻关闭连接。我们通常使用的TCP就是这种连接方式,其示意图和工作流程如下(Client表示客户端,Server表示服务端)。
长连接指的是开启一个socket连接,多次收发数据包直到需要的时候再关闭连接。因此,需要定期发送一个不占用数据传输的心跳包来告知服务端自己的状态以维持连接。其示意图和工作流程如下(Client表示客户端,Server表示服务端)。
我们约定传输的数据采用JSON格式,且报文结尾都紧跟一个换行符以方便服务端进行解析,下面时具体的代码,为这里去掉了一些具体的业务线程,只保留了简单的心跳发送和接受服务端消息的线程,以方便理解。
import os import socket import time import threading import json from loguru import logger class TCPSocket(object): def __init__(self, size, ip, port): """ @param size: 报文上限大小 @param ip: ip地址 @param port: 端口 """ self.sk = None self.size = size self.format = "utf8" self.ip_port = (ip, port) self.logger = logger self.msg_type = ['LOGIN', 'HEART'] self.login_dict = { "code": "LOGIN", } # 心跳频率 self.heart_interval = 5 self.status_interval = 5 self.adapt_time = False # 建立socket连接 def connect(self): self.sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM) try: self.sk.connect(self.ip_port) except Exception as e: self.logger.error("connect to server failed,prepare to reconnect", e) self.reconnect() # 重新连接 5s/次 def reconnect(self): self.logger.info("try to reconnect") while True: try: self.sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.sk.connect(self.ip_port) self.login_send() self.logger.info('client start connect to host/port:{}'.format(self.ip_port)) break except ConnectionRefusedError: self.logger.error('socket server refused or not started, reconnect to server in 5s .... host/port:{}'.format(self.ip_port)) time.sleep(5) except Exception as e: self.logger.error('do connect error:{}'.format(str(e))) time.sleep(5) self.logger.info("reconnect successfully!!!") # 发送登录验证 def login_send(self): try: login_msg = self.build_request_json("LOGIN", **self.login_dict) self.sk.send(login_msg.encode(self.format)) self.logger.info("[tcp client] send logon message:{}".format(login_msg.replace(os.linesep, ""))) except socket.error: self.logger.info('socket error,do reconnect') time.sleep(5) except Exception as e: self.logger.error(e) time.sleep(5) def rec(self): while True: try: message = self.sk.recv(self.size) messages = self.parse_response_json(message) if messages: for msg in messages: if msg['code'] == "LOGIN": # 登录成功 self.logger.info("[tcp client] receive logon response: {}".format(msg)) elif msg['code'] == "HEART": # 收到心跳反馈 self.logger.info("[tcp client] receive heart response: {}".format(msg)) else: self.logger.warning("message queue is not supported!!!") else: self.logger.info("no message from server or messages are not valid:{}".format(messages)) except socket.error as e: self.logger.error(e) time.sleep(5) self.reconnect() except Exception as e: self.logger.error(e) time.sleep(5) # 间隔固定时间发送心跳 def heartbeats(self): while True: try: msg = self.build_request_json("HEART") self.sk.send(msg.encode(self.format)) self.logger.info("[tcp client] send heart message:{}".format(msg.replace(os.linesep, ""))) except socket.error: self.logger.error('socket error,do reconnect') self.reconnect() except Exception as e: self.logger.error('other error occur', e) time.sleep(5) self.reconnect() time.sleep(self.heart_interval) @staticmethod def build_request_json(method: str, **args) -> str: """ :param method: 该请求的方法类型 :return: 构建好的用于和服务端通信的Json数据 """ if method == "LOGIN": json_data = { "code": "LOGIN", } elif method == "HEART": json_data = { "code": "HEART", } else: print("this method {} is not supported now!!!".format(method)) json_data = None return json.dumps(json_data) + os.linesep if json_data else None def parse_response_json(self, data: bytes): msgs = [] try: data_list = data.decode(self.format).split(os.linesep) data_list = list(filter(lambda x: x.strip().startswith("{"), data_list)) for msg in data_list: msg = json.loads(msg) if msg['code'] in self.msg_type: msgs.append(msg) return msgs except Exception as e: self.logger.error(e) return None if __name__ == '__main__': socket1 = TCPSocket(1024, "127.0.0.1", 5433) socket1.connect() socket1.login_send() t1 = threading.Thread(target=socket1.rec) t2 = threading.Thread(target=socket1.heartbeats) t1.start() t2.start()
服务端的代码正常实现即可,这里就不贴了。
本文简单介绍了如何使用Python实现基于心跳保活的TCP长连接。