以前写的例子,都是基于用空格分割的单词,英文文本本身就是用空格分割,识别相对容易,但是中文之间是没有空格的,严格地说,中文没有可识别的分隔符,能够识别中文词汇来自于中文语法规则,计算机哪里会?所以必须基于一些词库来识别。所以很多大数据处理框架都提供了使用中文分词器的功能。这里我们是用一款叫做结巴分词器的工具,来对输入源的中文进行分词。
在上一次修改过的基础之上https://blog.csdn.net/xxkalychen/article/details/117136261?spm=1001.2014.3001.5501,我们对程序稍作修改,使程序具有中文分词功能。
一、添加结巴分词器的pom依赖库。
<dependency> <groupId>com.huaban</groupId> <artifactId>jieba-analysis</artifactId> <version>1.0.2</version> </dependency>
二、修改LineBolt。因为我们要在单词拆分的逻辑里面进行处理,所以在这里修改。修改的内容也不是很多。
package com.chris.storm.bolt; import com.huaban.analysis.jieba.JiebaSegmenter; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichBolt; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values; import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.stream.Collectors; /** * @author Chris Chan * Create on 2021/5/19 9:44 * Use for: * Explain: */ public class LineBolt extends BaseRichBolt { //bolt数据收集器 private OutputCollector collector; //结巴中文分词器 private JiebaSegmenter jiebaSegmenter; /** * 初始化操作 * * @param map * @param topologyContext * @param outputCollector */ @Override public void prepare(Map<String, Object> map, TopologyContext topologyContext, OutputCollector outputCollector) { this.collector = outputCollector; jiebaSegmenter = new JiebaSegmenter(); } /** * 处理数据 * * @param tuple */ @Override public void execute(Tuple tuple) { //再上一个流水线spout中我们把一行行数据放在元组中的line字段,在这里我们把它读取出来 String line = tuple.getStringByField("line"); //根据数据特性,用空格进行分割 String[] words = line.split(" "); //中文分词 使用结巴分词器,并过滤掉空数据 List<String> wordList = Arrays.stream(words) .flatMap(word -> this.jiebaSegmenter.process(word, JiebaSegmenter.SegMode.SEARCH).stream()) .map(segToken -> segToken.word) .filter(word -> !"".equals(word.trim())) .collect(Collectors.toList()); //发送给下一个bolt进行处理 面向多任务分支需要定义streamId wordList.forEach(word -> { this.collector.emit("print", new Values(word)); this.collector.emit("count", new Values(word)); }); } /** * 定义输出数据元组字段 * * @param outputFieldsDeclarer */ @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { //我们把拆分后的单词放到元组中名为word的字段中 多任务分支需要定义streamId outputFieldsDeclarer.declareStream("print", new Fields("word")); outputFieldsDeclarer.declareStream("count", new Fields("word")); } }
内中涉及两个处理,一个是用空格分割,一个是用中文分词,空格分割要在中文分词之前,因为中文词汇是不包含空格的。
好了,我们的修改就结束了。启动各种服务器,Kafka输入待命。启动提交测试。
输入句子,中文英文都可以,还可以模拟商品搜索。
查看数据库。
效果和我们预想的一样。看起来,直接使用中文词来做ES的文档ID可不太合适,建议还是经过一次编码为好。