优秀的编程知识分享平台

网站首页 > 技术文章 正文

聊聊storm的JoinBolt(stormst)

nanyue 2024-07-18 22:29:41 技术文章 6 ℃

本文主要研究一下storm的JoinBolt

实例

 /**
  * Wires two RandomWordSpout sources into a JoinBolt that inner-joins them on "timestamp"
  * over a tumbling window of 10 tuples, routes the join output to a FilePrinterBolt, and
  * hands the resulting topology to SubmitHelper.submitRemote.
  */
 @Test
 public void testJoinBolt() throws InvalidTopologyException, AuthorizationException, AlreadyAliveException {
     TopologyBuilder topology = new TopologyBuilder();

     // Sources: each spout is constructed with a "timestamp" entry, which is used as the join key below.
     RandomWordSpout uuidSource = new RandomWordSpout(new String[]{"uuid", "timestamp"});
     topology.setSpout("uuid-spout", uuidSource, 1);
     RandomWordSpout wordSource = new RandomWordSpout(new String[]{"word", "timestamp"});
     topology.setSpout("word-spout", wordSource, 1);

     // SQL equivalent: from uuid-spout inner join word-spout on word-spout.timestamp = uuid-spout.timestamp
     JoinBolt joiner = new JoinBolt("uuid-spout", "timestamp")
         .join("word-spout", "timestamp", "uuid-spout")
         .select("uuid,word,timestamp")
         .withTumblingWindow(BaseWindowedBolt.Count.of(10));

     // Both inputs are grouped on the join key.
     topology.setBolt("join", joiner, 1)
         .fieldsGrouping("uuid-spout", new Fields("timestamp"))
         .fieldsGrouping("word-spout", new Fields("timestamp"));

     // Sink for the joined records.
     topology.setBolt("fileWriter", new FilePrinterBolt(), 1).globalGrouping("join");

     SubmitHelper.submitRemote("windowTopology", topology.createTopology());
 }

JoinBolt

storm-2.0.0/storm-client/src/jvm/org/apache/storm/bolt/JoinBolt.java

/**
 * Windowed bolt that joins the tuples of several input streams inside each window.
 *
 * <p>The first stream is introduced through the constructor; further streams are attached with
 * {@link #join(String, String, String)} (inner join) or {@link #leftJoin(String, String, String)}
 * (left join). The output columns must be chosen with {@link #select(String)} before the topology
 * is built.
 */
public class JoinBolt extends BaseWindowedBolt {
    protected final Selector selectorType;
    // Map[StreamName -> JoinInfo]; the first entry is the stream passed to the constructor
    protected LinkedHashMap<String, JoinInfo> joinCriteria = new LinkedHashMap<>();
    protected FieldSelector[] outputFields; // specified via bolt.select() ... used in declaring Output fields
    protected String outputStreamName;
    // Map[StreamName -> Map[Key -> List<Tuple>] ]
    HashMap<String, HashMap<Object, ArrayList<Tuple>>> hashedInputs = new HashMap<>(); // holds remaining streams
    private OutputCollector collector;

    /**
     * Calls JoinBolt(Selector.SOURCE, sourceId, fieldName)
     *
     * @param sourceId  Id of source component (spout/bolt) from which this bolt is receiving data
     * @param fieldName the field to use for joining the stream (x.y.z format)
     */
    public JoinBolt(String sourceId, String fieldName) {
        this(Selector.SOURCE, sourceId, fieldName);
    }

    /**
     * Introduces the first stream to start the join with. Equivalent SQL ... select .... from srcOrStreamId ...
     *
     * @param type          Specifies whether 'srcOrStreamId' refers to stream name/source component
     * @param srcOrStreamId name of stream OR source component
     * @param fieldName     the field to use for joining the stream (x.y.z format)
     */
    public JoinBolt(Selector type, String srcOrStreamId, String fieldName) {
        selectorType = type;
        joinCriteria.put(srcOrStreamId, new JoinInfo(new FieldSelector(srcOrStreamId, fieldName)));
    }

