项目中需要构造带有中文字符非json的测试数据,格式如下:
{'userid': 0, 'ts': '2022-08-03 16:33:38.487973', 'user_name': '中国人'}
发过去之后发现消费出来的都是unicode的编码,且指定了utf-8也没用,一开始以为是kafka producer的value_serializer序列化器用的不对,后面发现其实是代码里json.dumps没用好的原因
# -*- coding: utf-8 -*- import time from kafka import KafkaConsumer, KafkaProducer import json from kafka.errors import KafkaError import datetime producer = KafkaProducer(sasl_mechanism='PLAIN', security_protocol='SASL_PLAINTEXT', sasl_plain_username='xxxxx', sasl_plain_password='xxxxxxxx', bootstrap_servers=['xxxxxxxxxxx'], #这里的dumps可以指定ensure_ascii=False value_serializer=lambda m: json.dumps(m,ensure_ascii=False).encode(), api_version="2.0.0") try: # produce asynchronously for i in range(100): now_time = str(datetime.datetime.now()) send_json={ "userid": i, "ts":now_time, "user_name":"中国人" } print(send_json) future = producer.send('xxxxxxxxxxx', send_json) try: record_metadata = future.get(timeout=2) except KafkaError: # Decide what to do if produce request failed... print("send error!") pass time.sleep(1) print(record_metadata.partition) print(record_metadata.offset) finally: producer.close()
这样就可以把原来的{"userid": 1, "ts": "2022-08-03 16:12:26.595478", "user_name": "\u4e2d\u56fd\u4eba"}改成{"userid": 1, "ts": "2022-08-03 16:33:39.576068", "user_name": "中国人"}
另外1个新手容易犯的错误
1、pyhton中通过str将json强行转换成str类型时,key和value的引号是单引号的,这样发送到kafka,对下游不是很友好,比如下游用java或者flinksql消费的时候可能会出问题,建议用标准序列化json.dumps来转