Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WSS: Add Web Socket Server #795

Closed
wants to merge 21 commits into from
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions neo-modules.sln
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "SQLiteWallet", "src\SQLiteW
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "StorageDumper", "src\StorageDumper\StorageDumper.csproj", "{938D86EA-0F48-436B-9255-4AD9A8E6B9AC}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "WebSocketServer", "src\WebSocketServer\WebSocketServer.csproj", "{776B6533-D073-4541-8288-3736B8F9DD9D}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -121,6 +123,10 @@ Global
{938D86EA-0F48-436B-9255-4AD9A8E6B9AC}.Debug|Any CPU.Build.0 = Debug|Any CPU
{938D86EA-0F48-436B-9255-4AD9A8E6B9AC}.Release|Any CPU.ActiveCfg = Release|Any CPU
{938D86EA-0F48-436B-9255-4AD9A8E6B9AC}.Release|Any CPU.Build.0 = Release|Any CPU
{776B6533-D073-4541-8288-3736B8F9DD9D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{776B6533-D073-4541-8288-3736B8F9DD9D}.Debug|Any CPU.Build.0 = Debug|Any CPU
{776B6533-D073-4541-8288-3736B8F9DD9D}.Release|Any CPU.ActiveCfg = Release|Any CPU
{776B6533-D073-4541-8288-3736B8F9DD9D}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand All @@ -144,6 +150,7 @@ Global
{8D2EE375-2E2D-45FE-A4E9-0254D12C7554} = {59D802AB-C552-422A-B9C3-64D329FBCDCC}
{D121D57A-512E-4F74-ADA1-24482BF5C42B} = {97E81C78-1637-481F-9485-DA1225E94C23}
{938D86EA-0F48-436B-9255-4AD9A8E6B9AC} = {97E81C78-1637-481F-9485-DA1225E94C23}
{776B6533-D073-4541-8288-3736B8F9DD9D} = {97E81C78-1637-481F-9485-DA1225E94C23}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {61D3ADE6-BBFC-402D-AB42-1C71C9F9EDE3}
Expand Down
57 changes: 57 additions & 0 deletions src/WebSocketServer/Behaviors/BlockWebSocketBehavior.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
using System;
using System.Threading.Tasks;
using Neo.Json;
using WebSocketSharp;
using WebSocketSharp.Server;

namespace Neo.Plugins.WebSocketServer.Behaviors;

public class BlockWebSocketBehavior : WebSocketBehavior
{
private Guid ClientId { get; set; }

public BlockWebSocketBehavior()
{
ClientId = Guid.NewGuid();
}

protected override void OnOpen()
{
base.OnOpen();
WebSocketServerPlugin.AddClient(ClientId, this);
}

protected override void OnClose(CloseEventArgs e)
{
base.OnClose(e);
WebSocketServerPlugin.RemoveClient(ClientId);
}

// public async Task SendPersistedBlockMessage(string message)
// {
// await SendAsync(message, completed => { });
// }

protected override void OnMessage(MessageEventArgs e)
{
var request = JToken.Parse(e.Data);

var action = request?["action"]?.ToString();
if (action == "subscribe")
{
// The 'message' parameter will be passed from the WebSocketServerPlugin class
// Task.Run(() => SendPersistedBlockMessage(message));
}
}

public Task SendPersistedBlockMessage(string message)
{
var response = new JObject
{
["action"] = "blockPersisted",
["block"] = message
};
Send(response.ToString());
return Task.CompletedTask;
}
}
10 changes: 10 additions & 0 deletions src/WebSocketServer/WebSocketServer.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>net7.0</TargetFramework>
<RootNamespace>Neo.Plugins.WebSocketServer</RootNamespace>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Neo.ConsoleService" Version="1.2.0" />
<PackageReference Include="WebSocketSharp" Version="1.0.3-rc11" />
</ItemGroup>
</Project>
104 changes: 104 additions & 0 deletions src/WebSocketServer/WebSocketServerPlugin.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Neo.ConsoleService;
using Newtonsoft.Json.Linq;
using Neo.Ledger;
using Neo.Network.P2P.Payloads;
using Neo.Plugins.WebSocketServer.Behaviors;

namespace Neo.Plugins.WebSocketServer;

public class WebSocketServerPlugin : Plugin
{
public override string Name => "NeoWebSocketServer";
public override string Description => "Enables WebSocket notifications for the node";

private Settings _settings;
private static readonly Dictionary<Guid, BlockWebSocketBehavior> Handlers = new();
Copy link
Member

@cschuchardt88 cschuchardt88 Aug 3, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you should be using ConcurrentDictionary, so you dont have to use Lock. Reason being if have, let's say a million clients using "lock" will have a delay and lock the thread for new clients connecting to the server. You can use ConcurrentDictionary to fix this problem. because like i said new clients wont get a response. especially if there is blocks with 5k transactions in it. They would be waiting for lock to be lifted.

private static WebSocketSharp.Server.WebSocketServer _server;
private NeoSystem _system;

public WebSocketServerPlugin()
{
Blockchain.Committed += OnCommitted;
}

protected override void Configure()
{
_settings ??= new Settings(GetConfiguration());
}

protected override void OnSystemLoaded(NeoSystem system)
{
_system = system;
}

public override void Dispose()
{
Blockchain.Committed -= OnCommitted;
_server?.Stop();
base.Dispose();
}

[ConsoleCommand("start wss", Category = "wss", Description = "Open Web Socket Server")]
private void OnStart()
{
if (_server is { IsListening: true }) return;

var s = _settings.Servers.FirstOrDefault(p => p.Network == _system.Settings.Network);
if (s == null) return;

var useSsl = !string.IsNullOrEmpty(s.SslCert) && !string.IsNullOrEmpty(s.SslCertPassword);
_server = new WebSocketSharp.Server.WebSocketServer(s.BindAddress, s.Port, useSsl);
if (useSsl)
{
_server.SslConfiguration.ServerCertificate = new System.Security.Cryptography.X509Certificates.X509Certificate2(s.SslCert, s.SslCertPassword);
}
_server.AddWebSocketService<BlockWebSocketBehavior>("/block", () => new BlockWebSocketBehavior());
_server.Start();
}

[ConsoleCommand("close wss", Category = "wss", Description = "Close Web Socket Server")]
private void OnClose()
{
if (_server is { IsListening: true }) _server.Stop();
ConsoleHelper.Info("Web Socket Server closed");
}

private static async void OnCommitted(NeoSystem system, Block block)
{
using var snapshot = system.GetSnapshot();
var blockJson = JObject.FromObject(block);
await SendMessageToClients(blockJson.ToString());
}

private static async Task SendMessageToClients(string message)
{
var sendTasks = new List<Task>();

lock (Handlers)
{
sendTasks.AddRange(Handlers.Values.Select(handler => handler.SendPersistedBlockMessage(message)));
}

await Task.WhenAll(sendTasks);
}

public static void AddClient(Guid clientId, BlockWebSocketBehavior client)
{
lock (Handlers)
{
Handlers[clientId] = client;
}
}

public static void RemoveClient(Guid clientId)
{
lock (Handlers)
{
Handlers.Remove(clientId);
}
}
}
52 changes: 52 additions & 0 deletions src/WebSocketServer/WebSocketServerSetting.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using Microsoft.Extensions.Configuration;

namespace Neo.Plugins.WebSocketServer;

public record WebSocketServerSetting
{
public uint Network { get; init; }
public IPAddress BindAddress { get; init; }
public ushort Port { get; init; }
public string SslCert { get; init; }
public string SslCertPassword { get; init; }
public List<string> TrustedAuthorities { get; init; }
public int MaxConcurrentConnections { get; init; }
public bool SessionEnabled { get; init; }
public TimeSpan SessionExpirationTime { get; init; }

public static WebSocketServerSetting Default { get; } = new()
{
Network = 5195086u,
BindAddress = IPAddress.None,
MaxConcurrentConnections = 40,
SessionEnabled = false,
SessionExpirationTime = TimeSpan.FromSeconds(60)
};

public static WebSocketServerSetting Load(IConfigurationSection section) => new()
{
Network = section.GetValue("Network", Default.Network),
BindAddress = IPAddress.Parse(section.GetSection("BindAddress").Value),
Port = ushort.Parse(section.GetSection("Port").Value),
SslCert = section.GetValue("SslCert", ""),
SslCertPassword = section.GetValue("SslCertPassword", ""),
TrustedAuthorities = section.GetSection("TrustedAuthorities").Get<List<string>>() ?? new List<string>(),
MaxConcurrentConnections = section.GetValue("MaxConcurrentConnections", Default.MaxConcurrentConnections),
SessionEnabled = section.GetValue("SessionEnabled", Default.SessionEnabled),
SessionExpirationTime = TimeSpan.FromSeconds(section.GetValue("SessionExpirationTime", (int)Default.SessionExpirationTime.TotalSeconds))
};
}

public class Settings
{
public IReadOnlyList<WebSocketServerSetting> Servers { get; init; }

public Settings(IConfigurationSection section)
{
Servers = section.GetSection(nameof(Servers)).GetChildren().Select(p => WebSocketServerSetting.Load(p)).ToArray();
}
}
17 changes: 17 additions & 0 deletions src/WebSocketServer/config.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
{
"PluginConfiguration": {
"Servers": [
{
"Network": 860833102,
"BindAddress": "127.0.0.1",
"Port": 10334,
"SslCert": "",
"SslCertPassword": "",
"TrustedAuthorities": [],
"MaxConcurrentConnections": 40,
"SessionEnabled": false,
"SessionExpirationTime": 60
Jim8y marked this conversation as resolved.
Show resolved Hide resolved
}
]
}
}