Azkaban
1 Azkaban概述
1.1 Azkaban是什么?
Azkaban是工作流调度系统。调度的是shell脚本、java程序、mr程序、hive脚本
1.2 为什么需要工作流调度系统?
如果不用工作流调度系统的,①那就需要一个一个的去手动执行脚本;②并且可能多个任务之间存在不同的依赖关系(如果一个任务有很长的时间,那么它下面的任务就要等着它执行完毕,再去执行);③有些任务是每天都要执行。
1.3 常见的工作流调度系统?
(1)简单的任务调度:linux自带的crontab
crontab只能执行简单的脚本或命令。复杂的工作流任务不能,
(2)复杂的任务调度:
工作流调度框架 | 说明 |
---|---|
Ooize | 功能全、重量级、CDH平台,使用HUE可视化工作界面 |
Azkaban | 轻量级、使用yaml配置文件 |
Airflow | 使用python开发,需要写python脚本 |
DolphinSchedul | 国产、纳入apache项目、功能强大 |
2 Azkaban入门
2.1 Azkaban架构
2.2 Azkaban部署模式
(1)单机模式
azkaban的WebServer和ExecutorServer在一个节点的一个进程上。
(2)集群模式
- WebServer和ExecutorServer在不同的进程上
- 可以存在多个Executor Server,目的是负载均衡、容错
2.3 安装部署
①安装azkaban
步骤1:上传安装包、解压安装包到/opt/module/azkaban
[atguigu@hadoop102 software]$ mkdir /opt/module/azkaban
[atguigu@hadoop102 software]$ tar -zxvf azkaban-db-3.84.4.tar.gz -C /opt/module/azkaban/
[atguigu@hadoop102 software]$ tar -zxvf azkaban-exec-server-3.84.4.tar.gz -C /opt/module/azkaban/
[atguigu@hadoop102 software]$ tar -zxvf azkaban-web-server-3.84.4.tar.gz -C /opt/module/azkaban/
# 修改安装包名称
[atguigu@hadoop102 azkaban]$ mv azkaban-exec-server-3.84.4/ azkaban-exec
[atguigu@hadoop102 azkaban]$ mv azkaban-web-server-3.84.4/ azkaban-web
②配置MySQL
步骤2:启动MySQL
[atguigu@hadoop102 azkaban]$ mysql -uroot -p123456
步骤3:创建azkaban数据库
mysql> create database azkaban;
步骤4:创建azkaban用户(可以不做)
--设置密码有效长度4位及以上
mysql> set global validate_password_length=4;
--设置密码策略最低级别
mysql> set global validate_password_policy=0;
--创建azkaban用户
mysql> CREATE USER 'azkaban'@'%' IDENTIFIED BY '000000';
--赋予azkaban用户增删改查的权限
mysql> GRANT SELECT,INSERT,UPDATE,DELETE ON azkaban.* to 'azkaban'@'%' WITH GRANT OPTION;
步骤5:初始化azkaban表
--建表的语句都在azkaban-db中
source /opt/module/azkaban/azkaban-db-3.84.4/create-all-sql-3.84.4.sql
--退出MySQL
quit;
步骤6:更改MySQL包大小,防止azkaban连接MySQL阻塞
[atguigu@hadoop102 software]$ sudo vim /etc/my.cnf
在my.cnf的下面添加配置信息
[mysqld]
max_allowed_packet=1024M
步骤7:重启MySQL
sudo systemctl restart mysqld
③配置Executor Server
步骤1:修改配置文件:azkaban.properties
[atguigu@hadoop102 azkaban]$ vim /opt/module/azkaban/azkaban-exec/conf/azkaban.properties
#...时区
default.timezone.id=Asia/Shanghai
#...azkaban webServer的地址
azkaban.webserver.url=http://hadoop102:8081
# 增加azkaban exexutor的端口号
为什么呢?executor默认会随机生成一个端口,不方便管理,所以需要新增
executor.port=12321
#...MySQL的配置信息、Azkaban仅支持MySQL关系型数据库
database.type=mysql
mysql.port=3306
mysql.host=hadoop102
mysql.database=azkaban
mysql.user=azkaban
mysql.password=000000
mysql.numconnections=100
步骤2:同步azkaban-exec到所有节点
生产环境下,并不用都配置executor server,配置三个即可~
[atguigu@hadoop102 azkaban]$ xsync /opt/module/azkaban/azkaban-exec
步骤3:启动executor server,必须到/opt/module/azkaban/azkaban-exec路径下
为什么必须要到/opt/module/azkaban/azkaban-exec路径下呢?
因为会用到一些当前目录的文件,配置中有许多的参数是相对路径。
[atguigu@hadoop102 azkaban-exec]$ bin/start-exec.sh
[atguigu@hadoop103 azkaban-exec]$ bin/start-exec.sh
[atguigu@hadoop104 azkaban-exec]$ bin/start-exec.sh
步骤4:启动成功的标志:
注意了:有上面的两个进程,并不代表一定启动成功了。
①/opt/module/azkaban/azkaban-exec路径下出现配置的端口号文件。
②MySQL的azkaban库中的executors表中出现配置的executor
下面这张图是配置了三个executor server。
其中active刚开始是0,表示未激活
active是1,表示已经激活
步骤4:激活executor
[atguigu@hadoop102 azkaban-exec]$ curl -G "hadoop102:12321/executor?action=activate" && echo
[atguigu@hadoop103 azkaban-exec]$ curl -G "hadoop103:12321/executor?action=activate" && echo
[atguigu@hadoop104 azkaban-exec]$ curl -G "hadoop104:12321/executor?action=activate" && echo
激活成功都会出现
{
"status":"success"}
④配置Web Server
步骤1:配置文件azkaban.properties
[atguigu@hadoop102 azkaban]$ vim /opt/module/azkaban/azkaban-web/conf/azkaban.properties
修改属性
# azkaban的时区
default.timezone.id=Asia/Shanghai
# MySQL配置
database.type=mysql
mysql.port=3306
mysql.host=hadoop102
mysql.database=azkaban
mysql.user=azkaban
mysql.password=000000
mysql.numconnections=100
# azkaban的executor选择器 过滤器
azkaban.executorselector.filters=StaticRemainingFlowSize,CpuStatus
azkaban.executorselector.comparator.NumberOfAssignedFlowComparator=1
azkaban.executorselector.comparator.Memory=1
azkaban.executorselector.comparator.LastDispatched=1
azkaban.executorselector.comparator.CpuUsage=1
- StaticRemainingFlowSize:正在排队的任务数
- CpuStatus:CPU的占用情况
- MinimumFreeMemory:内存占用情况。测试环境,必须将MinimumFreeMemory删除掉,否则它会认为集群资源不够,不执行。
步骤2:修改azkaban-users.xml
- user:用户,用来登录web页面的用户名和密码和所属的角色(所有的权限)
- role:角色,是权限的集合
- promission:权限
需要创建web用户,用来登录azkaban-web页面管理
<azkaban-users>
<user groups="azkaban" password="azkaban" roles="admin" username="azkaban"/>
<user password="metrics" roles="metrics" username="metrics"/>
<user password="atguigu" roles="admin" username="atguigu"/>
<role name="admin" permissions="ADMIN"/>
<role name="metrics" permissions="METRICS"/>
</azkaban-users>
步骤3:启动web server,需要到/opt/module/azkaban/azkaban-web路径下
[atguigu@hadoop102 azkaban-web]$ bin/start-web.sh
步骤4:访问http://hadoop102:8081,使用上面创建的用户登录
2.4 Work Flow案例实操
①Hello World案例
步骤1:在windows环境新建azkaban.project文件,文件内容
azkaban-flow-version: 2.0
1.0:表示azkaban-flow-version使用的是k-v格式解析work flow文件
2.0:注明使用yaml配置文件格式
这里并不是azkaban框架的版本
步骤2:新建basic.flow文件,文件内容\
nodes:一个节点就是一个工作单元。
name:表示每个工作任务单元名称
type:表示类型:command表示命令,如果要执行多个命令,可以将多个命令写到脚本中
config:表示job配置:command后面是具体的命令
nodes:
- name: jobA
type: command
config:
command: echo "Hello World"
步骤3:将azkaban.project和basic.flow文件压缩到一个zip文件,文件必须是英文名。
first.zip
步骤4:在webServer新建项目
步骤5:给项目名称命名和添加项目描述
步骤6:上传first.zip文件
步骤7:选择上传的文件
步骤8:执行任务流
步骤9:在日志中查看运行结果
②作业依赖案例(dependsOn)
需求:JobA和JobB执行完了,才能执行JobC
步骤1:basic.flow文件
其中dependsOn:表示依赖的工作单元。后面跟的是数组形式 - JobA - JobB
nodes:
- name: jobC
type: command
# jobC 依赖 JobA和JobB
dependsOn:
- jobA
- jobB
config:
command: echo "I’m JobC"
- name: jobA
type: command
config:
command: echo "I’m JobA"
- name: jobB
type: command
config:
command: echo "I’m JobB"
步骤2:将修改后的basic.flow和azkaban.project压缩成second.zip文件
步骤3:重复HelloWorld后续步骤
③自动失败重试案例(retries、retry.backoff)
需求:如果执行任务失败,需要重试3次,重试的时间间隔10000ms
步骤1:basic.flow文件
其中retries:表示失败重试的次数
retry.backoff:表示失败重试的时间间隔,单位是毫秒
nodes:
- name: JobA
type: command
config:
command: sh /not_exists.sh
retries: 3
retry.backoff: 10000
也可是产看上图的Log,可以看到是失败了4次
步骤2:也可以在Flow全局配置中添加任务失败重试配置,此时重试配置会应用到所有的job
config:
retries: 3
retry.backoff: 10000
nodes:
- name: JobA
type: command
config:
command: sh /not_exists.sh
④手动失败重试案例
需求:JobA=》JobB(依赖于A)=》JobC=》JobD=》JobE=》JobF。生产环境,任何Job都有可能挂掉,可以根据需求执行想要执行的Job。
步骤1:编译配置流basic.flow
nodes:
- name: JobA
type: command
config:
command: echo "This is JobA."
- name: JobB
type: command
dependsOn:
- JobA
config:
command: echo "This is JobB."
- name: JobC
type: command
dependsOn:
- JobB
config:
command: echo "This is JobC."
- name: JobD
type: command
dependsOn:
- JobC
config:
command: echo "This is JobD."
- name: JobE
type: command
dependsOn:
- JobD
config:
command: echo "This is JobE."
- name: JobF
type: command
dependsOn:
- JobE
config:
command: echo "This is JobF."
步骤2:方式①失败
Enable和Disable下面都分别有如下参数:
Parents:该作业的上一个任务
Ancestors:该作业前的所有任务
Children:该作业后的一个任务
Descendents:该作业后的所有任务
Enable All:所有的任务
3 Azkaban进阶
3.1 JavaProcess作业类型案例
JavaProcess类型可以运行一个自定义主类方法,type类型为javaprocess,可用的配置为:
Xms:最小堆
Xmx:最大堆
classpath:类路径
java.class:要运行的Java对象,其中必须包含Main方法
main.args:main方法的参数
案例:
1)新建一个azkaban的maven工程
2)创建包名:com.atguigu
3)创建AzTest类
package com.atguigu;
public class AzTest {
public static void main(String[] args) {
System.out.println("This is for testing!");
}
}
4)打包成jar包azkaban-1.0-SNAPSHOT.jar
5)新建testJava.flow,内容如下
nodes:
\- name: test_java
type: javaprocess
config:
Xms: 96M
Xmx: 200M
java.class: com.atguigu.AzTest
6)将Jar包、flow文件和project文件打包成javatest.zip
7)创建项目=》上传javatest.zip =》执行作业=》观察结果
3.2 条件工作流案例
根据条件指定是否运行某些作业。
默认情况是:A和B都成功,才会运行C
但是也存在一些情况:A或B成功,就会运行C
①运行时参数案例(condition)
(1)父Job将参数写入JOB_OUTPUT_PROP_FILE环境变量所指向的文件
(2)子Job使用 **${jobName:param}**来获取父Job输出的参数并定义执行条件
(3)支持的条件运算符:
== 等于 != 不等于 > 大于 >= 大于等于 < 小于 <= 小于等于
&& 与 || 或 ! 非
需求:
JobA执行一个shell脚本。
JobB执行一个shell脚本,但JobB不需要每天都执行,而只需要每个周一执行。
步骤1:新建JobA.sh
%w 一星期中的第几日(0-6),0 代表周日
#!/bin/bash
echo "do JobA"
wk=`date +%w`
echo "{\"wk\":$wk}" > $JOB_OUTPUT_PROP_FILE
步骤2:新建JobB.sh
#!/bin/bash
echo "do JobB"
步骤3:新建condition.flow
JobB依赖于JobA,获取JobA的wk参数,如果等于1那么就执行JobB
nodes:
- name: JobA
type: command
config:
command: sh JobA.sh
- name: JobB
type: command
dependsOn:
- JobA
config:
command: sh JobB.sh
condition: ${
JobA:wk} == 1
步骤4:将JobA.sh 、JobB.sh、condition.flow和azkaban.project打包成condition.zip
步骤5:创建condition项目->上传condition.zip文件->执行作业->观察结果
步骤6:根据设定的条件,判断是否执行,JobB会根据当日日期决定是否执行。
②预定义宏
常用的操作,预置了几个常用的判断条件,根据所有父Job的完成情况进行判断。
(1)all_success: 表示父Job全部成功才执行(默认)
(2)all_done:表示父Job全部完成才执行
(3)all_failed:表示父Job全部失败才执行
(4)one_success:表示父Job至少一个成功才执行
(5)one_failed:表示父Job至少一个失败才执行
需求:
JobA执行一个shell脚本
JobB执行一个shell脚本
JobC执行一个shell脚本,要求JobA、JobB中有一个成功即可执行
步骤1:新建JobA.sh
#!/bin/bash
echo "do JobA"
步骤2:新建JobC.sh
#!/bin/bash
echo "do JobC"
步骤3:新建marco.flow
nodes:
- name: JobA
type: command
config:
command: sh JobA.sh
- name: JobB
type: command
config:
command: sh JobB.sh
- name: JobC
type: command
dependsOn:
- JobA
- JobB
config:
command: sh JobC.sh
condition: one_success
步骤4:JobA.sh、JobB.sh、macro.flow、azkaban.project文件打包成macro.zip
没有JobB.sh
步骤5:创建marco项目-> 上传marco.zip文件-> 执行作业 -> 观察结果
3.3 定时执行案例
需求:JobA每间隔1分钟执行一次
步骤1:执行工作流时,选择左下角
步骤2:观察结果
步骤3:删除定时任务
3.4 邮箱报警案例
Azkaban默认支持通过邮箱对失败的任务进行报警。
①注册邮箱
步骤1:申请一个126邮箱
步骤2:点击邮箱设置:
步骤3:开启SMTP服务
步骤4:记住授权码
②配置邮箱报警
步骤1:在azkaban的web server节点hadoop102上,编辑/opt/module/azkaban/azkaban-web/conf/azkaban.properties,修改配置文件:
mail.sender:表示发送信息的邮箱
mail.host:表示邮箱的stmp服务
mail.user:表示邮箱的用户
mail.password:注意不是邮箱的密码,是邮箱的授权码
#这里设置邮件发送服务器,需要 申请邮箱,切开通stmp服务,以下只是例子
[email protected]
mail.host=smtp.126.com
[email protected]
mail.password=用邮箱的授权码
步骤2:保存并重启web-server
[atguigu@hadoop102 azkaban-web]$ bin/shutdown-web.sh
[atguigu@hadoop102 azkaban-web]$ bin/start-web.sh
步骤3:编辑basic.flow
nodes:
- name: jobA
type: command
config:
command: echo "This is an email test."
步骤4:将azkaban.project和basic.flow压缩成email.zip
步骤5:创建工程=》上传文件=》执行作业=》查看结果
当要发送给多个邮箱的话,使用逗号或空格分隔开。
步骤6:观察到邮箱收到的消息
3.5 电话报警案例
azkaban默认仅支持的是邮箱报警,有时候任务执行失败后邮箱接收不及时,因此需要其他的报警方式,比如电话报警。可以使用第三方的告警平台进行集成,比如睿象云。
①第三方告警的原理
②第三方告警平台集成
步骤1:进入睿象云官网注册账号并登录。
官网地址:https://www.aiops.com/
步骤2:使用email邮箱集成
使用CA集成平台
步骤3:获取邮箱地址,后面需要将报警信息发送到该邮箱
步骤4:配置分派策略
步骤5:配置通知策略
③测试
执行上一个邮件通知的案例,将通知的对象改为刚刚集成第三方平台获取的邮箱
接下来会收到淡化报警。电话通知的内容就是azkaban的报警信息。
3.6 Azkaban多Executor模式注意实想
Azkaban集群模式的多Executor模式是指,在集群中部署多个Executor。在这种模式下Azkaban Web Server会根据策略,选择一个Executor去执行任务。
这样就会存在一个问题:执行的脚本如果不Executor上怎么办?那么就会执行失败。
①方案1:指定特定的Executor去执行
步骤1:在MySQL的azkaban数据库的executors表中,查询hadoop102的Executor的id
注意这个id是自增主键
步骤2:在执行工作流程时加上useExecutor属性
②分发脚本到集群
在Executor所在所有节点部署任务所需脚本和应用
4 YAML语法
4.1 语法特点
- 大小写敏感
- 缩进表示层级关系
- 不能使用tab键,只能使用空格
- 空格数目不重要,只要相同层级左对齐即可
4.2 数据结构
- 对象
- 数组
- 纯量
https://juejin.cn/post/6844903743557746702#heading-0
5 问题汇总
5.1 内存不足
一、原因:azkaban默认情况下在开始运行job时会检测系统的内存,其最低要求的内存是3G,若系统内存不足3G,便会出现运行的job一直卡在那不动。
二、解决办法:
(1)增加系统内存
(2)关闭检测内存的选项。具体办法为,在azkaban/azkaban-exec-server/plugins/jobtypes/目录下的commonprivate.properties的文件中添加一下内容:
memCheck.enabled=false
添加后重启服务即可。