# -*- coding:utf-8 -*- from multiprocessing import Process import paho.mqtt.publish as publish from Data.data import * import paho.mqtt.subscribe as subscribe from common.yaml_common_method import * import time,json,yaml,os """基础数据yaml获取""" data = yaml_method_read("data") host = data["host"]#IP地址 prot = data["port"]#端口 pub_url = data["pub_url"]+"/register"#发送 sub_url = data["sub_url"]+"/register/response"#订阅 """主题请求参数yaml获取""" publish_yaml = yaml_method_read("publish") publish_msg = publish_yaml["upload_service"] client_id = time.strftime('mq:test:%Y%m%d%H%M%S',time.localtime(time.time())) class Mqtt_method_info(): def mqtt_publish(self): #这里是发送的函数 msg = publish_msg print("------------------------------Send_sub------------------------------") print(pub_url) print("----------------------------Send_message----------------------------") print(msg) msg = json.dumps(msg) publish.single(pub_url, msg, qos=1, hostname=host, port=prot, client_id=client_id) print("------------------------------End_send------------------------------") def mqtt_subscribe(self): #这里是订阅的函数 print("--------------------------Subscribe_topics--------------------------") print(sub_url) msg = subscribe.simple(sub_url, qos=1, hostname=host, port=prot, client_id=client_id) msg_payload = json.loads(msg.payload.decode("utf-8")) print("---------------------------Return_message----------------------------") print(msg_payload) yaml_method_write("mqtt_return_data",msg_payload) def implement(self): pub = Process(target=Mqtt_method_info().mqtt_publish)#发送线程 sub = Process(target=Mqtt_method_info().mqtt_subscribe)#订阅线程 sub.start() time.sleep(1) pub.start() sub.join() print("执行完毕") if __name__ == '__main__': Mqtt_method_info().implement()
这里讲一下python-mqtt测试脚本:
我们会使用到python的paho-mqtt库,同样pip安装下就行,失败的话大部分是需要用镜像站的情况
这份代码分为yaml文件读取发送的数据,这里只有操作的方法,大家如果需要使用可以把发送、订阅的函数中修改,只运行方法,然后自己再通过各种文件管理数据