/*
Package radiowatch provides a small framework to crawl radio stations periodically.
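
A minimal wiring sketch (consoleWriter and exampleStation are illustrative
implementations of Writer and Crawler, sketched in a comment after the type
declarations below; they are not part of the package):

	w := radiowatch.NewWatcher(consoleWriter{})
	w.AddCrawler(&exampleStation{})
	w.StartCrawling()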
*/
package radiowatch

import (
	"strings"
	"sync"
	"time"

	log "github.com/sirupsen/logrus"
)

type (
	/*
		A Watcher is the main object.
		It keeps track of the crawlers, starts the crawling,
		takes the results and delegates them to the Writer, which persists them.
	*/
	Watcher struct {
		refreshInterval time.Duration
		crawlers        []Crawler
		ticker          *time.Ticker
		writer          Writer
	}

	/*
		A Crawler crawls one specific radio station and returns
		information about the currently playing track.
	*/
	Crawler interface {
		/*
			Crawl takes the actions needed to get information about the
			currently playing track and returns it in a TrackInfo struct,
			or an error.
		*/
		Crawl() (*TrackInfo, error)

		// Name returns the name of the radio station this crawler crawls.
		Name() string

		// NextCrawlTime returns the time at which this crawler should run next.
		NextCrawlTime() time.Time
	}

	// A Writer takes a TrackInfo and persists it.
	Writer interface {
		/*
			Write persists the TrackInfo to a concrete medium.
			How and where it is saved is up to the concrete implementation.
			Write returns no error, so implementations should handle
			failures themselves, e.g. by logging to stderr.
		*/
		Write(TrackInfo)
	}
)
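
/*
Illustrative implementations of Crawler and Writer (a sketch, not part of
this package; the now-playing lookup in Crawl is a placeholder, and the
example assumes "fmt" is imported):

	type consoleWriter struct{}

	func (consoleWriter) Write(t TrackInfo) {
		fmt.Printf("%s: %s - %s\n", t.Station, t.Artist, t.Title)
	}

	type exampleStation struct{ next time.Time }

	func (s *exampleStation) Name() string { return "Example FM" }

	func (s *exampleStation) NextCrawlTime() time.Time { return s.next }

	func (s *exampleStation) Crawl() (*TrackInfo, error) {
		// Fetch and parse the station's now-playing feed here.
		s.next = time.Now().Add(time.Minute)
		return &TrackInfo{Station: s.Name(), Artist: "Some Artist", Title: "Some Title"}, nil
	}
*/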

/*
NewWatcher returns a new instance of Watcher.
It takes a concrete implementation of Writer, which handles persisting the results.
The refresh interval defaults to 60 seconds; change it with SetInterval.
*/
func NewWatcher(resultsWriter Writer) *Watcher {
	w := &Watcher{writer: resultsWriter}
	w.SetInterval("60s") // "60s" always parses, so the error is safe to ignore
	return w
}

/*
SetInterval sets how often the Watcher checks whether a crawler's next
crawl time lies in the past and the crawler is therefore due to run.
It uses time.ParseDuration and therefore takes the same input, such as
"60s" or "5m".
Returns an error only if the input was invalid.
*/
func (w *Watcher) SetInterval(interval string) error {
	duration, err := time.ParseDuration(interval)
	if err != nil {
		return err
	}
	w.refreshInterval = duration
	return nil
}
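
/*
SetInterval accepts any format time.ParseDuration understands, for example
(a sketch; w is a *Watcher):

	w.SetInterval("5m")    // check every five minutes
	w.SetInterval("1h30m") // or every ninety minutes
*/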

/*
AddCrawler adds a concrete Crawler to this Watcher.
*/
func (w *Watcher) AddCrawler(c Crawler) {
	w.crawlers = append(w.crawlers, c)
}

/*
AddCrawlers adds several Crawlers at once.
*/
func (w *Watcher) AddCrawlers(crawlers []Crawler) {
	for _, c := range crawlers {
		w.AddCrawler(c)
	}
}

/*
runCrawlers runs every Crawler whose next crawl time has passed.
*/
func (w *Watcher) runCrawlers() {
	start := time.Now()
	tracks := make(chan *TrackInfo)
	var wg sync.WaitGroup
	var counter uint16

	go func() {
		for _, c := range w.crawlers {
			if c.NextCrawlTime().Before(start) {
				wg.Add(1)
				// Count here, in the single dispatching goroutine, so the
				// increment does not race with the crawler goroutines.
				counter++
				go func(crawler Crawler) {
					defer wg.Done()
					defer func() {
						if r := recover(); r != nil {
							log.WithFields(log.Fields{
								"crawler": crawler.Name(),
								"message": r,
							}).Error("Crawler panicked")
						}
					}()
					track, err := crawler.Crawl()
					if err != nil {
						log.WithFields(log.Fields{
							"error":   err.Error(),
							"crawler": crawler.Name(),
						}).Error("Error while crawling")
						return
					}
					track.Artist = strings.TrimSpace(track.Artist)
					track.Title = strings.TrimSpace(track.Title)
					log.WithFields(log.Fields{
						"station": track.Station,
						"artist":  track.Artist,
						"title":   track.Title,
					}).Info("Crawled station")
					tracks <- track
				}(c)
			}
		}
		wg.Wait()
		close(tracks)
	}()

	for track := range tracks {
		// Pass the track as an argument so each goroutine gets its own
		// copy instead of sharing the loop variable.
		go func(t TrackInfo) {
			w.writer.Write(t)
		}(*track)
	}

	if counter > 0 {
		log.WithFields(log.Fields{
			"count":    counter,
			"duration": time.Since(start).Seconds(),
		}).Info("Crawling finished")
	}
}

/*
StartCrawling starts the crawling. It runs the due crawlers once
immediately, then checks every {interval} whether a crawler is due
and runs it.
*/
func (w *Watcher) StartCrawling() {
	w.runCrawlers()
	w.ticker = time.NewTicker(w.refreshInterval)
	go func() {
		for range w.ticker.C {
			w.runCrawlers()
		}
	}()
}
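
/*
StartCrawling returns immediately; crawling continues in the background
until StopCrawling is called. A typical lifecycle (a sketch, durations
illustrative):

	w.StartCrawling()
	time.Sleep(10 * time.Minute) // let the watcher tick a few times
	w.StopCrawling()
*/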

/*
StopCrawling stops the crawling by stopping the ticker. It must only be
called after StartCrawling, since the ticker is created there.
*/
func (w *Watcher) StopCrawling() {
	w.ticker.Stop()
}