Two mqtt client - Local and online

This commit is contained in:
Thomas Fransolet 2021-01-02 16:53:06 +01:00
parent 1cc12876f7
commit 5b4355d43c
12 changed files with 284 additions and 5 deletions

View File

@ -27,14 +27,16 @@ namespace MyCore.Controllers.Devices
private LocationDatabaseService _LocationDatabaseService;
private UserDatabaseService _UserDatabaseService;
private readonly IMqttClientService _mqttClientService;
private readonly IMqttOnlineClientService _mqttOnlineClientService;
public DeviceController(DeviceDatabaseService DeviceDatabaseService, ProviderDatabaseService ProviderDatabaseService, LocationDatabaseService LocationDatabaseService, UserDatabaseService UserDatabaseService, MqttClientServiceProvider provider)
public DeviceController(DeviceDatabaseService DeviceDatabaseService, ProviderDatabaseService ProviderDatabaseService, LocationDatabaseService LocationDatabaseService, UserDatabaseService UserDatabaseService, MqttClientServiceProvider provider, MqttClientOnlineServiceProvider onlineProvider)
{
this._DeviceDatabaseService = DeviceDatabaseService;
this._ProviderDatabaseService = ProviderDatabaseService;
this._LocationDatabaseService = LocationDatabaseService;
this._UserDatabaseService = UserDatabaseService;
this._mqttClientService = provider.MqttClientService;
this._mqttOnlineClientService = onlineProvider.MqttOnlineClientService;
}
// GET: Devices
@ -171,7 +173,7 @@ namespace MyCore.Controllers.Devices
/// <param name="userId">User Id</param>
[ProducesResponseType(typeof(List<DeviceDetailDTO>), 200)]
[HttpGet("zigbee2Mqtt/{userId}")]
public ObjectResult GetDevicesFromZigbee2Mqtt(string userId)
public async Task<ObjectResult> GetDevicesFromZigbee2Mqtt(string userId)
{
try
{
@ -184,6 +186,32 @@ namespace MyCore.Controllers.Devices
// GET ALL LOCAL DEVICES
var devices = MqttClientService.devices;
// Test mqtt
await MqttClientService.PublishMessage("test", "zdz").ContinueWith(res => {
if (res.Status == TaskStatus.RanToCompletion)
{
}
else
{
throw new Exception("Publish error");
}
});
// Test online mqtt
await MqttClientOnlineService.PublishMessage("test", "zdz").ContinueWith(res => {
if (res.Status == TaskStatus.RanToCompletion)
{
}
else
{
throw new Exception("Publish error");
}
});
return new OkObjectResult(devices);
}
catch (InvalidOperationException ex)

View File

@ -5,4 +5,10 @@
public static BrokerHostSettings BrokerHostSettings;
public static ClientSettings ClientSettings;
}
public class AppSettingsOnlineProvider
{
public static BrokerOnlineHostSettings BrokerHostOnlineSettings;
public static ClientOnlineSettings ClientOnlineSettings;
}
}

View File

@ -12,4 +12,14 @@ namespace Mqtt.Client.AspNetCore.Options
ServiceProvider = serviceProvider;
}
}
public class AspCoreMqttOnlineClientOptionBuilder : MqttClientOptionsBuilder
{
public IServiceProvider ServiceOnlineProvider { get; }
public AspCoreMqttOnlineClientOptionBuilder(IServiceProvider serviceProvider)
{
ServiceOnlineProvider = serviceProvider;
}
}
}

View File

@ -5,4 +5,10 @@
public string Host { set; get; }
public int Port { set; get; }
}
public class BrokerOnlineHostSettings
{
public string Host { set; get; }
public int Port { set; get; }
}
}

View File

@ -6,4 +6,11 @@
public string UserName { set; get; }
public string Password { set; get; }
}
public class ClientOnlineSettings
{
public string Id { set; get; }
public string UserName { set; get; }
public string Password { set; get; }
}
}

View File

@ -11,4 +11,11 @@ namespace Mqtt.Client.AspNetCore.Services
IMqttApplicationMessageReceivedHandler
{
}
public interface IMqttOnlineClientService : IHostedService,
IMqttClientConnectedHandler,
IMqttClientDisconnectedHandler,
IMqttApplicationMessageReceivedHandler
{
}
}

View File

