生成bolt的名称。
/**
 * Assigns an id to every non-spout group in {@code groups}.
 *
 * <p>Each id is produced by joining, with "-", the literal "b", a counter that starts at 0 and
 * is incremented once per non-spout group in iteration order, and the group's name when that
 * name is neither null nor empty.
 *
 * @param groups the groups to name; groups for which {@code isSpoutGroup} returns true are skipped
 * @return a map from each non-spout group to its generated id
 */
private static Map<Group, String> genBoltIds(Collection<Group> groups) {
    Map<Group, String> ret = new HashMap<>();
    int ctr = 0;
    for (Group g : groups) {
        if (isSpoutGroup(g)) {
            continue;
        }
        List<String> name = new ArrayList<>();
        name.add("b");
        name.add("" + ctr);
        // Compute the group name once and reuse it instead of walking the group's nodes twice.
        String groupName = getGroupName(g);
        if (groupName != null && !groupName.isEmpty()) {
            name.add(groupName);
        }
        ret.put(g, Utils.join(name, "-"));
        ctr++;
    }
    return ret;
}

/**
 * Builds a group's name from the names of its nodes.
 *
 * <p>Nodes with a non-null name are ordered by {@code creationIndex}; runs of consecutive equal
 * names are collapsed to one entry, and the remaining names are joined with "-".
 *
 * @param g the group whose nodes supply the names
 * @return the result of joining the collected names with "-"
 */
private static String getGroupName(Group g) {
    // TreeMap keeps the names sorted by the creation index of their node.
    TreeMap<Integer, String> sortedNames = new TreeMap<>();
    for (Node n : g.nodes) {
        if (n.name != null) {
            sortedNames.put(n.creationIndex, n.name);
        }
    }
    List<String> names = new ArrayList<>();
    String prevName = null;
    for (String n : sortedNames.values()) {
        // n is never null here, so equals(null) is simply false for the first element.
        if (!n.equals(prevName)) {
            prevName = n;
            names.add(n);
        }
    }
    return Utils.join(names, "-");
}
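genBoltIds用于为每个非spout的group(即每个bolt)生成一个唯一的id:id以字母b开头,接着是一个从0开始递增的数字序号,如果group的名称非空,最后再加上group的名称,各部分之间用"-"连接(例如 b-0-name);第2个bolt使用序号1和第2个group的名称,依次类推。而group的名称是由这个group包含的Node名称组成的:按Node的creationIndex排序,去掉相邻的重复名称后用"-"连接。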
/**
 * Adds {@code newNode} downstream of a single source stream.
 *
 * @param source the stream that feeds the new node
 * @param newNode the node to add
 * @return a stream positioned at {@code newNode}
 */
protected Stream addSourcedNode(Stream source, Node newNode) {
    List<Stream> parents = Arrays.asList(source);
    return addSourcedNode(parents, newNode);
}

/**
 * Adds {@code newNode} downstream of every stream in {@code sources}.
 *
 * @param sources the streams that feed the new node
 * @param newNode the node to add
 * @return a new stream built from this object, the node's name and the node itself
 */
protected Stream addSourcedNode(List<Stream> sources, Node newNode) {
    registerSourcedNode(sources, newNode);
    Stream downstream = new Stream(this, newNode.name, newNode);
    return downstream;
}
创建一个新节点,并指定新节点的父节点(可能有多个)。指定多个sources的情况只会出现在merge()、multiReduce()这类合并多个流的方法中,因此这里只关注只有一个source的情形。
/**
 * Registers {@code newNode} and connects each source stream's node to it.
 *
 * <p>One edge is added per source; the edge carries the zero-based position of that source
 * within {@code sources}.
 *
 * @param sources the streams whose nodes become parents of {@code newNode}
 * @param newNode the node being added to the graph
 */
protected void registerSourcedNode(List<Stream> sources, Node newNode) {
    registerNode(newNode);
    int position = 0;
    for (Stream parent : sources) {
        IndexedEdge edge = new IndexedEdge(parent._node, newNode, position);
        _graph.addEdge(parent._node, newNode, edge);
        position += 1;
    }
}
除了通过registerNode(newNode)注册新节点以外,还会在每个source stream对应的节点与新节点之间创建一条边,边上记录了该stream在sources中的序号(streamIndex)。
/**
 * Adds {@code n} to the graph as a vertex and, when the node carries state information,
 * appends it to the {@code _colocate} entry keyed by that state's id.
 *
 * @param n the node to register
 */
protected void registerNode(Node n) {
    _graph.addVertex(n);
    if (n.stateInfo == null) {
        // Nodes without state information are not tracked in _colocate.
        return;
    }
    String stateId = n.stateInfo.id;
    if (!_colocate.containsKey(stateId)) {
        _colocate.put(stateId, new ArrayList());
    }
    _colocate.get(stateId).add(n);
}
向图中添加一个节点。如果节点中的stateInfo成员不为空,则将该节点加入哈希表_colocate中以存储序号(stateInfo.id)为键的列表。_colocate变量将所有访问同一存储的节点关联在一起,并将它们放在同一个Bolt中执行。
/**
 * Registers {@code n} without connecting it to any source and wraps it in a stream.
 *
 * @param n the node to add
 * @return a new stream built from this object, the node's name and the node itself
 */
protected Stream addNode(Node n) {
    registerNode(n);
    Stream created = new Stream(this, n.name, n);
    return created;
}
这个方法比较简单,它只在newStream()及newDRPCStream()中被调用,用于提供一个新的数据源。而上面的addSourcedNode()则用于在bolt中添加下一个处理节点。
// Batch group recorded for each (component id, stream name) pair.
Map<GlobalStreamId, String> _batchIds = new HashMap<>();
// Spout components registered through setSpout, keyed by component id.
Map<String, TransactionalSpoutComponent> _spouts = new HashMap<>();

/**
 * Registers a Trident spout under {@code id}.
 *
 * <p>The batch group of the spout's single output stream is recorded in {@code _batchIds},
 * and the spout is wrapped in a {@code TransactionalSpoutComponent} stored in {@code _spouts}.
 *
 * @param id component id of the spout
 * @param streamName name of the stream the spout emits on
 * @param txStateId passed through to the {@code TransactionalSpoutComponent}
 * @param spout the spout implementation
 * @param parallelism passed through to the {@code TransactionalSpoutComponent}
 * @param batchGroup batch group associated with {@code streamName}
 * @return a declarer wrapping the newly created component
 */
public SpoutDeclarer setSpout(String id, String streamName, String txStateId, ITridentSpout spout, Integer parallelism, String batchGroup) {
    Map<String, String> batchGroups = new HashMap<>();
    batchGroups.put(streamName, batchGroup);
    markBatchGroups(id, batchGroups);

    TransactionalSpoutComponent c = new TransactionalSpoutComponent(spout, streamName, parallelism, txStateId, batchGroup);
    _spouts.put(id, c);
    return new SpoutDeclarerImpl(c);
}

/**
 * Records, for every entry of {@code batchGroups}, the batch group of the stream identified by
 * {@code component} and the entry's key.
 *
 * @param component component id that owns the streams
 * @param batchGroups map from stream name to batch group
 */
private void markBatchGroups(String component, Map<String, String> batchGroups) {
    for (Map.Entry<String, String> entry : batchGroups.entrySet()) {
        _batchIds.put(new GlobalStreamId(component, entry.getKey()), entry.getValue());
    }
}
调用了markBatchGroups,将新的component添加到_batchIds中,同时也添加到_spouts中。
// Batch group recorded for each (component id, stream name) pair.
Map<GlobalStreamId, String> _batchIds = new HashMap<>();
// Bolt components registered through setBolt, keyed by component id.
Map<String, Component> _bolts = new HashMap<>();

/**
 * Registers a Trident batch bolt under {@code id}.
 *
 * <p>The batch group of each of the bolt's streams is recorded in {@code _batchIds}, and the
 * bolt is wrapped in a {@code Component} stored in {@code _bolts}.
 *
 * @param id component id of the bolt
 * @param bolt the bolt implementation
 * @param parallelism passed through to the {@code Component}
 * @param committerBatches passed through to the {@code Component}
 * @param batchGroups map from stream name to batch id
 * @return a declarer wrapping the newly created component
 */
public BoltDeclarer setBolt(String id, ITridentBatchBolt bolt, Integer parallelism, Set<String> committerBatches, Map<String, String> batchGroups) {
    markBatchGroups(id, batchGroups);
    Component c = new Component(bolt, parallelism, committerBatches);
    _bolts.put(id, c);
    return new BoltDeclarerImpl(c);
}

/**
 * Records, for every entry of {@code batchGroups}, the batch group of the stream identified by
 * {@code component} and the entry's key.
 *
 * @param component component id that owns the streams
 * @param batchGroups map from stream name to batch group
 */
private void markBatchGroups(String component, Map<String, String> batchGroups) {
    for (Map.Entry<String, String> entry : batchGroups.entrySet()) {
        _batchIds.put(new GlobalStreamId(component, entry.getKey()), entry.getValue());
    }
}
这里调用了markBatchGroups将新的component添加到_batchIds中,同时也添加到_bolts中;对于trident来说,就是一系列的ProcessorNode(可能也会有PartitionNode)。
Map<String, List<String>> batchesToCommitIds = new HashMap<>(); Map<String, List<ITridentSpout>> batchesToSpouts = new HashMap<>(); for(String id: _spouts.keySet()) { TransactionalSpoutComponent c = _spouts.get(id); if(c.spout instanceof IRichSpout) { //TODO: wrap this to set the stream name builder.setSpout(id, (IRichSpout) c.spout, c.parallelism); } else { String batchGroup = c.batchGroupId; if(!batchesToCommitIds.containsKey(batchGroup)) { batchesToCommitIds.put(batchGroup, new ArrayList<String>()); } batchesToCommitIds.get(batchGroup).add(c.commitStateId); if(!batchesToSpouts.containsKey(batchGroup)) { batchesToSpouts.put(batchGroup, new ArrayList<ITridentSpout>()); } batchesToSpouts.get(batchGroup).add((ITridentSpout) c.spout); BoltDeclarer scd = builder.setBolt(spoutCoordinator(id), new TridentSpoutCoordinator(c.commitStateId, (ITridentSpout) c.spout)) .globalGrouping(masterCoordinator(c.batchGroupId), MasterBatchCoordinator.BATCH_STREAM_ID) .globalGrouping(masterCoordinator(c.batchGroupId), MasterBatchCoordinator.SUCCESS_STREAM_ID); for(Map<String, Object> m: c.componentConfs) { scd.addConfigurations(m); } Map<String, TridentBoltExecutor.CoordSpec> specs = new HashMap(); specs.put(c.batchGroupId, new CoordSpec()); BoltDeclarer bd = builder.setBolt(id, new TridentBoltExecutor( new TridentSpoutExecutor( c.commitStateId, c.streamName, ((ITridentSpout) c.spout)), batchIdsForSpouts, specs), c.parallelism); bd.allGrouping(spoutCoordinator(id), MasterBatchCoordinator.BATCH_STREAM_ID); bd.allGrouping(masterCoordinator(batchGroup), MasterBatchCoordinator.SUCCESS_STREAM_ID); if(c.spout instanceof ICommitterTridentSpout) { bd.allGrouping(masterCoordinator(batchGroup), MasterBatchCoordinator.COMMIT_STREAM_ID); } for(Map<String, Object> m: c.componentConfs) { bd.addConfigurations(m); } } } //...... 
Number onHeap = masterCoordResources.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB); Number offHeap = masterCoordResources.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB); Number cpuLoad = masterCoordResources.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT); for(String batch: batchesToCommitIds.keySet()) { List<String> commitIds = batchesToCommitIds.get(batch); SpoutDeclarer masterCoord = builder.setSpout(masterCoordinator(batch), new MasterBatchCoordinator(commitIds, batchesToSpouts.get(batch))); if(onHeap != null) { if(offHeap != null) { masterCoord.setMemoryLoad(onHeap, offHeap); } else { masterCoord.setMemoryLoad(onHeap); } } if(cpuLoad != null) { masterCoord.setCPULoad(cpuLoad); } } for(String id: _bolts.keySet()) { Component c = _bolts.get(id); Map<String, CoordSpec> specs = new HashMap<>(); for(GlobalStreamId s: getBoltSubscriptionStreams(id)) { String batch = batchIdsForBolts.get(s); if(!specs.containsKey(batch)) specs.put(batch, new CoordSpec()); CoordSpec spec = specs.get(batch); CoordType ct; if(_batchPerTupleSpouts.containsKey(s.get_componentId())) { ct = CoordType.single(); } else { ct = CoordType.all(); } spec.coords.put(s.get_componentId(), ct); } for(String b: c.committerBatches) { specs.get(b).commitStream = new GlobalStreamId(masterCoordinator(b), MasterBatchCoordinator.COMMIT_STREAM_ID); } BoltDeclarer d = builder.setBolt(id, new TridentBoltExecutor(c.bolt, batchIdsForBolts, specs), c.parallelism); for(Map<String, Object> conf: c.componentConfs) { d.addConfigurations(conf); } for(InputDeclaration inputDecl: c.declarations) { inputDecl.declare(d); } Map<String, Set<String>> batchToComponents = getBoltBatchToComponentSubscriptions(id); for(Map.Entry<String, Set<String>> entry: batchToComponents.entrySet()) { for(String comp: entry.getValue()) { d.directGrouping(comp, TridentBoltExecutor.COORD_STREAM(entry.getKey())); } } for(String b: c.committerBatches) { d.allGrouping(masterCoordinator(b), 
MasterBatchCoordinator.COMMIT_STREAM_ID); } } return builder.createTopology(); }
对于bolt来说(包装了ProcessorNode的SubtopologyBolt),这里设置了TridentBoltExecutor这个bolt,它directGrouping了TridentBoltExecutor.COORD_STREAM($coord-)
同时还allGrouping了MasterBatchCoordinator.COMMIT_STREAM_ID($commit)。