This repository has been archived by the owner. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpool.go
109 lines (90 loc) · 2.7 KB
/
pool.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
package priopool
import (
"container/heap"
"errors"
"fmt"
"sync"
"github.com/panjf2000/ants/v2"
)
// PriorityPool runs submitted tasks on a bounded set of goroutines and
// buffers the overflow in a priority queue. It is built on top of
// panjf2000/ants for the goroutine pool and the standard library
// container/heap for the queue.
type PriorityPool struct {
	mu    sync.Mutex    // guards queue
	pool  *ants.Pool    // underlying goroutine pool
	queue priorityQueue // tasks waiting for a free worker, ordered by priority
	limit int           // max queue length; zero disables the queue, negative means unlimited
}
// defaultQueueCapacity is the initial capacity of the queue's backing slice
// when New is called with a negative (unlimited) queue capacity.
const defaultQueueCapacity = 10
// ErrQueueOverload is returned by Submit when the goroutine pool has no
// free worker and the priority queue cannot accept another task.
var ErrQueueOverload = errors.New("pool and priority queue are full")

// ErrPoolCapacitySize is returned by New when the requested pool capacity
// is zero or negative.
var ErrPoolCapacitySize = errors.New("pool capacity must be positive")
// New builds a PriorityPool whose goroutine pool holds poolCapacity workers
// and operates in non-blocking mode.
//
// poolCapacity must be positive, otherwise ErrPoolCapacitySize is returned.
// queueCapacity controls the priority queue: zero disables queueing, a
// positive value caps the number of queued tasks, and a negative value
// removes the cap. Errors from the underlying pool constructor are
// returned wrapped.
func New(poolCapacity, queueCapacity int) (*PriorityPool, error) {
	if poolCapacity <= 0 {
		return nil, ErrPoolCapacitySize
	}

	workers, err := ants.NewPool(poolCapacity, ants.WithNonblocking(true))
	if err != nil {
		return nil, fmt.Errorf("creating pool instance: %w", err)
	}

	// A bounded queue is pre-sized to its limit; an unbounded one starts
	// with a small default backing slice and grows on demand.
	initialCap := queueCapacity
	if initialCap < 0 {
		initialCap = defaultQueueCapacity
	}

	pp := &PriorityPool{
		pool:  workers,
		limit: queueCapacity,
	}
	pp.queue.tasks = make([]*priorityQueueTask, 0, initialCap)

	return pp, nil
}
// Submit sends the task into the priority pool. It is a non-blocking
// operation. If the pool has an available worker, the task starts executing
// immediately. If the pool is full, the task is stored in the priority queue
// and is executed once a worker finishes its current task and pops it from
// the queue. Queued tasks never evict running tasks from the pool. Tasks with
// a bigger priority number are popped earlier.
//
// Submit returns ErrQueueOverload when the pool is full and the queue is
// disabled or has reached its limit. Any other error reported by the
// underlying pool is returned wrapped.
//
// NOTE(review): a task queued after a worker has observed an empty queue but
// before the pool considers that worker free may stay queued until a later
// Submit obtains a worker — confirm against the ants worker lifecycle.
func (p *PriorityPool) Submit(priority uint32, task func()) error {
	// maxInt is the largest value an int can hold on the target platform.
	const maxInt = int(^uint(0) >> 1)

	p.mu.Lock() // lock from the beginning to avoid starving
	defer p.mu.Unlock()

	err := p.pool.Submit(func() {
		task()

		// After its own task the worker keeps serving the queue, always
		// taking the highest-priority item, until the queue is empty.
		// The lock is released before running a task so that Submit and
		// other workers are not blocked while it executes.
		for {
			p.mu.Lock()
			if p.queue.Len() == 0 {
				p.mu.Unlock()
				return
			}
			queueF := heap.Pop(&p.queue)
			p.mu.Unlock()

			queueF.(*priorityQueueTask).value()
		}
	})
	if err == nil {
		return nil
	}

	// Only pool overload is handled by queueing; everything else is fatal
	// for this submission.
	if !errors.Is(err, ants.ErrPoolOverload) {
		return fmt.Errorf("pool submit: %w", err)
	}

	if p.limit >= 0 && p.queue.Len() >= p.limit {
		return ErrQueueOverload
	}

	// Where int is 32 bits wide, a plain int(priority) would wrap large
	// priorities to negative numbers and reverse their ordering; clamp to
	// the maximum int instead. Where int is 64 bits this is a no-op.
	prio := maxInt
	if uint64(priority) <= uint64(maxInt) {
		prio = int(priority)
	}

	heap.Push(&p.queue, &priorityQueueTask{
		value:    task,
		priority: prio,
	})

	return nil
}