Skip to content

Commit

Permalink
Supported async gRPC client
Browse files Browse the repository at this point in the history
close #40
  • Loading branch information
JoshYuJump committed Aug 21, 2022
1 parent 633815c commit 3b4a682
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 6 deletions.
2 changes: 1 addition & 1 deletion cli/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = '2.1.8'
__version__ = '2.2.0'
4 changes: 2 additions & 2 deletions cli/biz.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,8 @@ def create_utils_file(utils_path: Path):
# noinspection PyTypeChecker
def add_service(repo_name: str, service: str, target_dir: str) -> None:
work_dir = Path.cwd()
typer.echo(f'> Current work directory is `{work_dir}`')
typer.echo(f'> Target client path is `{target_dir or "."}`')
typer.echo(f' > Current work directory is `{work_dir}`')
typer.echo(f' > Target client path is `{target_dir or "."}`')

source_repo_dir = prepare_source_repo(work_dir, repo_name)

Expand Down
7 changes: 5 additions & 2 deletions cli/main.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import os

import typer

from cli import biz
from cli import biz, __version__

app = typer.Typer()

Expand All @@ -11,6 +12,7 @@ def add(service: str, target_dir: str = ''):
"""add input_service_name's proto_file to current_dir/service_name"""
proto_repo = 'proto'

typer.echo(f'{os.linesep} 🏝 bali-cli v{__version__}')
typer.echo(f'{os.linesep} 🚀 Start adding {service} ...')
biz.add_service(proto_repo, service, target_dir=target_dir)
typer.echo(f'{os.linesep} ✅ Service added successfully !')
Expand All @@ -19,6 +21,7 @@ def add(service: str, target_dir: str = ''):
@app.command()
def build():
"""compile proto_file under cwd/services/rpc"""
typer.echo(f'{os.linesep} 🚀 Start build')
typer.echo(f'{os.linesep} 🏝 bali-cli v{__version__}')
typer.echo(f'{os.linesep} 🚀 Start build ...')
biz.build_service()
typer.echo(f'{os.linesep} ✅ Service build successfully !')
10 changes: 10 additions & 0 deletions cli/templates/client.jinja2
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,13 @@ class {{ service_cls }}(ClientMixin):
cache_options = {'timeout': cache_timeout, 'refresh': refresh}
return self.request(service, rpc_name, request_protobuf, schema, response_schema_cls, fail_silently, cache_options)
{% endfor %}

class Async{{ service_cls }}({{ service_cls }}):
{% for method in methods %}
async def {{ method[0] | decamelize }}(self{% if method[1] == "google.protobuf.Empty" %}{% else %}, schema: schemas.{{ method[1] }}{% endif %}, *, fail_silently=False, cache_timeout=0, refresh=False) -> schemas.{{ method[2] }}:
service, rpc_name, request_protobuf = "{{ service }}", "{{ method[0] }}", "{{ method[1] }}"
response_schema_cls = schemas.{{ method[2] }}
schema = {% if method[1] == "google.protobuf.Empty" %}None{% else %}schema{% endif %}
cache_options = {'timeout': cache_timeout, 'refresh': refresh}
return await self.aio_request(service, rpc_name, request_protobuf, schema, response_schema_cls, fail_silently, cache_options)
{% endfor %}
46 changes: 45 additions & 1 deletion cli/templates/utils.jinja2
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
# Generated by the bali-cli. DO NOT EDIT!
# Please updated the version when utils changed.

# bali-cli utils `v20220513`
# bali-cli utils `v20220821`


import hashlib
from datetime import datetime, date
from urllib.parse import quote

import grpc
from grpc import insecure_channel
from google.protobuf import json_format
from google.protobuf.empty_pb2 import Empty
Expand Down Expand Up @@ -107,6 +109,48 @@ class ClientMixin:
self.set_cache(cache_key, result, cache_timeout)
return result

async def aio_request(self, service, rpc_name, request_protobuf, schema, response_schema_cls,
fail_silently, cache_options):
"""asyncio rpc request entry

:param service: Service name like `UserService`
:param rpc_name: Service name like `GetUser`
:param request_protobuf:
:param schema: RPC request schema objects, it's None when message type is Empty
:param response_schema_cls:
:param fail_silently: failed silently
:param cache_options: cache options include `timeout` and `refresh` (NOT SUPPORT NOW)
:return:
"""
stub_cls = getattr(self.pb2_grpc, f'{service}Stub')
request_data = ParseDict(schema.dict(),
getattr(self.pb2, request_protobuf)()) if schema else Empty()

channel_target = self._get_channel_target()
with grpc.aio.insecure_channel(channel_target, options=CHANNEL_OPTIONS) as channel:
stub = stub_cls(channel)
try:
self.logger.info("gRPC aio client of %s.%s send: %s. <%s>", service, rpc_name,
request_data, type(request_data))
response = await getattr(stub, rpc_name)(request_data)
except Exception as ex:
self.logger.error("gRPC aio client of %s.%s exception: %s", service, rpc_name, ex)
if not fail_silently:
raise
self.logger.warning(repr(ex))
reply = {}
else:
reply = MessageToDict(
response,
including_default_value_fields=True,
preserving_proto_field_name=True,
)

self.logger.info("gRPC aio client of %s.%s received: %s", service, rpc_name, reply)

result = response_schema_cls(**reply)
return result


class ProtobufParser(json_format._Parser): # noqa
def _ConvertValueMessage(self, value, message):
Expand Down

0 comments on commit 3b4a682

Please sign in to comment.