forked from nutsdb/nutsdb
-
Notifications
You must be signed in to change notification settings - Fork 0
/
fd_manager.go
210 lines (192 loc) · 4.9 KB
/
fd_manager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
package nutsdb
import (
"math"
"os"
"path/filepath"
"strings"
"sync"
)
const (
DefaultMaxFileNums = 256
)
const (
TooManyFileOpenErrSuffix = "too many open files"
)
// fdManager hold a fd cache in memory, it lru based cache.
type fdManager struct {
lock sync.Mutex
cache map[string]*FdInfo
fdList *doubleLinkedList
size int
cleanThresholdNums int
maxFdNums int
}
// newFdm will return a fdManager object
func newFdm(maxFdNums int, cleanThreshold float64) (fdm *fdManager) {
fdm = &fdManager{
cache: map[string]*FdInfo{},
fdList: initDoubleLinkedList(),
size: 0,
maxFdNums: DefaultMaxFileNums,
}
fdm.cleanThresholdNums = int(math.Floor(0.5 * float64(fdm.maxFdNums)))
if maxFdNums > 0 {
fdm.maxFdNums = maxFdNums
}
if cleanThreshold > 0.0 && cleanThreshold < 1.0 {
fdm.cleanThresholdNums = int(math.Floor(cleanThreshold * float64(fdm.maxFdNums)))
}
return fdm
}
// FdInfo holds base fd info
type FdInfo struct {
fd *os.File
path string
using uint
next *FdInfo
prev *FdInfo
}
// getFd go through this method to get fd.
func (fdm *fdManager) getFd(path string) (fd *os.File, err error) {
fdm.lock.Lock()
defer fdm.lock.Unlock()
cleanPath := filepath.Clean(path)
if fdInfo := fdm.cache[cleanPath]; fdInfo == nil {
fd, err = os.OpenFile(cleanPath, os.O_CREATE|os.O_RDWR, 0o644)
if err == nil {
// if the numbers of fd in cache larger than the cleanThreshold in config, we will clean useless fd in cache
if fdm.size >= fdm.cleanThresholdNums {
err = fdm.cleanUselessFd()
}
// if the numbers of fd in cache larger than the max numbers of fd in config, we will not add this fd to cache
if fdm.size >= fdm.maxFdNums {
return fd, nil
}
// add this fd to cache
fdm.addToCache(fd, cleanPath)
return fd, nil
} else {
// determine if there are too many open files, we will first clean useless fd in cache and try open this file again
if strings.HasSuffix(err.Error(), TooManyFileOpenErrSuffix) {
cleanErr := fdm.cleanUselessFd()
// if something wrong in cleanUselessFd, we will return "open too many files" err, because we want user not the main err is that
if cleanErr != nil {
return nil, err
}
// try open this file again,if it still returns err, we will show this error to user
fd, err = os.OpenFile(cleanPath, os.O_CREATE|os.O_RDWR, 0o644)
if err != nil {
return nil, err
}
// add to cache if open this file successfully
fdm.addToCache(fd, cleanPath)
}
return fd, err
}
} else {
fdInfo.using++
fdm.fdList.moveNodeToFront(fdInfo)
return fdInfo.fd, nil
}
}
// addToCache add fd to cache
func (fdm *fdManager) addToCache(fd *os.File, cleanPath string) {
fdInfo := &FdInfo{
fd: fd,
using: 1,
path: cleanPath,
}
fdm.fdList.addNode(fdInfo)
fdm.size++
fdm.cache[cleanPath] = fdInfo
}
// reduceUsing when RWManager object close, it will go through this method let fdm know it return the fd to cache
func (fdm *fdManager) reduceUsing(path string) {
fdm.lock.Lock()
defer fdm.lock.Unlock()
cleanPath := filepath.Clean(path)
node, isExist := fdm.cache[cleanPath]
if !isExist {
panic("unexpected the node is not in cache")
}
node.using--
}
// close means the cache.
func (fdm *fdManager) close() error {
fdm.lock.Lock()
defer fdm.lock.Unlock()
node := fdm.fdList.tail.prev
for node != fdm.fdList.head {
err := node.fd.Close()
if err != nil {
return err
}
delete(fdm.cache, node.path)
fdm.size--
node = node.prev
}
fdm.fdList.head.next = fdm.fdList.tail
fdm.fdList.tail.prev = fdm.fdList.head
return nil
}
type doubleLinkedList struct {
head *FdInfo
tail *FdInfo
size int
}
func initDoubleLinkedList() *doubleLinkedList {
list := &doubleLinkedList{
head: &FdInfo{},
tail: &FdInfo{},
size: 0,
}
list.head.next = list.tail
list.tail.prev = list.head
return list
}
func (list *doubleLinkedList) addNode(node *FdInfo) {
list.head.next.prev = node
node.next = list.head.next
list.head.next = node
node.prev = list.head
list.size++
}
func (list *doubleLinkedList) removeNode(node *FdInfo) {
node.prev.next = node.next
node.next.prev = node.prev
node.prev = nil
node.next = nil
}
func (list *doubleLinkedList) moveNodeToFront(node *FdInfo) {
list.removeNode(node)
list.addNode(node)
}
func (fdm *fdManager) cleanUselessFd() error {
cleanNums := fdm.cleanThresholdNums
node := fdm.fdList.tail.prev
for node != nil && node != fdm.fdList.head && cleanNums > 0 {
nextItem := node.prev
if node.using == 0 {
fdm.fdList.removeNode(node)
err := node.fd.Close()
if err != nil {
return err
}
fdm.size--
delete(fdm.cache, node.path)
cleanNums--
}
node = nextItem
}
return nil
}
func (fdm *fdManager) closeByPath(path string) error {
fdm.lock.Lock()
defer fdm.lock.Unlock()
fdInfo, ok := fdm.cache[path]
if !ok {
return nil
}
fdm.fdList.removeNode(fdInfo)
return fdInfo.fd.Close()
}