How to Submit Spark Jobs Programmatically
Introduction
I have recently been getting into distributed computing and ran into a problem: a Spark job is normally packaged and then handed to the spark-submit tool, which is inconvenient. Can Spark job submission instead be turned into something we can call directly from code?
Launcher
Spark Launcher is a library that can replace the spark-submit tool for submitting jobs. It relies on Java's Process, i.e. a shell executor, to run the submit command; because the Process exposes the command's output, we can read it and find out how the task ended.
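Before looking at the launcher class, here is a minimal sketch of the mechanism it wraps: spawning spark-submit as a child process and reading its output. This is not the SparkLauncher implementation itself, and the master, class name and jar path are placeholders:

import java.io.BufferedReader;
import java.io.InputStreamReader;

public class RawSubmitSketch {
    public static void main(String[] args) throws Exception {
        // Spawn spark-submit as a child process, which is what SparkLauncher does for us.
        Process process = new ProcessBuilder(
                "spark-submit",
                "--master", "yarn-client",
                "--class", "com.example.YourMainClass",   // placeholder main class
                "/path/to/your-app.jar")                  // placeholder application jar
                .redirectErrorStream(true)                // merge stderr into stdout
                .start();
        // Read the submit/driver output line by line.
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(process.getInputStream()))) {
            String line;
            while ((line = reader.readLine()) != null) {
                System.out.println(line);
            }
        }
        // The exit code tells us whether the submission finished successfully (0) or failed.
        int exitCode = process.waitFor();
        System.out.println("spark-submit exited with " + exitCode);
    }
}

With that mechanism in mind, here is the launcher class itself: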
import java.io.IOException;

import org.apache.spark.launcher.SparkLauncher;

import lombok.Data;
import lombok.extern.slf4j.Slf4j;

/**
 * @author linxu
 * @date 2019/7/31
 * @tips: well done forever.
 * A launcher used to submit Spark tasks.
 * Kerberos principal verification could be added here, so there is no need to worry about ticket timeouts.
 */
@Data
@Slf4j
public class SmartLauncher {
    /**
     * Default master is yarn-client.
     */
    private String master = "yarn-client";
    /**
     * Default app name is the current time.
     */
    // DateUtil is a project-local helper that returns the current time as a String.
    private String appName = DateUtil.getCurrentTime();

    private SparkLauncher sparkLauncher = new SparkLauncher();

    public SmartLauncher(String mainClass, String appResource, String[] sparkArgs, String master, String name) {
        if (mainClass == null || appResource == null) {
            throw new NullPointerException("MainClass or appResource path should not be null!");
        }
        // set the defaults first, then override them with the caller's values if present
        sparkLauncher.setMaster(this.master).setAppName(appName);
        if (master != null) {
            sparkLauncher.setMaster(master);
        }
        if (name != null) {
            sparkLauncher.setAppName(name);
            appName = name;
        }
        sparkLauncher.setMainClass(mainClass).setAppResource(appResource);
        if (sparkArgs != null) {
            sparkLauncher.addAppArgs(sparkArgs);
        }
        log.info("CONSTRUCT A LAUNCHER FINISHED!");
    }

    /**
     * Launch the task in an embedded child process and wait for it to finish.
     */
    public int launch() throws IOException, InterruptedException {
        Process process = sparkLauncher.launch();
        long startTime = System.currentTimeMillis();
        // launch a thread to collect the error output of the child process
        // TODO use a thread pool instead of a raw Thread
        new Thread(new ISRRunnable(process.getErrorStream())).start();
        // waitFor() blocks until the job ends and may throw InterruptedException
        int exitCode = process.waitFor();
        log.info("SPARK TASK ---{}--- FINISHED ,COST {} MS WITH CODE {}.", this.appName, System.currentTimeMillis() - startTime, exitCode);
        return exitCode;
    }
}
- The task is submitted by building a SparkLauncher object
- The Process class lets us read the output of the shell command produced by the submission
Here is ISRRunnable, the implementation of the stream-reading thread:
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;

public class ISRRunnable implements Runnable {

    private final BufferedReader reader;

    private ISRRunnable(BufferedReader reader) {
        this.reader = reader;
    }

    public ISRRunnable(InputStream inputStream) {
        this(new BufferedReader(new InputStreamReader(inputStream)));
    }

    @Override
    public void run() {
        try {
            String line = reader.readLine();
            while (line != null) {
                // do whatever you need with the output here, e.g. show it in a UI, print it or persist it
                System.out.println(line);
                line = reader.readLine();
            }
        } catch (IOException e) {
            System.out.println("There is an exception when reading the log: " + e);
        } finally {
            try {
                reader.close();
            } catch (IOException ignored) {
                // nothing useful to do if closing fails
            }
        }
    }
}
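The TODO in SmartLauncher suggests replacing the raw Thread with a thread pool. It is also safer to drain the child's standard output as well as its error stream, otherwise the child process can block once a pipe buffer fills up. A minimal sketch of a revised launch() inside SmartLauncher, assuming a shared pool (the pool type and the LOG_POOL field are my own choice, not part of the original code):

// Additional imports needed: java.util.concurrent.ExecutorService, java.util.concurrent.Executors
private static final ExecutorService LOG_POOL = Executors.newCachedThreadPool();

public int launch() throws IOException, InterruptedException {
    Process process = sparkLauncher.launch();
    long startTime = System.currentTimeMillis();
    // Drain both streams so the child process never blocks on a full pipe buffer.
    LOG_POOL.submit(new ISRRunnable(process.getErrorStream()));
    LOG_POOL.submit(new ISRRunnable(process.getInputStream()));
    int exitCode = process.waitFor();
    log.info("SPARK TASK ---{}--- FINISHED ,COST {} MS WITH CODE {}.", this.appName,
            System.currentTimeMillis() - startTime, exitCode);
    return exitCode;
}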
Integrating with Spring Boot
Above we built the core piece, SmartLauncher, which is responsible for submitting the task and reporting its result. Next, let's integrate it into an application, using Spring Boot as the example framework; you can of course integrate it with any other framework, this is just one example.
- First, create a model that receives the request data
import lombok.Data;

/**
 * @author linxu
 * @date 2019/7/31
 * @tips: well done forever.
 * Spark task model.
 */
@Data
public class SparkTask {
    private String appName;
    private String master;
    private String appPath;
    private String mainClass;
    private String[] args;
}
- Create the controller
import java.io.IOException;

import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author linxu
 * @date 2019/8/1
 * @tips: well done forever.
 */
@RestController
@RequestMapping("/ha")
public class RequestController {

    @PostMapping("/commitTask")
    public String commitSparkTask(@RequestBody SparkTask task) {
        SmartLauncher smartLauncher = new SmartLauncher(task.getMainClass(), task.getAppPath(), task.getArgs(), task.getMaster(), task.getAppName());
        int exitCode;
        try {
            // launch() blocks until the Spark job finishes and returns its exit code
            exitCode = smartLauncher.launch();
        } catch (IOException e) {
            return "FAILED WITH IO EX.";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "FAILED WITH INTERRUPTED EX.";
        }
        return "COMMIT OK. WITH CODE:" + exitCode;
    }
}
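Note that launch() calls process.waitFor(), so this endpoint keeps the HTTP request open until the Spark job finishes. If you prefer the request to return immediately, the submission can be made asynchronous. A minimal sketch of such an endpoint, assuming the default CompletableFuture executor is acceptable (the endpoint name and the simple error handling are my own additions, not part of the original controller):

// Additional import needed: java.util.concurrent.CompletableFuture
@PostMapping("/commitTaskAsync")
public String commitSparkTaskAsync(@RequestBody SparkTask task) {
    SmartLauncher smartLauncher = new SmartLauncher(task.getMainClass(), task.getAppPath(),
            task.getArgs(), task.getMaster(), task.getAppName());
    CompletableFuture
            .supplyAsync(() -> {
                try {
                    return smartLauncher.launch();
                } catch (IOException | InterruptedException e) {
                    // a real implementation should log the failure properly
                    return -1;
                }
            })
            .thenAccept(code -> System.out.println("Spark task finished with code " + code));
    return "SUBMITTED, CHECK LOG FOR RESULT.";
}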
Now we can submit a task; here I simply use Postman:
- Example JSON payload for a task
{
    "appName": "online_task_01",
    "master": "<submission type>",      // maps to SparkLauncher.setMaster()
    "appPath": "/yourpath/XX.jar",      // path to the Spark job jar; an HDFS path works as well
    "mainClass": "com.xx.TableInputData",
    // driver/executor settings such as memory allocation or instance counts could be added here
    "args": [
    ]
}
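The comment in the JSON above mentions driver and executor settings. SparkLauncher exposes these through setConf with standard Spark property keys; a minimal sketch of how SmartLauncher could apply them (the confs map and the values shown are assumptions, not part of the original code):

// Hypothetical snippet inside SmartLauncher's constructor; java.util.Map and java.util.HashMap imports needed.
Map<String, String> confs = new HashMap<>();
confs.put("spark.driver.memory", "2g");
confs.put("spark.executor.memory", "4g");
confs.put("spark.executor.instances", "2");
for (Map.Entry<String, String> entry : confs.entrySet()) {
    // setConf(key, value) sets an arbitrary Spark configuration property for the launched job
    sparkLauncher.setConf(entry.getKey(), entry.getValue());
}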
The result of the run can be checked in the log file:
2019-08-01 17:09:01.925 INFO 373 --- [io-12345-exec-5] c.qgailab.linxu.component.SmartLauncher : CONSTRUCT A LAUNCHER FINISHED!
Warning: Ignoring non-spark config property: hadoop_server_path=/opt/huawei/Bigdata/hadoop/
19/08/01 17:09:03 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
19/08/01 17:09:04 WARN MetricsSystem: Using default name DAGScheduler for source because spark.app.id is not set.
[Stage 0:> (0 + 2) / 2]
[Stage 0:=============================> (1 + 1) / 2]
2019-08-01 17:09:37.826 INFO 373 --- [io-12345-exec-5] c.qgailab.linxu.component.SmartLauncher : SPARK TASK ---online_task_02--- FINISHED ,COST 35900 MS WITH CODE 0.
Appendix
Dependencies
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.6.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.qgailab</groupId>
    <artifactId>linxu</artifactId>
    <version>0.0.1</version>
    <name>linxu</name>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-launcher_2.10</artifactId>
            <version>1.5.1</version>
            <exclusions>
                <exclusion>
                    <groupId>org.tachyonproject</groupId>
                    <artifactId>tachyon-client</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.curator</groupId>
                    <artifactId>curator-recipes</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>commons-codec</groupId>
                    <artifactId>commons-codec</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web-services</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>