    /**
     * Optional. Allows naming the output stream of this bolt. If not specified, the emits will happen on 'default' stream.
     *
     * @param streamName name of the stream to emit the joined records on
     * @return this bolt, for call chaining
     */
    public JoinBolt withOutputStream(String streamName) {
        this.outputStreamName = streamName;
        return this;
    }

    /**
     * Performs inner Join with the newStream. SQL : from priorStream inner join newStream on newStream.field = priorStream.field1 same
     * as: new JoinBolt(priorStream,field1). join(newStream, field, priorStream);
     *
     * <p>Note: priorStream must be previously joined. Valid ex: new JoinBolt(s1,k1). join(s2,k2, s1). join(s3,k3, s2); Invalid ex:
     * new JoinBolt(s1,k1). join(s3,k3, s2). join(s2,k2, s1);
     *
     * @param newStream   Either stream name or name of upstream component
     * @param field       the field on which to perform the join
     * @param priorStream a stream that is already part of the join and that newStream is joined against
     * @return this bolt, for call chaining
     * @throws IllegalArgumentException if newStream is already part of the join or priorStream was not previously declared
     */
    public JoinBolt join(String newStream, String field, String priorStream) {
        return joinCommon(newStream, field, priorStream, JoinType.INNER);
    }

    /**
     * Performs left Join with the newStream. SQL : from stream1 left join stream2 on stream2.field = stream1.field1 same as: new
     * JoinBolt(stream1, field1). leftJoin(stream2, field, stream1);
     *
     * <p>Note: priorStream must be previously joined Valid ex: new JoinBolt(s1,k1). leftJoin(s2,k2, s1). leftJoin(s3,k3, s2);
     * Invalid ex: new JoinBolt(s1,k1). leftJoin(s3,k3, s2). leftJoin(s2,k2, s1);
     *
     * @param newStream   Either a name of a stream or an upstream component
     * @param field       the field on which to perform the join
     * @param priorStream a stream that is already part of the join and that newStream is joined against
     * @return this bolt, for call chaining
     * @throws IllegalArgumentException if newStream is already part of the join or priorStream was not previously declared
     */
    public JoinBolt leftJoin(String newStream, String field, String priorStream) {
        return joinCommon(newStream, field, priorStream, JoinType.LEFT);
    }

    // Shared implementation of join()/leftJoin(): validates both stream names, then records the new join clause.
    private JoinBolt joinCommon(String newStream, String fieldDescriptor, String priorStream, JoinType joinType) {
        // joinCriteria is consulted as well because the first stream never gets a hashedInputs entry,
        // so checking hashedInputs alone would let the first stream be re-joined and its JoinInfo overwritten.
        if (joinCriteria.containsKey(newStream) || hashedInputs.containsKey(newStream)) {
            throw new IllegalArgumentException("'" + newStream + "' is already part of join. Cannot join with it more than once.");
        }
        JoinInfo joinInfo = joinCriteria.get(priorStream);
        if (joinInfo == null) {
            throw new IllegalArgumentException("Stream '" + priorStream + "' was not previously declared");
        }

        // State is only modified after all validation has passed, so a rejected call leaves the bolt untouched.
        FieldSelector field = new FieldSelector(newStream, fieldDescriptor);
        joinCriteria.put(newStream, new JoinInfo(field, priorStream, joinInfo, joinType));
        hashedInputs.put(newStream, new HashMap<Object, ArrayList<Tuple>>());
        return this;
    }

