From 04b4917f8dee59bdcec23c7a1af90bd27197beb2 Mon Sep 17 00:00:00 2001 From: Josh Liburdi Date: Tue, 22 Nov 2022 09:33:29 -0800 Subject: [PATCH] feat: Add gRPC Support (#34) --- .devcontainer/Dockerfile | 8 +- .devcontainer/devcontainer.json | 8 +- .devcontainer/install-dev-tools.sh | 2 + .github/workflows/code.yml | 8 +- .golangci.yml | 2 +- build/config/sink.libsonnet | 4 + .../aws/lambda/autoscaling/Dockerfile | 8 +- .../aws/lambda/substation/Dockerfile | 8 +- build/container/file/Dockerfile | 9 +- build/scripts/proto/compile.sh | 4 + examples/service/README.md | 3 + examples/service/config.jsonnet | 8 + examples/service/main.go | 96 ++++++++ go.mod | 6 +- go.sum | 18 +- internal/file/file.go | 111 +++++++++ internal/service/server.go | 43 ++++ internal/service/sink.go | 54 +++++ internal/sink/grpc.go | 111 +++++++++ internal/sink/sink.go | 4 + proto/v1beta/sink.pb.go | 215 ++++++++++++++++++ proto/v1beta/sink.proto | 19 ++ proto/v1beta/sink_grpc.pb.go | 139 +++++++++++ 23 files changed, 870 insertions(+), 18 deletions(-) create mode 100644 .devcontainer/install-dev-tools.sh create mode 100644 build/scripts/proto/compile.sh create mode 100644 examples/service/README.md create mode 100644 examples/service/config.jsonnet create mode 100644 examples/service/main.go create mode 100644 internal/file/file.go create mode 100644 internal/service/server.go create mode 100644 internal/service/sink.go create mode 100644 internal/sink/grpc.go create mode 100644 proto/v1beta/sink.pb.go create mode 100644 proto/v1beta/sink.proto create mode 100644 proto/v1beta/sink_grpc.pb.go diff --git a/.devcontainer/Dockerfile b/.devcontainer/Dockerfile index c313ee67..d756a284 100644 --- a/.devcontainer/Dockerfile +++ b/.devcontainer/Dockerfile @@ -10,4 +10,10 @@ RUN sudo apt-get update && \ # install AWS CLI sudo apt-get install -y awscli && \ # install pip and boto3 - sudo apt-get install -y python3-pip && pip3 install boto3 && pip3 install black + sudo apt-get install -y python3-pip && pip3 install boto3 && pip3 install black && \ + # install protobuf compiler tools + # https://grpc.io/docs/languages/go/quickstart/#prerequisites + sudo apt-get install -y protobuf-compiler && \ + go install google.golang.org/protobuf/cmd/protoc-gen-go@latest && \ + go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest && \ + export PATH="$PATH:$(go env GOPATH)/bin" diff --git a/.devcontainer/devcontainer.json b/.devcontainer/devcontainer.json index f2841225..4c729ff3 100644 --- a/.devcontainer/devcontainer.json +++ b/.devcontainer/devcontainer.json @@ -1,14 +1,12 @@ { "name": "Go", "build": { - "dockerfile": "Dockerfile", + "dockerfile": "Dockerfile" }, "extensions": [ "golang.go", "hashicorp.terraform", + "zxh404.vscode-proto3" ], - "remoteUser": "vscode", - "mounts": [ - "source=${localEnv:HOME}/.aws/,target=/home/vscode/.aws/,type=bind,consistency=cached" - ] + "postCreateCommand": "bash .devcontainer/install-dev-tools.sh" } diff --git a/.devcontainer/install-dev-tools.sh b/.devcontainer/install-dev-tools.sh new file mode 100644 index 00000000..c9ab69eb --- /dev/null +++ b/.devcontainer/install-dev-tools.sh @@ -0,0 +1,2 @@ +# compile proto +sh build/scripts/proto/compile.sh diff --git a/.github/workflows/code.yml b/.github/workflows/code.yml index ee449cb8..9d2f4aa9 100644 --- a/.github/workflows/code.yml +++ b/.github/workflows/code.yml @@ -30,7 +30,6 @@ jobs: runs-on: ubuntu-latest steps: - uses: actions/checkout@v2 - - name: setup uses: actions/setup-python@v2 with: @@ -39,3 +38,10 @@ jobs: - run: | pip3 install black find -name *.py | xargs black --check + + proto: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + - uses: bufbuild/buf-setup-action@v1 + - uses: bufbuild/buf-lint-action@v1 diff --git a/.golangci.yml b/.golangci.yml index 070e9644..12a4484f 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -136,4 +136,4 @@ issues: - goconst - gosec - noctx - - wrapcheck \ No newline at end of file + - wrapcheck diff --git a/build/config/sink.libsonnet b/build/config/sink.libsonnet index 8001ed33..08d3a2f5 100644 --- a/build/config/sink.libsonnet +++ b/build/config/sink.libsonnet @@ -15,6 +15,10 @@ type: 'firehose', settings: { stream: stream }, }, + grpc(server, timeout='', certificate=''): { + type: 'grpc', + settings: { server: server, timeout: timeout, certificate: certificate }, + }, s3(bucket, prefix='', prefix_key=''): { type: 's3', settings: { bucket: bucket, prefix: prefix, prefix_key: prefix_key }, diff --git a/build/container/aws/lambda/autoscaling/Dockerfile b/build/container/aws/lambda/autoscaling/Dockerfile index 523085e9..e51fa146 100644 --- a/build/container/aws/lambda/autoscaling/Dockerfile +++ b/build/container/aws/lambda/autoscaling/Dockerfile @@ -13,10 +13,14 @@ RUN apt-get update && \ WORKDIR /usr/local/go/src/substation/ COPY ./go.mod . COPY ./go.sum . -RUN go mod download +RUN apt-get install -y protobuf-compiler && \ + go install google.golang.org/protobuf/cmd/protoc-gen-go@latest && \ + go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest && \ + go mod download -# copy src, unit test, build +# copy src, build proto, unit test, build app COPY . /usr/local/go/src/substation/ +RUN sh /usr/local/go/src/substation/build/scripts/proto/compile.sh RUN go test -timeout 30s -v ./... WORKDIR /usr/local/go/src/substation/cmd/aws/lambda/autoscaling RUN go build -o /var/task/main diff --git a/build/container/aws/lambda/substation/Dockerfile b/build/container/aws/lambda/substation/Dockerfile index 70c858cb..0157cb60 100644 --- a/build/container/aws/lambda/substation/Dockerfile +++ b/build/container/aws/lambda/substation/Dockerfile @@ -13,10 +13,14 @@ RUN apt-get update && \ WORKDIR /usr/local/go/src/substation/ COPY ./go.mod . COPY ./go.sum . -RUN go mod download +RUN apt-get install -y protobuf-compiler && \ + go install google.golang.org/protobuf/cmd/protoc-gen-go@latest && \ + go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest && \ + go mod download -# copy src, unit test, build +# copy src, build proto, unit test, build app COPY . /usr/local/go/src/substation/ +RUN sh /usr/local/go/src/substation/build/scripts/proto/compile.sh RUN go test -timeout 30s -v ./... WORKDIR /usr/local/go/src/substation/cmd/aws/lambda/substation RUN go build -o /var/task/main diff --git a/build/container/file/Dockerfile b/build/container/file/Dockerfile index 81dbed90..1f9602d9 100644 --- a/build/container/file/Dockerfile +++ b/build/container/file/Dockerfile @@ -5,10 +5,15 @@ ENV GO111MODULE=on WORKDIR /usr/local/go/src/substation/ COPY ./go.mod . COPY ./go.sum . -RUN go mod download +RUN apt-get update && \ + apt-get install -y protobuf-compiler && \ + go install google.golang.org/protobuf/cmd/protoc-gen-go@latest && \ + go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest && \ + go mod download -# copy src, unit test, build +# copy src, build proto, unit test, build app COPY . /usr/local/go/src/substation/ +RUN sh /usr/local/go/src/substation/build/scripts/proto/compile.sh RUN go test -timeout 30s -v ./... WORKDIR /usr/local/go/src/substation/cmd/file/substation RUN CGO_ENABLED=0 go build -o /bin/substation diff --git a/build/scripts/proto/compile.sh b/build/scripts/proto/compile.sh new file mode 100644 index 00000000..0337f7a2 --- /dev/null +++ b/build/scripts/proto/compile.sh @@ -0,0 +1,4 @@ +#!/bin/sh +protoc --go_out=. --go_opt=paths=source_relative \ + --go-grpc_out=. --go-grpc_opt=paths=source_relative \ + proto/*/*.proto diff --git a/examples/service/README.md b/examples/service/README.md new file mode 100644 index 00000000..8d575897 --- /dev/null +++ b/examples/service/README.md @@ -0,0 +1,3 @@ +# service + +This example is a toy implementation of Substation's gRPC service and shows how it can be used for inter-process communication (IPC) to send processed data from the sink back to the source. Refer to the [gRPC quickstart guide](https://grpc.io/docs/languages/go/quickstart/#regenerate-grpc-code) for compiling the protobuf. diff --git a/examples/service/config.jsonnet b/examples/service/config.jsonnet new file mode 100644 index 00000000..98058287 --- /dev/null +++ b/examples/service/config.jsonnet @@ -0,0 +1,8 @@ +local sinklib = import '../../build/config/sink.libsonnet'; + +{ + sink: sinklib.grpc(server='localhost:50051'), + transform: { + type: 'transfer' + }, +} diff --git a/examples/service/main.go b/examples/service/main.go new file mode 100644 index 00000000..55f851a8 --- /dev/null +++ b/examples/service/main.go @@ -0,0 +1,96 @@ +package main + +import ( + "context" + "encoding/json" + "fmt" + "os" + "sync" + + "github.com/brexhq/substation/cmd" + "github.com/brexhq/substation/config" + "github.com/brexhq/substation/internal/service" + + "golang.org/x/sync/errgroup" +) + +func main() { + sub := cmd.New() + + bytes, err := os.ReadFile("./config.json") + if err != nil { + panic(err) + } + if err := json.Unmarshal(bytes, &sub.Config); err != nil { + panic(err) + } + + // maintains app state + group, ctx := errgroup.WithContext(context.TODO()) + + // create the gRPC server + server := service.Server{} + server.Setup() + + // deferring guarantees that the gRPC server will shutdown + defer server.Stop() + + // create the server API for the Sink service and register it with the server + srv := &service.Sink{} + server.RegisterSink(srv) + + // gRPC server runs in a goroutine to prevent blocking main + group.Go(func() error { + return server.Start("localhost:50051") + }) + + // sink goroutine + var sinkWg sync.WaitGroup + sinkWg.Add(1) + group.Go(func() error { + return sub.Sink(ctx, &sinkWg) + }) + + // transform goroutine + var transformWg sync.WaitGroup + transformWg.Add(1) + group.Go(func() error { + return sub.Transform(ctx, &transformWg) + }) + + // ingest goroutine + group.Go(func() error { + data := [][]byte{ + []byte(`{"foo":"bar"}`), + []byte(`{"baz":"qux"}`), + []byte(`{"quux":"corge"}`), + } + + cap := config.NewCapsule() + + fmt.Println("sending capsules into Substation ...") + for _, d := range data { + fmt.Println(string(d)) + cap.SetData(d) + sub.Send(cap) + } + + sub.WaitTransform(&transformWg) + sub.WaitSink(&sinkWg) + + return nil + }) + + // block until all Substation processing is complete + if err := sub.Block(ctx, group); err != nil { + panic(err) + } + + // block until the gRPC server has received all capsules and the stream is closed + srv.Block() + + fmt.Println("returning capsules sent from gRPC sink ...") + for _, cap := range srv.Capsules { + fmt.Println(string(cap.Data())) + } +} diff --git a/go.mod b/go.mod index 601c2903..e390d65b 100644 --- a/go.mod +++ b/go.mod @@ -18,7 +18,9 @@ require ( github.com/umisama/go-regexpcache v0.0.0-20150417035358-2444a542492f go.uber.org/goleak v1.2.0 golang.org/x/net v0.0.0-20220418201149-a630d4f3e7a2 - golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9 + golang.org/x/sync v0.0.0-20210220032951-036812b2e83c + google.golang.org/grpc v1.35.0 + google.golang.org/protobuf v1.26.0-rc.1 ) require ( @@ -34,6 +36,4 @@ require ( golang.org/x/sys v0.0.0-20220503163025-988cb79eb6c6 // indirect golang.org/x/text v0.3.7 // indirect google.golang.org/genproto v0.0.0-20210114201628-6edceaf6022f // indirect - google.golang.org/grpc v1.35.0 // indirect - google.golang.org/protobuf v1.26.0-rc.1 // indirect ) diff --git a/go.sum b/go.sum index 113b2d65..95afe3ea 100644 --- a/go.sum +++ b/go.sum @@ -83,6 +83,9 @@ github.com/klauspost/compress v1.15.0 h1:xqfchp4whNFxn5A4XFyyYtitiWI8Hy5EW59jEwc github.com/klauspost/compress v1.15.0/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= +github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= @@ -95,12 +98,15 @@ github.com/sirupsen/logrus v1.8.1 h1:dJKuHgqk1NNQlqoA6BTlM1Wf9DOH3NBjQyu0h9+AZZE github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/tidwall/gjson v1.12.1/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= github.com/tidwall/gjson v1.14.0 h1:6aeJ0bzojgWLa82gDQHcx3S0Lr/O51I9bJ5nv6JFx5w= github.com/tidwall/gjson v1.14.0/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= @@ -119,6 +125,7 @@ github.com/valyala/fasthttp v1.34.0/go.mod h1:epZA5N+7pY6ZaEKRmstzOuYJx9HI8DI1oa github.com/valyala/tcplisten v1.0.0/go.mod h1:T0xQ8SeCZGxckz9qRXTfG43PvQ/mcWh7FwZEA7Ioqkc= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/goleak v1.2.0 h1:xqgm/S+aQvhWFTtR0XK3Jvg7z8kGV8P4X14IzwN3Eqk= go.uber.org/goleak v1.2.0/go.mod h1:XJYK+MuIchqpmGmUSAzotztawfKvYLUIgg7guXrwVUo= @@ -133,8 +140,10 @@ golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTk golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= golang.org/x/lint v0.0.0-20190930215403-16217165b5de h1:5hukYrvBGR8/eNkX5mdUezrA6JiaEZDtJb9Ei+1LlBs= +golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -143,6 +152,7 @@ golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= @@ -153,8 +163,9 @@ golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9 h1:SQFwaSi55rU7vdNs9Yr0Z324VNlrF+0wMqRXT4St8ck= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ= +golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -162,7 +173,9 @@ golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220227234510-4e6760a101f9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= @@ -184,6 +197,7 @@ golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtn golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.1.5 h1:ouewzE6p+/VEB31YYnTbEJdi8pFqKp4P4n85vwo3DHA= +golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= @@ -217,11 +231,13 @@ google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlba google.golang.org/protobuf v1.26.0-rc.1 h1:7QnIQpGRHE5RnLKnESfDoxm2dTapTZua5a0kS0A+VXQ= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10= gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/internal/file/file.go b/internal/file/file.go new file mode 100644 index 00000000..cbd880be --- /dev/null +++ b/internal/file/file.go @@ -0,0 +1,111 @@ +package file + +import ( + "context" + "fmt" + "io" + "os" + "strings" + + "github.com/brexhq/substation/internal/aws/s3manager" + "github.com/brexhq/substation/internal/errors" + "github.com/brexhq/substation/internal/http" +) + +var ( + httpClient http.HTTP + s3managerAPI s3manager.DownloaderAPI +) + +// errEmptyFile is returned when Get is called but finds an empty file. +const errEmptyFile = errors.Error("empty file found") + +// errNoFile is returned when Get is called but no file is found. +const errNoFile = errors.Error("no file found") + +/* +Get retrieves a file from these locations (in order): + +- Local disk + +- HTTP or HTTPS URL + +- AWS S3 + +If a file is found, then it is saved to disk and the path is returned. The caller is responsible for removing files when they are no longer needed. +*/ +func Get(ctx context.Context, path string) (string, error) { + file, err := os.CreateTemp("", "tempfile") + if err != nil { + return "", fmt.Errorf("file %s: %v", path, err) + } + + if _, err := os.Stat(path); err == nil { + buf, err := os.ReadFile(path) + if err != nil { + return "", fmt.Errorf("file %s: %v", path, errEmptyFile) + } + + if len(buf) == 0 { + return "", fmt.Errorf("file %s: %v", path, errEmptyFile) + } + + if _, err := file.Write(buf); err != nil { + return "", fmt.Errorf("file %s: %v", path, err) + } + + return file.Name(), nil + } + + if strings.HasPrefix(path, "http://") || strings.HasPrefix(path, "https://") { //nolint:nestif // err checking + if !httpClient.IsEnabled() { + httpClient.Setup() + } + + resp, err := httpClient.Get(ctx, path) + if err != nil { + return "", fmt.Errorf("file %s: %v", path, err) + } + defer resp.Body.Close() + + body, err := io.ReadAll(resp.Body) + if err != nil { + return "", fmt.Errorf("file %s: %v", path, err) + } + + if len(body) == 0 { + return "", fmt.Errorf("file %s: %v", path, errEmptyFile) + } + + if _, err := file.Write(body); err != nil { + return "", fmt.Errorf("file %s: %v", path, err) + } + + return file.Name(), nil + } + + if strings.HasPrefix(path, "s3://") { + if !s3managerAPI.IsEnabled() { + s3managerAPI.Setup() + } + + // "s3://bucket/key" becomes ["bucket" "key"] + paths := strings.SplitN(strings.TrimPrefix(path, "s3://"), "/", 2) + buf, size, err := s3managerAPI.Download(ctx, paths[0], paths[1]) + if err != nil { + return "", fmt.Errorf("file %s: %v", path, err) + } + + if size == 0 { + return "", fmt.Errorf("file %s: %v", path, errEmptyFile) + } + + if _, err := file.Write(buf); err != nil { + return "", fmt.Errorf("file %s: %v", path, err) + } + + return file.Name(), nil + } + + return "", fmt.Errorf("file %s: %v", path, errNoFile) +} diff --git a/internal/service/server.go b/internal/service/server.go new file mode 100644 index 00000000..c682931d --- /dev/null +++ b/internal/service/server.go @@ -0,0 +1,43 @@ +package service + +import ( + "fmt" + "net" + + pb "github.com/brexhq/substation/proto/v1beta" + "google.golang.org/grpc" +) + +// Server wraps a gRPC server and provides methods for managing server state. +type Server struct { + server *grpc.Server +} + +// Setup creates a new gRPC server. +func (s *Server) Setup(opt ...grpc.ServerOption) { + s.server = grpc.NewServer(opt...) +} + +// Start starts the gRPC server. This method blocks the caller until the server is stopped. +func (s *Server) Start(addr string) error { + lis, err := net.Listen("tcp", addr) + if err != nil { + return fmt.Errorf("grpc start: %v", err) + } + + if err := s.server.Serve(lis); err != nil { + return fmt.Errorf("grpc serve: %v", err) + } + + return nil +} + +// Stop stops the gRPC server. +func (s *Server) Stop() { + s.server.Stop() +} + +// RegisterSink registers the server API for the Sink service with the gRPC server. +func (s *Server) RegisterSink(srv *Sink) { + pb.RegisterSinkServiceServer(s.server, srv) +} diff --git a/internal/service/sink.go b/internal/service/sink.go new file mode 100644 index 00000000..23a3cfb4 --- /dev/null +++ b/internal/service/sink.go @@ -0,0 +1,54 @@ +package service + +import ( + "fmt" + "io" + "time" + + "github.com/brexhq/substation/config" + pb "github.com/brexhq/substation/proto/v1beta" +) + +// Sink implements the server API for the Sink service. +type Sink struct { + pb.UnimplementedSinkServiceServer + // Capsules can be optionally used to store all capsules sent by the client. + Capsules []config.Capsule + // isClosed describes the state of the gRPC stream: false is open and true is closed. + isClosed bool +} + +// Send implements the Send RPC. +func (s *Sink) Send(stream pb.SinkService_SendServer) error { + var count uint32 + capsule := config.NewCapsule() + + // all data is read from the stream before sending an acknowledgement + for { + recv, err := stream.Recv() + if err == io.EOF { + s.isClosed = true + + return stream.SendAndClose(&pb.SendResponse{}) + } + if err != nil { + return fmt.Errorf("grpc sink recv: %v", err) + } + + capsule.SetData(recv.Data).SetMetadata(recv.Metadata) //nolint:errcheck // no err check required + s.Capsules = append(s.Capsules, capsule) + + count++ + } +} + +// Block blocks the caller until the gRPC stream is closed. This signals that all data was received by the server. +func (s *Sink) Block() { + for { + if !s.isClosed { + time.Sleep(100 * time.Millisecond) + } else { + break + } + } +} diff --git a/internal/sink/grpc.go b/internal/sink/grpc.go new file mode 100644 index 00000000..bc2229f0 --- /dev/null +++ b/internal/sink/grpc.go @@ -0,0 +1,111 @@ +package sink + +import ( + "context" + "fmt" + "os" + "time" + + "github.com/brexhq/substation/config" + "github.com/brexhq/substation/internal/file" + pb "github.com/brexhq/substation/proto/v1beta" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" + "google.golang.org/grpc/credentials/insecure" +) + +/* +gRPC sinks data to a server that implements the server API for the Sink service. This sink can also be used for inter-process communication (IPC) by using a localhost server. By default, the sink creates an insecure connection that is unauthenticated and unencrypted. + +The sink has these settings: + + Server: + Address and port number for the server that data is sent to + Timeout (optional): + Amount of time (in seconds) to wait before cancelling the request + defaults to 10 seconds + Certificate (optional): + File containing the server certificate, enables SSL/TLS server authentication + The certificate file can be stored locally or remotely + +When loaded with a factory, the sink uses this JSON configuration: + + { + "type": "grpc", + "settings": { + "server": "localhost:50051" + } + } +*/ +type Grpc struct { + Server string `json:"server"` + Timeout int `json:"timeout"` + Certificate string `json:"certificate"` +} + +func (sink *Grpc) Send(ctx context.Context, ch *config.Channel) error { + // https://grpc.io/docs/guides/auth/#base-case---no-encryption-or-authentication + creds := grpc.WithTransportCredentials(insecure.NewCredentials()) + + // https://grpc.io/docs/guides/auth/#with-server-authentication-ssltls + if sink.Certificate != "" { + cert, err := file.Get(ctx, sink.Certificate) + if err != nil { + return fmt.Errorf("sink grpc: %v", err) + } + defer os.Remove(cert) + + c, err := credentials.NewClientTLSFromFile(cert, "") + if err != nil { + return fmt.Errorf("sink grpc: %v", err) + } + + creds = grpc.WithTransportCredentials(c) + } + + var opts []grpc.DialOption + opts = append(opts, creds) + + conn, err := grpc.DialContext(ctx, sink.Server, opts...) + if err != nil { + return fmt.Errorf("sink grpc: %v", err) + } + defer conn.Close() + + timeout := 10 * time.Second + if sink.Timeout != 0 { + timeout = time.Duration(sink.Timeout) * time.Second + } + + ctx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + + client := pb.NewSinkServiceClient(conn) + stream, err := client.Send(ctx, grpc.WaitForReady(true)) + if err != nil { + return fmt.Errorf("sink grpc: %v", err) + } + + for capsule := range ch.C { + select { + case <-ctx.Done(): + return ctx.Err() + default: + p := &pb.SendRequest{ + Data: capsule.Data(), + } + + if err := stream.Send(p); err != nil { + return fmt.Errorf("sink grpc: %v", err) + } + } + } + + // server must acknowledge the receipt of all capsules + // if this doesn't happen, then the app will deadlock + if _, err := stream.CloseAndRecv(); err != nil { + return fmt.Errorf("sink grpc: %v", err) + } + + return nil +} diff --git a/internal/sink/sink.go b/internal/sink/sink.go index 121d5134..452ba62d 100644 --- a/internal/sink/sink.go +++ b/internal/sink/sink.go @@ -31,6 +31,10 @@ func Factory(cfg config.Config) (Sink, error) { var s Firehose _ = config.Decode(cfg.Settings, &s) return &s, nil + case "grpc": + var s Grpc + _ = config.Decode(cfg.Settings, &s) + return &s, nil case "kinesis": var s Kinesis _ = config.Decode(cfg.Settings, &s) diff --git a/proto/v1beta/sink.pb.go b/proto/v1beta/sink.pb.go new file mode 100644 index 00000000..b1278c8e --- /dev/null +++ b/proto/v1beta/sink.pb.go @@ -0,0 +1,215 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.28.1 +// protoc v3.12.4 +// source: proto/v1beta/sink.proto + +package proto + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +// SendRequest mirrors the Capsule struct defined in config +type SendRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Data []byte `protobuf:"bytes,1,opt,name=data,proto3" json:"data,omitempty"` + Metadata []byte `protobuf:"bytes,2,opt,name=metadata,proto3" json:"metadata,omitempty"` +} + +func (x *SendRequest) Reset() { + *x = SendRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_proto_v1beta_sink_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *SendRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*SendRequest) ProtoMessage() {} + +func (x *SendRequest) ProtoReflect() protoreflect.Message { + mi := &file_proto_v1beta_sink_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use SendRequest.ProtoReflect.Descriptor instead. +func (*SendRequest) Descriptor() ([]byte, []int) { + return file_proto_v1beta_sink_proto_rawDescGZIP(), []int{0} +} + +func (x *SendRequest) GetData() []byte { + if x != nil { + return x.Data + } + return nil +} + +func (x *SendRequest) GetMetadata() []byte { + if x != nil { + return x.Metadata + } + return nil +} + +// SendResponse is sent by the server to acknowledge successful receipt of all requests +type SendResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields +} + +func (x *SendResponse) Reset() { + *x = SendResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_proto_v1beta_sink_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *SendResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*SendResponse) ProtoMessage() {} + +func (x *SendResponse) ProtoReflect() protoreflect.Message { + mi := &file_proto_v1beta_sink_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use SendResponse.ProtoReflect.Descriptor instead. +func (*SendResponse) Descriptor() ([]byte, []int) { + return file_proto_v1beta_sink_proto_rawDescGZIP(), []int{1} +} + +var File_proto_v1beta_sink_proto protoreflect.FileDescriptor + +var file_proto_v1beta_sink_proto_rawDesc = []byte{ + 0x0a, 0x17, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x76, 0x31, 0x62, 0x65, 0x74, 0x61, 0x2f, 0x73, + 0x69, 0x6e, 0x6b, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x0c, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x2e, 0x76, 0x31, 0x62, 0x65, 0x74, 0x61, 0x22, 0x3d, 0x0a, 0x0b, 0x53, 0x65, 0x6e, 0x64, 0x52, + 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x12, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x12, 0x1a, 0x0a, 0x08, 0x6d, 0x65, + 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x08, 0x6d, 0x65, + 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x22, 0x0e, 0x0a, 0x0c, 0x53, 0x65, 0x6e, 0x64, 0x52, 0x65, + 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x32, 0x50, 0x0a, 0x0b, 0x53, 0x69, 0x6e, 0x6b, 0x53, 0x65, + 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x41, 0x0a, 0x04, 0x53, 0x65, 0x6e, 0x64, 0x12, 0x19, 0x2e, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x76, 0x31, 0x62, 0x65, 0x74, 0x61, 0x2e, 0x53, 0x65, 0x6e, + 0x64, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1a, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x2e, 0x76, 0x31, 0x62, 0x65, 0x74, 0x61, 0x2e, 0x53, 0x65, 0x6e, 0x64, 0x52, 0x65, 0x73, 0x70, + 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x28, 0x01, 0x42, 0x24, 0x5a, 0x22, 0x67, 0x69, 0x74, 0x68, + 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x62, 0x72, 0x65, 0x78, 0x68, 0x71, 0x2f, 0x73, 0x75, + 0x62, 0x73, 0x74, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x06, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_proto_v1beta_sink_proto_rawDescOnce sync.Once + file_proto_v1beta_sink_proto_rawDescData = file_proto_v1beta_sink_proto_rawDesc +) + +func file_proto_v1beta_sink_proto_rawDescGZIP() []byte { + file_proto_v1beta_sink_proto_rawDescOnce.Do(func() { + file_proto_v1beta_sink_proto_rawDescData = protoimpl.X.CompressGZIP(file_proto_v1beta_sink_proto_rawDescData) + }) + return file_proto_v1beta_sink_proto_rawDescData +} + +var file_proto_v1beta_sink_proto_msgTypes = make([]protoimpl.MessageInfo, 2) +var file_proto_v1beta_sink_proto_goTypes = []interface{}{ + (*SendRequest)(nil), // 0: proto.v1beta.SendRequest + (*SendResponse)(nil), // 1: proto.v1beta.SendResponse +} +var file_proto_v1beta_sink_proto_depIdxs = []int32{ + 0, // 0: proto.v1beta.SinkService.Send:input_type -> proto.v1beta.SendRequest + 1, // 1: proto.v1beta.SinkService.Send:output_type -> proto.v1beta.SendResponse + 1, // [1:2] is the sub-list for method output_type + 0, // [0:1] is the sub-list for method input_type + 0, // [0:0] is the sub-list for extension type_name + 0, // [0:0] is the sub-list for extension extendee + 0, // [0:0] is the sub-list for field type_name +} + +func init() { file_proto_v1beta_sink_proto_init() } +func file_proto_v1beta_sink_proto_init() { + if File_proto_v1beta_sink_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_proto_v1beta_sink_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*SendRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_proto_v1beta_sink_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*SendResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_proto_v1beta_sink_proto_rawDesc, + NumEnums: 0, + NumMessages: 2, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_proto_v1beta_sink_proto_goTypes, + DependencyIndexes: file_proto_v1beta_sink_proto_depIdxs, + MessageInfos: file_proto_v1beta_sink_proto_msgTypes, + }.Build() + File_proto_v1beta_sink_proto = out.File + file_proto_v1beta_sink_proto_rawDesc = nil + file_proto_v1beta_sink_proto_goTypes = nil + file_proto_v1beta_sink_proto_depIdxs = nil +} diff --git a/proto/v1beta/sink.proto b/proto/v1beta/sink.proto new file mode 100644 index 00000000..32d08906 --- /dev/null +++ b/proto/v1beta/sink.proto @@ -0,0 +1,19 @@ +syntax = "proto3"; + +package proto.v1beta; + +option go_package = "github.com/brexhq/substation/proto"; + +// Sink mirrors the Sink interface defined in internal/sink +service SinkService { + rpc Send(stream SendRequest) returns (SendResponse) {} +} + +// SendRequest mirrors the Capsule struct defined in config +message SendRequest { + bytes data = 1; + bytes metadata = 2; +} + +// SendResponse is sent by the server to acknowledge successful receipt of all requests +message SendResponse {} diff --git a/proto/v1beta/sink_grpc.pb.go b/proto/v1beta/sink_grpc.pb.go new file mode 100644 index 00000000..3a018f0a --- /dev/null +++ b/proto/v1beta/sink_grpc.pb.go @@ -0,0 +1,139 @@ +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. +// versions: +// - protoc-gen-go-grpc v1.2.0 +// - protoc v3.12.4 +// source: proto/v1beta/sink.proto + +package proto + +import ( + context "context" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.32.0 or later. +const _ = grpc.SupportPackageIsVersion7 + +// SinkServiceClient is the client API for SinkService service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +type SinkServiceClient interface { + Send(ctx context.Context, opts ...grpc.CallOption) (SinkService_SendClient, error) +} + +type sinkServiceClient struct { + cc grpc.ClientConnInterface +} + +func NewSinkServiceClient(cc grpc.ClientConnInterface) SinkServiceClient { + return &sinkServiceClient{cc} +} + +func (c *sinkServiceClient) Send(ctx context.Context, opts ...grpc.CallOption) (SinkService_SendClient, error) { + stream, err := c.cc.NewStream(ctx, &SinkService_ServiceDesc.Streams[0], "/proto.v1beta.SinkService/Send", opts...) + if err != nil { + return nil, err + } + x := &sinkServiceSendClient{stream} + return x, nil +} + +type SinkService_SendClient interface { + Send(*SendRequest) error + CloseAndRecv() (*SendResponse, error) + grpc.ClientStream +} + +type sinkServiceSendClient struct { + grpc.ClientStream +} + +func (x *sinkServiceSendClient) Send(m *SendRequest) error { + return x.ClientStream.SendMsg(m) +} + +func (x *sinkServiceSendClient) CloseAndRecv() (*SendResponse, error) { + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + m := new(SendResponse) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +// SinkServiceServer is the server API for SinkService service. +// All implementations must embed UnimplementedSinkServiceServer +// for forward compatibility +type SinkServiceServer interface { + Send(SinkService_SendServer) error + mustEmbedUnimplementedSinkServiceServer() +} + +// UnimplementedSinkServiceServer must be embedded to have forward compatible implementations. +type UnimplementedSinkServiceServer struct { +} + +func (UnimplementedSinkServiceServer) Send(SinkService_SendServer) error { + return status.Errorf(codes.Unimplemented, "method Send not implemented") +} +func (UnimplementedSinkServiceServer) mustEmbedUnimplementedSinkServiceServer() {} + +// UnsafeSinkServiceServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to SinkServiceServer will +// result in compilation errors. +type UnsafeSinkServiceServer interface { + mustEmbedUnimplementedSinkServiceServer() +} + +func RegisterSinkServiceServer(s grpc.ServiceRegistrar, srv SinkServiceServer) { + s.RegisterService(&SinkService_ServiceDesc, srv) +} + +func _SinkService_Send_Handler(srv interface{}, stream grpc.ServerStream) error { + return srv.(SinkServiceServer).Send(&sinkServiceSendServer{stream}) +} + +type SinkService_SendServer interface { + SendAndClose(*SendResponse) error + Recv() (*SendRequest, error) + grpc.ServerStream +} + +type sinkServiceSendServer struct { + grpc.ServerStream +} + +func (x *sinkServiceSendServer) SendAndClose(m *SendResponse) error { + return x.ServerStream.SendMsg(m) +} + +func (x *sinkServiceSendServer) Recv() (*SendRequest, error) { + m := new(SendRequest) + if err := x.ServerStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +// SinkService_ServiceDesc is the grpc.ServiceDesc for SinkService service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var SinkService_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "proto.v1beta.SinkService", + HandlerType: (*SinkServiceServer)(nil), + Methods: []grpc.MethodDesc{}, + Streams: []grpc.StreamDesc{ + { + StreamName: "Send", + Handler: _SinkService_Send_Handler, + ClientStreams: true, + }, + }, + Metadata: "proto/v1beta/sink.proto", +}