Skip to content

Commit

Permalink
Merge branch 'master-grpc-go'
Browse files Browse the repository at this point in the history
  • Loading branch information
yangjuncode committed Feb 25, 2021
2 parents 50a5c8a + dabedfb commit ea74b8f
Show file tree
Hide file tree
Showing 149 changed files with 12,625 additions and 8,618 deletions.
11 changes: 8 additions & 3 deletions .github/ISSUE_TEMPLATE/bug.md
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
---
name: Bug Report
about: Create a report to help us improve
about: Report a non-security bug. For suspected security vulnerabilities or crashes, please use "Report a Security Vulnerability", below.
labels: 'Type: Bug'

---

Please see the FAQ in our main README.md, then answer the questions below before
submitting your issue.
NOTE: if you are reporting is a potential security vulnerability or a crash,
please follow our CVE process at
https://github.com/grpc/proposal/blob/master/P4-grpc-cve-process.md instead of
filing an issue here.

Please see the FAQ in our main README.md, then answer the questions below
before submitting your issue.

### What version of gRPC are you using?

Expand Down
31 changes: 31 additions & 0 deletions .github/workflows/codeql-analysis.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
name: "CodeQL"

on:
push:
branches: [ master ]
pull_request:
# The branches below must be a subset of the branches above
branches: [ master ]
schedule:
- cron: '24 20 * * 3'

jobs:
analyze:
name: Analyze
runs-on: ubuntu-latest

strategy:
fail-fast: false

steps:
- name: Checkout repository
uses: actions/checkout@v2

# Initializes the CodeQL tools for scanning.
- name: Initialize CodeQL
uses: github/codeql-action/init@v1
with:
languages: go

- name: Perform CodeQL Analysis
uses: github/codeql-action/analyze@v1
1 change: 1 addition & 0 deletions balancer/grpclb/grpc_lb_v1/load_balancer_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions balancer/rls/internal/proto/grpc_lookup_v1/rls_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

26 changes: 16 additions & 10 deletions benchmark/benchmain/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,15 +58,19 @@ import (
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/benchmark"
bm "google.golang.org/grpc/benchmark"
"google.golang.org/grpc/benchmark/flags"
testpb "google.golang.org/grpc/benchmark/grpc_testing"
"google.golang.org/grpc/benchmark/latency"
"google.golang.org/grpc/benchmark/stats"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/test/bufconn"

testgrpc "google.golang.org/grpc/interop/grpc_testing"
testpb "google.golang.org/grpc/interop/grpc_testing"
)

