Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Listener fix #228

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/Fleck/Interfaces/ISocket.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public interface ISocket
bool NoDelay { get; set; }
EndPoint LocalEndPoint { get; }

Task<ISocket> Accept(Action<ISocket> callback, Action<Exception> error);
Task Accept(Action<ISocket> callback, Action<Exception> error);
Task Send(byte[] buffer, Action callback, Action<Exception> error);
Task<int> Receive(byte[] buffer, Action<int> callback, Action<Exception> error, int offset = 0);
Task Authenticate(X509Certificate2 certificate, SslProtocols enabledSslProtocols, Action callback, Action<Exception> error);
Expand Down
10 changes: 5 additions & 5 deletions src/Fleck/SocketWrapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -130,14 +130,14 @@ public Task<int> Receive(byte[] buffer, Action<int> callback, Action<Exception>
}
}

public Task<ISocket> Accept(Action<ISocket> callback, Action<Exception> error)
public Task Accept(Action<ISocket> callback, Action<Exception> error)
{
Func<IAsyncResult, ISocket> end = r => _tokenSource.Token.IsCancellationRequested ? null : new SocketWrapper(_socket.EndAccept(r));
Func<IAsyncResult, ISocket> end = r => _tokenSource.Token.IsCancellationRequested ? null : new SocketWrapper (_socket.EndAccept (r));
var task = _taskFactory.FromAsync(_socket.BeginAccept, end, null);
task.ContinueWith(t => callback(t.Result), TaskContinuationOptions.OnlyOnRanToCompletion)
.ContinueWith(t => error(t.Exception), TaskContinuationOptions.OnlyOnFaulted);

task.ContinueWith(t => error(t.Exception), TaskContinuationOptions.OnlyOnFaulted);
return task;

return task.ContinueWith (t => callback (t.Result), TaskContinuationOptions.OnlyOnRanToCompletion);
}

public void Dispose()
Expand Down
62 changes: 42 additions & 20 deletions src/Fleck/WebSocketServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
using System.Security.Cryptography.X509Certificates;
using System.Collections.Generic;
using System.Security.Authentication;
using System.Threading;
using System.Threading.Tasks;
using Fleck.Helpers;

namespace Fleck
Expand Down Expand Up @@ -93,34 +95,54 @@ public void Start(Action<IWebSocketConnection> config)
_config = config;
}

private void ListenForClients()
private void TryRestart ()
{
ListenerSocket.Accept(OnClientConnect, e => {
FleckLog.Error("Listener socket is closed", e);
if(RestartAfterListenError){
FleckLog.Info("Listener socket restarting");
try
{
ListenerSocket.Dispose();
var socket = new Socket(_locationIP.AddressFamily, SocketType.Stream, ProtocolType.IP);
ListenerSocket = new SocketWrapper(socket);
Start(_config);
FleckLog.Info("Listener socket restarted");
}
catch (Exception ex)
{
FleckLog.Error("Listener could not be restarted", ex);
}
}
});
FleckLog.Info ("Listener socket restarting");
try {
ListenerSocket.Dispose ();
var socket = new Socket (_locationIP.AddressFamily, SocketType.Stream, ProtocolType.IP);
ListenerSocket = new SocketWrapper (socket);
Start (_config);
FleckLog.Info ("Listener socket restarted");
} catch (Exception ex) {
FleckLog.Error ("Listener socket could not be restarted", ex);
}
}

private void ListenForClients ()
{
ManualResetEvent acceptDone = new ManualResetEvent (false);
bool running = true;

Task.Run (() => {

while (running) {

acceptDone.Reset ();

var task = ListenerSocket.Accept(
s => {
running = (s != null);
acceptDone.Set ();
OnClientConnect (s); },
e => { FleckLog.Error ("Error while listening for new clients", e);
if (RestartAfterListenError) TryRestart ();
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given this should properly separate listener and client errors, I'm not sure if having the restart is necessary.

running = false; acceptDone.Set (); }
);

task.ContinueWith((t) => FleckLog.Warn ("Error during client connect", t.Exception),
TaskContinuationOptions.OnlyOnFaulted);

acceptDone.WaitOne ();
}
});
}

private void OnClientConnect(ISocket clientSocket)
{
if (clientSocket == null) return; // socket closed

FleckLog.Debug(String.Format("Client connected from {0}:{1}", clientSocket.RemoteIpAddress, clientSocket.RemotePort.ToString()));
ListenForClients();

WebSocketConnection connection = null;

Expand Down