2021SC@SDUSC
2021SC@SDUSC
SubTopologyBolt类型为Trident中运行的基本单位,但它并不是真正的Bolt节点,Trident会利用TridentBoltExecutor对SubTopologyBolt进行接口适配。
TridentBoltExecutor继承自IRichBolt接口,是Trident中真正运行的Bolt节点。它提供了类似于协调Bolt ( CoordinatedBolt )节点的功能,通过发送协调消息来对各个节点进行同步。
SubTopologyBolt主要用于对TridentProcessor的执行进行抽象。本篇文章将讨论SubTopologyBolt和TridentBoltExecutor的实现。
public class SubtopologyBolt implements ITridentBatchBolt { private static final long serialVersionUID = 1475508603138688412L; @SuppressWarnings("rawtypes") final DirectedGraph<Node, IndexedEdge> graph; final Set<Node> nodes; final Map<String, InitialReceiver> roots = new HashMap<>(); final Map<Node, Factory> outputFactories = new HashMap<>(); final Map<String, List<TridentProcessor>> myTopologicallyOrdered = new HashMap<>(); final Map<Node, String> batchGroups;
graph:整个Topology所对应的有向图。
nodes:该Bolt中所包含的处理节点。_nodesS_graph节点的子集。
roots:每种类型的输人流都会对应一个lnitalReceiver对象,用于表示如何处理该流的消息。
outputFactories:每个处理节点都会对应一个输出的工厂。 myTopologicallyOrdered:它的键为节点组序号,值为节点组所对应的TridentProcessor。
batchGroups:该变量保存了反向的索引,用来表示每个节点属于哪一个节点组。
BatchGroup对应于graph中的一个最大连通子图。
public void prepare(Map<String, Object> conf, TopologyContext context, BatchOutputCollector batchCollector) { int thisComponentNumTasks = context.getComponentTasks(context.getThisComponentId()).size(); for (Node n : nodes) { if (n.stateInfo != null) { State s = n.stateInfo.spec.stateFactory.makeState(conf, context, context.getThisTaskIndex(), thisComponentNumTasks); context.setTaskData(n.stateInfo.id, s); } } DirectedSubgraph<Node, ?> subgraph = new DirectedSubgraph<>(graph, nodes, null); TopologicalOrderIterator<Node, ?> it = new TopologicalOrderIterator<>(subgraph); int stateIndex = 0; while (it.hasNext()) { Node n = it.next(); if (n instanceof ProcessorNode) { ProcessorNode pn = (ProcessorNode) n; String batchGroup = batchGroups.get(n); if (!myTopologicallyOrdered.containsKey(batchGroup)) { myTopologicallyOrdered.put(batchGroup, new ArrayList<>()); } myTopologicallyOrdered.get(batchGroup).add(pn.processor); List<String> parentStreams = new ArrayList<>(); List<Factory> parentFactories = new ArrayList<>(); for (Node p : TridentUtils.getParents(graph, n)) { parentStreams.add(p.streamId); if (nodes.contains(p)) { parentFactories.add(outputFactories.get(p)); } else { if (!roots.containsKey(p.streamId)) { roots.put(p.streamId, new InitialReceiver(p.streamId, getSourceOutputFields(context, p.streamId))); } roots.get(p.streamId).addReceiver(pn.processor); parentFactories.add(roots.get(p.streamId).getOutputFactory()); } } List<TupleReceiver> targets = new ArrayList<>(); boolean outgoingNode = false; for (Node cn : TridentUtils.getChildren(graph, n)) { if (nodes.contains(cn)) { targets.add(((ProcessorNode) cn).processor); } else { outgoingNode = true; } } if (outgoingNode) { targets.add(new BridgeReceiver(batchCollector)); } TridentContext triContext = new TridentContext( pn.selfOutFields, parentFactories, parentStreams, targets, pn.streamId, stateIndex, batchCollector ); pn.processor.prepare(conf, context, triContext); outputFactories.put(n, pn.processor.getOutputFactory()); } stateIndex++; } }
for (Node n : nodes):
在此循环中判断节点的statelnfo是否为空,并对存储的State对象进行初始化。初始化过后的State对象被存储于TopologyContext的taskData中,并以statelnfo.id为键。statelnfo.id是一个以串 “ state” 为前缀的在Topology中唯一的字符串。
subgraph:
根据SubTopologyBolt中包含的节点获得一个子图。
it:
对子图进行拓扑排序,TopologicalOrderlterator类型的it变量用来按照拓扑排序的顺序遍历子图。
if (n instanceof ProcessorNode):
SubTopologyBolt只对处理节点进行操作。处理节点中包含了一个TridentProcessor。Spout节点和分区节点则不在SubTopologyBolt的处理范围之内。
pn.processor.prepare(conf, context, triContext:
调用该TridentProcessor的prepare方法,它将新产生的TridentContext对象作为构造函数的一个参数传入。
outputFactories.put(n, pn.processor.getOutputFactory()):
将该TridentProcessor所对应的输出添加到SubTopologyBolt的输出中。于是该输出便可被其他的SubTopologyBolt作为输入了。
statelndex变量:
用于唯一地标识SubTopologyBolt中的每一个节点。
public void execute(BatchInfo batchInfo, Tuple tuple) { String sourceStream = tuple.getSourceStreamId(); InitialReceiver ir = roots.get(sourceStream); if (ir == null) { throw new RuntimeException("Received unexpected tuple " + tuple.toString()); } ir.receive((ProcessorContext) batchInfo.state, tuple); }
首先,根据输人消息的流号,在roots中找到对应的InitialReceiver对象,并调用其receive方法。
所有等待该流消息的TridentProcessor的execute方法均会被调用。
在某个TridentProcessor的execute方法中,下游TridentProcessor的execute方法也会被依次调用到,于是构成了一个调用链,直到SubTopologyBolt完成对该消息的处理后结束。
public void finishBatch(BatchInfo batchInfo) { for (TridentProcessor p : myTopologicallyOrdered.get(batchInfo.batchGroup)) { p.finishBatch((ProcessorContext) batchInfo.state); } } @Override public Object initBatchState(String batchGroup, Object batchId) { ProcessorContext ret = new ProcessorContext(batchId, new Object[nodes.size()]); for (TridentProcessor p : myTopologicallyOrdered.get(batchGroup)) { p.startBatch(ret); } return ret; } public void declareOutputFields(OutputFieldsDeclarer declarer) { for (Node n : nodes) { declarer.declareStream(n.streamId, TridentUtils.fieldsConcat(new Fields("$batchId"), n.allOutputFields)); } }
在initBatchState方法中,会对ProcessorContext的数据进行初始化,然后返回ProcessorContext对象。Trident中的聚集器均要基于ProcessorContext中state所存储的数据来实现。
declareOutputFields方法会对SubTopologyBolt中每一个节点的输出进行声明,它将$batchid作为第1列。可以看出SubTopologyBolt虽然作为一个整体而存在,可其中每一个节点的输出均可能成为最终的输出。