Kafka + Spark Streaming + HBase

一、Overview

1. Requirements

A real-time location system: track each user's current position and store only the latest record for each user.

2. Approach

Spark Streaming consumes raw user location messages from Kafka and analyzes them, then writes the qualifying results to HBase using rowkey = username. To keep the example simple, the data consumed from Kafka is treated as already analyzed, so each record can be stored directly.
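Because the rowkey is the username, a later write for the same user replaces the earlier one, so the table always holds only each user's latest location. A minimal sketch of this "last write wins" behavior (the `Location` case class and `upsert` helper are illustrative names, not part of the project):

```scala
// Illustrative sketch: with rowkey = username, a second Put for the same user
// replaces the first, so HBase keeps only the latest location per user.
// A plain immutable Map models the same "last write wins" semantics.
case class Location(user: String, countTime: String, walkPlace: String)

def upsert(table: Map[String, Location], loc: Location): Map[String, Location] =
  table + (loc.user -> loc) // same key: previous value is replaced

val events = Seq(
  Location("guanYu", "2020-03-11 11:45:42", "操场南门"),
  Location("guanYu", "2020-03-11 11:45:57", "操场西北门")
)
val latest = events.foldLeft(Map.empty[String, Location])(upsert)
println(latest("guanYu").walkPlace) // only the newest record survives
```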

3. Component versions

Component   Version
Kafka       kafka_2.10-0.10.2.1
Spark       spark-2.2.0-bin-hadoop2.7
HBase       hbase-1.2.6

二、Implementation

1. Create the HBase table

First create a table in HBase with a single column family, info:

hbase(main):003:0> create 'location_sure','info'

2. POM dependencies

<dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.11.8</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-compiler</artifactId>
            <version>2.11.8</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-reflect</artifactId>
            <version>2.11.8</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.2.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.2.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>2.2.1</version>
            <!--<scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
            <version>2.2.1</version>
        </dependency>
        <!-- Note: do not also add the old spark-streaming-kafka_2.10 (0.8 API)
             artifact here; it targets Scala 2.10 and conflicts with the
             spark-streaming-kafka-0-10_2.11 integration used below. -->
        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-hive -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.11</artifactId>
            <version>2.2.1</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.gavaghan</groupId>
            <artifactId>geodesy</artifactId>
            <version>1.1.3</version>
        </dependency>
        <dependency>
            <groupId>com.github.scopt</groupId>
            <artifactId>scopt_2.11</artifactId>
            <version>3.7.0</version>
        </dependency>
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.2.4</version>
        </dependency>

        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>2.9.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.codehaus.jettison/jettison -->
        <dependency>
            <groupId>org.codehaus.jettison</groupId>
            <artifactId>jettison</artifactId>
            <version>1.1</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/net.sf.json-lib/json-lib -->
        <dependency>
            <groupId>net.sf.json-lib</groupId>
            <artifactId>json-lib</artifactId>
            <version>2.4</version>
            <classifier>jdk15</classifier>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.commons/commons-pool2 -->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-pool2</artifactId>
            <version>2.4.2</version>
        </dependency>
        <!-- Oracle JDBC driver -->
        <dependency>
            <groupId>com.oracle</groupId>
            <artifactId>ojdbc6</artifactId>
            <version>12.1.0.1-atlassian-hosted</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-client -->
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>1.2.6</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-server -->
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>1.2.6</version>
        </dependency>
    </dependencies>

3. Utility classes

package com.cn.util

import java.text.SimpleDateFormat
import java.util.Date

object TimeUtil {
  // Convert a "yyyy-MM-dd HH:mm:ss" time string to a millisecond timestamp
  def tranTimeToLong(tm: String): Long = {
    val fm = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    fm.parse(tm).getTime
  }

  // Convert a millisecond timestamp (passed as a string) back to a time string
  def tranTimeToString(tm: String): String = {
    val fm = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    fm.format(new Date(tm.toLong))
  }
}
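A quick round-trip check of the two conversions (inlined here so the sketch runs without the rest of the project):

```scala
import java.text.SimpleDateFormat
import java.util.Date

// Standalone copies of the two TimeUtil conversions above.
def tranTimeToLong(tm: String): Long =
  new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(tm).getTime

def tranTimeToString(tm: String): String =
  new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date(tm.toLong))

val millis = tranTimeToLong("2020-03-11 11:45:12")
val back = tranTimeToString(millis.toString)
println(back) // converting to millis and back yields the original string
```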
package com.cn.util

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory}

object HbaseUtils extends Serializable {

  /**
    * Build an HBase connection from a ZooKeeper quorum and client port.
    *
    * @param zkList comma-separated ZooKeeper hosts
    * @param port   ZooKeeper client port (e.g. "2181")
    * @return an HBase Connection; the caller is responsible for closing it
    */
  def getHBaseConn(zkList: String, port: String): Connection = {
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", zkList)
    conf.set("hbase.zookeeper.property.clientPort", port)
    ConnectionFactory.createConnection(conf)
  }

}

4. Generating test data into Kafka

package com.cn.util

import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.codehaus.jettison.json.JSONObject

import scala.util.Random


/**
  * A producer that sends simulated data to the Kafka cluster.
  * Simulated scenario: every 5 s, emit one record for a user, containing
  * the sampling time and the user's current location.
  */
object KafkaEventProducer {

  // Users (note: guanYu appears twice, so its HBase row is overwritten
  // and the scan later shows 9 rows for 10 records)
  private val users = Array(
    "zhangSan", "liSi",
    "wangWu", "xiaoQiang",
    "zhangFei", "liuBei",
    "guanYu", "maChao",
    "caoCao", "guanYu"
  )

  private var pointer = -1

  // Pick the next user (round-robin, not random)
  def getUser(): String = {
    pointer = (pointer + 1) % users.length
    users(pointer)
  }

 
  val random = new Random()

  // Current sampling time in milliseconds
  def getTime(): Long = {
    System.currentTimeMillis()
  }

  // Candidate walking locations
  val walkPlace = Array(
    "操场南门", "操场东门", "操场北门", "操场西门", "操场东南门", "操场西北门", "操场西南门", "操场东南北门"
  )

  def getWalkPlace(): String = {
    walkPlace(random.nextInt(walkPlace.length))
  }


  def main(args: Array[String]): Unit = {

    val topic = "topic_walkCount"
    val brokers = "master:6667,slaves1:6667,slaves2:6667"
    // Producer configuration ("metadata.broker.list" is a legacy 0.8
    // property and is not needed by the new producer API)
    val props = new Properties()
    props.setProperty("bootstrap.servers", brokers)
    props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    // Create the producer
    val producer = new KafkaProducer[String, String](props)

    // Send one event every 5 s
    while (true) {
      val event = new JSONObject()
      event.put("user", getUser())
        .put("count_time", TimeUtil.tranTimeToString(getTime().toString))
        .put("walk_place", getWalkPlace())
      println(event.toString())
      producer.send(new ProducerRecord[String, String](topic, event.toString))
      Thread.sleep(5000)
    }
  }
}
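Note that `getUser()` cycles through the array rather than sampling it randomly, which guarantees every user shows up in the stream. A standalone copy of the logic (with a shortened, illustrative user list) shows the cycling order:

```scala
// Round-robin selection as in KafkaEventProducer.getUser(): a pointer
// advances through the array and wraps around.
val users = Array("zhangSan", "liSi", "wangWu")
var pointer = -1

def getUser(): String = {
  pointer = (pointer + 1) % users.length
  users(pointer)
}

val firstCycle = (1 to users.length).map(_ => getUser())
println(firstCycle.mkString(", ")) // zhangSan, liSi, wangWu
println(getUser())                 // wraps back to zhangSan
```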

5. Processing and storage

package com.cn.sparkStreaming

import com.cn.util.HbaseUtils
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.util.Bytes
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010._
import org.codehaus.jettison.json.JSONObject

object kafka2sparkStreaming2Hbase {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("kafka2sparkStreaming2Hbase")
      .setMaster("local[1]")
    //.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    // 1 s batch interval for each micro-batch
    val ssc = new StreamingContext(conf, Seconds(1))
    // Log level: WARN, INFO or DEBUG
    ssc.sparkContext.setLogLevel("WARN")
    ssc.checkpoint("checkpoint")
    val topic = "topic_walkCount"
    val groupId = "t02"
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "master:6667,slaves1:6667,slaves2:6667",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> groupId,
      "auto.offset.reset" -> "earliest", // on first start, consume from the beginning
      "enable.auto.commit" -> (false: java.lang.Boolean) // disable auto-commit; offsets are committed manually below
    )

    val topics = Array(topic)
    val stream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent, // distribute partitions evenly across executors
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
    )

    stream.foreachRDD(rdd => {
      // Offsets consumed from each Kafka partition in this batch
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd.foreachPartition(partition => {
        // One HBase connection per partition, not per record
        val connection = HbaseUtils.getHBaseConn("master,slaves1,slaves2", "2181")
        val table = connection.getTable(TableName.valueOf("location_sure"))
        // Column family and column names
        val columnFamily = Bytes.toBytes("info")
        val count_time = Bytes.toBytes("count_time")
        val walk_place = Bytes.toBytes("walk_place")
        try {
          partition.foreach(record => {
            val json = new JSONObject(record.value())
            val user = json.getString("user")
            val countTime = json.getString("count_time")
            val walkPlace = json.getString("walk_place")

            val put = new Put(Bytes.toBytes(user))
            // countTime and walkPlace must be Strings (convert with toString
            // if necessary), otherwise the stored values come out garbled
            put.addColumn(columnFamily, count_time, Bytes.toBytes(countTime))
            put.addColumn(columnFamily, walk_place, Bytes.toBytes(walkPlace))
            table.put(put)
            println("insert hbase success!")
          })
        } finally {
          table.close()
          connection.close()
        }
      })
      // Commit the offsets manually
      stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    })
    ssc.start()
    ssc.awaitTermination()
  }
}
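Opening and closing an HBase connection for every record is expensive; the usual Spark pattern is one connection per partition inside `foreachPartition`, closed when the partition is done. The pattern can be sketched without a cluster using a stand-in resource (`FakeConnection` and `openConnection` are hypothetical, standing in for `HbaseUtils.getHBaseConn` and the real `Connection`):

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for an HBase Connection, so the per-partition
// resource pattern can run without a cluster.
class FakeConnection {
  val written = ArrayBuffer[String]()
  def put(row: String): Unit = written += row
  def close(): Unit = ()
}

var opens = 0
def openConnection(): FakeConnection = { opens += 1; new FakeConnection }

// Two simulated partitions of one micro-batch, three records total
val partitions = Seq(Seq("zhangSan", "liSi"), Seq("wangWu"))
val allWritten = ArrayBuffer[String]()
partitions.foreach { partition =>
  val conn = openConnection() // once per partition, not once per record
  try partition.foreach(row => conn.put(row))
  finally { allWritten ++= conn.written; conn.close() }
}
println(s"opens=$opens, rows=${allWritten.size}") // opens=2, rows=3
```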

三、Testing

1. Source data

{"user":"zhangSan","count_time":"2020-03-11 11:45:12","walk_place":"操场南门"}
{"user":"liSi","count_time":"2020-03-11 11:45:17","walk_place":"操场西南门"}
{"user":"wangWu","count_time":"2020-03-11 11:45:22","walk_place":"操场西南门"}
{"user":"xiaoQiang","count_time":"2020-03-11 11:45:27","walk_place":"操场东门"}
{"user":"zhangFei","count_time":"2020-03-11 11:45:32","walk_place":"操场北门"}
{"user":"liuBei","count_time":"2020-03-11 11:45:37","walk_place":"操场西门"}
{"user":"guanYu","count_time":"2020-03-11 11:45:42","walk_place":"操场南门"}
{"user":"maChao","count_time":"2020-03-11 11:45:47","walk_place":"操场南门"}
{"user":"caoCao","count_time":"2020-03-11 11:45:52","walk_place":"操场东南北门"}
{"user":"guanYu","count_time":"2020-03-11 11:45:57","walk_place":"操场西北门"}

2. Data in HBase (shell scan)

hbase(main):006:0> scan 'location_sure'
ROW                                COLUMN+CELL                                                                                         
 caoCao                            column=info:count_time, timestamp=1583898354092, value=2020-03-11 11:45:52                          
 caoCao                            column=info:walk_place, timestamp=1583898354092, value=\xE6\x93\x8D\xE5\x9C\xBA\xE4\xB8\x9C\xE5\x8D\
                                   x97\xE5\x8C\x97\xE9\x97\xA8                                                                         
 guanYu                            column=info:count_time, timestamp=1583898357267, value=2020-03-11 11:45:57                          
 guanYu                            column=info:walk_place, timestamp=1583898357267, value=\xE6\x93\x8D\xE5\x9C\xBA\xE8\xA5\xBF\xE5\x8C\
                                   x97\xE9\x97\xA8                                                                                     
 liSi                              column=info:count_time, timestamp=1583898330817, value=2020-03-11 11:45:17                          
 liSi                              column=info:walk_place, timestamp=1583898330817, value=\xE6\x93\x8D\xE5\x9C\xBA\xE8\xA5\xBF\xE5\x8D\
                                   x97\xE9\x97\xA8                                                                                     
 liuBei                            column=info:count_time, timestamp=1583898337245, value=2020-03-11 11:45:37                          
 liuBei                            column=info:walk_place, timestamp=1583898337245, value=\xE6\x93\x8D\xE5\x9C\xBA\xE8\xA5\xBF\xE9\x97\
                                   xA8                                                                                                 
 maChao                            column=info:count_time, timestamp=1583898354005, value=2020-03-11 11:45:47                          
 maChao                            column=info:walk_place, timestamp=1583898354005, value=\xE6\x93\x8D\xE5\x9C\xBA\xE5\x8D\x97\xE9\x97\
                                   xA8                                                                                                 
 wangWu                            column=info:count_time, timestamp=1583898330892, value=2020-03-11 11:45:22                          
 wangWu                            column=info:walk_place, timestamp=1583898330892, value=\xE6\x93\x8D\xE5\x9C\xBA\xE8\xA5\xBF\xE5\x8D\
                                   x97\xE9\x97\xA8                                                                                     
 xiaoQiang                         column=info:count_time, timestamp=1583898330990, value=2020-03-11 11:45:27                          
 xiaoQiang                         column=info:walk_place, timestamp=1583898330990, value=\xE6\x93\x8D\xE5\x9C\xBA\xE4\xB8\x9C\xE9\x97\
                                   xA8                                                                                                 
 zhangFei                          column=info:count_time, timestamp=1583898332253, value=2020-03-11 11:45:32                          
 zhangFei                          column=info:walk_place, timestamp=1583898332253, value=\xE6\x93\x8D\xE5\x9C\xBA\xE5\x8C\x97\xE9\x97\
                                   xA8                                                                                                 
 zhangSan                          column=info:count_time, timestamp=1583898330778, value=2020-03-11 11:45:12                          
 zhangSan                          column=info:walk_place, timestamp=1583898330778, value=\xE6\x93\x8D\xE5\x9C\xBA\xE5\x8D\x97\xE9\x97\
                                   xA8                                                                                                 
9 row(s) in 0.0480 seconds

hbase(main):007:0> 

3. Reading the data back programmatically

rowkey: caoCao, column family: info, column: count_time, value: 2020-03-11 11:45:52
rowkey: caoCao, column family: info, column: walk_place, value: 操场东南北门
rowkey: guanYu, column family: info, column: count_time, value: 2020-03-11 11:45:57
rowkey: guanYu, column family: info, column: walk_place, value: 操场西北门
rowkey: liSi, column family: info, column: count_time, value: 2020-03-11 11:45:17
rowkey: liSi, column family: info, column: walk_place, value: 操场西南门
rowkey: liuBei, column family: info, column: count_time, value: 2020-03-11 11:45:37
rowkey: liuBei, column family: info, column: walk_place, value: 操场西门
rowkey: maChao, column family: info, column: count_time, value: 2020-03-11 11:45:47
rowkey: maChao, column family: info, column: walk_place, value: 操场南门
rowkey: wangWu, column family: info, column: count_time, value: 2020-03-11 11:45:22
rowkey: wangWu, column family: info, column: walk_place, value: 操场西南门
rowkey: xiaoQiang, column family: info, column: count_time, value: 2020-03-11 11:45:27
rowkey: xiaoQiang, column family: info, column: walk_place, value: 操场东门
rowkey: zhangFei, column family: info, column: count_time, value: 2020-03-11 11:45:32
rowkey: zhangFei, column family: info, column: walk_place, value: 操场北门
rowkey: zhangSan, column family: info, column: count_time, value: 2020-03-11 11:45:12
rowkey: zhangSan, column family: info, column: walk_place, value: 操场南门

Reposted from blog.csdn.net/hyj_king/article/details/104794961