2021SC@SDUSC
2021SC@SDUSC
本文主要介绍一下Bolt输出收集器,Bolt处理好的消息都是通过输出收集器发送出去的,不同类型的Bolt所使用的输出收集器也是不同的。
上一篇文章中介绍了几个bolt端口,它们分别使用的输出收集器如下:
IRichBolt:
它使用OutputCollector输出收集器,该收集器实现IOutputCollector接口,实际上是一个代理类。
IBasicBolt:
它使用BasicOutputCollector输出收集器,该收集器实际上是OutputCollector
的封装类,实现的是IBasicOutputCollector接口。
IBatchBolt:
它使用BatchOutputCollector输出收集器,该收集器是一个虚基类,Storm提
供了它的默认实现类BatchOutputCollectorImpl , 这个类实际上也是通过封装OutputCollector类来实现消息发送的。
public interface IOutputCollector extends IErrorReporter { /** * Returns the task ids that received the tuples. */ List<Integer> emit(String streamId, Collection<Tuple> anchors, List<Object> tuple); void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple); void ack(Tuple input); void fail(Tuple input); void resetTimeout(Tuple input); void flush(); }
接口IErrorReporter定义了reportError方法,其输入为一个Throwable对象,用户可以在该方法中处理异常。而接口IOutputCollector扩展了接口IErrorReporter,并且定义了如上的一些基本方法。
emit方法:
用来向外发送数据,它的返回值是该消息所有发送目标的Taskld集合,其输入参数的含义如下所示:
streamld:消息将被输出到的流。
anchors:输出消息的标记,通常代表该条消息是由哪些消息产生的,主要用于消息的Ack系统。
tuple:要输出的消息,为一个Object列表。
emitDirect方法:
其输入列表与emit方法相似,主要区别在于,emitDirect发送的消息只有
指定的Task才可以接收。这个方法要求streamld对应的流必须被定义为直接流,同时接收端的Task必须通过直接分组的方式来接收消息,否则会抛出异常。如果没有下游节点接收该消息,那么此类消息其实也就没有被真正发送。
fail和ack方法:
用来表示消息是否被成功处理。
Storm提供了IOutputCollector接口的默认实现类OutputCollector,它实际上也是一个代理。它包含一个真正工作的IOutputCollector实例,这个对象是在Clojure代码中定义的。OutputCollector主要用于从IRichBolt向外发送数据。在OutputCollector的实现中,所有操作都由代理对象完成。
package org.apache.storm.topology; import java.util.List; import org.apache.storm.task.IErrorReporter; import org.apache.storm.tuple.Tuple; public interface IBasicOutputCollector extends IErrorReporter { List<Integer> emit(String streamId, List<Object> tuple); void emitDirect(int taskId, String streamId, List<Object> tuple); void resetTimeout(Tuple tuple); }
如果使用IBasicBolt, Storm框架会自动帮用户进行Ack、Fail和Anchor操作。
public abstract class BatchOutputCollector { public List<Integer> emit(List<Object> tuple) { return emit(Utils.DEFAULT_STREAM_ID, tuple); } public abstract List<Integer> emit(String streamId, List<Object> tuple); public void emitDirect(int taskId, List<Object> tuple) { emitDirect(taskId, Utils.DEFAULT_STREAM_ID, tuple); } public abstract void emitDirect(int taskId, String streamId, List<Object> tuple); public abstract void flush(); public abstract void reportError(Throwable error); }
BatchOutputCollector是Storm中用于数据批处理的输出收集器。它的方法跟IBasicOutputCollector中定义的接口方法基本一致。Storm提供了BatchOutputCollector的默认实现类BatchOutputCollectorlmpl,它实际上是一个代理类,内部封装了OutputCollector变量,所有的方法都通过调用OutputCollector的方法来实现。
通过了解bolt输出收集器,可以更好的了解bolt接口,对此总结一下:
IRichBolt:Storm中最常用来定义Topology组件的接口。它十分灵活,用户可以通过其实现各种控制逻辑,并且能控制何时进行Ack、 Fail和Anchor操作。
IBasicBolt:Storm中提供的定义简单逻辑的Topology组件接口。对于这种Bolt, Storm内置实现了Ack、Fail和Anchor的机制,用户基于它实现自己的Bolt也比较简单。但是它的使用是有限制的,基于收到的某条消息衍生出来的所有消息必须在一次execute中发送出(或者需要对消息进行缓存并且编号),否则内置的Ack机制将不能保证Bolt的正常工作。所以,用户应该避免使用该类型的Bolt来做诸如聚集或者连接的操作。
IBatchBolt:它是Storm提供的用来处理批量数据的接口。目前,它只用于事务Topology中,它是Storm实现事务Topology的基础