1.安装kafka环境
# 看这个地址 https://blog.csdn.net/github_38482082/article/details/82112641 # 你还需要装Java环境 # 测试启动如果启动成功,那么证明kafka启动成功 .\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties #启动kafka .\bin\windows\kafka-server-start.bat .\config\server.properties # 创建top .\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test # 生产者 .\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test # 消费者 .\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning
2.pykafka
2,1安装
pip install pykafka
2.2生产者
from pykafka import KafkaClient from conf import KAFKA_HOSTS_LOCALHOST #连接kafka客户端 kafka_client = KafkaClient(hosts=KAFKA_HOSTS_LOCALHOST) #获取topic topic = kafka_client.topics["test"] #获取生产者对象 produce = topic.get_producer() #传数据必须是字节 produce.produce("now_time_bytes".encode("utf8")) #手动关闭该生产者 produce.stop()
2.3消费者
# 导入安装包 from pykafka import KafkaClient # 设置客户端的连接信息 client = KafkaClient(hosts="127.0.0.1:9092") topic = client.topics['test'] # print(client.topics) # print(topic.latest_available_offsets()) #consumer_group 与consumer_id值不能一样,不同group相互独立 consumer = topic.get_simple_consumer( consumer_group='18', auto_commit_enable=True, auto_commit_interval_ms=1, reset_offset_on_start=True # consumer_id =1, ) for message in consumer: if message is not None: print(message.offset, message.value)