Skip to content

Commit

Permalink
Prototype - Trigger subscriptions (#860)
Browse files Browse the repository at this point in the history
* A first prototype of subscribe to triggers

* Removed the dictionary

* some changes

* Draft of TriggerManager

* add CAF

* Simplified

* Correct format

* Removed special classes and updated testapp to use anonumous type

* refactor and filter normal events on message id

* fix test

* fix warnings

* rename

* fix warning

* relax CA on debug project

* Added test for TriggerManager

* small fix

---------

Co-authored-by: Frank Bakker <[email protected]>
  • Loading branch information
helto4real and FrankBakkerNl authored Apr 15, 2023
1 parent b3dc9e4 commit 22e8e70
Show file tree
Hide file tree
Showing 15 changed files with 371 additions and 15 deletions.
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
using System.Collections.Concurrent;

namespace NetDaemon.HassClient.Tests.Integration;

/// <summary>
Expand Down Expand Up @@ -161,6 +163,9 @@ await websocket.SendAsync(new ArraySegment<byte>(bytes, 0, bytes.Length),
WebSocketMessageType.Text, true, _cancelSource.Token).ConfigureAwait(false);
}


private readonly ConcurrentBag<int> _eventSubscriptions = new();

/// <summary>
/// Process incoming websocket requests to simulate Home Assistant websocket API
/// </summary>
Expand Down Expand Up @@ -235,6 +240,8 @@ await ReplaceIdInResponseAndSendMsg(
"result_msg.json",
hassMessage.Id,
webSocket).ConfigureAwait(false);

_eventSubscriptions.Add(hassMessage.Id);

await ReplaceIdInResponseAndSendMsg(
"event.json",
Expand Down Expand Up @@ -299,10 +306,13 @@ await ReplaceIdInResponseAndSendMsg(
hassMessage.Id,
webSocket).ConfigureAwait(false);

await ReplaceIdInResponseAndSendMsg(
"service_event.json",
hassMessage.Id,
webSocket).ConfigureAwait(false);
foreach (var subscription in _eventSubscriptions)
{
await ReplaceIdInResponseAndSendMsg(
"service_event.json",
subscription,
webSocket).ConfigureAwait(false);
}
break;
case "fake_disconnect_test":
// This is not a real home assistant message, just used to test disconnect from socket.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ public static async Task<HassConfig> GetConfigAsync(this IHomeAssistantConnectio
throw new NullReferenceException("Unexpected null return from command");
}


/// <summary>
/// Get all configuration from Home Assistant
/// </summary>
Expand Down Expand Up @@ -147,4 +148,24 @@ await connection

return true;
}

public static async Task<HassMessage> SubscribeToTriggerAsync(this IHomeAssistantConnection connection, object trigger, CancellationToken cancelToken)
{
var triggerCommand = new SubscribeTriggerCommand(trigger);

var msg = await connection.SendCommandAndReturnHassMessageResponseAsync
(triggerCommand, cancelToken).ConfigureAwait(false) ??
throw new NullReferenceException("Unexpected null return from command");
return msg;
}

public static async Task UnsubscribeEventsAsync(this IHomeAssistantConnection connection,
int id, CancellationToken cancelToken)
{
var triggerCommand = new UnsubscribeEventsCommand(id);

_ = await connection.SendCommandAndReturnHassMessageResponseAsync
(triggerCommand, cancelToken).ConfigureAwait(false) ??
throw new NullReferenceException("Unexpected null return from command");
}
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
namespace NetDaemon.Client.HomeAssistant.Model;
using NetDaemon.Client.Common.HomeAssistant.Model;

namespace NetDaemon.Client.HomeAssistant.Model;

