mac10.9下eclipse的storm开发环境搭建
- 易于扩展。对于扩展,你只需要添加机器和改变对应的topology(拓扑)设置。Storm使用Hadoop Zookeeper进行集群协调,这样可以充分的保证大型集群的良好运行。
- 每条信息的处理都可以得到保证。
- Storm集群管理简易。
- Storm的容错机能:一旦topology递交,Storm会一直运行它直到topology被废除或者被关闭。而在执行中出现错误时,也会由Storm重新分配任务。
- 尽管通常使用Java,Storm中的topology可以用任何语言设计。
- Topologies 用于封装一个实时计算应用程序的逻辑,类似于Hadoop的MapReduce,下面是一个Topology内部Spout和Bolt之间的数据流关系:
- Stream 消息流,是一个没有边界的tuple序列,这些tuples会被以一种分布式的方式并行地创建和处理。在Topology定义时,需要为每个Bolt指定接收什么样的Stream作为其输入(注:Spout并不需要接收Stream,只会发射Stream)。
- Spouts 消息源,是消息生产者,他会从一个外部源读取数据并向topology里面面发出消息:tuple
- Bolts 消息处理者,所有的消息处理逻辑被封装在bolts里面,处理输入的数据流并产生输出的新数据流,可执行过滤,聚合,查询数据库等操作
- Task 每一个Spout和Bolt会被当作很多task在整个集群里面执行,每一个task对应到一个线程.
- Stream groupings 消息分发策略,定义一个Topology的其中一步是定义每个tuple接受什么样的流作为输入,stream grouping就是用来定义一个stream应该如果分配给Bolts们.
- stream grouping分类
- Shuffle Grouping: 随机分组, 随机派发stream里面的tuple, 保证每个bolt接收到的tuple数目相同.
- Fields Grouping:按字段分组, 比如按userid来分组, 具有同样userid的tuple会被分到相同的Bolts, 而不同的userid则会被分配到不同的Bolts.
- All Grouping: 广播发送, 对于每一个tuple, 所有的Bolts都会收到.
- Global Grouping: 全局分组,这个tuple被分配到storm中的一个bolt的其中一个task.再具体一点就是分配给id值最低的那个task.
- Non Grouping: 不分组,意思是说stream不关心到底谁会收到它的tuple.目前他和Shuffle grouping是一样的效果,有点不同的是storm会把这个bolt放到这个bolt的订阅者同一个线程去执行.
- Direct Grouping: 直接分组,这是一种比较特别的分组方法,用这种分组意味着消息的发送者举鼎由消息接收者的哪个task处理这个消息.只有被声明为Direct Stream的消息流可以声明这种分组方法.而且这种消息tuple必须使用emitDirect方法来发射.消息处理者可以通过TopologyContext来或者处理它的消息的taskid (OutputCollector.emit方法也会返回taskid)
liondeMacBook-Pro:~ lion$ ruby -e "$(curl -fsSL https://raw.github.com/Homebrew/homebrew/go/install)" |
liondeMacBook-Pro:~ lion$ sudo brew install maven |
liondeMacBook-Pro:java lion$ pwd /Users/lion/Documents/_my_project/java liondeMacBook-Pro:java lion$ git clone https://github.com/nathanmarz/storm-starter.git liondeMacBook-Pro:java lion$ cd storm-starter/ liondeMacBook-Pro:storm-starter lion$ mvn -f m2-pom.xml package |
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>storm.starter</groupId> <artifactId>storm-starter</artifactId> <version>0.0.1-SNAPSHOT</version> <packaging>jar</packaging> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> </properties> <repositories> <repository> <id>github-releases</id> <url>http://oss.sonatype.org/content/repositories/github-releases/</url> </repository> <repository> <id>clojars.org</id> <url>http://clojars.org/repo</url> </repository> </repositories> <dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.11</version> <scope>test</scope> </dependency> <dependency> <groupId>storm</groupId> <artifactId>storm</artifactId> <version>0.9.0.1</version> <!-- keep storm out of the jar-with-dependencies --> <scope>provided</scope> </dependency> <dependency> <groupId>commons-collections</groupId> <artifactId>commons-collections</artifactId> <version>3.2.1</version> </dependency> </dependencies> </project> |
WordCountTopology.java
package storm.starter; import java.util.HashMap; import java.util.Map; import storm.starter.RandomSentenceSpout; import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.StormSubmitter; import backtype.storm.topology.BasicOutputCollector; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.TopologyBuilder; import backtype.storm.topology.base.BaseBasicBolt; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; /** * This topology demonstrates Storm‘s stream groupings and multilang * capabilities. */ public class WordCountTopology { public static class SplitSentence extends BaseBasicBolt { public void execute(Tuple input, BasicOutputCollector collector) { try { String msg = input.getString(0); System.out.println(msg + "-------------------"); if (msg != null) { String[] s = msg.split(" "); for (String string : s) { collector.emit(new Values(string)); } } } catch (Exception e) { e.printStackTrace(); } } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word")); } } public static class WordCount extends BaseBasicBolt { Map<String, Integer> counts = new HashMap<String, Integer>(); public void execute(Tuple tuple, BasicOutputCollector collector) { String word = tuple.getString(0); Integer count = counts.get(word); if (count == null) count = 0; count++; counts.put(word, count); collector.emit(new Values(word, count)); } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word", "count")); } } public static void main(String[] args) throws Exception { TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("spout", new RandomSentenceSpout(), 5); builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout"); builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split",newFields("word")); Config conf = new Config(); conf.setDebug(true); if (args != null && args.length > 0) { /*设置该topology在storm集群中要抢占的资源slot数,一个slot对应这supervisor节点上的以个worker进程 如果你分配的spot数超过了你的物理节点所拥有的worker数目的话,有可能提交不成功,加入你的集群上面已经有了 一些topology而现在还剩下2个worker资源,如果你在代码里分配4个给你的topology的话,那么这个topology可以提交 但是提交以后你会发现并没有运行。 而当你kill掉一些topology后释放了一些slot后你的这个topology就会恢复正常运行。 */ conf.setNumWorkers(3); StormSubmitter.submitTopology(args[0], conf, builder.createTopology()); } else { conf.setMaxTaskParallelism(3); //指定为本地模式运行 LocalCluster cluster = new LocalCluster(); cluster.submitTopology("word-count", conf, builder.createTopology()); Thread.sleep(10000); cluster.shutdown(); } } } |
RandomSentenceSpout.java
package storm.starter; import backtype.storm.spout.SpoutOutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichSpout; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; import backtype.storm.utils.Utils; import java.util.Map; import java.util.Random; public class RandomSentenceSpout extends BaseRichSpout { /** * 用来发射数据的工具类 */ SpoutOutputCollector _collector; Random _rand; /** * 这里初始化collector */ public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { _collector = collector; _rand = new Random(); } /** * 该方法会在SpoutTracker类中被调用每调用一次就可以向storm集群中发射一条数据(一个tuple元组) * 该方法会被不停的调用 */ public void nextTuple() { //模拟等待100ms Utils.sleep(100); //构造随机数据 String[] sentences = new String[]{ "the cow jumped over the moon", "an apple a day keeps the doctor away", "four score and seven years ago", "snow white and the seven dwarfs", "i am at two with nature" }; String sentence = sentences[_rand.nextInt(sentences.length)]; //调用发射方法 _collector.emit(new Values(sentence)); } public void ack(Object id) { } public void fail(Object id) { } /** * 这里定义字段id,该id在简单模式下没有用处,但在按照字段分组的模式下有很大的用处。 * 该declarer变量有很大作用,我们还可以调用 declarer.declareStream(); 来定义stramId,该id可以用来定义 * 更加复杂的流拓扑结构 */ public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word")); } } |
an apple a day keeps the doctor away------------------- 13324 [Thread-20-count] INFO backtype.storm.daemon.task - Emitting: count default [with, 57] 13324 [Thread-20-count] INFO backtype.storm.daemon.executor - Processing received message source: split:5, stream: default, id: {}, [nature] 13324 [Thread-24-split] INFO backtype.storm.daemon.task - Emitting: split default [an] 13324 [Thread-20-count] INFO backtype.storm.daemon.task - Emitting: count default [nature, 57] 13324 [Thread-24-split] INFO backtype.storm.daemon.task - Emitting: split default [apple] 13324 [Thread-20-count] INFO backtype.storm.daemon.executor - Processing received message source: split:5, stream: default, id: {}, [an] 13324 [Thread-24-split] INFO backtype.storm.daemon.task - Emitting: split default [a] 13324 [Thread-20-count] INFO backtype.storm.daemon.task - Emitting: count default [an, 44] |
郑重声明:本站内容如果来自互联网及其他传播媒体,其版权均属原媒体及文章作者所有。转载目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。