Java教程

一、Azkaban简明笔记

本文主要是介绍一、Azkaban简明笔记,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

1、azkaban部署

主要是集群部署安装。

1.1 准备安装包

Downloads (azkaban.github.io)

1.2 配置MySQL

  1. 启动mysql

    mysql -uroot -proot

  2. 创建azkaban数据库

    create database azkaban;

  3. 创建azkaban用户并赋予权限(可以不设置账号,继续使用root账号)

    -- 显示相关变量
    SHOW VARIABLES like 'validate_password%';
    
    -- 设置密码有效长度1位及以上
    set global validate_password.length=1;
    -- 设置密码策略最低级别
    set global validate_password_policy=0;
    -- 创建Azkaban用户,任何主机都可以访问Azkaban,密码是azkaban
    CREATE USER 'azkaban'@'%' IDENTIFIED BY 'azkaban'; 
    
    -- 赋予Azkaban用户增删改查权限
    GRANT SELECT,INSERT,UPDATE,DELETE ON azkaban.* to 'azkaban'@'%' WITH GRANT OPTION;
    
  4. 创建azkaban的表

    source /opt/software/azkaban/azkaban-db-3.84.4/create-all-sql-3.84.4.sql;
    
  5. 更改mysql包大小,防止azkaban连接mysql阻塞

    sudo vim /etc/my.cnf
    
    # 在[mysqld]下面加一行max_allowed_packet=1024M
    
    [mysqld]
    max_allowed_packet=1024M
    
  6. 重启mysql

    sudo systemctl restart mysqld
    

1.3 配置Executor Server

Azkaban Executor Server处理工作流和作业的实际执行。

  1. 编辑azkaban.properties

    vim /opt/software/azkaban/azkaban-exec-server-3.84.4/conf/azkaban.properties
    

    修改如下属性:

    # 修改如下内容
    default.timezone.id=Asia/Shanghai
    azkaban.webserver.url=http://node001:8081
    executor.port=12321
    
    database.type=mysql
    mysql.port=3306
    mysql.host=node001
    mysql.database=azkaban
    mysql.user=azkaban
    mysql.password=azkaban
    mysql.numconnections=100
    
    # 添加如下内容
    executor.metric.reports=true
    executor.metric.milisecinterval.default=60000
    
  2. 进入.../azkaban-exec-server-3.84.4/lib 更新 mysql-connector-java-8.0.29.jar包,使其与mysql版本匹配

  3. 同步azkaban-exec-server-3.84.4到所有节点

  4. 在各节点的**.../azkaban-exec-server-3.84.4/ **目录分别执行

    bin/start-exec.sh
    

    如果出现executor.port文件,说明启动成功。

  5. 在各个节点激活executor

    curl -G "node001:$(<./executor.port)/executor?action=activate" && echo
    
    curl -G "node002:$(<./executor.port)/executor?action=activate" && echo
    
    curl -G "node003:$(<./executor.port)/executor?action=activate" && echo
    

    如果节点出现提示:{"status":"success"},表示激活成功。

1.4 配置Web Server

Azkaban Web Server处理项目管理,身份验证,计划和执行触发。

  1. 编辑azkaban.properties

    修改如下:
    default.timezone.id=Asia/Shanghai
    
    database.type=mysql
    mysql.port=3306
    mysql.host=node001
    mysql.database=azkaban
    mysql.user=root
    mysql.password=root
    mysql.numconnections=100
    
    azkaban.executorselector.filters=StaticRemainingFlowSize,CpuStatus
    

    说明:

    • StaticRemainingFlowSize:正在排队的任务数

    • CpuStatus:CPU占用情况

    • MinimumFreeMemory:内存占用情况。测试环境,必须将MinimumFreeMemory删除掉,否则它会认为集群资源不够,不执行

  2. 修改azkaban-users.xml文件,添加新用户

    <azkaban-users>
      <user groups="azkaban" password="azkaban" roles="admin" username="azkaban"/>
      <user password="metrics" roles="metrics" username="metrics"/>
      <user password="azkaban" roles="metrics,admin" username="nuochengze"/>
    
      <role name="admin" permissions="ADMIN"/>
      <role name="metrics" permissions="METRICS"/>
    </azkaban-users>
    
  3. 进入.../azkaban-web-server-3.84.4/lib 更新 mysql-connector-java-8.0.29.jar包,使其与mysql版本匹配

  4. 进入...//azkaban-web-server-3.84.4/目录,启动web Server

    ./bin/start-web.sh
    
  5. 访问http://node001:8081/,并用配置的账号登录

