using MQTTnet; using MQTTnet.Client; using MQTTnet.Client.Options; using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; namespace MyCore.Services { public class MQTTService { private IMqttClient _client; private IMqttClientOptions _options; private string _mqttServer = "192.168.31.140"; private string _user = "mqtt"; private string _password = "mqtt"; // It's here to have the mqtt initialisation + logic for payload.. // Related to which event occurs, a specific action is done. public MQTTService() { try { // Create a new MQTT client. _client = new MqttFactory().CreateMqttClient(); _options = new MqttClientOptionsBuilder() .WithClientId("ApiService") .WithTcpServer(_mqttServer) .WithCredentials(_user, _password) .WithCleanSession() .Build(); _client.ConnectAsync(_options, CancellationToken.None).ContinueWith(res => { if (res.Status == TaskStatus.RanToCompletion) { Console.WriteLine("It's connected"); } else { Console.WriteLine($"Error connecting to {_mqttServer}"); } }); _client.UseDisconnectedHandler(async e => { Console.WriteLine("### DISCONNECTED FROM SERVER ###"); await Task.Delay(TimeSpan.FromSeconds(5)); try { await _client.ConnectAsync(_options, CancellationToken.None); // Since 3.0.5 with CancellationToken } catch { Console.WriteLine("### RECONNECTING FAILED ###"); } }); _client.UseConnectedHandler(async e => { Console.WriteLine("### CONNECTED WITH SERVER ###"); // Subscribe to a topic await _client.SubscribeAsync(new TopicFilterBuilder().WithTopic("#").Build()); Console.WriteLine("### SUBSCRIBED ###"); }); _client.UseApplicationMessageReceivedHandler(e => { Console.WriteLine("### RECEIVED APPLICATION MESSAGE ###"); Console.WriteLine($"+ Topic = {e.ApplicationMessage.Topic}"); Console.WriteLine($"+ Payload = {Encoding.UTF8.GetString(e.ApplicationMessage.Payload)}"); Console.WriteLine($"+ QoS = {e.ApplicationMessage.QualityOfServiceLevel}"); Console.WriteLine($"+ Retain = {e.ApplicationMessage.Retain}"); Console.WriteLine(); }); } catch (Exception e) { Console.WriteLine($"Error during creation of MQTTService with {_mqttServer}, exceptions : {e}"); } } public async void PublishMessage(string topic, string message) { var mqttMessage = new MqttApplicationMessageBuilder() .WithTopic(topic) .WithPayload(message) .WithExactlyOnceQoS() .WithRetainFlag() .Build(); await _client.PublishAsync(mqttMessage); } /*protected async Task Start() { var server = new MqttFactory().CreateMqttServer(); try { var client1 = new MqttFactory().CreateMqttClient(); await client1.ConnectAsync(new MqttClientOptionsBuilder().WithTcpServer(_mqttServer).Build()); var message = new MqttApplicationMessageBuilder().WithPayload("It's a test").WithTopic("IpAddress").WithRetainFlag().Build(); await client1.PublishAsync(message); await Task.Delay(500); } finally { await server.StopAsync(); } }*/ } }