在本章中,我们将学习如何将Kafka与Apache Storm集成。
Storm最初是由Nathan Marz和BackType团队创建的。 在很短的时间内,Apache Storm成为分布式实时处理系统的标准,用于处理大数据。 Storm速度非常快,每个节点每秒处理超过一百万个元组的基准时钟。 Apache Storm持续运行,从配置的源(Spouts)中消耗数据并将数据传递到处理管道(Bolts)。 组合 Spouts 和 Bolts 构成一个拓扑。
Kafka和Storm自然而然地相互补充,它们强大的合作能够实现快速移动大数据的实时流式分析。 Kafka和Storm的整合使得开发者更容易从Storm拓扑中获取和发布数据流。
概念流程
喷口(spout)是流的来源。 例如,spout可能会读取卡夫卡主题中的元组并将其作为流发送。 Bolts消耗输入流,处理并可能发射新的流。 Bolts可以做任何事情,从运行功能,过滤元组,流聚合,流式连接,与数据库交互等等。 Storm拓扑中的每个节点并行执行。 一个拓扑无限期地运行,直到终止它。 Storm会自动重新分配任何失败的任务。 此外,即使机器停机并且信息丢失,Storm也可以保证不会丢失数据。
下面来看看Kafka-Storm集成API。 有三个主要类将Kafka和Storm结合在一起。 他们如下 -
BrokerHosts - ZkHosts&StaticHosts
BrokerHosts
是一个接口,ZkHosts
和StaticHosts
是它的两个主要实现。 ZkHosts用于通过在ZooKeeper中维护详细信息来动态跟踪Kafka经纪人,而StaticHosts
用于手动/静态设置Kafka经纪人及其详细信息。 ZkHosts是访问Kafka经纪人的简单而快捷的方式。
ZkHosts的签名如下 -
public ZkHosts(String brokerZkStr, String brokerZkPath) public ZkHosts(String brokerZkStr)
其中brokerZkStr
是ZooKeeper主机,brokerZkPath
是维护Kafka代理细节的ZooKeeper路径。
public KafkaConfig(BrokerHosts hosts, string topic)
参数
SpoutConfig API
Spoutconfig是KafkaConfig的扩展,支持额外的ZooKeeper信息。
public SpoutConfig(BrokerHosts hosts, string topic, string zkRoot, string id)
参数
hosts
- BrokerHosts可以是BrokerHosts接口的任何实现topic
- 主题名称。zkRoot
- ZooKeeper根路径。id
- spout
存储在Zookeeper中消耗的偏移量的状态。该ID应该唯一标识的spout。SchemeAsMultiScheme
SchemeAsMultiScheme是一个接口,它规定了从Kafka消耗的ByteBuffer如何转换为 storm 元组。它来自MultiScheme并接受Scheme类的实现。Scheme类有很多实现,一个这样的实现是StringScheme,它将字节解析为一个简单的字符串。 它还控制输出字段的命名。 签名定义如下。
public SchemeAsMultiScheme(Scheme scheme)
参数
scheme
- 从kafka消耗的字节缓冲区。KafkaSpout API
KafkaSpout
是spout实现,它将与Storm整合。 它从kafka主题获取消息并将其作为元组发送到Storm生态系统中。 KafkaSpout从SpoutConfig获取配置细节。
以下是创建一个简单的kafka spout的示例代码。
// ZooKeeper connection string BrokerHosts hosts = new ZkHosts(zkConnString); //Creating SpoutConfig Object SpoutConfig spoutConfig = new SpoutConfig(hosts, topicName, "/" + topicName UUID.randomUUID().toString()); //convert the ByteBuffer to String. spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme()); //Assign SpoutConfig to KafkaSpout. KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
Bolt是一个将元组作为输入,处理元组并生成新的元组作为输出的组件。 Bolts将实现IRichBolt
接口。 在这个程序中,使用两个类 - WordSplitter-Bolt
和WordCounterBolt
来执行操作。
IRichBolt
接口有以下方法 -
prepare
- 为 bolt 提供执行的环境。 执行者将运行此方法来初始化spout
。prepare
- 处理输入的单个元组。prepare
- 当bolt即将关闭时调用。declareOutputFields
- 声明元组的输出模式。下面创建一个Java文件:SplitBolt.java
,它实现了将句子分成单词;CountBolt.java
它实现了逻辑来分离唯一的单词并计算它的出现次数。
SplitBolt.java
import java.util.Map; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; import backtype.storm.task.OutputCollector; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.IRichBolt; import backtype.storm.task.TopologyContext; public class SplitBolt implements IRichBolt { private OutputCollector collector; @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.collector = collector; } @Override public void execute(Tuple input) { String sentence = input.getString(0); String[] words = sentence.split(" "); for(String word: words) { word = word.trim(); if(!word.isEmpty()) { word = word.toLowerCase(); collector.emit(new Values(word)); } } collector.ack(input); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word")); } @Override public void cleanup() {} @Override public Map<String, Object> getComponentConfiguration() { return null; } }
文件:CountBolt.java -
import java.util.Map; import java.util.HashMap; import backtype.storm.tuple.Tuple; import backtype.storm.task.OutputCollector; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.IRichBolt; import backtype.storm.task.TopologyContext; public class CountBolt implements IRichBolt{ Map<String, Integer> counters; private OutputCollector collector; @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.counters = new HashMap<String, Integer>(); this.collector = collector; } @Override public void execute(Tuple input) { String str = input.getString(0); if(!counters.containsKey(str)){ counters.put(str, 1); }else { Integer c = counters.get(str) +1; counters.put(str, c); } collector.ack(input); } @Override public void cleanup() { for(Map.Entry<String, Integer> entry:counters.entrySet()){ System.out.println(entry.getKey()+" : " + entry.getValue()); } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { } @Override public Map<String, Object> getComponentConfiguration() { return null; } }
Storm拓扑基本上是一个Thrift结构。 TopologyBuilder类提供了简单而简单的方法来创建复杂的拓扑。 TopologyBuilder类具有设置spout (setSpout)和设置bolt(setBolt)的方法。 最后,TopologyBuilder使用createTopology()
来创建拓朴学。 shuffleGrouping
和fieldsGrouping
方法有助于设置spout
和bolt
的流分组。
本地群集 - 出于开发目的,我们可以使用LocalCluster对象创建本地群集,然后使用LocalCluster类的submitTopology方法提交拓扑。
文件:KafkaStormSample.java -
import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.topology.TopologyBuilder; import java.util.ArrayList; import java.util.List; import java.util.UUID; import backtype.storm.spout.SchemeAsMultiScheme; import storm.kafka.trident.GlobalPartitionInformation; import storm.kafka.ZkHosts; import storm.kafka.Broker; import storm.kafka.StaticHosts; import storm.kafka.BrokerHosts; import storm.kafka.SpoutConfig; import storm.kafka.KafkaConfig; import storm.kafka.KafkaSpout; import storm.kafka.StringScheme; public class KafkaStormSample { public static void main(String[] args) throws Exception{ Config config = new Config(); config.setDebug(true); config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1); String zkConnString = "localhost:2181"; String topic = "my-first-topic"; BrokerHosts hosts = new ZkHosts(zkConnString); SpoutConfig kafkaSpoutConfig = new SpoutConfig (hosts, topic, "/" + topic, UUID.randomUUID().toString()); kafkaSpoutConfig.bufferSizeBytes = 1024 * 1024 * 4; kafkaSpoutConfig.fetchSizeBytes = 1024 * 1024 * 4; kafkaSpoutConfig.forceFromStart = true; kafkaSpoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme()); TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("kafka-spout", new KafkaSpout(kafkaSpoutCon-fig)); builder.setBolt("word-spitter", new SplitBolt()).shuffleGroup-ing("kafka-spout"); builder.setBolt("word-counter", new CountBolt()).shuffleGroup-ing("word-spitter"); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("KafkaStormSample", config, builder.create-Topology()); Thread.sleep(10000); cluster.shutdown(); } }
在移动编译之前,Kakfa-Storm集成需要馆长ZooKeeper客户端java库。 ZooKeeper 版本2.9.1支持Apache Storm 0.9.5版本(在本教程中使用)。 下载下面指定的jar文件并将其放在java类路径中。
curator-client-2.9.1.jar
curator-framework-2.9.1.jar
在包含依赖文件后,使用以下命令编译程序,
javac -cp "/path/to/Kafka/apache-storm-0.9.5/lib/*" *.java
执行
启动Kafka Producer CLI(在上一章中介绍),创建一个名为my-first-topic
的新主题,并提供一些示例消息,如下所示 -
hello kafka storm spark test message another test message
现在使用以下命令执行应用程序 -
java -cp “/path/to/Kafka/apache-storm-0.9.5/lib/*”:. KafkaStormSample
此应用程序的输出示例如下所示 -
storm : 1 test : 2 spark : 1 another : 1 kafka : 1 hello : 1 message : 2