JStorm之Woker启动流程

我们知道,在Jstorm中具体任务执行是由Woker来完成的,Woker的启动是由Supervisor组件负责,下面就介绍一个Woker的启动流程。
Sueprvisor在启动后会启动一个检测任务分配的线程,该线程周期性的到Zookeeper中查询任务分配的情况,一旦有新的任务到来,就会把自己负责的Woker启动起来,当然其中具体细节还有很多,大概流程就是这样的。
线程的执行体定义在EventManagerImpExecute中,如下:

public void run() {
	try {
		while (manager.isRunning()) {
			RunnableCallback r = null;
			try {
				r = manager.take();
			} catch (InterruptedException e) {
				// LOG.info("Failed to get ArgsRunable from EventManager queue");
			}


			if (r == null) {
				try {
					Thread.sleep(10);
				} catch (InterruptedException e) {


				}
				continue;
			}


			r.run();
			Exception e = r.error();
			if (e != null) {
				throw e;
			}
			manager.proccessinc();


		}


	} catch (InterruptedException e) {
		.....
	}
}

可以看到该线程逻辑是基于事件队列的,最多10秒就会查询一次ZK,如果有事件到来,就会执行该事件的run方法,下面我们看下当一个任务到来时,Supervisor做了哪些操作,代码在SyncSupervisorEvent
public void run() {
	LOG.debug("Synchronizing supervisor");


	try {


		RunnableCallback syncCallback = new EventManagerZkPusher(this,
				syncSupEventManager);


		/**
		 * Step 1: get all assignments and register /ZK-dir/assignment and
		 * every assignment watch
		 * 
		 */
		Map<String, Assignment> assignments = Cluster.get_all_assignment(
				stormClusterState, syncCallback);
		LOG.debug("Get all assignments " + assignments);


		/**
		 * Step 2: get topologyIds list from
		 * STORM-LOCAL-DIR/supervisor/stormdist/
		 */
		List<String> downloadedTopologyIds = StormConfig
				.get_supervisor_toplogy_list(conf);
		LOG.debug("Downloaded storm ids: " + downloadedTopologyIds);


		/**
		 * Step 3: get <port,LocalAssignments> from ZK local node's
		 * assignment
		 */
		Map<Integer, LocalAssignment> localAssignment = getLocalAssign(
				stormClusterState, supervisorId, assignments);


		/**
		 * Step 4: writer local assignment to LocalState
		 */
		try {
			LOG.debug("Writing local assignment " + localAssignment);
			localState.put(Common.LS_LOCAL_ASSIGNMENTS, localAssignment);
		} catch (IOException e) {
			LOG.error("put LS_LOCAL_ASSIGNMENTS " + localAssignment
					+ " of localState failed");
			throw e;
		}


		// Step 5: download code from ZK


		Map<String, String> topologyCodes = getTopologyCodeLocations(
				assignments, supervisorId);


		downloadTopology(topologyCodes, downloadedTopologyIds);


		/**
		 * Step 6: remove any downloaded useless topology
		 */
		removeUselessTopology(topologyCodes, downloadedTopologyIds);


		/**
		 * Step 7: push syncProcesses Event
		 */
		processEventManager.add(syncProcesses);


	} catch (Exception e) {
		LOG.error("Failed to Sync Supervisor", e);
		// throw new RuntimeException(e);
	}
}
上面的代码中注释写的很清楚了,主要是重新注册ZK的watch和下载代码。然后把SyncProcessEvent事件丢入相应队列,然后由SyncProcessEvent线程处理该事件,也就是启动相应Worker。下面我们看这部分位于SyncProcessEvent的启动流程:
public void run() {
	LOG.debug("Syncing processes");
	try {


		/**
		 * Step 1: get assigned tasks from localstat Map<port(type Integer),
		 * LocalAssignment>
		 */
		Map<Integer, LocalAssignment> localAssignments = null;
		try {
			localAssignments = (Map<Integer, LocalAssignment>) localState
					.get(Common.LS_LOCAL_ASSIGNMENTS);
		} catch (IOException e) {
			LOG.error("Failed to get LOCAL_ASSIGNMENTS from LocalState", e);
			throw e;
		}


		if (localAssignments == null) {
			localAssignments = new HashMap<Integer, LocalAssignment>();
		}
		LOG.debug("Assigned tasks: " + localAssignments);


		/**
		 * Step 2: get local WorkerStats from local_dir/worker/ids/heartbeat
		 * Map<workerid [WorkerHeartbeat, state]>
		 */
		Map<String, StateHeartbeat> localWorkerStats = null;
		try {
			localWorkerStats = getLocalWorkerStats(conf, localState,
					localAssignments);
		} catch (Exception e) {
			LOG.error("Failed to get Local worker stats");
			throw e;
		}
		LOG.debug("Allocated: " + localWorkerStats);


		/**
		 * Step 3: kill Invalid Workers and remove killed worker from
		 * localWorkerStats
		 */
		Set<Integer> keepPorts = killUselessWorkers(localWorkerStats);


		// check new workers
		checkNewWorkers(conf);


		// start new workers
		startNewWorkers(keepPorts, localAssignments);


	} catch (Exception e) {
		LOG.error("Failed Sync Process", e);
		// throw e
	}
}
上述代码主要完成两件事1、删除旧的无效Woker 2、启动新Worker,上述代码中最终会调用launchWorker来启动一个Woker进程,launchWorker的主要作用就是拼装Woker启动命令,在最后调用JStormUtils.launch_process来启动该进程
public void launchWorker(Map conf, IContext sharedcontext,
		String topologyId, String supervisorId, Integer port,
		String workerId, LocalAssignment assignment) throws IOException {


	// STORM-LOCAL-DIR/supervisor/stormdist/topologyId
	String stormroot = StormConfig.supervisor_stormdist_root(conf,
			topologyId);


	// STORM-LOCAL-DIR/supervisor/stormdist/topologyId/stormjar.jar
	String stormjar = StormConfig.stormjar_path(stormroot);


	// get supervisor conf
	Map stormConf = StormConfig.read_supervisor_topology_conf(conf,
			topologyId);


	Map totalConf = new HashMap();
	totalConf.putAll(conf);
	totalConf.putAll(stormConf);


	// get classpath
	// String[] param = new String[1];
	// param[0] = stormjar;
	// String classpath = JStormUtils.add_to_classpath(
	// JStormUtils.current_classpath(), param);


	// get child process parameter


	String stormhome = System.getProperty("jstorm.home");


	long memSize = assignment.getMem();
	int cpuNum = assignment.getCpu();
	String childopts = getChildOpts(totalConf);


	childopts += getGcDumpParam(totalConf);


	Map<String, String> environment = new HashMap<String, String>();


	if (ConfigExtension.getWorkerRedirectOutput(totalConf)) {
		environment.put("REDIRECT", "true");
	} else {
		environment.put("REDIRECT", "false");
	}


	environment.put("LD_LIBRARY_PATH",
			(String) totalConf.get(Config.JAVA_LIBRARY_PATH));


	StringBuilder commandSB = new StringBuilder();


	try {
		if (this.cgroupManager != null) {
			commandSB
					.append(cgroupManager.startNewWorker(cpuNum, workerId));
		}
	} catch (Exception e) {
		LOG.error("fail to prepare cgroup to workerId: " + workerId, e);
		return;
	}


	// commandSB.append("java -server -Xdebug -Xrunjdwp:transport=dt_socket,address=8000,server=y,suspend=n ");
	commandSB.append("java -server ");
	commandSB.append(" -Xms" + memSize);
	commandSB.append(" -Xmx" + memSize + " ");
	commandSB.append(" -Xmn" + memSize / 3 + " ");
	commandSB.append(" -XX:PermSize=" + memSize / 16);
	commandSB.append(" -XX:MaxPermSize=" + memSize / 8);
	commandSB.append(" " + childopts);
	commandSB.append(" "
			+ (assignment.getJvm() == null ? "" : assignment.getJvm()));


	commandSB.append(" -Djava.library.path=");
	commandSB.append((String) totalConf.get(Config.JAVA_LIBRARY_PATH));


	if (stormhome != null) {
		commandSB.append(" -Djstorm.home=");
		commandSB.append(stormhome);
	} 
	
	commandSB.append(getLogParameter(totalConf, stormhome, assignment.getTopologyName(), port));


	String classpath = getClassPath(stormjar, stormhome, totalConf);
	String workerClassPath = (String) totalConf
			.get(Config.WORKER_CLASSPATH);
	List<String> otherLibs = (List<String>) stormConf
			.get(GenericOptionsParser.TOPOLOGY_LIB_NAME);
	StringBuilder sb = new StringBuilder();
	if (otherLibs != null) {
		for (String libName : otherLibs) {
			sb.append(StormConfig.stormlib_path(stormroot, libName))
					.append(":");
		}
	}
	workerClassPath = workerClassPath + ":" + sb.toString();


	Map<String, String> policyReplaceMap = new HashMap<String, String>();
	String realClassPath = classpath + ":" + workerClassPath;
	policyReplaceMap.put(SandBoxMaker.CLASS_PATH_KEY, realClassPath);
	commandSB
			.append(sandBoxMaker.sandboxPolicy(workerId, policyReplaceMap));


	// commandSB.append(" -Dlog4j.configuration=storm.log.properties");


	commandSB.append(" -cp ");
	// commandSB.append(workerClassPath + ":");
	commandSB.append(classpath);
	if (!ConfigExtension.isEnableTopologyClassLoader(totalConf))
		commandSB.append(":").append(workerClassPath);


	commandSB.append(" com.alibaba.jstorm.daemon.worker.Worker ");
	commandSB.append(topologyId);


	commandSB.append(" ");
	commandSB.append(supervisorId);


	commandSB.append(" ");
	commandSB.append(port);


	commandSB.append(" ");
	commandSB.append(workerId);


	commandSB.append(" ");
	commandSB.append(workerClassPath + ":" + stormjar);


	String cmd = commandSB.toString();
	cmd = cmd.replace("%ID%", port.toString());
	cmd = cmd.replace("%TOPOLOGYID%", topologyId);
	if (stormhome != null) {
		cmd = cmd.replace("%JSTORM_HOME%", stormhome);
	} else {
		cmd = cmd.replace("%JSTORM_HOME%", "./");
	}
	
	LOG.info("Launching worker with command: " + cmd);
	LOG.info("Environment:" + environment.toString());


	JStormUtils.launch_process(cmd, environment, true);
}
上面代码描述了构建Woker启动命令时的各种细节,包括内存、libpath、GC策略等信息,命令拼装好后就到了启动阶段,Woker是一个后台启动的进程
public static java.lang.Process launch_process(final String command,
		final Map<String, String> environment, boolean backend) throws IOException {


	if (backend == true) {
		new Thread(new Runnable() {
			
			@Override
			public void run() {
				String[] cmdlist = (new String("nohup " + command + " &")).split(" ");
				try {
					launchProcess(cmdlist, environment);
				} catch (IOException e) {
					LOG.error("Failed to run " + command + ":" + e.getCause(), e);
				}
			}
		}).start();
		return null;
	}else {
		String[] cmdlist = command.split(" ");
		return launchProcess(cmdlist, environment);
	}
}
这里会单独启动一个线程来启动Woker进程,最后由ProcessBuilder来启动
protected static java.lang.Process launchProcess(final String[] cmdlist, 
		final Map<String, String> environment)  throws IOException {
	ArrayList<String> buff = new ArrayList<String>();
	for (String tok : cmdlist) {
		if (!tok.isEmpty()) {
			buff.add(tok);
		}
	}
	
	ProcessBuilder builder = new ProcessBuilder(buff);
	builder.redirectErrorStream(true);
	Map<String, String> process_evn = builder.environment();
	for (Entry<String, String> entry : environment.entrySet()) {
		process_evn.put(entry.getKey(), entry.getValue());
	}


	return builder.start();
}

至此一个Woker启动就完成了,在系统通过ps命令就可以看到,上述流程中我们可以发现,Worker启动并没生成启动脚本,这点和Hadoop中是不同的,在Hadoop中Map和Reduce的启动都会生成一个脚本,有时排查问题会更方便。

最后附上相关线程图片:

技术分享

郑重声明:本站内容如果来自互联网及其他传播媒体,其版权均属原媒体及文章作者所有。转载目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。