Skip to content

Commit

Permalink
Merge pull request #1 from Celerway/streaming
Browse files Browse the repository at this point in the history
Streaming
  • Loading branch information
perbu authored Oct 12, 2024
2 parents fd1f9b3 + edef8d8 commit 1a14ffb
Show file tree
Hide file tree
Showing 30 changed files with 1,967 additions and 149 deletions.
10 changes: 10 additions & 0 deletions examples/streaming/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
OpenAPI Code Generation Example - Streaming
-------------------------------------------

This directory contains an example server using our code generator which implements
a simple streaming API with a single endpoint.

This is the structure:
- `sse.yaml`: Contains the OpenAPI 3.0 specification
- `stdhttp/`: Contains the written and generated code for the server using the standard http package

28 changes: 28 additions & 0 deletions examples/streaming/sse.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
openapi: 3.0.0
info:
title: Simple SSE Service
version: 1.0.0
paths:
/:
get:
summary: Server-Sent Events Stream
description: Provides a stream of JSON documents containing a timestamp and sequence number.
operationId: getStream
responses:
200:
description: SSE Stream
content:
text/event-stream:
schema:
type: object
properties:
time:
type: string
format: date-time
description: Timestamp of the event.
sequence:
type: integer
description: Sequence number of the event.
example:
time: "2023-11-20T10:30:00Z"
sequence: 1
36 changes: 36 additions & 0 deletions examples/streaming/stdhttp/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
SHELL:=/bin/bash

YELLOW := \e[0;33m
RESET := \e[0;0m

GOVER := $(shell go env GOVERSION)
GOMINOR := $(shell bash -c "cut -f2 -d. <<< $(GOVER)")

define execute-if-go-122
@{ \
if [[ 22 -le $(GOMINOR) ]]; then \
$1; \
else \
echo -e "$(YELLOW)Skipping task as you're running Go v1.$(GOMINOR).x which is < Go 1.22, which this module requires$(RESET)"; \
fi \
}
endef

lint:
$(call execute-if-go-122,$(GOBIN)/golangci-lint run ./...)

lint-ci:

$(call execute-if-go-122,$(GOBIN)/golangci-lint run ./... --out-format=colored-line-number --timeout=5m)

generate:
$(call execute-if-go-122,go generate ./...)

test:
$(call execute-if-go-122,go test -cover ./...)

tidy:
$(call execute-if-go-122,go mod tidy)

tidy-ci:
$(call execute-if-go-122,tidied -verbose)
29 changes: 29 additions & 0 deletions examples/streaming/stdhttp/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
module github.com/oapi-codegen/oapi-codegen/v2/examples/streaming/stdhttp

go 1.21.13

replace github.com/oapi-codegen/oapi-codegen/v2 => ../../../

require (
github.com/getkin/kin-openapi v0.127.0
github.com/oapi-codegen/oapi-codegen/v2 v2.0.0-00010101000000-000000000000
github.com/oapi-codegen/runtime v1.1.1
)

require (
github.com/dprotaso/go-yit v0.0.0-20220510233725-9ba8df137936 // indirect
github.com/go-openapi/jsonpointer v0.21.0 // indirect
github.com/go-openapi/swag v0.23.0 // indirect
github.com/invopop/yaml v0.3.1 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826 // indirect
github.com/perimeterx/marshmallow v1.1.5 // indirect
github.com/speakeasy-api/openapi-overlay v0.9.0 // indirect
github.com/vmware-labs/yaml-jsonpath v0.3.2 // indirect
golang.org/x/mod v0.17.0 // indirect
golang.org/x/text v0.18.0 // indirect
golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
170 changes: 170 additions & 0 deletions examples/streaming/stdhttp/go.sum

Large diffs are not rendered by default.

21 changes: 21 additions & 0 deletions examples/streaming/stdhttp/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package main

import (
"context"
"github.com/oapi-codegen/oapi-codegen/v2/examples/streaming/stdhttp/sse"
"log/slog"
"os"
"os/signal"
"syscall"
)

func main() {
server := sse.NewServer()
ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer cancel()
err := server.Run(ctx)
if err != nil {
slog.Error("server run failed", "error:", err)
os.Exit(1)
}
}
8 changes: 8 additions & 0 deletions examples/streaming/stdhttp/sse/cfg.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# yaml-language-server: $schema=../../../../configuration-schema.json
package: sse
output: streaming.gen.go
generate:
std-http-server: true
strict-server: true
models: true
embedded-spec: true
3 changes: 3 additions & 0 deletions examples/streaming/stdhttp/sse/generate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package sse

//go:generate go run github.com/oapi-codegen/oapi-codegen/v2/cmd/oapi-codegen -config cfg.yaml ../../sse.yaml
89 changes: 89 additions & 0 deletions examples/streaming/stdhttp/sse/impl.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package sse

import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"log/slog"
"net/http"
"time"
)

type Server struct {
httpServer *http.Server
}

func NewServer() *Server {
s := &Server{}
strictHandler := NewStrictHandler(s, nil)
handler := Handler(strictHandler)
s.httpServer = &http.Server{
Handler: handler,
Addr: ":8080",
}
return s
}
func (s Server) Run(ctx context.Context) error {
go func() {
<-ctx.Done()
slog.Warn("context cancelled, shutting down")
ctxTimeout, cancel := context.WithTimeout(context.Background(), 5*time.Second)
err := s.httpServer.Shutdown(ctxTimeout)
cancel()
if err != nil {
slog.Error("httpServer.Shutdown() error", "error", err)
}
}()

err := s.httpServer.ListenAndServe()
if err != nil && !errors.Is(err, http.ErrServerClosed) {
return fmt.Errorf("httpServer.ListenAndServe() error: %w", err)
}
return nil
}

type SObject struct {
Time time.Time `json:"time"`
Sequence int `json:"sequence"`
}

// GetStream handles GET / and will stream a JSON object every second.
func (Server) GetStream(ctx context.Context, _ GetStreamRequestObject) (GetStreamResponseObject, error) {
r, w := io.Pipe() // creates a pipe so that we can write to the response body asynchronously
go func() {
defer w.Close()
seq := 1
ticker := time.NewTicker(time.Second)
for {
select {
case <-ctx.Done():
slog.Info("request context done, closing stream")
return
case <-ticker.C:
content := getContent(seq)
if _, err := w.Write(content); err != nil {
return
}
if _, err := w.Write([]byte("\n")); err != nil {
return
}
seq++
}
}
}()
return GetStream200TexteventStreamResponse{
Body: r,
ContentLength: 0,
}, nil
}

func getContent(seq int) []byte {
streamObject := SObject{
Time: time.Now(),
Sequence: seq,
}
bytes, _ := json.Marshal(streamObject)
return bytes
}
Loading

0 comments on commit 1a14ffb

Please sign in to comment.