流式计算-Jstorm提交Topology过程

Topology是Jstorm对有向无环图的抽象,内部封装了数据来源spout和数据处理单元bolt,以及spout和bolt、bolt和bolt之间的关系,它可以被提交到Jstorm集群。

本文以Jstorm自带的SequenceTopology简单介绍一下Jstorm提交topology的过程,本文主要介绍提交过程,不涉及具体业务,

1、 SequenceTopology核心方法com.alipay.dw.jstorm.example.sequence.SequenceTopology.SetBuilder(TopologyBuilder builder, Map conf),该方法主要根据配置文件,使用TopologyBuilder构造Topology的spout和bolt,以及spout和bolt之间的关系

2、TopologyBuilder构造好Topology之后,通过Jstorm Client的StormSubmitter.submitTopology(streamName, conf,builder.createTopology())提交Topology到Jstorm集群,

3、在StormSubmitter.submitTopology方法中,首先会对配置项进行检查、然后将Topology自己的配置项和Jstorm的配置项组装成一个大的Map,之后上传用户在命令行提交的Jar包,然后通过NimbusClient 的submitTopologyWithOpts(String name, String uploadedJarLocation, String jsonConf, StormTopology topology, SubmitOptions options) 方法将Topology提交到Jstorm集群,其核心代码如下:

if (!Utils.isValidConf(stormConf)) {
			throw new IllegalArgumentException(
					"Storm conf is not valid. Must be json-serializable");
		}
		stormConf = new HashMap(stormConf);
		stormConf.putAll(Utils.readCommandLineOpts());
		Map conf = Utils.readStormConfig();
		conf.putAll(stormConf);
		putUserInfo(conf, stormConf);
		String serConf = JSON.toJSONString(stormConf);
		if (localNimbus != null) {
			localNimbus.submitTopology(name, null, serConf, topology);
		} else {
			NimbusClient client = NimbusClient.getConfiguredClient(conf);
			if (topologyNameExists(conf, name)) {//检查名字是否重复,Jstorm要求每个topology名称必须唯一
					throw new RuntimeException("Topology with name `" + name
							+ "` already exists on cluster");
				}
			submitJar(conf);//上传Jar包到ZK			
			client.getClient().submitTopologyWithOpts(name, path,
								serConf, topology, opts);//通过Thrift将topology提交到集群
					

4、NimbusClient提交之后,NimbusSever通过com.alibaba.jstorm.daemon.nimbus.ServiceHandler.submitTopologyWithOpts(String topologyname, String uploadedJarLocation, String jsonConf, StormTopology topology, SubmitOptions options)处理接收到的topology,其具体逻辑如下(代码已经精简)

public void submitTopologyWithOpts(String topologyname,
			String uploadedJarLocation, String jsonConf,
			StormTopology topology, SubmitOptions options)
			throws AlreadyAliveException, InvalidTopologyException,
			TopologyAssignException, TException {
		//首先检查topology是否已经存在
		checkTopologyActive(data, topologyname, false);		
		//生成topology的唯一标识
		int counter = data.getSubmittedCount().incrementAndGet();
		String topologyId = topologyname + "-" + counter + "-"
				+ TimeUtils.current_time_secs();
		try {
			//反序列化topology配置项
			Map<Object, Object> serializedConf = (Map<Object, Object>) JStormUtils
					.from_json(jsonConf);
			if (serializedConf == null) {
				LOG.warn("Failed to serialized Configuration");
				throw new InvalidTopologyException(
						"Failed to serilaze topology configuration");
			}
			//将topology的名称和ID增加到配置项中
			serializedConf.put(Config.TOPOLOGY_ID, topologyId);
			serializedConf.put(Config.TOPOLOGY_NAME, topologyname);			
			Map<Object, Object> stormConf;
			stormConf = NimbusUtils.normalizeConf(conf, serializedConf,
					topology);
			Map<Object, Object> totalStormConf = new HashMap<Object, Object>(
					conf);
			totalStormConf.putAll(stormConf);
			StormTopology normalizedTopology = NimbusUtils.normalizeTopology(
					stormConf, topology);

			// this validates the structure of the topology
			Common.validate_basic(normalizedTopology, totalStormConf,
					topologyId);
			// don't need generate real topology, so skip Common.system_topology
			// Common.system_topology(totalStormConf, topology);
			StormClusterState stormClusterState = data.getStormClusterState();
			// 创建topology在ZK上的目录
			setupStormCode(conf, topologyId, uploadedJarLocation, stormConf,
					normalizedTopology);

			// 为每一个spout或者bolt生成Task,并在ZK上创建相应的task目录<span style="font-family: Arial, Helvetica, sans-serif;">/ZK/tasks/topoologyId/xxx</span>
			setupZkTaskInfo(conf, topologyId, stormClusterState);
			// 进行任务分配
			TopologyAssignEvent assignEvent = new TopologyAssignEvent();
			assignEvent.setTopologyId(topologyId);
			assignEvent.setScratch(false);
			assignEvent.setTopologyName(topologyname);
			assignEvent.setOldStatus(Thrift
					.topologyInitialStatusToStormStatus(options
							.get_initial_status()));

			TopologyAssign.push(assignEvent);
			LOG.info("Submit for " + topologyname + " with conf "
					+ serializedConf);

			boolean isSuccess = assignEvent.waitFinish();
			if (isSuccess == true) {
				LOG.info("Finish submit for " + topologyname);
			} 


郑重声明:本站内容如果来自互联网及其他传播媒体,其版权均属原媒体及文章作者所有。转载目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。