pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build for the spark-0708 examples: Scala 2.11.8 sources compiled against Spark 2.2.0
(core, SQL, Hive, Streaming), with MySQL JDBC, Typesafe Config and Jedis as runtime libraries. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.wedoctor.spark</groupId>
<artifactId>spark-0708</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<scala.version>2.11.8</scala.version>
<spark.version>2.2.0</spark.version>
<hadoop.version>2.8.1</hadoop.version>
<encoding>UTF-8</encoding>
</properties>
<dependencies>
<!-- Scala standard library -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<!-- Spark dependencies (the _2.11 suffix must match the Scala binary version above) -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
<!-- Spark SQL -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.41</version>
</dependency>
<!-- Typesafe Config (provides ConfigFactory) -->
<dependency>
<groupId>com.typesafe</groupId>
<artifactId>config</artifactId>
<version>1.3.0</version>
</dependency>
<!-- Spark Streaming -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- Jedis, the Redis client -->
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.8.1</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Plugin that compiles the Java sources -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.5.1</version>
</plugin>
<!-- Plugin that compiles the Scala sources -->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
<configuration>
<args>
<arg>-dependencyfile</arg>
<arg>${project.build.directory}/.scala_dependencies</arg>
</args>
</configuration>
</execution>
</executions>
</plugin>
<!-- Jar packaging (shade) plugin; the whole plugin definition below is currently commented out -->
<!--<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.4.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>reference.conf</resource>
</transformer>
<!– Specify the main class –>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass></mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>-->
</plugins>
</build>
</project>
redis-checkpoint
JedisUtils
package com.wedoctor.sparkstreaming
import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}
/**
  * Helper for obtaining Redis connections through Jedis.
  *
  * Connections handed out by [[apply]] are borrowed from a pool that is created once per
  * (host, port) pair and reused for the lifetime of the JVM. Callers must call `close()` on the
  * returned [[Jedis]] when done so the connection goes back to its pool.
  */
object JedisUtils {

  // One shared pool per (host, port). Building a new JedisPool for every borrowed connection
  // would leave an unclosed pool behind on each call, so pools are cached here instead.
  // All access goes through `pools.synchronized`.
  private val pools = scala.collection.mutable.Map.empty[(String, Int), JedisPool]

  /**
    * Builds a brand-new connection pool for the given Redis endpoint.
    *
    * Every call creates an independent pool; the caller owns it and is responsible for closing
    * it. Use [[apply]] to borrow a connection from the shared, cached pool instead.
    *
    * @param host Redis server host name or address
    * @param port Redis server port
    * @return a new pool configured with maxIdle = 1000 and maxTotal = 2000
    */
  def createJedisPool(host: String, port: Int): JedisPool = {
    val config = new JedisPoolConfig()
    // Maximum number of idle connections kept in the pool (library default: 8).
    config.setMaxIdle(1000)
    // Maximum number of connections the pool may hand out (library default: 8).
    config.setMaxTotal(2000)
    new JedisPool(config, host, port)
  }

  /** Returns the cached pool for the endpoint, creating it on first use. */
  private def sharedPool(host: String, port: Int): JedisPool = pools.synchronized {
    pools.getOrElseUpdate((host, port), createJedisPool(host, port))
  }

  /** Borrows a connection from the shared pool of the given endpoint. */
  private def getJedis(host: String, port: Int): Jedis =
    sharedPool(host, port).getResource

  /**
    * Borrows a Redis connection for the given endpoint.
    *
    * @param host Redis server host name or address
    * @param port Redis server port
    * @return a pooled connection; call `close()` on it to return it to the pool
    */
  def apply(host: String, port: Int): Jedis =
    getJedis(host, port)
}
RedisCheckpoint
package com.wedoctor.sparkstreaming
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
  * Streaming word count whose running totals are kept in Redis.
  *
  * Reads space-separated words from a text socket in 2-second batches, counts the words of each
  * batch, and adds the per-batch counts to the Redis hash `streaming_wc` with HINCRBY, so the
  * hash accumulates totals across batches.
  */
object RedisCheckpoint {
  // Only log errors from "org.*" loggers.
  Logger.getLogger("org").setLevel(Level.ERROR)

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[*]") // at least 2 cores are needed: the socket receiver occupies one
      .setAppName(this.getClass.getSimpleName)
    val ssc: StreamingContext = new StreamingContext(conf, Seconds(2))
    val socket: ReceiverInputDStream[String] = ssc.socketTextStream("192.168.253.xx", 9999)

