Background
Kafka collects data in real time from collection tools such as Flume or from the real-time interfaces of business systems, and acts as a message buffer that provides a reliable data feed to downstream real-time computing frameworks. Since version 1.3, Spark has supported two mechanisms for integrating with Kafka (the Receiver-based Approach and the Direct Approach); see the official documentation linked at the end of this article for details. Computed results are stored in HBase.
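To illustrate the difference between the two mechanisms, here is a minimal sketch using the spark-streaming-kafka-0-8 API (the broker and ZooKeeper addresses are placeholders):

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kafka.KafkaUtils

def receiverVsDirect(ssc: StreamingContext): Unit = {
  // Receiver-based Approach: data flows through a long-running receiver,
  // and consumed offsets are tracked in ZooKeeper
  val receiverStream = KafkaUtils.createStream(
    ssc, "zk-host:2181", "demo-group", Map("user_events" -> 1))

  // Direct Approach: no receiver; each batch reads an exact offset range
  // directly from the brokers
  val directStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
    ssc, Map("metadata.broker.list" -> "broker-host:9092"), Set("user_events"))
}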
Implementation Approach
- Implement a Kafka message producer to simulate user events
- Consume the Kafka data in real time with Spark Streaming's Direct Approach
- Perform the business computation in Spark Streaming and store the results in HBase
Component Versions
Spark 2.1.0, Kafka 0.9.0.1, HBase 1.2.0
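A possible sbt dependency set for these versions (a sketch: the fastjson version and the choice of the 0-8 Kafka connector against a 0.9 broker are assumptions to adjust for your own build):

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-streaming" % "2.1.0",
  "org.apache.spark" %% "spark-streaming-kafka-0-8" % "2.1.0", // direct stream API used below
  "org.apache.kafka" % "kafka-clients" % "0.9.0.1",            // producer used by the simulator
  "org.apache.hbase" % "hbase-client" % "1.2.0",
  "com.alibaba" % "fastjson" % "1.2.47"                        // JSON handling in both programs
)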
Code
Kafka message simulator
import java.util.{Properties, UUID}
import scala.util.Random
import com.alibaba.fastjson.JSONObject
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object KafkaMessageGenerator {

  private val random = new Random()
  private var pointer = -1
  private val os_type = Array("Android", "IPhone OS", "None", "Windows Phone")

  // Random click count between 0 and 9
  def click(): Int = random.nextInt(10)

  // Cycle through the device types in order
  def getOsType(): String = {
    pointer = (pointer + 1) % os_type.length
    os_type(pointer)
  }

  def main(args: Array[String]): Unit = {
    val topic = "user_events"
    val props = new Properties()
    props.put("bootstrap.servers", "10.3.71.154:9092")
    // The producer is typed [String, String], so both key and value
    // must use StringSerializer
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)

    while (true) {
      val event = new JSONObject()
      event.put("uid", UUID.randomUUID().toString)              // random user id
      event.put("event_time", System.currentTimeMillis.toString) // event timestamp
      event.put("os_type", getOsType())                          // device type
      event.put("click_count", click())                          // click count
      val record = new ProducerRecord[String, String](topic, event.toString)
      producer.send(record)
      println("Message sent: " + event)
      Thread.sleep(200)
    }
  }
}
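The simulator assumes the user_events topic already exists; with Kafka 0.9 it can be created from the command line (the ZooKeeper address and partition count here are placeholders):

bin/kafka-topics.sh --create --zookeeper zk-host:2181 --replication-factor 1 --partitions 3 --topic user_events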
Spark Streaming main class
import com.alibaba.fastjson.{JSON, JSONObject}
import kafka.serializer.StringDecoder
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka.KafkaUtils

object PageViewStream {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("PageViewStream").setMaster("local[*]")
    // StreamingContext with a 5-second batch interval
    val ssc = new StreamingContext(conf, Seconds(5))

    // Kafka configuration (serializer.class is a producer-side setting
    // and is not needed by the consumer)
    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> "10.3.71.154:9092"
    )

    // Create a direct stream on the user_events topic
    val kafkaStream: InputDStream[(String, String)] =
      KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
        ssc, kafkaParams, Set("user_events"))

    // Parse each message value into a JSON object
    val events: DStream[JSONObject] = kafkaStream.map(line => JSON.parseObject(line._2))

    // Sum the click counts per user within each batch
    val userClicks: DStream[(String, Int)] =
      events.map(x => (x.getString("uid"), x.getIntValue("click_count"))).reduceByKey(_ + _)

    userClicks.foreachRDD(rdd => {
      rdd.foreachPartition(partitionOfRecords => {
        // HBase configuration: one connection per partition
        val tableName = "PageViewStream2"
        val hbaseConf = HBaseConfiguration.create()
        hbaseConf.set("hbase.zookeeper.quorum", "master66")
        hbaseConf.set("hbase.zookeeper.property.clientPort", "2181")
        val conn = ConnectionFactory.createConnection(hbaseConf)
        val statTable = conn.getTable(TableName.valueOf(tableName))

        partitionOfRecords.foreach { case (uid, click) =>
          // Row key is the user id; write the aggregated click count
          // into column family "Stat2", qualifier "ClickStat"
          val put = new Put(Bytes.toBytes(uid))
          put.addColumn(Bytes.toBytes("Stat2"), Bytes.toBytes("ClickStat"), Bytes.toBytes(click.toString))
          statTable.put(put)
        }

        // Release HBase resources for this partition
        statTable.close()
        conn.close()
      })
    })

    ssc.start()
    ssc.awaitTermination()
  }
}
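The streaming job assumes the target table with column family "Stat2" already exists; it can be created beforehand from the HBase shell:

create 'PageViewStream2', 'Stat2'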