Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Removed dependencies on ConnectionGrain in GroupGrain and UserGrain #2

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/Orleans.SignalR/Clients/ClientGrain.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
using Microsoft.Extensions.Logging;
using Orleans.Concurrency;
using Orleans.Providers;
using Orleans.SignalR.Core;
using Orleans.SignalR.Connections;
using Orleans.Streams;
using System;
using System.Collections.Generic;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,25 +1,23 @@
using System.Buffers;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.AspNetCore.SignalR.Protocol;
using Microsoft.AspNetCore.SignalR.Protocol;
using Microsoft.Extensions.Logging;
using Orleans;
using Orleans.Concurrency;
using Orleans.Streams;
using System.Buffers;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

namespace Orleans.SignalR.Core
namespace Orleans.SignalR.Connections
{
internal abstract class ConnectionGrain<TGrainState> : Grain<TGrainState>, IConnectionGrain
where TGrainState : ConnectionState, new()
internal class ConnectionGrain : Grain<ConnectionState>, IConnectionGrain
{
private readonly ILogger _logger;
private IStreamProvider _streamProvider;
private Dictionary<string, StreamSubscriptionHandle<string>> _connectionStreamHandles;

protected ConnectionGrainKey KeyData;

internal ConnectionGrain(ILogger logger)
public ConnectionGrain(ILogger<ConnectionGrain> logger)
{
_logger = logger;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
using System.Diagnostics;

namespace Orleans.SignalR.Core
namespace Orleans.SignalR.Connections
{
[DebuggerDisplay("{DebuggerDisplay,nq}")]
internal struct ConnectionGrainKey
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
using System.Collections.Generic;
using Orleans.SignalR.Core;
using System.Collections.Generic;
using System.Threading.Tasks;
using Orleans;

namespace Orleans.SignalR.Core
namespace Orleans.SignalR.Connections
{
/// <summary>
/// Grain interface Grouped of connections, such as user or custom group.
Expand Down
13 changes: 7 additions & 6 deletions src/Orleans.SignalR/Core/GrainExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using Microsoft.AspNetCore.SignalR.Protocol;
using Orleans.Concurrency;
using Orleans.SignalR.Clients;
using Orleans.SignalR.Connections;
using Orleans.SignalR.Core;
using Orleans.SignalR.Groups;
using Orleans.SignalR.Users;
Expand Down Expand Up @@ -55,12 +56,12 @@ public static void SendOneWay(this IHubMessageInvoker grain, string methodName,
grain.InvokeOneWay(g => g.Send(methodName, args));
}

[Obsolete("Use Send instead", false)]
public static async Task SendSignalRMessage(this IConnectionGrain grain, string methodName, params object[] message)
{
var invocationMessage = new InvocationMessage(methodName, message).AsImmutable();
await grain.Send(invocationMessage);
}
//[Obsolete("Use Send instead", false)]
//public static async Task SendSignalRMessage(this IConnectionGrain grain, string methodName, params object[] message)
//{
// var invocationMessage = new InvocationMessage(methodName, message).AsImmutable();
// await grain.Send(invocationMessage);
//}

/// <summary>
/// Invokes a method on the hub.
Expand Down
113 changes: 109 additions & 4 deletions src/Orleans.SignalR/Groups/GroupGrain.cs
Original file line number Diff line number Diff line change
@@ -1,20 +1,125 @@
using Microsoft.AspNetCore.SignalR.Protocol;
using Microsoft.Extensions.Logging;
using Orleans.Concurrency;
using Orleans.Providers;
using Orleans.SignalR.Core;
using Orleans.SignalR.Connections;
using Orleans.Streams;
using System.Buffers;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

namespace Orleans.SignalR.Groups
{
[StorageProvider(ProviderName = SignalrConstants.STORAGE_PROVIDER)]
[Reentrant]
internal class GroupGrain : ConnectionGrain<GroupState>, IGroupGrain
internal class GroupGrain : Grain<GroupState>, IGroupGrain
{
public GroupGrain(ILogger<GroupGrain> logger) : base(logger)
private readonly ILogger _logger;
private IStreamProvider _streamProvider;
private Dictionary<string, StreamSubscriptionHandle<string>> _connectionStreamHandles;

protected ConnectionGrainKey KeyData;

public GroupGrain(ILogger<GroupGrain> logger)
{
_logger = logger;
}

public override async Task OnActivateAsync()
{
KeyData = new ConnectionGrainKey(this.GetPrimaryKeyString());
_connectionStreamHandles = new Dictionary<string, StreamSubscriptionHandle<string>>();
_streamProvider = GetStreamProvider(SignalrConstants.STREAM_PROVIDER);
var subscriptionTasks = new List<Task>();
foreach (var connection in State.Connections)
{
var clientDisconnectStream = _streamProvider.GetStream<string>(SignalrConstants.CLIENT_DISCONNECT_STREAM_ID, connection);
var subscriptions = await clientDisconnectStream.GetAllSubscriptionHandles();
foreach (var subscription in subscriptions)
{
subscriptionTasks.Add(subscription.ResumeAsync(async (connectionId, _) => await Remove(connectionId)));
}
}
await Task.WhenAll(subscriptionTasks);
}

public virtual async Task Add(string connectionId)
{
var shouldWriteState = State.Connections.Add(connectionId);
if (!_connectionStreamHandles.ContainsKey(connectionId))
{
var clientDisconnectStream = _streamProvider.GetStream<string>(SignalrConstants.CLIENT_DISCONNECT_STREAM_ID, connectionId);
var subscription = await clientDisconnectStream.SubscribeAsync(async (connId, _) => await Remove(connId));
_connectionStreamHandles[connectionId] = subscription;
}

if (shouldWriteState)
await WriteStateAsync();
}

public virtual async Task Remove(string connectionId)
{
var shouldWriteState = State.Connections.Remove(connectionId);
if (_connectionStreamHandles.TryGetValue(connectionId, out var stream))
{
await stream.UnsubscribeAsync();
_connectionStreamHandles.Remove(connectionId);
}

if (State.Connections.Count == 0)
{
await ClearStateAsync();
DeactivateOnIdle();
}
else if (shouldWriteState)
{
await WriteStateAsync();
}
}

public virtual Task Send(Immutable<InvocationMessage> message)
{
return SendAll(message, State.Connections);
}

public Task SendExcept(string methodName, object[] args, IReadOnlyList<string> excludedConnectionIds)
{
var message = new Immutable<InvocationMessage>(new InvocationMessage(methodName, args));
return SendAll(message, State.Connections.Where(x => !excludedConnectionIds.Contains(x)).ToList());
}

public Task<int> Count()
{
return Task.FromResult(State.Connections.Count);
}

protected Task SendAll(Immutable<InvocationMessage> message, IReadOnlyCollection<string> connections)
{
_logger.LogDebug("Sending message to {hubName}.{targetMethod} on group {groupId} to {connectionsCount} connection(s)",
KeyData.HubName, message.Value.Target, KeyData.Id, connections.Count);

var tasks = ArrayPool<Task>.Shared.Rent(connections.Count);
try
{
int index = 0;
foreach (var connection in connections)
{
var client = GrainFactory.GetClientGrain(KeyData.HubName, connection);
tasks[index++] = client.Send(message);
}

return Task.WhenAll(tasks.Where(x => x != null).ToArray());
}
finally
{
ArrayPool<Task>.Shared.Return(tasks);
}
}
}

internal class GroupState : ConnectionState
internal class GroupState
{
public HashSet<string> Connections { get; set; } = new HashSet<string>();
}
}
31 changes: 30 additions & 1 deletion src/Orleans.SignalR/Groups/IGroupGrain.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,37 @@
using Orleans.SignalR.Core;
using System.Collections.Generic;
using System.Threading.Tasks;

namespace Orleans.SignalR.Groups
{
public interface IGroupGrain : IConnectionGrain
/// <summary>
/// Grain interface Grouped of connections, such as user or custom group.
/// </summary>
public interface IGroupGrain : IHubMessageInvoker, IGrainWithStringKey
{
/// <summary>
/// Add connection id to the group.
/// </summary>
/// <param name="connectionId">Connection id to add.</param>
Task Add(string connectionId);

/// <summary>
/// Remove the connection id to the group.
/// </summary>
/// <param name="connectionId">Connection id to remove.</param>
Task Remove(string connectionId);

/// <summary>
/// Gets the connection count in the group.
/// </summary>
Task<int> Count();

/// <summary>
/// Invokes a method on the hub except the specified connection ids.
/// </summary>
/// <param name="methodName">Target method name to invoke.</param>
/// <param name="args">Arguments to pass to the target method.</param>
/// <param name="excludedConnectionIds">Connection ids to exclude.</param>
Task SendExcept(string methodName, object[] args, IReadOnlyList<string> excludedConnectionIds);
}
}
5 changes: 5 additions & 0 deletions src/Orleans.SignalR/Orleans.SignalR.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@
<RepositoryUrl>https://github.com/zeus82/Orleans.SignalR.git</RepositoryUrl>
</PropertyGroup>

<ItemGroup>
<Compile Remove="Connections\ConnectionGrain.cs" />
<Compile Remove="Connections\IConnectionGrain.cs" />
</ItemGroup>

<ItemGroup>
<!-- see https://gist.github.com/tebeco/a6e0d9a3885a0a36e702795219bd4fe9 -->
<FrameworkReference Include="Microsoft.AspNetCore.App" />
Expand Down
31 changes: 30 additions & 1 deletion src/Orleans.SignalR/Users/IUserGrain.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,37 @@
using Orleans.SignalR.Core;
using System.Collections.Generic;
using System.Threading.Tasks;

namespace Orleans.SignalR.Users
{
public interface IUserGrain : IConnectionGrain
/// <summary>
/// Grain interface Grouped of connections, such as user or custom group.
/// </summary>
public interface IUserGrain : IHubMessageInvoker, IGrainWithStringKey
{
/// <summary>
/// Add connection id to the group.
/// </summary>
/// <param name="connectionId">Connection id to add.</param>
Task Add(string connectionId);

/// <summary>
/// Remove the connection id to the group.
/// </summary>
/// <param name="connectionId">Connection id to remove.</param>
Task Remove(string connectionId);

/// <summary>
/// Gets the connection count in the group.
/// </summary>
Task<int> Count();

/// <summary>
/// Invokes a method on the hub except the specified connection ids.
/// </summary>
/// <param name="methodName">Target method name to invoke.</param>
/// <param name="args">Arguments to pass to the target method.</param>
/// <param name="excludedConnectionIds">Connection ids to exclude.</param>
Task SendExcept(string methodName, object[] args, IReadOnlyList<string> excludedConnectionIds);
}
}
Loading