-
Notifications
You must be signed in to change notification settings - Fork 317
/
storageStream.go
140 lines (132 loc) · 3.06 KB
/
storageStream.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
package main
import "github.com/liip/sheriff"
//MarshalledStreamsList lists all streams and includes only fields which are safe to serialize.
func (obj *StorageST) MarshalledStreamsList() (interface{}, error) {
obj.mutex.RLock()
defer obj.mutex.RUnlock()
val, err := sheriff.Marshal(&sheriff.Options{
Groups: []string{"api"},
}, obj.Streams)
if err != nil {
return nil, err
}
return val, nil
}
//StreamAdd add stream
func (obj *StorageST) StreamAdd(uuid string, val StreamST) error {
obj.mutex.Lock()
defer obj.mutex.Unlock()
//TODO create empty map bug save https://github.com/liip/sheriff empty not nil map[] != {} json
//data, err := sheriff.Marshal(&sheriff.Options{
// Groups: []string{"config"},
// ApiVersion: v2,
// }, obj)
//Not Work map[] != {}
if obj.Streams == nil {
obj.Streams = make(map[string]StreamST)
}
if _, ok := obj.Streams[uuid]; ok {
return ErrorStreamAlreadyExists
}
for i, i2 := range val.Channels {
i2 = obj.StreamChannelMake(i2)
if !i2.OnDemand {
i2.runLock = true
val.Channels[i] = i2
go StreamServerRunStreamDo(uuid, i)
} else {
val.Channels[i] = i2
}
}
obj.Streams[uuid] = val
err := obj.SaveConfig()
if err != nil {
return err
}
return nil
}
//StreamEdit edit stream
func (obj *StorageST) StreamEdit(uuid string, val StreamST) error {
obj.mutex.Lock()
defer obj.mutex.Unlock()
if tmp, ok := obj.Streams[uuid]; ok {
for i, i2 := range tmp.Channels {
if i2.runLock {
tmp.Channels[i] = i2
obj.Streams[uuid] = tmp
i2.signals <- SignalStreamStop
}
}
for i3, i4 := range val.Channels {
i4 = obj.StreamChannelMake(i4)
if !i4.OnDemand {
i4.runLock = true
val.Channels[i3] = i4
go StreamServerRunStreamDo(uuid, i3)
} else {
val.Channels[i3] = i4
}
}
obj.Streams[uuid] = val
err := obj.SaveConfig()
if err != nil {
return err
}
return nil
}
return ErrorStreamNotFound
}
//StreamReload reload stream
func (obj *StorageST) StopAll() {
obj.mutex.RLock()
defer obj.mutex.RUnlock()
for _, st := range obj.Streams {
for _, i2 := range st.Channels {
if i2.runLock {
i2.signals <- SignalStreamStop
}
}
}
}
//StreamReload reload stream
func (obj *StorageST) StreamReload(uuid string) error {
obj.mutex.RLock()
defer obj.mutex.RUnlock()
if tmp, ok := obj.Streams[uuid]; ok {
for _, i2 := range tmp.Channels {
if i2.runLock {
i2.signals <- SignalStreamRestart
}
}
return nil
}
return ErrorStreamNotFound
}
//StreamDelete stream
func (obj *StorageST) StreamDelete(uuid string) error {
obj.mutex.Lock()
defer obj.mutex.Unlock()
if tmp, ok := obj.Streams[uuid]; ok {
for _, i2 := range tmp.Channels {
if i2.runLock {
i2.signals <- SignalStreamStop
}
}
delete(obj.Streams, uuid)
err := obj.SaveConfig()
if err != nil {
return err
}
return nil
}
return ErrorStreamNotFound
}
//StreamInfo return stream info
func (obj *StorageST) StreamInfo(uuid string) (*StreamST, error) {
obj.mutex.RLock()
defer obj.mutex.RUnlock()
if tmp, ok := obj.Streams[uuid]; ok {
return &tmp, nil
}
return nil, ErrorStreamNotFound
}