From 2f4661dfb3832abe17996cdddbfb75b179b9c056 Mon Sep 17 00:00:00 2001 From: Kim Pepper Date: Mon, 21 Oct 2024 14:14:47 +1100 Subject: [PATCH] Switch to PSR interfaces Signed-off-by: Kim Pepper --- composer.json | 34 ++-- samples/index.php | 42 ++++- src/OpenSearch/Client.php | 62 +++++-- .../Exceptions/NoAsyncClientException.php | 24 +++ src/OpenSearch/Endpoints/AbstractEndpoint.php | 7 + src/OpenSearch/RequestFactory.php | 141 +++++++++++++++ src/OpenSearch/Transport.php | 169 +++++------------- 7 files changed, 325 insertions(+), 154 deletions(-) create mode 100644 src/OpenSearch/Common/Exceptions/NoAsyncClientException.php create mode 100644 src/OpenSearch/RequestFactory.php diff --git a/composer.json b/composer.json index 9efae1db..3ca956ae 100644 --- a/composer.json +++ b/composer.json @@ -21,22 +21,33 @@ } ], "require": { - "php": "^7.3 || ^8.0", - "ext-json": ">=1.3.7", + "php": "^8.1", "ext-curl": "*", - "ezimuel/ringphp": "^1.2.2", + "ext-json": ">=1.3.7", + "php-http/async-client-implementation": "^1.0", + "php-http/discovery": "^1.20", + "php-http/guzzle7-adapter": "^1.0", + "psr/http-client": "^1.0", + "psr/http-client-implementation": "^1.0", + "psr/http-factory": "^1.1", + "psr/http-factory-implementation": "^2.4", + "psr/http-message": "^2.0", + "psr/http-message-implementation": "^1.0", "psr/log": "^1|^2|^3", "symfony/yaml": "*" }, "require-dev": { "ext-zip": "*", "aws/aws-sdk-php": "^3.0", - "friendsofphp/php-cs-fixer": "^3.0", - "mockery/mockery": "^1.2", - "phpstan/phpstan": "^1.7.15", - "phpstan/phpstan-mockery": "^1.1.0", - "phpunit/phpunit": "^9.3", - "symfony/finder": "~4.0 || ~5.0" + "friendsofphp/php-cs-fixer": "^v3.64", + "guzzlehttp/psr7": "^2.7", + "mockery/mockery": "^1.6", + "phpstan/phpstan": "^1.12", + "phpstan/phpstan-mockery": "^1.1", + "phpunit/phpunit": "^9.6", + "symfony/finder": "^6.4|^7.0", + "symfony/http-client": "^7.1", + "symfony/http-client-contracts": "^3.5" }, "suggest": { "monolog/monolog": "Allows for client-level logging and tracing", @@ -54,7 +65,10 @@ } }, "config": { - "sort-packages": true + "sort-packages": true, + "allow-plugins": { + "php-http/discovery": true + } }, "scripts": { "php-cs": [ diff --git a/samples/index.php b/samples/index.php index b7bb97f8..e60c7e53 100644 --- a/samples/index.php +++ b/samples/index.php @@ -5,17 +5,45 @@ * SPDX-License-Identifier: Apache-2.0 */ +use OpenSearch\Client; + require_once __DIR__ . '/vendor/autoload.php'; -$client = OpenSearch\ClientBuilder::fromConfig([ - 'Hosts' => [ - 'https://localhost:9200' - ], - 'BasicAuthentication' => ['admin', getenv('OPENSEARCH_PASSWORD')], - 'Retries' => 2, - 'SSLVerification' => false +// Guzzle example + +$guzzleClient = new \GuzzleHttp\Client([ + 'base_uri' => 'https://localhost:9200', + 'auth' => ['admin', getenv('OPENSEARCH_PASSWORD')], + 'verify' => false, + 'retries' => 2, + 'headers' => [ + 'Accept' => 'application/json', + 'Content-Type' => 'application/json', + 'User-Agent' => sprintf('opensearch-php/%s (%s; PHP %s)', Client::VERSION, PHP_OS, PHP_VERSION), + ] +]); +$requestFactory = new \OpenSearch\RequestFactory(); +$transport = new OpenSearch\Transport($guzzleClient, $requestFactory); + +$client = new OpenSearch\Client($transport, $requestFactory, $httpFactory); +$info = $client->info(); + +echo "{$info['version']['distribution']}: {$info['version']['number']}\n"; + +// Symfony example + +$symfonyClient = \Symfony\Component\HttpClient\HttpClient::create([ + 'base_uri' => 'https://localhost:9200', + 'auth_basic' => ['admin', getenv('OPENSEARCH_PASSWORD')], + 'verify_peer' => false, + 'max_retries' => 2, + 'headers' => [ + 'Accept' => 'application/json', + 'Content-Type' => 'application/json', + ], ]); +$client = new OpenSearch\Client($symfonyClient); $info = $client->info(); echo "{$info['version']['distribution']}: {$info['version']['number']}\n"; diff --git a/src/OpenSearch/Client.php b/src/OpenSearch/Client.php index e679a65e..01fba7c7 100644 --- a/src/OpenSearch/Client.php +++ b/src/OpenSearch/Client.php @@ -21,12 +21,11 @@ namespace OpenSearch; +use Http\Promise\Promise; use OpenSearch\Common\Exceptions\BadMethodCallException; -use OpenSearch\Common\Exceptions\NoNodesAvailableException; use OpenSearch\Endpoints\AbstractEndpoint; -use OpenSearch\Namespaces\NamespaceBuilderInterface; -use OpenSearch\Namespaces\BooleanRequestWrapper; use OpenSearch\Namespaces\AsyncSearchNamespace; +use OpenSearch\Namespaces\BooleanRequestWrapper; use OpenSearch\Namespaces\CatNamespace; use OpenSearch\Namespaces\ClusterNamespace; use OpenSearch\Namespaces\DanglingIndicesNamespace; @@ -36,6 +35,7 @@ use OpenSearch\Namespaces\KnnNamespace; use OpenSearch\Namespaces\MlNamespace; use OpenSearch\Namespaces\MonitoringNamespace; +use OpenSearch\Namespaces\NamespaceBuilderInterface; use OpenSearch\Namespaces\NodesNamespace; use OpenSearch\Namespaces\NotificationsNamespace; use OpenSearch\Namespaces\ObservabilityNamespace; @@ -43,14 +43,15 @@ use OpenSearch\Namespaces\QueryNamespace; use OpenSearch\Namespaces\RemoteStoreNamespace; use OpenSearch\Namespaces\RollupsNamespace; -use OpenSearch\Namespaces\SearchPipelineNamespace; use OpenSearch\Namespaces\SearchableSnapshotsNamespace; +use OpenSearch\Namespaces\SearchPipelineNamespace; use OpenSearch\Namespaces\SecurityNamespace; use OpenSearch\Namespaces\SnapshotNamespace; use OpenSearch\Namespaces\SqlNamespace; use OpenSearch\Namespaces\SslNamespace; use OpenSearch\Namespaces\TasksNamespace; use OpenSearch\Namespaces\TransformsNamespace; +use Psr\Http\Message\ResponseInterface; /** * Class Client @@ -66,6 +67,11 @@ class Client */ public $transport; + /** + * Whether the client is async mode. + */ + protected bool $isAsync = false; + /** * @var array */ @@ -1882,35 +1888,59 @@ public function extractArgument(array &$params, string $arg) } } + /** + * Check if the client is running in async mode. + */ + public function isAsync(): bool + { + return $this->isAsync; + } + + /** + * Set the client to run in async mode. + */ + public function setAsync(bool $isAsync): Client + { + $this->isAsync = $isAsync; + return $this; + } + /** * Sends a raw request to the cluster - * @return callable|array - * @throws NoNodesAvailableException + * @return \Http\Promise\Promise|\Psr\Http\Message\ResponseInterface + * @throws \Psr\Http\Client\ClientExceptionInterface + * @throws \Exception */ - public function request(string $method, string $uri, array $attributes = []) + public function request(string $method, string $uri, array $attributes = []): Promise|ResponseInterface { $params = $attributes['params'] ?? []; $body = $attributes['body'] ?? null; - $options = $attributes['options'] ?? []; - $promise = $this->transport->performRequest($method, $uri, $params, $body, $options); + $request = $this->transport->createRequest($method, $uri, $params, $body); - return $this->transport->resultOrFuture($promise, $options); + if ($this->isAsync) { + return $this->transport->sendAsyncRequest($request); + } + return $this->transport->sendRequest($request); } /** - * @return callable|array + * @return \Http\Promise\Promise|\Psr\Http\Message\ResponseInterface + * @throws \Psr\Http\Client\ClientExceptionInterface + * @throws \Exception */ - private function performRequest(AbstractEndpoint $endpoint) + private function performRequest(AbstractEndpoint $endpoint): Promise|ResponseInterface { - $promise = $this->transport->performRequest( + $request = $this->transport->createRequest( $endpoint->getMethod(), $endpoint->getURI(), $endpoint->getParams(), $endpoint->getBody(), - $endpoint->getOptions() ); - - return $this->transport->resultOrFuture($promise, $endpoint->getOptions()); + if ($this->isAsync) { + return $this->transport->sendAsyncRequest($request); + } + return $this->transport->sendRequest($request); } + } diff --git a/src/OpenSearch/Common/Exceptions/NoAsyncClientException.php b/src/OpenSearch/Common/Exceptions/NoAsyncClientException.php new file mode 100644 index 00000000..b95ce053 --- /dev/null +++ b/src/OpenSearch/Common/Exceptions/NoAsyncClientException.php @@ -0,0 +1,24 @@ +options['client']['headers'] ?? []; + } + /** * @param array $params Note: this is passed by-reference! */ diff --git a/src/OpenSearch/RequestFactory.php b/src/OpenSearch/RequestFactory.php new file mode 100644 index 00000000..c5a1561c --- /dev/null +++ b/src/OpenSearch/RequestFactory.php @@ -0,0 +1,141 @@ +getUriFactory()->createUri($uri); + $uri = $uri->withQuery(http_build_query($params)); + $request = $this->getRequestFactory()->createRequest($method, $uri); + if ($body !== null) { + $bodyJson = $this->getSerializer()->serialize($body); + $bodyStream = $this->getStreamFactory()->createStream($bodyJson); + $request = $request->withBody($bodyStream); + } + foreach ($headers as $name => $value) { + $request = $request->withHeader($name, $value); + } + return $request; + } + + /** + * Get the serializer to use for serializing request and response bodies. + */ + public function getSerializer(): ?SerializerInterface + { + if ($this->serializer) { + return $this->serializer; + } + return new SmartSerializer(); + } + + /** + * Set the serializer to use for serializing request and response bodies. + */ + public function setSerializer(?SerializerInterface $serializer): self + { + $this->serializer = $serializer; + return $this; + } + + /** + * Get the request factory to use for creating requests. + * + * If no request factory is set, the discovery mechanism will be used to find + * a request factory. + * + * @throws \Http\Discovery\Exception\NotFoundException + */ + private function getRequestFactory(): RequestFactoryInterface + { + if ($this->requestFactory) { + return $this->requestFactory; + } + + return $this->requestFactory = Psr17FactoryDiscovery::findRequestFactory(); + } + + /** + * Set the request factory to use for creating requests. + */ + public function setRequestFactory(RequestFactoryInterface $requestFactory): self + { + $this->requestFactory = $requestFactory; + return $this; + } + + /** + * Get the stream factory to use for creating streams. + * + * If no stream factory is set, the discovery mechanism will be used to find + * a stream factory. + * + * @throws \Http\Discovery\Exception\NotFoundException + */ + private function getStreamFactory(): StreamFactoryInterface + { + if ($this->streamFactory) { + return $this->streamFactory; + } + return $this->streamFactory = Psr17FactoryDiscovery::findStreamFactory(); + } + + /** + * Set the stream factory to use for creating streams. + */ + public function setStreamFactory(?StreamFactoryInterface $streamFactory): self + { + $this->streamFactory = $streamFactory; + return $this; + } + + /** + * Get the URI factory to use for creating URIs. + * + * If no URI factory is set, the discovery mechanism will be used to find + * a URI factory. + * + * @throws \Http\Discovery\Exception\NotFoundException + */ + private function getUriFactory(): UriFactoryInterface + { + if ($this->uriFactory) { + return $this->uriFactory; + } + return $this->uriFactory = Psr17FactoryDiscovery::findUriFactory(); + } + + /** + * Set the URI factory to use for creating URIs. + */ + public function setUriFactory(?UriFactoryInterface $uriFactory): self + { + $this->uriFactory = $uriFactory; + return $this; + } + +} diff --git a/src/OpenSearch/Transport.php b/src/OpenSearch/Transport.php index df6b5b98..7c1b03cb 100644 --- a/src/OpenSearch/Transport.php +++ b/src/OpenSearch/Transport.php @@ -21,158 +21,85 @@ namespace OpenSearch; -use OpenSearch\Common\Exceptions; -use OpenSearch\ConnectionPool\AbstractConnectionPool; -use OpenSearch\Connections\Connection; -use OpenSearch\Connections\ConnectionInterface; -use GuzzleHttp\Ring\Future\FutureArrayInterface; -use Psr\Log\LoggerInterface; - -class Transport +use Http\Client\HttpAsyncClient; +use Http\Discovery\HttpAsyncClientDiscovery; +use Http\Promise\Promise; +use OpenSearch\Common\Exceptions\NoAsyncClientException; +use Psr\Http\Client\ClientInterface; +use Psr\Http\Message\RequestInterface; +use Psr\Http\Message\ResponseInterface; + +final class Transport implements ClientInterface, HttpAsyncClient { - /** - * @var AbstractConnectionPool - */ - public $connectionPool; - - /** - * @var LoggerInterface - */ - private $log; + private ?HttpAsyncClient $asyncClient = null; /** - * @var int - */ - public $retryAttempts = 0; - - /** - * @var ConnectionInterface + * Transport class is responsible for dispatching requests to the + * underlying cluster connections */ - public $lastConnection; + public function __construct( + protected readonly ClientInterface $client, + protected readonly RequestFactory $requestFactory, + ) { + } /** - * @var int + * Create a new request. */ - public $retries; + public function createRequest(string $method, string $uri, array $params = [], mixed $body = null): RequestInterface + { + return $this->requestFactory->createRequest($method, $uri, $params, $body); + } /** - * Transport class is responsible for dispatching requests to the - * underlying cluster connections - * - * @param int $retries - * @param bool $sniffOnStart - * @param ConnectionPool\AbstractConnectionPool $connectionPool - * @param \Psr\Log\LoggerInterface $log Monolog logger object + * {@inheritdoc} */ - public function __construct(int $retries, AbstractConnectionPool $connectionPool, LoggerInterface $log, bool $sniffOnStart = false) + public function sendAsyncRequest(RequestInterface $request): Promise { - $this->log = $log; - $this->connectionPool = $connectionPool; - $this->retries = $retries; - - if ($sniffOnStart === true) { - $this->log->notice('Sniff on Start.'); - $this->connectionPool->scheduleCheck(); - } + $httpAsyncClient = $this->getAsyncClient(); + return $httpAsyncClient->sendAsyncRequest($request); } /** - * Returns a single connection from the connection pool - * Potentially performs a sniffing step before returning + * {@inheritdoc} */ - public function getConnection(): ConnectionInterface + public function sendRequest(RequestInterface $request): ResponseInterface { - return $this->connectionPool->nextConnection(); + return $this->client->sendRequest($request); } /** - * Perform a request to the Cluster - * - * @param string $method HTTP method to use - * @param string $uri HTTP URI to send request to - * @param array $params Optional query parameters - * @param mixed|null $body Optional query body - * @param array $options - * - * @throws Common\Exceptions\NoNodesAvailableException|\Exception + * Set the async client to use for async requests. */ - public function performRequest(string $method, string $uri, array $params = [], $body = null, array $options = []): FutureArrayInterface + public function setAsyncClient(HttpAsyncClient $asyncClient): self { - try { - $connection = $this->getConnection(); - } catch (Exceptions\NoNodesAvailableException $exception) { - $this->log->critical('No alive nodes found in cluster'); - throw $exception; - } - - $response = []; - $caughtException = null; - $this->lastConnection = $connection; - - $future = $connection->performRequest( - $method, - $uri, - $params, - $body, - $options, - $this - ); - - $future->promise()->then( - //onSuccess - function ($response) { - $this->retryAttempts = 0; - // Note, this could be a 4xx or 5xx error - }, - //onFailure - function ($response) { - $code = $response->getCode(); - // Ignore 400 level errors, as that means the server responded just fine - if ($code < 400 || $code >= 500) { - // Otherwise schedule a check - $this->connectionPool->scheduleCheck(); - } - } - ); - - return $future; + $this->asyncClient = $asyncClient; + return $this; } /** - * @param FutureArrayInterface $result Response of a request (promise) - * @param array $options Options for transport + * Get the async client to use for async requests. + * + * If no async client is set, the discovery mechanism will be used to find + * an async client. * - * @return callable|array + * @throws NoAsyncClientException */ - public function resultOrFuture(FutureArrayInterface $result, array $options = []) + private function getAsyncClient(): HttpAsyncClient { - $response = null; - $async = isset($options['client']['future']) ? $options['client']['future'] : null; - if (is_null($async) || $async === false) { - do { - $result = $result->wait(); - } while ($result instanceof FutureArrayInterface); + if ($this->asyncClient) { + return $this->asyncClient; } - return $result; - } - - public function shouldRetry(array $request): bool - { - if ($this->retryAttempts < $this->retries) { - $this->retryAttempts += 1; - return true; + if ($this->client instanceof HttpAsyncClient) { + return $this->asyncClient = $this->client; } - return false; + try { + return $this->asyncClient = HttpAsyncClientDiscovery::find(); + } catch (\Exception $e) { + throw new NoAsyncClientException('No async HTTP client found. Install a package providing "php-http/async-client-implementation"', 0, $e); + } } - /** - * Returns the last used connection so that it may be inspected. Mainly - * for debugging/testing purposes. - */ - public function getLastConnection(): ConnectionInterface - { - return $this->lastConnection; - } }