ZooKeeper 的 Java API 主要复现了 shell 命令的功能,并在此基础上提供了同步和异步两种调用方式。
ZK 的客户端既可以是观察者,也可以是被观察者。注册监听的 Watcher 对象就是观察者,能够接收事件回调的信息。同时我们也可以自己实现 Watcher 接口,重写 process 方法来自定义事件的输出。
API 主要包括:连接、(是否异步或递归)创建节点、(是否异步或递归)删除节点、更新节点数据、查看节点信息、判断节点存在、获取子节点、关闭连接、实现监听等。
<dependencies>
    <!-- JUnit, used by the @Test methods. Pinned to a fixed version: the RELEASE
         meta-version resolves to whatever is newest at build time, which makes
         builds non-reproducible. -->
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.13.2</version>
    </dependency>
    <!-- Log4j 2 core. Version 2.8.2 is in the range affected by CVE-2021-44228
         (Log4Shell), so a patched release is used instead. -->
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-core</artifactId>
        <version>2.17.1</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper -->
    <dependency>
        <groupId>org.apache.zookeeper</groupId>
        <artifactId>zookeeper</artifactId>
        <version>3.5.7</version>
    </dependency>
</dependencies>
# Root logger: INFO level, routed to the "stdout" appender only.
log4j.rootLogger=INFO, stdout
# Console appender; each line is rendered as: date, level, [logger name], message.
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
# File appender writing to target/spring.log with the same pattern.
# NOTE(review): "logfile" is not listed in log4j.rootLogger above, so as configured
# here no logger is routed to it - confirm whether that is intended.
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
public abstract class ZkClientFactory { private static String hosts="hadoop102:2181,hadoop103:2181,hadoop104:2181"; private static int sessionTimeOut=6000; /** * 同步创建节点方法 * @return * @throws IOException * @throws KeeperException * @throws InterruptedException */ public static ZooKeeper createNode() throws IOException, KeeperException, InterruptedException { /** * 构造一个 同步计数器 初始化的CountDownLatch 。 * 参数: * count –在线程可以通过await之前必须调用countDown的次数 * 这里填1表示在之后程序调用一次 */ final CountDownLatch countDownLatch = new CountDownLatch(1); /** * hosts: 连接对象 * sessionTimeOut: 会话超时时间 * Watcher: 观察者对象,接收响应事件 */ ZooKeeper zooKeeper = new ZooKeeper(hosts, sessionTimeOut, new Watcher() { @Override public void process(WatchedEvent watchedEvent) { //已建立连接 if (watchedEvent.getState().equals(Event.KeeperState.SyncConnected)) { countDownLatch.countDown(); } } }); countDownLatch.await(); System.out.println("over"); return zooKeeper; } public void closeClient(){} }
/**
 * Client class that obtains its ZooKeeper session from the inherited createNode().
 */
public class ZkClient extends ZkClientFactory{
    // NOTE(review): createNode() declares IOException, KeeperException and
    // InterruptedException. A field initializer may only throw checked exceptions
    // when every constructor declares them - confirm the elided part below
    // provides such a constructor.
    private ZooKeeper zooKeeper=createNode();
    ...}
/** * 测试四种同步节点创建方式 */ @Test public void createSyncNodes() { try { final CountDownLatch countDownLatch = new CountDownLatch(1); //持久节点 String app1 = zooKeeper.create("/app1", "app1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); //临时节点 String app2 = zooKeeper.create("/app2", "app2".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); //持久顺序编号节点 String app3 = zooKeeper.create("/app3", "app3".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL); //临时顺序编号节点 String app4 = zooKeeper.create("/app4", "app4".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); countDownLatch.await(); System.out.println("sync over"); } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); }finally { try { zooKeeper.close(); } catch (InterruptedException e) { e.printStackTrace(); } } }
/**
 * Callback for asynchronous create calls. Prints the outcome of each request and,
 * on success, counts down the {@link CountDownLatch} passed as the request context.
 *
 * <p>Only the {@code OK} branch releases the latch; a caller waiting on it is not
 * released by failed requests.
 */
class IStringCallBack implements AsyncCallback.StringCallback {

    /**
     * Handles the result of one asynchronous create.
     *
     * @param rc   result code of the request
     * @param path path that was passed to the create call
     * @param ctx  context object passed to the create call; counted down on success
     *             when it is a {@link CountDownLatch}, otherwise ignored
     * @param name name of the created node as reported by the server
     */
    @Override
    public void processResult(int rc, String path, Object ctx, String name) {
        switch (KeeperException.Code.get(rc)) {
            case CONNECTIONLOSS:
                System.out.println("无连接传输模式");
                break;
            case OK:
                System.out.println("{" + "OK" + path + ", " + name + ", " + ctx + "}");
                // Guard the cast: a null or non-latch context must not throw
                // inside the client's callback thread.
                if (ctx instanceof CountDownLatch) {
                    ((CountDownLatch) ctx).countDown();
                }
                break;
            case NODEEXISTS:
                System.out.println(path + "exists");
                break;
            default:
                System.out.println("default");
                break;
        }
    }
}
/**
 * Exercises the four asynchronous node-creation modes: persistent, ephemeral,
 * persistent-sequential and ephemeral-sequential.
 *
 * <p>All requests share one latch with a count of one, so the method resumes as soon
 * as the first successful callback counts it down; the remaining callbacks may still
 * be pending at that point. The persistent and ephemeral requests target the same
 * path, so the second one presumably reports NODEEXISTS through the callback.
 *
 * @throws InterruptedException if interrupted while waiting for the first success
 */
@Test
public void createAsyncNodes() throws InterruptedException {
    final CountDownLatch countDownLatch = new CountDownLatch(1);
    // Typed as the callback interface so the create overload is unambiguous without casts.
    final AsyncCallback.StringCallback callback = new IStringCallBack();

    // Persistent node
    zooKeeper.create("/Async", "hello Async".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
            CreateMode.PERSISTENT, callback, countDownLatch);
    // Ephemeral node
    zooKeeper.create("/Async", "hello Async".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
            CreateMode.EPHEMERAL, callback, countDownLatch);
    // Persistent node with a sequence number
    zooKeeper.create("/Async", "hello Async".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
            CreateMode.PERSISTENT_SEQUENTIAL, callback, countDownLatch);
    // Ephemeral node with a sequence number (previously a second PERSISTENT_SEQUENTIAL,
    // which left this mode untested)
    zooKeeper.create("/Async", "hello Async".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
            CreateMode.EPHEMERAL_SEQUENTIAL, callback, countDownLatch);

    countDownLatch.await();
    System.out.println("Async over");
}
/** * 同步异步删除节点数据 * @throws KeeperException * @throws InterruptedException */ @Test public void delete() throws KeeperException, InterruptedException { //SyncDelete zooKeeper.delete("/Async",-1); //AsyncDelete // zooKeeper.delete(); }
/**
 * Deletes a node together with everything below it: each child is removed
 * recursively first, then the node itself.
 *
 * @param path path of the node at the top of the subtree to remove
 * @throws KeeperException if a ZooKeeper call fails
 * @throws InterruptedException if a ZooKeeper call is interrupted
 */
private void delete(String path) throws KeeperException, InterruptedException {
    List<String> names = zooKeeper.getChildren(path, false);
    // Depth-first: a child must be gone before its parent is deleted.
    for (int i = 0; i < names.size(); i++) {
        String childPath = path + "/" + names.get(i);
        delete(childPath);
    }
    zooKeeper.delete(path, -1);
}
/**
 * Creates a multi-level node, creating any missing ancestors first.
 *
 * <p>Ancestors are created as persistent nodes without data; only the final node
 * receives {@code data}. The final create still fails if that node already exists.
 *
 * @param path full path of the node to create
 * @param data content of the final node; {@code null} creates it without data
 * @throws KeeperException if a ZooKeeper call fails
 * @throws InterruptedException if a ZooKeeper call is interrupted
 */
private void create(String path, String data) throws KeeperException, InterruptedException {
    String[] segments = path.split("/");
    StringBuilder ancestor = new StringBuilder();
    // segments[0] is the empty string before the leading "/"; the last segment is
    // the target node itself, created separately below.
    for (int i = 1; i < segments.length - 1; i++) {
        ancestor.append("/").append(segments[i]);
        String ancestorPath = ancestor.toString();
        if (null == zooKeeper.exists(ancestorPath, null)) {
            try {
                zooKeeper.create(ancestorPath, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            } catch (KeeperException.NodeExistsException ignored) {
                // Another client created this ancestor between the exists check and
                // the create; the node is there, which is all that is needed.
            }
        }
    }
    // Ancestors are already created with null data, so accept null here too
    // instead of failing with a NullPointerException.
    byte[] payload = (data == null) ? null : data.getBytes();
    zooKeeper.create(path, payload, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
/** * 同步异步获取节点数据 * @throws KeeperException * @throws InterruptedException */ @Test public void getData() throws KeeperException, InterruptedException { //sync byte[] data1 = zooKeeper.getData("/Async", null, null); System.out.println(new String(data1)); //async zooKeeper.getData("/Async", null, (rc, path, ctx, data, stat) -> { System.out.printf("/Async data:{}", new String(data)); }, null); }
/**
 * Overwrites the data of {@code /Async} and then reads it back through
 * {@code getData()}.
 *
 * @throws KeeperException if a ZooKeeper call fails
 * @throws InterruptedException if a ZooKeeper call is interrupted
 */
@Test
public void updateNode() throws KeeperException, InterruptedException {
    final byte[] newContent = "Async yes or no?".getBytes();
    // -1: per the ZooKeeper API, matches any version of the node.
    zooKeeper.setData("/Async", newContent, -1);
    getData();
}
/**
 * Checks whether {@code /app1} exists and prints the result, including the node's
 * {@code Stat} when it does.
 *
 * @throws KeeperException if the ZooKeeper call fails
 * @throws InterruptedException if the ZooKeeper call is interrupted
 */
@Test
public void isExists() throws KeeperException, InterruptedException {
    Stat stat = zooKeeper.exists("/app1", false);
    // Per the ZooKeeper API, exists() returns null when the node is absent.
    if (stat == null) {
        System.out.println("/app1 does not exist");
    } else {
        System.out.println("/app1 exists: " + stat);
    }
}