1. Implementation logic of the query-engine stress-test demo
It's been a while since I last wrote a Spark project. I recently needed a stress-test tool for a query engine: replay the queries that land in Hive (HDFS) every day against the engine to measure its performance. Reading from HDFS with Spark and writing the results back to Hive fits this nicely, so here is a small demo.
(1) Implementation logic
- Spark reads the queries stored in HDFS for an arbitrary day (passed in as a parameter); the second column of the hive_test.engine_queryjson table holds the query (a sample line is sketched after this list).
- Queries are submitted to the engine at 2-second intervals.
- Results are polled every 2 seconds; a query that has not finished after 5 minutes is treated as timed out.
- The execution status of each query is written to the status result table hive_test.query_result_info.
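For reference, a line of the daily partition file might look like the following. This layout is a hypothetical illustration (the real columns depend on how engine_queryjson is produced), but it matches the code below, which splits on tabs and takes the second field as the query JSON:

q_0001	{"sql":"select count(*) from hive_test.some_table"}	20190106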
(2) Submitting the code to Spark
- Package the project into a jar (see the mvn command after the pom in section 2).
- Submit the jar with spark-submit; for the options, see the Spark docs: Submitting Applications.
The main function takes a single date argument, e.g. "20190106", identifying a daily partition of hive_test.engine_queryjson.
Note: in cluster mode, reading Hive requires passing hive-site.xml via --files; if the cluster is already configured, this can be omitted.
spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --files hdfs://xxxxx/hive-site.xml \
  --queue <your_queue> \
  --class com.learn.QuerySender \
  ./query_sender-1.0-SNAPSHOT-jar-with-dependencies.jar \
  20190106
(3) Writing results to a Hive table for analysis
After the run completes, the execution status of each query is in hive_test.query_result_info; query that table for whatever statistics or analysis you need, e.g. the breakdown below.
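For example, a minimal per-code breakdown for the day (code values as used by the sender in section 2: 0 = success, 1 = failure, 2 = timeout):

SELECT dt, code, COUNT(*) AS cnt
FROM hive_test.query_result_info
WHERE dt = '20190106'
GROUP BY dt, code;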
2. Main code walkthrough
(1) DDL for hive_test.query_result_info
CREATE EXTERNAL TABLE `hive_test.query_result_info`(
  `query` string COMMENT 'query',
  `code` int COMMENT 'result code',
  `info` string COMMENT 'query info')
COMMENT 'query result table'
PARTITIONED BY (
  `dt` string COMMENT 'dt')
(2) Main code snippets
package com.learn

import com.alibaba.fastjson.JSON
import com.learn.util.HttpUtil
import org.slf4j.LoggerFactory
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

/**
 * Query result.
 * @param query query json
 * @param code result code
 * @param info query info
 */
case class QueryResult (
  query: String, // query json
  code: Int,     // result code
  info: String   // query info
)

object QuerySender {
  val queryUrl = "xxx"
  val queryResultUrl = "XXX"
  val queryTimeout = 300000       // 5-minute timeout
  val queryPollingInterval = 2000 // poll for results every 2 seconds
  val queryInterval = 2000        // 2-second interval between submissions

  def main(args: Array[String]) {
    val logger = LoggerFactory.getLogger(QuerySender.getClass)
    val dt = args(0)
    println("dt=" + dt)
    //val hadoopUserName = args(1)
    //val hadoopUserPassWord = args(2)
    //println("hadoopUserName=" + hadoopUserName)
    //System.setProperty("HADOOP_USER_NAME", hadoopUserName)
    //System.setProperty("HADOOP_USER_PASSWORD", hadoopUserPassWord)
    val conf = new SparkConf()
    conf.setAppName("HdfsReader")
    //conf.setMaster("local") // for local testing

    // --- Read from HDFS. This demonstrates reading the raw HDFS files;
    // reading the Hive table directly works just as well (code not shown). ---
    val sc = SparkContext.getOrCreate(conf)
    val hdfsFile = sc.textFile(s"hdfs://xxxx/engine_queryjson/dt=$dt/000000_0")
    // keep the file's column order so that ._2 is the second column (the query)
    val hivedata = hdfsFile.map(_.split("\t")).map(e => (e(0), e(1), e(2)))
    println(hivedata.first()._2) // the second column is the query string

    // --- Submit the queries ---
    // collect() brings the rows to the driver: RDD.foreach runs on executors,
    // where appends would only mutate executor-local copies of the buffer.
    val queryResults = scala.collection.mutable.ArrayBuffer[QueryResult]()
    println("Submitting queries.....")
    hivedata.collect().foreach(v => {
      queryResults.append(submitQuery(v._2))
      Thread.sleep(queryInterval) // 2-second interval between submissions
    })

    // --- Write the query results to Hive ---
    val sqlContext = new HiveContext(sc)
    import sqlContext.implicits._
    queryResults.toDF("query", "code", "info").createOrReplaceTempView("queryResultTempTable")
    println("start insert overwrite table....")
    sqlContext.sql("set hive.exec.dynamic.partition=true")
    sqlContext.sql("set hive.exec.dynamic.partition.mode=nonstrict")
    sqlContext.sql("insert overwrite table hive_test.query_result_info partition(dt) " +
      s"select query, code, info, '$dt' " +
      "from queryResultTempTable")
    sc.stop()
  }

  /**
   * Submit a query and wait for its result.
   * Assumes the response carries a "meta" object with code, status and msg
   * fields describing the query state, e.g.
   * {"meta": {"code": 0, "status": "FINISHED", "msg": "..."}}
   * code: 2 = timeout, 1 = failure, 0 = success.
   * HttpUtil is an HTTP utility implemented in Java (this project mixes Java
   * and Scala; the utility is not shown here, see the pom). A sketch of what
   * it might look like follows the code.
   * Results are polled every 2 seconds, with a 5-minute timeout per query.
   * @param query the query
   * @return the query result
   */
  def submitQuery(query: String): QueryResult = {
    val startTime = System.currentTimeMillis()
    val responseDirect = HttpUtil.postJson(queryUrl, query) // submit the query; responseDirect is the response body
    println("query status: " + responseDirect.toString)
    // parse the status
    val jsonResponse = JSON.parseObject(responseDirect)
    val code = jsonResponse.getJSONObject("meta").getIntValue("code")
    val status = jsonResponse.getJSONObject("meta").getString("status")
    val msg = jsonResponse.getJSONObject("meta").getString("msg")
    if (code == 1 || code == 0) { // 2 = timeout, 1 = failure, 0 = success
      // the query already succeeded or failed
      return QueryResult(query, code, msg)
    } else {
      while (true) {
        if (System.currentTimeMillis() - startTime >= queryTimeout) {
          // 5-minute timeout
          return QueryResult(query, 2, "timeout")
        } else {
          val responseRetry = HttpUtil.postJson(queryResultUrl, query) // still running: poll for the result
          // parse the polled response, not the initial one
          val retryJson = JSON.parseObject(responseRetry)
          val code = retryJson.getJSONObject("meta").getIntValue("code")
          val status = retryJson.getJSONObject("meta").getString("status")
          val msg = retryJson.getJSONObject("meta").getString("msg")
          if (code == 1 || code == 0) {
            // the query succeeded or failed
            return QueryResult(query, code, msg)
          }
          Thread.sleep(queryPollingInterval) // poll every 2 seconds
        }
      }
      // unreachable, but keeps this branch well-typed
      QueryResult(query, 2, "timeout")
    }
  }
}
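The HttpUtil referenced above is a small Java HTTP helper from this project and is not reproduced here. As a rough stand-in, here is a minimal Scala sketch with the same assumed postJson(url, body) signature, built only on java.net from the standard library; the headers, encoding, and (absent) error handling are assumptions, not the project's actual implementation:

package com.learn.util

import java.io.{BufferedReader, InputStreamReader, OutputStreamWriter}
import java.net.{HttpURLConnection, URL}

object HttpUtil {
  /** POST a JSON body to url and return the response body as a string. */
  def postJson(url: String, body: String): String = {
    val conn = new URL(url).openConnection().asInstanceOf[HttpURLConnection]
    conn.setRequestMethod("POST")
    conn.setRequestProperty("Content-Type", "application/json")
    conn.setDoOutput(true)
    val writer = new OutputStreamWriter(conn.getOutputStream, "UTF-8")
    try writer.write(body) finally writer.close()
    val reader = new BufferedReader(new InputStreamReader(conn.getInputStream, "UTF-8"))
    try Iterator.continually(reader.readLine()).takeWhile(_ != null).mkString("\n")
    finally { reader.close(); conn.disconnect() }
  }
}

Any HTTP client would do; the only contract the sender relies on is that postJson returns the response body as a JSON string.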
(3) pom
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.learn</groupId>
  <artifactId>query_sender</artifactId>
  <version>1.0-SNAPSHOT</version>
  <packaging>jar</packaging>
  <name>query_sender</name>
  <url>http://maven.apache.org</url>
  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <spark.version>2.2.0</spark.version>
    <scala.version>2.11.6</scala.version>
    <fastjson.version>1.2.29</fastjson.version>
  </properties>
  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>${scala.version}</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <!-- all Spark artifacts must share the Scala binary version (2.11) -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-hive_2.11</artifactId>
      <version>${spark.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.11</artifactId>
      <version>${spark.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>com.alibaba</groupId>
      <artifactId>fastjson</artifactId>
      <version>${fastjson.version}</version>
    </dependency>
    <!-- scala-maven-plugin is a build plugin, not a dependency; it is configured below -->
  </dependencies>
  <build>
    <sourceDirectory>src/main/scala</sourceDirectory>
    <testSourceDirectory>src/test/scala</testSourceDirectory>
    <plugins>
      <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>3.1.3</version>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
              <goal>testCompile</goal>
            </goals>
            <configuration>
              <args>
                <!-- no extra compiler args added here -->
                <arg>-dependencyfile</arg>
                <arg>${project.build.directory}/.scala_dependencies</arg>
              </args>
            </configuration>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-assembly-plugin</artifactId>
        <version>2.4</version>
        <configuration>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
        </configuration>
        <executions>
          <execution>
            <id>assemble-all</id>
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-surefire-plugin</artifactId>
        <version>2.13</version>
        <configuration>
          <useFile>false</useFile>
          <disableXmlReport>true</disableXmlReport>
          <!-- If you have classpath issues like NoClassDefFoundError,... -->
          <!-- useManifestOnlyJar>false</useManifestOnlyJar -->
          <includes>
            <include>**/*Test.*</include>
            <include>**/*Suite.*</include>
          </includes>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>
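With the assembly plugin bound to the package phase as above, the fat jar referenced in the spark-submit command is produced by:

mvn clean package

The jar lands in target/query_sender-1.0-SNAPSHOT-jar-with-dependencies.jar.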
3. References
- Spark docs: Submitting Applications