基于前缀词典实现高效的词图扫描,生成句子中汉字所有可能成词情况所构成的有向无环图 (DAG)
采用了动态规划查找最大概率路径, 找出基于词频的最大切分组合
对于未登录词,采用了基于汉字成词能力的 HMM 模型,使用了 Viterbi 算法
- 代码解析
# +? 匹配1次或者多次 re_userdict = re.compile('^(.+?)( [0-9]+)?( [a-z]+)?$', re.U) # 匹配一个数字或者字母 re_eng = re.compile('[a-zA-Z0-9]', re.U) # \u4E00-\u9FD5a-zA-Z0-9+#&\._ : All non-space characters. Will be handled with re_han # \r\n|\s : whitespace characters. Will not be handled. #匹配中文 英文字母 数字 后面的特殊字符 re_han_default = re.compile("([\u4E00-\u9FD5a-zA-Z0-9+#&\._%]+)", re.U) re_skip_default = re.compile("(\r\n|\s)", re.U) # 匹配中文 re_han_cut_all = re.compile("([\u4E00-\u9FD5]+)", re.U) # 匹配不是英文 数字 后面的特殊字符的字符 re_skip_cut_all = re.compile("[^a-zA-Z0-9+#\n]", re.U)
- 调用路径: __cut_all --> get_DAG--->check_initialized--->initialized--->gen_pfdict(可能调用)
- initialized执行功能 >>1. 允许初始化新路径下的字典(优先),否则默认使用类定义的字典路径 >>2. 尝试加载模型缓存文件 >>>1. 对 DICT_WRITING[abs_path] = wlock 加锁 >>>2. 首先判断锁的状态, 以及一些条件判断 >>>3. 尝试从缓存文件中加载模型(使用marshal package) >>>4. 加载成功的情况下,获得self.FREQ, self.total >>3. 加载失败,调用gen_pfdict函数,获得self.FREQ, self.total,并且这两数据 缓存到缓存文件中 >>4. self.initialized = True
- gen_pfdict函数 >>1. 输入是open(dict_path, "rb")对象 >>2. 输出是: >>>1. dict数据结构,存储字典文件中词语、词语出现的频率,同时存储词语 的前缀词,并且不在词典中的前缀词的的频率设置为0 即self.FREQ >>>2. 词语频率的累加和,即self.total
for lineno, line in enumerate(f, 1): try: line = line.strip().decode('utf-8') word, freq = line.split(' ')[:2] freq = int(freq) lfreq[word] = freq ltotal += freq for ch in xrange(len(word)): # 获得该词语的所有前缀词 wfrag = word[:ch + 1] # 不在词典中的前缀词的频率设置为0 if wfrag not in lfreq: lfreq[wfrag] = 0 except ValueError: raise ValueError( 'invalid dictionary entry in %s at Line %s: %s' % (f_name, lineno, line))
- get_DAG函数执行功能 >>1. 不使用Tri树,使用前缀集合的原因,减少内存占用,加快速度, fxsjy/jieba#187 >>2. get_DAG可以看成是获得词图(有向无环图),也可以看成是获得以中文字 为开始的前缀集合
def get_DAG(self, sentence): """ 根据字典构造输入句子的词图 :param sentence: :return: ** DAG是一个字典结构,存储词图,没有使用邻接表 ** key存储的开始字符,value前缀词集合,其中存储的前缀词结束的位置 """ self.check_initialized() DAG = {} N = len(sentence) for k in xrange(N): tmplist = [] i = k frag = sentence[k] # 遍历k至len(sentence)之间的连续字符,判断能否组成字典中的词语 while i < N and frag in self.FREQ: # 不会存储频率为零的前缀词语(也就是没在字典文件中) if self.FREQ[frag]: tmplist.append(i) i += 1 frag = sentence[k:i + 1] if not tmplist: tmplist.append(k) DAG[k] = tmplist return DAG
- __cut_all函数执行功能
def __cut_all(self, sentence): """ 输出所有的子串,比如 我来到北京清华大学, 分词输出为: Full Mode: 我/ 来到/ 北京/ 清华/ 清华大学/ 华大/ 大学 :param sentence: :return: """ dag = self.get_DAG(sentence) # 表示前一个词语的结束位置 old_j = -1 for k, L in iteritems(dag): # len(L) == 1表示这个字符单独成词 if len(L) == 1 and k > old_j: yield sentence[k:L[0] + 1] old_j = L[0] else: # 输出list中所有的词语 for j in L: if j > k: yield sentence[k:j + 1] old_j = j
- calc函数
def calc(self, sentence, DAG, route):
** 从右往左计算,对句子从右往左反向计算最大概率(一些教科书上可能是从左往右,
这里反向是因为汉语句子的重心经常落在后面, 就是落在右边, 因为通常情况下形容词太多, 后面的才是主干,
因此, 从右往左计算, 正确率要高于从左往右计算, 这个类似于逆向最大匹配)
** 增加了结束节点N, 权重初始化为logtoal = log(self.total)
** 参考词图,计算的公式是 sentence_score[j] = log∏w, 计算sentence每一个位置的得分
** DAG[idx] 获取idx位置的前缀集合,x是其中的一个前缀结束位置,[idx:x + 1]可以组合成词的
** 存储最大的概率值和最大概率值的结束位置
:param sentence: 句子
:param DAG: sentence的词图, 数组的数组
:param route: dict,也就是需要返回的数据结构
N = len(sentence)
route[N] = (0, 0)
logtotal = log(self.total)
for idx in xrange(N - 1, -1, -1):
# 不明白为什么减去 logtoal
route[idx] = max((log(self.FREQ.get(sentence[idx:x + 1]) or 1) -
logtotal + route[x + 1][0], x) for x in DAG[idx])
- 使用词图 HMM分割
def __cut_DAG(self, sentence): """ 算法: ** get_DAG 或者前缀集合(词图) ** 动态规划计算最大概率 ** 对于连续的单字组成单字串,如果没有出现在词典中,但是是一个前缀词, 使用finalseg.cut进行分割,否则输出单字 :param sentence: :return: """ DAG = self.get_DAG(sentence) route = {} self.calc(sentence, DAG, route) x = 0 buf = '' N = len(sentence) while x < N: y = route[x][1] + 1 l_word = sentence[x:y] if y - x == 1: buf += l_word # 遇到一个词语的时候,输出,并且尝试对之前连续的单字的字符串进行分割 else: if buf: if len(buf) == 1: yield buf buf = '' else: # 单字串没有出现在词典中,才调用分割,否则输出单字 if not self.FREQ.get(buf): recognized = finalseg.cut(buf) for t in recognized: yield t else: for elem in buf: yield elem buf = '' # 输出该次的词语,非单字 yield l_word x = y if buf: if len(buf) == 1: yield buf elif not self.FREQ.get(buf): recognized = finalseg.cut(buf) for t in recognized: yield t else: for elem in buf: yield elem
- 使用词图,但是不使用HMM分割单字串
def __cut_DAG_NO_HMM(self, sentence):
** 使用最大概率,但是不适用HMM分割
** 单字处理,如果分割出来的词语是一个英文字母或者数字并且长度为1,那么将这种类型的单字符连接起来
:param sentence:
DAG = self.get_DAG(sentence)
route = {}
self.calc(sentence, DAG, route)
x = 0
N = len(sentence)
buf = ''
while x < N:
y = route[x][1] + 1
l_word = sentence[x:y]
if re_eng.match(l_word) and len(l_word) == 1:
buf += l_word
x = y
if buf:
yield buf
buf = ''
yield l_word
x = y
if buf:
yield buf
buf = ''
- 为了搜索的分词函数, 产生额外的2、3-gram
def cut_for_search(self, sentence, HMM=True):
Finer segmentation for search engines.
words = self.cut(sentence, HMM=HMM)
for w in words:
if len(w) > 2:
for i in xrange(len(w) - 1):
gram2 = w[i:i + 2]
if self.FREQ.get(gram2):
yield gram2
if len(w) > 3:
for i in xrange(len(w) - 2):
gram3 = w[i:i + 3]
if self.FREQ.get(gram3):
yield gram3
yield w
- 模型已经预测完成,主要是通过viterbi算法进行预测
- def viterbi(obs, states, start_p, trans_p, emit_p)函数
- def __cut(sentence)函数,调用viterbi函数,并且组装序列预测的的结果
- def cut(sentence)函数,对外提供的接口函数
- 代码结构和分词基本一致,分词,词典中的词语使用已有的词性,否则使用HMM
- 三个特点和主要缺陷讨论 http://dev.dafan.info/detail/452310?p=14-40
参考文献: TextRank: Bringing Order into Texts
- 节点:The vertices added to the graph can be restricted with syntactic filters, which select only lexical units of a certain part of speech.
- 边: We are using a co-occurrence relation,controlled by the distance between word occurrences:two vertices are connected if their correspondinglexical units co-occur within a window of maximum words, where can be set anywhere from 2 to 10 words. Co-occurrence links express relations between syntactic elements, and similar to the semantic links found useful for the task of word sense disambiguation
- 执行像Pagerank一样的迭代算法,更新每一个节点的分数
class UndirectWeightedGraph:
d = 0.85
def __init__(self):
self.graph = defaultdict(list)
def addEdge(self, start, end, weight):
# use a tuple (start, end, weight) instead of a Edge object
self.graph[start].append((start, end, weight))
self.graph[end].append((end, start, weight))
def rank(self):
ws = defaultdict(float)
outSum = defaultdict(float)
wsdef = 1.0 / (len(self.graph) or 1.0)
for n, out in self.graph.items():
ws[n] = wsdef
outSum[n] = sum((e[2] for e in out), 0.0)
# this line for build stable iteration
sorted_keys = sorted(self.graph.keys())
for x in xrange(10): # 10 iters
for n in sorted_keys:
s = 0
for e in self.graph[n]:
s += e[2] / outSum[e[1]] * ws[e[1]]
ws[n] = (1 - self.d) + self.d * s
(min_rank, max_rank) = (sys.float_info[0], sys.float_info[3])
for w in itervalues(ws):
if w < min_rank:
min_rank = w
if w > max_rank:
max_rank = w
for n, w in ws.items():
# to unify the weights, don't *100.
ws[n] = (w - min_rank / 10.0) / (max_rank - min_rank / 10.0)
return ws
class TextRank(KeywordExtractor):
def __init__(self):
self.tokenizer = self.postokenizer = jieba.posseg.dt
self.stop_words = self.STOP_WORDS.copy()
self.pos_filt = frozenset(('ns', 'n', 'vn', 'v'))
self.span = 5
def pairfilter(self, wp):
return (wp.flag in self.pos_filt and len(wp.word.strip()) >= 2
and wp.word.lower() not in self.stop_words)
def textrank(self, sentence, topK=20, withWeight=False, allowPOS=('ns', 'n', 'vn', 'v'), withFlag=False):
Extract keywords from sentence using TextRank algorithm.
- topK: return how many top keywords. `None` for all possible words.
- withWeight: if True, return a list of (word, weight);
if False, return a list of words.
- allowPOS: the allowed POS list eg. ['ns', 'n', 'vn', 'v'].
if the POS of w is not in this list, it will be filtered.
- withFlag: if True, return a list of pair(word, weight) like posseg.cut
if False, return a list of words
self.pos_filt = frozenset(allowPOS)
g = UndirectWeightedGraph()
cm = defaultdict(int)
words = tuple(self.tokenizer.cut(sentence))
for i, wp in enumerate(words):
if self.pairfilter(wp):
for j in xrange(i + 1, i + self.span):
if j >= len(words):
if not self.pairfilter(words[j]):
if allowPOS and withFlag:
cm[(wp, words[j])] += 1
cm[(wp.word, words[j].word)] += 1
for terms, w in cm.items():
g.addEdge(terms[0], terms[1], w)
nodes_rank = g.rank()
if withWeight:
tags = sorted(nodes_rank.items(), key=itemgetter(1), reverse=True)
tags = sorted(nodes_rank, key=nodes_rank.__getitem__, reverse=True)
if topK:
return tags[:topK]
return tags
extract_tags = textrank