前面已经介绍了使用命令行操作 Zookeeper,方便我们对 Zookeeper 有一个整体的认识。
Zookeeper 本质上就是一个 NoSQL 数据库,只不过其存储的数据结构是树状结构形式,理解起来很简单。
对于一个新手小白来说,面对 Zookeeer,介绍一大堆概念,没啥用处,没有什么比代码来得更加实际一些。
本篇博客通过编写 Java 代码对 Zookeeper 进行增删改查,让大家轻松掌握 Zookeeper 的基本操作,增强自信心。
在本篇博客的最后会提供源代码 Demo 下载。
新建一个 maven 项目,导入相关 jar 包,内容如下:
有关具体的 jar 包地址,可以在 https://mvnrepository.com 上进行查询。
<dependencies> <!--导入 Spring 的 jar 包--> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> <version>5.3.18</version> </dependency> <!--导入 Spring 整合 junit 的 jar 包--> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-test</artifactId> <version>5.3.18</version> <scope>test</scope> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.13.2</version> <scope>test</scope> </dependency> <!--导入 curator 的 jar 包--> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>4.0.0</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>4.0.0</version> </dependency> <!--导入日志相关的 jar 包--> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.7.21</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.7.21</version> </dependency> </dependencies>
搭建后的最终工程如下图所示,非常简单:
使用 Java 操作 Zookeeper 的方式很多,相比而言采用 Curator 组件比较简单,因此也比较流行。
本篇博客的 Demo 采用 Spring 集成 Curator 组件的方式,通过 Java 调用其 API 方法对 Zookeeper 进行增删改查。
需要导入 curator-framework 和 curator-recipes 两个 jar 包,请参考官网,需要注意 Curator 与 zookeeper 版本对应问题。
Curator 的官网地址是:https://curator.apache.org
首先需要对连接 Zookeeper 的信息进行配置,具体配置细节在 zookeeper.properties 文件中:
# zookeeper的连接字符串 # 如果是操作 zookeeper 集群,可以配置多个 zookeeper 地址 # 多个地址之间用英文逗号分隔,如 ip1:port1,ip2:port2,ip3:port3 zk.connectString=127.0.0.1:2181 # zookeeper的会话超时时间 # 单位:毫秒,默认是 60 秒 zk.sessionTimeoutMs=60000 # zookeeper的连接超时时间 # 单位:毫秒,默认是 15 秒 zk.connectionTimeoutMs=15000 # zookeeper默认操作的根节点 # 所有的增删改查操作,默认在该节点下进行 zk.namespace=jobs
然后在代码中使用 Spring 集成 Curator 组件,从 zookeeper.properties 读取连接配置信息:
package com.jobs.config; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.PropertySource; //加载 zookeeper.properties 文件内容 @PropertySource("classpath:zookeeper.properties") public class zookeeperConfig { @Value("${zk.connectString}") private String connectString; @Value("${zk.sessionTimeoutMs}") private Integer sessionTimeoutMs; @Value("${zk.connectionTimeoutMs}") private Integer connectionTimeoutMs; @Value("${zk.namespace}") private String namespace; //获取 Curator 的客户端连接 @Bean public CuratorFramework getCuratorFramework(){ //配置重试策略,如果没有连接失败,最多重试 1 次 RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 1); CuratorFramework client = CuratorFrameworkFactory.builder() .connectString(connectString) .sessionTimeoutMs(sessionTimeoutMs) .connectionTimeoutMs(connectionTimeoutMs) .namespace(namespace) .retryPolicy(retryPolicy) .build(); client.start(); return client; } }
package com.jobs.config; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Import; //采用 Spring 集成 Curator,导入 zookeeperConfig 配置类 @Configuration @Import(zookeeperConfig.class) public class springConfig { }
下面我们就使用 junit 单元测试,使用 Java 采用 Curator 的 API 方法操作 Zookeeper,具体如下所示:
package com.jobs.test; import com.jobs.config.springConfig; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.api.BackgroundCallback; import org.apache.curator.framework.api.CuratorEvent; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.data.Stat; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; import java.text.SimpleDateFormat; import java.util.Date; import java.util.List; @RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(classes = springConfig.class) public class zkTest { @Autowired private CuratorFramework client; //注意: //由于我在 zookeeper.properties 下配置的 namespace 是 jobs //因此下面对 zookeeper 的所有操作,默认情况下都是在 /jobs 节点下进行操作的 @Test public void createTest() throws Exception { //默认情况下,创建的都是持久化的节点,一旦创建,永久存储 //在创建节点时,没有指定存储的数据,默认情况下存储的是当前客户端机器的 ip 地址 String path1 = client.create().forPath("/test1"); System.out.println("成功创建节点:" + path1); //获取节点中存储的数据,由于在创建时没有存储数据,所以发现存储的是当前客户端机器的 ip 地址 byte[] data1 = client.getData().forPath("/test1"); System.out.println("存储的数据为:" + new String(data1)); System.out.println("-------------------------------------"); //创建节点时,同时存储数据 String path2 = client.create().forPath("/test2", "乔京飞".getBytes()); System.out.println("成功创建节点:" + path2); //获取节点中存储的数据 byte[] data2 = client.getData().forPath("/test2"); System.out.println("存储的数据为:" + new String(data2)); } @Test public void createTempTest() throws Exception { //创建临时节点,一旦当前连接断开,zookeeper 会自动销毁所创建的节点 String path = client.create().withMode(CreateMode.EPHEMERAL).forPath("/test3", "任肥肥".getBytes()); System.out.println("成功创建临时节点:" + path); byte[] data = client.getData().forPath("/test3"); System.out.println("存储的数据为:" + new String(data)); //注意: //由于当前方法执行完后,zookeeper 连接就断开了,临时节点就销毁了 //所以你可以通过命令行的方式,查看 zookeeper 中 /jobs 下的节点,发现找不到 /jobs/test3 节点 //但是:通过上面运行的程序,打印出节点信息和节点存储的数据,可以断定:/jobs/test3 节点曾经被创建过 } @Test public void autoCreateParentsNodeTest() throws Exception { //当我们创建深层次节点时,可能路径上的父节点并不存在,所以仅仅 create 无法同时创建父子节点 //可以使用 creatingParentsIfNeeded 方法,同时创建父子节点 String path = client.create().creatingParentsIfNeeded().forPath("/test4/node1"); System.out.println(path); } //---------------------------------------- //---------------------------------------- @Test public void getTest() throws Exception { //查询节点中存储的数据 byte[] data = client.getData().forPath("/test4/node1"); //由于在创建节点时,没有存储数据,所以默认存储的是当前客户端所在机器的 ip 地址 System.out.println(new String(data)); } @Test public void getChildrenNodeTest() throws Exception { //获取一个节点下面,所有的子节点,并打印出来 //注意:由于 namespace 配置的是 jobs ,所以这里的 / 代表的是 /jobs List<String> pathlist = client.getChildren().forPath("/"); for (String path : pathlist) { System.out.println(path); } } @Test public void getNodeStatusTest() throws Exception { //查看一个节点的详细状态信息,相当于执行命令 ls -s 节点 Stat status = new Stat(); client.getData().storingStatIn(status).forPath("/test2"); //由于开发 Curator 的程序员太懒惰了, //所以打印出来的每个数字的含义,请查看 Stat 类的 toString 方法 System.out.println(status); } //---------------------------------------- //---------------------------------------- @Test public void setDataTest() throws Exception { byte[] data1 = client.getData().forPath("/test1"); System.out.println("【修改前】存储的数据为:" + new String(data1)); //修改数据为当前时间毫秒值 SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); String nowTime = sdf.format(new Date()); client.setData().forPath("/test1", nowTime.getBytes()); byte[] data2 = client.getData().forPath("/test1"); System.out.println("【修改后】存储的数据为:" + new String(data2)); } @Test public void SetDataTest2() throws Exception { //如果所要修改的数据,是公共资源,存在并发修改的情况 //为了确保修改数据的安全性,可以采用版本号进行控制修改操作 //也就是所谓的乐观锁机制 //在修改数据时,如果发现数据版本号不是自己读取的版本号,则放弃修改 Stat status = new Stat(); client.getData().storingStatIn(status).forPath("/test2"); //获取数据版本号 int version1 = status.getVersion(); System.out.println("【修改前】的数据版本号:" + version1); //要修改的节点版本号,与所提供的版本号,不一致时,会报异常 client.setData().withVersion(version1).forPath("/test2", "任肥肥".getBytes()); client.getData().storingStatIn(status).forPath("/test2"); //获取数据版本号 int version2 = status.getVersion(); System.out.println("【修改后】的数据版本号:" + version2); } //---------------------------------------- //---------------------------------------- @Test public void deleteTest() throws Exception { //删除节点,如果节点下有子节点的话,无法删除 //要删除的节点不存在时,会报异常 client.delete().forPath("/test1"); } @Test public void deleteAllTest() throws Exception { //删除节点,即使有子节点,连同子节点一起删除 //要删除的节点不存在时,会报异常 client.delete().deletingChildrenIfNeeded().forPath("/test4"); } @Test public void testDelete3() throws Exception { //保证删除成功,如果网络有问题,则会定期重试 //要删除的节点不存在时,会报异常 client.delete().guaranteed().forPath("/test2"); } @Test public void testDelete4() throws Exception { //节点删除成功后,执行回调方法 //要删除的节点不存在时,会报异常 client.delete().guaranteed().inBackground((client, event) -> { System.out.println("Lambda表达式写法:节点删除操作执行成功...."); System.out.println(event); }).forPath("/test1"); client.delete().guaranteed().inBackground(new BackgroundCallback() { @Override public void processResult(CuratorFramework client, CuratorEvent event) throws Exception { //参数含义: //client 其实就是客户端连接对象,可以对 zookeeper 节点进行增删改查 //event 事件状态对象 System.out.println("匿名函数写法:节点删除操作执行成功...."); System.out.println(event); } }).forPath("/test2"); } }
在运行以上单元测试方法时,可以使用上篇博客中所介绍的命令行操作 Zookeeper 进行操作结果的查看和验证。
到此为止,有关使用 Java 通过 Curator 组件的 API 对 Zookeeper 进行基本的增删改查操作,已经介绍完毕,非常简单。
本篇博客的源代码下载地址为:https://files.cnblogs.com/files/blogs/699532/zookeeper_curator.zip