ZooKeeper Java客户端。
这里介绍两个客户端,一个为 ZooKeeper 原生客户端,一个为 ZkClient。
首先创建一个maven项目,pom文件中添加依赖。
<!-- 原生 --> <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.4.12</version> </dependency> <!-- zkclient --> <dependency> <groupId>com.101tec</groupId> <artifactId>zkclient</artifactId> <version>0.10</version> </dependency>
之前安装过程看到 ZooKeeper 依赖 jdk 环境,可以看出它的源码使用了Java语言基础,学习原生客户端对于以后看源码有帮助,接下来看一看使用方式。
/** * @Author SunnyBear * @Description 创建Session */ public class TestCreateSession { /** * ZooKeeper服务地址 */ private final String SERVER = "192.168.182.130:2181,192.168.182.131:2181,192.168.182.132:2181"; /** * 会话超时时间 */ private static final int SESSION_TIMEOUT = 30000; public static void main(String[] args) throws IOException, InterruptedException { new TestCreateSession().testSession1(); System.out.println("--------------------------华丽的分割线--------------------------"); new TestCreateSession().testSession2(); } /** * 获得session的方式,这种方式可能会在ZooKeeper还没有获得连接的时候就已经对ZK进行访问了 * 测试可以发现连接状态为CONNECTING,而不是CONNECTED */ public void testSession1() throws IOException { ZooKeeper zooKeeper = new ZooKeeper(SERVER, SESSION_TIMEOUT, null); System.out.println("zooKeeper: " + zooKeeper); System.out.println("zooKeeper.getState(): " + zooKeeper.getState()); } /** * 发令枪 */ private CountDownLatch connectedSemaphore = new CountDownLatch(1); /** * 使用发令枪对获得Session的方式进行优化,在ZooKeeper初始化完成以前先等待,等待完成后再进行后续操作 */ public void testSession2() throws IOException, InterruptedException { ZooKeeper zooKeeper = new ZooKeeper(SERVER, SESSION_TIMEOUT, new Watcher() { public void process(WatchedEvent watchedEvent) { if (watchedEvent.getState() == Event.KeeperState.SyncConnected) { // 状态为已连接时才进行后续操作 connectedSemaphore.countDown(); System.out.println("状态为已连接。。"); } } }); connectedSemaphore.await(); System.out.println("zooKeeper: " + zooKeeper); System.out.println("zooKeeper.getState(): " + zooKeeper.getState()); } }
/** * @Author SunnyBear * @Description 基本操作 */ public class TestJavaApi implements Watcher { private static final int SESSION_TIMEOUT = 10000; private static final String SERVER = "192.168.182.130:2181,192.168.182.131:2181,192.168.182.132:2181"; private static final String ZK_PATH = "/SunnyBearApiTest"; private static final String ZK_DATA = "我是SunnyBearApiTest的数据"; private ZooKeeper zk = null; private CountDownLatch connectedSemaphore = new CountDownLatch(1); /** * 创建连接 * @param server zk服务器地址列表 * @param sessionTimeout session超时时间 */ public void createConnection(String server, int sessionTimeout) { try { zk = new ZooKeeper(server, sessionTimeout, this); connectedSemaphore.await(); } catch (IOException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } /** * 收到来自Server的Watcher通知,然后调用的方法处理 */ public void process(WatchedEvent watchedEvent) { System.out.println("收到事件通知:" + watchedEvent); if (Event.KeeperState.SyncConnected == watchedEvent.getState()) { connectedSemaphore.countDown(); } } /** * 关闭连接 */ public void releaseConnection() { if (this.zk != null) { try { this.zk.close(); } catch (InterruptedException e) { e.printStackTrace(); } } } /** * 创建节点 * @param path 节点path * @param data 初始数据内容 * @return 是否创建成功 */ public boolean createPath(String path, String data) { try { /** * ZooDefs.Ids.OPEN_ACL_UNSAFE:节点权限 * CreateMode.EPHEMERAL:节点类型为临时节点 */ String createPath = this.zk.create(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); System.out.println("创建节点path:" + createPath); return true; } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } return false; } /** * 读取指定节点数据 * @param path 节点path * @return 节点内容 */ public String readData(String path) { try { String data = new String(this.zk.getData(path, false, null)); System.out.println("读取数据成功path:" + path + ",data:" + data); return data; } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } return ""; } /** * 更新指定节点数据 * @param path 节点path * @param data 数据data * @return 是否成功 */ public boolean writeData(String path, String data) { try { System.out.println("更新数据成功,path:" + path + ", stat: " + this.zk.setData(path, data.getBytes(), -1)); return true; } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } return false; } /** * 删除节点 * @param path 节点path * @return 是否成功 */ public boolean deleteNode(String path) { try { this.zk.delete(path, -1); System.out.println("删除节点成功,path:" + path); return true; } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } return false; } public static void main(String[] args) { TestJavaApi testJavaApi = new TestJavaApi(); testJavaApi.createConnection(SERVER, SESSION_TIMEOUT); if (testJavaApi.createPath(ZK_PATH, ZK_DATA)) { System.out.println("创建后数据内容:" + testJavaApi.readData(ZK_PATH)); testJavaApi.writeData(ZK_PATH, "我是SunnyBearApiTest修改后的数据"); System.out.println("更新后数据内容:" + testJavaApi.readData(ZK_PATH)); testJavaApi.deleteNode(ZK_PATH); } testJavaApi.releaseConnection(); } }
Zookeeper采用了Watcher机制实现数据的发布/订阅功能。该机制在被订阅对象发生变化时会异步通知客户端,因此客户端不必在Watcher注册后轮询阻塞,从而减轻了客户端压力,Watcher是一次性的,如果被触发了需要重新注册。
/** * @Author SunnyBear * @Description 监听机制 */ public class TestWatcher implements Watcher { private AtomicInteger seq = new AtomicInteger(); private static final int SESSION_TIMEOUT = 100000; private static final String SERVER = "192.168.182.130:2181,192.168.182.131:2181,192.168.182.132:2181"; private static final String PARENT_PATH = "/testWatch"; private static final String CHILDREN_PATH = "/testWatch/children"; private CountDownLatch connectedSemaphore = new CountDownLatch(1); private ZooKeeper zk = null; /** * 创建连接 * @param server zk服务器地址列表 * @param sessionTimeout session超时时间 */ public void createConnection(String server, int sessionTimeout) { try { zk = new ZooKeeper(server, sessionTimeout, this); connectedSemaphore.await(); } catch (IOException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } /** * 关闭连接 */ public void releaseConnection() { if (this.zk != null) { try { this.zk.close(); } catch (InterruptedException e) { e.printStackTrace(); } } } /** * 创建节点 * @param path 节点path * @param data 初始数据内容 * @return 是否创建成功 */ public boolean createPath(String path, String data) { try { // 设置监控(由于zookeeper的监控都是一次性的所以 每次必须设置监控) this.zk.exists(path, true); // 创建持久节点 String createPath = this.zk.create(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); System.out.println("创建节点path:" + createPath); return true; } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } return false; } /** * 读取指定节点数据 * @param path 节点path * @param needWatch 表示是否需要注册一个watcher。true:注册默认watcher,false:不需要注册watcher * @return 节点内容 */ public String readData(String path, boolean needWatch) { try { String data = new String(this.zk.getData(path, needWatch, null)); System.out.println("读取数据成功path:" + path + ",data:" + data); return data; } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } return ""; } /** * 更新指定节点数据 * @param path 节点path * @param data 数据data * @return 是否成功 */ public boolean writeData(String path, String data) { try { System.out.println("更新数据成功,path:" + path + ", stat: " + this.zk.setData(path, data.getBytes(), -1)); return true; } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } return false; } /** * 删除节点 * @param path 节点path * @return 是否成功 */ public boolean deleteNode(String path) { try { this.zk.delete(path, -1); System.out.println("删除节点成功,path:" + path); return true; } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } return false; } /** * 判断指定节点是否存在 * @param path 节点路径 * @param needWatch 表示是否需要注册一个watcher */ public Stat exists(String path, boolean needWatch) { try { return this.zk.exists(path, needWatch); } catch (Exception e) { e.printStackTrace(); return null; } } /** * 获取子节点 * @param path 节点路径 * @param needWatch 表示是否需要注册一个watcher */ private List<String> getChildren(String path, boolean needWatch) { try { return this.zk.getChildren(path, needWatch); } catch (Exception e) { e.printStackTrace(); } return null; } /** * 删除测试用的所有节点 */ public void deleteAllTestPath() { if(this.exists(CHILDREN_PATH, false) != null){ this.deleteNode(CHILDREN_PATH); } if(this.exists(PARENT_PATH, false) != null){ this.deleteNode(PARENT_PATH); } } /** * 收到来自Server的Watcher通知,然后调用的方法处理 */ public void process(WatchedEvent watchedEvent) { System.out.println("收到事件通知:" + watchedEvent); try { Thread.sleep(200); if (watchedEvent == null) { return; } // 连接状态 Event.KeeperState eventState = watchedEvent.getState(); // 事件类型 Event.EventType eventType = watchedEvent.getType(); // 受影响的path String eventPath = watchedEvent.getPath(); // 打印一下监听的信息 String eventPrefix = "[Watch - " + this.seq.incrementAndGet() + "] "; System.out.println(eventPrefix + "收到Watcher通知"); System.out.println(eventPrefix + "连接状态:\t" + eventState.toString()); System.out.println(eventPrefix + "事件类型:\t" + eventType.toString()); if (Event.KeeperState.SyncConnected == eventState) { switch (eventType) { case None: System.out.println(eventPrefix + "成功连接zk服务器"); connectedSemaphore.countDown(); break; case NodeCreated: System.out.println(eventPrefix + "节点创建"); Thread.sleep(200); this.zk.exists(eventPath, true); break; case NodeDataChanged: System.out.println(eventPrefix + "节点数据更新"); Thread.sleep(200); System.out.println(eventPrefix + "数据内容: " + this.readData(PARENT_PATH, true)); break; case NodeChildrenChanged: System.out.println(eventPrefix + "子节点变更"); Thread.sleep(3000); System.out.println(eventPrefix + "子节点列表:" + this.getChildren(PARENT_PATH, true)); break; case NodeDeleted: System.out.println(eventPrefix + "节点 " + eventPath + " 被删除"); break; default: break; } } else if (Watcher.Event.KeeperState.Disconnected == eventState) { System.out.println(eventPrefix + "与ZK服务器断开连接"); } else if (Watcher.Event.KeeperState.AuthFailed == eventState) { System.out.println(eventPrefix + "权限检查失败"); } else if (Watcher.Event.KeeperState.Expired == eventState) { System.out.println(eventPrefix + "会话失效"); } } catch (InterruptedException e) { e.printStackTrace(); } catch (KeeperException e) { e.printStackTrace(); } System.out.println("-------------------------------------------------------------"); } public static void main(String[] args) throws InterruptedException { TestWatcher zkWatch = new TestWatcher(); // 创建连接 zkWatch.createConnection(SERVER, SESSION_TIMEOUT); Thread.sleep(1000); // 删除所有节点 zkWatch.deleteAllTestPath(); if (zkWatch.createPath(PARENT_PATH, System.currentTimeMillis() + "")) { /** * 读取数据,在操作节点数据之前先调用zookeeper的getData()方法是为了可以watch到对节点的操作。watch是一次性的 * 也就是说,如果第二次又重新调用了setData()方法,在此之前需要重新调用一次 */ System.out.println("------------------------读取PARENT------------------------"); zkWatch.readData(PARENT_PATH, true); // 更新数据 zkWatch.writeData(PARENT_PATH, System.currentTimeMillis() + ""); Thread.sleep(1000); /** * 读取子节点,设置对子节点变化的watch,如果不写该方法,则在创建子节点是只会输出NodeCreated,而不会输出NodeChildrenChanged, * 也就是说创建子节点时没有watch, * 如果是递归的创建子节点,如path="/p/c1/c2"的话,getChildren(PARENT_PATH, ture)只会在创建c1时watch,输出c1的NodeChildrenChanged, * 而不会输出创建c2时的NodeChildrenChanged,如果watch到c2的NodeChildrenChanged,则需要再调用一次getChildren(String path, true)方法, * 其中path="/p/c1" */ System.out.println("------------------------读取CHILDREN------------------------"); zkWatch.getChildren(PARENT_PATH, true); Thread.sleep(1000); // 创建子节点 zkWatch.createPath(CHILDREN_PATH, System.currentTimeMillis() + ""); Thread.sleep(1000); zkWatch.readData(CHILDREN_PATH, true); zkWatch.writeData(CHILDREN_PATH, System.currentTimeMillis() + ""); } Thread.sleep(20000); // 清理节点 zkWatch.deleteAllTestPath(); Thread.sleep(1000); zkWatch.releaseConnection(); } }
/** * @Author SunnyBear * @Description ZkClient基本操作 */ public class TestZkClientApi { private static final String SERVER = "192.168.182.130:2181,192.168.182.131:2181,192.168.182.132:2181"; private static final int SESSION_TIMEOUT = 30000; public static void main(String[] args) { ZkClient zkClient = new ZkClient(SERVER, SESSION_TIMEOUT); // 创建临时节点,值为null zkClient.createEphemeral("/zkTemp"); // 创建持久节点,,值为null,如果父节点不存在则创建 zkClient.createPersistent("/zkPersistent/zk1", true); // 创建持久节点,有值 zkClient.createPersistent("/zkPersistent/zk2", "zk2内容"); zkClient.createPersistent("/zkPersistent/zk3", "zk3内容"); // 查询节点下面的所有节点 List<String> childrenList = zkClient.getChildren("/zkPersistent"); for (String p : childrenList) { String childrenPath = "/zkPersistent/" + p; // 查询节点数据 String data = zkClient.readData(childrenPath); System.out.println(childrenPath + ":" + data); } // 修改节点数据 zkClient.writeData("/zkPersistent/zk1", "给zk1更新了内容"); System.out.println(zkClient.readData("/zkPersistent/zk1")); // 删除节点 zkClient.delete("/zkTemp"); // 递归删除,即包含子目录的删除 zkClient.deleteRecursive("/zkPersistent"); // 关闭连接 zkClient.close(); } }
/** * @Author SunnyBear * @Description ZkClient监听的测试 */ public class ZkClientWatcher { private static final String SERVER = "192.168.182.130:2181,192.168.182.131:2181,192.168.182.132:2181"; private static final int SESSION_TIMEOUT = 30000; public static void main(String[] args) throws InterruptedException { // test1(); test2(); } /** * 订阅子节点的变化 */ public static void test1() throws InterruptedException { ZkClient zkClient = new ZkClient(new ZkConnection(SERVER), SESSION_TIMEOUT); // 给父节点添加监听子节点变化 zkClient.subscribeChildChanges("/zkPersistent", new IZkChildListener() { public void handleChildChange(String parentPath, List<String> currentChildList) throws Exception { System.out.println("parentPath:" + parentPath); System.out.println("currentChildList" + currentChildList); } }); Thread.sleep(3000); zkClient.createPersistent("/zkPersistent"); Thread.sleep(1000); zkClient.createPersistent("/zkPersistent/"+ "zk1", "zk1内容"); Thread.sleep(1000); zkClient.createPersistent("/zkPersistent/" + "zk2", "zk2内容"); Thread.sleep(1000); zkClient.delete("/super/c2"); Thread.sleep(1000); zkClient.deleteRecursive("/super"); Thread.sleep(10000); zkClient.close(); } /** * 订阅内容变化 */ public static void test2() throws InterruptedException { ZkClient zkClient = new ZkClient(new ZkConnection(SERVER), SESSION_TIMEOUT); zkClient.createPersistent("/zkPersistent", "zkPersistentData"); // 对父节点添加监听子节点变化。 zkClient.subscribeDataChanges("/zkPersistent", new IZkDataListener() { public void handleDataDeleted(String path) throws Exception { System.out.println("删除的节点为:" + path); } public void handleDataChange(String path, Object data) throws Exception { System.out.println("变更的节点为:" + path + ", 变更内容为:" + data); } }); Thread.sleep(3000); zkClient.writeData("/zkPersistent", "zkPersistentDataUpdate", -1); Thread.sleep(1000); zkClient.delete("/zkPersistent"); Thread.sleep(10000); } }
都读到这里了,来个 点赞、评论、关注、收藏 吧!
文章作者:IT王小二
首发地址:https://www.itwxe.com/posts/5a33d634/
版权声明:文章内容遵循 署名-非商业性使用-禁止演绎 4.0 国际 进行许可,转载请在文章页面明显位置给出作者与原文链接。