@ -0,0 +1,135 @@
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Client.Connecting;
using MQTTnet.Client.Disconnecting;
using MQTTnet.Client.Options;
using MyCore.Interfaces.Models;
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace Mqtt.Client.AspNetCore.Services
{
public class MqttClientOnlineService : IMqttOnlineClientService
{
private static IMqttClient mqttClient;
private IMqttClientOptions onlineOptions;
public static List<Zigbee2MqttDevice> devices = new List<Zigbee2MqttDevice>();
public MqttClientOnlineService(IMqttClientOptions options)
{
this.onlineOptions = options;
mqttClient = new MqttFactory().CreateMqttClient();
ConfigureMqttClient();
}
private void ConfigureMqttClient()
{
mqttClient.ConnectedHandler = this;
mqttClient.DisconnectedHandler = this;
mqttClient.ApplicationMessageReceivedHandler = this;
}
public Task HandleApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs e)
{
Console.WriteLine("### RECEIVED APPLICATION MESSAGE ###");
Console.WriteLine($"+ Topic = {e.ApplicationMessage.Topic}");
var payload = "";
if (e.ApplicationMessage.Payload != null)
{
Console.WriteLine($"+ Payload = {Encoding.UTF8.GetString(e.ApplicationMessage.Payload)}");
payload = Encoding.UTF8.GetString(e.ApplicationMessage.Payload);
}
Console.WriteLine($"+ QoS = {e.ApplicationMessage.QualityOfServiceLevel}");
Console.WriteLine($"+ Retain = {e.ApplicationMessage.Retain}");
Console.WriteLine();
var topic = e.ApplicationMessage.Topic;
switch (topic)
{
case "zigbee2mqtt/bridge/config/devices":
try
{
var test = JsonConvert.DeserializeObject<List<Zigbee2MqttDevice>>(payload);
devices = test;
// TODO Update in DB, current devices state
}
catch (Exception ex)
{
Console.WriteLine($"Error during retrieving devices ! Exception: {ex}");
}
break;
}
//return new Task(null);
// TODO check what to do
//await PublishMessage("test", "teeest");
return null;
}
public async Task HandleConnectedAsync(MqttClientConnectedEventArgs eventArgs)
{
System.Console.WriteLine("connected");
//await mqttClient.SubscribeAsync("hello/world");
await mqttClient.SubscribeAsync("#");
await PublishMessage("zigbee2mqtt/bridge/config/devices/get", "");
}
public async Task HandleDisconnectedAsync(MqttClientDisconnectedEventArgs eventArgs)
{
if (!mqttClient.IsConnected)
{
await mqttClient.ReconnectAsync();
}
//throw new System.NotImplementedException();
}
public async Task StartAsync(CancellationToken cancellationToken)
{
await mqttClient.ConnectAsync(onlineOptions);
if (!mqttClient.IsConnected)
{
await mqttClient.ReconnectAsync();
}
}
public async Task StopAsync(CancellationToken cancellationToken)
{
if(cancellationToken.IsCancellationRequested)
{
var disconnectOption = new MqttClientDisconnectOptions
{
ReasonCode = MqttClientDisconnectReason.NormalDisconnection,
ReasonString = "NormalDiconnection"
};
await mqttClient.DisconnectAsync(disconnectOption, cancellationToken);
}
await mqttClient.DisconnectAsync();
}
public static async Task PublishMessage(string topic, string message)
{
var mqttMessage = new MqttApplicationMessageBuilder()
.WithTopic(topic)
.WithPayload(message)
.WithExactlyOnceQoS()
.WithRetainFlag()
.Build();
if (mqttClient.IsConnected)
await mqttClient.PublishAsync(mqttMessage);
}
public static List<Zigbee2MqttDevice> GetDevices()
{
return devices;
}
}
}

View File

@ -0,0 +1,12 @@
namespace Mqtt.Client.AspNetCore.Services
{
public class MqttClientOnlineServiceProvider
{
public readonly IMqttOnlineClientService MqttOnlineClientService;
public MqttClientOnlineServiceProvider(IMqttOnlineClientService mqttOnlineClientService)
{
MqttOnlineClientService = mqttOnlineClientService;
}
}
}

View File

@ -23,6 +23,12 @@ namespace Mqtt.Client.AspNetCore.Services
public MqttClientService(IMqttClientOptions options)
{
this.options = options;
this.options = new MqttClientOptionsBuilder()
.WithClientId("ApiService")
.WithTcpServer("192.168.31.140") // TODO replace by localhost
.WithCredentials("mqtt", "mqtt")
.WithCleanSession()
.Build();
mqttClient = new MqttFactory().CreateMqttClient();
ConfigureMqttClient();
}

View File