    /**
     * Specify projection fields. i.e. Specifies the fields to include in the output. e.g: .select("field1, stream2:field2, field3") Nested
     * Key names are supported for nested types: e.g: .select("outerKey1.innerKey1, outerKey1.innerKey2, stream3:outerKey2.innerKey3)" Inner
     * types (non leaf) must be Map<> in order to support nested lookup using this dot notation This selected fields implicitly declare the
     * output fieldNames for the bolt based.
     *
     * @param commaSeparatedKeys comma separated list of field descriptors; one FieldSelector is created per entry
     * @return this bolt, for call chaining
     */
    public JoinBolt select(String commaSeparatedKeys) {
        String[] fieldNames = commaSeparatedKeys.split(",");
        outputFields = new FieldSelector[fieldNames.length];
        for (int i = 0; i < fieldNames.length; i++) {
            outputFields[i] = new FieldSelector(fieldNames[i]);
        }
        return this;
    }

    /**
     * Declares one output field per selector configured through {@link #select(String)}, on the named output
     * stream if one was set via {@link #withOutputStream(String)} and on the default stream otherwise.
     *
     * @throws IllegalArgumentException if select() has not been called
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        if (outputFields == null) {
            // Same message as prepare(); without this guard the loop below fails with a bare NullPointerException.
            throw new IllegalArgumentException("Must specify output fields via .select() method.");
        }
        String[] outputFieldNames = new String[outputFields.length];
        for (int i = 0; i < outputFields.length; ++i) {
            outputFieldNames[i] = outputFields[i].getOutputName();
        }
        if (outputStreamName != null) {
            declarer.declareStream(outputStreamName, new Fields(outputFieldNames));
        } else {
            declarer.declare(new Fields(outputFieldNames));
        }
    }

    /**
     * Stores the collector and (re)creates an empty hash table for every joined stream except the first one.
     *
     * @throws IllegalArgumentException if select() has not been called
     */
    @Override
    public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        // initialize the hashedInputs data structure
        int i = 0;
        for (String stream : joinCriteria.keySet()) {
            if (i > 0) { // the first stream gets no hash table
                hashedInputs.put(stream, new HashMap<Object, ArrayList<Tuple>>());
            }
            ++i;
        }
        if (outputFields == null) {
            throw new IllegalArgumentException("Must specify output fields via .select() method.");
        }
    }

    /**
     * Joins the tuples of the given window and emits one output tuple per joined record.
     */
    @Override
    public void execute(TupleWindow inputWindow) {
        // 1) Perform Join
        List<Tuple> currentWindow = inputWindow.get();
        JoinAccumulator joinResult = hashJoin(currentWindow);
        // 2) Emit results
        // explicitly anchor emits to corresponding input tuples only, as default window anchoring will anchor them to all
        // tuples in window
        for (ResultRecord resultRecord : joinResult.getRecords()) {
            ArrayList<Object> outputTuple = resultRecord.getOutputFields();
            if (outputStreamName == null) {
                collector.emit(resultRecord.tupleList, outputTuple);
            } else {
                collector.emit(outputStreamName, resultRecord.tupleList, outputTuple);
            }
        }
    }
    //......
}
  • JoinBolt继承了BaseWindowedBolt,定义了Selector selectorType、LinkedHashMap<String, JoinInfo> joinCriteria、FieldSelector[] outputFields等属性,用于记录关联类型及关联关系
  • join、leftJoin方法用于设置join关联关系,最后都是调用joinCommon方法,关联关系使用JoinInfo对象,存储在joinCriteria中
  • select方法用于选择结果集的列,最后设置到outputFields,用于declareOutputFields
  • execute就是join的核心逻辑了,这里调用了hashJoin

JoinBolt.hashJoin

