a. 后台启动Kafka(三台都要启动)
nohup ~/bigdata/kafka_2.11-1.0.0/bin/kafka-server-start.sh ~/bigdata/kafka_2.11-1.0.0/config/server.properties >~/bigdata/kafka_2.11-1.0.0/logs/server.log 2>&1 &
a. 创建Topic:word-count-input
~/bigdata/kafka_2.11-1.0.0/bin/kafka-topics.sh --create --zookeeper master:2181 --replication-factor 1 --partitions 1 --topic word-count-input
b. 创建Topic:word-count-output
~/bigdata/kafka_2.11-1.0.0/bin/kafka-topics.sh --create --zookeeper master:2181 --replication-factor 1 --partitions 1 --topic word-count-output
a. 启动一个producer,向word-count-input发送消息
进入到$KAFKA_HOME路径:
cd ~/bigdata/kafka_2.11-1.0.0
启动:
bin/kafka-console-producer.sh --broker-list master:9092 --topic word-count-input
b. 启动一个consumer,消费word-count-output的消息
bin/kafka-console-consumer.sh --bootstrap-server master:9092 --topic word-count-output --property print.key=true
0x02 Storm准备
a. 引入Storm依赖
<!-- Storm core; "provided" because the cluster supplies it at runtime -->
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>1.2.2</version>
    <scope>provided</scope>
</dependency>
b. 引入Kafka依赖
<!-- Storm/Kafka integration (KafkaSpout and KafkaBolt) -->
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-kafka-client</artifactId>
    <version>1.2.2</version>
