前面我们已经介绍过 AC自动机 ,但在实际使用当中如果需要构建的词典树特别大,原始版本的AC自动机在做查询时耗时会比较多,而基于双数组trie树的AC自动机恰好能够弥补这一缺陷。
下面我们将基于hankcs实现的 AhoCorasickDoubleArrayTrie 代码来讲解双数组trie树的AC自动机的构建以及查询过程。
双数组trie树AC自动机的构建过程分为三步:
public void build(Map<String, V> map) { // 把值保存下来 v = (V[]) map.values().toArray(); l = new int[v.length]; Set<String> keySet = map.keySet(); // 构建二分trie树 addAllKeyword(keySet); // 在二分trie树的基础上构建双数组trie树 buildDoubleArrayTrie(keySet.size()); used = null; // 构建failure表并且合并output表 constructFailureStates(); rootState = null; loseWeight(); }
这里 addAllKeyword(keySet)
构建trie树跟 AC自动机 中的一样,需要在尾节点处的emits里添加keystring,但由于双数组trie树AC自动机只保存数组,不储存树结构,因此这里 数组v内储存所有的词,而emits中添加的是keyword在数组v里的index
private void addKeyword(String keyword, int index) { State currentState = this.rootState; for (Character character : keyword.toCharArray()) { currentState = currentState.addState(character);//trie树添加节点 } currentState.addEmit(index);//尾节点处添加词的index到emits l[index] = keyword.length(); }
outer
循环,找到一个begin
,使得满足对于当前节点tCurrent
和它的所有子节点siblings
,有:begin = base[tCurrent]
begin + code(sibling)
都没用被占用outer: while (true) { pos++;//每当有一个sibling的位置 begin + code(sibling) 已经被占用,pos就加一 if (allocSize <= pos) resize(pos + 1); if (check[pos] != 0) { nonzero_num++; continue; } else if (first == 0) { nextCheckPos = pos; first = 1; } begin = pos - siblings.get(0).getKey(); //这里begin用来记录此时tCurrent的base值 if (allocSize <= (begin + siblings.get(siblings.size() - 1).getKey())) { // progress can be zero // 防止progress产生除零错误 double toSize = Math.max(1.05, 1.0 * keySize / (progress + 1)) * allocSize; int maxSize = (int) (Integer.MAX_VALUE * 0.95); if (allocSize >= maxSize) throw new RuntimeException("Double array trie is too big."); else resize((int) Math.min(toSize, maxSize)); } if (used[begin]) continue; for (int i = 1; i < siblings.size(); i++) if (check[begin + siblings.get(i).getKey()] != 0)//说明位置begin + siblings.get(i).getKey()已经被占用 continue outer; break; }
Note:代码中利用
check[begin + siblings.get(i).getKey()]
是否为零来判断位置begin + code(sibling)
是否已经被占用,若不为零,则说明该位置已经被占用
tCurrent
的base值之后,就可以将所有子节点siblings的check值设为base[tCurrent]
:for (Map.Entry<Integer, State> sibling : siblings) { check[begin + sibling.getKey()] = begin; }
尽管 双数组trie树 中介绍的是 check[child_index] = father_index
,但由于base
是一个单射,所以这里可以直接将check[child_index]
设为 base[father_index]
siblingQueue
中,重复上述过程,通过检验它的子节点index是否冲突才能确认该节点的base值for (Map.Entry<Integer, State> sibling : siblings) { List<Map.Entry<Integer, State>> new_siblings = new ArrayList<Map.Entry<Integer, State>>(sibling.getValue().getSuccess().entrySet().size() + 1); if (fetch(sibling.getValue(), new_siblings) == 0) // 表示当前子节点sibling是叶子节点 { base[begin + sibling.getKey()] = (-sibling.getValue().getLargestValueId() - 1); progress++; } else //对于还有子节点的sibling来说,就要将其加入siblingQueue,然后重复上面的循环 { siblingQueue.add(new AbstractMap.SimpleEntry<Integer, List<Map.Entry<Integer, State>>>(begin + sibling.getKey(), new_siblings)); } sibling.getValue().setIndex(begin + sibling.getKey());//sibling的index是可以确定的 }
Note:不管是否是尾节点,
tCurrent
所有的siblings
的index都是知道的了。所以最后都有sibling.getValue().setIndex(begin + sibling.getKey());
tCurrent
的base值设为begin
:Integer parentBaseIndex = tCurrent.getKey(); if (parentBaseIndex != null) { base[parentBaseIndex] = begin; }
fail node的寻找过程跟 AC自动机 中是一样的,唯一的区别是这里的fail和output都需要做成数组。
假设 index 为 i 的节点,其 fail node 的 index 为 j ,那么有 :
f
a
i
l
[
i
]
=
j
fail[i] = j
fail[i]=j
public void setFailure(State failState, int fail[]) { this.failure = failState; fail[index] = failState.index; }
output数组是用来放每个位置节点的所有emits的:
o
u
t
p
u
t
[
S
t
a
t
e
.
i
n
d
e
x
]
=
S
t
a
t
e
.
e
m
i
t
s
output[State.index] = State.emits
output[State.index]=State.emits
由于 State.emits 本身是个一维数组,所以output是一个二维数组。
private void constructOutput(State targetState) { Collection<Integer> emit = targetState.emit(); if (emit == null || emit.size() == 0) return; int[] output = new int[emit.size()]; Iterator<Integer> it = emit.iterator(); for (int i = 0; i < output.length; ++i) { output[i] = it.next(); } AhoCorasickDoubleArrayTrie.this.output[targetState.getIndex()] = output; }
getState()
查找currentState的子节点中是否有字text(i),如果有,则返回这个节点的index;如果子节点中没有该字,那么就从currentState的fail node的子节点中找并返回:private int getState(int currentState, char character) { int newCurrentState = transitionWithRoot(currentState, character); // 先按success跳转 while (newCurrentState == -1) // 跳转失败的话,按failure跳转 { currentState = fail[currentState]; newCurrentState = transitionWithRoot(currentState, character); } return newCurrentState; }
base[currentState] + code(c)
找到currentState经字符c转移到的位置,接下来要通过检查 check[base[currentState] + code(c)] = base[currentState]
是否成立来判断转移的正确性。如果转移正确,则返回转移到达的位置 base[currentState] + code(c)
protected int transitionWithRoot(int nodePos, char c) { int b = base[nodePos]; int p; p = b + c + 1;// p是nodePos处的节点按字符c转移到的节点的index if (b != check[p])//检查根据c找到的节点,其父节点是否是nodePos位置上的节点 { if (nodePos == 0) return 0; return -1; } return p; }
ouput[]
数组索引到该节点的emits,这里的emits里面又是词语的索引,这里就可以用储存词语的数组v[]
还原text文本hit到的词:private void storeEmits(int position, int currentState, List<Hit<V>> collectedEmits) { int[] hitArray = output[currentState];//利用output索引到currentState的emits数组 if (hitArray != null) { for (int hit : hitArray) { collectedEmits.add(new Hit<V>(position - l[hit], position, v[hit]));//利用数组v还原词语 } } }