storm-2.0.0/storm-client/src/jvm/org/apache/storm/bolt/JoinBolt.java

 /**
  * Joins all tuples of one window.
  *
  * <p>Build phase: tuples of the first stream in joinCriteria become the initial probe records; tuples of
  * every other stream are bucketed by join key into that stream's table in hashedInputs.
  * Join phase: the remaining streams are joined onto the probe one by one, in joinCriteria order.
  *
  * @param tuples the tuples of the current window
  * @return the accumulator holding the joined records
  */
 protected JoinAccumulator hashJoin(List<Tuple> tuples) {
     clearHashedInputs();

     JoinAccumulator probe = new JoinAccumulator();
     final String probeStream = joinCriteria.keySet().iterator().next();
     // with a single stream there is no later join step, so projection has to happen right away
     final boolean projectImmediately = joinCriteria.size() == 1;

     // 1) Build phase
     for (Tuple tuple : tuples) {
         String streamId = getStreamSelector(tuple);
         if (streamId.equals(probeStream)) {
             probe.insert(new ResultRecord(tuple, projectImmediately));
             continue;
         }
         Object joinKey = getJoinField(streamId, tuple);
         HashMap<Object, ArrayList<Tuple>> buildTable = hashedInputs.get(streamId);
         ArrayList<Tuple> bucket = buildTable.get(joinKey);
         if (bucket == null) {
             bucket = new ArrayList<Tuple>();
             buildTable.put(joinKey, bucket);
         }
         bucket.add(tuple);
     }

     // 2) Join phase - every stream after the first, in declaration order
     final int lastPosition = joinCriteria.size() - 1;
     int position = 0;
     for (Map.Entry<String, JoinInfo> clause : joinCriteria.entrySet()) {
         if (position > 0) {
             boolean finalJoin = (position == lastPosition);
             probe = doJoin(probe, hashedInputs.get(clause.getKey()), clause.getValue(), finalJoin);
         }
         position++;
     }
     return probe;
 }
  • hashJoin方法先遍历一下tuples,把tuples分为两类,firstStream的数据存到JoinAccumulator probe中,其余的存到HashMap<String, HashMap<Object, ArrayList<Tuple>>> hashedInputs
  • 之后对剩余的streamId,挨个遍历调用doJoin,把结果整合到JoinAccumulator probe

JoinAccumulator

storm-2.0.0/storm-client/src/jvm/org/apache/storm/bolt/JoinBolt.java

 /** Collects the ResultRecords produced by one join step, in insertion order. */
 protected class JoinAccumulator {
     ArrayList<ResultRecord> records = new ArrayList<>();

     /** Appends the given record to the end of the accumulated list. */
     public void insert(ResultRecord tuple) {
         this.records.add(tuple);
     }

     /** Returns the backing list itself, not a copy. */
     public Collection<ResultRecord> getRecords() {
         return this.records;
     }
 }
  • JoinAccumulator只是对ArrayList<ResultRecord>的简单封装,提供insert及getRecords方法

ResultRecord

