ZeroMQ的python版本和C/C++版本的接口差不多,要实现一个server对N个client,异步方式,而且可以对指定的client发送消息,可以这样: server采用ROUTER方式,client采用DEALER方式,而且要自己制定client的zmq.IDENTITY(如果不指定,zmq就会自动生成一个,不好控制对制定的client发送消息。)
#!/usr/bin/python #-*-coding:utf-8-*- import time import zmq #import zhelpers context = zmq.Context() socket = context.socket(zmq.ROUTER) #server不需要指定 #socket.setsockopt_string(zmq.IDENTITY, u"desktop") socket.bind("tcp://*:5555") while True: #zhelpers.dump(socket) #这里可以打印出帧的具体内容 [address,contents]=socket.recv_multipart() print("[%s]%s "%(address,contents)) reply = "[get server reply:" + contents + "]" socket.send_multipart([address, reply]) #这里的address就可以指定客户端发消息
#!/usr/bin/python #-*-coding:utf-8-*- import zmq import sys import os import threading import ctypes import inspect class ZmqClientThread(threading.Thread): def __init__(self, func, serverIp, port, identity): threading.Thread.__init__(self) self.context = zmq.Context() self.socket = self.context.socket(zmq.DEALER) self.serverIp = serverIp self.identity = identity self.port = port self.func = func self.socket.setsockopt_string(zmq.IDENTITY, identity) #默认使用utf-8编码 #先设置IDENTITY,再connect,顺序不能颠倒 self.socket.connect( "tcp://{0}:{1}".format(serverIp, port) ) #("tcp://localhost:5555")或者("tcp://127.0.0.1:5555")都可以 #向server发送字符串 def sendMsg(self, data): if not self.socket.closed: self.socket.send(data) else: print "sock is closed,cant send message..." def run(self): self.func(self.socket) #收从server发来的字符串 def loop(socket): while True: if not socket.closed: message = socket.recv() print message else: print "sock is closed,cant receive any message..." break def main(): serverIp = "127.0.0.1" port = 5555 identity = u"client1" zmqThread = ZmqClientThread(loop, serverIp, port, identity) zmqThread.start() while(True): data = raw_input("input your data:") if data == q: print "data == q" os._exit(1) else: zmqThread.sendMsg(data) #这种方式发字符串 if __name__==__main__: main()
知识点: Context使用完,在C/C++中需要手动关闭,而python中会在垃圾回收的时候自动调用term关闭。
Close or terminate the context. This can be called to close the context by hand. If this is not called, the context will automatically be closed when it is garbage collected.
Socket也是一样,使用完后,在C/C++中需要手动关闭,而python中会在垃圾回收的时候自动调用term关闭。
Close the socket. If linger is specified, LINGER sockopt will be set prior to closing. This can be called to close the socket by hand. If this is not called, the socket will automatically be closed when it is garbage collected.
具体查看官方文档: 还有需要注意的是Router <->Dealer模式,需要客户端先发一个数据帧(空白的也可以)到服务器端,之后服务器端才能指定客户端发消息(这是一个建立路由的过程)
(这个文件是从github的一个开源项目上下载下来的,用dump可以打印出帧的具体内容,方便调试,地址:)
# encoding: utf-8 """ Helper module for example applications. Mimics ZeroMQ Guides zhelpers.h. """ from __future__ import print_function import binascii import os from random import randint import zmq def socket_set_hwm(socket, hwm=-1): """libzmq 2/3/4 compatible sethwm""" try: socket.sndhwm = socket.rcvhwm = hwm except AttributeError: socket.hwm = hwm def dump(msg_or_socket): """Receives all message parts from socket, printing each frame neatly""" if isinstance(msg_or_socket, zmq.Socket): # its a socket, call on current message msg = msg_or_socket.recv_multipart() else: msg = msg_or_socket print("----------------------------------------") for part in msg: print("[%03d]" % len(part), end= ) is_text = True try: print(part.decode(ascii)) except UnicodeDecodeError: print(r"0x%s" % (binascii.hexlify(part).decode(ascii))) def set_id(zsocket): """Set simple random printable identity on socket""" identity = u"%04x-%04x" % (randint(0, 0x10000), randint(0, 0x10000)) zsocket.setsockopt_string(zmq.IDENTITY, identity) def zpipe(ctx): """build inproc pipe for talking to threads mimic pipe used in czmq zthread_fork. Returns a pair of PAIRs connected via inproc """ a = ctx.socket(zmq.PAIR) b = ctx.socket(zmq.PAIR) a.linger = b.linger = 0 a.hwm = b.hwm = 1 iface = "inproc://%s" % binascii.hexlify(os.urandom(8)) a.bind(iface) b.connect(iface) return a,b
python版本的ZeroMQ的参考资料: 官方文档: 实例代码: 另外就是参考《ZeroMQ 云时代极速消息通信库》这本书是用C/C++写的,但是里面的模型和设置参数的方式和python版本几乎是一样的,可以用来参考,他山之石可以攻玉。