2、使用

2.1 HelloWorld案例

  1. 新建azkaban.project文件,编辑内容如下:

    azkaban-flow-version: 2.0
    

    该文件的表明,采用新的Flow-API方式解析flow文件

  2. 新建hello_world.flow文件,编辑内容如下:

    nodes:
    	- name: jobA  # job的名称
       type: command  # job的类型,command表示要执行作业的方式为命令
    	  config:  # job的配置信息
    	  	command: echo "hello world"
    
  3. 将azkaban.project、hello_world.flow文件压缩到一个zip文件

    说明:文件名称必须是英文

  4. 在WebServer新建一个项目:http://node001:8081/index

    1. 给项目名称命名和添加项目描述
    2. 将zip包文件上传
    3. 执行Execute Flow
    4. Job List中查看运行结果

2.2 作业依赖案例

需求:JobA和JobB执行完了,才能执行JobC

步骤:

  1. 创建basic.flow文件,添加如下内容

    nodes:
    	- name: JobC
    	  type: command
    	  dependsOn:
    	  	- JobA
    	  	- JobB
    	  config: 
    	  	command: echo "I'm JobC"
    	  
    	- name: JobA
    	  type: command
    	  config:
    	  	command: echo "I'm JobA"
    	  	
    	- name: JobB
    	  type: command
    	  config:
    	  	command: echo "I'm JobB"
    

    dependsOn后面表示当前Job依赖的其他Job

  2. 创建azkaban.project文件,编辑内容如下:

    azkaban-flow-version: 2.0
    
  3. basic.flowazkaban.project文件压缩成dependson_example.zip文件

  4. 在WebServer新建一个项目:http://node001:8081/index

    1. 给项目名称命名和添加项目描述
    2. 将zip包文件上传
    3. 执行Execute Flow
    4. Job List中查看运行结果

2.3 自动失败重试案例

需求:如果执行任务失败,需要重试3次,重试的时间间隔10000ms

修改basic.flow文件:

nodes:	
	- name: JobA
	  type: command
	  config: 
	  	command: sh /not_exists.sh
	  	retries: 3
	  	retry.backoff: 10000

参数说明:

  • retries:重试次数
  • retry.backoff: 重试的时间间隔

执行的次数= 一次失败+三次重试

全局配置的方式:

# 在Flow全局配置中添加任务失败配置,此时重试配置会应用到所有Job

config:
	retires: 3
	retry.backoff: 10000
nodes:
	- name: JobA
	  type: command
	  config:
	  	command: sh /not_exists.sh

2.4 手动失败重试案例

Enable 和 Disable 下面都分别有如下参数:

  • Parents:该作业的上一个任务

  • Ancestors:该作业前的所有任务

  • Children:该作业后的一个任务

  • Descendents:该作业后的所有任务

  • Enable All:所有的任务

2.5 JavaProcess作业类型案例

JavaProcess 类型可以运行一个自定义主类方法,

type 类型为 javaprocess,

可用的配置为:

  • Xms:最小堆
  • Xmx:最大堆
  • classpath:类路径
  • java.class:要运行的 Java 对象,其中必须包含 Main 方法
  • main.args:main 方法的参数

步骤:

  1. 新建一个azkaban的maven工程

  2. 创建包名:com.nuochengze

  3. 创建AzTest类:

    package com.nuochengze
    
    public class AzTest {
        public static void main(String[] args){
            System.out.println("This is a test.");
        }
    }
    
  4. 打包成jar包

  5. 修改basic.flow文件

    nodes: 
    	- name: test_java
    	  type: javaprocess
    	  config:
    	  	Xms: 96M
    	  	Xmx: 200M
    	  	java.class: com.nuochengze.AzTest
    
  6. 将jar包和basic.flow文件及azkaban.project文件打包成zip包

2.6 条件工作流案例

条件工作流功能允许用户自定义执行条件来决定是否运行某些Job。条件可以由当前Job 的父 Job 输出的运行时参数构成,也可以使用预定义宏。在这些条件下,用户可以在确定 Job 执行逻辑时获得更大的灵活性,例如,只要父 Job 之一成功,就可以运行当前 Job。

