JStorm之NimbusServer启动流程
1、清理无效topology
2、建立zk连接并创建相应znode
3、启动监控线程
4、启动httpserver
启动主函数如下:
public static void main(String[] args) throws Exception { // 读取配置文件,如果我们在JVM启动参数中包含了storm.conf.file系统属性 //则会直接读取参数指定的文件,否则会找strom.yaml,这个一个yaml格式的文件 //读取出的配置信息最终放入Map中 @SuppressWarnings("rawtypes") Map config = Utils.readStormConfig(); NimbusServer instance = new NimbusServer(); //处理资源分配的工具 INimbus iNimbus = new DefaultInimbus(); //主要启动流程包含此函数中 instance.launchServer(config, iNimbus); }nimbus启动的主方法如下:
private void launchServer(final Map conf, INimbus inimbus) { LOG.info("Begin to start nimbus with conf " + conf); try { // 检测集群配置是本地模式还是集群模式 StormConfig.validate_distributed_mode(conf); //清理pid目录,由参数storm.local.dir指定,该过程包含如下操作 //1、创建本地进程的pid文件 //2、删除历史pid文件,比如上次进程直接被kill createPid(conf); //初始化关闭钩子线程:里面包含各种清理操作,可参考cleanup方法 initShutdownHook(); //初始化ininumbus目录 inimbus.prepare(conf, StormConfig.masterInimbus(conf)); //创建数据容器,存放各种统计信息例如提交此俗、集群状态、启动时间,参考下面NimbusData类成员 data = createNimbusData(conf, inimbus); //启动fllower线程不断更新zk状态,如果发现自己不是leader则会关闭进程 initFollowerThread(conf); //启动http服务器 int port = ConfigExtension.getNimbusDeamonHttpserverPort(conf); hs = new Httpserver(port, conf); hs.start(); //如果不是运行在yarn类似的容器中,不用关心此方法 initContainerHBThread(conf); while (!data.isLeader()) Utils.sleep(5000); //初始换监控线程 initUploadMetricThread(data); //进行初始化操作,该方法会清理zk中残留的topology信息,初始化TopologyAssign线程负责分配任务 init(conf); } catch (Throwable e) { LOG.error("Fail to run nimbus ", e); } finally { cleanup(); } LOG.info("Quit nimbus"); }NimbusServer的init方法如下:
private void init(Map conf) throws Exception { NimbusUtils.cleanupCorruptTopologies(data); //初始化TopologyAssign线程 initTopologyAssign(); initTopologyStatus(); //jar文件清理线程,默认每10分钟会清理一次,清理目录:$LOCAL-DIR/nimbus/inbox initCleaner(conf); serviceHandler = new ServiceHandler(data); if (!data.isLocalMode()) { //监控各task运行状态,发现dead状态的则会清理掉释放资源 initMonitor(conf); //启动Thrift server initThrift(conf); } }最后附上NimbusData的数据结构:
public class NimbusData { private Map<Object, Object> conf; private StormClusterState stormClusterState; // Map<topologyId, Map<taskid, TkHbCacheTime>> private ConcurrentHashMap<String, Map<Integer, TkHbCacheTime>> taskHeartbeatsCache; // TODO two kind of value:Channel/BufferFileInputStream private TimeCacheMap<Object, Object> downloaders; private TimeCacheMap<Object, Object> uploaders; private int startTime; private final ScheduledExecutorService scheduExec; private AtomicInteger submittedCount; private Object submitLock = new Object(); private StatusTransition statusTransition; private static final int SCHEDULE_THREAD_NUM = 8; private final INimbus inimubs; private Map<String, Map<String, Map<ThriftResourceType, Integer>>> groupToTopology; private Map<String, Map<ThriftResourceType, Integer>> groupToResource; private Map<String, Map<ThriftResourceType, Integer>> groupToUsedResource; private final boolean localMode; private volatile boolean isLeader; ........... }
郑重声明:本站内容如果来自互联网及其他传播媒体,其版权均属原媒体及文章作者所有。转载目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。