
Implementing MQTT Communication in WPF


I. Introduction

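  MQTT (Message Queuing Telemetry Transport) is a client-server publish/subscribe messaging protocol built on top of TCP/IP, used mainly in the Internet of Things (IoT). An MQTT system has three roles: the publisher, the broker (server), and the subscriber. Each message has two parts, the topic and the payload: the topic identifies the kind of message, and the payload carries the actual content. The code in this article uses the MQTTnet library; types such as IMqttServer and handler delegates such as MqttServerClientDisconnectedHandlerDelegate belong to its 3.x API.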
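To make the three roles concrete, here is a minimal in-memory sketch of the publish/subscribe idea. `TinyBroker` and `TinyBrokerDemo` are hypothetical names invented for this illustration; they are not part of MQTTnet, involve no networking, and match topics by exact name only.

```csharp
using System;
using System.Collections.Generic;
using System.Text;

// Hypothetical stand-in for a broker: it routes messages by exact topic name only.
public class TinyBroker
{
    private readonly Dictionary<string, List<Action<string, byte[]>>> m_Subscribers =
        new Dictionary<string, List<Action<string, byte[]>>>();

    // A subscriber registers a handler for one topic.
    public void Subscribe(string topic, Action<string, byte[]> handler)
    {
        if (!m_Subscribers.TryGetValue(topic, out var handlers))
        {
            handlers = new List<Action<string, byte[]>>();
            m_Subscribers[topic] = handlers;
        }
        handlers.Add(handler);
    }

    // A publisher hands topic + payload to the broker; returns the number of deliveries.
    public int Publish(string topic, byte[] payload)
    {
        if (!m_Subscribers.TryGetValue(topic, out var handlers))
        {
            return 0; // nobody subscribed to this topic
        }
        foreach (var handler in handlers)
        {
            handler(topic, payload);
        }
        return handlers.Count;
    }
}

public static class TinyBrokerDemo
{
    // Subscribes to "up", publishes one message, and returns what the subscriber saw.
    public static string Run()
    {
        var broker = new TinyBroker();
        string received = null;
        broker.Subscribe("up", (topic, payload) =>
            received = $"{topic}: {Encoding.UTF8.GetString(payload)}");
        broker.Publish("up", Encoding.UTF8.GetBytes("hello"));
        return received;
    }
}
```

The publisher and subscriber never reference each other; they only share the topic name, which is the property a real broker provides across the network.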

II. Server

Step 1: Create a server object

public IMqttServer m_MqttServer = new MqttFactory().CreateMqttServer();

Step 2: Configure the MQTT server

var optionsBuilder = new MqttServerOptionsBuilder()
                .WithConnectionBacklog(2000) // maximum length of the pending-connection queue
                .WithDefaultEndpoint()
                .WithDefaultEndpointPort(8000) // port number
                .WithConnectionValidator(c =>
                {
                    if (c.ClientId.Length < 5)
                    {
                        c.ReasonCode = MqttConnectReasonCode.ClientIdentifierNotValid;
                        return;
                    }

                    if (c.Username != "Username") // user name
                    {
                        c.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword;
                        return;
                    }
                    if (c.Password != "PassWord") // password
                    {
                        c.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword;
                        return;
                    }

                    OnMessage?.Invoke($"{c.ClientId} connected successfully!");
                    c.ReasonCode = MqttConnectReasonCode.Success;
                }) // connection validator
                .WithPersistentSessions(); // persistent sessions

 

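The connection validator is the place to add filtering conditions. For example, a ClientId shorter than 5 characters is treated as invalid, and the user name and password can be checked as well.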
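One way to keep these rules easy to test is to pull them out of the lambda into a plain function. The sketch below is an assumption about how that could look: `ConnectCheck` and `ConnectionRules` are hypothetical names that only mirror the three outcomes used above, and the extra null check on the client id is an addition not present in the original validator.

```csharp
using System;

// Hypothetical result type mirroring the three outcomes used by the validator above.
public enum ConnectCheck
{
    Success,
    ClientIdentifierNotValid,
    BadUserNameOrPassword
}

public static class ConnectionRules
{
    // Applies the same checks as the validator: client id length first, then credentials.
    public static ConnectCheck Check(string clientId, string username, string password)
    {
        // Reject missing or too-short client ids (the null check is an added safeguard).
        if (string.IsNullOrEmpty(clientId) || clientId.Length < 5)
        {
            return ConnectCheck.ClientIdentifierNotValid;
        }

        // Ordinal comparison: exact, case-sensitive match.
        if (!string.Equals(username, "Username", StringComparison.Ordinal) ||
            !string.Equals(password, "PassWord", StringComparison.Ordinal))
        {
            return ConnectCheck.BadUserNameOrPassword;
        }

        return ConnectCheck.Success;
    }
}
```

Inside the validator, the returned value would then be mapped to the corresponding MqttConnectReasonCode. Hard-coded credentials are kept here only to match the article; a real deployment would look them up elsewhere.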

Step 3: Start the MQTT server

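// Handle received messages (registered before starting so that none are missed)
m_MqttServer.UseApplicationMessageReceivedHandler(e =>
{
    var payload = e.ApplicationMessage.ConvertPayloadToString();
    OnMessage?.Invoke($"Topic: {e.ApplicationMessage.Topic}");
    OnMessage?.Invoke($"Message: {payload}");
});
// Start the server; StartAsync returns a Task, so await it inside an async method
await m_MqttServer.StartAsync(optionsBuilder.Build());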

Step 4: Report when a client disconnects

m_MqttServer.ClientDisconnectedHandler = new MqttServerClientDisconnectedHandlerDelegate(MqttServer_ClientDisconnected);
private void MqttServer_ClientDisconnected(MqttServerClientDisconnectedEventArgs obj)
{
     OnMessage?.Invoke(obj.ClientId + " disconnected");
}

III. Client

Step 1: Declare a client object

public IMqttClient m_MqttClient = new MqttFactory().CreateMqttClient();

Step 2: Configure the MqttClient

var mqttOptions = new MqttClientOptions()
{
    ClientId = "Client1122",
    ChannelOptions = new MqttClientTcpOptions()
    {
        // Server host address (replace with the address of your own server)
        Server = "192.168.2.51",
        Port = 8000
    },

    Credentials = new MqttClientCredentials()
    {
        Username = "Username",
        Password = Encoding.UTF8.GetBytes("PassWord")
    },
    CleanSession = false,
    KeepAlivePeriod = TimeSpan.FromSeconds(65535),
    CommunicationTimeout = TimeSpan.FromSeconds(30)
};

Step 3: Connect to the server

CancellationToken cancellationToken = CancellationToken.None;
var result = await m_MqttClient.ConnectAsync(mqttOptions, cancellationToken);

ConnectAsync takes a CancellationToken, which can be used to cancel the connection attempt while it is in progress. It is not actually used here.
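If you do want the token to do something, a common pattern is to create it from a CancellationTokenSource with a timeout. The sketch below shows only that pattern: `FakeConnectAsync` is a hypothetical stand-in for a slow connect call so the example runs without a broker, and how a real client library surfaces the cancellation (which exception type it throws) may differ.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class ConnectTimeoutDemo
{
    // Hypothetical stand-in for a slow connect call; a real client would receive the same token.
    private static async Task FakeConnectAsync(CancellationToken token)
    {
        await Task.Delay(TimeSpan.FromSeconds(30), token);
    }

    // Returns true if the attempt finished, false if the timeout cancelled it.
    public static async Task<bool> TryConnectAsync(TimeSpan timeout)
    {
        // The source cancels its token automatically once the timeout elapses.
        using (var cts = new CancellationTokenSource(timeout))
        {
            try
            {
                await FakeConnectAsync(cts.Token);
                return true;
            }
            catch (OperationCanceledException)
            {
                return false;
            }
        }
    }
}
```

Calling `await ConnectTimeoutDemo.TryConnectAsync(TimeSpan.FromSeconds(5))` gives up after five seconds instead of waiting for the full simulated connect.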

Check MqttClientConnectResult.ResultCode to determine whether the connection succeeded; once it has, you can carry on with your own follow-up handling.

Step 4: Subscribe to a topic

await m_MqttClient.SubscribeAsync(new MqttTopicFilter() { Topic = "up" });

Fill in the topic to subscribe to according to your actual needs.
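When choosing a topic, note that the MQTT standard also allows wildcards in a subscription filter: `+` matches exactly one topic level and `#` matches all remaining levels. The sketch below illustrates that matching rule; `TopicFilterMatcher` is a hypothetical helper written for this example, not the code a broker actually uses, and it is simplified in that it ignores the special handling of topics beginning with `$`.

```csharp
using System;

public static class TopicFilterMatcher
{
    // Returns true if the topic name matches the subscription filter.
    public static bool IsMatch(string topic, string filter)
    {
        var topicLevels = topic.Split('/');
        var filterLevels = filter.Split('/');

        for (int i = 0; i < filterLevels.Length; i++)
        {
            if (filterLevels[i] == "#")
            {
                // Multi-level wildcard: only valid as the last level; matches everything after it.
                return i == filterLevels.Length - 1;
            }
            if (i >= topicLevels.Length)
            {
                return false; // the filter has more levels than the topic
            }
            if (filterLevels[i] != "+" && filterLevels[i] != topicLevels[i])
            {
                return false; // a literal level that does not match
            }
        }

        // Without a trailing '#', both must have the same number of levels.
        return topicLevels.Length == filterLevels.Length;
    }
}
```

With this rule, the filter "up" used in this article matches only the topic "up", while a filter such as "sensors/+/temp" would match "sensors/room1/temp" but not "sensors/room1".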

IV. Complete implementation

  The goal is only to demonstrate MQTT communication, so the UI is kept simple.

1. Server

1) UI code

    <Grid Margin="10">
        <Grid.RowDefinitions>
            <RowDefinition Height="25"/>
            <RowDefinition />
        </Grid.RowDefinitions>
        <Button Name="btnStart" Grid.Row="0"
                Content="Start"
                Width="50"
                Click="Button_Click" />
        <GroupBox Header="Message monitor" Grid.Row="1">
            <RichTextBox Name="rtxMessage"  />
        </GroupBox>

    </Grid>

2) Code-behind

 public partial class MainWindow : Window
    {
        private MqttServer m_MqttServer = new MqttServer();
        // Tracks the server state instead of comparing the button text
        private bool m_IsRunning;
        public MainWindow()
        {
            InitializeComponent();
            m_MqttServer.OnMessage += OnMessage;
        }

        private void Button_Click(object sender, RoutedEventArgs e)
        {
            if (!m_IsRunning)
            {
                m_MqttServer.MqttConfig();
                OnMessage("Server started");
                btnStart.Content = "Stop";
            }
            else
            {
                m_MqttServer.CloseServer();
                btnStart.Content = "Start";
                OnMessage("Server stopped");
            }
            m_IsRunning = !m_IsRunning;
        }

        // Appends a timestamped line to the message box on the UI thread
        private void OnMessage(string msg)
        {
            rtxMessage.Dispatcher.Invoke(new Action(() =>
            {
                msg = DateTime.Now.ToString() + ">>>" + msg;
                rtxMessage.AppendText(msg);
                rtxMessage.AppendText(Environment.NewLine);
                rtxMessage.ScrollToEnd();
            }));
        }
    }

3) Create a MqttServer class

    public class MqttServer
    {
        public delegate void MessageHandler(string msg);
        public event MessageHandler OnMessage;

        private IMqttServer m_MqttServer;
        /// <summary>
        /// Configures and starts an MQTT server
        /// </summary>
        public void MqttConfig()
        {
            var optionsBuilder = new MqttServerOptionsBuilder()
                .WithConnectionBacklog(2000)
                .WithDefaultEndpoint()
                .WithDefaultEndpointPort(8000)
                .WithConnectionValidator(c =>
                {
                    if (c.ClientId.Length < 5)
                    {
                        c.ReasonCode = MqttConnectReasonCode.ClientIdentifierNotValid;
                        return;
                    }

                    if (c.Username != "Username") // user name
                    {
                        c.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword;
                        return;
                    }
                    if (c.Password != "PassWord") // password
                    {
                        c.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword;
                        return;
                    }

                    OnMessage?.Invoke($"{c.ClientId} connected successfully!");
                    c.ReasonCode = MqttConnectReasonCode.Success;
                })
                .WithPersistentSessions();

            m_MqttServer = new MqttFactory().CreateMqttServer();

            m_MqttServer.ClientUnsubscribedTopicHandler = new MqttServerClientUnsubscribedTopicHandlerDelegate(MqttServer_ClientUnsubscribedTopic);
            m_MqttServer.ClientConnectedHandler = new MqttServerClientConnectedHandlerDelegate(MqttServer_ClientConnected);
            m_MqttServer.ClientDisconnectedHandler = new MqttServerClientDisconnectedHandlerDelegate(MqttServer_ClientDisconnected);

            // Handle received messages (registered before starting so that none are missed)
            m_MqttServer.UseApplicationMessageReceivedHandler(e =>
            {
                var payload = e.ApplicationMessage.ConvertPayloadToString();
                OnMessage?.Invoke($"Topic: {e.ApplicationMessage.Topic}");
                OnMessage?.Invoke($"Message: {payload}");
            });

            // Start the server; this method is not async, so report a failed start through a continuation
            m_MqttServer.StartAsync(optionsBuilder.Build()).ContinueWith(
                t => OnMessage?.Invoke("Failed to start: " + t.Exception?.GetBaseException().Message),
                TaskContinuationOptions.OnlyOnFaulted);
        }

        private void MqttServer_ClientDisconnected(MqttServerClientDisconnectedEventArgs obj)
        {
            OnMessage?.Invoke(obj.ClientId + " disconnected");
        }

        /// <summary>
        /// Called when a client has connected successfully
        /// </summary>
        /// <param name="obj"></param>
        private void MqttServer_ClientConnected(MqttServerClientConnectedEventArgs obj)
        {

        }

        private void MqttServer_ClientUnsubscribedTopic(MqttServerClientUnsubscribedTopicEventArgs obj)
        {

        }

        public void CloseServer()
        {
            // Guard against being called before the server has been created
            m_MqttServer?.Dispose();
            m_MqttServer = null;
        }

    }

2. Client

1) UI

 

 <Grid>
        <Grid.RowDefinitions>
            <RowDefinition />
            <RowDefinition />
            <RowDefinition  Height="30" />
        </Grid.RowDefinitions>
        <GroupBox Header="Messages">
            <RichTextBox Name="rtxMessage" />
        </GroupBox>
        <GroupBox Header="Message to send" Grid.Row="1">
            <TextBox Name="txtSend" TextWrapping="Wrap"
                 Width="370"
                 Height="150" />
        </GroupBox>

        <StackPanel Grid.Row="2" Orientation="Horizontal" HorizontalAlignment="Center">
            <Button
                    Content="Connect"
                    Width="70"
                    Click="Button_Click" />
            <Button Margin="20,0,0,0"
                    Content="Publish"
                    Width="70"
                    Click="ButtonSend_Click" />
            <Button Margin="20,0,0,0"
                    Content="Disconnect"
                    Width="70"
                    Click="ButtonClose_Click" />
        </StackPanel>
    </Grid>

2) Code-behind

 public partial class MainWindow : Window
    {
        MqttClient m_MqttClient = new MqttClient();
        public MainWindow()
        {
            InitializeComponent();
            m_MqttClient.OnMessage += OnMessage;
        }
        private async void Button_Click(object sender, RoutedEventArgs e)
        {
            await m_MqttClient.MqttClientCon();
        }

        // Appends a timestamped line to the message box on the UI thread
        private void OnMessage(string msg)
        {
            rtxMessage.Dispatcher.Invoke(new Action(() =>
            {
                msg = DateTime.Now.ToString() + ">>>" + msg;
                rtxMessage.AppendText(msg);
                rtxMessage.AppendText(Environment.NewLine);
                rtxMessage.ScrollToEnd();
            }));
        }

        private void ButtonSend_Click(object sender, RoutedEventArgs e)
        {
            // Build a message for topic "up" from the text box content
            string payload = txtSend.Text;
            byte[] payloadBytes = Encoding.UTF8.GetBytes(payload);
            MqttApplicationMessage applicationMessage = new();
            applicationMessage.Topic = "up";
            applicationMessage.Payload = payloadBytes;
            applicationMessage.QualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce;
            applicationMessage.Retain = false;
            m_MqttClient.Send(applicationMessage, CancellationToken.None);
        }

        private void ButtonClose_Click(object sender, RoutedEventArgs e)
        {
            m_MqttClient.Dispose();
        }
    }

3) Create a MqttClient class

 public class MqttClient
    {
        public delegate void MessageHandler(string msg);
        public event MessageHandler OnMessage;
        public IMqttClient m_MqttClient = null;
        public async Task MqttClientCon()
        {
            try
            {
                if (m_MqttClient == null)
                {
                    m_MqttClient = new MqttFactory().CreateMqttClient();
                    var mqttOptions = new MqttClientOptions()
                    {
                        ClientId = "Client1122",
                        ChannelOptions = new MqttClientTcpOptions()
                        {
                            // Server host address (replace with the address of your own server)
                            Server = "192.168.2.5",
                            Port = 8000
                        },

                        Credentials = new MqttClientCredentials()
                        {
                            Username = "Username",
                            Password = Encoding.UTF8.GetBytes("PassWord")

                        },
                        CleanSession = false,
                        KeepAlivePeriod = TimeSpan.FromSeconds(65535),
                        CommunicationTimeout = TimeSpan.FromSeconds(30)
                    };

                    var result = await m_MqttClient.ConnectAsync(mqttOptions, CancellationToken.None);

                    if (result.ResultCode == MQTTnet.Client.Connecting.MqttClientConnectResultCode.Success)
                    {
                        await m_MqttClient.SubscribeAsync(new MqttTopicFilter() { Topic = "up" });
                        OnMessage?.Invoke("Connected");
                    }
                    else
                    {
                        // Release the client so that the next click can retry
                        OnMessage?.Invoke("Connection failed");
                        m_MqttClient.Dispose();
                        m_MqttClient = null;
                        return;
                    }

                    m_MqttClient.UseDisconnectedHandler(e =>
                    {
                        OnMessage?.Invoke("Disconnected >>Disconnected Server");
                        m_MqttClient = null;
                    });
                }
            }
            catch (Exception ex)
            {
                OnMessage?.Invoke(ex.Message);
                m_MqttClient?.Dispose();
                m_MqttClient = null;
            }
        }

        public async void Send(MqttApplicationMessage applicationMessage, CancellationToken cancellationToken1)
        {
            if (m_MqttClient == null)
            {
                OnMessage?.Invoke("Not connected");
                return;
            }

            try
            {
                var result1 = await m_MqttClient.PublishAsync(applicationMessage, cancellationToken1);
                if (result1.ReasonCode == MQTTnet.Client.Publishing.MqttClientPublishReasonCode.Success)
                {
                    string msg = $"Topic:{applicationMessage.Topic}\n" +
                        $"Payload:{Encoding.UTF8.GetString(applicationMessage.Payload)}";
                    OnMessage?.Invoke(msg);
                }
                else
                {
                    OnMessage?.Invoke("Send failed");
                }
            }
            catch (Exception ex)
            {
                // This method is async void, so an unhandled exception here would crash the application
                OnMessage?.Invoke(ex.Message);
            }
        }

        public void Dispose()
        {
            if (m_MqttClient != null)
            {
                m_MqttClient.Dispose();
                m_MqttClient = null;
                OnMessage?.Invoke("Disconnected >>Disconnected Server");
            }
        }

    }

V. Result

 
