Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Streaming #1

Merged
merged 20 commits into from
Oct 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions examples/streaming/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
OpenAPI Code Generation Example - Streaming
-------------------------------------------

This directory contains an example server using our code generator which implements
a simple streaming API with a single endpoint.

This is the structure:
- `sse.yaml`: Contains the OpenAPI 3.0 specification
- `stdhttp/`: Contains the written and generated code for the server using the standard http package

28 changes: 28 additions & 0 deletions examples/streaming/sse.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
openapi: 3.0.0
info:
title: Simple SSE Service
version: 1.0.0
paths:
/:
get:
summary: Server-Sent Events Stream
description: Provides a stream of JSON documents containing a timestamp and sequence number.
operationId: getStream
responses:
200:
description: SSE Stream
content:
text/event-stream:
schema:
type: object
properties:
time:
type: string
format: date-time
description: Timestamp of the event.
sequence:
type: integer
description: Sequence number of the event.
example:
time: "2023-11-20T10:30:00Z"
sequence: 1
36 changes: 36 additions & 0 deletions examples/streaming/stdhttp/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
SHELL:=/bin/bash

YELLOW := \e[0;33m
RESET := \e[0;0m

GOVER := $(shell go env GOVERSION)
GOMINOR := $(shell bash -c "cut -f2 -d. <<< $(GOVER)")

define execute-if-go-122
@{ \
if [[ 22 -le $(GOMINOR) ]]; then \
$1; \
else \
echo -e "$(YELLOW)Skipping task as you're running Go v1.$(GOMINOR).x which is < Go 1.22, which this module requires$(RESET)"; \
fi \
}
endef

lint:
$(call execute-if-go-122,$(GOBIN)/golangci-lint run ./...)

lint-ci:

$(call execute-if-go-122,$(GOBIN)/golangci-lint run ./... --out-format=colored-line-number --timeout=5m)

generate:
$(call execute-if-go-122,go generate ./...)

test:
$(call execute-if-go-122,go test -cover ./...)

tidy:
$(call execute-if-go-122,go mod tidy)

tidy-ci:
$(call execute-if-go-122,tidied -verbose)
29 changes: 29 additions & 0 deletions examples/streaming/stdhttp/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
module github.com/oapi-codegen/oapi-codegen/v2/examples/streaming/stdhttp

go 1.21.13

replace github.com/oapi-codegen/oapi-codegen/v2 => ../../../

require (
github.com/getkin/kin-openapi v0.127.0
github.com/oapi-codegen/oapi-codegen/v2 v2.0.0-00010101000000-000000000000
github.com/oapi-codegen/runtime v1.1.1
)

require (
github.com/dprotaso/go-yit v0.0.0-20220510233725-9ba8df137936 // indirect
github.com/go-openapi/jsonpointer v0.21.0 // indirect
github.com/go-openapi/swag v0.23.0 // indirect
github.com/invopop/yaml v0.3.1 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826 // indirect
github.com/perimeterx/marshmallow v1.1.5 // indirect
github.com/speakeasy-api/openapi-overlay v0.9.0 // indirect
github.com/vmware-labs/yaml-jsonpath v0.3.2 // indirect
golang.org/x/mod v0.17.0 // indirect
golang.org/x/text v0.18.0 // indirect
golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
170 changes: 170 additions & 0 deletions examples/streaming/stdhttp/go.sum

Large diffs are not rendered by default.

21 changes: 21 additions & 0 deletions examples/streaming/stdhttp/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package main

import (
"context"
"github.com/oapi-codegen/oapi-codegen/v2/examples/streaming/stdhttp/sse"
"log/slog"
"os"
"os/signal"
"syscall"
)

func main() {
server := sse.NewServer()
ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer cancel()
err := server.Run(ctx)
if err != nil {
slog.Error("server run failed", "error:", err)
os.Exit(1)
}
}
8 changes: 8 additions & 0 deletions examples/streaming/stdhttp/sse/cfg.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# yaml-language-server: $schema=../../../../configuration-schema.json
package: sse
output: streaming.gen.go
generate:
std-http-server: true
strict-server: true
models: true
embedded-spec: true
3 changes: 3 additions & 0 deletions examples/streaming/stdhttp/sse/generate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package sse

//go:generate go run github.com/oapi-codegen/oapi-codegen/v2/cmd/oapi-codegen -config cfg.yaml ../../sse.yaml
89 changes: 89 additions & 0 deletions examples/streaming/stdhttp/sse/impl.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package sse

import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"log/slog"
"net/http"
"time"
)

type Server struct {
httpServer *http.Server
}

func NewServer() *Server {
s := &Server{}
strictHandler := NewStrictHandler(s, nil)
handler := Handler(strictHandler)
s.httpServer = &http.Server{
Handler: handler,
Addr: ":8080",
}
return s
}
func (s Server) Run(ctx context.Context) error {
go func() {
<-ctx.Done()
slog.Warn("context cancelled, shutting down")
ctxTimeout, cancel := context.WithTimeout(context.Background(), 5*time.Second)
err := s.httpServer.Shutdown(ctxTimeout)
cancel()
if err != nil {
slog.Error("httpServer.Shutdown() error", "error", err)
}
}()

err := s.httpServer.ListenAndServe()
if err != nil && !errors.Is(err, http.ErrServerClosed) {
return fmt.Errorf("httpServer.ListenAndServe() error: %w", err)
}
return nil
}

type SObject struct {
Time time.Time `json:"time"`
Sequence int `json:"sequence"`
}

// GetStream handles GET / and will stream a JSON object every second.
func (Server) GetStream(ctx context.Context, _ GetStreamRequestObject) (GetStreamResponseObject, error) {
r, w := io.Pipe() // creates a pipe so that we can write to the response body asynchronously
go func() {
defer w.Close()
seq := 1
ticker := time.NewTicker(time.Second)
for {
select {
case <-ctx.Done():
slog.Info("request context done, closing stream")
return
case <-ticker.C:
content := getContent(seq)
if _, err := w.Write(content); err != nil {
return
}
if _, err := w.Write([]byte("\n")); err != nil {
return
}
seq++
}
}
}()
return GetStream200TexteventStreamResponse{
Body: r,
ContentLength: 0,
}, nil
}

func getContent(seq int) []byte {
streamObject := SObject{
Time: time.Now(),
Sequence: seq,
}
bytes, _ := json.Marshal(streamObject)
return bytes
}
Loading
Loading