JStorm之Supervisor启动流程

  Supervisor 的中文译名是“监督者”，顾名思义，它负责对资源进行监控，主要是 worker 资源。该组件所做的事情概括如下：
    1、每隔一段时间发送心跳证明自己还活着
    2、下载新的topology
    3、释放无效的 worker

    4、分配新的任务

技术分享

  该组件主要包含：心跳线程、supervisor 事件接收线程和事件处理线程；一旦接收到事件，便会进入任务分配环节。主要逻辑代码如下：
/**
 * Command-line entry point for the supervisor daemon.
 *
 * @param args command-line arguments (not used)
 */
public static void main(String[] args) {
	// All initialization and the blocking wait loop live in run().
	new Supervisor().run();
}
  
/**
 * Starts the supervisor daemon and blocks until it reports that shutdown has finished.
 *
 * <p>Reads the storm configuration, checks the cluster mode, writes the pid file, builds and
 * starts all supervisor components via {@code mkSupervisor}, and then redirects output to
 * {@code /dev/null}. Any startup failure is logged and terminates the JVM with exit code 1.
 *
 * <p>If the calling thread is interrupted while waiting for shutdown, the interrupt flag is
 * restored and the method returns instead of silently ignoring the interrupt.
 */
public void run() {
	SupervisorManger supervisorManager = null;
	try {
		// Read the storm configuration file (same mechanism as nimbus).
		Map<Object, Object> conf = Utils.readStormConfig();
		// Check the cluster mode.
		StormConfig.validate_distributed_mode(conf);
		// Create the pid file.
		createPid(conf);
		// All supervisor components are created and started here.
		supervisorManager = mkSupervisor(conf, null);
		JStormUtils.redirectOutput("/dev/null");
	} catch (Exception e) {
		LOG.error("Failed to start supervisor\n", e);
		System.exit(1);
		// Not reached in practice; makes it explicit that we never poll a null manager.
		return;
	}

	// Poll once per second until the manager reports that shutdown has completed.
	while (!supervisorManager.isFinishShutdown()) {
		try {
			Thread.sleep(1000);
		} catch (InterruptedException e) {
			// Preserve the interrupt status for callers and stop waiting; re-entering
			// sleep() with the flag set would otherwise spin on immediate interrupts.
			Thread.currentThread().interrupt();
			LOG.warn("Interrupted while waiting for supervisor shutdown", e);
			return;
		}
	}
}
下面看组件启动的主方法：
/**
 * Builds and starts every background component of a supervisor daemon.
 *
 * <p>In order, this method: (re)initializes the shared {@code active} flag, cleans the
 * supervisor tmp directory, creates the cluster state handle, loads (or creates and persists)
 * the supervisor id, pushes one heartbeat and creates the heartbeat thread, optionally adds the
 * container-heartbeat sync thread, creates the SyncSupervisorEvent pusher thread, starts the
 * HTTP server, and creates the metric-upload thread. Many of these calls presumably start
 * threads or open connections as a side effect (not visible here), so keep the statement order.
 *
 * @param conf storm configuration; also read for the monitor frequency, the supervisor HTTP
 *             port and the alimonitor metrics flag
 * @param sharedContext passed unchanged to {@code SyncProcessEvent}; the visible caller
 *             {@code run()} passes {@code null}
 * @return a SupervisorManger wrapping the created threads, both event managers, the HTTP
 *         server, the cluster state and the worker pid map, which can shut down the
 *         supervisor and all workers
 * @throws Exception propagated from any step. NOTE(review): components started before the
 *         failure (e.g. the HTTP server) are not cleaned up here
 */
