使用kombu
读取队列数据的时候报如下错误
amqp.exceptions.PreconditionFailed: Queue.declare: (406) PRECONDITION_FAILED - inequivalent arg ‘x-max-priority’ for queue ‘douyin.pg.logo.ready’ in vhost ‘douyin_pggolden’: received none but current is the value ‘5’ of type ‘signedint’
从队列中读取消息的代码如下
from kombu import Connection rabitmq_user = "admin" rabitmq_pwd = "admin" rabitmq_ip = "8.8.8.8" queue_name = "example.queue" virtual_host = "example" src_rabitmq_host = "amqp://%s:%s@%s:5672/%s"%(rabitmq_user,rabitmq_pwd,rabitmq_ip,virtual_host) conn = Connection(src_rabitmq_host) rec = conn.SimpleQueue(queue_name) while True: msg = rec.get(block=True, timeout=1) msg.ack() #从队列中获取消息 print(msg.payload) break
因为rabitmq的队列中设置了x-max-priority
我们需要将SimpleQueue
改成Queue
即可
from kombu.log import get_logger from kombu.mixins import ConsumerMixin from kombu.utils.functional import reprcall from kombu import Exchange, Queue logger = get_logger(__name__) task_exchange = Exchange('tasks', type='direct') task_queues = [Queue('hipri', task_exchange, routing_key='hipri',queue_arguments={'x-queue-mode': 'lazy', 'x-max-priority': 10},max_priority=5)] class Worker(ConsumerMixin): def __init__(self, connection): self.connection = connection def get_consumers(self, Consumer, channel): return [Consumer(queues=task_queues, accept=['pickle', 'json'], callbacks=[self.process_task])] def process_task(self, body, message): fun = body['fun'] args = body['args'] kwargs = body['kwargs'] logger.info('Got task: %s', reprcall(fun.__name__, args, kwargs)) try: fun(*args, **kwargs) except Exception as exc: logger.error('task raised exception: %r', exc) message.ack() if __name__ == '__main__': from kombu import Connection from kombu.utils.debug import setup_logging # setup root logger setup_logging(loglevel='INFO', loggers=['']) with Connection('amqp://guest:guest@localhost:5672//') as conn: try: worker = Worker(conn) worker.run() except KeyboardInterrupt: print('bye bye')
from celery import Celery from kombu import Exchange, Queue, binding def route_task(exchange_name, exchange_type): def wrapper(name, args, kwargs, options, task=None, **kw): return { 'exchange': exchange_name, 'exchange_type': exchange_type, 'routing_key': name } return wrapper def create_task_queues(binding_list, exchange): binding_map = {} queues = [] for routing_key, queue_name in binding_list: binding_map.setdefault(queue_name, []) binding_map[queue_name].append(routing_key) for queue_name, routing_keys in binding_map.items(): queues.append( Queue( queue_name, [binding(exchange, routing_key=routing_key) for routing_key in routing_keys], queue_arguments={'x-queue-mode': 'lazy', 'x-max-priority': 10},max_priority=5 ) ) return queues class ExampleConfig(object): broker_url = 'amqp://guest:guest@localhost:5672//' task_default_exchange = 'example' task_default_exchange_type = 'topic' bindings = [ ('example.queue', 'example.queue') ] task_queues = create_task_queues(bindings, Exchange(task_default_exchange, type=task_default_exchange_type)) task_routes = (route_task(task_default_exchange, task_default_exchange_type),) app_example= Celery("example") app_example.config_from_object(ExampleConfig) @app_example.task(name="example.queue") def pg_logo_recon(**kwargs): pass