Trident中 FixedBatchSpout分析

FixedBatchSpout 继承自 IBatchSpout

IBatchSpout 方法

public interface IBatchSpout extends Serializable {
    void open(Map conf, TopologyContext context);
    void emitBatch(long batchId, TridentCollector collector);
    void ack(long batchId);
    void close();
    Map getComponentConfiguration();
    Fields getOutputFields();
}
 FixedBatchSpout代码

package storm.trident.testing;

import backtype.storm.Config;
import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.Fields;
import java.util.List;
import java.util.Map;
import storm.trident.operation.TridentCollector;
import storm.trident.spout.IBatchSpout;


public class FixedBatchSpout implements IBatchSpout {

    Fields fields;
    List<Object>[] outputs;
    int maxBatchSize;
    
    public FixedBatchSpout(Fields fields, int maxBatchSize, List<Object>... outputs) {
        this.fields = fields; // 输出字段
        this.outputs = outputs;  // 保存至本地, 每个对象都是一个List<Object>
        this.maxBatchSize = maxBatchSize; //  该批次最大发射次数,但是不是唯一决定元素
    }
    
    int index = 0;
    boolean cycle = false;
    
    public void setCycle(boolean cycle) {
        this.cycle = cycle;
    }
    
    @Override
    public void open(Map conf, TopologyContext context) {
        index = 0;
    }

    @Override
    public void emitBatch(long batchId, TridentCollector collector) {
        //Utils.sleep(2000);
        if(index>=outputs.length && cycle) { 
            index = 0;  // 超过下标后,让index归零, 继续循环发送
        }

       //  在不超过outputs大小的情况下,每次发射一个List<Object>
        for(int i=0; index < outputs.length && i < maxBatchSize; index++, i++) {
            collector.emit(outputs[index]);
        }
    }

    @Override
    public void ack(long batchId) {
        
    }

    @Override
    public void close() {
    }

    @Override
    public Map getComponentConfiguration() {
        Config conf = new Config();
        conf.setMaxTaskParallelism(1); // 最大并行度,默认是1. 好像没提供接口来修改, 很奇怪。
        return conf;
    }

    @Override
    public Fields getOutputFields() {
        return fields ;  // 输出字段
    }   
}

 外部使用

FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 1,
                new Values("ab ab ab ab ab ab ab ab ab ab"));  // 这里设置为1,表示每批只发送一个List<Value>,但是设置更大,也不会出错,参见上面的代码注释,它要同时满足不超过数组大小,所以不会越界。
         spout.setCycle(true);  // 设置则表示会一直发送,如果不用它一直发射, 可以注释掉。

其他就是trident内部调用。

如分析有误,请指出,谢谢。。

郑重声明:本站内容如果来自互联网及其他传播媒体,其版权均属原媒体及文章作者所有。转载目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。