diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..36fb140 --- /dev/null +++ b/.gitignore @@ -0,0 +1,6 @@ +.idea +workerman-ripple.imi +.php-cs-fixer.cache + +/vendor/ +composer.lock diff --git a/.idea/.gitignore b/.idea/.gitignore new file mode 100644 index 0000000..35410ca --- /dev/null +++ b/.idea/.gitignore @@ -0,0 +1,8 @@ +# 默认忽略的文件 +/shelf/ +/workspace.xml +# 基于编辑器的 HTTP 客户端请求 +/httpRequests/ +# Datasource local storage ignored files +/dataSources/ +/dataSources.local.xml diff --git a/.idea/UniappTool.xml b/.idea/UniappTool.xml new file mode 100644 index 0000000..f7328e8 --- /dev/null +++ b/.idea/UniappTool.xml @@ -0,0 +1,10 @@ + + + + + \ No newline at end of file diff --git a/.idea/inspectionProfiles/Project_Default.xml b/.idea/inspectionProfiles/Project_Default.xml new file mode 100644 index 0000000..2b1c5dd --- /dev/null +++ b/.idea/inspectionProfiles/Project_Default.xml @@ -0,0 +1,26 @@ + + + + \ No newline at end of file diff --git a/.idea/misc.xml b/.idea/misc.xml new file mode 100644 index 0000000..639900d --- /dev/null +++ b/.idea/misc.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/.idea/modules.xml b/.idea/modules.xml new file mode 100644 index 0000000..449fc54 --- /dev/null +++ b/.idea/modules.xml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/.idea/php.xml b/.idea/php.xml new file mode 100644 index 0000000..743ec76 --- /dev/null +++ b/.idea/php.xml @@ -0,0 +1,96 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/.php-cs-fixer.php b/.php-cs-fixer.php new file mode 100644 index 0000000..e31daea --- /dev/null +++ b/.php-cs-fixer.php @@ -0,0 +1,47 @@ +in(__DIR__) + ->name('*.php') + ->notName('*.blade.php'); + +$config = new Config(); + +$config->setFinder($finder); +$config->setRiskyAllowed(true); + +return $config->setRules([ + '@PSR12' => true, + 'native_function_invocation' => [ + 'include' => ['@all'], + 'scope' => 'all', + 'strict' => true, + ], + 'native_constant_invocation' => [ + 'include' => ['@all'], + 'scope' => 'all', + 'strict' => true, + ], + 'global_namespace_import' => [ + 'import_classes' => true, + 'import_constants' => true, + 'import_functions' => true, + ], + 'declare_strict_types' => true, + 'linebreak_after_opening_tag' => false, + 'blank_line_after_opening_tag' => false, + 'no_unused_imports' => true, +]); diff --git a/composer.json b/composer.json new file mode 100644 index 0000000..c48e73d --- /dev/null +++ b/composer.json @@ -0,0 +1,29 @@ +{ + "name": "cclilshy/workerman-ripple", + "autoload": { + "psr-4": { + "Workerman\\Ripple\\": "src/" + } + }, + "authors": [ + { + "name": "cclilshy", + "email": "jingnigg@gmail.com" + } + ], + "require": { + "php": ">=8.1", + "ext-pcntl": "*", + "ext-posix": "*", + "ext-sockets": "*", + "cloudtay/ripple": "^1.0", + "walkor/workerman": "^4.1", + "cloudtay/ripple-http": "^0.3.0" + }, + "require-dev": { + "friendsofphp/php-cs-fixer": "*", + "phpunit/phpunit": "*" + }, + "minimum-stability": "dev", + "prefer-stable": true +} diff --git a/example/example.php b/example/example.php new file mode 100644 index 0000000..2b406d8 --- /dev/null +++ b/example/example.php @@ -0,0 +1,47 @@ +onWorkerStart = static function () { + $asyncTcpConnection = Utils::asyncTcpConnection('ssl://www.google.com:443'); + $asyncTcpConnection->onConnect = static function (TcpConnection $connection) { + echo 'Connected to google.com' , \PHP_EOL; + $connection->send("GET / HTTP/1.1\r\nHost: www.google.com\r\nConnection: close\r\n\r\n"); + }; + + $asyncTcpConnection->onMessage = static function (TcpConnection $connection, string $data) { + echo 'Received data from google.com: ' . \substr($data, 0, 10),'...' . \PHP_EOL; + }; + + $asyncTcpConnection->connectViaProxy('socks5://127.0.0.1:1080'); +}; + +$worker->onMessage = static function (TcpConnection $connection, string $data) { + switch (\trim($data, "\n\r\t\v\0")) { + case 'ping': + \Co\sleep(1); + $connection->send('pong'); + break; + case 'curl': + try { + $guzzle = Utils::guzzle(); + $response = $guzzle->get('https://www.baidu.com'); + $connection->send('status: ' . $response->getStatusCode()); + } catch (Throwable $e) { + $connection->send('error: ' . $e->getMessage()); + } + break; + default: + $connection->send('Invalid command'); + } +}; + +Worker::$eventLoopClass = Driver::class; +Worker::runAll(); diff --git a/src/Connection/AsyncTcpConnection.php b/src/Connection/AsyncTcpConnection.php new file mode 100644 index 0000000..b0294c4 --- /dev/null +++ b/src/Connection/AsyncTcpConnection.php @@ -0,0 +1,213 @@ +transport === 'unix') { + $this->connect(); + return; + } + + if ($this->_status !== self::STATUS_INITIAL && $this->_status !== self::STATUS_CLOSING && + $this->_status !== self::STATUS_CLOSED) { + return; + } + $this->_status = self::STATUS_CONNECTING; + $this->_connectStartTime = microtime(true); + + /*** Ripple:Overwrite */ + try { + $this->_socket = $this->connectProxy($proxy); + } catch (Exception $e) { + $this->emitError(WORKERMAN_CONNECT_FAIL, $e->getMessage()); + if ($this->_status === self::STATUS_CLOSING) { + $this->destroy(); + } + if ($this->_status === self::STATUS_CLOSED) { + $this->onConnect = null; + } + return; + } + /** Ripple:Overwrite-end */ + + // If failed attempt to emit onError callback. + + // Add socket to global event loop waiting connection is successfully established or faild. + Worker::$globalEvent->add($this->_socket, EventInterface::EV_WRITE, array($this, 'checkConnection')); + // For windows. + if (DIRECTORY_SEPARATOR === '\\') { + Worker::$globalEvent->add($this->_socket, EventInterface::EV_EXCEPT, array($this, 'checkConnection')); + } + } + + /** + * @param string $proxy + * + * @return mixed + * @throws \Ripple\Stream\Exception\ConnectionException + */ + protected function connectProxy(string $proxy): mixed + { + $parse = parse_url($proxy); + if (!isset($parse['host'], $parse['port'])) { + throw new ConnectionException('Invalid proxy address', ConnectionException::CONNECTION_ERROR); + } + + $payload = [ + 'host' => $this->_remoteHost, + 'port' => $this->_remotePort + ]; + if (isset($parse['user'], $parse['pass'])) { + $payload['username'] = $parse['user']; + $payload['password'] = $parse['pass']; + } + + switch ($parse['scheme']) { + case 'socks': + case 'socks5': + $tunnelSocket = Socks5::connect("tcp://{$parse['host']}:{$parse['port']}", $payload)->getSocketStream(); + break; + case 'http': + $tunnelSocket = Http::connect("tcp://{$parse['host']}:{$parse['port']}", $payload)->getSocketStream(); + break; + case 'https': + $tunnel = IO::Socket()->connectWithSSL("tcp://{$parse['host']}:{$parse['port']}"); + $tunnelSocket = Http::connect($tunnel, $payload)->getSocketStream(); + break; + default: + throw new ConnectionException('Unsupported proxy protocol', ConnectionException::CONNECTION_ERROR); + } + + if ($this->transport === 'ssl') { + $tunnelSocket->enableSSL(); + $this->_sslHandshakeCompleted = true; + } + return $tunnelSocket->stream; + } + + /** + * @Ripple:Hook + * @return void + */ + public function checkConnection(): void + { + // Remove EV_EXPECT for windows. + if (DIRECTORY_SEPARATOR === '\\') { + Worker::$globalEvent->del($this->_socket, EventInterface::EV_EXCEPT); + } + + // Remove write listener. + Worker::$globalEvent->del($this->_socket, EventInterface::EV_WRITE); + + if ($this->_status !== self::STATUS_CONNECTING) { + return; + } + + // Check socket state. + if ($address = stream_socket_get_name($this->_socket, true)) { + // Nonblocking. + stream_set_blocking($this->_socket, false); + // Compatible with hhvm + if (function_exists('stream_set_read_buffer')) { + stream_set_read_buffer($this->_socket, 0); + } + // Try to open keepalive for tcp and disable Nagle algorithm. + if (function_exists('socket_import_stream') && $this->transport === 'tcp') { + $raw_socket = socket_import_stream($this->_socket); + socket_set_option($raw_socket, SOL_SOCKET, SO_KEEPALIVE, 1); + socket_set_option($raw_socket, SOL_TCP, TCP_NODELAY, 1); + } + + /*** Ripple:Overwrite */ + // SSL handshake. + if ($this->transport === 'ssl' && !$this->_sslHandshakeCompleted) { + /*** Ripple:Overwrite-end */ + $this->_sslHandshakeCompleted = $this->doSslHandshake($this->_socket); + if ($this->_sslHandshakeCompleted === false) { + return; + } + } else { + // There are some data waiting to send. + if ($this->_sendBuffer) { + Worker::$globalEvent->add($this->_socket, EventInterface::EV_WRITE, array($this, 'baseWrite')); + } + } + + // Register a listener waiting read event. + Worker::$globalEvent->add($this->_socket, EventInterface::EV_READ, array($this, 'baseRead')); + + $this->_status = self::STATUS_ESTABLISHED; + $this->_remoteAddress = $address; + + // Try to emit onConnect callback. + if ($this->onConnect) { + try { + call_user_func($this->onConnect, $this); + } catch (Exception $e) { + Worker::stopAll(250, $e); + } + } + // Try to emit protocol::onConnect + if ($this->protocol && method_exists($this->protocol, 'onConnect')) { + try { + call_user_func(array($this->protocol, 'onConnect'), $this); + } catch (Exception $e) { + Worker::stopAll(250, $e); + } + } + } else { + // Connection failed. + $this->emitError(WORKERMAN_CONNECT_FAIL, 'connect ' . $this->_remoteAddress . ' fail after ' . round(microtime(true) - $this->_connectStartTime, 4) . ' seconds'); + if ($this->_status === self::STATUS_CLOSING) { + $this->destroy(); + } + if ($this->_status === self::STATUS_CLOSED) { + $this->onConnect = null; + } + } + } +} diff --git a/src/Driver.php b/src/Driver.php new file mode 100644 index 0000000..724937e --- /dev/null +++ b/src/Driver.php @@ -0,0 +1,259 @@ + $func($fd); + } + + // 兼容 Workerman 数组Callback方式 + if (is_array($func)) { + $closure = static fn () => call_user_func($func, $fd); + } + + // 兼容 Workerman 字符串Callback方式 + if (is_string($func)) { + if (str_contains($func, '::')) { + $explode = explode('::', $func); + $closure = static fn () => call_user_func($explode, $fd); + } + + if (function_exists($func)) { + $closure = static fn () => $func($fd); + } + } + + if (!isset($closure)) { + return false; + } + + $id = onSignal($fd, $closure); + $this->_signal2ids[$fd] = string2int($id); + return string2int($id); + } catch (Throwable) { + return false; + } + + case EventInterface::EV_TIMER: + $this->_timer[] = $timerId = repeat(static function () use ($func, $args) { + try { + call_user_func_array($func, $args); + } catch (Throwable $e) { + Worker::stopAll(250, $e); + } + }, $fd); + return string2int($timerId); + + case EventInterface::EV_TIMER_ONCE: + $this->_timer[] = $timerId = delay(static function () use ($func, $args) { + try { + call_user_func_array($func, $args); + } catch (Throwable $e) { + Worker::stopAll(250, $e); + } + }, $fd); + return string2int($timerId); + + case EventInterface::EV_READ: + $stream = new Stream($fd); + $eventId = $stream->onReadable(static function (Stream $stream) use ($func) { + $func($stream->stream); + }); + + $this->_fd2RIDs[$stream->id][] = string2int($eventId); + return string2int($eventId); + + case EventInterface::EV_WRITE: + $stream = new Stream($fd); + $eventId = $stream->onWriteable(static function (Stream $stream) use ($func) { + $func($stream->stream); + }); + + $this->_fd2WIDs[$stream->id][] = string2int($eventId); + return string2int($eventId); + } + return false; + } + + /** + * @Author cclilshy + * @Date 2024/8/27 22:00 + * + * @param $fd + * @param $flag + * + * @return void + */ + public function del($fd, $flag): void + { + if ($flag === EventInterface::EV_TIMER || $flag === EventInterface::EV_TIMER_ONCE) { + $this->cancel($fd); + unset($this->_timer[array_search(int2string($fd), $this->_timer)]); + return; + } + + if ($flag === EventInterface::EV_READ || $flag === EventInterface::EV_WRITE) { + if (!$fd) { + return; + } + + $streamId = get_resource_id($fd); + if ($flag === EventInterface::EV_READ) { + foreach ($this->_fd2RIDs[$streamId] ?? [] as $eventId) { + $this->cancel($eventId); + } + unset($this->_fd2RIDs[$streamId]); + } else { + foreach ($this->_fd2WIDs[$streamId] ?? [] as $eventId) { + $this->cancel($eventId); + } + unset($this->_fd2WIDs[$streamId]); + } + return; + } + + if ($flag === EventInterface::EV_SIGNAL) { + $signalId = $this->_signal2ids[$fd] ?? null; + if ($signalId) { + $this->cancel($signalId); + unset($this->_signal2ids[$fd]); + } + } + } + + /** + * @Author cclilshy + * @Date 2024/8/27 22:01 + * + * @param int $id + * + * @return void + */ + private function cancel(int $id): void + { + cancel(int2string($id)); + } + + /** + * @return void + */ + public function clearAllTimer(): void + { + foreach ($this->_timer as $timerId) { + $this->cancel($timerId); + } + } + + /** + * @return void + * @throws Throwable + */ + public function loop(): void + { + if (!isset(Driver::$baseProcessId)) { + Driver::$baseProcessId = (Kernel::getInstance()->supportProcessControl() ? getmypid() : posix_getpid()); + } elseif (Driver::$baseProcessId !== (Kernel::getInstance()->supportProcessControl() ? getmypid() : posix_getpid())) { + Driver::$baseProcessId = (Kernel::getInstance()->supportProcessControl() ? getmypid() : posix_getpid()); + cancelAll(); + System::Process()->forkedTick(); + } + wait(); + + /** + * 不会再有任何事发生 + * + * Workerman会将结束的进程视为异常然后重启, 循环往复 + */ + while (1) { + wait(); + sleep(1); + } + } + + /** + * @return int + */ + public function getTimerCount(): int + { + return count($this->_timer); + } + + /** + * @return void + */ + public function destroy(): void + { + cancelAll(); + } +} diff --git a/src/Protocols/Http/IteratorResponse.php b/src/Protocols/Http/IteratorResponse.php new file mode 100644 index 0000000..10b2e5a --- /dev/null +++ b/src/Protocols/Http/IteratorResponse.php @@ -0,0 +1,92 @@ +iterator = $iterator; + if ($autopilot) { + defer(fn () => $this->processIterator()); + } + parent::__construct(200, array_merge([], [])); + } + + /** + * @return $this + */ + public function processIterator(): static + { + foreach ($this->iterator as $frame) { + $this->tcpConnection->send($frame, true); + } + + if ($this->closeWhenFinish) { + $this->close(); + } + + return $this; + } + + /** + * @return void + */ + public function close(): void + { + $this->tcpConnection->close(); + } + + /** + * @param Iterator|Closure $iterator + * @param TcpConnection $tcpConnection + * @param bool $closeWhenFinish + * @param bool $autopilot + * + * @return IteratorResponse + */ + public static function create( + Iterator|Closure $iterator, + TcpConnection $tcpConnection, + bool $closeWhenFinish = false, + bool $autopilot = true, + ): IteratorResponse { + return new static($iterator, $tcpConnection, $closeWhenFinish, $autopilot); + } +} diff --git a/src/Utils.php b/src/Utils.php new file mode 100644 index 0000000..eb914d0 --- /dev/null +++ b/src/Utils.php @@ -0,0 +1,67 @@ + + + + + + + + + + + + + \ No newline at end of file