</dependency>
c. 引入额外打包插件
<!-- Compile for Java 8 with UTF-8 sources -->
<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-compiler-plugin</artifactId>
    <version>3.1</version>
    <configuration>
        <source>1.8</source>
        <target>1.8</target>
        <testExcludes>
            <testExclude>/src/test/**</testExclude>
        </testExcludes>
        <encoding>utf-8</encoding>
    </configuration>
</plugin>
<!-- Build a single jar that bundles the non-provided dependencies -->
<plugin>
    <artifactId>maven-assembly-plugin</artifactId>
    <configuration>
        <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
        </descriptorRefs>
    </configuration>
    <executions>
        <execution>
            <id>make-assembly</id> <!-- this is used for inheritance merges -->
            <phase>package</phase> <!-- run the jar-merging step during the package phase -->
            <goals>
                <goal>single</goal>
            </goals>
        </execution>
    </executions>
</plugin>
d. 完整的pom.xml文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.shaonaiyi</groupId>
    <artifactId>stormlearning</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.storm/storm-core -->
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>1.2.2</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-kafka-client</artifactId>
            <version>1.2.2</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <testExcludes>
                        <testExclude>/src/test/**</testExclude>
                    </testExcludes>
                    <encoding>utf-8</encoding>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id> <!-- this is used for inheritance merges -->
                        <phase>package</phase> <!-- run the jar-merging step during the package phase -->
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
a. 项目代码结构
b. KafkaSpoutBuilder
package com.shaonaiyi.kafka; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.storm.kafka.spout.KafkaSpout; import org.apache.storm.kafka.spout.KafkaSpoutConfig; import java.util.List; /** * @author: shaonaiyi * @createTime: 2019/07/14 13:32 * @description: KafkaSpout构建器 */ public class KafkaSpoutBuilder { private List<String> brokers; private String topic; public KafkaSpoutBuilder brokers(List<String> v) { brokers = v; return this; } public KafkaSpoutBuilder topic(String v) { topic = v; return this; } public KafkaSpout build() { /** 配置kafka * 1. 需要设置consumer group(注意一个partition中的消息只能被同一group中的一个consumer消费) * 2. 起始消费策略:根据业务需要配置 */ String allBrokers = String.join(",", brokers); KafkaSpoutConfig<String, String> conf = KafkaSpoutConfig .builder(allBrokers, topic) .setProp(ConsumerConfig.GROUP_ID_CONFIG, "word-count-storm") //消费最新的数据 .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.LATEST) .build(); return new KafkaSpout(conf); } }
c. KafkaSplitSentenceBolt
package com.shaonaiyi.kafka; /** * @author: shaonaiyi * @createTime: 2019/07/14 13:38 * @description: 语句分割bolt */ import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichBolt; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values; import java.util.Map; /** * 如,接收的Tuple是:Tuple("sentence" -> "I love teacher shao") * 则,输出的Tuple为: * Tuple("word" -> "I") * Tuple("word" -> "love") * Tuple("word" -> "teacher") * Tuple("word" -> "shao") */ public class KafkaSplitSentenceBolt extends BaseRichBolt { private OutputCollector collector; @Override public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) { this.collector = outputCollector; } @Override public void execute(Tuple tuple) { // 实时接收SentenceSpout中输出的Tuple流 String sentence = tuple.getStringByField("value"); // 根据key获取Tuple中的语句,"value"是Kafka中固定了的 String[] words = sentence.split(" "); // 将语句按照空格进行切割 for (String word: words) { this.collector.emit(new Values(word)); // 将切割之后的每一个单词作为Tuple的value输出到下一个bolt中 } this.collector.ack(tuple); // 表示成功处理kafka-spout输出的消息,需要应答,要不然,kafka-spout会不断的重复发送消息 } @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { outputFieldsDeclarer.declare(new Fields("word")); // 输出Tuple的key } }
d. KafkaWordCountBolt
package com.shaonaiyi.kafka; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichBolt; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values; import java.util.HashMap; import java.util.Map; /** * @author: shaonaiyi * @createTime: 2019/07/14 13:42 * @description: 单词计数bolt */ public class KafkaWordCountBolt extends BaseRichBolt { private OutputCollector collector; private HashMap<String, Long> counts = null; // 用于统计每隔单词的计数 @Override public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) { this.collector = outputCollector; this.counts = new HashMap<String, Long>(); } @Override public void execute(Tuple tuple) { // 实时接收SplitSentenceBolt中输出的Tuple流 String word = tuple.getStringByField("word"); // 根据key获取Tuple中的单词 // 统计每一个单词总共出现的次数 Long count = counts.getOrDefault(word, 0L); count++; this.counts.put(word, count); // 将每一个单词以及这个单词出现的次数作为Tuple中的value输出到下一个bolt中 this.collector.emit(new Values(word, count.toString())); } @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { // 输出Tuple的key,有两个key,是因为每次输出的value也有两个 outputFieldsDeclarer.declare(new Fields("key", "message")); } }
e. WordCountKafkaTopology
package com.shaonaiyi.kafka; import org.apache.storm.Config; import org.apache.storm.StormSubmitter; import org.apache.storm.generated.AlreadyAliveException; import org.apache.storm.generated.AuthorizationException; import org.apache.storm.generated.InvalidTopologyException; import org.apache.storm.kafka.bolt.KafkaBolt; import org.apache.storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper; import org.apache.storm.kafka.bolt.selector.DefaultTopicSelector; import org.apache.storm.kafka.spout.KafkaSpout; import org.apache.storm.topology.TopologyBuilder; import org.apache.storm.tuple.Fields; import java.util.Arrays; import java.util.Properties; /** * @author: shaonaiyi * @createTime: 2019/07/15 22:54 * @description: Kafka之WordCountTopology */ public class WordCountKafkaTopology { private static final String SENTENCE_SPOUT_ID = "sentence-spout"; private static final String SPLIT_BOLT_ID = "split-bolt"; private static final String COUNT_BOLT_ID = "count-bolt"; private static final String KAFKA_BOLT_ID = "kafka-bolt"; private static final String TOPOLOGY_NAME = "word-count-topology"; public static void main(String[] args) throws InvalidTopologyException, AuthorizationException, AlreadyAliveException { int workers = Integer.parseInt(args[0]); // 从Kafka中消费数据 KafkaSpout kafkaSpout = new KafkaSpoutBuilder() .brokers(Arrays.asList("master:9092")) .topic("word-count-input") .build(); KafkaSplitSentenceBolt splitSentenceBolt = new KafkaSplitSentenceBolt(); KafkaWordCountBolt wordCountBolt = new KafkaWordCountBolt(); Properties props = new Properties(); props.put("bootstrap.servers", "master:9092"); // 此配置是表明当一次produce请求被认为完成时的确认值。 // 特别是,多少个其他brokers必须已经提交了数据到他们的log并且向他们的leader确认了这些信息。典型的值包括: // 0: 表示producer从来不等待来自broker的确认信息(和0.7一样的行为)。 // 这个选择提供了最小的时延但同时风险最大(因为当server宕机时,数据将会丢失)。 // 1:表示获得leader replica已经接收了数据的确认信息。这个选择时延较小同时确保了server确认接收成功。 // -1:producer会获得所有同步replicas都收到数据的确认 props.put("acks", "1"); props.put("key.serializer", 
"org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); KafkaBolt kafkaBolt = new KafkaBolt() .withProducerProperties(props) .withTopicSelector(new DefaultTopicSelector("word-count-output")) .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper()); TopologyBuilder builder = new TopologyBuilder(); builder.setSpout(SENTENCE_SPOUT_ID, kafkaSpout); builder.setBolt(SPLIT_BOLT_ID, splitSentenceBolt).shuffleGrouping(SENTENCE_SPOUT_ID); builder.setBolt(COUNT_BOLT_ID, wordCountBolt).fieldsGrouping(SPLIT_BOLT_ID, new Fields("word")); builder.setBolt(KAFKA_BOLT_ID, kafkaBolt).shuffleGrouping(COUNT_BOLT_ID); // 3、提交Topology Config config = new Config(); // 用来配置Topology运行时行为,对Topology所有组件全局生效的配置参数集合 config.setNumWorkers(workers); StormSubmitter.submitTopology(TOPOLOGY_NAME, config, builder.createTopology()); // 提交Topology } }0x03 校验结果
a. 打包
b. 上传到集群
此步骤与教程《实时流处理框架之Storm的安装与部署》中「0x03 启动并校验Storm」的步骤一样
即:
a. 启动集群上的三台Zookeeper(查看进程是否存在,如果Kafka已经启动,应该还有Kafka的进程)
b. 启动Storm
在master上启动Nimbus和Web UI:
cd ~/bigdata/apache-storm-1.2.2
nohup bin/storm nimbus 2>&1 &
然后回车,切换终端2,执行:nohup bin/storm ui 2>&1 &
然后回车
在slave1和slave2上启动Supervisor:
cd ~/bigdata/apache-storm-1.2.2
nohup bin/storm supervisor 2>&1 &
a. 执行Storm作业
~/bigdata/apache-storm-1.2.2/bin/storm jar /home/hadoop-sny/jar/stormlearning-1.0-SNAPSHOT-jar-with-dependencies.jar com.shaonaiyi.kafka.WordCountKafkaTopology 1
b. 查看Web UI界面(master:8080)
a. 目前各节点的进程情况
b. 发送消息到Kafka
c. 查看消费者信息
d. 查看Storm的Web UI界面
作者简介:邵奈一
全栈工程师、市场洞察者、专栏编辑
| 公众号 | 微信 | 微博 | CSDN | 简书 |
福利:
邵奈一的技术博客导航
邵奈一 原创不易,如转载请标明出处。