主要是集群部署安装。
Downloads (azkaban.github.io)
启动mysql
mysql -uroot -proot
创建azkaban数据库
create database azkaban;
创建azkaban用户并赋予权限(可以不设置账号,继续使用root账号)
-- 显示相关变量 SHOW VARIABLES like 'validate_password%'; -- 设置密码有效长度1位及以上 set global validate_password.length=1; -- 设置密码策略最低级别 set global validate_password_policy=0; -- 创建Azkaban用户,任何主机都可以访问Azkaban,密码是azkaban CREATE USER 'azkaban'@'%' IDENTIFIED BY 'azkaban'; -- 赋予Azkaban用户增删改查权限 GRANT SELECT,INSERT,UPDATE,DELETE ON azkaban.* to 'azkaban'@'%' WITH GRANT OPTION;
创建azkaban的表
source /opt/software/azkaban/azkaban-db-3.84.4/create-all-sql-3.84.4.sql;
更改mysql包大小,防止azkaban连接mysql阻塞
sudo vim /etc/my.cnf # 在[mysqld]下面加一行max_allowed_packet=1024M [mysqld] max_allowed_packet=1024M
重启mysql
sudo systemctl restart mysqld
Azkaban Executor Server处理工作流和作业的实际执行。
编辑azkaban.properties
vim /opt/software/azkaban/azkaban-exec-server-3.84.4/conf/azkaban.properties
修改如下属性:
# 修改如下内容 default.timezone.id=Asia/Shanghai azkaban.webserver.url=http://node001:8081 executor.port=12321 database.type=mysql mysql.port=3306 mysql.host=node001 mysql.database=azkaban mysql.user=azkaban mysql.password=azkaban mysql.numconnections=100 # 添加如下内容 executor.metric.reports=true executor.metric.milisecinterval.default=60000
进入.../azkaban-exec-server-3.84.4/lib
更新 mysql-connector-java-8.0.29.jar
包,使其与mysql版本匹配
同步azkaban-exec-server-3.84.4
到所有节点
在各节点的**.../azkaban-exec-server-3.84.4/ **目录分别执行
bin/start-exec.sh
如果出现executor.port文件,说明启动成功。
在各个节点激活executor
curl -G "node001:$(<./executor.port)/executor?action=activate" && echo curl -G "node002:$(<./executor.port)/executor?action=activate" && echo curl -G "node003:$(<./executor.port)/executor?action=activate" && echo
如果节点出现提示:{"status":"success"},表示激活成功。
Azkaban Web Server处理项目管理,身份验证,计划和执行触发。
编辑azkaban.properties
修改如下: default.timezone.id=Asia/Shanghai database.type=mysql mysql.port=3306 mysql.host=node001 mysql.database=azkaban mysql.user=root mysql.password=root mysql.numconnections=100 azkaban.executorselector.filters=StaticRemainingFlowSize,CpuStatus
说明:
StaticRemainingFlowSize:正在排队的任务数
CpuStatus:CPU占用情况
MinimumFreeMemory:内存占用情况。测试环境,必须将MinimumFreeMemory删除掉,否则它会认为集群资源不够,不执行
修改azkaban-users.xml
文件,添加新用户
<azkaban-users> <user groups="azkaban" password="azkaban" roles="admin" username="azkaban"/> <user password="metrics" roles="metrics" username="metrics"/> <user password="azkaban" roles="metrics,admin" username="nuochengze"/> <role name="admin" permissions="ADMIN"/> <role name="metrics" permissions="METRICS"/> </azkaban-users>
进入.../azkaban-web-server-3.84.4/lib
更新 mysql-connector-java-8.0.29.jar
包,使其与mysql版本匹配
进入...//azkaban-web-server-3.84.4/
目录,启动web Server
./bin/start-web.sh
访问http://node001:8081/
,并用配置的账号登录
新建azkaban.project文件,编辑内容如下:
azkaban-flow-version: 2.0
该文件的表明,采用新的Flow-API方式解析flow文件
新建hello_world.flow文件,编辑内容如下:
nodes: - name: jobA # job的名称 type: command # job的类型,command表示要执行作业的方式为命令 config: # job的配置信息 command: echo "hello world"
将azkaban.project、hello_world.flow文件压缩到一个zip文件
说明:文件名称必须是英文
在WebServer新建一个项目:http://node001:8081/index
Execute Flow
Job List
中查看运行结果需求:JobA和JobB执行完了,才能执行JobC
步骤:
创建basic.flow
文件,添加如下内容
nodes: - name: JobC type: command dependsOn: - JobA - JobB config: command: echo "I'm JobC" - name: JobA type: command config: command: echo "I'm JobA" - name: JobB type: command config: command: echo "I'm JobB"
dependsOn后面表示当前Job依赖的其他Job
创建azkaban.project
文件,编辑内容如下:
azkaban-flow-version: 2.0
将basic.flow
和azkaban.project
文件压缩成dependson_example.zip
文件
在WebServer新建一个项目:http://node001:8081/index
Execute Flow
Job List
中查看运行结果需求:如果执行任务失败,需要重试3次,重试的时间间隔10000ms
修改basic.flow文件:
nodes: - name: JobA type: command config: command: sh /not_exists.sh retries: 3 retry.backoff: 10000
参数说明:
执行的次数= 一次失败+三次重试
全局配置的方式:
# 在Flow全局配置中添加任务失败配置,此时重试配置会应用到所有Job config: retires: 3 retry.backoff: 10000 nodes: - name: JobA type: command config: command: sh /not_exists.sh
Enable 和 Disable 下面都分别有如下参数:
Parents:该作业的上一个任务
Ancestors:该作业前的所有任务
Children:该作业后的一个任务
Descendents:该作业后的所有任务
Enable All:所有的任务
JavaProcess 类型可以运行一个自定义主类方法,
type 类型为 javaprocess,
可用的配置为:
步骤:
新建一个azkaban的maven工程
创建包名:com.nuochengze
创建AzTest类:
package com.nuochengze public class AzTest { public static void main(String[] args){ System.out.println("This is a test."); } }
打包成jar包
修改basic.flow
文件
nodes: - name: test_java type: javaprocess config: Xms: 96M Xmx: 200M java.class: com.nuochengze.AzTest
将jar包和basic.flow
文件及azkaban.project
文件打包成zip包
条件工作流功能允许用户自定义执行条件来决定是否运行某些Job。条件可以由当前Job 的父 Job 输出的运行时参数构成,也可以使用预定义宏。在这些条件下,用户可以在确定 Job 执行逻辑时获得更大的灵活性,例如,只要父 Job 之一成功,就可以运行当前 Job。
运行时参数案例:
基本原理
父 Job 将参数写入 JOB_OUTPUT_PROP_FILE 环境变量所指向的文件
子 Job 使用 ${jobName:param}来获取父 Job 输出的参数并定义执行条件
支持的条件运算符
案例:
需求:JobA执行一个shell脚本,JobB执行一个shell脚本,但JobB不需要每天都执行,而只需要每个周一执行
步骤:
新建jobA.sh
#! /bin/bash echo "do JobA" wk=`date +%w` echo "{\"wk\":$wk}" > $JOB_OUTPUT_PROP_FILE
新建JobB.sh
#!/bin/bash echo "do JobB"
新建basic.flow
nodes: - name: JobA type: command config: command: sh JobA.sh - name: JobB type: command dependsOn: - JobA config: command: sh JobB.sh condition: ${JobA:wk} == 1
按照设定条件,JobB会根据当日日期决定是否执行
Azkaban 中预置了几个特殊的判断条件,称为预定义宏。 预定义宏会根据所有父 Job 的完成情况进行判断,再决定是否执行。
可用的预定义宏如 下:
案例:
需求
JobA 执行一个 shell 脚本 ,JobB 执行一个 shell 脚本,JobC 执行一个 shell 脚本,要求 JobA、JobB 中有一个成功即可执行
步骤
新建JobA.sh
#!/bin/bash echo "do JobA"
新建JobC.sh
#!/bin/bash echo "do JobC
新建basic.flow
nodes: - name: JobA type: command config: command: sh JobA.sh - name: JobB type: command config: command: sh JobB.sh - name: JobC type: command depondsOn: - JobA - JobB config: command: sh JobC.sh condition: one_success
注意:没有JobB.sh
注册邮箱并开启smtp,获取第三方客户端授权码
Azkaban 默认支持通过邮件对失败的任务进行报警,配置方法如下:
在 azkaban-web 节 点 node001上,编辑.../azkaban-web-server-3.84.4/conf/azkaban.properties
,并修改如下内容:
#这里设置邮件发送服务器 mail.sender=xxxx@126.com mail.host=smtp.126.com mail.user=xxxx@126.com mail.password=用邮箱的授权码
保存并重启web-server
页面配置
Azkaban 多 Executor 模式是指,在集群中多个节点部署 Executor。在这种模式下, Azkaban web Server 会根据策略,选取其中一个 Executor 去执行任务。
为确保所选的 Executor 能够准确的执行任务,我们须在以下两种方案任选其一,推荐使 用方案二。
方案一:指定特定的 Executor(hadoop102)去执行任务。
在 MySQL 中 azkaban 数据库 executors 表中,查询 node001上的 Executor 的 id。
在执行工作流程时加入 useExecutor 属性,如下