Spark Streaming、Kafka结合Spark JDBC External DataSouces处理案例
场景:使用Spark Streaming接收Kafka发送过来的数据与关系型数据库中的表进行相关的查询操作;
Kafka发送过来的数据格式为:id、name、cityId,分隔符为tab
1 zhangsan 1 2 lisi 1 3 wangwu 2 4 zhaoliu 3
MySQL的表city结构为:id int, name varchar
1 bj 2 sz 3 sh
本案例的结果为:select s.id, s.name, s.cityId, c.name from student s join city c on s.cityId=c.id;
Kafka安装参见:Kafka单机版环境搭建
启动Kafka:
zkServer.sh start kafka-server-start.sh $KAFKA_HOME/config/server.properties & kafka-topics.sh --create --zookeeper hadoop000:2181 --replication-factor 1 --partitions 1 --topic luogankun_topic kafka-console-producer.sh --broker-list hadoop000:9092 --topic luogankun_topic
实例代码:
package com.asiainfo.ocdc case class Student(id: Int, name: String, cityId: Int)
package com.asiainfo.ocdc import org.apache.spark.streaming._ import org.apache.spark.{SparkContext, SparkConf} import org.apache.spark.sql.hive.HiveContext import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.kafka._ /** * Spark Streaming处理Kafka的数据并结合Spark JDBC外部数据源处理 * * @author luogankun */ object KafkaStreaming { def main(args: Array[String]) { if (args.length < 4) { System.err.println("Usage: KafkaStreaming <zkQuorum> <group> <topics> <numThreads>") System.exit(1) } val Array(zkQuorum, group, topics, numThreads) = args val sparkConf = new SparkConf() val sc = new SparkContext(sparkConf) val ssc = new StreamingContext(sc, Seconds(5)) val sqlContext = new HiveContext(sc) import sqlContext._ import com.luogankun.spark.jdbc._ //使用External Data Sources处理MySQL中的数据 val cities = sqlContext.jdbcTable("jdbc:mysql://hadoop000:3306/test", "root", "root", "select id, name from city") //将cities RDD注册成city临时表 cities.registerTempTable("city") val topicpMap = topics.split(",").map((_, numThreads.toInt)).toMap val inputs = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap, StorageLevel.MEMORY_AND_DISK_SER).map(_._2) inputs.foreachRDD(rdd => { if (rdd.partitions.length > 0) { //将Streaming中接收到的数据注册成student临时表 rdd.map(_.split("\t")).map(x => Student(x(0).toInt, x(1), x(2).toInt)).registerTempTable("student") //关联Streaming和MySQL表进行查询操作 sqlContext.sql("select s.id, s.name, s.cityId, c.name from student s join city c on s.cityId=c.id").collect().foreach(println) } }) ssc.start() ssc.awaitTermination() } }
提交到集群执行脚本:sparkstreaming_kafka_jdbc.sh
#!/bin/sh . /etc/profile set -x cd $SPARK_HOME/bin spark-submit --name KafkaStreaming --class com.asiainfo.ocdc.KafkaStreaming --master spark://hadoop000:7077 \ --executor-memory 1G --total-executor-cores 1 /home/spark/software/source/streaming-app/target/streaming-app-V00B01C00-SNAPSHOT-jar-with-dependencies.jar hadoop000:2181 test-consumer-group luogankun_topic 1
郑重声明:本站内容如果来自互联网及其他传播媒体,其版权均属原媒体及文章作者所有。转载目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。