var (
Expand Down Expand Up @@ -257,7 +261,7 @@ func unconstrainedStreamBenchmark(start startFunc, stop ucStopFunc, bf stats.Fea
// service. The client is configured using the different options in the passed
// 'bf'. Also returns a cleanup function to close the client and release
// resources.
func makeClient(bf stats.Features) (testpb.BenchmarkServiceClient, func()) {
func makeClient(bf stats.Features) (testgrpc.BenchmarkServiceClient, func()) {
nw := &latency.Network{Kbps: bf.Kbps, Latency: bf.Latency, MTU: bf.MTU}
opts := []grpc.DialOption{}
sopts := []grpc.ServerOption{}
Expand Down Expand Up @@ -325,7 +329,7 @@ func makeClient(bf stats.Features) (testpb.BenchmarkServiceClient, func()) {
lis = nw.Listener(lis)
stopper := bm.StartServer(bm.ServerInfo{Type: "protobuf", Listener: lis}, sopts...)
conn := bm.NewClientConn("" /* target not used */, opts...)
return testpb.NewBenchmarkServiceClient(conn), func() {
return testgrpc.NewBenchmarkServiceClient(conn), func() {
conn.Close()
stopper()
}
Expand All @@ -349,7 +353,7 @@ func makeFuncUnary(bf stats.Features) (rpcCallFunc, rpcCleanupFunc) {
func makeFuncStream(bf stats.Features) (rpcCallFunc, rpcCleanupFunc) {
tc, cleanup := makeClient(bf)

streams := make([]testpb.BenchmarkService_StreamingCallClient, bf.MaxConcurrentCalls)
streams := make([]testgrpc.BenchmarkService_StreamingCallClient, bf.MaxConcurrentCalls)
for i := 0; i < bf.MaxConcurrentCalls; i++ {
stream, err := tc.StreamingCall(context.Background())
if err != nil {
Expand Down Expand Up @@ -400,14 +404,16 @@ func makeFuncUnconstrainedStream(bf stats.Features) (rpcSendFunc, rpcRecvFunc, r
}, cleanup
}

func setupUnconstrainedStream(bf stats.Features) ([]testpb.BenchmarkService_StreamingCallClient, *testpb.SimpleRequest, rpcCleanupFunc) {
func setupUnconstrainedStream(bf stats.Features) ([]testgrpc.BenchmarkService_StreamingCallClient, *testpb.SimpleRequest, rpcCleanupFunc) {
tc, cleanup := makeClient(bf)

streams := make([]testpb.BenchmarkService_StreamingCallClient, bf.MaxConcurrentCalls)
streams := make([]testgrpc.BenchmarkService_StreamingCallClient, bf.MaxConcurrentCalls)
md := metadata.Pairs(benchmark.UnconstrainedStreamingHeader, "1")
ctx := metadata.NewOutgoingContext(context.Background(), md)
for i := 0; i < bf.MaxConcurrentCalls; i++ {
stream, err := tc.UnconstrainedStreamingCall(context.Background())
stream, err := tc.StreamingCall(ctx)
if err != nil {
logger.Fatalf("%v.UnconstrainedStreamingCall(_) = _, %v", tc, err)
logger.Fatalf("%v.StreamingCall(_) = _, %v", tc, err)
}
streams[i] = stream
}
Expand All @@ -424,13 +430,13 @@ func setupUnconstrainedStream(bf stats.Features) ([]testpb.BenchmarkService_Stre

// Makes a UnaryCall gRPC request using the given BenchmarkServiceClient and
// request and response sizes.
func unaryCaller(client testpb.BenchmarkServiceClient, reqSize, respSize int) {
func unaryCaller(client testgrpc.BenchmarkServiceClient, reqSize, respSize int) {
if err := bm.DoUnaryCall(client, reqSize, respSize); err != nil {
logger.Fatalf("DoUnaryCall failed: %v", err)
}
}

func streamCaller(stream testpb.BenchmarkService_StreamingCallClient, reqSize, respSize int) {
func streamCaller(stream testgrpc.BenchmarkService_StreamingCallClient, reqSize, respSize int) {
if err := bm.DoStreamingRoundTrip(stream, reqSize, respSize); err != nil {
logger.Fatalf("DoStreamingRoundTrip failed: %v", err)
}
Expand Down
52 changes: 22 additions & 30 deletions benchmark/benchmark.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,13 @@ import (
"net"

"google.golang.org/grpc"
testpb "google.golang.org/grpc/benchmark/grpc_testing"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"

testgrpc "google.golang.org/grpc/interop/grpc_testing"
testpb "google.golang.org/grpc/interop/grpc_testing"
)

var logger = grpclog.Component("benchmark")
Expand All @@ -45,8 +48,6 @@ func setPayload(p *testpb.Payload, t testpb.PayloadType, size int) {
body := make([]byte, size)
switch t {
case testpb.PayloadType_COMPRESSABLE:
case testpb.PayloadType_UNCOMPRESSABLE:
logger.Fatalf("PayloadType UNCOMPRESSABLE is not supported")
default:
logger.Fatalf("Unsupported payload type: %d", t)
}
Expand All @@ -62,7 +63,7 @@ func NewPayload(t testpb.PayloadType, size int) *testpb.Payload {
}

type testServer struct {
testpb.UnimplementedBenchmarkServiceServer
testgrpc.UnimplementedBenchmarkServiceServer
}

func (s *testServer) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
Expand All @@ -71,7 +72,15 @@ func (s *testServer) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (*
}, nil
}

func (s *testServer) StreamingCall(stream testpb.BenchmarkService_StreamingCallServer) error {
// UnconstrainedStreamingHeader indicates to the StreamingCall handler that its
// behavior should be unconstrained (constant send/receive in parallel) instead
// of ping-pong.
const UnconstrainedStreamingHeader = "unconstrained-streaming"

func (s *testServer) StreamingCall(stream testgrpc.BenchmarkService_StreamingCallServer) error {
if md, ok := metadata.FromIncomingContext(stream.Context()); ok && len(md[UnconstrainedStreamingHeader]) != 0 {
return s.UnconstrainedStreamingCall(stream)
}
response := &testpb.SimpleResponse{
Payload: new(testpb.Payload),
}
Expand All @@ -93,7 +102,7 @@ func (s *testServer) StreamingCall(stream testpb.BenchmarkService_StreamingCallS
}
}

func (s *testServer) UnconstrainedStreamingCall(stream testpb.BenchmarkService_UnconstrainedStreamingCallServer) error {
func (s *testServer) UnconstrainedStreamingCall(stream testgrpc.BenchmarkService_StreamingCallServer) error {
in := new(testpb.SimpleRequest)
// Receive a message to learn response type and size.
err := stream.RecvMsg(in)
Expand Down Expand Up @@ -144,7 +153,7 @@ func (s *testServer) UnconstrainedStreamingCall(stream testpb.BenchmarkService_U
// byteBufServer is a gRPC server that sends and receives byte buffer.
// The purpose is to benchmark the gRPC performance without protobuf serialization/deserialization overhead.
type byteBufServer struct {
testpb.UnimplementedBenchmarkServiceServer
testgrpc.UnimplementedBenchmarkServiceServer
respSize int32
}

Expand All @@ -154,24 +163,7 @@ func (s *byteBufServer) UnaryCall(ctx context.Context, in *testpb.SimpleRequest)
return &testpb.SimpleResponse{}, nil
}

func (s *byteBufServer) StreamingCall(stream testpb.BenchmarkService_StreamingCallServer) error {
for {
var in []byte
err := stream.(grpc.ServerStream).RecvMsg(&in)
if err == io.EOF {
return nil
}
if err != nil {
return err
}
out := make([]byte, s.respSize)
if err := stream.(grpc.ServerStream).SendMsg(&out); err != nil {
return err
}
}
}

func (s *byteBufServer) UnconstrainedStreamingCall(stream testpb.BenchmarkService_UnconstrainedStreamingCallServer) error {
func (s *byteBufServer) StreamingCall(stream testgrpc.BenchmarkService_StreamingCallServer) error {
for {
var in []byte
err := stream.(grpc.ServerStream).RecvMsg(&in)
Expand Down Expand Up @@ -211,13 +203,13 @@ func StartServer(info ServerInfo, opts ...grpc.ServerOption) func() {
s := grpc.NewServer(opts...)
switch info.Type {
case "protobuf":
testpb.RegisterBenchmarkServiceServer(s, &testServer{})
testgrpc.RegisterBenchmarkServiceServer(s, &testServer{})
case "bytebuf":
respSize, ok := info.Metadata.(int32)
if !ok {
logger.Fatalf("failed to StartServer, invalid metadata: %v, for Type: %v", info.Metadata, info.Type)
}
testpb.RegisterBenchmarkServiceServer(s, &byteBufServer{respSize: respSize})
testgrpc.RegisterBenchmarkServiceServer(s, &byteBufServer{respSize: respSize})
default:
logger.Fatalf("failed to StartServer, unknown Type: %v", info.Type)
}
Expand All @@ -228,7 +220,7 @@ func StartServer(info ServerInfo, opts ...grpc.ServerOption) func() {
}

// DoUnaryCall performs an unary RPC with given stub and request and response sizes.
func DoUnaryCall(tc testpb.BenchmarkServiceClient, reqSize, respSize int) error {
func DoUnaryCall(tc testgrpc.BenchmarkServiceClient, reqSize, respSize int) error {
pl := NewPayload(testpb.PayloadType_COMPRESSABLE, reqSize)
req := &testpb.SimpleRequest{
ResponseType: pl.Type,
Expand All @@ -242,7 +234,7 @@ func DoUnaryCall(tc testpb.BenchmarkServiceClient, reqSize, respSize int) error
}

// DoStreamingRoundTrip performs a round trip for a single streaming rpc.
func DoStreamingRoundTrip(stream testpb.BenchmarkService_StreamingCallClient, reqSize, respSize int) error {
func DoStreamingRoundTrip(stream testgrpc.BenchmarkService_StreamingCallClient, reqSize, respSize int) error {
pl := NewPayload(testpb.PayloadType_COMPRESSABLE, reqSize)
req := &testpb.SimpleRequest{
ResponseType: pl.Type,
Expand All @@ -263,7 +255,7 @@ func DoStreamingRoundTrip(stream testpb.BenchmarkService_StreamingCallClient, re
}

// DoByteBufStreamingRoundTrip performs a round trip for a single streaming rpc, using a custom codec for byte buffer.
func DoByteBufStreamingRoundTrip(stream testpb.BenchmarkService_StreamingCallClient, reqSize, respSize int) error {
func DoByteBufStreamingRoundTrip(stream testgrpc.BenchmarkService_StreamingCallClient, reqSize, respSize int) error {
out := make([]byte, reqSize)
if err := stream.(grpc.ClientStream).SendMsg(&out); err != nil {
return fmt.Errorf("/BenchmarkService/StreamingCall.(ClientStream).SendMsg(_) = %v, want <nil>", err)
Expand Down
6 changes: 4 additions & 2 deletions benchmark/client/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,12 @@ import (

"google.golang.org/grpc"
"google.golang.org/grpc/benchmark"
testpb "google.golang.org/grpc/benchmark/grpc_testing"
"google.golang.org/grpc/benchmark/stats"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal/syscall"

testgrpc "google.golang.org/grpc/interop/grpc_testing"
testpb "google.golang.org/grpc/interop/grpc_testing"
)

var (
Expand Down Expand Up @@ -164,7 +166,7 @@ func runWithConn(cc *grpc.ClientConn, req *testpb.SimpleRequest, warmDeadline, e
}

func makeCaller(cc *grpc.ClientConn, req *testpb.SimpleRequest) func() {
client := testpb.NewBenchmarkServiceClient(cc)
client := testgrpc.NewBenchmarkServiceClient(cc)
if *rpcType == "unary" {
return func() {
if _, err := client.UnaryCall(context.Background(), req); err != nil {
Expand Down
Loading

0 comments on commit ea74b8f

Please sign in to comment.