import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.cache.NodeCache; import org.apache.curator.framework.recipes.cache.NodeCacheListener; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.data.Stat; import org.junit.Test; public class ZkTest { private RetryPolicy retryPolicy; private CuratorFramework client; public ZkTest() { // 重试策略 this.retryPolicy = new ExponentialBackoffRetry(1000,3); //创建连接 //注意sessionTimeoutMs 会话时间不能够设置太短,否则会报错 this.client = CuratorFrameworkFactory.newClient( "192.168.88.3:2181,192.168.88.4:2181,192.168.88.5:2181" ,50000 ,1000 , this.retryPolicy ); System.out.println("正在连接"); this.client.start(); System.out.println("连接成功"); } /* * 创建节点 路径 + 内容 */ @Test public void createPointFun1() throws Exception { this.client.create().forPath("/ZkTest/fun1","测试".getBytes()); } /* * 创建节点 路径 * 初始内容为空 */ @Test public void createPointFun2() throws Exception { this.client.create().forPath("/ZkTest/fun2"); } /* * 创建节点 * 自动递归创建父节点 */ @Test public void createPointFun3() throws Exception { this.client.create().creatingParentsIfNeeded().forPath("/ZkTest2/fun3"); } /* * 创建临时节点 */ @Test public void createPointFun4() throws Exception { this.client.create().withMode(CreateMode.EPHEMERAL).forPath("/ZkTest/fun4","测试创建临时节点".getBytes()); //等待10秒查看结果 Thread.sleep(10000); } /* * 删除节点 */ @Test public void deletePointFun1() throws Exception { this.client.delete().forPath("/ZkTest2/fun3"); } /* * 删除节点并递归删除其子节点 */ @Test public void deletePointFun2() throws Exception { this.client.delete().deletingChildrenIfNeeded().forPath("/ZkTest2"); } /* * 强制保证删除一个节点 */ @Test public void deletePointFun3() throws Exception { this.client.delete().guaranteed().forPath("/ZkTest2"); } /* * 读取数据 */ @Test public void getDataFun1() throws Exception { byte[] result = this.client.getData().forPath("/ZkTest/fun1"); String s = new String(result); System.out.println(s); } /* * 包含状态的查询 */ @Test public void getDataFun2() throws Exception { Stat stat = new Stat(); byte[] result = this.client.getData().storingStatIn(stat).forPath("/ZkTest/fun1"); System.out.println(new String(result)); System.out.println(stat.toString()); } /* * 更新数据 */ @Test public void setDataFun3() throws Exception { this.client.setData().forPath("/ZkTest/fun1","测试 更新内容".getBytes()); } /* * 监听节点 */ @Test public void watchFun1() throws Exception { NodeCache nodecache = new NodeCache(this.client,"/ZkTest/fun1"); nodecache.getListenable().addListener(new NodeCacheListener() { @Override public void nodeChanged() throws Exception { String path = nodecache.getPath(); System.out.println("节点 " + path + "检测到操作"); } }); nodecache.start(true); System.out.println("监听器开启"); Thread.sleep(60000); } }