Skip to content

Commit

Permalink
Do not close persistent connection on dtor. Add disconnect function t…
Browse files Browse the repository at this point in the history
…o cleanly handle reconnection
  • Loading branch information
rmeisler-applovin committed Feb 1, 2021
1 parent 5159c19 commit f601df3
Show file tree
Hide file tree
Showing 6 changed files with 117 additions and 61 deletions.
15 changes: 13 additions & 2 deletions lib/Base.php
Original file line number Diff line number Diff line change
Expand Up @@ -410,10 +410,21 @@ public function close(int $status = 1000, string $message = 'ttfn'): void
$this->receive(); // Receiving a close frame will close the socket now.
}

/**
* Disconnect from client/server.
*/
public function disconnect(): void
{
if ($this->isConnected()) {
fclose($this->socket);
$this->socket = null;
}
}

protected function write(string $data): void
{
$length = strlen($data);
$written = fwrite($this->socket, $data);
$written = @fwrite($this->socket, $data);
if ($written === false) {
$this->throwException("Failed to write {$length} bytes.");
}
Expand All @@ -427,7 +438,7 @@ protected function read(string $length): string
{
$data = '';
while (strlen($data) < $length) {
$buffer = fread($this->socket, $length - strlen($data));
$buffer = @fread($this->socket, $length - strlen($data));
if ($buffer === false) {
$read = strlen($data);
$this->throwException("Broken frame, read {$read} of stated {$length} bytes.");
Expand Down
121 changes: 62 additions & 59 deletions lib/Client.php
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public function __construct(string $uri, array $options = [])

public function __destruct()
{
if ($this->isConnected()) {
if ($this->isConnected() && get_resource_type($this->socket) !== 'persistent stream') {
fclose($this->socket);
}
$this->socket = null;
Expand Down Expand Up @@ -100,8 +100,9 @@ protected function connect(): void
$context = stream_context_create();
}

$persistent = $this->options['persistent'] === true;
$flags = STREAM_CLIENT_CONNECT;
$flags = ($this->options['persistent'] === true) ? $flags | STREAM_CLIENT_PERSISTENT : $flags;
$flags = $persistent ? $flags | STREAM_CLIENT_PERSISTENT : $flags;

$error = $errno = $errstr = null;
set_error_handler(function (int $severity, string $message, string $file, int $line) use (&$error) {
Expand All @@ -127,73 +128,75 @@ protected function connect(): void
throw new ConnectionException($error);
}

// Set timeout on the stream as well.
stream_set_timeout($this->socket, $this->options['timeout']);

// Generate the WebSocket key.
$key = self::generateKey();

// Default headers
$headers = [
'Host' => $host . ":" . $port,
'User-Agent' => 'websocket-client-php',
'Connection' => 'Upgrade',
'Upgrade' => 'websocket',
'Sec-WebSocket-Key' => $key,
'Sec-WebSocket-Version' => '13',
];

// Handle basic authentication.
if ($user || $pass) {
$headers['authorization'] = 'Basic ' . base64_encode($user . ':' . $pass);
}
if (!$persistent || ftell($this->socket) == 0) {
// Set timeout on the stream as well.
stream_set_timeout($this->socket, $this->options['timeout']);

// Generate the WebSocket key.
$key = self::generateKey();

// Default headers
$headers = [
'Host' => $host . ":" . $port,
'User-Agent' => 'websocket-client-php',
'Connection' => 'Upgrade',
'Upgrade' => 'websocket',
'Sec-WebSocket-Key' => $key,
'Sec-WebSocket-Version' => '13',
];

// Handle basic authentication.
if ($user || $pass) {
$headers['authorization'] = 'Basic ' . base64_encode($user . ':' . $pass);
}

// Deprecated way of adding origin (use headers instead).
if (isset($this->options['origin'])) {
$headers['origin'] = $this->options['origin'];
}
// Deprecated way of adding origin (use headers instead).
if (isset($this->options['origin'])) {
$headers['origin'] = $this->options['origin'];
}

// Add and override with headers from options.
if (isset($this->options['headers'])) {
$headers = array_merge($headers, $this->options['headers']);
}
// Add and override with headers from options.
if (isset($this->options['headers'])) {
$headers = array_merge($headers, $this->options['headers']);
}

$header = "GET " . $path_with_query . " HTTP/1.1\r\n" . implode(
"\r\n",
array_map(
function ($key, $value) {
return "$key: $value";
},
array_keys($headers),
$headers
)
) . "\r\n\r\n";
$header = "GET " . $path_with_query . " HTTP/1.1\r\n" . implode(
"\r\n",
array_map(
function ($key, $value) {
return "$key: $value";
},
array_keys($headers),
$headers
)
) . "\r\n\r\n";

// Send headers.
$this->write($header);
// Send headers.
$this->write($header);

// Get server response header (terminated with double CR+LF).
$response = stream_get_line($this->socket, 1024, "\r\n\r\n");
// Get server response header (terminated with double CR+LF).
$response = stream_get_line($this->socket, 1024, "\r\n\r\n");

/// @todo Handle version switching
/// @todo Handle version switching

$address = "{$scheme}://{$host}{$path_with_query}";
$address = "{$scheme}://{$host}{$path_with_query}";

// Validate response.
if (!preg_match('#Sec-WebSocket-Accept:\s(.*)$#mUi', $response, $matches)) {
$error = "Connection to '{$address}' failed: Server sent invalid upgrade response: {$response}";
$this->logger->error($error);
throw new ConnectionException($error);
}
// Validate response.
if (!preg_match('#Sec-WebSocket-Accept:\s(.*)$#mUi', $response, $matches)) {
$error = "Connection to '{$address}' failed: Server sent invalid upgrade response: {$response}";
$this->logger->error($error);
throw new ConnectionException($error);
}

$keyAccept = trim($matches[1]);
$expectedResonse
= base64_encode(pack('H*', sha1($key . '258EAFA5-E914-47DA-95CA-C5AB0DC85B11')));
$keyAccept = trim($matches[1]);
$expectedResonse
= base64_encode(pack('H*', sha1($key . '258EAFA5-E914-47DA-95CA-C5AB0DC85B11')));

if ($keyAccept !== $expectedResonse) {
$error = 'Server sent bad upgrade response.';
$this->logger->error($error);
throw new ConnectionException($error);
if ($keyAccept !== $expectedResonse) {
$error = 'Server sent bad upgrade response.';
$this->logger->error($error);
throw new ConnectionException($error);
}
}

$this->logger->info("Client connected to {$address}");
Expand Down
2 changes: 2 additions & 0 deletions tests/ClientTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,8 @@ public function testPersistentConnection(): void
MockSocket::initialize('client.connect-persistent', $this);
$client = new Client('ws://localhost:8000/my/mock/path', ['persistent' => true]);
$client->send('Connect');
$client->disconnect();
$this->assertFalse($client->isConnected());
$this->assertTrue(MockSocket::isEmpty());
}

Expand Down
5 changes: 5 additions & 0 deletions tests/mock/mock-socket.php
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ function feof()
$args = func_get_args();
return MockSocket::handle('feof', $args);
}
function ftell()
{
$args = func_get_args();
return MockSocket::handle('ftell', $args);
}
function fclose()
{
$args = func_get_args();
Expand Down
28 changes: 28 additions & 0 deletions tests/scripts/client.connect-persistent.json
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,13 @@
],
"return": "persistent stream"
},
{
"function": "ftell",
"params": [
"@mock-stream"
],
"return": 0
},
{
"function": "stream_set_timeout",
"params": [
Expand Down Expand Up @@ -62,6 +69,27 @@
"@mock-stream"
],
"return": 13
},
{
"function": "get_resource_type",
"params": [
"@mock-stream"
],
"return": "persistent stream"
},
{
"function": "get_resource_type",
"params": [
"@mock-stream"
],
"return": "persistent stream"
},
{
"function": "fclose",
"params": [
"@mock-stream"
],
"return":true
}
]

7 changes: 7 additions & 0 deletions tests/scripts/client.destruct.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,11 @@
[
{
"function": "get_resource_type",
"params": [
"@mock-stream"
],
"return": "stream"
},
{
"function": "get_resource_type",
"params": [
Expand Down

0 comments on commit f601df3

Please sign in to comment.