博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
基于Kafka+Spark Streaming+HBase实时点击流案例
阅读量:5214 次
发布时间:2019-06-14

本文共 3422 字,大约阅读时间需要 11 分钟。

 背景

Kafka实时记录从数据采集工具Flume或业务系统实时接口收集数据,并作为消息缓冲组件为上游实时计算框架提供可靠数据支撑,Spark 1.3版本后支持两种整合Kafka机制(Receiver-based Approach 和 Direct Approach),具体细节请参考文章最后官方文档链接,数据存储使用HBase

实现思路

  1. 实现Kafka消息生产者模拟器
  2. Spark Streaming采用Direct Approach方式实时获取Kafka中数据
  3. Spark Streaming对数据进行业务计算后存储到HBase

组件版本

Spark 2.1.0  Kafka0.9.0.1 HBase1.2.0

代码实现

Kafka消息模拟器

object KafkaMessageGenerator {  private val random = new Random()  private var pointer = -1  private val os_type = Array(    "Android", "IPhone OS",    "None", "Windows Phone"  )  def click(): Double = {    random.nextInt(10)  }  def getOsType(): String = {    pointer = pointer + 1    if (pointer >= os_type.length) {      pointer = 0      os_type(pointer)    } else {      os_type(pointer)    }  }    def main(args: Array[String]): Unit = {      val topic = "user_events"      val props = new Properties()      props.put("bootstrap.servers", "10.3.71.154:9092")      props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer")      props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")      val producer = new KafkaProducer[String, String](props)      while (true) {        val event: JSONObject = new JSONObject()        event.put("uid", UUID.randomUUID()) //随机生成用户id        event.put("event_time", System.currentTimeMillis.toString) //记录事件发生时间        event.put("os_type", getOsType) //设备类型        event.put("click_count", click) //点击次数        val record = new ProducerRecord[String, String](topic, event.toString)        producer.send(record)        println("Message sent: " + event)        Thread.sleep(200)      }    }}

Spark Streaming主类

object PageViewStream {  def main(args: Array[String]): Unit = {    val conf = new SparkConf().setAppName("PageViewStream").setMaster("local[*]")    //创建StreamingContext  批处理间隔5s    val ssc = new StreamingContext(conf, Seconds(5))    // kafka配置    val kafkaParams = Map[String, String](      "metadata.broker.list" -> "10.3.71.154:9092",      "serializer.class" -> "kafka.serializer.StringEncoder"    )    //创建一个direct stream    val kafkaStream: InputDStream[(String, String)] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, Set("user_events"))    val events: DStream[JSONObject] = kafkaStream.flatMap(line => {      val data: JSONObject = JSON.parseObject(line._2)      Some(data)    })    // 计算用户点击次数    val userClicks: DStream[(String, Integer)] = events.map(x => (x.getString("uid"), x.getInteger("click_count"))).reduceByKey(_ + _)    userClicks.foreachRDD(rdd => {      rdd.foreachPartition(partitionOfRecords => {        //Hbase配置        val tableName = "PageViewStream2"        val hbaseConf = HBaseConfiguration.create()        hbaseConf.set("hbase.zookeeper.quorum", "master66")        hbaseConf.set("hbase.zookeeper.property.clientPort", "2181")        val conn = ConnectionFactory.createConnection(hbaseConf)        val StatTable = conn.getTable(TableName.valueOf(tableName))        partitionOfRecords.foreach(pair => {          //用户ID          val uid = pair._1          //点击次数          val click = pair._2          //组装数据 创建put对象 rowkey          val put = new Put(Bytes.toBytes(uid))          put.addColumn("Stat2".getBytes, "ClickStat".getBytes, Bytes.toBytes("TESTS============"))          StatTable.put(put)        })      })    })    ssc.start()    ssc.awaitTermination()  }}

 

 

转载于:https://www.cnblogs.com/itboys/p/9156701.html

你可能感兴趣的文章
监控工具之---Prometheus 安装详解(三)
查看>>
Azure Iaas基础之---创建虚拟机
查看>>
不错的MVC文章
查看>>
网络管理相关函数
查看>>
IOS Google语音识别更新啦!!!
查看>>
20190422 T-SQL 触发器
查看>>
[置顶] Linux终端中使用上一命令减少键盘输入
查看>>
poj1422_有向图最小路径覆盖数
查看>>
BootScrap
查看>>
[大牛翻译系列]Hadoop(16)MapReduce 性能调优:优化数据序列化
查看>>
WEB_点击一百万次
查看>>
CodeForces - 878A Short Program(位运算)
查看>>
路冉的JavaScript学习笔记-2015年1月23日
查看>>
Mysql出现(10061)错误提示的暴力解决办法
查看>>
2018-2019-2 网络对抗技术 20165202 Exp3 免杀原理与实践
查看>>
NPM慢怎么办 - nrm切换资源镜像
查看>>
CoreData 从入门到精通(四)并发操作
查看>>
Swift - UIView的常用属性和常用方法总结
查看>>
Swift - 异步加载各网站的favicon图标,并在单元格中显示
查看>>
Java编程思想总结笔记Chapter 5
查看>>