Zookeeper命令行与API应用
上一章,我们对zookeeper的基本概念以及环境搭建进行了学习与配置,有了环境,接下来就正式操作zookeeper啦
zookeeper和HDFS一样,同样分为命令行和客户端操作两种方式,主要就是各种命令和API的调用,大家需要的时候查看这篇博客即可
命令基本语法 | 功能描述 |
---|---|
help | 显示所有操作命令 |
ls path 使用 ls 命令来查看当前znode的子节点 | -w 监听子节点变化 -s 附加次级信息 |
create 普通创建 | -s 含有序列 -e 临时(重启或者超时消失) |
get path 获得节点的值 | -w 监听节点内容变化 -s 附加次级信息 |
set | 设置节点的具体值 |
stat | 查看节点状态 |
delete | 删除节点 |
deleteall | 递归删除节点 |
具体操作方法(注意,一定先启动了zookeeper才可以开启客户端)
启动客户端
zkCli.sh
help
显示所有操作命令
help
ls
查看当前znode总所包含的内容
# 查看根节点下所有的内容 ls /
ls2
查看当前节点详细数据
# 查看根节点下的详细信息 ls2 /
create
分别创建2个普通节点
# 创建一个节点 sanguo 并且添加值jinlian create /sanguo "jinlian" # 在节点sanguo下创建一个shuguo,在shuguo节点下添加内容 liubei create /sanguo/shuguo "liubei"
get
获得节点的值
# 获取sanguo节点中存储的节点值 jinlian get /sanguo # 获取sanguo节点下的子节点shuguo的节点值 liubei get /sanguo/shuguo
create -e
创建临时节点
# 在sanguo节点下创建一个临时节点wuguo 并且添加节点值zhouyu create -e /sanguo/wuguo "zhouyu"
注意:临时节点在当前客户端是可以看到的,当我们关闭客户端重新连接后,临时节点就自动销毁了
create -s
创建带序号的节点
# 先创建一个普通的根节点/sanguo/weiguo /sanguo/weiguo # 然后创建一个带序号的子节点weiguo 然后再weiguo下面创建一个子节点xiaoqiao并且添加节点值为 jinlian create -s /sanguo/weiguo/xiaoqiao "jinlian"
set
修改节点的数据值
# 修改sanguo下的weiguo节点中的值为simayi set /sanguo/weiguo "simayi"
get -w
监听节点的值变化情况
get -w /sanguo
set /sanguo "zhaoyun"
WatchedEvent state:SyncConnected type:NodeDataChanged path:/sanguo
ls -w
监听节点中子节点的变化情况(路径变化)
bashls -w /sanguo
create /sanguo/qinguo
WatchedEvent state:SyncConnected type:NodeChildrenChanged path:/sanguo
delete
删除节点
# 删除 /sanguo/节点下的jin_guo delete /sanguo/jin_guo
deleteall
递归删除节点
# 递归删除/sanguo下的shuguo中所有的内容 deleteall /sanguo/shuguo
stat
查看节点状态
# 查看/sanguo的节点状态 stat /sanguo
命令行模式介绍完后,下面就是如何通过代码来操作zookeeper,同样的,我们需要先在idea
中配置一下
搭建前的准备工作,在zookeeper官网下载zookeeper的源码包,然后我们将其解压
解压后的文件
第一步: 创建一个Maven工程
第二步: 添加依赖到pom.xml配置文件
<dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>RELEASE</version> </dependency> <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-core</artifactId> <version>2.8.2</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper --> <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.5.7</version> </dependency> </dependencies>
第三步: 添加log4j.properties
log4j.rootLogger=INFO, stdout log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n log4j.appender.logfile=org.apache.log4j.FileAppender log4j.appender.logfile.File=target/spring.log log4j.appender.logfile.layout=org.apache.log4j.PatternLayout log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
第四步:关联zookeeper的源码到我们的项目中
操作方式和HDFS,JDBC类似,我就直接上代码了,步骤同样是:
package com.company.zk; import org.apache.zookeeper.*; import org.apache.zookeeper.data.Stat; import org.junit.After; import org.junit.Before; import org.junit.Test; import java.io.IOException; import java.util.List; /* 通过代码操作ZK: 1.创建连接对象 2.具体的操作 3.关闭资源 */ public class ZKDemo { private ZooKeeper zk; @Before public void before() throws IOException { /* ZooKeeper(String connectString, int sessionTimeout, Watcher watcher) connectString : zk节点的地址 sessionTimeout : 会话超时时间 watcher :监听器对象(当我们监听的节点或数据发生改变,zk就会调用该对象中的process方法 所以可以在process方法中去实现监听事件发生后的业务逻辑代码) 注意:该监听器对象是对所有事件进行监听(咱们一般不用)。 */ String connectString = "hadoop102:2181,hadoop103:2181,hadoop104:2181"; //1.创建连接对象 zk = new ZooKeeper(connectString, 4000, new Watcher() { public void process(WatchedEvent event) { } }); } @After public void after(){ //3.关资源 try { if (zk != null) { zk.close(); } } catch (InterruptedException e) { e.printStackTrace(); } } /* 创建子节点 */ @Test public void test() throws IOException, KeeperException, InterruptedException { /* create(final String path, byte data[], List<ACL> acl, CreateMode createMode) path : 节点路径 data : 节点上存储的数据 acl : 访问控制权限(节点的) createMode : 节点的类型(无序列号永久 无序列号临时 有序列号永久 有序列号临时) */ //2.具体的操作 String s = zk.create("/0323/canglaoshi", "mei".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); System.out.println(s + "====================================="); } /* 判断子节点是否存在 */ @Test public void test2() throws KeeperException, InterruptedException { /* exists(String path, boolean watch) path : 节点路径 watch : 是否监听该节点 注意 : 如果节点不存在则返回null */ Stat exists = zk.exists("/0323/canglaoshi1236666", false); System.out.println((exists==null)?"不存在":"存在"); } /* 获取子节点并监听节点变化 */ @Test public void test3() throws KeeperException, InterruptedException { /* getChildren(final String path, Watcher watcher) path : 节点路径 watcher :监听器对象(当我们监听的节点或数据发生改变,zk就会调用该对象中的process方法 所以可以在process方法中去实现监听事件发生后的业务逻辑代码) 返回结果为所有的子节点 */ List<String> children = zk.getChildren("/0323", new Watcher() { /* 当节点发生变化后后被调用 注意:该方法不一定什么时候会被调用。 */ public void process(WatchedEvent event) { System.out.println("节点发生了变化"); } }); //遍历所有的子节点 for (String name : children) { System.out.println(name); } //让线程睡觉 Thread.sleep(Long.MAX_VALUE); } /* 循环监听子节点变化 */ @Test public void test4() throws KeeperException, InterruptedException { listener(); //让线程睡觉 Thread.sleep(Long.MAX_VALUE); } public void listener() throws KeeperException, InterruptedException { List<String> children = zk.getChildren("/0323", new Watcher() { /* 当节点发生变化后后被调用 注意:该方法不一定什么时候会被调用。 */ public void process(WatchedEvent event) { System.out.println("节点发生了变化"); //再次监听 try { listener(); } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } }); //遍历所有的子节点 for (String name : children) { System.out.println(name); } } }
下面就来模拟演示上一章zookeeper应用中服务器动态上线下的案例
需求:某分布式系统中,主节点可以有多台,可以动态上下线,任意一台客户端都能实时感知到主节点服务器的上下线
编码步骤:
代码如下
package com.company.zookeeperlisten; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import java.io.IOException; import java.util.List; /* 客户端要做的事情: 1. 创建zookeeper对象 2.监听父节点 注意:是否允许先启动客户端.(允许---也需要判断父节点是否存在) */ public class ZKClient { private static ZooKeeper zk; public static void main(String[] args) throws IOException { try { //1.创建连接对象 String connectString = "hadoop102:2181,hadoop103:2181,hadoop104:2181"; /* ZooKeeper(String connectString, int sessionTimeout, Watcher watcher) connectString : zk节点的地址 sessionTimeout : 会话超时时间 watcher :监听器对象(当我们监听的节点或数据发生改变,zk就会调用该对象中的process方法 所以可以在process方法中去实现监听事件发生后的业务逻辑代码) 注意:该监听器对象是对所有事件进行监听(咱们一般不用)。 */ zk = new ZooKeeper(connectString, 4000, new Watcher() { public void process(WatchedEvent event) { } }); //2.获取子节点并监听节点 listener(); Thread.sleep(Long.MAX_VALUE); }catch (Exception e){ e.printStackTrace(); }finally { //关闭资源 if (zk != null){ try { zk.close(); } catch (InterruptedException e) { e.printStackTrace(); } } } } // 循环监听子节点 public static void listen(){ List<String> children = null; try { // 判断父节点是否存在 Stat exists = zooKeeper.exists("/parent",false); if (exists == null){ // 如果不存在就添加 zooKeeper.create("/parent","fu".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT); } //获取子节点并监听,并返回所有的子节点 children = zooKeeper.getChildren("/parent", watchedEvent -> { System.out.println("========="+"parent节点发生了变化"+"========"); listen(); }); } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } if (children != null){ // 循环打印出所有的子节点 for (String child : children) { System.out.println(child); } } } }
package com.company.zookeeperlisten; import org.apache.zookeeper.*; import org.apache.zookeeper.data.Stat; import java.io.IOException; /* 服务器要做的事情: 1.创建zookeeper对象 2.创建临时节点 注意 : ①判断父节点是否存在-不在需要创建 */ public class ZKServer { public static void main(String[] args){ ZooKeeper zk = null; try { //1.创建连接对象 String connectString = "hadoop102:2181,hadoop103:2181,hadoop104:2181"; /* ZooKeeper(String connectString, int sessionTimeout, Watcher watcher) connectString : zk节点的地址 sessionTimeout : 会话超时时间 watcher :监听器对象(当我们监听的节点或数据发生改变,zk就会调用该对象中的process方法 所以可以在process方法中去实现监听事件发生后的业务逻辑代码) 注意:该监听器对象是对所有事件进行监听(咱们一般不用)。 */ zk = new ZooKeeper(connectString, 4000, new Watcher() { public void process(WatchedEvent event) { } }); //2.判断父节点是否存在-不在需要创建 Stat exists = zk.exists("/parent", false); if (exists == null) {//不存在 zk.create("/parent", "aa".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } //3.创建临时节点 zk.create("/parent/" + args[0], args[1].getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); Thread.sleep(Long.MAX_VALUE); }catch (Exception e){ e.printStackTrace(); }finally { //要关资源 if (zk != null){ try { zk.close(); } catch (InterruptedException e) { e.printStackTrace(); } } } } }
IDEA传参方式
本章节介绍了如何简单的通过命令行(shell)和代码操作zookeeper,操作方法和JDBC、HDFS相同,只是API不一样…大家根据这篇博客的案例即可轻松掌握如何使用zookeeper,当然了,zookeeper深挖了也有很多的东西,不过我们只需要知道大概,因为学习它是为了我们下一步Hadoop的HA做准备的,好了,下一章,我为大家带来zookeeper的内部原理