155 lines
5.1 KiB
C#

using Manager.Interfaces.Models;
using Manager.Services;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Client.Connecting;
using MQTTnet.Client.Disconnecting;
using MQTTnet.Client.Options;
using MQTTnet.Protocol;
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace Mqtt.Client.AspNetCore.Services
{
/// <summary>
/// Owns the process-wide MQTT client: connects it, subscribes to every topic once
/// connected, applies incoming "player/status" messages to the device database and
/// exposes a static helper for publishing messages.
/// </summary>
/// <remarks>
/// The client and the database services are held in static fields so that the static
/// <see cref="PublishMessage"/> and <see cref="SetServices"/> members can reach them.
/// Constructing a second instance replaces the shared client.
/// </remarks>
public class MqttClientService : IMqttClientService
{
    /// <summary>Pause before an automatic reconnect attempt, so an unreachable broker is not retried in a tight loop.</summary>
    private static readonly TimeSpan ReconnectDelay = TimeSpan.FromSeconds(5);

    private static IMqttClient mqttClient;
    private static DeviceDatabaseService _deviceDatabaseService;
    private static ConfigurationDatabaseService _configurationDatabaseService;

    // Public state that this class never writes; kept because external code may use it.
    public static string lastTopic;
    public static long lastTimeTopic;

    private readonly IMqttClientOptions options;

    // Set by StopAsync so that the disconnect it triggers is not answered with a reconnect.
    private volatile bool stopRequested;

    /// <summary>
    /// Builds the effective connection options and creates the shared MQTT client.
    /// </summary>
    /// <param name="options">
    /// Source of the client id and (optional) credentials. The server address of these
    /// options is NOT used: the client always targets "localhost", or the hard-coded
    /// development broker in DEBUG builds.
    /// </param>
    /// <exception cref="ArgumentNullException"><paramref name="options"/> is null.</exception>
    public MqttClientService(IMqttClientOptions options)
    {
        if (options == null)
        {
            throw new ArgumentNullException(nameof(options));
        }

        var server = "localhost";
#if DEBUG
        server = "192.168.31.96";
#endif

        var builder = new MqttClientOptionsBuilder()
            .WithClientId(options.ClientId)
            .WithTcpServer(server)
            .WithCleanSession();

        // Credentials are optional; only forward them when the caller supplied some.
        if (options.Credentials != null)
        {
            builder = builder.WithCredentials(options.Credentials.Username, options.Credentials.Password);
        }

        this.options = builder.Build();

        mqttClient = new MqttFactory().CreateMqttClient();
        ConfigureMqttClient();
    }

    /// <summary>Registers this instance as the connected, disconnected and message-received handler of the client.</summary>
    private void ConfigureMqttClient()
    {
        mqttClient.ConnectedHandler = this;
        mqttClient.DisconnectedHandler = this;
        mqttClient.ApplicationMessageReceivedHandler = this;
    }

    /// <summary>
    /// Handles an incoming message. Only the topic "player/status" is acted on; every
    /// other topic is ignored. Failures are logged to standard error and never
    /// propagated, so one bad message cannot disturb the caller.
    /// </summary>
    /// <param name="e">Event data carrying the received application message.</param>
    /// <returns>A completed task (never null, so it is always safe to await).</returns>
    public Task HandleApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs e)
    {
        try
        {
            var message = e.ApplicationMessage;
            if (message.Topic == "player/status")
            {
                // Decode and parse only for the topic we care about: the client is
                // subscribed to "#", so most traffic is not ours and may not be JSON.
                var payload = message.Payload != null
                    ? Encoding.UTF8.GetString(message.Payload)
                    : "";
                UpdateDeviceConnectionStatus(payload);
            }
        }
        catch (Exception ex)
        {
            Console.Error.WriteLine($"Failed to handle MQTT message: {ex}");
        }

        return Task.CompletedTask;
    }

    /// <summary>
    /// Parses a "player/status" payload (a JSON object with a string "deviceId" and a
    /// boolean "connected") and stores the connection flag on the matching device.
    /// Payloads that do not have this shape, and unknown devices, are skipped.
    /// </summary>
    /// <param name="payload">UTF-8 decoded message payload.</param>
    private static void UpdateDeviceConnectionStatus(string payload)
    {
        var deviceService = _deviceDatabaseService;
        if (deviceService == null)
        {
            Console.Error.WriteLine("Ignoring player/status message: device database service has not been set.");
            return;
        }

        // DeserializeObject yields null for an empty payload.
        var fields = JsonConvert.DeserializeObject<Dictionary<string, object>>(payload);
        if (fields == null)
        {
            Console.Error.WriteLine("Ignoring player/status message: empty payload.");
            return;
        }

        if (!fields.TryGetValue("deviceId", out var rawDeviceId) || !(rawDeviceId is string deviceId))
        {
            Console.Error.WriteLine("Ignoring player/status message: missing or non-string \"deviceId\".");
            return;
        }

        if (!fields.TryGetValue("connected", out var rawConnected) || !(rawConnected is bool connected))
        {
            Console.Error.WriteLine("Ignoring player/status message: missing or non-boolean \"connected\".");
            return;
        }

        Device device = deviceService.GetById(deviceId);
        if (device != null)
        {
            device.Connected = connected;
            deviceService.Update(device.Id, device);
        }
    }

    /// <summary>Logs the connection and subscribes to all topics ("#").</summary>
    /// <param name="eventArgs">Connection event data (unused).</param>
    public async Task HandleConnectedAsync(MqttClientConnectedEventArgs eventArgs)
    {
        System.Console.WriteLine("connected");
        await mqttClient.SubscribeAsync("#");
    }

    /// <summary>
    /// Attempts to reconnect after the connection is lost, unless the disconnect was
    /// requested through <see cref="StopAsync"/>. A failed attempt is logged rather
    /// than thrown.
    /// </summary>
    /// <param name="eventArgs">Disconnection event data (unused).</param>
    public async Task HandleDisconnectedAsync(MqttClientDisconnectedEventArgs eventArgs)
    {
        if (stopRequested)
        {
            return;
        }

        await Task.Delay(ReconnectDelay);

        // Re-check after the pause: a stop may have been requested, or the connection
        // may already have been re-established in the meantime.
        if (stopRequested || mqttClient.IsConnected)
        {
            return;
        }

        try
        {
            await mqttClient.ReconnectAsync();
        }
        catch (Exception ex)
        {
            // NOTE(review): further retries rely on the client raising the disconnected
            // handler again after a failed connect attempt - confirm for the MQTTnet
            // version in use.
            Console.Error.WriteLine($"MQTT reconnect failed: {ex.Message}");
        }
    }

    /// <summary>Connects the client using the options built in the constructor.</summary>
    /// <param name="cancellationToken">Token that can abort the connection attempt.</param>
    public async Task StartAsync(CancellationToken cancellationToken)
    {
        stopRequested = false;
        await mqttClient.ConnectAsync(options, cancellationToken);
    }

    /// <summary>
    /// Disconnects the client once, announcing a normal disconnection, and suppresses
    /// the automatic reconnect that would otherwise follow.
    /// </summary>
    /// <param name="cancellationToken">Token that can abort the disconnect.</param>
    public async Task StopAsync(CancellationToken cancellationToken)
    {
        stopRequested = true;

        var disconnectOption = new MqttClientDisconnectOptions
        {
            ReasonCode = MqttClientDisconnectReason.NormalDisconnection,
            ReasonString = "NormalDiconnection"
        };
        await mqttClient.DisconnectAsync(disconnectOption, cancellationToken);
    }

    /// <summary>
    /// Publishes a non-retained, at-most-once message. Does nothing when no client has
    /// been created yet or the client is not connected.
    /// </summary>
    /// <param name="topic">Topic to publish to.</param>
    /// <param name="message">Message payload.</param>
    public static async Task PublishMessage(string topic, string message)
    {
        var mqttMessage = new MqttApplicationMessageBuilder()
            .WithTopic(topic)
            .WithPayload(message)
            .WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtMostOnce)
            .WithRetainFlag(false)
            .Build();

        // Read the shared field once; it is null until the first instance is constructed.
        var client = mqttClient;
        if (client != null && client.IsConnected)
        {
            await client.PublishAsync(mqttMessage);
        }
    }

    /// <summary>Stores the database services used when handling incoming messages.</summary>
    /// <param name="_DeviceDatabaseService">Service used to look up and update devices.</param>
    /// <param name="_ConfigurationDatabaseService">Configuration service; stored but not used by this class.</param>
    public static void SetServices(DeviceDatabaseService _DeviceDatabaseService, ConfigurationDatabaseService _ConfigurationDatabaseService)
    {
        _deviceDatabaseService = _DeviceDatabaseService;
        _configurationDatabaseService = _ConfigurationDatabaseService;
    }
}
}