-
Notifications
You must be signed in to change notification settings - Fork 0
/
schedule.go
105 lines (97 loc) · 3.07 KB
/
schedule.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
package main
import (
"context"
"github.com/Garionion/ffmpeg-playout/api"
"github.com/Garionion/playout-controller/fahrplan"
"github.com/Garionion/playout-controller/store"
"github.com/golang/protobuf/ptypes"
"github.com/grafov/bcast"
jsoniter "github.com/json-iterator/go"
"log"
"time"
)
var json = jsoniter.ConfigCompatibleWithStandardLibrary
//nolint:funlen
func schedule(cfg *Configuration, store *store.Store, jobs map[int]fahrplan.PlayoutJob, scheduledJobs map[int]api.ScheduledJob, addPadding bool) map[int]api.ScheduledJob {
servers := store.GrpcClients
defaultRoom, defRoomExist := servers[""]
for _, job := range jobs {
playoutClient, ok := servers[job.Room]
if !ok {
if defRoomExist {
log.Printf("server for Room %s not found, using default Room\n", job.Room)
playoutClient = defaultRoom
} else {
log.Printf("server for Room %s not found\n", job.Room)
continue
}
}
var postPadding time.Duration
if !job.Next.IsZero() && cfg.MaxPostPadding > job.Next.Sub(job.Start.Add(job.Duration)) {
postPadding = job.Next.Sub(job.Start.Add(job.Duration))
} else {
postPadding = cfg.MaxPostPadding
}
jobStop := job.Start.Add(job.Duration)
if addPadding {
job.Start = job.Start.Add(-cfg.PrePadding)
jobStop = jobStop.Add(postPadding)
}
start, err := ptypes.TimestampProto(job.Start)
if err != nil {
log.Printf("%d: Failed to convert Start-Timestamp: %v", job.ID, err)
}
stop, err := ptypes.TimestampProto(jobStop)
if err != nil {
log.Printf("%d: Failed to convert Stop-Timestamp: %v", job.ID, err)
}
playoutJob := &api.Job{
StartAt: start,
StopAt: stop,
Source: job.Source,
ID: int64(job.ID),
Version: job.Version,
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
scheduledJob, err := playoutClient.SchedulePlayout(ctx, playoutJob)
if err != nil {
log.Printf("Failed to schedule %d: %v", job.ID, err)
cancel()
continue
}
log.Printf("Scheduled %v", job.ID)
scheduledJob.Room = job.Room
scheduledJobs[job.ID] = *scheduledJob
cancel()
}
return scheduledJobs
}
func removeAlreadyScheduledJobs(jobs map[int]fahrplan.PlayoutJob, scheduled map[int]api.ScheduledJob) map[int]fahrplan.PlayoutJob {
for id := range scheduled {
if jobs[id].Version != scheduled[id].Version {
continue
}
delete(jobs, id)
}
return jobs
}
func scheduler(cfg *Configuration, store *store.Store, upcomingChannel *bcast.Member, scheduledChannel *bcast.Member) chan struct{} {
quit := make(chan struct{})
go func(cfg *Configuration, upcomingChannel *bcast.Member, scheduledChannel *bcast.Member) {
scheduled := make(map[int]api.ScheduledJob)
for upcoming := range upcomingChannel.Read {
u := upcoming.(map[int]fahrplan.PlayoutJob)
if !cfg.AutoSchedule {
continue
}
toSchedule := removeAlreadyScheduledJobs(u, scheduled)
if len(toSchedule) == 0 {
log.Println("Nothing new to Schedule")
} else {
scheduled = schedule(cfg, store, toSchedule, scheduled, true)
scheduledChannel.Send(scheduled)
}
}
}(cfg, upcomingChannel, scheduledChannel)
return quit
}