-
Notifications
You must be signed in to change notification settings - Fork 841
/
filter.py
181 lines (148 loc) · 5.21 KB
/
filter.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from collections import defaultdict
import re
__all__ = ['NaiveFilter', 'BSFilter', 'DFAFilter']
__author__ = 'observer'
__date__ = '2012.01.05'
class NaiveFilter():
'''Filter Messages from keywords
very simple filter implementation
>>> f = NaiveFilter()
>>> f.add("sexy")
>>> f.filter("hello sexy baby")
hello **** baby
'''
def __init__(self):
self.keywords = set([])
def parse(self, path):
for keyword in open(path):
self.keywords.add(keyword.strip().decode('utf-8').lower())
def filter(self, message, repl="*"):
message = unicode(message).lower()
for kw in self.keywords:
message = message.replace(kw, repl)
return message
class BSFilter:
'''Filter Messages from keywords
Use Back Sorted Mapping to reduce replacement times
>>> f = BSFilter()
>>> f.add("sexy")
>>> f.filter("hello sexy baby")
hello **** baby
'''
def __init__(self):
self.keywords = []
self.kwsets = set([])
self.bsdict = defaultdict(set)
self.pat_en = re.compile(r'^[0-9a-zA-Z]+$') # english phrase or not
def add(self, keyword):
if not isinstance(keyword, unicode):
keyword = keyword.decode('utf-8')
keyword = keyword.lower()
if keyword not in self.kwsets:
self.keywords.append(keyword)
self.kwsets.add(keyword)
index = len(self.keywords) - 1
for word in keyword.split():
if self.pat_en.search(word):
self.bsdict[word].add(index)
else:
for char in word:
self.bsdict[char].add(index)
def parse(self, path):
with open(path, "r") as f:
for keyword in f:
self.add(keyword.strip())
def filter(self, message, repl="*"):
if not isinstance(message, unicode):
message = message.decode('utf-8')
message = message.lower()
for word in message.split():
if self.pat_en.search(word):
for index in self.bsdict[word]:
message = message.replace(self.keywords[index], repl)
else:
for char in word:
for index in self.bsdict[char]:
message = message.replace(self.keywords[index], repl)
return message
class DFAFilter():
'''Filter Messages from keywords
Use DFA to keep algorithm perform constantly
>>> f = DFAFilter()
>>> f.add("sexy")
>>> f.filter("hello sexy baby")
hello **** baby
'''
def __init__(self):
self.keyword_chains = {}
self.delimit = '\x00'
def add(self, keyword):
if not isinstance(keyword, unicode):
keyword = keyword.decode('utf-8')
keyword = keyword.lower()
chars = keyword.strip()
if not chars:
return
level = self.keyword_chains
for i in range(len(chars)):
if chars[i] in level:
level = level[chars[i]]
else:
if not isinstance(level, dict):
break
for j in range(i, len(chars)):
level[chars[j]] = {}
last_level, last_char = level, chars[j]
level = level[chars[j]]
last_level[last_char] = {self.delimit: 0}
break
if i == len(chars) - 1:
level[self.delimit] = 0
def parse(self, path):
with open(path) as f:
for keyword in f:
self.add(keyword.strip())
def filter(self, message, repl="*"):
if not isinstance(message, unicode):
message = message.decode('utf-8')
message = message.lower()
ret = []
start = 0
while start < len(message):
level = self.keyword_chains
step_ins = 0
for char in message[start:]:
if char in level:
step_ins += 1
if self.delimit not in level[char]:
level = level[char]
else:
ret.append(repl * step_ins)
start += step_ins - 1
break
else:
ret.append(message[start])
break
else:
ret.append(message[start])
start += 1
return ''.join(ret)
def test_first_character():
gfw = DFAFilter()
gfw.add("1989年")
assert gfw.filter("1989", "*") == "1989"
if __name__ == "__main__":
# gfw = NaiveFilter()
# gfw = BSFilter()
gfw = DFAFilter()
gfw.parse("keywords")
import time
t = time.time()
print gfw.filter("法轮功 我操操操", "*")
print gfw.filter("针孔摄像机 我操操操", "*")
print gfw.filter("售假人民币 我操操操", "*")
print gfw.filter("传世私服 我操操操", "*")
print time.time() - t
test_first_character()