本系列我们介绍消息队列 Kombu。Kombu 的定位是一个兼容 AMQP 协议的消息队列抽象。通过本文,大家可以了解 Kombu 中的 Consumer 概念。
目录
本系列我们介绍消息队列 Kombu。Kombu 的定位是一个兼容 AMQP 协议的消息队列抽象。通过本文,大家可以了解 Kombu 中的 Consumer 概念。
Consumer 的作用主要如下:
在具体的实现中,Consumer 把 queue 与 channel 联系起来。queue 里面有一个 channel,用来访问redis。Queue 也有 Exchange,知道访问具体 redis 哪个key(就是queue对应的那个key)。即 Consumer 消费消息是通过 Queue 来消费,然后 Queue 又转嫁给 Channel。
所以服务端的逻辑大致为:
下面使用如下代码来进行说明。
本示例来自https://liqiang.io/post/kombu-source-code-analysis-part-5系列,特此深表感谢。
def main(arguments): hub = Hub() exchange = Exchange('asynt_exchange') queue = Queue('asynt_queue', exchange, 'asynt_routing_key') def send_message(conn): producer = Producer(conn) producer.publish('hello world', exchange=exchange, routing_key='asynt_routing_key') print('message sent') def on_message(message): print('received: {0!r}'.format(message.body)) message.ack() # hub.stop() # <-- exit after one message conn = Connection('redis://localhost:6379') conn.register_with_event_loop(hub) def p_message(): print(' kombu ') with Consumer(conn, [queue], on_message=on_message): send_message(conn) hub.timer.call_repeatedly(3, p_message) hub.run_forever() if __name__ == '__main__': sys.exit(main(sys.argv[1:]))
前文已经完成了构建部分,下面来到了Consumer部分,即如下代码:
with Consumer(conn, [queue], on_message=on_message): send_message(conn) hub.timer.call_repeatedly( 3, p_message ) hub.run_forever()
Consumer主要成员变量如下:
这也是调用时传入的变量。
class Consumer: """Message consumer. Arguments: channel (kombu.Connection, ChannelT): see :attr:`channel`. queues (Sequence[kombu.Queue]): see :attr:`queues`. no_ack (bool): see :attr:`no_ack`. auto_declare (bool): see :attr:`auto_declare` callbacks (Sequence[Callable]): see :attr:`callbacks`. on_message (Callable): See :attr:`on_message` on_decode_error (Callable): see :attr:`on_decode_error`. prefetch_count (int): see :attr:`prefetch_count`. """ #: The connection/channel to use for this consumer. channel = None #: A single :class:`~kombu.Queue`, or a list of queues to #: consume from. queues = None #: Flag for automatic message acknowledgment. no_ack = None #: List of callbacks called in order when a message is received. callbacks = None #: Optional function called whenever a message is received. on_message = None #: List of accepted content-types. accept = None #: Initial prefetch count prefetch_count = None #: Mapping of queues we consume from. _queues = None _tags = count(1) # global
我们也给出 Queue 的定义,其中主要成员变量如下:
具体定义如下:
class Queue(MaybeChannelBound): """A Queue declaration. channel (ChannelT): The channel the Queue is bound to (if bound). """ ContentDisallowed = ContentDisallowed name = '' exchange = Exchange('') routing_key = '' durable = True exclusive = False auto_delete = False no_ack = False attrs = ( ('name', None), ('exchange', None), ('routing_key', None), ('queue_arguments', None), ('binding_arguments', None), ('consumer_arguments', None), ('durable', bool), ('exclusive', bool), ('auto_delete', bool), ('no_ack', None), ('alias', None), ('bindings', list), ('no_declare', bool), ('expires', float), ('message_ttl', float), ('max_length', int), ('max_length_bytes', int), ('max_priority', int) )
在此方法中,先处理调用,随之建立联系。
def __init__(self, channel, queues=None, no_ack=None, auto_declare=None, callbacks=None, on_decode_error=None, on_message=None, accept=None, prefetch_count=None, tag_prefix=None): self.channel = channel self.queues = maybe_list(queues or []) self.no_ack = self.no_ack if no_ack is None else no_ack self.callbacks = (self.callbacks or [] if callbacks is None else callbacks) self.on_message = on_message self.tag_prefix = tag_prefix self._active_tags = {} self.accept = prepare_accept_content(accept) self.prefetch_count = prefetch_count if self.channel: self.revive(self.channel)
传入的参数queues被作为成员变量保存起来。
self.queues = maybe_list(queues or [])
传入的参数Connection被作为成员变量保存起来。
self.channel = channel
传入的参数on_message 作为消息响应方法保存起来。
self.on_message = on_message
用如下方法把 Exchange,Queue 与 Connection 联系起来。
def revive(self, channel): """Revive consumer after connection loss.""" self._active_tags.clear() channel = self.channel = maybe_channel(channel) # modify dict size while iterating over it is not allowed for qname, queue in list(self._queues.items()): # name may have changed after declare self._queues.pop(qname, None) queue = self._queues[queue.name] = queue(self.channel) queue.revive(channel) if self.auto_declare: self.declare() if self.prefetch_count is not None: self.qos(prefetch_count=self.prefetch_count)
进一步调用:
when_bound, entity.py:598 maybe_bind, abstract.py:76 bind, abstract.py:70 bind, entity.py:590 __call__, abstract.py:66 revive, messaging.py:400 __init__, messaging.py:382 main, testUb.py:46, testUb.py:55
由此进入到了Queue类。
这里用如下方法把queue与channel联系起来。queue 里面有一个 channel,用来访问redis,Queue 也有 Exchange,知道访问具体 redis 哪里。
每一个 Consumer 初始化的时候都是和 Channel 绑定的,也就是说我们 Consumer 包含了 Queue 也就和 Connection 关联起来了!
Consumer 消费消息是通过 Queue 来消费,然后 Queue 又转嫁给 Channel。
channel = {Channel}self = {Queue}-> asynt bound to chan:1>
这样,conneciton就是queue的成员变量。
def revive(self, channel): """Revive channel after the connection has been re-established. """ if self.is_bound: self._channel = channel self.when_bound()
之前我们知道,Queue是包括了exchange成员变量,目前channel也是exchange的成员变量。
Exchange:交换机,消息发送者将消息发至Exchange,Exchange负责将消息分发至队列。
于是经由如下方法,准备把channel与exchange联系起来。
def when_bound(self): if self.exchange: self.exchange = self.exchange(self.channel)
此时变量如下:
channel = {Channel}self = {Exchange} Exchange asynt(direct)
进而直接在Exchange基类,使用方法maybe_bind把channel与exchange联系起来。
class MaybeChannelBound(Object): """Mixin for classes that can be bound to an AMQP channel.""" _channel = None def __call__(self, channel): """`self(channel) -> self.bind(channel)`.""" return self.bind(channel) def bind(self, channel): """Create copy of the instance that is bound to a channel.""" return copy(self).maybe_bind(channel) def maybe_bind(self, channel): """Bind instance to channel if not already bound.""" if not self.is_bound and channel: self._channel = maybe_channel(channel) self.when_bound() self._is_bound = True return self
这里会把 Exchange 和 queue 联系。就是把 Exchange 和 routing_key 联系起来,然后把这些联系规则放到redis 之中。
堆栈如下:
_queue_bind, redis.py:814 queue_bind, base.py:568 bind_to, entity.py:674 queue_bind, entity.py:662 _create_queue, entity.py:617 declare, entity.py:606 declare, messaging.py:417 revive, messaging.py:404 __init__, messaging.py:382
具体为
class Queue(MaybeChannelBound): def __init__(self, name='', exchange=None, routing_key='', channel=None, bindings=None, on_declared=None, **kwargs): super().__init__(**kwargs) self.name = name or self.name if isinstance(exchange, str): self.exchange = Exchange(exchange) elif isinstance(exchange, Exchange): self.exchange = exchange self.routing_key = routing_key or self.routing_key self.bindings = set(bindings or []) self.on_declared = on_declared # allows Queue('name', [binding(...), binding(...), ...]) if isinstance(exchange, (list, tuple, set)): self.bindings |= set(exchange) if self.bindings: self.exchange = None # exclusive implies auto-delete. if self.exclusive: self.auto_delete = True self.maybe_bind(channel) def queue_bind(self, nowait=False, channel=None): """Create the queue binding on the server.""" return self.bind_to(self.exchange, self.routing_key, self.binding_arguments, channel=channel, nowait=nowait) def bind_to(self, exchange='', routing_key='', arguments=None, nowait=False, channel=None): if isinstance(exchange, Exchange): exchange = exchange.name return (channel or self.channel).queue_bind( queue=self.name, exchange=exchange, routing_key=routing_key, arguments=arguments, nowait=nowait, )
具体调用到Channel,代码位于 kombu/transport/redis.py。
def _queue_bind(self, exchange, routing_key, pattern, queue): if self.typeof(exchange).type == 'fanout': # Mark exchange as fanout. self._fanout_queues[queue] = ( exchange, routing_key.replace('#', '*'), ) with self.conn_or_acquire() as client: client.sadd(self.keyprefix_queue % (exchange,), self.sep.join([routing_key or '', pattern or '', queue or '']))
代码然后调用到redis client。
# SET COMMANDS def sadd(self, name, *values): "Add ``value(s)`` to set ``name``" return self.execute_command('SADD', name, *values)
具体变量如下,我们代码中,exchange内容为_kombu.binding.asynt_exchange。routing_key的是asynt_routing_key。
name = {str} '_kombu.binding.asynt_exchange' self = {Redis} Redis<ConnectionPool<Connection>> values = {tuple: 1} asynt_routing_keysynt_queue
我们看看Redis内容,发现新建内容如下:
127.0.0.1:6379> smembers _kombu.binding.asynt_exchange 1) "asynt_routing_key\x06\x16\x06\x16asynt_queue"
集合名字为:self.keyprefix_queue % (exchange,), 对于我们就为:_kombu.binding.asynt_exchange
。
集合每个item为:routing_key + sep + pattern + sep + queue
。我们这里sep = '\x06\x16'。
当发消息时候,Exchange的作用是将发送的 routing_key
转化为 queue
的名字。这样发送就知道发到哪个 queue
。这里的 exchange 内容为 _kombu.binding.asynt_exchange。
def get_table(self, exchange): key = self.keyprefix_queue % exchange with self.conn_or_acquire() as client: values = client.smembers(key) if not values: raise InconsistencyError(NO_ROUTE_ERROR.format(exchange, key)) return [tuple(bytes_to_str(val).split(self.sep)) for val in values]
得到的集合内容为:
{b'asynt_routing_key\x06\x16\x06\x16asynt_queue'}
即从 exchange 得到 routing_key ---> queue 的规则,然后再依据 routing_key 得到 queue。就知道 Consumer 和 Producer 需要依据哪个 queue 交换消息。
逻辑如下:
+---------------------------------+ | exchange | | | 1 routing_key x | | +----------+ | | +------------+ | Producer | +-----------------> | routing_key x ---> queue x | | Consumer | +--------+-+ | | +------------+ | | routing_key y ---> queue y | | | | ^ | | routing_key z ---> queue z | | | | | | | +---------------------------------+ | | | | | | | | | | | | | | | | | | +-----------+ | | 2 message | | 3 message | +-------------------------------> | queue X | +--------------------+ | | +-----------+
因此,此时总体逻辑如下图:
+----------------------+ +-------------------+ | Consumer | | Channel | | | | | +-----------------------------------------------------------+ | | | client +-------------> | Redis<ConnectionPool<Connection| | channel +--------------------> | | +-----------------------------------------------------------+ | | | pool | | | +---------> | | | connection +---------------+ | | | | | | | | | | +----------------------+ | | +-------------------+ | | | | | v | | | | +-------------------+ +---+-----------------+ +--------------------+ | | | | | Connection | | redis.Transport | | MultiChannelPoller | | | | | | | | | | | | | | | | | | | | _channels +--------+ | | | | | | cycle +------------> | _fd_to_chan | | | | | transport +---------> | | | _chan_to_sock | | +-------->+ | | | | | +------+ poller | | | | +-------------------+ +---------------------+ | | after_read | | | | | | | | | | | +--------------------+ | | | +------------------+ +---------------+ | | | | Hub | | | | | | | v | | | | | +------+------+ | | | | poller +---------------> | _poll | | | | | | | | +-------+ | | | | | | _poller+---------> | poll | v | | +------------------+ | | +-------+ | | +-------------+ +-------------------+ | +----------------+ | Queue | | | | Exchange | | _chann+l | +----+ | | | | | | | exchange +----------------> | channel | | | | | | | | | +-------------------+ +----------------+
手机如下:
现在我们知道:
于是逻辑链已经形成,大约是这样的,后文完善:
routing_key
转化为 queue
的名字。逻辑大致通了,但是缺少动态操作完成此逻辑,我们将在后续完善动态逻辑。
在init之后,第二步会完善联系。
python的上下文管理。在python中实现了__enter__和__exit__方法,即支持上下文管理器协议。上下文管理器就是支持上下文管理器协议的对象,它是为了with而生。当with语句在开始运行时,会在上下文管理器对象上调用 enter 方法。with语句运行结束后,会在上下文管理器对象上调用 exit 方法。
所以这里是调用__enter__
,即 consumer 函数,其目的如下:
Channel
将Consumer
标签,Consumer
要消费的队列,以及标签与队列的映射关系都记录下来,等待循环调用。Transport
将队列与回调函数列表的映射关系记录下来,以便于从队列中取出消息后执行回调函数。class Consumer: """Message consumer. Arguments: channel (kombu.Connection, ChannelT): see :attr:`channel`. queues (Sequence[kombu.Queue]): see :attr:`queues`. no_ack (bool): see :attr:`no_ack`. auto_declare (bool): see :attr:`auto_declare` callbacks (Sequence[Callable]): see :attr:`callbacks`. on_message (Callable): See :attr:`on_message` on_decode_error (Callable): see :attr:`on_decode_error`. prefetch_count (int): see :attr:`prefetch_count`. """ def __enter__(self): self.consume() return self
使用_basic_consume
方法处理Consumer相关的队列列表中的每一项,其中处理最后一个Queue时设置标志nowait=False
。
def consume(self, no_ack=None): """Start consuming messages. Can be called multiple times, but note that while it will consume from new queues added since the last call, it will not cancel consuming from removed queues ( use :meth:`cancel_by_queue`). Arguments: no_ack (bool): See :attr:`no_ack`. """ queues = list(self._queues.values()) if queues: no_ack = self.no_ack if no_ack is None else no_ack H, T = queues[:-1], queues[-1] for queue in H: self._basic_consume(queue, no_ack=no_ack, nowait=True) self._basic_consume(T, no_ack=no_ack, nowait=False)
_basic_consume
方法代码如下:
是将消费者标签以及回调函数传给Queue
的consume
方法。
def _basic_consume(self, queue, consumer_tag=None, no_ack=no_ack, nowait=True): tag = self._active_tags.get(queue.name) if tag is None: tag = self._add_tag(queue, consumer_tag) queue.consume(tag, self._receive_callback, no_ack=no_ack, nowait=nowait) return tag
对于每一个 queue,都会调用其 consume 函数。
Queue
的consume
方法代码:
class Queue(MaybeChannelBound): def consume(self, consumer_tag='', callback=None, no_ack=None, nowait=False): """Start a queue consumer. Consumers last as long as the channel they were created on, or until the client cancels them. Arguments: consumer_tag (str): Unique identifier for the consumer. The consumer tag is local to a connection, so two clients can use the same consumer tags. If this field is empty the server will generate a unique tag. no_ack (bool): If enabled the broker will automatically ack messages. nowait (bool): Do not wait for a reply. callback (Callable): callback called for each delivered message. """ if no_ack is None: no_ack = self.no_ack return self.channel.basic_consume( queue=self.name, no_ack=no_ack, consumer_tag=consumer_tag or '', callback=callback, nowait=nowait, arguments=self.consumer_arguments)
前面提到,queue与channel已经联系了起来。
每一个 Consumer 初始化的时候都是和 Channel 绑定的,也就是说我们 Consumer 包含了 Queue 也就和 Connection 关联起来了!
Consumer 消费消息是通过 Queue 来消费,然后 Queue 又转嫁给 Channel。
因此又回到了Channel
,就是Channel
的basic_consume
代码:
调用到基类basic_consume方法。
class Channel(virtual.Channel): def basic_consume(self, queue, *args, **kwargs): if queue in self._fanout_queues: exchange, _ = self._fanout_queues[queue] self.active_fanout_queues.add(queue) self._fanout_to_queue[exchange] = queue ret = super().basic_consume(queue, *args, **kwargs) # Update fair cycle between queues. # # We cycle between queues fairly to make sure that # each queue is equally likely to be consumed from, # so that a very busy queue will not block others. # # This works by using Redis's `BRPOP` command and # by rotating the most recently used queue to the # and of the list. See Kombu github issue #166 for # more discussion of this method. self._update_queue_cycle() return ret
基类是 virtual.Channel,其作用是:
Channel
将Consumer
标签,Consumer
要消费的队列,以及标签与队列的映射关系都记录下来,等待循环调用。另外,还通过Transport
将队列与回调函数列表的映射关系记录下来,以便于从队列中取出消息后执行回调函数。
变量是:
Consumer
要消费的队列;Consumer
标签;Transport
;数值如下:
self._tag_to_queue = {dict: 1} {'None1': 'asynt'} self._active_queues = {list: 1} ['asynt'] self._consumers = {set: 1} {'None1'} self.connection = {Transport}self.connection._callbacks = {dict: 1} {'asynt':<function Channel.basic_consume.._callback at 0x7fb3ecd4a2f0>}
代码如下:
def basic_consume(self, queue, no_ack, callback, consumer_tag, **kwargs): """Consume from `queue`.""" self._tag_to_queue[consumer_tag] = queue self._active_queues.append(queue) def _callback(raw_message): message = self.Message(raw_message, channel=self) if not no_ack: self.qos.append(message, message.delivery_tag) return callback(message) self.connection._callbacks[queue] = _callback self._consumers.add(consumer_tag) self._reset_cycle()
_reset_cycle 代码如下,看起来就是调用了 FairCycle,实际上没有用到,因为cycle已经有预设。cycle
是一个MultiChannelPoller
实例。
def _reset_cycle(self): self._cycle = FairCycle( self._get_and_deliver, self._active_queues, Empty)
具体如下图:
+----------+ +-------+ +---------+ | Consumer | | Queue | | Channel | +----+-----+ +---+---+ +-----+---+ | | | | | | __enter__ | | | | | | | | consume | | | | | | | | _basic_consume | | | | | | | | | consume | | +------------> | | | | basic_consume | | | | | | +-----------> | | | | | | | | | _reset_cycle | | | | | | | | | | | | | | | v v v
为了更好的分析,我们暂时注销hub,使用drain_events消费消息,这样更直观。
就是说,Consumer 已经和 Channel 联系起来,知道读取redis 中的哪个key。但是现在缺少一个读取消息的引擎。这个引擎可以驱动消息读取,每次有消息,就调用 consumer 中的回调函数来处理消息。
在没有引擎的情况下,drain_events 就可以起到引擎的作用。
with Consumer(conn, [queue], on_message=on_message): send_message(conn) # hub.timer.call_repeatedly(3, p_message) # hub.run_forever() conn.drain_events(timeout=1)
drain_events 调用 Connection 的方法来进行消费。
def drain_events(self, **kwargs): """Wait for a single event from the server. Arguments: timeout (float): Timeout in seconds before we give up. """ return self.transport.drain_events(self.connection, **kwargs)
在 Transport中的drain_events ,是在无限执行get(self._deliver, timeout=timeout)
get
是self.cycle
的一个方法,cycle
是一个MultiChannelPoller
实例:
所以get
是<bound method MultiChannelPoller.get of
def drain_events(self, connection, timeout=None): time_start = monotonic() get = self.cycle.get polling_interval = self.polling_interval if timeout and polling_interval and polling_interval > timeout: polling_interval = timeout while 1: try: get(self._deliver, timeout=timeout) except Empty: if timeout is not None and monotonic() - time_start >= timeout: raise socket.timeout() if polling_interval is not None: sleep(polling_interval) else: break
Transport
相关联的每一个channel都要执行drain_events
。具体分两步:
对于每一个channel都注册;
进行poll;
代码如下:
def get(self, callback, timeout=None): self._in_protected_read = True try: for channel in self._channels: if channel.active_queues: # BRPOP mode? if channel.qos.can_consume(): self._register_BRPOP(channel) if channel.active_fanout_queues: # LISTEN mode? self._register_LISTEN(channel) events = self.poller.poll(timeout) if events: for fileno, event in events: ret = self.handle_event(fileno, event) if ret: return # - no new data, so try to restore messages. # - reset active redis commands. self.maybe_restore_messages() raise Empty() finally: self._in_protected_read = False while self.after_read: try: fun = self.after_read.pop() except KeyError: break else: fun()
具体注册如下,我们先来看看 _register_BRPOP
,这里做了两个判断,第一个是判断当前的 channel 是否放进了 epoll 模型里面,如果没有,那么就放进去;同时,如果之前这个 channel 不在 epoll 里面,那么这次放进去了。
def _register_BRPOP(self, channel): """Enable BRPOP mode for channel.""" ident = channel, channel.client, 'BRPOP' if not self._client_registered(channel, channel.client, 'BRPOP'): channel._in_poll = False self._register(*ident) if not channel._in_poll: # send BRPOP channel._brpop_start()
最终进行Poll注册,这样当redis的socket对应的fd有消息,就会进行处理。
变量如下:
def register(self, fd, events): fd = fileno(fd) poll_flags = 0 if events & ERR: poll_flags |= POLLERR if events & WRITE: poll_flags |= POLLOUT if events & READ: poll_flags |= POLLIN self._quick_register(fd, poll_flags) return fd
当poll有消息,则相应处理。
events = self.poller.poll(timeout) if events: for fileno, event in events: ret = self.handle_event(fileno, event) if ret: return
但是,这个 connection 还没有对 epoll 起效果,所以发送一个 _brpop_start
。
这里可以看到,是对 asynt_queue 发起了监听请求,也就是说队列有消息过来,会被响应到。
变量如下:
keys = {list: 5} ['asynt_queue', 'asynt_queue\x06\x163', 'asynt_queue\x06\x166', 'asynt_queue\x06\x169', 1] queues = {list: 1} ['asynt_queue']
代码如下:
def _brpop_start(self, timeout=1): queues = self._queue_cycle.consume(len(self.active_queues)) if not queues: return keys = [self._q_for_pri(queue, pri) for pri in self.priority_steps for queue in queues] + [timeout or 0] self._in_poll = self.client.connection self.client.connection.send_command('BRPOP', *keys)
此处有一个负载均衡需要说明:
_queue_cycle属于均衡策略,就是选择下一次哪个queue的策略,items就是具体queue列表。比如:
class round_robin_cycle: """Iterator that cycles between items in round-robin.""" def __init__(self, it=None): self.items = it if it is not None else [] def update(self, it): """Update items from iterable.""" self.items[:] = it def consume(self, n): """Consume n items.""" return self.items[:n]
_brpop_start就是启动了下一次读取,选择哪一个queue。
consume, scheduling.py:79 _brpop_start, redis.py:725 _register_BRPOP, redis.py:314 on_poll_start, redis.py:328 on_poll_start, redis.py:1072 create_loop, hub.py:294 run_once, hub.py:193 run_forever, hub.py:185 main, testUb.py:49, testUb.py:53
因为已经把 file 和 poll 联系起来,所以对调用对应的响应方法,而响应方法会进行read消息。
def handle_event(self, fileno, event): if event & READ: return self.on_readable(fileno), self elif event & ERR: chan, type = self._fd_to_chan[fileno] chan._poll_error(type)
这里听说 Redis 已经准备好了,所以就来获取拿到的结果,然后就解析起来了,解析成功之后,自然要处理这个消息呀,于是乎又回到了这里 redis.py
:
提取fd对应的channel的响应方法如下:
def on_readable(self, fileno): chan, type = self._fd_to_chan[fileno] if chan.qos.can_consume(): chan.handlers[type]()
前面对chan.handlers已经进行了注册。
handlers = {dict: 2} 'BRPOP' = {method}<bound method Channel._brpop_read of > 'LISTEN' = {method}<bound method Channel._receive of >
因此调用_brpop_read。
def _brpop_read(self, **options): try: try: dest__item = self.client.parse_response(self.client.connection, 'BRPOP', **options) except self.connection_errors: # if there's a ConnectionError, disconnect so the next # iteration will reconnect automatically. self.client.connection.disconnect() raise if dest__item: dest, item = dest__item dest = bytes_to_str(dest).rsplit(self.sep, 1)[0] self._queue_cycle.rotate(dest) self.connection._deliver(loads(bytes_to_str(item)), dest) return True else: raise Empty() finally: self._in_poll = None
这里会从redis驱动读取,文件/redis/connection.py,从SocketBuffer读取。
代码为:
def readline(self): buf = self._buffer buf.seek(self.bytes_read) data = buf.readline() while not data.endswith(SYM_CRLF): # there's more data in the socket that we need self._read_from_socket() buf.seek(self.bytes_read) data = buf.readline() self.bytes_read += len(data) # purge the buffer when we've consumed it all so it doesn't # grow forever if self.bytes_read == self.bytes_written: self.purge() return data[:-2]
当读到 response 之后,调用 Redis驱动中对应命令的 回调方法来处理。此处命令为BRPOP。回调方法为:string_keys_to_dict('BLPOP BRPOP', lambda r: r and tuple(r) or None)
。
代码为:
def parse_response(self, connection, command_name, **options): "Parses a response from the Redis server" try: response = connection.read_response() except ResponseError: if EMPTY_RESPONSE in options: return options[EMPTY_RESPONSE] raise if command_name in self.response_callbacks: return self.response_callbacks[command_name](response, **options) return response
变量为:
command_name = {str} 'BRPOP' connection = {Connection} Connectionoptions = {dict: 0} {} self = {Redis} Redis<ConnectionPool<Connection>> connection = {Connection} Connectionconnection_pool = {ConnectionPool} ConnectionPool<Connection> response_callbacks = {CaseInsensitiveDict: 179} {. 'LPUSH' = {function}<function Redis.at 0x7fbad4276ea0> 'RPUSH' = {function}<function Redis.at 0x7fbad4276ea0> 'SORT' = {function}'ZSCORE' = {function}'ZINCRBY' = {function}'BLPOP' = {function}<function Redis.at 0x7fbad4276f28> 'BRPOP' = {function}<function Redis.at 0x7fbad4276f28> ....
这些代码堆栈如下:
readline, connection.py:251 read_response, connection.py:324 read_response, connection.py:739 parse_response, client.py:915 _brpop_read, redis.py:738 on_readable, redis.py:358 handle_event, redis.py:362 get, redis.py:380 drain_events, base.py:960 drain_events, connection.py:318 main, testUb.py:50, testUb.py:53
从Redis驱动获得消息后,回到了 _brpop_read,信息如下:
dest__item = {tuple: 2} 0 = {bytes: 11} b'asynt_queue' 1 = {bytes: 321} b'{"body": "aGVsbG8gd29ybGQ=", "content-encoding": "utf-8", "content-type": "text/plain", "headers": {}, "properties": {"delivery_mode": 2, "delivery_info": {"exchange": "asynt_exchange", "routing_key": "asynt_routing_key"}, "priority": 0, "body_encoding":
当获得消息之后,会取出对应queue的callback,进行调用。
变量如下:
def _deliver(self, message, queue): try: callback = self._callbacks[queue] except KeyError: logger.warning(W_NO_CONSUMERS, queue) self._reject_inbound_message(message) else: callback(message)
代码继续走到 basic_consume
def basic_consume(self, queue, no_ack, callback, consumer_tag, **kwargs): """Consume from `queue`.""" self._tag_to_queue[consumer_tag] = queue self._active_queues.append(queue) def _callback(raw_message): message = self.Message(raw_message, channel=self) if not no_ack: self.qos.append(message, message.delivery_tag) return callback(message) self.connection._callbacks[queue] = _callback self._consumers.add(consumer_tag) self._reset_cycle()
上文的 _callback 就是 _receive_callback in Consumer,所以这时候就调用过去。
<Consumer: [
def _receive_callback(self, message): accept = self.accept on_m, channel, decoded = self.on_message, self.channel, None try: m2p = getattr(channel, 'message_to_python', None) if m2p: message = m2p(message) if accept is not None: message.accept = accept if message.errors: return message._reraise_error(self.on_decode_error) decoded = None if on_m else message.decode() except Exception as exc: if not self.on_decode_error: raise self.on_decode_error(message, exc) else: return on_m(message) if on_m else self.receive(decoded, message)
最终调用用户方法。
on_message, testUb.py:36 _receive_callback, messaging.py:620 _callback, base.py:630 _deliver, base.py:980 _brpop_read, redis.py:748 on_readable, redis.py:358 handle_event, redis.py:362 get, redis.py:380 drain_events, base.py:960 drain_events, connection.py:318 main, testUb.py:50, testUb.py:53
具体如下:
+----------+ +---------+ +------------------+ +------+ +---------+ +-----+ +---------+ |Connection| |Transport| |MultiChannelPoller| |_poll | | Channel | |redis| |Consumer | +----+-----+ +------+--+ +------------+-----+ +----+-+ +-----+---+ +--+--+ +---+-----+ | | | | | | | + | | | | | | drain_events | | | | | | + + | | | | | +-------> drain_events | | | | | | + + | | | | | | +------------> get | | | | | | + | | | | | | + | | | | | | _register_BRPOP | | | | | | + + | | | | | | +-----------> register | | | | | | + | | | | | + | | | | | | poll | | | | | | + | | | | | | + | | | | | | handle_event | | | | | | + | | | | | | + | | | | | | on_readable | | | | | | + | + | | | | | +----------------->_brpop_read | | | | | | + | | | + | | +---------> | | | _deliver basic|consume | | | | | | | | | | | | | +---------> | | | | | | | | | | | | | | | | | | | | | v | | | | | | | | | | | | _receive_ca|lback | | | | | | v v v v v | v
从上图可以看出模块的用途。
手机上如图
至此,我们已经完成了 Consumer 的分析,下文我们进行 Producer 的分析。
celery 7 优秀开源项目kombu源码分析之registry和entrypoint
(二)放弃pika,选择kombu
kombu消息框架
AMQP中的概念
AMQP的基本概念
深入理解AMQP协议
kombu和消息队列总结
关于epoll版服务器的理解(Python实现)
celery源码解读
Kombu源码分析(一)概述