2020-01-11 19:02:02 +01:00

130 lines
4.2 KiB
C#

using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Client.Options;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace MyCore.Services
{
public class MQTTService
{
private IMqttClient _client;
private IMqttClientOptions _options;
private string _mqttServer = "192.168.31.140";
private string _user = "mqtt";
private string _password = "mqtt";
// It's here to have the mqtt initialisation + logic for payload..
// Related to which event occurs, a specific action is done.
public MQTTService()
{
try
{
// Create a new MQTT client.
_client = new MqttFactory().CreateMqttClient();
_options = new MqttClientOptionsBuilder()
.WithClientId("ApiService")
.WithTcpServer(_mqttServer)
.WithCredentials(_user, _password)
.WithCleanSession()
.Build();
_client.ConnectAsync(_options, CancellationToken.None).ContinueWith(res => {
if (res.Status == TaskStatus.RanToCompletion)
{
Console.WriteLine("It's connected");
}
else
{
Console.WriteLine($"Error connecting to {_mqttServer}");
}
});
_client.UseDisconnectedHandler(async e =>
{
Console.WriteLine("### DISCONNECTED FROM SERVER ###");
await Task.Delay(TimeSpan.FromSeconds(5));
try
{
await _client.ConnectAsync(_options, CancellationToken.None); // Since 3.0.5 with CancellationToken
}
catch
{
Console.WriteLine("### RECONNECTING FAILED ###");
}
});
_client.UseConnectedHandler(async e =>
{
Console.WriteLine("### CONNECTED WITH SERVER ###");
// Subscribe to a topic
await _client.SubscribeAsync(new TopicFilterBuilder().WithTopic("#").Build());
Console.WriteLine("### SUBSCRIBED ###");
});
_client.UseApplicationMessageReceivedHandler(e =>
{
Console.WriteLine("### RECEIVED APPLICATION MESSAGE ###");
Console.WriteLine($"+ Topic = {e.ApplicationMessage.Topic}");
Console.WriteLine($"+ Payload = {Encoding.UTF8.GetString(e.ApplicationMessage.Payload)}");
Console.WriteLine($"+ QoS = {e.ApplicationMessage.QualityOfServiceLevel}");
Console.WriteLine($"+ Retain = {e.ApplicationMessage.Retain}");
Console.WriteLine();
});
}
catch (Exception e)
{
Console.WriteLine($"Error during creation of MQTTService with {_mqttServer}, exceptions : {e}");
}
}
public async void PublishMessage(string topic, string message)
{
var mqttMessage = new MqttApplicationMessageBuilder()
.WithTopic(topic)
.WithPayload(message)
.WithExactlyOnceQoS()
.WithRetainFlag()
.Build();
await _client.PublishAsync(mqttMessage);
}
/*protected async Task Start()
{
var server = new MqttFactory().CreateMqttServer();
try
{
var client1 = new MqttFactory().CreateMqttClient();
await client1.ConnectAsync(new MqttClientOptionsBuilder().WithTcpServer(_mqttServer).Build());
var message = new MqttApplicationMessageBuilder().WithPayload("It's a test").WithTopic("IpAddress").WithRetainFlag().Build();
await client1.PublishAsync(message);
await Task.Delay(500);
}
finally
{
await server.StopAsync();
}
}*/
}
}