使用 Spark Streaming 连接 Kafka 进行分析,并用 HBase 保存结果
- Kafka 发送的数据是 JSON 格式,示例如下:
{"userId":20400,"day":"2017-03-01","begintime":1488326400000,"endtime":1488327000000,"data":[{"package":"com.browser1","activetime":60000},{"package":"com.browser","activetime":1207000}]}
{"userId":2000,"day":"2017-03-05","begintime":1488326400000,"endtime":1488327000000,"data":[{"package":"com.browser","activetime":120000}]}
- 项目 Maven 的配置(pom.xml)
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>edu.zhku</groupId>
<artifactId>SparkStreaming</artifactId>
<packaging>jar</packaging>
<version>1.0-SNAPSHOT</version>
<properties>
<!-- Single source of truth for the Spark version. Previously this said 2.2.1
     while every Spark dependency hardcoded 2.3.0; all Spark artifacts now
     reference ${spark.version} so they cannot drift apart. -->
<spark.version>2.3.0</spark.version>
<scala.version>2.11.6</scala.version>
</properties>
<repositories>
<repository>
<id>nexus-aliyun</id>
<name>Nexus aliyun</name>
<url>http://maven.aliyun.com/nexus/content/groups/public</url>
</repository>
</repositories>
<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-xml</artifactId>
<version>2.11.0-M4</version>
</dependency>
<!-- All Spark modules share the same Scala binary version (2.11) and
     the same Spark release, managed through ${spark.version}. -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql-kafka-0-10_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.7.1</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.7.1</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>2.0.0-beta-2</version>
</dependency>
</dependencies>
<build>
<pluginManagement>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.5.1</version>
</plugin>
</plugins>
</pluginManagement>
<plugins>
<!-- Compile Scala sources before Java so mixed projects link correctly. -->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<executions>
<execution>
<id>scala-compile-first</id>
<phase>process-resources</phase>
<goals>
<goal>add-source</goal>
<goal>compile</goal>
</goals>
</execution>
<execution>
<id>scala-test-compile</id>
<phase>process-test-resources</phase>
<goals>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<executions>
<execution>
<phase>compile</phase>
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
</plugin>
<!-- Build an uber-jar for spark-submit; strip signature files so the
     merged jar is not rejected with a SecurityException. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.4.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
<reporting>
<plugins>
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<configuration>
<scalaVersion>${scala.version}</scalaVersion>
</configuration>
</plugin>
</plugins>
</reporting>
</project>
- 代码实现:使用 Scala 的 json4s 库和 case class 来解析 JSON
package edu.zhku
import java.util.Date
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Put, Table}
import org.apache.hadoop.hbase.util.Bytes
import org.json4s._
import org.json4s.jackson.JsonMethods._
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
/**
 * Spark Streaming job: consumes user-behavior JSON records from the Kafka
 * topic "behavior", parses each message with json4s into the `log` case
 * class, and persists per-app active times into the HBase table `userHourly`.
 *
 * Row key: "&lt;ingest epoch millis&gt;_&lt;endtime&gt;". Column family "info" holds the
 * userId plus one column per package name whose value is its active time.
 */
object BehaviorHourly {
  System.setProperty("hadoop.home.dir", "/data/install/apache/hadoop-2.9.0")
  // ZooKeeper quorum used by the HBase client to locate the cluster.
  var zookeeperservers = "master:2181,slave1:2181,slave2:2181"
  var tablename = "userHourly"
  val hbaseconf: Configuration = HBaseConfiguration.create()
  hbaseconf.set("hbase.zookeeper.quorum", zookeeperservers)
  hbaseconf.set("hbase.zookeeper.property.clientPort", "2181")
  // NOTE(review): kept only for source compatibility; no longer written to.
  // Sharing one mutable Table across partitions/threads is unsafe, so each
  // partition now obtains and closes its own local Table instance below.
  var table: Table = _

  // One app-usage entry inside a message; `package` is backtick-escaped
  // because it is a Scala keyword.
  case class apptimes(activetime: String, `package`: String)
  case class UserHourly(userId: String, endtime: Long, data: List[(String, Long)])
  // Top-level shape of one Kafka JSON message (see the samples in the docs above).
  case class log(userId: String, day: String, begintime: String, endtime: Long, data: List[apptimes])

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("behavior")
    val ssc = new StreamingContext(conf, Seconds(3))
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "master:9092,slave1:9092,slave2:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "use_a_separate_group_id_for_each_stream",
      "auto.offset.reset" -> "latest",
      // Offsets are managed by Spark, not auto-committed by the Kafka client.
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    val topics = Array("behavior")
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )
    stream.map(record => record.value)
      .map { value =>
        // json4s requires implicit Formats in scope for extract[].
        implicit val formats: DefaultFormats.type = DefaultFormats
        parse(value).extract[log]
      }
      // 1-hour sliding window recomputed every minute (batch interval is 3 s).
      .window(Seconds(3600), Seconds(60))
      .foreachRDD { rdd =>
        rdd.foreachPartition { partitionOfRecords =>
          // One connection/table per partition, closed after the WHOLE
          // partition is written. The original closed the table inside the
          // per-record loop, so every put after the first one hit a closed
          // table, and the connection itself was never closed (leak).
          val connection: Connection = ConnectionFactory.createConnection(hbaseconf)
          val hTable = connection.getTable(TableName.valueOf(tablename))
          try {
            partitionOfRecords.foreach { logData =>
              val thePut = new Put(Bytes.toBytes(String.valueOf(new Date().getTime) + "_" + logData.endtime))
              thePut.addColumn(Bytes.toBytes("info"), Bytes.toBytes("userId"), Bytes.toBytes(logData.userId))
              logData.data.foreach { appTime =>
                thePut.addColumn(Bytes.toBytes("info"), Bytes.toBytes(appTime.`package`), Bytes.toBytes(appTime.activetime))
              }
              hTable.put(thePut)
            }
          } finally {
            // Release HBase resources even if a put throws.
            hTable.close()
            connection.close()
          }
        }
      }
    ssc.start()
    ssc.awaitTermination()
  }
}