.Net Core/.net 6/.Net 8 实现Mqtt客户端
- 客户端代码
- 调用
直接上代码
nuget引用
MQTTnet
客户端代码
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Packets;
using System.Text;namespace Code.Mqtt
{/// <summary>/// Mqtt客户端/// </summary>public class MqttClientBase{/// <summary>/// 客户端/// </summary>public IMqttClient client;/// <summary>/// 订阅主题列表/// </summary>public List<string> Topics=new List<string>();public MqttClientOptions options;public MqttClientBaseOptions _opt;/// <summary>/// 主动断开连接/// </summary>public bool off = false;public bool isconn = false;/// <summary>/// 创建mqtt客户端,并值接传入初始参数/// </summary>/// <param name="opt"></param>public MqttClientBase(MqttClientBaseOptions opt){this._opt = opt;//创建客户端client = new MqttFactory().CreateMqttClient();options =new MqttClientOptions() { ClientId=_opt.clientId,ChannelOptions=new MqttClientTcpOptions(){Server=_opt.server,Port=_opt.port,},Credentials=new MqttClientCredentials(_opt.username,Encoding.UTF8.GetBytes(_opt.password)),//清理会话CleanSession=false,//设置心跳KeepAlivePeriod = TimeSpan.FromSeconds(30)};}/// <summary>/// 创建mqtt客户端,不传参数,/// 必须在调用 Connect之前调用过SetOption方法/// </summary>public MqttClientBase(){//创建客户端client = new MqttFactory().CreateMqttClient();}/// <summary>/// 设置参数/// </summary>/// <param name="opt"></param>public void SetOption(MqttClientBaseOptions opt){options = new MqttClientOptions(){ClientId = _opt.clientId,ChannelOptions = new MqttClientTcpOptions(){Server = _opt.server,Port = _opt.port,},Credentials = new MqttClientCredentials(_opt.username, Encoding.UTF8.GetBytes(_opt.password)),//清理会话CleanSession = false,//设置心跳KeepAlivePeriod = TimeSpan.FromSeconds(30)};}/// <summary>/// 连接服务器/// </summary>/// <param name="action">连接成功后执行</param>/// <param name="ConnectedAsync">连接成功事件</param>public void Connect(Action<MqttClientConnectedEventArgs> ConnectedAsync=null){client.ConnectAsync(options);if(ConnectedAsync != null){//连接成功事件client.ConnectedAsync += (args) =>{ConnectedAsync(args);return Task.CompletedTask;};}}/// <summary>/// 重连服务器/// 在连接断开事件中调用,即可实现无限轮询/// </summary>/// <param name="t">是否重复尝试重连</param>/// <param name="i">尝试次数</param>public void ReConnect(){try{client.ConnectAsync(options).Wait();}catch (Exception ex){Console.WriteLine(ex.Message);}}public async Task AddTopic(string topic){//更新订阅client.SubscribeAsync(new MqttClientSubscribeOptions(){TopicFilters = new List<MqttTopicFilter>() {new MqttTopicFilter { Topic = topic }}});//将主题名称加入列表Topics.Add(topic);}/// <summary>/// 取消订阅/// </summary>/// <param name="topic"></param>/// <returns></returns>public async Task DeleteTopic(string topic){client.UnsubscribeAsync(new MqttClientUnsubscribeOptions(){TopicFilters = new List<string> { topic }});Topics.Remove(topic);}/// <summary>/// 发布消息/// </summary>/// <param name="topic">主题</param>/// <param name="content">内容</param>/// <returns></returns>public async Task Publish(string topic, string content){if(client.IsConnected){client.PublishAsync(new MqttApplicationMessage(){Topic = topic,Payload = Encoding.UTF8.GetBytes(content)});}}/// <summary>/// 主动断开连接/// </summary>public void Disconnect(){off = true;client.DisconnectAsync();}/// <summary>/// 断开连接事件/// </summary>/// <param name="action"></param>/// <returns></returns>public async Task DisconnectedAsync(Action<MqttClientDisconnectedEventArgs> action){client.DisconnectedAsync += (args) => {action(args);return Task.CompletedTask;};}/// <summary>/// 接收消息事件/// </summary>/// <param name="action"></param>/// <returns></returns>public async Task Message(Action<string,string> action) {client.ApplicationMessageReceivedAsync += (args) =>{var topic = args.ApplicationMessage.Topic;var msg = args.ApplicationMessage.Payload.BToString();action(topic, msg);return Task.CompletedTask;};}}
}
调用
我这里是控制台项目
//初始化
var mqtt = new MqttClientBase(new MqttClientBaseOptions() { clientId="client-1",username="username",password="password",server="127.0.0.1",port=10883
});//断开连接事件
mqtt.DisconnectedAsync((e) => {Console.WriteLine("连接断开");//重连服务器mqtt.ReConnect();
});//连接服务器
mqtt.Connect((args) => {/* 连接成功事件 */Console.WriteLine("连接成功");// 添加主题订阅,建议写到 连接成功事件 里面,这样重连后可以重新订阅主题mqtt.AddTopic("topic-1").Wait();mqtt.AddTopic("topic-2").Wait();mqtt.AddTopic("topic-3").Wait();// 取消主题订阅mqtt.DeleteTopic("topic-3").Wait();// 向指定主题推送消息mqtt.Publish("topic-1", "666666666").Wait();});// 收到来自服务器的消息 topic:主题 msg:消息内容
mqtt.Message((topic,msg) => { Console.WriteLine($"收到消息:{topic}:{msg}");
});// 这里暂停三秒,看三秒后主动断开连接效果
// Task.Delay(3000).Wait();// 主动断开连接
//mqtt.Disconnect();while (true)
{// 向指定主题推送消息mqtt.Publish("topic-1", Console.ReadLine());
}