storm-2.0.0/storm-client/src/jvm/org/apache/storm/bolt/JoinBolt.java

 // Join helper: the tuples matched so far for one result row, plus the projected output
 // values once projection has been requested.
 protected class ResultRecord {
     ArrayList<Tuple> tupleList = new ArrayList<>(); // contains one Tuple per Stream being joined
     ArrayList<Object> outFields = null; // refs to fields that will be part of output fields

     // Starts a record from a single tuple.
     // 'generateOutputFields' enables us to avoid projection unless it is the final stream being joined
     public ResultRecord(Tuple tuple, boolean generateOutputFields) {
         tupleList.add(tuple);
         outFields = generateOutputFields ? doProjection(tupleList, outputFields) : null;
     }

     // Extends an existing record (lhs) with one more tuple (rhs). Either side may be null,
     // in which case it contributes nothing to tupleList.
     public ResultRecord(ResultRecord lhs, Tuple rhs, boolean generateOutputFields) {
         if (lhs != null) {
             tupleList.addAll(lhs.tupleList);
         }
         if (rhs != null) {
             tupleList.add(rhs);
         }
         outFields = generateOutputFields ? doProjection(tupleList, outputFields) : null;
     }

     // Projected output values, or null if this record was built without projection.
     public ArrayList<Object> getOutputFields() {
         return outFields;
     }

     // Returns the first non-null lookup result across this record's tuples, or null if none resolves.
     public Object getField(FieldSelector fieldSelector) {
         for (int idx = 0; idx < tupleList.size(); idx++) {
             Object value = lookupField(fieldSelector, tupleList.get(idx));
             if (value != null) {
                 return value;
             }
         }
         return null;
     }
 }
 // Builds the output row for 'tuples': for each projection field, the first non-null value found
 // across the tuples, or null when no tuple supplies it (usually in case of outer joins).
 protected ArrayList<Object> doProjection(ArrayList<Tuple> tuples, FieldSelector[] projectionFields) {
     ArrayList<Object> row = new ArrayList<>(projectionFields.length);
     // Todo: optimize this computation... perhaps inner loop can be outside to avoid rescanning tuples
     for (FieldSelector selector : projectionFields) {
         Object value = null;
         for (Tuple candidate : tuples) {
             value = lookupField(selector, candidate);
             if (value != null) {
                 break;
             }
         }
         // value is still null here when no tuple supplied the field
         row.add(value);
     }
     return row;
 }
 // Extract the field from tuple. Field may be nested field (x.y.z).
 // Returns null when the selector names a different stream than the tuple's, when the tuple does
 // not contain the top-level field, or when any level of the path resolves to null.
 // Intermediate (non leaf) values must be Maps; any other type fails with a ClassCastException.
 protected Object lookupField(FieldSelector fieldSelector, Tuple tuple) {
     // verify stream name matches, if stream name was specified
     if (fieldSelector.streamName != null
         && !fieldSelector.streamName.equalsIgnoreCase(getStreamSelector(tuple))) {
         return null;
     }
     Object curr = null;
     for (int i = 0; i < fieldSelector.field.length; i++) {
         if (i == 0) {
             if (!tuple.contains(fieldSelector.field[i])) {
                 return null;
             }
             curr = tuple.getValueByField(fieldSelector.field[i]);
         } else {
             curr = ((Map<?, ?>) curr).get(fieldSelector.field[i]);
         }
         // A null at any level (including the top-level value) means the path cannot be resolved.
         // Checking here also keeps the next iteration from calling get() on a null top-level value.
         if (curr == null) {
             return null;
         }
     }
     return curr;
 }
  • ResultRecord用于存储joined之后的数据
  • 当joinCriteria.size() == 1或者finalJoin为true的时候,ResultRecord的generateOutputFields为true,会调用doProjection对结果集进行projection操作
  • 当遍历joinCriteria调用doJoin的时候,只有遍历到最后一个stream时finalJoin才为true,此时生成的ResultRecord才会进行projection

JoinBolt.doJoin

storm-2.0.0/storm-client/src/jvm/org/apache/storm/bolt/JoinBolt.java

 // Routes to the join implementation matching joinInfo's join type. Only INNER and LEFT are
 // implemented; every other type is rejected with a RuntimeException.
 protected JoinAccumulator doJoin(JoinAccumulator probe, HashMap<Object, ArrayList<Tuple>> buildInput, JoinInfo joinInfo,
                                  boolean finalJoin) {
     final JoinType joinType = joinInfo.getJoinType();
     if (joinType == JoinType.INNER) {
         return doInnerJoin(probe, buildInput, joinInfo, finalJoin);
     }
     if (joinType == JoinType.LEFT) {
         return doLeftJoin(probe, buildInput, joinInfo, finalJoin);
     }
     // RIGHT, OUTER and anything else
     throw new RuntimeException("Unsupported join type : " + joinType.name());
 }
  • doJoin封装了各种join类型的方法,目前仅仅实现了INNER以及LEFT,分别调用doInnerJoin、doLeftJoin方法

doInnerJoin

