MQTT(消息队列遥测传输协议),一个基于客户端-服务器的消息发布/订阅传输协议,该协议构建于TCP/IP协议上。主要应用于物联网领域。在MQTT协议中,由三部分,发布者、代理(服务)、订阅者,MQTT的消息传输与分为两个部分,分别是主题(Topic)和负载(payload),主题为消息类型、负载为具体的信息内容
第一步 创建一个服务端对象
public IMqttServer m_MqttServer = new MqttFactory().CreateMqttServer();;
第二步 配置一个Mqtt服务
var optionsBuilder = new MqttServerOptionsBuilder() .WithConnectionBacklog(2000)//连接记录数 .WithDefaultEndpoint() .WithDefaultEndpointPort(8000)//端口号 .WithConnectionValidator(c => { if (c.ClientId.Length < 5) { c.ReasonCode = MqttConnectReasonCode.ClientIdentifierNotValid; return; } if (c.Username != "Username") //用户名 { c.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword; return; } if (c.Password != "PassWord") //用户名 { c.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword; return; } OnMessage($"{c.ClientId}连接成功!"); c.ReasonCode = MqttConnectReasonCode.Success; }) //连接验证器 .WithPersistentSessions(); //持续会话
在连接验证器里,可以加入一些过滤条件,比如 ClientId长度小于5的认为是无效的Id,还可以加入的用户名和密码的验证。
第三步 启动MQTT服务
//启动服务 m_MqttServer.StartAsync(optionsBuilder.Build()); //处理接收到的信息 m_MqttServer.UseApplicationMessageReceivedHandler(e => { var payload = e.ApplicationMessage.ConvertPayloadToString(); OnMessage($"主题:{e.ApplicationMessage.Topic}"); OnMessage($"信息:{payload}"); });
第四步 当有客户端断开连接时,进行提醒
m_MqttServer.ClientDisconnectedHandler = new MqttServerClientDisconnectedHandlerDelegate(MqttServer_ClientDisconnected); private void MqttServer_ClientDisconnected(MqttServerClientDisconnectedEventArgs obj) { OnMessage(obj.ClientId + "断开连接"); }
第一步 声名一个客户端对象
public IMqttClient m_MqttClient = new MqttFactory().CreateMqttClient();;
第二步 配置MqttClient
var mqttOptions = new MqttClientOptions() { ClientId = "Client1122", ChannelOptions = new MqttClientTcpOptions() { //服务主机地址 Server = "192.168.2.51", Port = 8000 }, Credentials = new MqttClientCredentials() { Username = "Username", Password = Encoding.UTF8.GetBytes("PassWord") }, CleanSession = false, KeepAlivePeriod = TimeSpan.FromSeconds(65535), CommunicationTimeout = TimeSpan.FromSeconds(30) };
第三步 连接服务
CancellationToken cancellationToken = new CancellationToken(); var result = await m_MqttClient.ConnectAsync(mqttOptions, cancellationToken);
在连接时,需要传入一个CancellationToken,用于在连接过程中取消连接使用,在这里并未使用到
可根据MqttClientConnectResult.ResultCode来判断是否连接成功,连接成功时,可以自行
第四步 订阅Topic
m_MqttClient.SubscribeAsync(new MqttTopicFilter() { Topic = "up" });
订阅的Topic可根据实际情况填写
只是为了实现MQTT通信,界面就做的比较简单
1. 服务端
1) 界面代码
<Grid Margin="10"> <Grid.RowDefinitions> <RowDefinition Height="25"/> <RowDefinition /> </Grid.RowDefinitions> <Button Name="btnStart" Grid.Row="0" Content="启动" Width="50" Click="Button_Click" /> <GroupBox Header="信息监听" Grid.Row="1"> <RichTextBox Name="rtxMessage" /> </GroupBox> </Grid>
2) 界面后台代码
public partial class MainWindow : Window { private MqttServer m_MqttServer = new MqttServer(); public MainWindow() { InitializeComponent(); m_MqttServer.OnMessage += OnMessgae; } private void Button_Click(object sender, RoutedEventArgs e) { if (btnStart.Content.ToString() == "启动") { m_MqttServer.MqttConfig(); OnMessgae("服务已启动"); btnStart.Content = "停止"; } else { m_MqttServer.CloseServer(); btnStart.Content = "启动"; OnMessgae("服务已关闭"); } } private void OnMessgae(string msg) { rtxMessage.Dispatcher.Invoke(new Action(() => { msg = DateTime.Now.ToString() +">>>"+ msg; rtxMessage.AppendText(msg); rtxMessage.AppendText(Environment.NewLine); rtxMessage.ScrollToEnd(); })); } }
3) 新建一个MqttServer类
public delegate void MessageHandler(string msg); public event MessageHandler OnMessage; private IMqttServer m_MqttServer; /// <summary> /// 配置一个Mqtt服务 /// </summary> public void MqttConfig() { var optionsBuilder = new MqttServerOptionsBuilder() .WithConnectionBacklog(2000) .WithDefaultEndpoint() .WithDefaultEndpointPort(8000) .WithConnectionValidator(c => { if (c.ClientId.Length < 5) { c.ReasonCode = MqttConnectReasonCode.ClientIdentifierNotValid; return; } if (c.Username != "Username") //用户名 { c.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword; return; } if (c.Password != "PassWord") //用户名 { c.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword; return; } OnMessage($"{c.ClientId}连接成功!"); c.ReasonCode = MqttConnectReasonCode.Success; }) .WithPersistentSessions(); m_MqttServer = new MqttFactory().CreateMqttServer(); m_MqttServer.ClientUnsubscribedTopicHandler = new MqttServerClientUnsubscribedTopicHandlerDelegate(MqttServer_ClientUnsubscribedTopic); m_MqttServer.ClientConnectedHandler = new MqttServerClientConnectedHandlerDelegate(MqttServer_ClientConnected); m_MqttServer.ClientDisconnectedHandler = new MqttServerClientDisconnectedHandlerDelegate(MqttServer_ClientDisconnected); //启动服务 m_MqttServer.StartAsync(optionsBuilder.Build()); m_MqttServer.UseApplicationMessageReceivedHandler(e => { var payload = e.ApplicationMessage.ConvertPayloadToString(); OnMessage($"主题:{e.ApplicationMessage.Topic}"); OnMessage($"信息:{payload}"); }); } private void MqttServer_ClientDisconnected(MqttServerClientDisconnectedEventArgs obj) { OnMessage(obj.ClientId + "断开连接"); } /// <summary> /// 连接成功时, /// </summary> /// <param name="obj"></param> private void MqttServer_ClientConnected(MqttServerClientConnectedEventArgs obj) { } private void MqttServer_ClientUnsubscribedTopic(MqttServerClientUnsubscribedTopicEventArgs obj) { } public void CloseServer() { m_MqttServer.Dispose(); } }
2. 客户端
1) 界面
<Grid> <Grid.RowDefinitions> <RowDefinition /> <RowDefinition /> <RowDefinition Height="30" /> </Grid.RowDefinitions> <GroupBox Header="消息"> <RichTextBox Name="rtxMessage" /> </GroupBox> <GroupBox Header="消息发送区" Grid.Row="1"> <TextBox Name="txtSend" TextWrapping="Wrap" Width="370" Height="150" Grid.Row="1" /> </GroupBox> <StackPanel Grid.Row="2" Orientation="Horizontal" HorizontalAlignment="Center"> <Button Content="连接" Width="50" Click="Button_Click" /> <Button Grid.Row="1" Margin="20,0,0,0" Content="发布" Width="50" Click="ButtonSend_Click" /> <Button Grid.Row="1" Margin="20,0,0,0" Content="断开" Width="50" Click="ButtonClose_Click" /> </StackPanel> </Grid>
2) 后台代码
public partial class MainWindow : Window { MqttClient m_MqttClient = new MqttClient(); public MainWindow() { InitializeComponent(); m_MqttClient.OnMessage += OnMessgae; } private async void Button_Click(object sender, RoutedEventArgs e) { await m_MqttClient.MqttClientCon(); } private void OnMessgae(string msg) { rtxMessage.Dispatcher.Invoke(new Action(() => { msg = DateTime.Now.ToString() + ">>>" + msg; rtxMessage.AppendText(msg); rtxMessage.AppendText(Environment.NewLine); rtxMessage.ScrollToEnd(); })); } private void ButtonSend_Click(object sender, RoutedEventArgs e) { string payload = txtSend.Text; byte[] msggbu = Encoding.UTF8.GetBytes(payload); MqttApplicationMessage applicationMessage = new(); CancellationToken cancellationToken1 = new CancellationToken(); applicationMessage.Topic = "up"; applicationMessage.Payload = msggbu; applicationMessage.QualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce; applicationMessage.Retain = false; m_MqttClient.Send(applicationMessage, cancellationToken1); } private void ButtonClose_Click(object sender, RoutedEventArgs e) { m_MqttClient.Dispose(); } }
3) 新建一个MqttClient类
public class MqttClient { public delegate void MessageHandler(string msg); public event MessageHandler OnMessage; public IMqttClient m_MqttClient = null; public async Task MqttClientCon() { try { if (m_MqttClient == null) { m_MqttClient = new MqttFactory().CreateMqttClient(); var mqttOptions = new MqttClientOptions() { ClientId = "Client1122", ChannelOptions = new MqttClientTcpOptions() { //服务主机地址 Server = "192.168.2.5", Port = 8000 }, Credentials = new MqttClientCredentials() { Username = "Username", Password = Encoding.UTF8.GetBytes("PassWord") }, CleanSession = false, KeepAlivePeriod = TimeSpan.FromSeconds(65535), CommunicationTimeout = TimeSpan.FromSeconds(30) }; CancellationToken cancellationToken = new CancellationToken(); var result = await m_MqttClient.ConnectAsync(mqttOptions, cancellationToken); if (result.ResultCode == MQTTnet.Client.Connecting.MqttClientConnectResultCode.Success) { await m_MqttClient.SubscribeAsync(new MqttTopicFilter() { Topic = "up" }); OnMessage("连接成功"); } else { OnMessage("连接失败"); } m_MqttClient.UseDisconnectedHandler(e => { OnMessage("Disconnected >>Disconnected Server"); m_MqttClient = null; }); } } catch (Exception ex) { OnMessage(ex.Message); m_MqttClient.Dispose(); m_MqttClient = null; } } public async void Send(MqttApplicationMessage applicationMessage, CancellationToken cancellationToken1) { var result1 = await m_MqttClient.PublishAsync(applicationMessage, cancellationToken1); if (result1.ReasonCode == MQTTnet.Client.Publishing.MqttClientPublishReasonCode.Success) { string msg = $"Topic:{applicationMessage.Topic}\n" + $"Payload:{Encoding.UTF8.GetString(applicationMessage.Payload)}"; OnMessage(msg); } else { OnMessage("发送失败"); } } public async void Dispose() { if (m_MqttClient != null) { m_MqttClient.Dispose(); m_MqttClient = null; OnMessage("Disconnected >>Disconnected Server"); } } }