public record HassEvent
{
[JsonPropertyName("data")] public JsonElement? DataElement { get; init; }

[JsonPropertyName("variables")] public HassVariable? Variables { get; init; }

[JsonPropertyName("event_type")] public string EventType { get; init; } = string.Empty;

[JsonPropertyName("origin")] public string Origin { get; init; } = string.Empty;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
namespace NetDaemon.Client.Common.HomeAssistant.Model;

public record HassVariable
{
[JsonPropertyName("trigger")] public JsonElement? TriggerElement { get; init; }
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ public interface IHomeAssistantConnection : IHomeAssistantApiManager, IAsyncDisp
Task<JsonElement?> SendCommandAndReturnResponseRawAsync<T>(T command, CancellationToken cancelToken)
where T : CommandMessage;

Task<HassMessage?> SendCommandAndReturnHassMessageResponseAsync<T>(T command, CancellationToken cancelToken)
where T : CommandMessage;

/// <summary>
/// Start processing Home Assistant events
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
namespace NetDaemon.Client.Internal.HomeAssistant.Commands;

internal record SubscribeTriggerCommand : CommandMessage
{
public SubscribeTriggerCommand(object trigger)
{
Type = "subscribe_trigger";
Trigger = trigger;
}

[JsonPropertyName("trigger")]
public object Trigger { get; init; }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
namespace NetDaemon.Client.Internal.HomeAssistant.Commands;

internal record UnsubscribeEventsCommand : CommandMessage
{
public UnsubscribeEventsCommand(int subscriptionId)
{
Type = "unsubscribe_events";
Subscription = subscriptionId;
}

[JsonPropertyName("subscription")] public int Subscription { get; init; }
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ internal class HomeAssistantConnection : IHomeAssistantConnection, IHomeAssistan

private const int WaitForResultTimeout = 20000;

private int _eventSubscribtionMessageId = -1;

private readonly SemaphoreSlim _messageIdSemaphore = new(1,1);
private int _messageId = 1;

Expand Down Expand Up @@ -48,7 +50,7 @@ IHomeAssistantApiManager apiManager
}

public IObservable<HassEvent> OnHomeAssistantEvent =>
_hassMessageSubject.Where(n => n.Type == "event").Select(n => n.Event!);
_hassMessageSubject.Where(n => n.Type == "event" && n.Id == _eventSubscribtionMessageId).Select(n => n.Event!);

public async Task ProcessHomeAssistantEventsAsync(CancellationToken cancelToken)
{
Expand Down Expand Up @@ -106,6 +108,16 @@ public async Task SendCommandAsync<T>(T command, CancellationToken cancelToken)

public async Task<JsonElement?> SendCommandAndReturnResponseRawAsync<T>(T command, CancellationToken cancelToken)
where T : CommandMessage
{
var hassMessage =
await SendCommandAndReturnHassMessageResponseAsync(command, cancelToken).ConfigureAwait(false);

// The SendCommmandsAndReturnHAssMessageResponse will throw if not successful so just ignore errors here
return hassMessage?.ResultElement;
}

public async Task<HassMessage?> SendCommandAndReturnHassMessageResponseAsync<T>(T command, CancellationToken cancelToken)
where T : CommandMessage
{
var resultMessageTask = await SendCommandAsyncInternal(command, cancelToken);

Expand All @@ -117,14 +129,11 @@ public async Task SendCommandAsync<T>(T command, CancellationToken cancelToken)
throw new InvalidOperationException($"Send command ({command.Type}) did not get response in timely fashion. Sent command is {command.ToJsonElement()}");
}

// We already awaited the task so result
var result = resultMessageTask.Result;

if (result.Success ?? false)
return result.ResultElement;
if (resultMessageTask.Result.Success ?? false)
return resultMessageTask.Result;

// Non successful command should throw exception
throw new InvalidOperationException($"Failed command ({command.Type}) error: {result.Error}. Sent command is {command.ToJsonElement()}");
throw new InvalidOperationException($"Failed command ({command.Type}) error: {resultMessageTask.Result.Error}. Sent command is {command.ToJsonElement()}");
}

public async ValueTask DisposeAsync()
Expand Down Expand Up @@ -165,8 +174,11 @@ await Task.WhenAny(

private async Task SubscribeToAllHomeAssistantEvents(CancellationToken cancelToken)
{
_ = await SendCommandAndReturnResponseAsync<SubscribeEventCommand, object?>(new SubscribeEventCommand(),
var result = await SendCommandAndReturnHassMessageResponseAsync(new SubscribeEventCommand(),
cancelToken).ConfigureAwait(false);

// The id if the message we used to subscribe should be used as the filter for the event messages
_eventSubscribtionMessageId = result?.Id ?? -1;
}

private async Task HandleNewMessages()
Expand Down
117 changes: 117 additions & 0 deletions src/HassModel/NetDaemon.HassModel.Tests/Internal/TriggerManagerTest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
using System.Reactive.Subjects;
using System.Text.Json;
using System.Threading;
using Microsoft.Extensions.DependencyInjection;
using NetDaemon.Client;
using NetDaemon.Client.Common.HomeAssistant.Model;
using NetDaemon.Client.HomeAssistant.Model;
using NetDaemon.Client.Internal.HomeAssistant.Commands;
using NetDaemon.HassModel.Tests.TestHelpers;

namespace NetDaemon.HassModel.Tests.Internal;

public class TriggerManagerTest
{
private readonly ITriggerManager _triggerManager;

private readonly Mock<IHomeAssistantConnection> _hassConnectionMock = new();
private readonly Subject<HassMessage> _messageSubject = new();

private int nextMessageId = 5;

public TriggerManagerTest()
{
_hassConnectionMock.Setup(m => m.SendCommandAndReturnHassMessageResponseAsync(
It.IsAny<SubscribeTriggerCommand>(), It.IsAny<CancellationToken>()))
.ReturnsAsync(() => new HassMessage { Id = nextMessageId });

_hassConnectionMock.Setup(m => m.SendCommandAndReturnHassMessageResponseAsync(
It.IsAny<UnsubscribeEventsCommand>(), It.IsAny<CancellationToken>()))
.ReturnsAsync(() => new HassMessage { Id = nextMessageId });

_hassConnectionMock
.As<IHomeAssistantHassMessages>()
.SetupGet(m => m.OnHassMessage)
.Returns(_messageSubject);

var provider = CreateServiceProvider();
_triggerManager = provider.GetRequiredService<ITriggerManager>();
}


private ServiceProvider CreateServiceProvider()
{
var serviceCollection = new ServiceCollection();
serviceCollection.AddLogging();
serviceCollection.AddScopedHaContext();

var haRunnerMock = new Mock<IHomeAssistantRunner>();

haRunnerMock.SetupGet(n => n.CurrentConnection).Returns(_hassConnectionMock.Object);
serviceCollection.AddSingleton(_ => haRunnerMock.Object);

var provider = serviceCollection.BuildServiceProvider();

return provider;
}


[Fact]
public void RegisterTrigger()
{
var incomingTriggersMock = _triggerManager.RegisterTrigger(new {}).SubscribeMock();

var message = new { }.AsJsonElement();

_messageSubject.OnNext(new HassMessage(){Id = nextMessageId, Event = new HassEvent(){Variables = new HassVariable()
{TriggerElement = message }}});

// Assert
incomingTriggersMock.Verify(e => e.OnNext(message));
}

[Fact]
public async void NoMoreTriggersAfterDispose()
{
// Act
var incomingTriggersMock = _triggerManager.RegisterTrigger(new {}).SubscribeMock();

await ((IAsyncDisposable)_triggerManager).DisposeAsync().ConfigureAwait(false);

// Assert, Dispose should unsubscribe with HA AND stop any messages that do pass
_hassConnectionMock.Verify(m => m.SendCommandAndReturnHassMessageResponseAsync(
new UnsubscribeEventsCommand(nextMessageId), It.IsAny<CancellationToken>()));

_messageSubject.OnNext(new HassMessage(){Id = nextMessageId, Event = new HassEvent(){Variables = new HassVariable()
{TriggerElement = new JsonElement() }}});

incomingTriggersMock.VerifyNoOtherCalls();
}


[Fact]
public void RegisterTriggerCorrectMessagesPerSubscription()
{
nextMessageId = 6;
var incomingTriggersMock6 = _triggerManager.RegisterTrigger(new {}).SubscribeMock();

nextMessageId = 7;
var incomingTriggersMock7 = _triggerManager.RegisterTrigger(new {}).SubscribeMock();

var message6 = new { tag = "six" }.AsJsonElement();
var message7 = new { tag = "seven" }.AsJsonElement();

// Assert
_messageSubject.OnNext(new HassMessage{Id = 6, Event = new HassEvent(){Variables = new HassVariable()
{TriggerElement = message6 }}});


_messageSubject.OnNext(new HassMessage{Id = 7, Event = new HassEvent(){Variables = new HassVariable()
{TriggerElement = message7 }}});

incomingTriggersMock6.Verify(e => e.OnNext(message6), Times.Once);
incomingTriggersMock7.Verify(e => e.OnNext(message7), Times.Once);
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -30,5 +30,6 @@ internal static void AddScopedHaContext(this IServiceCollection services)
services.AddTransient<IHaContext>(s => s.GetRequiredService<AppScopedHaContextProvider>());
services.AddScoped<QueuedObservable<HassEvent>>();
services.AddScoped<IQueuedObservable<HassEvent>>(s => s.GetRequiredService<QueuedObservable<HassEvent>>());
services.AddTransient<ITriggerManager, TriggerManager>();
}
}
14 changes: 14 additions & 0 deletions src/HassModel/NetDeamon.HassModel/ITriggerManager.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
namespace NetDaemon.HassModel;

/// <summary>
/// Enables the creation of triggers
/// </summary>
public interface ITriggerManager
{
/// <summary>
/// Registers a trigger in HA and returns an Observable with the events
/// </summary>
/// <param name="triggerParams">Input data for HA register_trigger command</param>
/// <returns>IObservable with all events resulting from this trigger</returns>
IObservable<JsonElement> RegisterTrigger(object triggerParams);
}
Loading

0 comments on commit 22e8e70

Please sign in to comment.