public SupervisorManger mkSupervisor(Map conf, IContext sharedContext)
			throws Exception {
		LOG.info("Starting Supervisor with conf " + conf);
		// Shared "still running" flag handed to the heartbeat, pusher and metric components
		// and to the returned manager.
		active = new AtomicBoolean(true);
		/**
		 * Step 1: clean the temporary directory: $storm.local.dir/supervisor/tmp
		 */
		String path = StormConfig.supervisorTmpDir(conf);
		FileUtils.cleanDirectory(new File(path));
		/*
		 * Step 2: create the ZooKeeper connection and set up the directories:
		 * [/assignments, /tasks, /topology, /supervisors, /taskbeats, /taskerrors, /monitor]
		 */
		StormClusterState stormClusterState = Cluster
				.mk_storm_cluster_state(conf);
		/*
		 * Step 3: open the local state directory $storm.local.dir/supervisor/localstate.
		 *         Per the author's notes, this writes timestamp-named version files
		 *         (e.g. 1421217778765 and 1421217778765.version) holding the serialized
		 *         state, and deletes old versions, keeping only the 4 most recent.
		 */
		LocalState localState = StormConfig.supervisorState(conf);
		// Reuse the persisted supervisor id; on first start generate a random UUID and persist it.
		String supervisorId = (String) localState.get(Common.LS_ID);
		if (supervisorId == null) {
			supervisorId = UUID.randomUUID().toString();
			localState.put(Common.LS_ID, supervisorId);
		}
		// Every background thread created below is collected here and handed to the manager.
		// (There is no "Step 4" in the upstream numbering.)
		Vector<SmartThread> threads = new Vector<SmartThread>();
		// Step 5 create HeartBeat
		/*
		* Heartbeat thread: sends a heartbeat every supervisor.heartbeat.frequency.secs
		* seconds, updating the ZooKeeper znode /supervisors/<supervisor-id>
		* (e.g. /supervisors/0add54ac-2c23-49bc-aaee-05b3cb9fef00) with information like:
		*   SupervisorInfo[hostName=rt2l02046.tbc,
		*                  supervisorId=0add54ac-2c23-49bc-aaee-05b3cb9fef00,
		*                  timeSecs=1421219320,
		*                  uptimeSecs=908,
		*                  workerPorts=[6801, 6800, 6803, 6802]]
		*/
		Heartbeat hb = new Heartbeat(conf, stormClusterState, supervisorId,
				active);
		// Push one heartbeat immediately, before the periodic thread is created.
		hb.update();
		AsyncLoopThread heartbeat = new AsyncLoopThread(hb, false, null,
				Thread.MIN_PRIORITY, true);
		threads.add(heartbeat);
		
		// Sync heartbeat to Apsara Container
		// mkSupervisorInstance may return null, in which case no extra thread is added.
		AsyncLoopThread syncContainerHbThread = SyncContainerHb.mkSupervisorInstance(conf);
		if (syncContainerHbThread != null) {
		    threads.add(syncContainerHbThread);
		}
		// Step 6 create and start sync Supervisor thread
		// every supervisor.monitor.frequency.secs second run SyncSupervisor
		/*
		*  Create the two sync events: SyncSupervisorEvent receives events and
		*  SyncProcessEvent processes them. Both are closely tied to topology
		*  submission and are described in detail there. Only SyncSupervisorEvent
		*  gets its own pusher thread here; SyncProcessEvent and processEventManager
		*  are handed to it.
		*/
		EventManager processEventManager = new EventManagerImp(false);
		// Shared between SyncProcessEvent and the returned manager; presumably tracks
		// worker pids — TODO confirm.
		ConcurrentHashMap<String, String> workerThreadPids = new ConcurrentHashMap<String, String>();
		SyncProcessEvent syncProcessEvent = new SyncProcessEvent(supervisorId,
				conf, localState, workerThreadPids, sharedContext);

		EventManager syncSupEventManager = new EventManagerImp(false);
		SyncSupervisorEvent syncSupervisorEvent = new SyncSupervisorEvent(
				supervisorId, conf, processEventManager, syncSupEventManager,
				stormClusterState, localState, syncProcessEvent);


		int syncFrequence = JStormUtils.parseInt(conf
				.get(Config.SUPERVISOR_MONITOR_FREQUENCY_SECS));
		EventManagerPusher syncSupervisorPusher = new EventManagerPusher(
				syncSupEventManager, syncSupervisorEvent, active, syncFrequence);
		AsyncLoopThread syncSupervisorThread = new AsyncLoopThread(
				syncSupervisorPusher);
		threads.add(syncSupervisorThread);


		//Step 7 start httpserver
		int port = ConfigExtension.getSupervisorDeamonHttpserverPort(conf);
		Httpserver httpserver = new Httpserver(port, conf);
		httpserver.start();
		
		//Step 8 upload monitoring metrics: AlimonitorClient when alimonitor metric
		//posting is enabled in conf, otherwise a plain MetricSendClient.
		MetricSendClient client;
		if (ConfigExtension.isAlimonitorMetricsPost(conf)) {
			client = new AlimonitorClient(AlimonitorClient.DEFAUT_ADDR, 
					AlimonitorClient.DEFAULT_PORT, true);
		} else {
		    client = new MetricSendClient();
		}
		// NOTE(review): 60 is presumably the upload interval in seconds — confirm in UploadSupervMetric.
		UploadSupervMetric uploadMetric = new UploadSupervMetric(conf, stormClusterState, 
				supervisorId, active, 60, client);
		AsyncLoopThread uploadMetricThread = new AsyncLoopThread(uploadMetric);
		threads.add(uploadMetricThread);
		
		// SupervisorManger which can shutdown all supervisor and workers
		return new SupervisorManger(conf, supervisorId, active, threads,
				syncSupEventManager, processEventManager, httpserver, 
				stormClusterState, workerThreadPids);
	}

郑重声明:本站内容如果来自互联网及其他传播媒体,其版权均属原媒体及文章作者所有。转载目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。