Skip to content

Commit

Permalink
Add bitrate to webrtc/mse/mp4 consumer info (#5)
Browse files Browse the repository at this point in the history
  • Loading branch information
dnjooiopa authored Apr 30, 2024
1 parent d18980b commit eba9dfd
Show file tree
Hide file tree
Showing 12 changed files with 80 additions and 6 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,5 @@ go2rtc.yaml
go2rtc.json

0_test.go

.goreload
2 changes: 2 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
dev:
goreload .
2 changes: 2 additions & 0 deletions pkg/core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ type Info struct {
Senders []*Sender `json:"senders,omitempty"`
Recv int `json:"recv,omitempty"`
Send int `json:"send,omitempty"`
Bitrate int `json:"bitrate,omitempty"` // bytes per second
}

const (
Expand Down Expand Up @@ -147,6 +148,7 @@ type SuperConsumer struct {
Medias []*Media `json:"medias,omitempty"`
Senders []*Sender `json:"senders,omitempty"`
Send int `json:"send,omitempty"`
Bitrate int `json:"bitrate,omitempty"` // bytes per second
}

func (s *SuperConsumer) GetMedias() []*Media {
Expand Down
21 changes: 21 additions & 0 deletions pkg/custom/custom.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package custom

import (
"time"
)

func StartBitrateWorker(send, bitrate *int, stop chan struct{}) {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()

prevSendBytes := *send
for {
select {
case <-ticker.C:
*bitrate = *send - prevSendBytes
prevSendBytes = *send
case <-stop:
return
}
}
}
14 changes: 11 additions & 3 deletions pkg/isapi/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@ package isapi

import (
"errors"
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/AlexxIT/go2rtc/pkg/tcp"
"io"
"net"
"net/http"
"net/url"

"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/AlexxIT/go2rtc/pkg/tcp"
)

type Client struct {
Expand All @@ -20,6 +21,9 @@ type Client struct {
medias []*core.Media
sender *core.Sender
send int

bitrate int // bytes per second
stopBitrateWorker chan struct{}
}

func NewClient(rawURL string) (*Client, error) {
Expand All @@ -32,7 +36,9 @@ func NewClient(rawURL string) (*Client, error) {
u.Scheme = "http"
u.Path = ""

return &Client{url: u.String()}, nil
c := &Client{url: u.String()}
c.startBitrateWorker()
return c, nil
}

func (c *Client) Dial() (err error) {
Expand Down Expand Up @@ -137,6 +143,8 @@ func (c *Client) Close() (err error) {

tcp.Close(res)

c.stopBitrateWorker <- struct{}{}

return nil
}

Expand Down
8 changes: 5 additions & 3 deletions pkg/isapi/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package isapi

import (
"encoding/json"

"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/pion/rtp"
)
Expand Down Expand Up @@ -52,9 +53,10 @@ func (c *Client) Stop() (err error) {

func (c *Client) MarshalJSON() ([]byte, error) {
info := &core.Info{
Type: "ISAPI active consumer",
Medias: c.medias,
Send: c.send,
Type: "ISAPI active consumer",
Medias: c.medias,
Send: c.send,
Bitrate: c.bitrate,
}
if c.sender != nil {
info.Senders = []*core.Sender{c.sender}
Expand Down
8 changes: 8 additions & 0 deletions pkg/isapi/custom.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package isapi

import "github.com/AlexxIT/go2rtc/pkg/custom"

func (c *Client) startBitrateWorker() {
c.stopBitrateWorker = make(chan struct{})
go custom.StartBitrateWorker(&c.send, &c.bitrate, c.stopBitrateWorker)
}
4 changes: 4 additions & 0 deletions pkg/mp4/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ type Consumer struct {
Rotate int `json:"-"`
ScaleX int `json:"-"`
ScaleY int `json:"-"`

stopBitrateWorker chan struct{}
}

func NewConsumer(medias []*core.Media) *Consumer {
Expand Down Expand Up @@ -52,6 +54,7 @@ func NewConsumer(medias []*core.Media) *Consumer {
wr: core.NewWriteBuffer(nil),
}
cons.Medias = medias
cons.startBitrateWorker()
return cons
}

Expand Down Expand Up @@ -185,5 +188,6 @@ func (c *Consumer) WriteTo(wr io.Writer) (int64, error) {

func (c *Consumer) Stop() error {
_ = c.SuperConsumer.Close()
c.stopBitrateWorker <- struct{}{}
return c.wr.Close()
}
8 changes: 8 additions & 0 deletions pkg/mp4/custom.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package mp4

import "github.com/AlexxIT/go2rtc/pkg/custom"

func (c *Consumer) startBitrateWorker() {
c.stopBitrateWorker = make(chan struct{})
go custom.StartBitrateWorker(&c.SuperConsumer.Send, &c.SuperConsumer.Bitrate, c.stopBitrateWorker)
}
6 changes: 6 additions & 0 deletions pkg/webrtc/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ type Conn struct {
offer string
remote string
closed core.Waiter

bitrate int // bytes per second
stopBitrateWorker chan struct{}
}

func NewConn(pc *webrtc.PeerConnection) *Conn {
Expand Down Expand Up @@ -127,11 +130,14 @@ func NewConn(pc *webrtc.PeerConnection) *Conn {
}
})

c.startBitrateWorker()

return c
}

func (c *Conn) Close() error {
c.closed.Done(nil)
c.stopBitrateWorker <- struct{}{}
return c.pc.Close()
}

Expand Down
1 change: 1 addition & 0 deletions pkg/webrtc/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ func (c *Conn) MarshalJSON() ([]byte, error) {
Senders: c.senders,
Recv: c.recv,
Send: c.send,
Bitrate: c.bitrate,
}
return json.Marshal(info)
}
10 changes: 10 additions & 0 deletions pkg/webrtc/custom.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package webrtc

import (
"github.com/AlexxIT/go2rtc/pkg/custom"
)

func (c *Conn) startBitrateWorker() {
c.stopBitrateWorker = make(chan struct{})
go custom.StartBitrateWorker(&c.send, &c.bitrate, c.stopBitrateWorker)
}

0 comments on commit eba9dfd

Please sign in to comment.