mycorerepository/MyCore/Extensions/MqttClientService.cs

213 lines
8.0 KiB
C#

using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Client.Connecting;
using MQTTnet.Client.Disconnecting;
using MQTTnet.Client.Options;
using MyCore.Interfaces.Models;
using MyCore.Services.Devices;
using MyCore.Services.MyControlPanel;
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace Mqtt.Client.AspNetCore.Services
{
public class MqttClientService : IMqttClientService
{
private static IMqttClient mqttClient;
private IMqttClientOptions options;
public static List<Zigbee2MqttDevice> devices = new List<Zigbee2MqttDevice>();
public static List<Zigbee2MqttDevice> devicesNew = new List<Zigbee2MqttDevice>();
public static List<Zigbee2MqttGroup> groups = new List<Zigbee2MqttGroup>();
public static string userId;
static DeviceDatabaseService _deviceDatabaseService;
static GroupDatabaseService _groupDatabaseService;
static ProviderDatabaseService _providerDatabaseService;
static LocationDatabaseService _locationDatabaseService;
static AutomationDatabaseService _automationDatabaseService;
static ActionService _actionService;
public static string lastTopic;
public static long lastTimeTopic;
public MqttClientService(IMqttClientOptions options)
{
var server = "localhost";
var clientId = "ApiService";
#if DEBUG
server = "192.168.31.140";
clientId = "ApiServiceTest";
#endif
this.options = options;
this.options = new MqttClientOptionsBuilder()
.WithClientId(clientId)
.WithTcpServer(server)
.WithCredentials("mqtt", "mqtt")
.WithCleanSession()
.Build();
mqttClient = new MqttFactory().CreateMqttClient();
ConfigureMqttClient();
}
private void ConfigureMqttClient()
{
mqttClient.ConnectedHandler = this;
mqttClient.DisconnectedHandler = this;
mqttClient.ApplicationMessageReceivedHandler = this;
}
public Task HandleApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs e)
{
var payload = "";
if (e.ApplicationMessage.Payload != null)
{
payload = Encoding.UTF8.GetString(e.ApplicationMessage.Payload);
}
var topic = e.ApplicationMessage.Topic;
var currentTime = DateTimeOffset.Now.ToUnixTimeMilliseconds();
// For check if not doubled message (example : motion double true value)
var test = currentTime - lastTimeTopic;
var isWakeUpMessage = false;
// if wakeup => Not drop wakeup message
try
{
var deserialzed = JsonConvert.DeserializeObject<Dictionary<string, object>>(payload);
if (deserialzed != null) {
isWakeUpMessage = (string)deserialzed["action"] == "wakeup";
}
} catch (Exception ex)
{
}
// Less than one second between two messages from a same device
if (!(lastTopic == topic && test <= 500))
{
if (_actionService != null)
{
ActionService.HandleActionFromMQTTAsync(topic, payload, _deviceDatabaseService, _groupDatabaseService, _providerDatabaseService, _locationDatabaseService, _automationDatabaseService, userId);
}
}
else
{
// TODO Check if not a valuable message (example state true to false => correct message..)
System.Console.WriteLine($"Drop message - spam from {topic}");
}
switch (topic)
{
case "zigbee2mqtt/bridge/config/devices":
try
{
var devicesConvert = JsonConvert.DeserializeObject<List<Zigbee2MqttDevice>>(payload);
devices = devicesConvert;
}
catch (Exception ex)
{
Console.WriteLine($"Error during retrieving devices ! Exception: {ex}");
}
break;
case "zigbee2mqtt/bridge/groups":
try
{
var groupsConvert = JsonConvert.DeserializeObject<List<Zigbee2MqttGroup>>(payload);
groups = groupsConvert;
}
catch (Exception ex)
{
Console.WriteLine($"Error during retrieving groups ! Exception: {ex}");
}
break;
case "zigbee2mqtt/bridge/devices":
try
{
var devicesConvert = JsonConvert.DeserializeObject<List<Zigbee2MqttDevice>>(payload);
devicesNew = devicesConvert;
}
catch (Exception ex)
{
Console.WriteLine($"Error during retrieving devices ! Exception: {ex}");
}
break;
}
if (!isWakeUpMessage) {
lastTimeTopic = DateTimeOffset.Now.ToUnixTimeMilliseconds();
lastTopic = topic;
}
return null;
}
public async Task HandleConnectedAsync(MqttClientConnectedEventArgs eventArgs)
{
System.Console.WriteLine("connected");
await mqttClient.SubscribeAsync("#");
await PublishMessage("zigbee2mqtt/bridge/config/devices/get", "");
}
public async Task HandleDisconnectedAsync(MqttClientDisconnectedEventArgs eventArgs)
{
if (!mqttClient.IsConnected)
{
await mqttClient.ReconnectAsync();
}
}
public async Task StartAsync(CancellationToken cancellationToken)
{
await mqttClient.ConnectAsync(options);
if (!mqttClient.IsConnected)
{
await mqttClient.ReconnectAsync();
}
}
public async Task StopAsync(CancellationToken cancellationToken)
{
if(cancellationToken.IsCancellationRequested)
{
var disconnectOption = new MqttClientDisconnectOptions
{
ReasonCode = MqttClientDisconnectReason.NormalDisconnection,
ReasonString = "NormalDiconnection"
};
await mqttClient.DisconnectAsync(disconnectOption, cancellationToken);
}
await mqttClient.DisconnectAsync();
}
public static async Task PublishMessage(string topic, string message)
{
var mqttMessage = new MqttApplicationMessageBuilder()
.WithTopic(topic)
.WithPayload(message)
.WithExactlyOnceQoS()
.WithRetainFlag()
.Build();
if (mqttClient.IsConnected)
await mqttClient.PublishAsync(mqttMessage);
}
public static void SetServices(DeviceDatabaseService _DeviceDatabaseService, GroupDatabaseService _GroupDatabaseService, ProviderDatabaseService _ProviderDatabaseService, LocationDatabaseService _LocationDatabaseService, ActionService _ActionService, AutomationDatabaseService _AutomationDatabaseService, string UserId)
{
_deviceDatabaseService = _DeviceDatabaseService;
_groupDatabaseService = _GroupDatabaseService;
_providerDatabaseService = _ProviderDatabaseService;
_locationDatabaseService = _LocationDatabaseService;
_automationDatabaseService = _AutomationDatabaseService;
_actionService = _ActionService;
userId = UserId;
}
}
}