Update mqttnet package

This commit is contained in:
Martijn Scheepers
2025-01-08 13:27:08 +01:00
parent e9fb456d54
commit 21054c4687
4 changed files with 38 additions and 74 deletions

View File

@@ -23,12 +23,14 @@
<ItemGroup>
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.12.0" />
<PackageReference Include="Moq" Version="4.20.72" />
<PackageReference Include="MQTTnet" Version="5.0.1.1416" />
<PackageReference Include="MSTest.TestAdapter" Version="3.6.4" />
<PackageReference Include="MSTest.TestFramework" Version="3.6.4" />
<PackageReference Include="coverlet.collector" Version="6.0.2">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
<PackageReference Include="Telegram.Bot" Version="22.3.0" />
</ItemGroup>
<ItemGroup>

View File

@@ -1,16 +1,11 @@
using MQTTnet.Client;
using MQTTnet;
using System.Security.Cryptography.X509Certificates;
namespace UCS_Status_Monitor.MQTT
{
public class ClientCertProvider : IMqttClientCertificatesProvider
public class ClientCertProvider(X509Certificate2 clientCert) : IMqttClientCertificatesProvider
{
public X509Certificate2 _clientCert { get; set; }
public ClientCertProvider(X509Certificate2 clientCert)
{
_clientCert = clientCert;
}
public X509Certificate2 _clientCert { get; set; } = clientCert;
public X509CertificateCollection GetCertificates()
{

View File

@@ -2,7 +2,6 @@
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Extensions.ManagedClient;
using System;
using System.Diagnostics;
@@ -19,8 +18,6 @@ using UCS_Status_Monitor.Telegram;
using UCS_Status_Monitor.Monitor;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;
using UCS_Status_Monitor.Models.Database;
using Microsoft.AspNetCore.Components.Routing;
namespace UCS_Status_Monitor.Mqtt
{
@@ -48,7 +45,7 @@ namespace UCS_Status_Monitor.Mqtt
_contextFactory = scope.ServiceProvider.GetRequiredService<IDbContextFactory<MonitorDbContext>>();
_configuration = scope.ServiceProvider.GetRequiredService<IConfiguration>();
}
[LoggerMessage(Level = LogLevel.Error, Message = "{message}")]
public partial void LogError(string message);
@@ -61,18 +58,14 @@ namespace UCS_Status_Monitor.Mqtt
//private readonly X509Certificate clientCert = new("Certs\\server.pfx", "12345");
private readonly X509Certificate2 clientCert2 = new("Certs\\server.pfx", "12345");
private readonly AutoResetEvent waitForConnection = new(false);
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
try
{
MqttFactory mqttFactory = new();
using IManagedMqttClient managedMqttClient = mqttFactory.CreateManagedMqttClient();
MqttClientFactory mqttFactory = new();
using IMqttClient mqttClient = mqttFactory.CreateMqttClient();
var managedMqttClientOptions = new ManagedMqttClientOptionsBuilder()
.WithClientOptions(new MqttClientOptionsBuilder()
var mqttClientOptions = new MqttClientOptionsBuilder()
.WithClientId(_mqttconfig.ClientID)
.WithTcpServer(_mqttconfig.BrokerAddress, _mqttconfig.BrokerPort)
.WithCredentials(_mqttconfig.Username, _mqttconfig.Password)
@@ -101,49 +94,33 @@ namespace UCS_Status_Monitor.Mqtt
return chain.Build(x5092);
},
ClientCertificatesProvider = new ClientCertProvider(clientCert2)
//ClientCertificatesProvider = new DefaultMqttCertificatesProvider(new List<X509Certificate>() { clientCert2 })
})
//.WithProtocolVersion(MQTTnet.Formatter.MqttProtocolVersion.V500)
.WithCleanSession()
.Build())
.Build();
managedMqttClient.ConnectingFailedAsync += OnConnectingFailed;
managedMqttClient.ConnectedAsync += OnConnected;
managedMqttClient.DisconnectedAsync += OnDisconnected;
managedMqttClient.ApplicationMessageReceivedAsync += OnApplicationMessageReceived;
mqttClient.ConnectedAsync += OnConnected;
mqttClient.DisconnectedAsync += OnDisconnected;
mqttClient.ApplicationMessageReceivedAsync += OnApplicationMessageReceived;
await managedMqttClient.SubscribeAsync("#");
await managedMqttClient.StartAsync(managedMqttClientOptions);
await mqttClient.ConnectAsync(mqttClientOptions, stoppingToken);
waitForConnection.WaitOne();
var mqttSubscribeOptions = mqttFactory.CreateSubscribeOptionsBuilder().WithTopicFilter(f =>
{
f.WithTopic("#");
}).Build();
await mqttClient.SubscribeAsync(mqttSubscribeOptions, stoppingToken);
while (!stoppingToken.IsCancellationRequested)
{
// var message = new MqttApplicationMessageBuilder()
// .WithTopic("MQTTNet/Test")
// .WithPayload("{\"Message\":\"Hello world\"}")
// .WithRetainFlag()
// .Build();
// await mqttClient.EnqueueAsync(message);
// //SpinWait.SpinUntil(() => managedMqttClient.PendingApplicationMessagesCount == 0, 10000);
// //Debug.WriteLine($"Pending messages = {managedMqttClient.PendingApplicationMessagesCount}");
// Debug.WriteLine($"Pending messages = {mqttClient.PendingApplicationMessagesCount}");
if (!managedMqttClient.IsConnected)
if (!mqttClient.IsConnected)
{
await managedMqttClient.StartAsync(managedMqttClientOptions);
await mqttClient.ConnectAsync(mqttClientOptions, stoppingToken);
}
else
{
// Wait until the queue is fully processed.
SpinWait.SpinUntil(() => managedMqttClient.PendingApplicationMessagesCount == 0, 10000);
}
await Task.Delay(10000, stoppingToken);
}
await managedMqttClient.StopAsync();
await mqttClient.TryDisconnectAsync();
}
catch (Exception ex)
{
@@ -152,17 +129,9 @@ namespace UCS_Status_Monitor.Mqtt
}
}
private async Task<Task> OnConnectingFailed(ConnectingFailedEventArgs args)
{
Debug.WriteLine("Couldn't connect to broker." + args.Exception.Message);
LogError(args.Exception, "Connect to broker failed.");
return Task.CompletedTask;
}
private async Task<Task> OnConnected(MqttClientConnectedEventArgs args)
{
Debug.WriteLine("Mqtt connected to broker");
waitForConnection.Set();
return Task.CompletedTask;
}
@@ -173,17 +142,11 @@ namespace UCS_Status_Monitor.Mqtt
return Task.CompletedTask;
}
//private async Task<Task> OnApplicationMessageProcessed(ApplicationMessageProcessedEventArgs args)
//{
// Debug.WriteLine("OnApplicationMessageProcessed");
// return Task.CompletedTask;
//}
private async Task<Task> OnApplicationMessageReceived(MqttApplicationMessageReceivedEventArgs args)
{
Debug.WriteLine("### RECEIVED APPLICATION MESSAGE ###");
Debug.WriteLine($"+ Topic = {args.ApplicationMessage.Topic}");
Debug.WriteLine($"+ Payload = {Encoding.UTF8.GetString(args.ApplicationMessage.PayloadSegment)}");
Debug.WriteLine($"+ Payload = {Encoding.UTF8.GetString(args.ApplicationMessage.Payload)}");
Debug.WriteLine($"+ QoS = {args.ApplicationMessage.QualityOfServiceLevel}");
Debug.WriteLine($"+ Retain = {args.ApplicationMessage.Retain}");
Debug.WriteLine("-------------------------------");
@@ -224,11 +187,12 @@ namespace UCS_Status_Monitor.Mqtt
{
//Debug.WriteLine(args.ApplicationMessage.PayloadSegment);
//Version 2 message
TeltonikaMessageV2? teltonikaMessage = MQTTDeviceMessage.Deserialize<TeltonikaMessageV2>(Encoding.UTF8.GetString(args.ApplicationMessage.PayloadSegment), device, location);
TeltonikaMessageV2? teltonikaMessage = MQTTDeviceMessage.Deserialize<TeltonikaMessageV2>(Encoding.UTF8.GetString(args.ApplicationMessage.Payload), device, location);
if (teltonikaMessage == null)
{
LogError($"Teltonika Mqtt error - {BitConverter.ToString(args.ApplicationMessage.PayloadSegment.ToArray())}");
//LogError($"Teltonika Mqtt error - {BitConverter.ToString(args.ApplicationMessage.Payload.ToArray()))}");
LogError($"Teltonika Mqtt error - {Encoding.UTF8.GetString(args.ApplicationMessage.Payload)}");
return Task.CompletedTask;
}
@@ -254,11 +218,12 @@ namespace UCS_Status_Monitor.Mqtt
if (args.ApplicationMessage.Topic.EndsWith("/event/connection"))
{
AxisCamConnectionMessage? axisCamMessage = MQTTDeviceMessage.Deserialize<AxisCamConnectionMessage>(Encoding.UTF8.GetString(args.ApplicationMessage.PayloadSegment), device, location);
AxisCamConnectionMessage? axisCamMessage = MQTTDeviceMessage.Deserialize<AxisCamConnectionMessage>(Encoding.UTF8.GetString(args.ApplicationMessage.Payload), device, location);
if (axisCamMessage == null)
{
LogError($"Axis cam Mqtt error - {BitConverter.ToString(args.ApplicationMessage.PayloadSegment.ToArray())}");
//LogError($"Axis cam Mqtt error - {BitConverter.ToString(args.ApplicationMessage.PayloadSegment.Array)}");
LogError($"Axis cam Mqtt error - {Encoding.UTF8.GetString(args.ApplicationMessage.Payload)}");
return Task.CompletedTask;
}
@@ -276,11 +241,12 @@ namespace UCS_Status_Monitor.Mqtt
}
else if (args.ApplicationMessage.Topic.EndsWith("/status"))
{
AxisCamStatusMessage? axisCamStatusMessage = MQTTDeviceMessage.Deserialize<AxisCamStatusMessage>(Encoding.UTF8.GetString(args.ApplicationMessage.PayloadSegment), device, location);
AxisCamStatusMessage? axisCamStatusMessage = MQTTDeviceMessage.Deserialize<AxisCamStatusMessage>(Encoding.UTF8.GetString(args.ApplicationMessage.Payload), device, location);
if (axisCamStatusMessage == null)
{
LogError($"Axis cam Mqtt error - {BitConverter.ToString(args.ApplicationMessage.PayloadSegment.ToArray())}");
//LogError($"Axis cam Mqtt error - {BitConverter.ToString(args.ApplicationMessage.PayloadSegment.ToArray())}");
LogError($"Axis cam Mqtt error - {Encoding.UTF8.GetString(args.ApplicationMessage.Payload)}");
return Task.CompletedTask;
}
@@ -312,11 +278,12 @@ namespace UCS_Status_Monitor.Mqtt
//5623MT001/S13CAM03/event/tns:onvif/Device/tns:axis/Status/Temperature/Inside
//{"topic":"onvif:Device/axis:Status/Temperature/Inside","timestamp":1720515011254,"message":{"source":{},"key":{},"data":{"sensor_level":"1"}}}
AxisCamStatusMessage? axisCamStatusMessage = MQTTDeviceMessage.Deserialize<AxisCamStatusMessage>(Encoding.UTF8.GetString(args.ApplicationMessage.PayloadSegment), device, location);
AxisCamStatusMessage? axisCamStatusMessage = MQTTDeviceMessage.Deserialize<AxisCamStatusMessage>(Encoding.UTF8.GetString(args.ApplicationMessage.Payload), device, location);
if (axisCamStatusMessage == null)
{
LogError($"Axis cam Mqtt error - {BitConverter.ToString(args.ApplicationMessage.PayloadSegment.ToArray())}");
//LogError($"Axis cam Mqtt error - {BitConverter.ToString(args.ApplicationMessage.PayloadSegment.ToArray())}");
LogError($"Axis cam Mqtt error - {Encoding.UTF8.GetString(args.ApplicationMessage.Payload)}");
return Task.CompletedTask;
}

View File

@@ -40,11 +40,11 @@
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
<PackageReference Include="MQTTnet" Version="4.3.7.1207" />
<PackageReference Include="MQTTnet" Version="5.0.1.1416" />
<PackageReference Include="MQTTnet.Extensions.ManagedClient" Version="4.3.7.1207" />
<PackageReference Include="System.DirectoryServices.AccountManagement" Version="8.0.1" />
<PackageReference Include="System.Management" Version="8.0.0" />
<PackageReference Include="Telegram.Bot" Version="22.2.0" />
<PackageReference Include="Telegram.Bot" Version="22.3.0" />
</ItemGroup>
<ItemGroup>