From 999ce7781ab0213636de79c727d1b92eb06560e5 Mon Sep 17 00:00:00 2001 From: holmes1412 Date: Fri, 19 Mar 2021 19:39:15 +0800 Subject: [PATCH 1/2] add trpc protocol --- CMakeLists_Headers.txt | 1 + src/generator/generator.cc | 4 +- src/generator/printer.h | 46 +- src/message/CMakeLists.txt | 3 +- src/message/rpc_message_trpc.cc | 662 +++++++++++++++++++++++++ src/message/rpc_message_trpc.h | 259 ++++++++++ src/message/rpc_meta_trpc.proto | 109 ++++ src/rpc_define.h | 4 + src/rpc_types.h | 15 + tutorial/CMakeLists.txt | 2 + tutorial/tutorial-11-trpc_pb_server.cc | 67 +++ tutorial/tutorial-12-trpc_pb_client.cc | 56 +++ 12 files changed, 1220 insertions(+), 8 deletions(-) create mode 100644 src/message/rpc_message_trpc.cc create mode 100644 src/message/rpc_message_trpc.h create mode 100644 src/message/rpc_meta_trpc.proto create mode 100644 tutorial/tutorial-11-trpc_pb_server.cc create mode 100644 tutorial/tutorial-12-trpc_pb_client.cc diff --git a/CMakeLists_Headers.txt b/CMakeLists_Headers.txt index 1221a18d..32d765bb 100644 --- a/CMakeLists_Headers.txt +++ b/CMakeLists_Headers.txt @@ -7,6 +7,7 @@ set(INCLUDE_HEADERS src/message/rpc_message_srpc.h src/message/rpc_message_thrift.h src/message/rpc_message_brpc.h + src/message/rpc_message_trpc.h src/thrift/rpc_thrift_buffer.h src/thrift/rpc_thrift_enum.h src/thrift/rpc_thrift_idl.h diff --git a/src/generator/generator.cc b/src/generator/generator.cc index faeeec31..ed883e3e 100644 --- a/src/generator/generator.cc +++ b/src/generator/generator.cc @@ -282,6 +282,7 @@ void Generator::generate_srpc_file(const idl_info& cur_info) rpc_list.push_back("SRPC"); rpc_list.push_back("SRPCHttp"); rpc_list.push_back("BRPC"); + rpc_list.push_back("TRPC"); } for (const auto& desc : cur_info.desc_list) @@ -316,7 +317,8 @@ void Generator::generate_srpc_file(const idl_info& cur_info) for (const auto& type : rpc_list) { - this->printer.print_client_constructor(type, desc.block_name); + this->printer.print_client_constructor(type, desc.block_name, + cur_info.package_name); this->printer.print_client_methods(type, desc.block_name, desc.rpcs); this->printer.print_client_create_task(type, desc.block_name, desc.rpcs); } diff --git a/src/generator/printer.h b/src/generator/printer.h index 3d1de6b2..71f60044 100644 --- a/src/generator/printer.h +++ b/src/generator/printer.h @@ -73,6 +73,26 @@ static inline std::string make_package_prefix(const std::vector& pa return "::" + package_str + "::" + param; } +static inline std::string make_trpc_service_prefix(const std::vector& package, + const std::string& service) +{ + if (package.size() == 0) + return service; + + std::string package_prefix = package[0] + "."; + + for (size_t i = 1; i < package.size(); i++) + package_prefix = package_prefix + package[i] + "."; + + return package_prefix + service; +} + +static inline std::string make_trpc_method_prefix(const std::string& service, + const std::string& method) +{ + return "/package." + service + "/" + method; +} + static inline bool is_simple_type(int8_t data_type) { return data_type == srpc::TDT_BOOL @@ -624,19 +644,25 @@ class Printer fprintf(this->out_file, "}"); } - void print_client_constructor(const std::string& type, const std::string& service) + void print_client_constructor(const std::string& type, const std::string& service, + const std::vector& package) { bool is_srpc_thrift = (this->is_thrift && (type == "SRPC" || type == "SRPCHttp")); const char *method_ip = is_srpc_thrift ? client_constructor_methods_ip_srpc_thrift_format.c_str() : ""; const char *method_params = is_srpc_thrift ? client_constructor_methods_params_srpc_thrift_format.c_str() : ""; + std::string full_service = service; + + if (type == "TRPC") + full_service = make_trpc_service_prefix(package, service); + fprintf(this->out_file, this->client_constructor_methods_format.c_str(), type.c_str(), type.c_str(), - type.c_str(), service.c_str(), + type.c_str(), full_service.c_str(), method_ip, type.c_str(), type.c_str(), type.c_str(), - type.c_str(), service.c_str(), + type.c_str(), full_service.c_str(), method_params, type.c_str()); } @@ -649,17 +675,21 @@ class Printer std::string req = change_include_prefix(rpc.request_name); std::string resp = change_include_prefix(rpc.response_name); + std::string full_method = rpc.method_name; + if (type == "TRPC") + full_method = make_trpc_method_prefix(service, rpc.method_name); + fprintf(this->out_file, this->client_method_format.c_str(), type.c_str(), rpc.method_name.c_str(), req.c_str(), rpc.method_name.c_str(), - rpc.method_name.c_str(), + full_method.c_str(), type.c_str(), rpc.method_name.c_str(), req.c_str(), resp.c_str(), rpc.method_name.c_str(), resp.c_str(), type.c_str(), rpc.method_name.c_str(), req.c_str(), - resp.c_str(), resp.c_str(), rpc.method_name.c_str(), resp.c_str()); + resp.c_str(), resp.c_str(), full_method.c_str(), resp.c_str()); } if (this->is_thrift) @@ -733,9 +763,13 @@ class Printer { for (const auto& rpc : rpcs) { + std::string full_method = rpc.method_name; + if (type == "TRPC") + full_method = make_trpc_method_prefix(service, rpc.method_name); + fprintf(this->out_file, this->client_create_task_format.c_str(), type.c_str(), type.c_str(), rpc.method_name.c_str(), - rpc.method_name.c_str(), rpc.method_name.c_str()); + rpc.method_name.c_str(), full_method.c_str()); /* type.c_str(), service.c_str(), type.c_str(), rpc.method_name.c_str(), diff --git a/src/message/CMakeLists.txt b/src/message/CMakeLists.txt index cae4e595..ea4cd9a1 100644 --- a/src/message/CMakeLists.txt +++ b/src/message/CMakeLists.txt @@ -3,13 +3,14 @@ project(message) include_directories(${CMAKE_CURRENT_BINARY_DIR}) -set(PROTO_LIST rpc_meta.proto rpc_meta_brpc.proto rpc_span.proto) +set(PROTO_LIST rpc_meta.proto rpc_meta_brpc.proto rpc_span.proto rpc_meta_trpc.proto) protobuf_generate_cpp(PROTO_SRCS PROTO_HDRS ${PROTO_LIST}) set(SRC rpc_message_brpc.cc rpc_message_srpc.cc rpc_message_thrift.cc + rpc_message_trpc.cc ${PROTO_SRCS} ${PROTO_HDRS} ) diff --git a/src/message/rpc_message_trpc.cc b/src/message/rpc_message_trpc.cc new file mode 100644 index 00000000..b911cc16 --- /dev/null +++ b/src/message/rpc_message_trpc.cc @@ -0,0 +1,662 @@ +/* + Copyright (c) 2021 Sogou, Inc. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +#include "rpc_message_trpc.h" +#include "rpc_meta_trpc.pb.h" +#include "rpc_basic.h" +#include "rpc_compress.h" +#include "rpc_zero_copy_stream.h" + +namespace srpc +{ + +TRPCMessage::TRPCMessage() +{ + this->nreceived = 0; + this->meta_buf = NULL; + this->meta_len = 0; + this->message_len = 0; + memset(this->header, 0, sizeof (this->header)); + this->message = new RPCBuffer(); +} + +inline TRPCMessage::~TRPCMessage() +{ + delete this->message; + delete []this->meta_buf; + delete this->meta; +} + +TRPCRequest::TRPCRequest() +{ + this->meta = new RequestProtocol(); +} + +TRPCResponse::TRPCResponse() +{ + this->meta = new ResponseProtocol(); +} + +int TRPCMessage::encode(struct iovec vectors[], int max, size_t size_limit) +{ + size_t sz = TRPC_HEADER_SIZE + this->meta_len + this->message_len; + + if (sz > 0x7FFFFFFF) + { + errno = EOVERFLOW; + return -1; + } + + int ret; + char *p = this->header; + + *(uint16_t *)p = ntohs((uint16_t)TrpcMagic::TRPC_MAGIC_VALUE); + p += 2; + + p += 1; // TrpcDataFrameType + p += 1; // TrpcDataFrameState + + *(uint32_t *)(p) = htonl((uint32_t)sz); + p += 4; + + *(uint16_t *)(p) = htons((uint16_t)this->meta_len); + // 2: stream_id + 4 : reserved + + vectors[0].iov_base = this->header; + vectors[0].iov_len = TRPC_HEADER_SIZE; + + vectors[1].iov_base = this->meta_buf; + vectors[1].iov_len = this->meta_len; + + ret = this->message->encode(vectors + 2, max - 2); + if (ret < 0) + return ret; + + return 2 + ret; +} + +int TRPCMessage::append(const void *buf, size_t *size, size_t size_limit) +{ + uint32_t *p; + uint16_t *sp; + size_t header_left, body_received, buf_len; + if (this->nreceived < TRPC_HEADER_SIZE) + { + //receive header + header_left = TRPC_HEADER_SIZE - this->nreceived; + if (*size >= header_left) + { + //receive the whole header and ready to recieve body + memcpy(this->header + this->nreceived, buf, header_left); + this->nreceived += header_left; + + sp = (uint16_t *)this->header; + uint16_t magic_value = ntohs(*sp); + if (magic_value != TrpcMagic::TRPC_MAGIC_VALUE || + this->header[2] || this->header[3]) + { + errno = EBADMSG; + return -1; + } + + p = (uint32_t *)this->header + 1; + buf_len = ntohl(*p); + sp = (uint16_t *)this->header + 4; + this->meta_len = ntohs(*sp); + + this->message_len = buf_len - TRPC_HEADER_SIZE - this->meta_len; + buf_len -= TRPC_HEADER_SIZE; + + if (buf_len >= size_limit) + { + errno = EMSGSIZE; + return -1; + } + else if (buf_len > 0) + { + if (*size - header_left > buf_len) + *size = header_left + buf_len; + + this->meta_buf = new char[this->meta_len]; + + if (*size - header_left <= this->meta_len) + { + memcpy(this->meta_buf, (const char *)buf + header_left, + *size - header_left); + } else { + memcpy(this->meta_buf, (const char *)buf + header_left, + this->meta_len); + + this->message->append((const char *)buf + header_left + this->meta_len, + *size - header_left - this->meta_len, + BUFFER_MODE_COPY); + } + + this->nreceived += *size - header_left; + if (this->nreceived == TRPC_HEADER_SIZE + buf_len) + return 1; + else + return 0; + } + else if (*size == header_left) + { + return 1; // means body_size == 0 and finish recieved header + } + else + { + // means buf_len < 0 + errno = EBADMSG; + return -1; + } + } + else + { + // only receive header + memcpy(this->header + this->nreceived, buf, *size); + this->nreceived += *size; + return 0; + } + } + else + { + // have already received the header and now is for body only + body_received = this->nreceived - TRPC_HEADER_SIZE; + buf_len = this->meta_len + this->message_len; + if (body_received + *size > buf_len) + *size = buf_len - body_received; + + if (body_received + *size <= this->meta_len) + { + memcpy(this->meta_buf + body_received, buf, *size); + } else if (body_received < this->meta_len) { + memcpy(this->meta_buf + body_received, buf, + this->meta_len - body_received); + + if (body_received + *size > this->meta_len)// useless. always true + this->message->append((const char *)buf + this->meta_len - body_received, + *size - this->meta_len + body_received, + BUFFER_MODE_COPY); + } else { + this->message->append((const char *)buf, *size, BUFFER_MODE_COPY); + } + + this->nreceived += *size; + return this->nreceived == TRPC_HEADER_SIZE + buf_len; + } +} + +bool TRPCRequest::deserialize_meta() +{ + RequestProtocol *meta = static_cast(this->meta); + + if (!meta->ParseFromArray(this->meta_buf, (int)this->meta_len)) + return false; + + if (meta->version() != TrpcProtoVersion::TRPC_PROTO_V1 || + meta->call_type() != TrpcCallType::TRPC_UNARY_CALL || + meta->content_type() != TrpcContentEncodeType::TRPC_PROTO_ENCODE) + { + // this->trpc_error = ST_ERR_UNSUPPORTED_PROTO_TYPE; + return false; + } + +// this->timeout = meta->timeout(); + return true; +} + +bool TRPCResponse::deserialize_meta() +{ + ResponseProtocol *meta = static_cast(this->meta); + + if (!meta->ParseFromArray(this->meta_buf, (int)this->meta_len)) + return false; + + this->srpc_status_code = RPCStatusOK; + + if (meta->ret() != TrpcRetCode::TRPC_INVOKE_SUCCESS) + { + this->srpc_status_code = meta->ret(); + this->srpc_error_msg = meta->error_msg(); + } + + return true; +} + +bool TRPCRequest::serialize_meta() +{ + RequestProtocol *meta = static_cast(this->meta); + meta->set_content_type(TrpcContentEncodeType::TRPC_PROTO_ENCODE); + meta->set_version(TrpcProtoVersion::TRPC_PROTO_V1); + meta->set_call_type(TrpcCallType::TRPC_UNARY_CALL); + + this->meta_len = meta->ByteSize(); + this->meta_buf = new char[this->meta_len]; + return this->meta->SerializeToArray(this->meta_buf, (int)this->meta_len); +} + +bool TRPCResponse::serialize_meta() +{ + ResponseProtocol *meta = static_cast(this->meta); + meta->set_version(TrpcProtoVersion::TRPC_PROTO_V1); + meta->set_call_type(TrpcCallType::TRPC_UNARY_CALL); + + this->meta_len = meta->ByteSize(); + this->meta_buf = new char[this->meta_len]; + if (this->srpc_status_code != RPCStatusOK) + { + meta->set_ret(this->status_code_srpc_trpc(this->srpc_status_code)); //TODO + meta->set_error_msg(this->error_msg_srpc_trpc(this->srpc_status_code)); + // this->srpc_error_msg + } + + return meta->SerializeToArray(this->meta_buf, (int)this->meta_len); +} + +inline int TRPCMessage::compress_type_trpc_srpc(int trpc_content_encoding) const +{ + switch (trpc_content_encoding) + { + case TrpcCompressType::TRPC_DEFAULT_COMPRESS : + return RPCCompressNone; + case TrpcCompressType::TRPC_GZIP_COMPRESS : + return RPCCompressGzip; + case TrpcCompressType::TRPC_SNAPPY_COMPRESS : + return RPCCompressSnappy; + default : + return -1; + } +} + +inline int TRPCMessage::compress_type_srpc_trpc(int srpc_compress_type) const +{ + switch (srpc_compress_type) + { + case RPCCompressNone : + return TrpcCompressType::TRPC_DEFAULT_COMPRESS; + case RPCCompressGzip : + return TrpcCompressType::TRPC_GZIP_COMPRESS; + case RPCCompressSnappy : + return TrpcCompressType::TRPC_SNAPPY_COMPRESS; + case RPCCompressZlib : + return TrpcCompressType::TRPC_ZLIB_COMPRESS; + case RPCCompressLz4 : + return TrpcCompressType::TRPC_LZ4_COMPRESS; + default : + return -1; + } +} + +inline int TRPCMessage::data_type_trpc_srpc(int trpc_content_type) const +{ + switch (trpc_content_type) + { + case TrpcContentEncodeType::TRPC_PROTO_ENCODE : + return RPCDataProtobuf; + case TrpcContentEncodeType::TRPC_JSON_ENCODE : + return RPCDataJson; + default : + return -1; + } +} + +inline int TRPCMessage::data_type_srpc_trpc(int srpc_data_type) const +{ + switch (srpc_data_type) + { + case RPCDataProtobuf : + return TrpcContentEncodeType::TRPC_PROTO_ENCODE; + case RPCDataJson : + return TrpcContentEncodeType::TRPC_JSON_ENCODE; + default : + return -1; + } +} + +int TRPCMessage::status_code_srpc_trpc(int srpc_status_code) const +{ + return 0;//TODO +} + +const char *TRPCMessage::error_msg_srpc_trpc(int srpc_status_code) const +{ + return "";//TODO +} + +int TRPCRequest::get_compress_type() const +{ + RequestProtocol *meta = static_cast(this->meta); + + return this->compress_type_trpc_srpc(meta->content_encoding()); +} + +void TRPCRequest::set_compress_type(int type) +{ + RequestProtocol *meta = static_cast(this->meta); + + meta->set_content_encoding(this->compress_type_srpc_trpc(type)); +} + +int TRPCRequest::get_data_type() const +{ + RequestProtocol *meta = static_cast(this->meta); + + return this->data_type_trpc_srpc(meta->content_type()); +} + +void TRPCRequest::set_data_type(int type) +{ + RequestProtocol *meta = static_cast(this->meta); + + meta->set_content_type(this->data_type_srpc_trpc(type)); +} + +void TRPCRequest::set_request_id(int32_t req_id) +{ + RequestProtocol *meta = static_cast(this->meta); + + meta->set_request_id(req_id); +} + +int32_t TRPCRequest::get_request_id() const +{ + RequestProtocol *meta = static_cast(this->meta); + + return meta->request_id(); +} + +const std::string& TRPCRequest::get_service_name() const +{ + RequestProtocol *meta = static_cast(this->meta); + + return meta->callee(); +} + +const std::string& TRPCRequest::get_method_name() const +{ + RequestProtocol *meta = static_cast(this->meta); + + return meta->func(); +} + +void TRPCRequest::set_service_name(const std::string& service_name) +{ + RequestProtocol *meta = static_cast(this->meta); + + meta->set_callee(service_name); +} + +void TRPCRequest::set_method_name(const std::string& method_name) +{ + RequestProtocol *meta = static_cast(this->meta); + + meta->set_func(method_name); +} + +int TRPCResponse::get_compress_type() const +{ + ResponseProtocol *meta = static_cast(this->meta); + + return this->compress_type_trpc_srpc(meta->content_encoding()); +} + +int TRPCResponse::get_data_type() const +{ + ResponseProtocol *meta = static_cast(this->meta); + + return this->data_type_trpc_srpc(meta->content_type()); +} + +void TRPCResponse::set_compress_type(int type) +{ + ResponseProtocol *meta = static_cast(this->meta); + + meta->set_content_encoding(this->compress_type_srpc_trpc(type)); +} + +void TRPCResponse::set_data_type(int type) +{ + ResponseProtocol *meta = static_cast(this->meta); + + meta->set_content_type(this->data_type_srpc_trpc(type)); +} + +void TRPCResponse::set_request_id(int32_t req_id) +{ + ResponseProtocol *meta = static_cast(this->meta); + + meta->set_request_id(req_id); +} + +int32_t TRPCResponse::get_request_id() const +{ + ResponseProtocol *meta = static_cast(this->meta); + + return meta->request_id(); +} + +int TRPCResponse::get_status_code() const +{ + return this->srpc_status_code; +} + +void TRPCResponse::set_status_code(int code) +{ + this->srpc_status_code = code; + if (code != RPCStatusOK) + this->srpc_error_msg = this->get_errmsg(); +} + +int TRPCResponse::get_error() const +{ + ResponseProtocol *meta = static_cast(this->meta); + + return meta->ret();//TODO +} + +void TRPCResponse::set_error(int error) +{ + ResponseProtocol *meta = static_cast(this->meta); + + meta->set_ret(error); +} + +const char *TRPCResponse::get_errmsg() const +{ + ResponseProtocol *meta = static_cast(this->meta); + + return meta->error_msg().c_str(); +} + +int TRPCMessage::serialize(const ProtobufIDLMessage *pb_msg) +{ + if (!pb_msg) //TODO: make sure trpc is OK to send a NULL user pb_msg + return RPCStatusOK; + + ResponseProtocol *meta = dynamic_cast(this->meta); + bool is_resp = (meta != NULL); + + int msg_len = pb_msg->ByteSize(); + RPCOutputStream stream(this->message, pb_msg->ByteSize()); + int ret = pb_msg->SerializeToZeroCopyStream(&stream) ? 0 : -1; + + if (ret < 0) + return is_resp ? RPCStatusRespSerializeError : RPCStatusReqSerializeError; + + this->message_len = msg_len; + return RPCStatusOK; +} + +int TRPCMessage::deserialize(ProtobufIDLMessage *pb_msg) +{ + ResponseProtocol *meta = dynamic_cast(this->meta); + bool is_resp = (meta != NULL); + + RPCInputStream stream(this->message); + + if (pb_msg->ParseFromZeroCopyStream(&stream) == false) + return is_resp ? RPCStatusRespDeserializeError : RPCStatusReqDeserializeError; + + return RPCStatusOK; +} + +int TRPCMessage::compress() +{ + ResponseProtocol *meta = dynamic_cast(this->meta); + bool is_resp = (meta != NULL); + int type = this->get_compress_type(); + size_t buflen = this->message_len; + int status_code = RPCStatusOK; + + if (buflen == 0) + return status_code; + + if (type == RPCCompressNone) + return status_code; + + static RPCCompressor *compressor = RPCCompressor::get_instance(); + int ret = compressor->lease_compressed_size(type, buflen); + + if (ret == -2) + return is_resp ? RPCStatusReqCompressNotSupported : RPCStatusRespCompressNotSupported; + else if (ret <= 0) + return is_resp ? RPCStatusRespCompressSizeInvalid : RPCStatusReqCompressSizeInvalid; + + //buflen = ret; + RPCBuffer *dst_buf = new RPCBuffer(); + ret = compressor->serialize_to_compressed(this->message, dst_buf, type); + + if (ret == -2) + status_code = is_resp ? RPCStatusRespCompressNotSupported : RPCStatusReqCompressNotSupported; + else if (ret == -1) + status_code = is_resp ? RPCStatusRespCompressError : RPCStatusReqCompressError; + else if (ret <= 0) + status_code = is_resp ? RPCStatusRespCompressSizeInvalid : RPCStatusReqCompressSizeInvalid; + else + buflen = ret; + + if (status_code == RPCStatusOK) + { + delete this->message; + this->message = dst_buf; + this->message_len = buflen; + } else { + delete dst_buf; + } + + return status_code; +} + +int TRPCMessage::decompress() +{ + ResponseProtocol *meta = dynamic_cast(this->meta); + bool is_resp = (meta != NULL); + int type = this->get_compress_type(); + int status_code = RPCStatusOK; + + if (this->message_len == 0 || type == RPCCompressNone) + return status_code; + + RPCBuffer *dst_buf = new RPCBuffer(); + static RPCCompressor *compressor = RPCCompressor::get_instance(); + int ret = compressor->parse_from_compressed(this->message, dst_buf, type); + + if (ret == -2) + status_code = is_resp ? RPCStatusRespDecompressNotSupported : RPCStatusReqDecompressNotSupported; + else if (ret == -1) + status_code = is_resp ? RPCStatusRespDecompressError : RPCStatusReqDecompressError; + else if (ret <= 0) + status_code = is_resp ? RPCStatusRespDecompressSizeInvalid : RPCStatusReqDecompressSizeInvalid; + + if (status_code == RPCStatusOK) + { + delete this->message; + this->message = dst_buf; + this->message_len = ret; + } else { + delete dst_buf; + } + + return status_code; +} + +bool TRPCRequest::set_meta_module_data(const RPCModuleData& data) +{ + RequestProtocol *meta = static_cast(this->meta); + + for (auto & pair : data) + meta->mutable_trans_info()->insert({pair.first, pair.second}); + + return true; +} + +bool TRPCRequest::get_meta_module_data(RPCModuleData& data) const +{ + RequestProtocol *meta = static_cast(this->meta); + + for (auto & pair : meta->trans_info()) + data.insert(pair); + + return true; +} + +bool TRPCRequest::trim_service_prefix() +{ + RequestProtocol *meta = static_cast(this->meta); + std::string *service = meta->mutable_callee(); + + auto pos = service->find_last_of('.'); + if (pos == std::string::npos) + return false; + + meta->set_callee(service->substr(pos + 1 , service->length())); + return true; +} + +bool TRPCRequest::trim_method_prefix() +{ + RequestProtocol *meta = static_cast(this->meta); + std::string *method = meta->mutable_func(); + + auto pos = method->find_last_of('/'); + if (pos == std::string::npos) + return false; + + meta->set_func(method->substr(pos + 1, method->length())); + return true; +} + +bool TRPCResponse::set_meta_module_data(const RPCModuleData& data) +{ + ResponseProtocol *meta = static_cast(this->meta); + + for (auto & pair : data) + meta->mutable_trans_info()->insert({pair.first, pair.second}); + + return true; +} + +bool TRPCResponse::get_meta_module_data(RPCModuleData& data) const +{ + ResponseProtocol *meta = static_cast(this->meta); + + for (auto & pair : meta->trans_info()) + data.insert(pair); + + return true; +} + +} // namespace srpc + diff --git a/src/message/rpc_message_trpc.h b/src/message/rpc_message_trpc.h new file mode 100644 index 00000000..a30ea0d9 --- /dev/null +++ b/src/message/rpc_message_trpc.h @@ -0,0 +1,259 @@ +/* + Copyright (c) 2021 Sogou, Inc. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +#ifndef __RPC_MESSAGE_TRPC_H__ +#define __RPC_MESSAGE_TRPC_H__ + +#ifdef _WIN32 +#include +#else +#include +#endif + +#include "rpc_message.h" +#include "rpc_basic.h" + +namespace srpc +{ + +static constexpr int TRPC_HEADER_SIZE = 16; + +class TRPCMessage : public RPCMessage +{ +public: + TRPCMessage(); + virtual ~TRPCMessage(); + + int encode(struct iovec vectors[], int max, size_t size_limit); + int append(const void *buf, size_t *size, size_t size_limit); + + bool get_attachment_nocopy(const char **attachment, size_t *len) const { return false; } + void set_attachment_nocopy(const char *attachment, size_t len) { } + +public: + using RPCMessage::serialize; + using RPCMessage::deserialize; + int serialize(const ProtobufIDLMessage *pb_msg) override; + int deserialize(ProtobufIDLMessage *pb_msg) override; + int compress() override; + int decompress() override; + +protected: + char header[TRPC_HEADER_SIZE]; + size_t nreceived; + size_t meta_len; + size_t message_len; + char *meta_buf; + RPCBuffer *message; + ProtobufIDLMessage *meta; + + int compress_type_trpc_srpc(int trpc_content_encoding) const; + int compress_type_srpc_trpc(int srpc_compress_type) const; + int data_type_trpc_srpc(int trpc_content_type) const; + int data_type_srpc_trpc(int srpc_data_type) const; + int status_code_srpc_trpc(int srpc_status_code) const; + const char *error_msg_srpc_trpc(int srpc_status_code) const; +}; + +class TRPCRequest : public TRPCMessage +{ +public: + TRPCRequest(); + + bool serialize_meta(); + bool deserialize_meta(); + + const std::string& get_service_name() const; + const std::string& get_method_name() const; + + void set_service_name(const std::string& service_name); + void set_method_name(const std::string& method_name); + + int get_compress_type() const override; + void set_compress_type(int type) override; + + int get_data_type() const override; + void set_data_type(int type) override; + + void set_request_id(int32_t req_id); + int32_t get_request_id() const; + + bool set_meta_module_data(const RPCModuleData& data) override; + bool get_meta_module_data(RPCModuleData& data) const override; + + bool trim_service_prefix(); + bool trim_method_prefix(); +}; + +class TRPCResponse : public TRPCMessage +{ +public: + TRPCResponse(); + + bool serialize_meta(); + bool deserialize_meta(); + + int get_compress_type() const override; + void set_compress_type(int type) override; + + int get_data_type() const override; + void set_data_type(int type) override; + + int get_status_code() const; + int get_error() const; + const char *get_errmsg() const; + + void set_status_code(int code); + void set_error(int error); + + void set_request_id(int32_t req_id); + int32_t get_request_id() const; + + bool set_meta_module_data(const RPCModuleData& data) override; + bool get_meta_module_data(RPCModuleData& data) const override; + +protected: + int srpc_status_code = RPCStatusOK; + std::string srpc_error_msg; +}; + +class TRPCStdRequest : public protocol::ProtocolMessage, public RPCRequest, public TRPCRequest +{ +public: + int encode(struct iovec vectors[], int max) override + { + return this->TRPCRequest::encode(vectors, max, this->size_limit); + } + + int append(const void *buf, size_t *size) override + { + return this->TRPCRequest::append(buf, size, this->size_limit); + } + +public: + bool serialize_meta() override + { + return this->TRPCRequest::serialize_meta(); + } + + bool deserialize_meta() override + { + return this->TRPCRequest::deserialize_meta(); + } + +public: + const std::string& get_service_name() const override + { + return this->TRPCRequest::get_service_name(); + } + + const std::string& get_method_name() const override + { + return this->TRPCRequest::get_method_name(); + } + + void set_service_name(const std::string& service_name) override + { + return this->TRPCRequest::set_service_name(service_name); + } + + void set_method_name(const std::string& method_name) override + { + return this->TRPCRequest::set_method_name(method_name); + } + + bool set_meta_module_data(const RPCModuleData& data) override + { + return this->TRPCRequest::set_meta_module_data(data); + } + + bool get_meta_module_data(RPCModuleData& data) const override + { + return this->TRPCRequest::get_meta_module_data(data); + } + +public: + TRPCStdRequest() { this->size_limit = RPC_BODY_SIZE_LIMIT; } +}; + +class TRPCStdResponse : public protocol::ProtocolMessage, public RPCResponse, public TRPCResponse +{ +public: + int encode(struct iovec vectors[], int max) override + { + return this->TRPCResponse::encode(vectors, max, this->size_limit); + } + + int append(const void *buf, size_t *size) override + { + return this->TRPCResponse::append(buf, size, this->size_limit); + } + +public: + bool serialize_meta() override + { + return this->TRPCResponse::serialize_meta(); + } + + bool deserialize_meta() override + { + return this->TRPCResponse::deserialize_meta(); + } + +public: + int get_status_code() const override + { + return this->TRPCResponse::get_status_code(); + } + + int get_error() const override + { + return this->TRPCResponse::get_error(); + } + + const char *get_errmsg() const override + { + return this->TRPCResponse::get_errmsg(); + } + + void set_status_code(int code) override + { + return this->TRPCResponse::set_status_code(code); + } + + void set_error(int error) override + { + return this->TRPCResponse::set_error(error); + } + + bool set_meta_module_data(const RPCModuleData& data) override + { + return this->TRPCResponse::set_meta_module_data(data); + } + + bool get_meta_module_data(RPCModuleData& data) const override + { + return this->TRPCResponse::get_meta_module_data(data); + } + +public: + TRPCStdResponse() { this->size_limit = RPC_BODY_SIZE_LIMIT; } +}; + +} // namespace srpc + +#endif + diff --git a/src/message/rpc_meta_trpc.proto b/src/message/rpc_meta_trpc.proto new file mode 100644 index 00000000..2558885a --- /dev/null +++ b/src/message/rpc_meta_trpc.proto @@ -0,0 +1,109 @@ +syntax = "proto3"; + +enum TrpcMagic { + TRPC_DEFAULT_NONE = 0x00; + TRPC_MAGIC_VALUE = 0x930; +} + +enum TrpcDataFrameType { + TRPC_UNARY_FRAME = 0x00; + TRPC_STREAM_FRAME = 0x01; +} + +enum TrpcDataFrameState { + TRPC_NO_STATE = 0x00; + TRPC_STREAM_FINISH = 0x01; +} + +enum TrpcProtoVersion { + TRPC_PROTO_V1 = 0; +} + +enum TrpcCallType { + TRPC_UNARY_CALL = 0; + TRPC_ONEWAY_CALL = 1; + TRPC_CLIENT_STREAM_CALL = 2; + TRPC_SERVER_STREAM_CALL = 3; + TRPC_BIDI_STREAM_CALL = 4; +} +enum TrpcMessageType { + TRPC_DEFAULT = 0x00; + TRPC_DYEING_MESSAGE = 0x01; + TRPC_TRACE_MESSAGE = 0x02; + TRPC_MULTI_ENV_MESSAGE = 0x04; + TRPC_GRID_MESSAGE = 0x08; + TRPC_SETNAME_MESSAGE = 0x10; +} + +enum TrpcContentEncodeType { + TRPC_PROTO_ENCODE = 0; + TRPC_JCE_ENCODE = 1; + TRPC_JSON_ENCODE = 2; + TRPC_FLATBUFFER_ENCODE = 3; + TRPC_NOOP_ENCODE = 4; +} + +enum TrpcCompressType { + TRPC_DEFAULT_COMPRESS = 0; + TRPC_GZIP_COMPRESS = 1; + TRPC_SNAPPY_COMPRESS = 2; + // srpc framework support zlib and lz4 + TRPC_ZLIB_COMPRESS = 3; + TRPC_LZ4_COMPRESS = 4; +} + +enum TrpcRetCode { + TRPC_INVOKE_SUCCESS = 0; + TRPC_SERVER_DECODE_ERR = 1; + TRPC_SERVER_ENCODE_ERR = 2; + + TRPC_SERVER_NOSERVICE_ERR = 11; + TRPC_SERVER_NOFUNC_ERR = 12; + + TRPC_SERVER_TIMEOUT_ERR = 21; + TRPC_SERVER_OVERLOAD_ERR = 22; + + TRPC_SERVER_SYSTEM_ERR = 31; + + TRPC_SERVER_AUTH_ERR = 41; + + TRPC_CLIENT_INVOKE_TIMEOUT_ERR = 101; + + TRPC_CLIENT_CONNECT_ERR = 111; + + TRPC_CLIENT_ENCODE_ERR = 121; + TRPC_CLIENT_DECODE_ERR = 122; + + TRPC_CLIENT_ROUTER_ERR = 131; + + TRPC_CLINET_NETWORK_ERR = 141; + + TRPC_INVOKE_UNKNOWN_ERR = 999; +} + +message RequestProtocol { + uint32 version = 1; + uint32 call_type = 2; + uint32 request_id = 3; + uint32 timeout = 4; + bytes caller = 5; + bytes callee = 6; + bytes func = 7; + uint32 message_type = 8; + map trans_info = 9; + uint32 content_type = 10; + uint32 content_encoding = 11; +} + +message ResponseProtocol { + uint32 version = 1; + uint32 call_type = 2; + uint32 request_id = 3; + int32 ret = 4; + int32 func_ret = 5; + bytes error_msg = 6; + uint32 message_type = 7; + map trans_info = 8; + uint32 content_type = 9; + uint32 content_encoding = 10; +} diff --git a/src/rpc_define.h b/src/rpc_define.h index 389c3640..44680290 100644 --- a/src/rpc_define.h +++ b/src/rpc_define.h @@ -48,6 +48,10 @@ using ThriftHttpServer = RPCServer; using ThriftHttpClient = RPCClient; using ThriftHttpClientTask = ThriftHttpClient::TASK; +using TRPCServer = RPCServer; +using TRPCClient = RPCClient; +using TRPCClientTask = TRPCClient::TASK; + } // namespace srpc #endif diff --git a/src/rpc_types.h b/src/rpc_types.h index 1c0c6dec..7e0bde41 100644 --- a/src/rpc_types.h +++ b/src/rpc_types.h @@ -22,6 +22,7 @@ #include "rpc_message_srpc.h" #include "rpc_message_thrift.h" #include "rpc_message_brpc.h" +#include "rpc_message_trpc.h" namespace srpc { @@ -105,6 +106,20 @@ struct RPCTYPEThriftHttp } }; +struct RPCTYPETRPC +{ + using REQ = TRPCStdRequest; + using RESP = TRPCStdResponse; + static constexpr RPCDataType default_data_type = RPCDataProtobuf; + + static inline void server_reply_init(const REQ *req, RESP *resp) + { + const_cast(req)->trim_service_prefix(); + const_cast(req)->trim_method_prefix(); + resp->set_request_id(req->get_request_id()); + } +}; + } // end namespace srpc #endif diff --git a/tutorial/CMakeLists.txt b/tutorial/CMakeLists.txt index 34aa204f..154f908e 100644 --- a/tutorial/CMakeLists.txt +++ b/tutorial/CMakeLists.txt @@ -82,6 +82,8 @@ set(TUTORIAL_PB_LIST tutorial-06-brpc_pb_client tutorial-09-client_task tutorial-10-server_async + tutorial-11-trpc_pb_server + tutorial-12-trpc_pb_client ) if (APPLE) diff --git a/tutorial/tutorial-11-trpc_pb_server.cc b/tutorial/tutorial-11-trpc_pb_server.cc new file mode 100644 index 00000000..491dcf00 --- /dev/null +++ b/tutorial/tutorial-11-trpc_pb_server.cc @@ -0,0 +1,67 @@ +/* + Copyright (c) 2021 sogou, Inc. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +#include +#include "echo_pb.srpc.h" +#include "workflow/WFFacilities.h" +#include "srpc/rpc_types.h" + +using namespace srpc; + +static WFFacilities::WaitGroup wait_group(1); + +class ExampleServiceImpl : public Example::Service +{ +public: + void Echo(EchoRequest *request, EchoResponse *response, RPCContext *ctx) override + { +// ctx->set_compress_type(RPCCompressGzip); + response->set_message("This is SRPC framework TRPC protocol. Hi back."); + + printf("Server Echo()\nget_req:\n%s\nset_resp:\n%s\n", + request->DebugString().c_str(), + response->DebugString().c_str()); + } +}; + +static void sig_handler(int signo) +{ + wait_group.done(); +} + +int main() +{ + GOOGLE_PROTOBUF_VERIFY_VERSION; + signal(SIGINT, sig_handler); + signal(SIGTERM, sig_handler); + + TRPCServer server; + ExampleServiceImpl impl; + + server.add_service(&impl); + + if (server.start(1412) == 0) + { + wait_group.wait(); + server.stop(); + } + else + perror("server start"); + + google::protobuf::ShutdownProtobufLibrary(); + return 0; +} + diff --git a/tutorial/tutorial-12-trpc_pb_client.cc b/tutorial/tutorial-12-trpc_pb_client.cc new file mode 100644 index 00000000..e1b9a13d --- /dev/null +++ b/tutorial/tutorial-12-trpc_pb_client.cc @@ -0,0 +1,56 @@ +/* + Copyright (c) 2021 sogou, Inc. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +#include +#include "echo_pb.srpc.h" +#include "srpc/rpc_types.h" + +using namespace srpc; + +int main() +{ + Example::TRPCClient client("127.0.0.1", 1412); + + //async + EchoRequest req; + req.set_message("Hello, sogou rpc!"); + req.set_name("1412"); + + client.Echo(&req, [](EchoResponse *response, RPCContext *ctx) { + if (ctx->success()) + printf("%s\n", response->DebugString().c_str()); + else + printf("status[%d] error[%d] errmsg:%s\n", + ctx->get_status_code(), ctx->get_error(), ctx->get_errmsg()); + }); + + //sync + EchoRequest sync_req; + EchoResponse sync_resp; + RPCSyncContext sync_ctx; + + sync_req.set_message("Hello, sogou rpc!"); + sync_req.set_name("Sync"); + client.Echo(&sync_req, &sync_resp, &sync_ctx); + if (sync_ctx.success) + printf("%s\n", sync_resp.DebugString().c_str()); + else + printf("status[%d] error[%d] errmsg:%s\n", + sync_ctx.status_code, sync_ctx.error, sync_ctx.errmsg.c_str()); + + return 0; +} + From 87f97230c79a5ea1ea4c59ad558f3eaf8b16d899 Mon Sep 17 00:00:00 2001 From: holmes1412 Date: Fri, 19 Mar 2021 21:20:35 +0800 Subject: [PATCH 2/2] update trpc protocol method name --- src/generator/printer.h | 20 ++++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/src/generator/printer.h b/src/generator/printer.h index 71f60044..42c2744c 100644 --- a/src/generator/printer.h +++ b/src/generator/printer.h @@ -87,10 +87,16 @@ static inline std::string make_trpc_service_prefix(const std::vector& package, + const std::string& service, const std::string& method) { - return "/package." + service + "/" + method; + std::string method_prefix = "/"; + + for (size_t i = 0; i < package.size(); i++) + method_prefix = method_prefix + package[i] + "."; + + return method_prefix + service + "/" + method; } static inline bool is_simple_type(int8_t data_type) @@ -668,7 +674,8 @@ class Printer void print_client_methods(const std::string& type, const std::string& service, - const std::vector& rpcs) + const std::vector& rpcs, + const std::vector& package) { for (const auto& rpc : rpcs) { @@ -677,7 +684,7 @@ class Printer std::string full_method = rpc.method_name; if (type == "TRPC") - full_method = make_trpc_method_prefix(service, rpc.method_name); + full_method = make_trpc_method_prefix(package, service, rpc.method_name); fprintf(this->out_file, this->client_method_format.c_str(), type.c_str(), rpc.method_name.c_str(), @@ -759,13 +766,14 @@ class Printer } void print_client_create_task(const std::string& type, const std::string& service, - const std::vector& rpcs) + const std::vector& rpcs, + const std::vector& package) { for (const auto& rpc : rpcs) { std::string full_method = rpc.method_name; if (type == "TRPC") - full_method = make_trpc_method_prefix(service, rpc.method_name); + full_method = make_trpc_method_prefix(package, service, rpc.method_name); fprintf(this->out_file, this->client_create_task_format.c_str(), type.c_str(), type.c_str(), rpc.method_name.c_str(),