K天熟悉Apache Storm (三)
软件版本:Storm:0.9.3 ,Redis:2.8.19;jedis:2.6.2;
代码及Jedis下载:Storm实时单词计数
Storm应用场景--实时单词计数,有点类似《Getting Started with Storm》中的chapter6的real-life app。
场景描述:
1. 使用一个java程序每间隔一定时间向Redis数据库A中存入数据;
2. Storm的Spout读取Redis数据库A中的数据,读取后删除Redis中的数据;
3. Storm的SplitBolt读取Spout的输出,对其进行解析,并输出;
4. Storm的CountBolt对SplitBolt的数据进行计数,并每隔一定间隔把数据存储在Redis数据库B中;
5. 另外的java程序定时读取Redis数据库B中的数据,并打印;
具体实现:
1. Java定时向Redis发送数据
while(true){// 每次发送3个数据 try { Thread.sleep(200);// 每200毫秒产生一次数据 } catch (InterruptedException e) { e.printStackTrace(); } interval ++; int index = random.nextInt(normal.length); if(!jedis.exists("0")){// 如果不存在key说明已经被取走了,就再次产生,否则不产生 jedis.set("0",normal[index]); } index = random.nextInt(normal.length); if(!jedis.exists("1")){ jedis.set("1", normal[index]); } index = random.nextInt(normal.length); if(!jedis.exists("2")){ jedis.set("2", normal[index]); } if(interval*200/1000==2*60) {// 每间隔200毫秒产生数据后,产生了2分钟,共2*60*1000/200*3 个数据记录 // 暂停 5分钟 System.out.println(new java.util.Date()+":数据暂定5分钟产生..."); try { interval=0; Thread.sleep(5*60*1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(new java.util.Date()+":5分钟暂停完成,继续产生数据..."); } }
这里使用一个固定的字符串数组,每次从里面随机抽取三个字符串,使用Jedis存储到Redis的数据库中;
2. Spout读取Redis数据
@Override public void nextTuple() { long interval =0; while(true){// 获取数据 interval++; String zero = getItem("0"); String one = getItem("1"); String two = getItem("2"); try { Thread.sleep(200);// 每200毫秒发送一次数据 } catch (InterruptedException e) { e.printStackTrace(); } if(zero==null||one==null||two==null){ // do nothing // 没有数据 // if(interval%15==0){ // } }else{ String tmpStr =zero+","+one+","+two; if(thisTaskId==tmpStr.hashCode()%numTasks){ // spout负载均衡 this.collector.emit(new Values(tmpStr)); if(interval%15==0&&"fast".equals(slow_fast)){ System.out.println(RedisUtils.getCurrDateWithInfo(String.valueOf(thisTaskId), taskId, "Spout:["+zero+","+one+","+two+"]")); }else if("slow".equals(slow_fast)){ System.out.println(RedisUtils.getCurrDateWithInfo(String.valueOf(thisTaskId), taskId, "Spout:["+zero+","+one+","+two+"]")); }else{ new RuntimeException("Wrong argument!"); } } } } }这里使用了负载均衡,Spout处理的数据按task进行分隔。
getItem用于从Redis中获取数据,并删除对应的数据,代码如下:
/** * Redis中获取键值并删除对应的键 * @param index */ private String getItem(String index){ if(!jedis.exists(index)){ return null; } String val = jedis.get(index); // if(val==null||"null".equals("null")){ // return ; // } jedis.del(index); return val; }3. SplitBolt就是一般的单词分隔代码:
public void execute(Tuple input, BasicOutputCollector collector) { interval++; String sentence = input.getString(0); if(interval%15==0&&"fast".equals(slow_fast)){ // System.out.println(new java.util.Date()+":ConponentId:"+conponentId+",taskID:"+taskId+ // "splitBolt:"+sentence); System.out.println(RedisUtils.getCurrDateWithInfo(conponentId, taskId, "splitBolt:"+sentence)); }else if("slow".equals(slow_fast)){ System.out.println(RedisUtils.getCurrDateWithInfo(conponentId, taskId, "splitBolt:"+sentence)); } String[] words = sentence.split(","); for(String word : words){ word = word.trim(); if(!word.isEmpty()){ word = word.toLowerCase(); collector.emit(new Values(word)); } } }
4. CountBolt进行单词计数,并向Redis数据库中存储单词的计数
public void execute(Tuple input, BasicOutputCollector collector) { interval++; String str = input.getString(0); /** * If the word dosn‘t exist in the map we will create * this, if not We will add 1 */ if(!counters.containsKey(str)){ counters.put(str, 1); }else{ Integer c = counters.get(str) + 1; counters.put(str, c); } // 每records条数据则向向数据库中更新 if(interval%records==0){ for(Map.Entry<String , Integer> m :counters.entrySet()){ jedis.set(m.getKey(), String.valueOf(m.getValue()));// } } }5. Java程序定时读取Redis中单词计数,并打印
private void read() { System.out.println("数据获取开始。。。,10s后打印。。。"); long interval =0; while(true){// 获取数据 interval++; Set<String> keys = jedis.keys("*"); for(String key:keys){ push2Map(key); } // push2Map("one"); try { Thread.sleep(200);// 每200毫秒获取一次数据 } catch (InterruptedException e) { e.printStackTrace(); } if(interval*200/1000==10) {// 每10秒打印一次 interval=0; printMap(); } } }
Storm作为实时大数据处理框架,从这个小例子中就可以感受一二。
ps:相关调用接口:
System.out.println("\nwc.redis.WCTopology <storeFrequent> <num_works>" + " <parallel_spout> <parallel_split_bolt> <parallel_count_bolt> <slow|fast>"+ " <printWC>");打包使用storm jar命令运行的时候,其中的参数解释如下:
storeFrequent : CountBolt每多少条记录往Redis数据库中存储一次数据;
num_works : worker的数量;
parallel_spout :Spout并行的数量;
parallel_split_bolt :SplitBolt并行的数量;
parallel_count_bolt :CountBolt并行的数量;
slow|fast :在Spout/SplitBolt/CountBolt中日志打印的频率;
printWC:在CountBolt中的日志是否打印(true|false);
分享,成长,快乐
脚踏实地,专注
转载请注明blog地址:http://blog.csdn.net/fansy1990
郑重声明:本站内容如果来自互联网及其他传播媒体,其版权均属原媒体及文章作者所有。转载目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。