using Manager.Interfaces.Models;
using Manager.Services;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Client.Connecting;
using MQTTnet.Client.Disconnecting;
using MQTTnet.Client.Options;
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace Mqtt.Client.AspNetCore.Services
{
    /// <summary>
    /// MQTT client wrapper that connects to a broker, subscribes to every topic ("#"),
    /// and mirrors "player/status" messages into the device database.
    /// </summary>
    /// <remarks>
    /// The underlying client and the database services are stored in static fields,
    /// so they are shared by every instance; constructing a new instance replaces the
    /// shared client.
    /// </remarks>
    public class MqttClientService : IMqttClientService
    {
        /// <summary>Topic on which device connection status updates are handled.</summary>
        private const string DeviceStatusTopic = "player/status";

        /// <summary>Pause before a reconnect attempt, so an unreachable broker is not hammered.</summary>
        private static readonly TimeSpan ReconnectDelay = TimeSpan.FromSeconds(5);

        private static IMqttClient mqttClient;
        private IMqttClientOptions options;
        static DeviceDatabaseService _deviceDatabaseService;
        static ConfigurationDatabaseService _configurationDatabaseService;
        public static string lastTopic;
        public static long lastTimeTopic;

        /// <summary>Set by <see cref="StopAsync"/> so the disconnect handler does not reconnect.</summary>
        private volatile bool stopRequested;

        /// <summary>
        /// Builds the connection options from the supplied client id and credentials and
        /// creates the shared MQTT client with this instance registered as its handler.
        /// </summary>
        /// <param name="options">Source of the client id and (optional) credentials.</param>
        /// <exception cref="ArgumentNullException"><paramref name="options"/> is null.</exception>
        public MqttClientService(IMqttClientOptions options)
        {
            if (options == null)
            {
                throw new ArgumentNullException(nameof(options));
            }

            // The broker address is fixed here; only the client id and credentials
            // are taken from the injected options.
            var server = "localhost";
#if DEBUG
            server = "192.168.31.96";
#endif

            var builder = new MqttClientOptionsBuilder()
                .WithClientId(options.ClientId)
                .WithTcpServer(server)
                .WithCleanSession();

            // Credentials are optional; only forward them when they were configured.
            if (options.Credentials != null)
            {
                builder = builder.WithCredentials(options.Credentials.Username, options.Credentials.Password);
            }

            this.options = builder.Build();

            mqttClient = new MqttFactory().CreateMqttClient();
            ConfigureMqttClient();
        }

        /// <summary>Registers this instance as connected, disconnected and message handler.</summary>
        private void ConfigureMqttClient()
        {
            mqttClient.ConnectedHandler = this;
            mqttClient.DisconnectedHandler = this;
            mqttClient.ApplicationMessageReceivedHandler = this;
        }

        /// <summary>
        /// Handles an incoming message. Only <c>player/status</c> is processed; every other
        /// topic is ignored without parsing its payload.
        /// </summary>
        /// <param name="e">The received message.</param>
        /// <returns>A completed task; failures are logged and never propagated to the client.</returns>
        public Task HandleApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs e)
        {
            try
            {
                var topic = e.ApplicationMessage.Topic;
                if (!string.Equals(topic, DeviceStatusTopic, StringComparison.Ordinal))
                {
                    return Task.CompletedTask;
                }

                var rawPayload = e.ApplicationMessage.Payload;
                var payload = rawPayload != null ? Encoding.UTF8.GetString(rawPayload) : "";

                UpdateDeviceConnectionStatus(payload);
            }
            catch (Exception ex)
            {
                // A bad message must not take the client down, but it must be visible.
                Console.Error.WriteLine($"Failed to handle MQTT message: {ex}");
            }

            return Task.CompletedTask;
        }

        /// <summary>
        /// Parses a status payload and stores the reported connection state on the matching device.
        /// </summary>
        /// <param name="payload">UTF-8 decoded message body.</param>
        private static void UpdateDeviceConnectionStatus(string payload)
        {
            // NOTE(review): the payload is assumed to be a flat JSON object with a string
            // "deviceId" and a boolean "connected" - confirm against the publisher.
            var status = JsonConvert.DeserializeObject<Dictionary<string, object>>(payload);
            if (status == null)
            {
                Console.Error.WriteLine($"Ignoring empty payload on '{DeviceStatusTopic}'.");
                return;
            }

            if (!status.TryGetValue("deviceId", out var idValue) || !(idValue is string deviceId))
            {
                Console.Error.WriteLine($"Ignoring '{DeviceStatusTopic}' message without a string 'deviceId'.");
                return;
            }

            if (!status.TryGetValue("connected", out var connectedValue) || !(connectedValue is bool connectedStatus))
            {
                Console.Error.WriteLine($"Ignoring '{DeviceStatusTopic}' message without a boolean 'connected'.");
                return;
            }

            if (_deviceDatabaseService == null)
            {
                // SetServices has not been called yet, so there is nowhere to store the status.
                Console.Error.WriteLine("Device database service is not set; status update dropped.");
                return;
            }

            Device device = _deviceDatabaseService.GetById(deviceId);
            if (device == null)
            {
                return;
            }

            device.Connected = connectedStatus;
            _deviceDatabaseService.Update(device.Id, device);
        }

        /// <summary>Subscribes to every topic once the client is connected.</summary>
        /// <param name="eventArgs">Connection details (unused).</param>
        public async Task HandleConnectedAsync(MqttClientConnectedEventArgs eventArgs)
        {
            System.Console.WriteLine("connected");
            await mqttClient.SubscribeAsync("#");
        }

        /// <summary>
        /// Reconnects after an unexpected disconnect. No reconnect is attempted once
        /// <see cref="StopAsync"/> has been called.
        /// </summary>
        /// <param name="eventArgs">Disconnect details (unused).</param>
        public async Task HandleDisconnectedAsync(MqttClientDisconnectedEventArgs eventArgs)
        {
            if (stopRequested || mqttClient.IsConnected)
            {
                return;
            }

            // Wait before retrying so a broker outage does not become a tight reconnect loop.
            await Task.Delay(ReconnectDelay);

            if (stopRequested || mqttClient.IsConnected)
            {
                return;
            }

            try
            {
                await mqttClient.ReconnectAsync();
            }
            catch (Exception ex)
            {
                Console.Error.WriteLine($"MQTT reconnect failed: {ex.Message}");
            }
        }

        /// <summary>Connects to the broker using the options built in the constructor.</summary>
        /// <param name="cancellationToken">Startup cancellation token (not forwarded to the client).</param>
        public async Task StartAsync(CancellationToken cancellationToken)
        {
            stopRequested = false;

            await mqttClient.ConnectAsync(options);
            if (!mqttClient.IsConnected)
            {
                await mqttClient.ReconnectAsync();
            }
        }

        /// <summary>
        /// Disconnects from the broker exactly once. When cancellation has already been
        /// requested, an explicit normal-disconnection reason is sent with the token.
        /// </summary>
        /// <param name="cancellationToken">Shutdown cancellation token.</param>
        public async Task StopAsync(CancellationToken cancellationToken)
        {
            // Must be set before disconnecting, otherwise the disconnect handler reconnects.
            stopRequested = true;

            if (cancellationToken.IsCancellationRequested)
            {
                var disconnectOption = new MqttClientDisconnectOptions
                {
                    ReasonCode = MqttClientDisconnectReason.NormalDisconnection,
                    ReasonString = "NormalDiconnection"
                };
                await mqttClient.DisconnectAsync(disconnectOption, cancellationToken);
                return;
            }

            await mqttClient.DisconnectAsync();
        }

        /// <summary>
        /// Publishes a retained, exactly-once message. The message is silently dropped
        /// when the client is not connected.
        /// </summary>
        /// <param name="topic">Topic to publish to.</param>
        /// <param name="message">Message body.</param>
        public static async Task PublishMessage(string topic, string message)
        {
            var mqttMessage = new MqttApplicationMessageBuilder()
                .WithTopic(topic)
                .WithPayload(message)
                .WithExactlyOnceQoS()
                .WithRetainFlag()
                .Build();

            if (mqttClient.IsConnected)
            {
                await mqttClient.PublishAsync(mqttMessage);
            }
        }

        /// <summary>Supplies the database services used by the message handler.</summary>
        /// <param name="_DeviceDatabaseService">Service used to look up and update devices.</param>
        /// <param name="_ConfigurationDatabaseService">Configuration service (stored, not used in this class).</param>
        public static void SetServices(DeviceDatabaseService _DeviceDatabaseService, ConfigurationDatabaseService _ConfigurationDatabaseService)
        {
            _deviceDatabaseService = _DeviceDatabaseService;
            _configurationDatabaseService = _ConfigurationDatabaseService;
        }
    }
}