From 3b4a682c99458388c5d0cd0e15f019708cbbcf17 Mon Sep 17 00:00:00 2001 From: Josh Date: Mon, 22 Aug 2022 00:06:54 +0800 Subject: [PATCH] Supported async gRPC client close #40 --- cli/__init__.py | 2 +- cli/biz.py | 4 ++-- cli/main.py | 7 ++++-- cli/templates/client.jinja2 | 10 ++++++++ cli/templates/utils.jinja2 | 46 ++++++++++++++++++++++++++++++++++++- 5 files changed, 63 insertions(+), 6 deletions(-) diff --git a/cli/__init__.py b/cli/__init__.py index 7042a39..04188a1 100644 --- a/cli/__init__.py +++ b/cli/__init__.py @@ -1 +1 @@ -__version__ = '2.1.8' +__version__ = '2.2.0' diff --git a/cli/biz.py b/cli/biz.py index 0a083d9..9e6d549 100644 --- a/cli/biz.py +++ b/cli/biz.py @@ -124,8 +124,8 @@ def create_utils_file(utils_path: Path): # noinspection PyTypeChecker def add_service(repo_name: str, service: str, target_dir: str) -> None: work_dir = Path.cwd() - typer.echo(f'> Current work directory is `{work_dir}`') - typer.echo(f'> Target client path is `{target_dir or "."}`') + typer.echo(f' > Current work directory is `{work_dir}`') + typer.echo(f' > Target client path is `{target_dir or "."}`') source_repo_dir = prepare_source_repo(work_dir, repo_name) diff --git a/cli/main.py b/cli/main.py index 0fb801f..ca9d677 100644 --- a/cli/main.py +++ b/cli/main.py @@ -1,7 +1,8 @@ import os + import typer -from cli import biz +from cli import biz, __version__ app = typer.Typer() @@ -11,6 +12,7 @@ def add(service: str, target_dir: str = ''): """add input_service_name's proto_file to current_dir/service_name""" proto_repo = 'proto' + typer.echo(f'{os.linesep} 🏝 bali-cli v{__version__}') typer.echo(f'{os.linesep} 🚀 Start adding {service} ...') biz.add_service(proto_repo, service, target_dir=target_dir) typer.echo(f'{os.linesep} ✅ Service added successfully !') @@ -19,6 +21,7 @@ def add(service: str, target_dir: str = ''): @app.command() def build(): """compile proto_file under cwd/services/rpc""" - typer.echo(f'{os.linesep} 🚀 Start build') + typer.echo(f'{os.linesep} 🏝 bali-cli v{__version__}') + typer.echo(f'{os.linesep} 🚀 Start build ...') biz.build_service() typer.echo(f'{os.linesep} ✅ Service build successfully !') diff --git a/cli/templates/client.jinja2 b/cli/templates/client.jinja2 index b85b840..2fb1144 100644 --- a/cli/templates/client.jinja2 +++ b/cli/templates/client.jinja2 @@ -33,3 +33,13 @@ class {{ service_cls }}(ClientMixin): cache_options = {'timeout': cache_timeout, 'refresh': refresh} return self.request(service, rpc_name, request_protobuf, schema, response_schema_cls, fail_silently, cache_options) {% endfor %} + +class Async{{ service_cls }}({{ service_cls }}): +{% for method in methods %} + async def {{ method[0] | decamelize }}(self{% if method[1] == "google.protobuf.Empty" %}{% else %}, schema: schemas.{{ method[1] }}{% endif %}, *, fail_silently=False, cache_timeout=0, refresh=False) -> schemas.{{ method[2] }}: + service, rpc_name, request_protobuf = "{{ service }}", "{{ method[0] }}", "{{ method[1] }}" + response_schema_cls = schemas.{{ method[2] }} + schema = {% if method[1] == "google.protobuf.Empty" %}None{% else %}schema{% endif %} + cache_options = {'timeout': cache_timeout, 'refresh': refresh} + return await self.aio_request(service, rpc_name, request_protobuf, schema, response_schema_cls, fail_silently, cache_options) +{% endfor %} diff --git a/cli/templates/utils.jinja2 b/cli/templates/utils.jinja2 index b0d5d79..26fa6df 100644 --- a/cli/templates/utils.jinja2 +++ b/cli/templates/utils.jinja2 @@ -1,12 +1,14 @@ # Generated by the bali-cli. DO NOT EDIT! # Please updated the version when utils changed. -# bali-cli utils `v20220513` +# bali-cli utils `v20220821` + import hashlib from datetime import datetime, date from urllib.parse import quote +import grpc from grpc import insecure_channel from google.protobuf import json_format from google.protobuf.empty_pb2 import Empty @@ -107,6 +109,48 @@ class ClientMixin: self.set_cache(cache_key, result, cache_timeout) return result + async def aio_request(self, service, rpc_name, request_protobuf, schema, response_schema_cls, + fail_silently, cache_options): + """asyncio rpc request entry + + :param service: Service name like `UserService` + :param rpc_name: Service name like `GetUser` + :param request_protobuf: + :param schema: RPC request schema objects, it's None when message type is Empty + :param response_schema_cls: + :param fail_silently: failed silently + :param cache_options: cache options include `timeout` and `refresh` (NOT SUPPORT NOW) + :return: + """ + stub_cls = getattr(self.pb2_grpc, f'{service}Stub') + request_data = ParseDict(schema.dict(), + getattr(self.pb2, request_protobuf)()) if schema else Empty() + + channel_target = self._get_channel_target() + with grpc.aio.insecure_channel(channel_target, options=CHANNEL_OPTIONS) as channel: + stub = stub_cls(channel) + try: + self.logger.info("gRPC aio client of %s.%s send: %s. <%s>", service, rpc_name, + request_data, type(request_data)) + response = await getattr(stub, rpc_name)(request_data) + except Exception as ex: + self.logger.error("gRPC aio client of %s.%s exception: %s", service, rpc_name, ex) + if not fail_silently: + raise + self.logger.warning(repr(ex)) + reply = {} + else: + reply = MessageToDict( + response, + including_default_value_fields=True, + preserving_proto_field_name=True, + ) + + self.logger.info("gRPC aio client of %s.%s received: %s", service, rpc_name, reply) + + result = response_schema_cls(**reply) + return result + class ProtobufParser(json_format._Parser): # noqa def _ConvertValueMessage(self, value, message):