最近在学习es逛博客的时候,发现这篇文章原文链接,学习一下。
spark接入ES可以使用多种方式,常见类型如下。
本文主要介绍将case class 类对象写入ElasticSearch:也就是获取数据然后使用case class封装数据,然后在case class中选取一个字段当做 id,但是这个字段一定数据不能重复 要唯一。不指定ES自己也会生成id。
准备工作
第一步:
使用Apache Spark将数据写入到ElasticSearch中。本文使用的是类库是elasticsearch-hadoop
,其从2.1版本开始提供了内置支持Apache Spark的功能,使用elasticsearch-hadoop
之前,我们需要引入依赖:本文使用的版本是:6.3.2
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>6.3.2</version>
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch-hadoop</artifactId>
<version>6.3.2</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>transport</artifactId>
<version>6.3.2</version>
</dependency>
第二步:
提前创建好索引和type:索引-->sql_command type--> sql_info
在spark代码中设置ES的相关参数:ES的nodes节点 ip和端口,以及自动创建索引参数
val session: SparkSession = SparkSession.builder().config(conf)
.config("es.index.auto.create", "true")
.config("es.nodes", "ip1:9200,ip2:9200,ip2:9200")
.getOrCreate()
第三步:
调用写入ES的api: saveToEs(),并导入ES的相关类:import org.elasticsearch.spark._,这将使得所有的RDD拥有saveToEs
方法。下面我将一一介绍将不同类型的数据写入ElasticSearch中。调用 saveToEs()时可以指定id也可以不指定,不指定ES会自动生成,看自己需求。自己指定时需要提供一个不唯一的字段,如果没有自己可以生成一个,但是一定不能重复 。
//数据写入es
import org.elasticsearch.spark._
rddSource.saveToEs("sql_command/sql_info", Map("es.mapping.id" -> "md5id"))
案例代码
object Data2Es {
def main(args: Array[String]): Unit = {
Logger.getLogger("org").setLevel(Level.WARN)
if (args.length != 5) {
println(
""" |cn.missfresh.SparkStreaming2Es
|参数不合法,请传递正确的参数个数:
|brokers
|topic
|groupId
|seconds
|offtype
""".stripMargin)
sys.exit()
}
val Array(brokers, topic, groupId, seconds, offtype) = args
val conf = new SparkConf().setAppName("RTC_data2es_wangzh")
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set("spark.streaming.kafka.consumer.poll.ms", "60000")
conf.set("spark.streaming.kafka.maxRatePerPartition", "500")
//设置es的相关参数
val session: SparkSession = SparkSession.builder().config(conf)
.config("es.index.auto.create", "true")
.config("es.nodes", "ip1:9200,iP2:9200,ip3:9200")
.getOrCreate()
val sc = session.sparkContext
val ssc: StreamingContext = new StreamingContext(sc, Seconds(seconds.toLong))
//设置kafka的相关参数
val topics = Array(topic)
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> brokers,
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> groupId,
"auto.offset.reset" -> offtype, // latest
"enable.auto.commit" -> (false: java.lang.Boolean)
)
//创建Kafka数据流
val kafkaStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe(topics, kafkaParams)
)
")
kafkaStream.foreachRDD(rdd=>{
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
val fliterData = rdd.filter(_.value().split("\\{").length==2)
val rddSource= fliterData.map(x => {
val split = x.value().split("\\{")
//获得ip
val ip = IpUtil.getIp(x.value())
val jsonStr = "{" + split(1)
//解析json字符串,使用case class封装数据
val behavior= JsonParse.jsonParse(jsonStr, ip)
})
//数据写入es
import org.elasticsearch.spark._
rddSource.saveToEs("sql_command/sql_info", Map("es.mapping.id" -> "md5id"))
// 提交偏移量
kafkaStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
println("=============》 任务结束 ================》")
})
ssc.start()
ssc.awaitTermination()
}
}
/**
*创建对应字段的case class
*/
case class UserBehavior ( md5id :String,
application_id :String,
session_id :String,
user_ip_address:String,
logger_type :String,
logger_location:String,
command :String,
command_clean :String,
query_string :String,
current_time :String,
blg_user_name :String,
user_name :String,
ret :String,
mode_type :String,
processor_name :String,
last_command :String,
mryxblg_authorization_nabled :String,
mryxblg_command_monitoring:String)