Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bulk Socket SSE example #5

Merged
merged 1 commit into from
May 7, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions examples/bulk-socket/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
module bulk

go 1.14

require (
github.com/gin-contrib/sse v0.1.0
github.com/gin-gonic/gin v1.6.3
github.com/hodgesds/iouring-go v0.0.0-20200506041732-4ec64dcb5875
github.com/r3labs/sse v0.0.0-20200310095403-ee05428e4d0e
)
53 changes: 53 additions & 0 deletions examples/bulk-socket/go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/gin-contrib/sse v0.1.0 h1:Y/yl/+YNO8GZSjAhjMsSuLt29uWRFHdHYUb5lYOV9qE=
github.com/gin-contrib/sse v0.1.0/go.mod h1:RHrZQHXnP2xjPF+u1gW/2HnVO7nvIa9PG3Gm+fLHvGI=
github.com/gin-gonic/gin v1.6.3 h1:ahKqKTFpO5KTPHxWZjEdPScmYaGtLo8Y4DMHoEsnp14=
github.com/gin-gonic/gin v1.6.3/go.mod h1:75u5sXoLsGZoRN5Sgbi1eraJ4GU3++wFwWzhwvtwp4M=
github.com/go-playground/assert/v2 v2.0.1/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4=
github.com/go-playground/locales v0.13.0 h1:HyWk6mgj5qFqCT5fjGBuRArbVDfE4hi8+e8ceBS/t7Q=
github.com/go-playground/locales v0.13.0/go.mod h1:taPMhCMXrRLJO55olJkUXHZBHCxTMfnGwq/HNwmWNS8=
github.com/go-playground/universal-translator v0.17.0 h1:icxd5fm+REJzpZx7ZfpaD876Lmtgy7VtROAbHHXk8no=
github.com/go-playground/universal-translator v0.17.0/go.mod h1:UkSxE5sNxxRwHyU+Scu5vgOQjsIJAF8j9muTVoKLVtA=
github.com/go-playground/validator/v10 v10.2.0 h1:KgJ0snyC2R9VXYN2rneOtQcw5aHQB1Vv0sFl1UcHBOY=
github.com/go-playground/validator/v10 v10.2.0/go.mod h1:uOYAAleCW8F/7oMFd6aG0GOhaH6EGOAJShg8Id5JGkI=
github.com/golang/protobuf v1.3.3 h1:gyjaxf+svBWX08ZjK86iN9geUJF0H6gp2IRKX6Nf6/I=
github.com/golang/protobuf v1.3.3/go.mod h1:vzj43D7+SQXF/4pzW/hwtAqwc6iTitCiVSaWz5lYuqw=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/hodgesds/iouring-go v0.0.0-20200506041732-4ec64dcb5875 h1:QSVBRVqTQRNTVnNxffYcJy5MD7YD89SANqHQD5RP+QQ=
github.com/hodgesds/iouring-go v0.0.0-20200506041732-4ec64dcb5875/go.mod h1:HbMokKokhmAlGWaSJRUCUhhTudi+e0ZFvZq9b6JYHRo=
github.com/json-iterator/go v1.1.9/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
github.com/leodido/go-urn v1.2.0 h1:hpXL4XnriNwQ/ABnpepYM/1vCLWNDfUNts8dX3xTG6Y=
github.com/leodido/go-urn v1.2.0/go.mod h1:+8+nEpDfqqsY+g338gtMEUOtuK+4dEMhiQEgxpxOKII=
github.com/mattn/go-isatty v0.0.12 h1:wuysRhFDzyxgEmMf5xjvJ2M9dZoWAXNNr5LSBS7uHXY=
github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/r3labs/sse v0.0.0-20200310095403-ee05428e4d0e h1:w3ZemLxSM2hb3bHk7wjNaAAluaDQ+9WnWZQV1wcA8f4=
github.com/r3labs/sse v0.0.0-20200310095403-ee05428e4d0e/go.mod h1:S8xSOnV3CgpNrWd0GQ/OoQfMtlg2uPRSuTzcSGrzwK8=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/ugorji/go v1.1.7 h1:/68gy2h+1mWMrwZFeD1kQialdSzAb432dtpeJ42ovdo=
github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVMw=
github.com/ugorji/go/codec v1.1.7 h1:2SvQaVZ1ouYrrKKwoSk2pzd4A9evlKJb9oTL+OaLUSs=
github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/net v0.0.0-20191116160921-f9c825593386 h1:ktbWvQrW08Txdxno1PiDpSxPXG6ndGsfnJjRRtkM0LQ=
golang.org/x/net v0.0.0-20191116160921-f9c825593386/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200202164722-d101bd2416d5 h1:LfCXLvNmTYH9kEmVgqbnsWfruoXZIrh4YBgqVHtDvw0=
golang.org/x/sys v0.0.0-20200202164722-d101bd2416d5/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
gopkg.in/cenkalti/backoff.v1 v1.1.0 h1:Arh75ttbsvlpVA7WtVpH4u9h6Zl46xuptxqLxPiSo4Y=
gopkg.in/cenkalti/backoff.v1 v1.1.0/go.mod h1:J6Vskwqd+OMVJl8C33mmtxTBs2gyzfv7UDAkHu8BrjI=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10=
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
145 changes: 145 additions & 0 deletions examples/bulk-socket/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
package main