storm-2.0.0/storm-client/src/jvm/org/apache/storm/bolt/JoinBolt.java

 // inner join - core implementation
 // For every probe record, looks up its key in buildInput and emits one merged record per matching
 // build tuple. Probe records with a null key or without any match produce no output.
 protected JoinAccumulator doInnerJoin(JoinAccumulator probe, Map<Object, ArrayList<Tuple>> buildInput, JoinInfo joinInfo,
                                       boolean finalJoin) {
     String[] probeKeyName = joinInfo.getOtherField();
     JoinAccumulator joined = new JoinAccumulator();
     FieldSelector probeKeySelector = new FieldSelector(joinInfo.other.getStreamName(), probeKeyName);

     for (ResultRecord probeRec : probe.getRecords()) {
         Object key = probeRec.getField(probeKeySelector);
         if (key == null) {
             continue;
         }
         ArrayList<Tuple> matches = buildInput.get(key);
         if (matches == null) {
             continue;
         }
         for (Tuple match : matches) {
             joined.insert(new ResultRecord(probeRec, match, finalJoin));
         }
     }
     return joined;
 }
  • 这里挨个对JoinAccumulator probe的records遍历,然后通过probeKey从buildInput寻找对应的records,如果有找到则进行合并

doLeftJoin

storm-2.0.0/storm-client/src/jvm/org/apache/storm/bolt/JoinBolt.java

 // left join - core implementation
 // Like the inner join, except that a probe record whose key has no (or only an empty) match list
 // is kept as a record without a build-side tuple. Probe records whose key resolves to null are
 // still skipped entirely.
 protected JoinAccumulator doLeftJoin(JoinAccumulator probe, Map<Object, ArrayList<Tuple>> buildInput, JoinInfo joinInfo,
                                      boolean finalJoin) {
     String[] probeKeyName = joinInfo.getOtherField();
     JoinAccumulator joined = new JoinAccumulator();
     FieldSelector probeKeySelector = new FieldSelector(joinInfo.other.getStreamName(), probeKeyName);

     for (ResultRecord probeRec : probe.getRecords()) {
         Object key = probeRec.getField(probeKeySelector);
         if (key == null) {
             continue;
         }
         ArrayList<Tuple> matches = buildInput.get(key); // ok if its return null
         if (matches == null || matches.isEmpty()) {
             // no build-side partner: keep the left record on its own
             joined.insert(new ResultRecord(probeRec, null, finalJoin));
             continue;
         }
         for (Tuple match : matches) {
             joined.insert(new ResultRecord(probeRec, match, finalJoin));
         }
     }
     return joined;
 }
  • left join与inner join的区别就在于没有找到匹配记录的话,仍旧保留左边的记录

小结

  • JoinBolt继承了BaseWindowedBolt,目前仅仅支持inner join及left join,而且要求join的字段与fieldsGrouping的字段相同
  • JoinBolt对于多个stream数据的合并,使用分治的方式实现,采用JoinAccumulator不断累加结果集,循环遍历调用doJoin来完成
  • 由于JoinBolt是在内存进行操作,又需要匹配数据,需要消耗CPU及内存,有几个点需要注意一下:
  • window的时间窗口不宜过大,否则内存堆积的数据过多,容易OOM,可根据情况调整时间窗口或者通过Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB设置worker的内存大小
  • 采取sliding window会造成数据重复join,因而需要使用withTumblingWindow
  • 如果开启tuple处理超时,则要求Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS大于windowLength + slidingInterval + 处理时间,避免还没有处理完就误判为超时重新replayed
  • 由于windowedBolt会自动对tupleWindow的数据进行anchor,数据量过多anchor操作会给整个topology造成压力,如无必要可以关闭ack(设置Config.TOPOLOGY_ACKER_EXECUTORS为0)
  • Config.TOPOLOGY_MAX_SPOUT_PENDING要设置的大一点,给window的join操作及后续操作足够的时间,在一定程度上避免spout发送tuple速度过快,下游bolt消费不过来
  • 生产上Config.TOPOLOGY_DEBUG设置为false关闭debug日志,Config.TOPOLOGY_EVENTLOGGER_EXECUTORS设置为0关闭event logger
  • doc
  • Windowing Support in Core Storm
  • Joining Streams in Storm Core
最近发表
标签列表