运行时参数案例:

  1. 基本原理

    • 父 Job 将参数写入 JOB_OUTPUT_PROP_FILE 环境变量所指向的文件

    • 子 Job 使用 ${jobName:param}来获取父 Job 输出的参数并定义执行条件

  2. 支持的条件运算符

    • == 等于
    • != 不等于
    • > 大于
    • >= 大于等于
    • < 小于
    • <= 小于等于
    • && 与
    • || 或
    • ! 非

案例:

  • 需求:JobA执行一个shell脚本,JobB执行一个shell脚本,但JobB不需要每天都执行,而只需要每个周一执行

  • 步骤:

    1. 新建jobA.sh

      #! /bin/bash
      
      echo "do JobA"
      wk=`date +%w`
      echo "{\"wk\":$wk}" > $JOB_OUTPUT_PROP_FILE
      
    2. 新建JobB.sh

      #!/bin/bash
      echo "do JobB"
      
    3. 新建basic.flow

      nodes:
      	- name: JobA
        	  type: command
            config:
         		command: sh JobA.sh
      	- name: JobB
        	  type: command
       	  dependsOn:
       		- JobA
       	  config:
       		command: sh JobB.sh
            condition: ${JobA:wk} == 1
      

      按照设定条件,JobB会根据当日日期决定是否执行

2.7 预定义宏案例

Azkaban 中预置了几个特殊的判断条件,称为预定义宏。 预定义宏会根据所有父 Job 的完成情况进行判断,再决定是否执行。

可用的预定义宏如 下:

  • all_success: 表示父 Job 全部成功才执行(默认)
  • all_done:表示父 Job 全部完成才执行
  • all_failed:表示父 Job 全部失败才执行
  • one_success:表示父 Job 至少一个成功才执行
  • one_failed:表示父 Job 至少一个失败才执行

案例:

  • 需求

    JobA 执行一个 shell 脚本 ,JobB 执行一个 shell 脚本,JobC 执行一个 shell 脚本,要求 JobA、JobB 中有一个成功即可执行

  • 步骤

    1. 新建JobA.sh

      #!/bin/bash
      echo "do JobA"
      
    2. 新建JobC.sh

      #!/bin/bash
      echo "do JobC
      
    3. 新建basic.flow

      nodes:
      	- name: JobA
      	  type: command
      	  config:
      	  	command: sh JobA.sh
      	- name: JobB
      	  type: command
      	  config:
      	  	command: sh JobB.sh
      	- name: JobC
      	  type: command
      	  depondsOn:
      	  	- JobA
      	  	- JobB
      	  config: 
      	  	command: sh JobC.sh
      	  condition: one_success
      

      注意:没有JobB.sh

2.8 定时执行

  • Azkaban 可以定时执行工作流。在执行工作流时候,选择左下角 Schedule
  • 右上角注意时区是上海,然后在左面填写具体执行事件,填写的方法和 crontab 配置定时 任务规则一致
  • 点击 remove Schedule 即可删除当前任务的调度规则

2.9 邮箱报警案例

  1. 注册邮箱并开启smtp,获取第三方客户端授权码

  2. Azkaban 默认支持通过邮件对失败的任务进行报警,配置方法如下:

    1. 在 azkaban-web 节 点 node001上,编辑.../azkaban-web-server-3.84.4/conf/azkaban.properties,并修改如下内容:

      #这里设置邮件发送服务器
      mail.sender=xxxx@126.com
      mail.host=smtp.126.com
      mail.user=xxxx@126.com
      mail.password=用邮箱的授权码
      
    2. 保存并重启web-server

    3. 页面配置




















2.10 Azkaban多Executor模式注意事项

Azkaban 多 Executor 模式是指,在集群中多个节点部署 Executor。在这种模式下, Azkaban web Server 会根据策略,选取其中一个 Executor 去执行任务。

为确保所选的 Executor 能够准确的执行任务,我们须在以下两种方案任选其一,推荐使 用方案二。

  • 方案一:指定特定的 Executor(hadoop102)去执行任务。

    1. 在 MySQL 中 azkaban 数据库 executors 表中,查询 node001上的 Executor 的 id。

    2. 在执行工作流程时加入 useExecutor 属性,如下















  • 方案二:在 Executor 所在所有节点部署任务所需脚本和应用。
这篇关于一、Azkaban简明笔记的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!