import (
"bytes"
"encoding/json"
"fmt"
"log"
"net"
"net/http"
"time"
"unsafe"

gsse "github.com/gin-contrib/sse"
"github.com/gin-gonic/gin"
"github.com/hodgesds/iouring-go"
"github.com/r3labs/sse"
)

var (
fds = make([]int32, 0)
message []byte
ring *iouring.Ring
)

func main() {
msg := struct {
ID int
Author string
Content string
}{
112,
"Sample User",
"Sample message 123",
}

// Create a static message to deliver that can also be compared against when we receive it over the network
message, _ = json.Marshal(msg)

ring, _ = iouring.New(1024, &iouring.Params{})

// Start a new go routine that sends a message every second
go sendMessage()

r := gin.Default()

// Create a SSE endpoint that hijacks all incoming connections and adds their underlying file descriptors to an array
r.GET("/listen", func(c *gin.Context) {
c.Header("Content-Type", "text/event-stream")
c.Writer.WriteHeaderNow()

nc, _, _ := c.Writer.Hijack()

sf, _ := nc.(*net.TCPConn).File()

fds = append(fds, int32(sf.Fd()))
})

l, err := net.Listen("tcp", ":0")
if err != nil {
panic(err)
}

addr := fmt.Sprintf("http://localhost:%d/listen", l.Addr().(*net.TCPAddr).Port)

go http.Serve(l, r)

// Spawn n many clients to establish an SSE
for i := 0; i < 10; i++ {
time.Sleep(100 * time.Millisecond)
go spawnClient(addr)
}

select {}
}

type backOff struct{}

func (b *backOff) NextBackOff() time.Duration { return -1 }
func (b *backOff) Reset() {}

func spawnClient(addr string) {
c := sse.NewClient(addr)
c.ReconnectStrategy = &backOff{}

// Subscribe to the SSE endpoint
if err := c.Subscribe("", func(evt *sse.Event) {
// If we receive an event that isn't equal to our preset message, it has been corrupted
if string(message) != string(evt.Data) {
log.Fatalf("Client received invalid response, expected: %s but got %s", string(message), string(evt.Data))
}
}); err != nil {
log.Fatalln(err.Error())
}
}

func sendMessage() {
for {
time.Sleep(1 * time.Second)

if err := send(fds, message); err != nil {
log.Fatal(err.Error())
}
}
}

func send(fds []int32, data []byte) error {
fmt.Printf("Sending %d bytes to %d sockets\n", len(data), len(fds))

var b bytes.Buffer
// Encode the JSON message into an SSE
if err := gsse.Encode(&b, gsse.Event{
Event: "message",
Data: json.RawMessage(data),
}); err != nil {
return err
}

sdata := b.Bytes()

wire := bytes.Buffer{}

// Wrap the SSE into the chunked http wire format
fmt.Fprintf(&wire, "%x\r\n", len(sdata))
wire.Write(sdata)
wire.WriteString("\r\n")

rawData := wire.Bytes()

addr := (uint64)(uintptr(unsafe.Pointer(&rawData[0])))
length := uint32(len(rawData))

// Queue up n many SQE's for each file descriptor
for _, fd := range fds {
e, commit := ring.SubmitEntry()

e.Opcode = iouring.WriteFixed
e.Fd = fd
e.Addr = addr
e.Len = length

commit()
}

return ring.Enter(uint(len(fds)), uint(len(fds)), iouring.EnterGetEvents, nil)
}