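This series introduces the message queue library Kombu. Kombu positions itself as a messaging abstraction compatible with the AMQP protocol. This article walks through the Producer concept in Kombu.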
The following code is used for the explanation.
This example comes from the series at https://liqiang.io/post/kombu-source-code-analysis-part-5, with sincere thanks.
    import sys

    from kombu import Connection, Consumer, Exchange, Producer, Queue
    from kombu.asynchronous import Hub


    def main(arguments):
        hub = Hub()
        exchange = Exchange('asynt_exchange')
        queue = Queue('asynt_queue', exchange, 'asynt_routing_key')

        def send_message(conn):
            producer = Producer(conn)
            producer.publish('hello world', exchange=exchange,
                             routing_key='asynt_routing_key')
            print('message sent')

        def on_message(message):
            print('received: {0!r}'.format(message.body))
            message.ack()
            # hub.stop()  # <-- exit after one message

        conn = Connection('redis://localhost:6379')
        conn.register_with_event_loop(hub)

        def p_message():
            print(' kombu ')

        with Consumer(conn, [queue], on_message=on_message):
            send_message(conn)
            hub.timer.call_repeatedly(3, p_message)
            hub.run_forever()


    if __name__ == '__main__':
        sys.exit(main(sys.argv[1:]))
The previous articles covered the setup part and the Consumer part. We now come to the Producer part, that is, the following code:
    def send_message(conn):
        producer = Producer(conn)
        producer.publish('hello world', exchange=exchange,
                         routing_key='asynt_routing_key')
        print('message sent')
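We know that the Transport has to associate each Channel with its file descriptor information. At this point, however, the Transport looks as follows and the file descriptor information is still missing (fds is empty). This is something to keep an eye on later: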
    transport = {Transport}
     Channel = {type}
     Cycle = {type}
     Management = {type}
     channel_max = {int} 65535
     channels = {list: 2} [,]
     client = {Connection}
     cycle = {MultiChannelPoller}
      after_read = {set: 0} set()
      eventflags = {int} 25
      fds = {dict: 0} {}
      poller = {_poll}
     default_connection_params = {dict: 2} {'port': 6379, 'hostname': 'localhost'}
     default_port = {int} 6379
     driver_name = {str} 'redis'
     driver_type = {str} 'redis'
     implements = {Implements: 3} {'asynchronous': True, 'exchange_type': frozenset({'direct', 'topic', 'fanout'}), 'heartbeats': False}
     manager = {Management}
     polling_interval = {NoneType} None
     state = {BrokerState}
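The main members of Producer, as the class definition below shows, are the default exchange, the default routing key, the serializer, the compression method, the auto_declare flag, the on_return callback, and __connection__, which is set when the channel argument was a Connection.
Yet the example in this article does not pass an exchange to the Producer constructor, which looks odd, so we need to read on.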
    class Producer:
        """Message Producer.

        Arguments:
            channel (kombu.Connection, ChannelT): Connection or channel.
            exchange (kombu.entity.Exchange, str): Optional default exchange.
            routing_key (str): Optional default routing key.
        """

        #: Default exchange
        exchange = None

        #: Default routing key.
        routing_key = ''

        #: Default serializer to use. Default is JSON.
        serializer = None

        #: Default compression method.  Disabled by default.
        compression = None

        #: By default, if a default exchange is set,
        #: that exchange will be declared when publishing a message.
        auto_declare = True

        #: Basic return callback.
        on_return = None

        #: Set if channel argument was a Connection instance (using
        #: default_channel).
        __connection__ = None
The __init__ code is as follows.
    def __init__(self, channel, exchange=None, routing_key=None,
                 serializer=None, auto_declare=None, compression=None,
                 on_return=None):
        self._channel = channel
        self.exchange = exchange
        self.routing_key = routing_key or self.routing_key
        self.serializer = serializer or self.serializer
        self.compression = compression or self.compression
        self.on_return = on_return or self.on_return
        self._channel_promise = None
        if self.exchange is None:
            self.exchange = Exchange('')
        if auto_declare is not None:
            self.auto_declare = auto_declare

        if self._channel:
            self.revive(self._channel)
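An important conversion happens here, in revive: when the channel argument is actually a Connection, it is wrapped in a ChannelPromise that resolves to connection.default_channel, and the exchange is then bound to that channel.
The exchange itself, however, is still only a placeholder: an unnamed Exchange of the default direct type.
The code is as follows: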
    def revive(self, channel):
        """Revive the producer after connection loss."""
        if is_connection(channel):
            connection = channel
            self.__connection__ = connection
            channel = ChannelPromise(lambda: connection.default_channel)
        if isinstance(channel, ChannelPromise):
            self._channel = channel
            self.exchange = self.exchange(channel)
        else:
            # Channel already concrete
            self._channel = channel
            if self.on_return:
                self._channel.events['basic_return'].add(self.on_return)
            self.exchange = self.exchange(channel)
At this point the variables are:
    producer = {Producer}
     auto_declare = {bool} True
     channel = {Channel}
     compression = {NoneType} None
     connection = {Connection}
     exchange = {Exchange} Exchange ''(direct)
     on_return = {NoneType} None
     routing_key = {str} ''
     serializer = {NoneType} None
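The conversion done in revive can be illustrated with a minimal sketch of a lazily resolved channel. LazyChannel and FakeConnection are hypothetical stand-ins written for this illustration, not Kombu classes; Kombu's own ChannelPromise may differ in its details. The point is only that wrapping the lookup in a callable postpones channel creation until the first use.

```python
class LazyChannel:
    """Hypothetical stand-in for a channel promise.

    It stores a factory and calls it only once, on first use.
    """

    def __init__(self, factory):
        self._factory = factory
        self._value = None
        self._resolved = False

    def __call__(self):
        if not self._resolved:
            self._value = self._factory()
            self._resolved = True
        return self._value


class FakeConnection:
    """Hypothetical connection that counts how often a channel is created."""

    def __init__(self):
        self.channels_created = 0

    @property
    def default_channel(self):
        self.channels_created += 1
        return f"channel-{self.channels_created}"


connection = FakeConnection()
promise = LazyChannel(lambda: connection.default_channel)

created_before = connection.channels_created  # nothing has been resolved yet
first = promise()   # the factory runs here
second = promise()  # the cached value is returned
```

Creating the promise does not touch the connection; the channel only comes into existence when the promise is first called.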
The relationships at this point, summarized from the original diagram, are:
Producer: channel points to the Channel, exchange points to the Exchange, connection points to the Connection.
Channel: client points to Redis<ConnectionPool<Connection>>; the Channel also holds pool and connection.
Connection: transport points to redis.Transport.
redis.Transport: cycle points to the MultiChannelPoller.
MultiChannelPoller: holds _channels, _fd_to_chan, _chan_to_sock and after_read; its poller points to _poll.
Hub: poller also points to _poll, whose _poller points to poll.
Queue: holds _channel; its exchange points to the Exchange, which in turn holds channel.
Messages are sent through producer.publish.
    def send_message(conn):
        producer = Producer(conn)
        producer.publish('hello world', exchange=exchange,
                         routing_key='asynt_routing_key')
        print('message sent')
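Here the exchange is passed in as an argument. So when the Producer was created without an Exchange, publish is the place where one can still be supplied.
producer.publish goes on to call _publish, shown below, which works in two steps:
first, prepare_message assembles the message;
second, basic_publish sends it.
Both are methods of the channel, so in the end the message is still sent through the channel.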
    def _publish(self, body, priority, content_type, content_encoding,
                 headers, properties, routing_key, mandatory,
                 immediate, exchange, declare):
        channel = self.channel
        message = channel.prepare_message(
            body, priority, content_type,
            content_encoding, headers, properties,
        )
        if declare:
            maybe_declare = self.maybe_declare
            [maybe_declare(entity) for entity in declare]

        # handle autogenerated queue names for reply_to
        reply_to = properties.get('reply_to')
        if isinstance(reply_to, Queue):
            properties['reply_to'] = reply_to.name
        return channel.basic_publish(
            message,
            exchange=exchange, routing_key=routing_key,
            mandatory=mandatory, immediate=immediate,
        )
The channel's prepare_message function does the assembly, which mostly means attaching the various properties to the message.
    def prepare_message(self, body, priority=None, content_type=None,
                        content_encoding=None, headers=None, properties=None):
        """Prepare message data."""
        properties = properties or {}
        properties.setdefault('delivery_info', {})
        properties.setdefault('priority', priority or self.default_priority)

        return {'body': body,
                'content-encoding': content_encoding,
                'content-type': content_type,
                'headers': headers or {},
                'properties': properties or {}}
The message looks like this:
    message = {dict: 5}
     'body' = {str} 'aGVsbG8gd29ybGQ='
     'content-encoding' = {str} 'utf-8'
     'content-type' = {str} 'text/plain'
     'headers' = {dict: 0} {}
      __len__ = {int} 0
     'properties' = {dict: 5}
      'delivery_mode' = {int} 2
      'delivery_info' = {dict: 2} {'exchange': 'asynt_exchange', 'routing_key': 'asynt_routing_key'}
      'priority' = {int} 0
      'body_encoding' = {str} 'base64'
      'delivery_tag' = {str} '1b03590e-501c-471f-a5f9-f4fdcbe3379a'
      __len__ = {int} 5
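To see how the two setdefault calls behave, the assembly step can be reproduced as a standalone function. This is a sketch: the default_priority parameter is an assumption that replaces self.default_priority so the function runs without a channel object. Fields such as body_encoding and delivery_tag, which appear in the dump above, are not added by prepare_message itself and are left out here.

```python
def prepare_message(body, priority=None, content_type=None,
                    content_encoding=None, headers=None, properties=None,
                    default_priority=0):
    """Standalone copy of the assembly logic (default_priority is assumed)."""
    properties = properties or {}
    # setdefault only fills in keys that the caller did not provide
    properties.setdefault('delivery_info', {})
    properties.setdefault('priority', priority or default_priority)
    return {'body': body,
            'content-encoding': content_encoding,
            'content-type': content_type,
            'headers': headers or {},
            'properties': properties}


message = prepare_message('hello world',
                          content_type='text/plain',
                          content_encoding='utf-8',
                          properties={'delivery_mode': 2})

# A priority already present in properties wins over the priority argument.
kept = prepare_message('x', priority=5, properties={'priority': 9})
```

Note that because of setdefault, a priority already stored in properties is never overwritten by the priority argument.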
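The channel's basic_publish does the actual sending, and this is where the exchange argument that was passed in gets used.
With a named exchange, basic_publish hands the message to the deliver method of the exchange type, which ends up calling _put. With the anonymous exchange it calls _put directly and treats the routing key as the destination queue: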
    def basic_publish(self, message, exchange, routing_key, **kwargs):
        """Publish message."""
        self._inplace_augment_message(message, exchange, routing_key)
        if exchange:
            return self.typeof(exchange).deliver(
                message, exchange, routing_key, **kwargs
            )
        # anon exchange: routing_key is the destination queue
        return self._put(routing_key, message, **kwargs)
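Through self.typeof(exchange).deliver the code then reaches the exchange type, which in this article is DirectExchange.
Note that deliver uses self.channel._put, where channel is the exchange object's own channel member.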
    class DirectExchange(ExchangeType):
        """Direct exchange.

        The `direct` exchange routes based on exact routing keys.
        """

        type = 'direct'

        def lookup(self, table, exchange, routing_key, default):
            return {
                queue for rkey, _, queue in table
                if rkey == routing_key
            }

        def deliver(self, message, exchange, routing_key, **kwargs):
            _lookup = self.channel._lookup
            _put = self.channel._put
            for queue in _lookup(exchange, routing_key):
                _put(queue, message, **kwargs)
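The exact-match rule in lookup can be tried on its own. The sketch below lifts the set comprehension into a free function named direct_lookup (a name chosen for this illustration) and runs it on a small table of (routing_key, pattern, queue) tuples; the second binding is invented for the example.

```python
def direct_lookup(table, routing_key):
    """Exact-match lookup over (routing_key, pattern, queue) tuples."""
    return {queue for rkey, _, queue in table if rkey == routing_key}


table = [
    ('asynt_routing_key', '', 'asynt_queue'),
    ('other_key', '', 'other_queue'),  # hypothetical second binding
]

matched = direct_lookup(table, 'asynt_routing_key')
unmatched = direct_lookup(table, 'missing_key')
```

A routing key that matches no binding yields an empty set, which is the case the channel's _lookup method has to handle with its default queue.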
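We know that the only job of the Exchange is to translate the routing_key of an outgoing message into the name of a queue, so that the sender knows which queue to deliver to.
The matching queue is therefore obtained through the _lookup method.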
    def _lookup(self, exchange, routing_key, default=None):
        """Find all queues matching `routing_key` for the given `exchange`.

        Returns:
            str: queue name -- must return the string `default`
                if no queues matched.
        """
        if default is None:
            default = self.deadletter_queue
        if not exchange:  # anon exchange
            return [routing_key or default]

        try:
            R = self.typeof(exchange).lookup(
                self.get_table(exchange),
                exchange, routing_key, default,
            )
        except KeyError:
            R = []

        if not R and default is not None:
            warnings.warn(UndeliverableWarning(UNDELIVERABLE_FMT.format(
                exchange=exchange, routing_key=routing_key)),
            )
            self._new_queue(default)
            R = [default]
        return R
The concrete logic here is as follows.
First, the channel's get_table method is called. The exchange name here is asynt_exchange.
    def get_table(self, exchange):
        key = self.keyprefix_queue % exchange
        with self.conn_or_acquire() as client:
            values = client.smembers(key)
            if not values:
                raise InconsistencyError(NO_ROUTE_ERROR.format(exchange, key))
            return [tuple(bytes_to_str(val).split(self.sep)) for val in values]
Looking at the contents of Redis, the set holds the following:
    127.0.0.1:6379> smembers _kombu.binding.asynt_exchange
    1) "asynt_routing_key\x06\x16\x06\x16asynt_queue"
Second, the resulting binding is:
{b'asynt_routing_key\x06\x16\x06\x16asynt_queue'}
In other words, the exchange yields the routing_key ---> queue rules, and the routing_key then selects the queue. This tells the Consumer and the Producer which queue they exchange messages through.
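How a raw binding turns into a table row can be sketched as follows. The separator value is read off the Redis output above rather than taken from Kombu's source, and parse_bindings is a hypothetical helper that mirrors the list comprehension in get_table, with bytes.decode standing in for bytes_to_str.

```python
SEP = '\x06\x16'  # separator as it appears in the Redis output above


def parse_bindings(values):
    """Split each raw set member into a (routing_key, pattern, queue) tuple."""
    return [tuple(value.decode().split(SEP)) for value in values]


raw = {b'asynt_routing_key\x06\x16\x06\x16asynt_queue'}
table = parse_bindings(raw)
routing_key, pattern, queue = table[0]
```

The two adjacent separators in the stored value mean the middle field, the pattern, is an empty string for this direct binding.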
The logic, summarized from the original diagram, is:
1. The Producer presents routing_key x to the exchange, which holds the rules routing_key x ---> queue x, routing_key y ---> queue y and routing_key z ---> queue z.
2. The Producer puts the message into queue x.
3. The Consumer takes the message from queue x.
The channel's _put method carries on from here, and we can see that it finally calls client.lpush.
The client is:
Redis<ConnectionPool<Connection>>
The code is:
    def _put(self, queue, message, **kwargs):
        """Deliver message."""
        pri = self._get_message_priority(message, reverse=False)

        with self.conn_or_acquire() as client:
            client.lpush(self._q_for_pri(queue, pri), dumps(message))
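How does Redis tell different queues apart?
Each queue is given a string name, and that name is the key of the corresponding Redis list. Once it is known which list a message belongs in, the message is simply pushed onto that list with lpush.
The following method does the conversion from queue name and priority to list key.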
    def _q_for_pri(self, queue, pri):
        pri = self.priority(pri)
        if pri:
            return f"{queue}{self.sep}{pri}"
        return queue
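The naming rule and the push can be modeled in memory without a Redis server. In this sketch FakeRedis is a hypothetical stand-in that only imitates lpush, the separator is the one seen in the binding data, and the priority is used as given; Kombu's own priority handling (self.priority and _get_message_priority) is not modeled.

```python
import json
from collections import defaultdict, deque

SEP = '\x06\x16'  # assumed to match the separator seen in the binding data


def q_for_pri(queue, pri):
    """Priority 0 maps to the plain queue name, others get a suffix."""
    return f"{queue}{SEP}{pri}" if pri else queue


class FakeRedis:
    """Hypothetical in-memory stand-in that only imitates LPUSH."""

    def __init__(self):
        self.lists = defaultdict(deque)

    def lpush(self, key, value):
        self.lists[key].appendleft(value)  # LPUSH inserts at the head
        return len(self.lists[key])


def put(client, queue, message, pri=0):
    client.lpush(q_for_pri(queue, pri), json.dumps(message))


client = FakeRedis()
put(client, 'asynt_queue', {'body': 'first'})
put(client, 'asynt_queue', {'body': 'second'})
put(client, 'asynt_queue', {'body': 'urgent'}, pri=3)

plain_items = list(client.lists['asynt_queue'])
priority_key = 'asynt_queue' + SEP + '3'
```

Messages with a non-zero priority land in a separate list whose key carries the priority suffix, while the newest message always sits at the head of its list.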
After the message has been sent, Redis holds the following. We can see that the message has been stored as an item of the list asynt_queue.
    127.0.0.1:6379> lrange asynt_queue 0 -1
    1) "{\"body\": \"aGVsbG8gd29ybGQ=\", \"content-encoding\": \"utf-8\", \"content-type\": \"text/plain\", \"headers\": {}, \"properties\": {\"delivery_mode\": 2, \"delivery_info\": {\"exchange\": \"asynt_exchange\", \"routing_key\": \"asynt_routing_key\"}, \"priority\": 0, \"body_encoding\": \"base64\", \"delivery_tag\": \"df7af424-e1ab-4c08-84b5-1cd5c97ed25d\"}}"
    127.0.0.1:6379>
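The stored item is plain JSON whose body is base64 encoded, so it can be read back with the standard library alone. The sketch below uses the item shown above; decode_stored is a hypothetical helper for this illustration, not the code path a Kombu consumer takes.

```python
import base64
import json

# The list item exactly as shown in the Redis output above.
stored = (
    '{"body": "aGVsbG8gd29ybGQ=", "content-encoding": "utf-8", '
    '"content-type": "text/plain", "headers": {}, '
    '"properties": {"delivery_mode": 2, '
    '"delivery_info": {"exchange": "asynt_exchange", '
    '"routing_key": "asynt_routing_key"}, '
    '"priority": 0, "body_encoding": "base64", '
    '"delivery_tag": "df7af424-e1ab-4c08-84b5-1cd5c97ed25d"}}'
)


def decode_stored(item):
    """Parse the JSON envelope and undo the body encoding it declares."""
    message = json.loads(item)
    body = message['body']
    if message['properties'].get('body_encoding') == 'base64':
        body = base64.b64decode(body).decode(message['content-encoding'])
    return message, body


message, body = decode_stored(stored)
delivery_info = message['properties']['delivery_info']
```

The decoded body is the original 'hello world' payload, and delivery_info records the exchange and routing key that were used at publish time.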
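To summarize, the chain of logic is now complete and looks roughly like this:
The Producer's publish calls the channel's prepare_message, which attaches the various properties to the message.
The channel's basic_publish hands the message to the exchange, whose deliver uses _lookup to turn the routing_key into a queue name.
The channel's _put then works with the client, Redis<ConnectionPool<Connection>>, so the Redis operation (lpush) can be carried out on that basis.
The dynamic flow is as follows: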
1. producer: publish('', exchange, routing_key).
2. producer ---> channel: prepare_message.
3. producer ---> channel: basic_publish(exchange, routing_key).
4. channel ---> exchange: deliver(exchange, routing_key).
5. exchange ---> channel: _lookup(exchange, routing_key).
6. channel ---> Redis: _put(queue, message).