forked from emad-elsaid/xlog
-
Notifications
You must be signed in to change notification settings - Fork 0
/
each.go
136 lines (111 loc) · 2.44 KB
/
each.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
package xlog
import (
"context"
"regexp"
"runtime"
"sync"
"golang.org/x/sync/errgroup"
)
func init() {
Listen(AfterWrite, clearPagesCache)
Listen(AfterDelete, clearPagesCache)
}
// a List of directories that should be ignored by directory walking function.
// for example the versioning extension can register `.versions` directory to be
// ignored
var ignoredDirs = []*regexp.Regexp{
regexp.MustCompile(`\..+`), // Ignore any hidden directory
}
// IgnoreDirectory Register a pattern to be ignored when walking directories.
func IgnoreDirectory(r *regexp.Regexp) {
ignoredDirs = append(ignoredDirs, r)
}
var pages []Page
func Pages(ctx context.Context) []Page {
if pages == nil {
populatePagesCache(ctx)
}
return pages[:]
}
// EachPage iterates on all available pages. many extensions
// uses it to get all pages and maybe parse them and extract needed information
func EachPage(ctx context.Context, f func(Page)) {
if pages == nil {
populatePagesCache(ctx)
}
currentPages := pages
for _, p := range currentPages {
select {
case <-ctx.Done():
return
default:
f(p)
}
}
}
var concurrency = runtime.NumCPU() * 4
// EachPageCon Similar to EachPage but iterates concurrently
func EachPageCon(ctx context.Context, f func(Page)) {
if pages == nil {
populatePagesCache(ctx)
}
grp, ctx := errgroup.WithContext(ctx)
grp.SetLimit(concurrency)
currentPages := pages
for _, p := range currentPages {
select {
case <-ctx.Done():
break
default:
grp.Go(func() (err error) { f(p); return })
}
}
grp.Wait()
}
// MapPageCon Similar to EachPage but iterates concurrently
func MapPageCon[T any](ctx context.Context, f func(Page) *T) []*T {
if pages == nil {
populatePagesCache(ctx)
}
grp, ctx := errgroup.WithContext(ctx)
grp.SetLimit(concurrency)
currentPages := pages
output := make([]*T, 0, len(currentPages))
var outputLck sync.Mutex
for _, p := range currentPages {
select {
case <-ctx.Done():
break
default:
grp.Go(func() (err error) {
val := f(p)
if val == nil {
return
}
outputLck.Lock()
output = append(output, val)
outputLck.Unlock()
return
})
}
}
grp.Wait()
return output
}
func clearPagesCache(p Page) (err error) {
pages = nil
return nil
}
func populatePagesCache(ctx context.Context) {
pages = []Page{}
for _, s := range sources {
select {
case <-ctx.Done():
return
default:
s.Each(ctx, func(p Page) {
pages = append(pages, p)
})
}
}
}