1. Overview
1.1 Requirements
Real-time location system: track each user's current location in real time and store only the latest record per user.
1.2 Approach
Spark Streaming consumes the raw user-location records from Kafka and analyzes them; records that satisfy the requirements are then written to HBase with rowkey = user name. To keep the example simple, the records consumed from Kafka are assumed to already be in their final analyzed form, so they can be written to HBase directly.
1.3 Component versions
| Component | Version |
| --- | --- |
| kafka | kafka_2.10-0.10.2.1 |
| spark | spark-2.2.0-bin-hadoop2.7 |
| hbase | hbase-1.2.6 |
2. Implementation
2.1 Create the HBase table
First create a table in HBase with one column family, 'info', which will hold the location columns:
hbase(main):003:0> create 'location_sure','info'
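To confirm the table was created, it can be listed and described in the HBase shell (prompt numbers will differ on your cluster):
hbase(main):004:0> list
hbase(main):005:0> describe 'location_sure'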
2.2 pom dependencies
<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.11.8</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-compiler</artifactId>
<version>2.11.8</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-reflect</artifactId>
<version>2.11.8</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.2.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.2.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.2.1</version>
<!--<scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<version>2.2.1</version>
</dependency>
<!-- The old spark-streaming-kafka_2.10 (1.6.3) artifact is omitted: it targets Scala 2.10 and the 0.8 integration, and conflicts with spark-streaming-kafka-0-10_2.11 used above. -->
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-hive -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.11</artifactId>
<version>2.2.1</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.gavaghan</groupId>
<artifactId>geodesy</artifactId>
<version>1.1.3</version>
</dependency>
<dependency>
<groupId>com.github.scopt</groupId>
<artifactId>scopt_2.11</artifactId>
<version>3.7.0</version>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.2.4</version>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.9.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.codehaus.jettison/jettison -->
<dependency>
<groupId>org.codehaus.jettison</groupId>
<artifactId>jettison</artifactId>
<version>1.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/net.sf.json-lib/json-lib -->
<dependency>
<groupId>net.sf.json-lib</groupId>
<artifactId>json-lib</artifactId>
<version>2.4</version>
<classifier>jdk15</classifier>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.commons/commons-pool2 -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
<version>2.4.2</version>
</dependency>
<!-- Oracle JDBC driver -->
<dependency>
<groupId>com.oracle</groupId>
<artifactId>ojdbc6</artifactId>
<version>12.1.0.1-atlassian-hosted</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-client -->
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>1.2.6</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-server -->
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>1.2.6</version>
</dependency>
</dependencies>
2.3 Utility classes
Two small helpers are used: TimeUtil for converting between formatted time strings and epoch timestamps, and HbaseUtils for creating HBase connections.
package com.cn.util
import java.text.SimpleDateFormat
import java.util.Date
object TimeUtil {
  // Convert a "yyyy-MM-dd HH:mm:ss" string to an epoch timestamp in milliseconds
  def tranTimeToLong(tm: String): Long = {
    val fm = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    val dt = fm.parse(tm)
    dt.getTime()
  }

  // Convert an epoch timestamp in milliseconds (passed as a string) to a "yyyy-MM-dd HH:mm:ss" string
  def tranTimeToString(tm: String): String = {
    val fm = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    fm.format(new Date(tm.toLong))
  }
}
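A quick round-trip check of the two conversions (TimeUtilDemo is a hypothetical helper, not part of the original project; output depends on when it runs):
import com.cn.util.TimeUtil

object TimeUtilDemo {
  def main(args: Array[String]): Unit = {
    val nowMillis = System.currentTimeMillis()
    // Timestamp -> "yyyy-MM-dd HH:mm:ss" string, e.g. "2020-03-11 11:45:12"
    val asString = TimeUtil.tranTimeToString(nowMillis.toString)
    // String -> timestamp again; millisecond precision is lost in the round trip
    val backToLong = TimeUtil.tranTimeToLong(asString)
    println(s"$nowMillis -> $asString -> $backToLong")
  }
}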
HbaseUtils builds an HBase Connection from a ZooKeeper quorum and client port:
package com.cn.util
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory}
object HbaseUtils extends Serializable {
  /**
    * @param zkList comma-separated ZooKeeper quorum hosts
    * @param port   ZooKeeper client port (e.g. 2181)
    * @return a new HBase Connection
    */
  def getHBaseConn(zkList: String, port: String): Connection = {
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", zkList)
    conf.set("hbase.zookeeper.property.clientPort", port)
    ConnectionFactory.createConnection(conf)
  }
}
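A minimal usage sketch (HbaseUtilsDemo is illustrative only; the ZooKeeper quorum matches the one used by the streaming job below, and the Connection is heavyweight, so it is closed when done):
import com.cn.util.HbaseUtils
import org.apache.hadoop.hbase.TableName

object HbaseUtilsDemo {
  def main(args: Array[String]): Unit = {
    val connection = HbaseUtils.getHBaseConn("master,slaves1,slaves2", "2181")
    try {
      val admin = connection.getAdmin
      // Verify that the table created in step 2.1 is visible
      println(admin.tableExists(TableName.valueOf("location_sure")))
      admin.close()
    } finally {
      connection.close()
    }
  }
}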
2.4 Producing test data to Kafka
KafkaEventProducer publishes one simulated location record to the topic every 5 seconds:
package com.cn.util
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.codehaus.jettison.json.JSONObject
import scala.util.Random
/**
  * A producer that sends simulated data to the Kafka cluster.
  * Simulated scenario: every 5s a user reports a location record
  * containing the user name, the report time and the current place.
  */
object KafkaEventProducer {
  // Users
  private val users = Array(
    "zhangSan", "liSi",
    "wangWu", "xiaoQiang",
    "zhangFei", "liuBei",
    "guanYu", "maChao",
    "caoCao", "guanYu"
  )
  private var pointer = -1

  // Return the users in round-robin order
  def getUser(): String = {
    pointer = (pointer + 1) % users.length
    users(pointer)
  }

  val random = new Random()

  // Report time: current time in milliseconds
  def getTime(): Long = {
    System.currentTimeMillis()
  }

  // Candidate walking places, picked at random
  val walkPlace = Array(
    "操场南门", "操场东门", "操场北门", "操场西门", "操场东南门", "操场西北门", "操场西南门", "操场东南北门"
  )
  def getWalkPlace(): String = {
    walkPlace(random.nextInt(walkPlace.length))
  }

  def main(args: Array[String]): Unit = {
    val topic = "topic_walkCount"
    val brokers = "master:6667,slaves1:6667,slaves2:6667"
    // Producer configuration
    val props = new Properties()
    props.setProperty("bootstrap.servers", brokers)
    props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    // Create the producer
    val producer = new KafkaProducer[String, String](props)
    // Send one record every 5 seconds
    while (true) {
      val event = new JSONObject()
      event.put("user", getUser())
        .put("count_time", TimeUtil.tranTimeToString(getTime().toString))
        .put("walk_place", getWalkPlace())
      println(event.toString())
      producer.send(new ProducerRecord[String, String](topic, event.toString))
      Thread.sleep(5000)
    }
  }
}
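Before wiring up Spark Streaming, it is worth checking that the records actually reach the topic, e.g. with the console consumer bundled with Kafka (run from the Kafka installation directory; broker addresses as above):
bin/kafka-console-consumer.sh --bootstrap-server master:6667 --topic topic_walkCount --from-beginning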
2.5 Streaming analysis and writing to HBase
kafka2sparkStreaming2Hbase consumes the topic with the Kafka 0.10 direct stream API, writes each record to HBase keyed by the user name, and commits the offsets manually after each batch:
package com.cn.sparkStreaming
import com.cn.util.HbaseUtils
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.util.Bytes
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010._
import org.codehaus.jettison.json.JSONObject
object kafka2sparkStreaming2Hbase {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("kafka2sparkStreaming2Hbase")
      .setMaster("local[1]")
      //.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    // Batch interval of 1 second
    val ssc = new StreamingContext(conf, Seconds(1))
    // Log level: WARN, INFO or DEBUG
    ssc.sparkContext.setLogLevel("WARN")
    ssc.checkpoint("checkpoint")

    val topic = "topic_walkCount"
    val groupId = "t02"
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "master:6667,slaves1:6667,slaves2:6667",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> groupId,
      "auto.offset.reset" -> "earliest", // on first start, consume from the beginning
      "enable.auto.commit" -> (false: java.lang.Boolean) // offsets are committed manually below
    )
    val topics = Array(topic)
    val stream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent, // distribute partitions evenly across executors
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
    )

    stream.foreachRDD(rdd => {
      // Offset range of every Kafka partition in this batch
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd.foreachPartition(partition => {
        // One HBase connection per partition, not per record
        val connection = HbaseUtils.getHBaseConn("master,slaves1,slaves2", "2181")
        val table = connection.getTable(TableName.valueOf("location_sure"))
        // Column family and column names
        val columnFamily = Bytes.toBytes("info")
        val count_time = Bytes.toBytes("count_time")
        val walk_place = Bytes.toBytes("walk_place")
        partition.foreach(record => {
          val json = new JSONObject(record.value())
          val user = json.getString("user")
          val countTime = json.getString("count_time")
          val walkPlace = json.getString("walk_place")
          // rowkey = user name, so each new report overwrites the user's previous location
          val put = new Put(Bytes.toBytes(user))
          // Note: countTime and walkPlace must already be Strings; storing other types via toString can garble the stored value
          put.addColumn(columnFamily, count_time, Bytes.toBytes(countTime))
          put.addColumn(columnFamily, walk_place, Bytes.toBytes(walkPlace))
          table.put(put)
          println("insert hbase success!")
        })
        table.close()
        connection.close()
      })
      // Commit offsets manually after the batch has been written
      stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    })

    ssc.start()
    ssc.awaitTermination()
  }
}
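The test below runs the job locally via setMaster("local[1]"). For a cluster run, the hard-coded master would be dropped and the packaged jar submitted with spark-submit; a sketch with a placeholder jar name and master (adjust to your environment):
spark-submit \
  --class com.cn.sparkStreaming.kafka2sparkStreaming2Hbase \
  --master yarn \
  --deploy-mode client \
  location-streaming.jar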
3. Testing
3.1 Source data (console output of KafkaEventProducer)
{"user":"zhangSan","count_time":"2020-03-11 11:45:12","walk_place":"操场南门"}
{"user":"liSi","count_time":"2020-03-11 11:45:17","walk_place":"操场西南门"}
{"user":"wangWu","count_time":"2020-03-11 11:45:22","walk_place":"操场西南门"}
{"user":"xiaoQiang","count_time":"2020-03-11 11:45:27","walk_place":"操场东门"}
{"user":"zhangFei","count_time":"2020-03-11 11:45:32","walk_place":"操场北门"}
{"user":"liuBei","count_time":"2020-03-11 11:45:37","walk_place":"操场西门"}
{"user":"guanYu","count_time":"2020-03-11 11:45:42","walk_place":"操场南门"}
{"user":"maChao","count_time":"2020-03-11 11:45:47","walk_place":"操场南门"}
{"user":"caoCao","count_time":"2020-03-11 11:45:52","walk_place":"操场东南北门"}
{"user":"guanYu","count_time":"2020-03-11 11:45:57","walk_place":"操场西北门"}
3.2 Data stored in HBase
The HBase shell displays the Chinese place names as escaped UTF-8 bytes; they decode to the original values. Also note that guanYu appears twice in the source data but only once here: because the rowkey is the user name, the 11:45:57 record overwrote the 11:45:42 one, leaving 9 rows for 10 source records.
hbase(main):006:0> scan 'location_sure'
ROW COLUMN+CELL
caoCao column=info:count_time, timestamp=1583898354092, value=2020-03-11 11:45:52
caoCao column=info:walk_place, timestamp=1583898354092, value=\xE6\x93\x8D\xE5\x9C\xBA\xE4\xB8\x9C\xE5\x8D\
x97\xE5\x8C\x97\xE9\x97\xA8
guanYu column=info:count_time, timestamp=1583898357267, value=2020-03-11 11:45:57
guanYu column=info:walk_place, timestamp=1583898357267, value=\xE6\x93\x8D\xE5\x9C\xBA\xE8\xA5\xBF\xE5\x8C\
x97\xE9\x97\xA8
liSi column=info:count_time, timestamp=1583898330817, value=2020-03-11 11:45:17
liSi column=info:walk_place, timestamp=1583898330817, value=\xE6\x93\x8D\xE5\x9C\xBA\xE8\xA5\xBF\xE5\x8D\
x97\xE9\x97\xA8
liuBei column=info:count_time, timestamp=1583898337245, value=2020-03-11 11:45:37
liuBei column=info:walk_place, timestamp=1583898337245, value=\xE6\x93\x8D\xE5\x9C\xBA\xE8\xA5\xBF\xE9\x97\
xA8
maChao column=info:count_time, timestamp=1583898354005, value=2020-03-11 11:45:47
maChao column=info:walk_place, timestamp=1583898354005, value=\xE6\x93\x8D\xE5\x9C\xBA\xE5\x8D\x97\xE9\x97\
xA8
wangWu column=info:count_time, timestamp=1583898330892, value=2020-03-11 11:45:22
wangWu column=info:walk_place, timestamp=1583898330892, value=\xE6\x93\x8D\xE5\x9C\xBA\xE8\xA5\xBF\xE5\x8D\
x97\xE9\x97\xA8
xiaoQiang column=info:count_time, timestamp=1583898330990, value=2020-03-11 11:45:27
xiaoQiang column=info:walk_place, timestamp=1583898330990, value=\xE6\x93\x8D\xE5\x9C\xBA\xE4\xB8\x9C\xE9\x97\
xA8
zhangFei column=info:count_time, timestamp=1583898332253, value=2020-03-11 11:45:32
zhangFei column=info:walk_place, timestamp=1583898332253, value=\xE6\x93\x8D\xE5\x9C\xBA\xE5\x8C\x97\xE9\x97\
xA8
zhangSan column=info:count_time, timestamp=1583898330778, value=2020-03-11 11:45:12
zhangSan column=info:walk_place, timestamp=1583898330778, value=\xE6\x93\x8D\xE5\x9C\xBA\xE5\x8D\x97\xE9\x97\
xA8
9 row(s) in 0.0480 seconds
hbase(main):007:0>
3.3 Reading the data back programmatically
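The readout below was produced by a small scan program. A minimal sketch of such a reader (the object name HbaseReadDemo is illustrative, not from the original project):
import org.apache.hadoop.hbase.{CellUtil, TableName}
import org.apache.hadoop.hbase.client.Scan
import org.apache.hadoop.hbase.util.Bytes
import com.cn.util.HbaseUtils
import scala.collection.JavaConverters._

object HbaseReadDemo {
  def main(args: Array[String]): Unit = {
    val connection = HbaseUtils.getHBaseConn("master,slaves1,slaves2", "2181")
    val table = connection.getTable(TableName.valueOf("location_sure"))
    val scanner = table.getScanner(new Scan())
    // Print every cell as rowkey / column family / column / value
    scanner.asScala.foreach { result =>
      result.listCells().asScala.foreach { cell =>
        val row = Bytes.toString(CellUtil.cloneRow(cell))
        val family = Bytes.toString(CellUtil.cloneFamily(cell))
        val qualifier = Bytes.toString(CellUtil.cloneQualifier(cell))
        val value = Bytes.toString(CellUtil.cloneValue(cell))
        println(s"rowkey:$row, column family:$family, column:$qualifier, value:$value")
      }
    }
    scanner.close()
    table.close()
    connection.close()
  }
}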
rowkey:caoCao, column family:info, column:count_time, value:2020-03-11 11:45:52
rowkey:caoCao, column family:info, column:walk_place, value:操场东南北门
rowkey:guanYu, column family:info, column:count_time, value:2020-03-11 11:45:57
rowkey:guanYu, column family:info, column:walk_place, value:操场西北门
rowkey:liSi, column family:info, column:count_time, value:2020-03-11 11:45:17
rowkey:liSi, column family:info, column:walk_place, value:操场西南门
rowkey:liuBei, column family:info, column:count_time, value:2020-03-11 11:45:37
rowkey:liuBei, column family:info, column:walk_place, value:操场西门
rowkey:maChao, column family:info, column:count_time, value:2020-03-11 11:45:47
rowkey:maChao, column family:info, column:walk_place, value:操场南门
rowkey:wangWu, column family:info, column:count_time, value:2020-03-11 11:45:22
rowkey:wangWu, column family:info, column:walk_place, value:操场西南门
rowkey:xiaoQiang, column family:info, column:count_time, value:2020-03-11 11:45:27
rowkey:xiaoQiang, column family:info, column:walk_place, value:操场东门
rowkey:zhangFei, column family:info, column:count_time, value:2020-03-11 11:45:32
rowkey:zhangFei, column family:info, column:walk_place, value:操场北门
rowkey:zhangSan, column family:info, column:count_time, value:2020-03-11 11:45:12
rowkey:zhangSan, column family:info, column:walk_place, value:操场南门
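To spot-check that only a user's latest location is kept, a single row can be fetched in the HBase shell; since guanYu was reported twice in the source data, this should return only the 11:45:57 / 操场西北门 cells seen in the scan above:
hbase(main):008:0> get 'location_sure','guanYu'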