    // Word counts of the current batch only.
    val curDat = socket.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)

    curDat.foreachRDD { rdd =>
      rdd.foreachPartition { it =>
        // Do not borrow a connection for partitions that carry no data.
        if (it.nonEmpty) {
          val jedis = JedisUtils.apply("192.168.253.xx", 6379)
          try {
            it.foreach { case (word, count) => jedis.hincrBy("streaming_wc", word, count) }
          } finally {
            // Always release the connection, even when a Redis command fails.
            jedis.close()
          }
        }
      }
    }

    ssc.start()
    // Block the main thread until the streaming context is stopped.
    ssc.awaitTermination()
  }
}
mysql-checkpoint
application.conf
# JDBC settings loaded through ConfigFactory.load().
# MysqlCheckpoint reads db.url, db.user, db.password and db.tablename.
# One setting per line: HOCON fields must be separated by a newline or a comma.
db.url="jdbc:mysql://bigdata1:3306/test?characterEncoding=utf-8"
db.driver="com.mysql.jdbc.Driver"
db.password="123456"
db.user="root"
db.tablename="streaming_wc"
MysqlCheckpoint
package com.wedoctor.sparkstreaming
import java.sql.{DriverManager, PreparedStatement, ResultSet}
import com.typesafe.config.{Config, ConfigFactory}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
  * Streaming word count whose running totals are kept in a MySQL table.
  *
  * Reads space-separated words from a text socket in 2-second batches, counts the words of each
  * non-empty batch, and merges the counts into the table named by `db.tablename`
  * (columns `word`, `cnts`): existing words get their count increased, new words are inserted.
  * Connection settings come from the Typesafe Config keys `db.url`, `db.user`, `db.password`.
  */
object MysqlCheckpoint {
  // Only log errors from "org.*" loggers.
  Logger.getLogger("org").setLevel(Level.ERROR)

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[*]") // at least 2 cores are needed: the socket receiver occupies one
      .setAppName(this.getClass.getSimpleName)
    val ssc: StreamingContext = new StreamingContext(conf, Seconds(2))
    val socket: ReceiverInputDStream[String] = ssc.socketTextStream("192.168.253.xx", 9999)

    // Read the settings once on the driver; only plain strings are captured by the closures below.
    val config: Config = ConfigFactory.load()
    val url = config.getString("db.url")
    val user = config.getString("db.user")
    val password = config.getString("db.password")
    val table = config.getString("db.tablename")

    socket.foreachRDD { rdd =>
      // Only process batches that actually contain data.
      if (!rdd.isEmpty()) {
        // Word counts of the current batch.
        val curDat: RDD[(String, Int)] = rdd.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
        curDat.foreachPartition { it =>
          // Do not open a connection for partitions that carry no data.
          if (it.nonEmpty) mergePartition(url, user, password, table, it)
        }
      }
    }

    ssc.start()
    // Block the main thread until the streaming context is stopped.
    ssc.awaitTermination()
  }

  /**
    * Merges one partition of (word, count) pairs into the MySQL table, using a single connection
    * and a single set of prepared statements for the whole partition. The connection, the
    * statements and every result set are closed even when a statement fails.
    */
  private def mergePartition(url: String,
                             user: String,
                             password: String,
                             table: String,
                             records: Iterator[(String, Int)]): Unit = {
    val conn = DriverManager.getConnection(url, user, password)
    try {
      // Make sure the target table exists; once per partition instead of once per word.
      val create = conn.prepareStatement(s"create table if not exists ${table} (word varchar(20), cnts int )")
      try create.executeUpdate() finally create.close()

      val select: PreparedStatement = conn.prepareStatement(s"select * from ${table} where word = ?")
      val update: PreparedStatement = conn.prepareStatement(s"update ${table} set cnts = ? where word = ?")
      val insert: PreparedStatement = conn.prepareStatement(s"insert into ${table} values (?,?)")
      try {
        records.foreach { case (word, curCnts) =>
          // Look up the count stored so far for this word, if any.
          select.setString(1, word)
          val rs: ResultSet = select.executeQuery()
          val stored: Option[Int] =
            try {
              if (rs.next()) Some(rs.getInt("cnts")) else None
            } finally {
              rs.close()
            }

          stored match {
            case Some(hisCnts) =>
              // Word already present: add the current batch count to the stored total.
              update.setInt(1, hisCnts + curCnts)
              update.setString(2, word)
              update.executeUpdate()
            case None =>
              // Word not seen before: insert it with the current batch count.
              insert.setString(1, word)
              insert.setInt(2, curCnts)
              insert.execute()
          }
        }
      } finally {
        insert.close()
        update.close()
        select.close()
      }
    } finally {
      conn.close()
    }
  }
}