先说下问题:
正常使用kafka消费者,接收消息时,会出现消息循环无法结束问题,增加参数 consumer_timeout_ms:超时时间(毫秒),超过指定时间没有获取到消息关闭kafka。(例子如下)
consumer.py文件:
from kafka import KafkaProducer, KafkaConsumer import time class KafkaClient(object): topic = "topic" # 使用的kafka的topic client = "0.0.0.0:19823" # kafka所在的服务地址 group_id = "test_consumer_group" # kafka组信息 @staticmethod def log(log_str): t = time.strftime(r"%Y-%m-%d_%H:%M:%S", time.localtime()) print("[%s]%s" % (t, log_str)) def info_send(self, key, info_str): """key: 发送信息的key;info_str:要发送的信息内容""" producer = KafkaProducer(bootstrap_servers=[self.client]) producer.send(self.topic, key=key.encode("utf-8"), value=info_str.encode("utf-8")) # 批量提交可以使用 producer.flush() producer.close() def message_consumer(): # consumer_timeout_ms:超时时间(毫秒),超过指定时间没有获取到消息关闭kafka consumer = KafkaConsumer(self.topic, group_id=self.group_id, bootstrap_servers=[self.client], consumer_timeout_ms=3000) for msg in consumer: # partition:消息所在的分区,offset:消息所在分区的位置,key:消息的key,value:消息的内容 print(msg.topic, msg.partition, msg.offset, msg.timestamp, msg.key, msg.value)