@ -26,14 +26,32 @@ namespace MyCore.Service.Extensions
return services;
}
public static IServiceCollection AddMqttClientOnlineHostedService(this IServiceCollection services)
{
services.AddMqttClientOnlineServiceWithConfig(aspOnlineOptionBuilder =>
{
var clientSettings = AppSettingsOnlineProvider.ClientOnlineSettings;
var brokerHostSettings = AppSettingsOnlineProvider.BrokerHostOnlineSettings;
aspOnlineOptionBuilder
.WithCredentials(clientSettings.UserName, clientSettings.Password)
.WithClientId(clientSettings.Id)
.WithTcpServer(brokerHostSettings.Host, brokerHostSettings.Port);
});
return services;
}
private static IServiceCollection AddMqttClientServiceWithConfig(this IServiceCollection services, Action<AspCoreMqttClientOptionBuilder> configure)
{
services.AddSingleton<IMqttClientOptions>(serviceProvider =>
// No need as we implement options in service (localhost)
/*services.AddSingleton<IMqttClientOptions>(serviceProvider =>
{
var optionBuilder = new AspCoreMqttClientOptionBuilder(serviceProvider);
configure(optionBuilder);
return optionBuilder.Build();
});
});*/
services.AddSingleton<MqttClientService>();
services.AddSingleton<IHostedService>(serviceProvider =>
{
@ -47,6 +65,28 @@ namespace MyCore.Service.Extensions
});
return services;
}
private static IServiceCollection AddMqttClientOnlineServiceWithConfig(this IServiceCollection services, Action<AspCoreMqttOnlineClientOptionBuilder> configure)
{
services.AddSingleton<IMqttClientOptions>(serviceOnlineProvider =>
{
var optionBuilder = new AspCoreMqttOnlineClientOptionBuilder(serviceOnlineProvider);
configure(optionBuilder);
return optionBuilder.Build();
});
services.AddSingleton<MqttClientOnlineService>();
services.AddSingleton<IHostedService>(serviceProvider =>
{
return serviceProvider.GetService<MqttClientOnlineService>();
});
services.AddSingleton<MqttClientOnlineServiceProvider>(serviceProvider =>
{
var mqttOnlineClientService = serviceProvider.GetService<MqttClientOnlineService>();
var mqttOnlineClientServiceProvider = new MqttClientOnlineServiceProvider(mqttOnlineClientService);
return mqttOnlineClientServiceProvider;
});
return services;
}
}
}

View File

@ -70,6 +70,10 @@ namespace MyCore
BrokerHostSettings brokerHostSettings = new BrokerHostSettings();
Configuration.GetSection(nameof(BrokerHostSettings)).Bind(brokerHostSettings);
AppSettingsProvider.BrokerHostSettings = brokerHostSettings;
BrokerOnlineHostSettings brokerOnlineHostSettings = new BrokerOnlineHostSettings();
Configuration.GetSection(nameof(BrokerOnlineHostSettings)).Bind(brokerOnlineHostSettings);
AppSettingsOnlineProvider.BrokerHostOnlineSettings = brokerOnlineHostSettings;
}
private void MapClientSettings()
@ -77,6 +81,10 @@ namespace MyCore
ClientSettings clientSettings = new ClientSettings();
Configuration.GetSection(nameof(ClientSettings)).Bind(clientSettings);
AppSettingsProvider.ClientSettings = clientSettings;
ClientOnlineSettings clientOnlineSettings = new ClientOnlineSettings();
Configuration.GetSection(nameof(ClientOnlineSettings)).Bind(clientOnlineSettings);
AppSettingsOnlineProvider.ClientOnlineSettings = clientOnlineSettings;
}
// This method gets called by the runtime. Use this method to add services to the container.
@ -196,6 +204,7 @@ namespace MyCore
services.AddScoped<LocationDatabaseService>();
services.AddMqttClientHostedService();
services.AddMqttClientOnlineHostedService();
}
// This method gets called by the runtime. Use this method to configure the HTTP request pipeline.

View File

@ -16,6 +16,7 @@
"AccessTokenExpiration": 86400,
"RefreshTokenExpiration": 518400
},
"SecuritySettings": {
"Secret": "azertyuiopqsdfgh",
"Issuer": "MyCore",
@ -23,6 +24,7 @@
"IdType": "Name",
"TokenExpiryInHours": 2
},
"BrokerHostSettings": {
"Host": "192.168.31.140",
"Port": 1883
@ -32,5 +34,16 @@
"Id": "5eb020f043ba8930506acbdd",
"UserName": "mqtt",
"Password": "mqtt"
},
"BrokerOnlineHostSettings": {
"Host": "myhomie.be",
"Port": 1883
},
"ClientOnlineSettings": {
"Id": "5eb020f043ba8930506acbaa",
"UserName": "thomas",
"Password": "MyCore,1"
}
}