分布式协调服务
主要⽤用来解决分布式集群中应⽤用系统的一致性问题
ZooKeeper 本质上是⼀个分布式的⼩文件存储系统, 基于类似于⽂文件系统的⽬目录树⽅方式的数 据存储,并且可以对树中的节点进⾏行行有效管理理。
ZooKeeper 提供给客户端监控存储在zk内部数据的功能, 达到基于数据的集群管理理
统一命名服务(dubbo)、分布式配置管理理(solr的配置集中管理理)、分布式消息队列列 (sub/pub)、分布式锁、分布式协调
Leader
Leader 不是手动指定的,而是 Follower 选举出来的
核心组件,事务请求(写操作) 的唯一处理者
Follower
处理客户端非事务(读操作) 请求
转发事务请求给 Leader
Observer
针对访问量大的 ZooKeeper 集群,可以增加 Observer
观察 ZooKeeper 集群的最新状态变化,并将这些状态同步
对于 非事务请求,可以独立处理
对于 事务请求,会转发给 Leader 服务器
不会参与任何形式的投票,只提供非事务服务
不影响群事务处理能力的前提下,提升集群的非事务处理能力
Leader + 多个 Follower 的集群
Leader 负责发起投票和决议,更新系统状态
Follower 用于接受客户请求,并向 客户端返回结果,参与投票
集群中有 半数节点存活,集群就能正常服务
因为 全局数据一致,每个 server 保存一份相同的数据副本
更新请求顺序执行
数据更新原子性,一次数据更新,要么成功,要么失败
tar -zxvf zookeeper-3.4.14.tar.gz -C ../servers/
添加 myid 配置
启动 zk
查看启动情况
集群启动停止脚本
ZooKeeper 数据模型 Znode
数据信息保存在若干数据节点 Znode 上,是 ZooKeeper 中最小的数据单位
Znode 组成 Znode 树的命名空间
持久性节点,直到删除操作才会被清除
临时节点,不能有子节点,其生命周期和客户端会话绑定在一起,会话结束后被清除
持久顺序节点,持久的节点,其节点名会有一个表示顺序的数字后缀
临时顺序节点,临时的节点,其节点名会有一个表示顺序的数字后缀
事务 - 对物理和抽象的应用状态上的操作集合
通常指的是数据库事务,一般包含了⼀系列对数据库有序的读写操作
但是,事务是指能够改变 ZooKeeper 服务器器状态的操作
或更新操作,⼀般包括数据节点创建与删除、数据节点内容更新等操作
zk 中的事务指的是对zk服务器状态改变的操作(create, update data,更新字节点); zk 对这些事务操作都会编号,这个编号是⾃增长的被称为ZXID
#使⽤用bin/zkCli.sh 连接到zk集群 [zk: localhost:2181(CONNECTED) 2] get /zookeeper cZxid = 0x0 ctime = Wed Dec 31 19:00:00 EST 1969 mZxid = 0x0 mtime = Wed Dec 31 19:00:00 EST 1969 pZxid = 0x0 cversion = -1 dataVersion = 0 aclVersion = 0 ephemeralOwner = 0x0 dataLength = 0 numChildren = 1
ZNode 节点内容包括两部分: 节点数据内容 和 节点状态信息。此处数据内容是空
cZxid 就是 Create ZXID,表示节点被创建时的事务 ID。
ctime 就是 Create Time,表示节点创建时间。
mZxid 就是 Modified ZXID,表示节点最后⼀次被修改时的事务ID。
mtime 就是 Modified Time,表示节点最后⼀次被修改的时间。
pZxid 表示该节点的子节点列表最后⼀次被修改时的事务 ID。只有子节点列表变更才会更新 pZxid,⼦节点内容变更不会更新。
cversion 表示子节点的版本号。
dataVersion 表示内容版本号。
aclVersion 标识 acl 版本
ephemeralOwner 表示创建该临时节点时的会话 sessionID,如果是持久性节点那么值为 0
dataLength 表示数据⻓长度。
numChildren 表示直系⼦子节点数。
实现 分布式数据的发布/订阅 功能
一对多的订阅关系,让多个订阅者同时监听某个主题对象,当主题对象状态变化时,会通知所有订阅者,并作出相应的处理
客户端命令行
bin/zkCli.sh 连接本地的 zk 服务器 bin/zkCli.sh -server ip:port(2181) 连接指定的服务器
查看可用命令
[zk: localhost:2181(CONNECTED) 3] help ZooKeeper -server host:port cmd args stat path [watch] set path data [version] ls path [watch] delquota [-n|-b] path ls2 path [watch] setAcl path acl setquota -n|-b val path history redo cmdno printwatches on|off delete path [version] sync path listquota path rmr path get path [watch] create [-s] [-e] path data acl addauth scheme auth quit getAcl path close connect host:port
创建节点
create [-s] [-e] path data acl -s -e 分别指定节点特性(顺序 或 临时), 若不指定,则为 持久节点
create -s /zk-test 123 -s 顺序节点 /zk-test 节点名 123 节点内容
create -e /zk-temp 123
create /zk-permanent 123
读取节点
ls 查看节点 ls path get 获取节点的数据内容和属性信息 get path
更新节点
set path data 如: set /zk-permanent 456
删除节点
delete path delete /zk-permanent
若删除节点存在子节点,则无法删除,需要先删除子节点
依赖:
<!-- https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper --> <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.4.14</version> <type>pom</type> </dependency> <!-- https://mvnrepository.com/artifact/com.101tec/zkclient --> <dependency> <groupId>com.101tec</groupId> <artifactId>zkclient</artifactId> <version>0.2</version> </dependency>
创建会话
package com.lagou.zk.demo; import org.I0Itec.zkclient.ZkClient; public class ZkDemo { public static void main(String[] args) { // 1. 获取 zkClient 对象 final ZkClient zkClient = new ZkClient("linux121:2181"); System.out.println("zkclient is ready"); } }
创建节点
// 2. 创建节点 // 持久节点, 注意递归创建节点,需要 设置 true zkClient.createPersistent("/lagou-client/lagou-c1", true); System.out.println("node is created");
删除节点
// 3. 删除节点 // 递归删除 zkClient.deleteRecursive("/lagou-client"); System.out.println("node is deleted");
监听节点变化
注意 监听器 可用对 不存在的节点 进行监听
监听⽬录下子节点发生改变,可以接收到通知,携带数据有⼦节点列表
3 监听⽬录创建和删除本身也会被监听到
package com.lagou.zk.demo; import org.I0Itec.zkclient.IZkChildListener; import org.I0Itec.zkclient.ZkClient; import java.util.List; public class Get_ChildNode_Change { public static void main(String[] args) throws InterruptedException { // 1. 获取 zkClient final ZkClient zkClient = new ZkClient("linux121:2181"); // 2. 对指定节点进行监听,指定收到通知后的处理逻辑 // 处理逻辑 通过重写 handleChildChange 方法实现 zkClient.subscribeChildChanges("/lg-client", new IZkChildListener() { @Override public void handleChildChange(String path, List<String> childs) throws Exception { // 打印节点信息 System.out.println(path + " child node changes - current children " + childs); } }); // 3. 删除节点,验证监听是否有效 zkClient.createPersistent("/lg-client"); Thread.sleep(1000); zkClient.createPersistent("/lg-client/c1"); Thread.sleep(1000); zkClient.delete("/lg-client/c1"); Thread.sleep(1000); zkClient.delete("/lg-client"); Thread.sleep(Integer.MAX_VALUE); } }
监听、获取数据
监听数据变化的监听器,需要监听节点是否被删除,以及监听数据是否改变
zkClient.subscribeDataChanges("/lg-client", new IZkDataListener() { @Override public void handleDataChange(String path, Object data) throws Exception { // 定义接收通知之后的处理逻辑 System.out.println(path + " data is changed, new data: " + data); } // 数据删除时 @Override public void handleDataDeleted(String path) throws Exception { System.out.println(path + " path is deleted"); } });
选举机制:
假设 5 个节点
服务器 1 启动,发出去的保文没有任何响应,所以其选举状态一直是 Looking 状态
服务器 2 启动,与 1 通信,互相交换自己的选举结果,但是由于两者都没有历史数据,所以 id 值较大的 2 胜出,但是由于没有达到超过半数以上的服务器的投票,所以其不能当选。此时 1 2 均保持 looking 状态
服务器 3 启动,根据 id 值较大的胜出,且超过半数投票,此时 3 为 Leader, 其余节点为 Follower
服务器 4 启动,id 值大小胜出,但是不超过半数投票,此时 4 为 Folloer
服务器 5 同 4, Folloer
分布式数据会有一致性问题
数据的多副本,以及多副本同时读写,会导致一致性问题
ZAB(ZooKeeper Atomic Broadcast, 原子消息广播协议)
主备模式,保证一致性
所有的数据都由主进程处理,然后复制给副本进程
ZAB 会将服务器数据的状态变更以事务 Proposal 的形式广播到所有的副本进程上,ZAB协议能够保证了事务操作的一个全局的变更更序号(ZXID)
广播消息
ZAB 协议广播消息过程,类似于 二阶段提交过程
对于 client 发送的 写请求,全部由 Leader 接收, Leader 将请求封装为 事务 Proposal,分发给所有 Follower
Leader 会给这个事务分配一个全局递增的唯一 ID, 即 事务 ID (ZXID)
ZAB 协议要求保证事务有序
Follower 反馈 ACK
如果超过半数 Follower 反馈 ACK,则执行 Commit 操作
先提交自己,再发送 Commit 给所有 Follower
不能正常反馈 Follower 恢复正常后会进入数据同步阶段最终与 Leader 保持一致
Leader 宕机奔溃
Leader 宕机后,选举新 Leader
ZAB 协议的选举算法,能够确保 Leader 提交的事务被集体接收,并且丢弃还没有提交的事务
因此,选举机制保证选举出的新 Leader 拥有集群中所有节点最大事务编号(ZXID) 的事务
应用:服务器动态上下线监听
分布式系统中,主节点会有多台
主节点因为任何原因出现宕机或者下线,客户端都要能实时感知到主节点服务器的状态
案例:
client 向服务器发送请求,获取时间
服务器为 client 请求创建 zk 节点
在节点内启动 main 函数,接收 client 请求后,返回时间
锁
对变量或者堆代码块做同步,即为 锁
目的是实现多个线程在一个时刻内,同一个代码块只能有一个线程可执行
单机程序中,多个线程可以同时改变某个变量时,为了保证线程安全,需要对变量 或 代码做同步,使其在改变变量时能够串行执行消除并发修改变量
分布式锁
对于分布式程序,多台机器上的多个线程,有了线程锁,但是由于操作的是同一个数据库, 分布式程序运行在不同机器的 JVM 里,线程锁无法作用与分布式系统
此时,需要分布式锁
在整个系统提供一个全局、唯一的锁
在分布式系统中的每个机器需要获取到该锁,才能执行后续的逻辑操作
zk 实现分布式锁
思路:
高可用,消除单点故障
搭建 两个 NN 的集群,消除单点故障 (Active NN / Standby NN)
元数据
两个 NN 各自保存一份元数据
Edits 日志只有 Active NN 可以写操作
两个 NN 都可以读取 Edits
需要一个 状态管理功能模块
实现一个 zkfailover 常驻在 两个 NN 节点上, 负责监控各自 NN 节点
利用 zk 进行状态标识,由 zkfailover 负责切换状态,切换过程需要防止 brain split 现象发生
集群中同时出现连个 Active NN
两个 NN 之间 ssh 免密登录
两个 NN 隔离,同一时刻只有一个 NN 对外提供服务
工作机制
HDFS - HA 的自动故障转移的实现是通过为 HDFS 部署两个新组件
自动故障转移
ZK 部分:
故障检测
现役 NN 选择
ZKC 部分:
是 ZooKeeper 的客户端
健康监测
ZK 会话管理
基于 ZK 选择
集群规划
linux121 | linux122 | linux123 |
---|---|---|
NN | NN | |
JournalNode | JournalNode | JournalNode |
DN | DN | DN |
ZK | ZK | ZK |
ResourceManager | ||
NM | NM | NM |
启动 zk 集群
停止原先 HDFS 集群
stop-dfs.sh
创建 HA 目录,拷贝愿 hadoop 目录到 HA 目录,删除里面的 data 目录
mkdir /opt/lagou/servers/ha cp -r /opt/lagou/servers/hadoop-2.9.2 /opt/lagou/servers/ha rm -rf /opt/lagou/servers/ha/hadoop-2.9.2/data
配置 hdfs-site.xml
<property> <name>dfs.nameservices</name> <value>lagoucluster</value> </property> <property> <name>dfs.ha.namenodes.lagoucluster</name> <value>nn1,nn2</value> </property> <property> <name>dfs.namenode.rpc-address.lagoucluster.nn1</name> <value>linux121:9000</value> </property> <property> <name>dfs.namenode.rpc-address.lagoucluster.nn2</name> <value>linux122:9000</value> </property> <property> <name>dfs.namenode.http-address.lagoucluster.nn1</name> <value>linux121:50070</value> </property> <property> <name>dfs.namenode.http-address.lagoucluster.nn2</name> <value>linux122:50070</value> </property> <property> <!-- edits 贡献目录 --> <name>dfs.namenode.shared.edits.dir</name> <value>qjournal://linux121:8485;linux122:8485;linux123:8485/lagou</value> </property> <property> <name>dfs.client.failover.proxy.provider.lagoucluster</name> <value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyPr ovider</value> </property> <property> <name>dfs.ha.fencing.methods</name> <value>sshfence</value> </property> <property> <name>dfs.ha.fencing.ssh.private-key-files</name> <value>/root/.ssh/id_rsa</value> </property> <property> <name>dfs.journalnode.edits.dir</name> <value>/opt/journalnode</value> </property> <property> <name>dfs.ha.automatic-failover.enabled</name> <value>true</value> </property>
配置 core-site.xml
<property> <name>fs.defaultFS</name> <value>hdfs://lagoucluster</value> </property> <property> <name>hadoop.tmp.dir</name> <value>/opt/lagou/servers/ha/hadoop-2.9.2/data/tmp</value> </property> <property> <name>ha.zookeeper.quorum</name> <value>linux121:2181,linux122:2181,linux123:2181</value> </property>
拷贝配置好的 hadoop ha 环境 到其他节点
启动 HDFS - HA 集群
各个 JournalNode 节点上,启动 JN 服务
/opt/lagou/servers/ha/hadoop-2.9.2/sbin/hadoop-daemon.sh start journalnode
格式化 NN 1, 并启动
/opt/lagou/servers/ha/hadoop-2.9.2/bin/hdfs namenode -format /opt/lagou/servers/ha/hadoop-2.9.2/sbin/hadoop-daemon.sh start namenode
NN 2 同步 NN 1 的元数据信息
/opt/lagou/servers/ha/hadoop-2.9.2/bin/hdfs namenode -bootstrapStandby
NN 1 上初始化 ZKFC
/opt/lagou/servers/ha/hadoop-2.9.2/bin/hdfs zkfc -formatZK
NN 1 上启动集群
/opt/lagou/servers/ha/hadoop-2.9.2/sbin/start-dfs.sh
验证
kill Active NN 进程
Yarn - HA 不需要像 HDFS 那样同步元数据
集群规划
linux121 | linux122 | linux123 |
---|---|---|
NN | NN | |
JournalNode | JournalNode | JournalNode |
DN | DN | DN |
ZK | ZK | ZK |
ResourceManager | ResourceManager | |
NM | NM | NM |
配置 yarn-site.xml
<configuration> <property> <name>yarn.nodemanager.aux-services</name> <value>mapreduce_shuffle</value> </property> <!--启⽤用resourcemanager ha--> <property> <name>yarn.resourcemanager.ha.enabled</name> <value>true</value> </property> <!--声明两台resourcemanager的地址--> <property> <name>yarn.resourcemanager.cluster-id</name> <value>cluster-yarn</value> </property> <property> <name>yarn.resourcemanager.ha.rm-ids</name> <value>rm1,rm2</value> </property> <property> <name>yarn.resourcemanager.hostname.rm1</name> <value>linux122</value> </property> <property> <name>yarn.resourcemanager.hostname.rm2</name> <value>linux123</value> </property> <!--指定zookeeper集群的地址--> <property> <name>yarn.resourcemanager.zk-address</name> <value>linux121:2181,linux122:2181,linux123:2181</value> </property> <!--启⽤用⾃自动恢复--> <property> <name>yarn.resourcemanager.recovery.enabled</name> <value>true</value> </property> <!--指定resourcemanager的状态信息存储在zookeeper集群--> <property> <name>yarn.resourcemanager.store.class</name> <value>org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore</ value> </property> </configuration>
同步到其他节点
启动 HDFS
sbin/start-yarn.sh
一个分布式海量列式非关系型
数据库系统
提供超大规模数据集的实时随机读写
列式存储的优点:减少空值,减少存储空间占用
特点:
应用:
交通、金融、电商、电信
适合海量明细数据的存储,并且需要很好的查询性能
模糊查询用 FILTER 参数 => “函数名(参数)”, 多列族用 COLUMNS 参数
hbase shell
help
list
create 'lagou', 'base_info', 'extra_info' -- 或者指定 列族信息 create 'lagou', {NAME => 'base_info', VERSIONS => '3'}, {NAME => 'extra_info', VERSIONS => '3'}
VERSIONS 表示此单元格内的数据可以保留最近的 3 个版本
-- row key 为 rk1, 列族 base_info 中添加 name 列标识符,值为 wang put 'lagou', 'rk1', 'base_info:name', 'wang' -- row key 为 rk1, 列族 base_info 中添加 age 列标识符,值为 30 put 'lagou', 'rk1', 'base_info:age', 30 -- row key 为 rk1, 列族 extra_info 中添加 address 列标识符,值为 shanghai put 'lagou', 'rk1', 'extra_info:address', 'shanghai'
-- 查询表中 row key 为 rk1 的所有信息 get 'lagou', 'rk1' -- 查看 row key 下面的某个列族的信息 get 'lagou', 'rk1', 'base_info' -- 查看 row key 指定列族中字段的值 get 'lagou', 'rk1', 'base_info:name', 'extra_info:address' -- 或者 get 'lagou', 'rk1', {COLUMN => ['base_info:name', 'extra_info:address']} -- 指定 row key 与 列族中列值,进行查询 binary 二分查找 get 'lagou', 'rk1', {FILTER => "ValueFilter(=, 'binary:wang')"} -- 对 列名 模糊查询 substring get 'lagou', 'rk1', {FILTER => "QualifierFilter(=, 'substring:a')"} -- 查询表中所有信息 scan 'lagou' -- 列族查询 scan 'lagou', {COLUMNS => 'base_info'} -- scan 也可以指定 VERSIONS 参数, RAW 参数等 scan 'lagou', {COLUMNS => 'base_info', RAW => true, VERSIONS => 3} -- 查询多个列族里,对列名模糊查询 scan 'lagou', {COLUMNS => ['base_info', 'extra_info'], FILTER => "QualifierFilter(=, 'substring:a')"} -- 查询 rowkey -- 顺序查询 scan 'lagou', {COLUMNS => 'base_info', STARTROW => 'rk1', ENDROW => 'rk3'} -- 模糊查询 scan 'lagou', {FILTER => "PrefixFilter('rk')"} -- 更新数据 -- 同插入数据 -- 更新数据值 put 'lagou', 'rk1', 'base_info:name', 'liang' -- 删除数据和表 -- 指定 rowkey 以及 列名,删除 delete 'lagou', 'rk1', 'base_info:name' -- 指定 rowkey 以及 列名 和 时间戳信息 删除 delete 'lagou', 'rk1', 'base_info:name', 1634301287482 -- 删除列族 alter 'lagou', 'delete' => 'base_info' -- 清空表数据 truncate 'lagou' -- 删除表 -- 先 disable, 再 drop disable 'lagou' drop 'lagou'
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-9FWrsTxy-1634397879870)(HBase-readdata.png)]
HBase 读操作
首先从 zk 找到 meta 表的region位置,然后读取 meta 表中的数据,meta 表中存储了用户表的 region 信息
根据要查询的 namespace、表名和 rowkey 信息。找到写入数据对应的 region 信息
找到这个 region 对应的 regionServer,然后发送请求
查找对应的 region
先从 memstore 查找数据,如果没有,再从 BlockCache 上读取
HBase 上 Regionserver 的内存分为两个部分
一部分作为 Memstore 主要用来写
另一部分作为 BlockCache 主要用于读数据
如果 BlockCache 中也没有找到,再到 StoreFile 上进行读取
从 storeFile 中读取到数据之后,不是直接把结果数据返回给客户端,而是把数据先写入到 BlockCache 中,目的是为了加快后续的查询; 然后在返回结果给客户端。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-OwzSbC0w-1634397879873)(HBase-writedata.png)]
package com.lagou.hbase.client; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.*; import org.apache.hadoop.hbase.client.*; import org.apache.hadoop.hbase.util.Bytes; import org.junit.After; import org.junit.Before; import org.junit.Test; import java.io.IOException; import java.nio.charset.StandardCharsets; public class HbaseClientDemo { Configuration conf = null; Connection conn = null; @Before public void init() throws IOException { // 获取一个配置文件对象 conf = HBaseConfiguration.create(); conf.set("hbase.zookeeper.quorum", "linux121,linux122,linux123"); conf.set("hbase.zookeeper.property.clientPort", "2181"); // 通过 conf 获取到 hbase 的集群链接 conn = ConnectionFactory.createConnection(conf); } // 创建一张 hbase 表 @Test public void creatTable() throws IOException { // 获取 HbaseAdmin 对象 HBaseAdmin admin = (HBaseAdmin) conn.getAdmin(); // 表描述器 HTableDescriptor teacher = new HTableDescriptor(TableName.valueOf("teacher")); // 设置列族描述器 teacher.addFamily(new HColumnDescriptor("info")); // 执行创建操作 admin.createTable(teacher); System.out.println("table teacher is created ..."); } // 插入数据 @Test public void putData() throws IOException { // 获取 table 对象 Table t1 = conn.getTable(TableName.valueOf("teacher")); // 创建 put 对象,设定 rowkey Put put = new Put(Bytes.toBytes("110")); // 列族,列,值 put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("addr"), Bytes.toBytes("Beijing")); // 插入数据到 表 t1 t1.put(put); // 插入多条时,用 list<puts> 遍历批量插入 // 关闭对象 t1.close(); System.out.println("data is inserted to table ..."); } // 删除数据 @Test public void deleteData() throws IOException { // 获取 table 对象 Table t1 = conn.getTable(TableName.valueOf("teacher")); // 创建 delete 对象, 需要指定 rowkey Delete delete = new Delete(Bytes.toBytes("110")); t1.delete(delete); // 关闭 table 对象 t1.close(); System.out.println("data is deleted ..."); } // 查询某个列族数据 @Test public void getDataByColumnFamily() throws IOException { // 获取 table 对象 Table t1 = conn.getTable(TableName.valueOf("teacher")); // 创建 get 对象,指定 rowkey Get get = new Get(Bytes.toBytes("110")); // 指定 列族信息 get.addFamily(Bytes.toBytes("info")); // 查询 Result result = t1.get(get); // 获取 result 中的所有 cell 对象 Cell[] cells = result.rawCells(); for (Cell cell : cells) { String cf = Bytes.toString(CellUtil.cloneFamily(cell)); String column = Bytes.toString(CellUtil.cloneQualifier(cell)); String value = Bytes.toString(CellUtil.cloneValue(cell)); String rowkey = Bytes.toString(CellUtil.cloneRow(cell)); System.out.println(rowkey + "\t" + cf + "\t" + column + "\t" + value); } t1.close(); } @Test public void scanAllData() throws IOException { // 获取表 Table t1 = conn.getTable(TableName.valueOf("teacher")); // 创建 scan 对象 Scan scan = new Scan(); // ResultScanner resultScanner = t1.getScanner(scan); for (Result result : resultScanner) { Cell[] cells = result.rawCells(); for (Cell cell : cells) { String cf = Bytes.toString(CellUtil.cloneFamily(cell)); String column = Bytes.toString(CellUtil.cloneQualifier(cell)); String value = Bytes.toString(CellUtil.cloneValue(cell)); String rowkey = Bytes.toString(CellUtil.cloneRow(cell)); System.out.println(rowkey + "\t" + cf + "\t" + column + "\t" + value); } } t1.close(); } // 通过 startRowKey 和 endRowKey 进行扫描 @Test public void scanRowKey() throws IOException { // 获取表 Table t1 = conn.getTable(TableName.valueOf("teacher")); // scan 对象 Scan scan = new Scan(); scan.setStartRow("001".getBytes()); scan.setStopRow("2".getBytes()); ResultScanner resultScanner = t1.getScanner(scan); for (Result result : resultScanner) { Cell[] cells = result.rawCells(); for (Cell cell : cells) { String cf = Bytes.toString(CellUtil.cloneFamily(cell)); String column = Bytes.toString(CellUtil.cloneQualifier(cell)); String value = Bytes.toString(CellUtil.cloneValue(cell)); String rowkey = Bytes.toString(CellUtil.cloneRow(cell)); System.out.println(rowkey + "\t" + cf + "\t" + column + "\t" + value); } } t1.close(); } @After public void release() { if (conn != null) { try { conn.close(); } catch (IOException e) { e.printStackTrace(); } } } }
⼀个完整的数据分析系统通常都是由⼤量任务单元组成
各任务单元有前后依赖关系,组成工作流
如:某个业务系统每天产生 20 G 原始数据,每天都要进行处理,逻辑如下:
对于这类复杂的工作流,crontab 定时任务无法满足需求,就需要一个调度系统来支持
vi command.job # 内容 type=command command=echo 'hello'
zip command.job
在 azkaban 的 web 界面创建 project 并伤处 job 压缩包
执行 job
# 第二个 job 依赖 第一个 job # 第一个 job, foo.job type=command command=echo 'foo' # 第二个 job, bar.job type=command dependencies=foo command=echo 'bar'
打包时,将这两个 job 文件打包到一起
# 将 hdfs 命令写入 job 文件, fs.job type=command command=/opt/lagou/servers/hadoop-2.9.2/bin/hdfs dfs -mkdir /azkaban
# 需要 mr 程序的 jar 包 # mrwc.job type=command command=/opt/lagou/servers/hadoop-2.9.2/bin/hadoop jar hadoop-mapreduce-examples-2.9.2.jar wordcount /wordcount/input /wordcount/azout
# hive 脚本 test.hql use default; drop table aztest; create table aztest(id int, name string) row format delimited fields terminated by ',';
# hivef.job type=command command=/opt/lagou/servers/hive-2.3.7/bin/hive -f 'test.hql'
在 Azkaban 的 web 界面配置 project 的定时调度信息