forked from yinhanyan/DS-FD
-
Notifications
You must be signed in to change notification settings - Fork 0
/
lmfd.py
117 lines (98 loc) · 3.57 KB
/
lmfd.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
import numpy as np
import numpy.typing as npt
from dataclasses import dataclass, field
from collections import deque
from typing import Dict
from scipy import linalg
from copy import deepcopy
@dataclass
class LMBlock():
sketch: npt.NDArray | None
start: np.uint64
end: np.uint64
size: float
class LMFD():
def __init__(self, N, d, sketch_dim) -> None:
self.L = 1
self.levels: Dict[int, deque[LMBlock]] = {1: deque()}
self.N = N
self.d = d
self.sketch_dim = sketch_dim
self.b = self.sketch_dim
self.time = np.uint64(0)
self.B: LMBlock = LMBlock(None, self.time, self.time, 0.)
def __merge(self, B1, B2):
if B1 is not None and B2 is not None:
stacked = np.vstack([B1, B2])
_, s, Vt = linalg.svd(
stacked, full_matrices=False, lapack_driver='gesvd')
s = s ** 2
if len(s) > self.sketch_dim:
s = s[:self.sketch_dim] - s[self.sketch_dim]
Vt = Vt[:self.sketch_dim]
merged = Vt * np.sqrt(s).reshape(-1, 1)
return merged
elif B1 is not None:
return B1
elif B2 is not None:
return B2
else:
return np.zeros((self.sketch_dim, self.d), dtype=np.float64)
# @profile
def fit(self, X, t=None):
if t != None:
self.time = np.uint64(t)
else:
self.time += np.uint64(1)
for l in list(reversed(self.levels)):
q = self.levels[l]
while len(q) != 0:
if q[0].end+self.N <= self.time:
q.popleft()
else:
break
else:
if l != 1:
del self.levels[l]
self.L = l-1
self.B.size += [email protected]
self.B.end = self.time
if self.B.sketch is None:
self.B.sketch = X
self.B.start = self.time
else:
self.B.sketch = np.vstack([self.B.sketch, X])
# or len(self.B.sketch) >= self.sketch_dim:
if self.B.size >= self.sketch_dim:
self.levels[1].append(self.B)
self.B = LMBlock(None, self.time, self.time, 0.)
for l in range(1, self.L+1):
if len(self.levels[l]) >= self.b+1:
if l+1 not in self.levels:
self.levels[l+1] = deque()
self.L = l+1
B1 = self.levels[l].popleft()
if B1.size + self.levels[l][0].size >= (2**(l+1))*self.sketch_dim:
self.levels[l+1].append(B1)
else:
B2 = self.levels[l].popleft()
merged = self.__merge(B1.sketch, B2.sketch)
self.levels[l+1].append(LMBlock(merged, start=B1.start,
end=B2.end, size=B1.size+B2.size))
# @profile
def get(self):
ret = deepcopy(self.B.sketch)
for i in range(self.L, 0, -1):
for B in self.levels[i]:
ret = self.__merge(ret, B.sketch)
return ret, None, None, None
def get_size(self):
res = 0
if self.B.sketch is not None:
res += self.B.sketch.shape[0]
for l in self.levels:
for b in self.levels[l]:
if b.sketch is not None:
res += b.sketch.shape[0]
return res
# return sum([b.sketch.shape[0] if b.sketch is not None else 0 for l in self.levels for b in self.levels[l]]) + self.B.sketch.shape[0] if self.B.sketch is not None else 0