Java教程

Zookeeper 使用 Java 进行增删改查操作

本文主要是介绍Zookeeper 使用 Java 进行增删改查操作,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

前面已经介绍了使用命令行操作 Zookeeper,方便我们对 Zookeeper 有一个整体的认识。

Zookeeper 本质上就是一个 NoSQL 数据库,只不过其存储的数据结构是树状结构形式,理解起来很简单。

对于一个新手小白来说,面对 Zookeeer,介绍一大堆概念,没啥用处,没有什么比代码来得更加实际一些。

本篇博客通过编写 Java 代码对 Zookeeper 进行增删改查,让大家轻松掌握 Zookeeper 的基本操作,增强自信心。

在本篇博客的最后会提供源代码 Demo 下载。


一、搭建工程

新建一个 maven 项目,导入相关 jar 包,内容如下:

有关具体的 jar 包地址,可以在 https://mvnrepository.com 上进行查询。

<dependencies>

    <!--导入 Spring 的 jar 包-->
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-context</artifactId>
        <version>5.3.18</version>
    </dependency>

    <!--导入 Spring 整合 junit 的 jar 包-->
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-test</artifactId>
        <version>5.3.18</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.13.2</version>
        <scope>test</scope>
    </dependency>

    <!--导入 curator 的 jar 包-->
    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-framework</artifactId>
        <version>4.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-recipes</artifactId>
        <version>4.0.0</version>
    </dependency>

    <!--导入日志相关的 jar 包-->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.21</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.21</version>
    </dependency>
</dependencies>

搭建后的最终工程如下图所示,非常简单:

image

使用 Java 操作 Zookeeper 的方式很多,相比而言采用 Curator 组件比较简单,因此也比较流行。

本篇博客的 Demo 采用 Spring 集成 Curator 组件的方式,通过 Java 调用其 API 方法对 Zookeeper 进行增删改查。

需要导入 curator-framework 和 curator-recipes 两个 jar 包,请参考官网,需要注意 Curator 与 zookeeper 版本对应问题。

Curator 的官网地址是:https://curator.apache.org


二、代码细节

首先需要对连接 Zookeeper 的信息进行配置,具体配置细节在 zookeeper.properties 文件中:

# zookeeper的连接字符串
# 如果是操作 zookeeper 集群,可以配置多个 zookeeper 地址
# 多个地址之间用英文逗号分隔,如 ip1:port1,ip2:port2,ip3:port3
zk.connectString=127.0.0.1:2181

# zookeeper的会话超时时间
# 单位:毫秒,默认是 60 秒
zk.sessionTimeoutMs=60000

# zookeeper的连接超时时间
# 单位:毫秒,默认是 15 秒
zk.connectionTimeoutMs=15000

# zookeeper默认操作的根节点
# 所有的增删改查操作,默认在该节点下进行
zk.namespace=jobs

然后在代码中使用 Spring 集成 Curator 组件,从 zookeeper.properties 读取连接配置信息:

package com.jobs.config;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.PropertySource;

//加载 zookeeper.properties 文件内容
@PropertySource("classpath:zookeeper.properties")
public class zookeeperConfig {

    @Value("${zk.connectString}")
    private String connectString;

    @Value("${zk.sessionTimeoutMs}")
    private Integer sessionTimeoutMs;

    @Value("${zk.connectionTimeoutMs}")
    private Integer connectionTimeoutMs;

    @Value("${zk.namespace}")
    private String namespace;


    //获取 Curator 的客户端连接
    @Bean
    public CuratorFramework getCuratorFramework(){
        //配置重试策略,如果没有连接失败,最多重试 1 次
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 1);
        CuratorFramework client =
                CuratorFrameworkFactory.builder()
                        .connectString(connectString)
                        .sessionTimeoutMs(sessionTimeoutMs)
                        .connectionTimeoutMs(connectionTimeoutMs)
                        .namespace(namespace)
                        .retryPolicy(retryPolicy)
                        .build();
        client.start();
        return client;
    }
}
package com.jobs.config;

import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;

//采用 Spring 集成 Curator,导入 zookeeperConfig 配置类
@Configuration
@Import(zookeeperConfig.class)
public class springConfig { }

下面我们就使用 junit 单元测试,使用 Java 采用 Curator 的 API 方法操作 Zookeeper,具体如下所示:

package com.jobs.test;

import com.jobs.config.springConfig;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = springConfig.class)
public class zkTest {

    @Autowired
    private CuratorFramework client;

    //注意:
    //由于我在 zookeeper.properties 下配置的 namespace 是 jobs
    //因此下面对 zookeeper 的所有操作,默认情况下都是在 /jobs 节点下进行操作的

    @Test
    public void createTest() throws Exception {

        //默认情况下,创建的都是持久化的节点,一旦创建,永久存储

        //在创建节点时,没有指定存储的数据,默认情况下存储的是当前客户端机器的 ip 地址
        String path1 = client.create().forPath("/test1");
        System.out.println("成功创建节点:" + path1);

        //获取节点中存储的数据,由于在创建时没有存储数据,所以发现存储的是当前客户端机器的 ip 地址
        byte[] data1 = client.getData().forPath("/test1");
        System.out.println("存储的数据为:" + new String(data1));

        System.out.println("-------------------------------------");

        //创建节点时,同时存储数据
        String path2 = client.create().forPath("/test2", "乔京飞".getBytes());
        System.out.println("成功创建节点:" + path2);

        //获取节点中存储的数据
        byte[] data2 = client.getData().forPath("/test2");
        System.out.println("存储的数据为:" + new String(data2));
    }

    @Test
    public void createTempTest() throws Exception {

        //创建临时节点,一旦当前连接断开,zookeeper 会自动销毁所创建的节点

        String path = client.create().withMode(CreateMode.EPHEMERAL).forPath("/test3", "任肥肥".getBytes());
        System.out.println("成功创建临时节点:" + path);
        byte[] data = client.getData().forPath("/test3");
        System.out.println("存储的数据为:" + new String(data));

        //注意:
        //由于当前方法执行完后,zookeeper 连接就断开了,临时节点就销毁了
        //所以你可以通过命令行的方式,查看 zookeeper 中 /jobs 下的节点,发现找不到 /jobs/test3 节点
        //但是:通过上面运行的程序,打印出节点信息和节点存储的数据,可以断定:/jobs/test3 节点曾经被创建过
    }

    @Test
    public void autoCreateParentsNodeTest() throws Exception {

        //当我们创建深层次节点时,可能路径上的父节点并不存在,所以仅仅 create 无法同时创建父子节点
        //可以使用 creatingParentsIfNeeded 方法,同时创建父子节点

        String path = client.create().creatingParentsIfNeeded().forPath("/test4/node1");
        System.out.println(path);
    }

    //----------------------------------------
    //----------------------------------------

    @Test
    public void getTest() throws Exception {

        //查询节点中存储的数据
        byte[] data = client.getData().forPath("/test4/node1");
        //由于在创建节点时,没有存储数据,所以默认存储的是当前客户端所在机器的 ip 地址
        System.out.println(new String(data));
    }

    @Test
    public void getChildrenNodeTest() throws Exception {

        //获取一个节点下面,所有的子节点,并打印出来
        //注意:由于 namespace 配置的是 jobs ,所以这里的 / 代表的是 /jobs
        List<String> pathlist = client.getChildren().forPath("/");
        for (String path : pathlist) {
            System.out.println(path);
        }
    }

    @Test
    public void getNodeStatusTest() throws Exception {

        //查看一个节点的详细状态信息,相当于执行命令 ls -s 节点

        Stat status = new Stat();
        client.getData().storingStatIn(status).forPath("/test2");

        //由于开发 Curator 的程序员太懒惰了,
        //所以打印出来的每个数字的含义,请查看 Stat 类的 toString 方法
        System.out.println(status);
    }

    //----------------------------------------
    //----------------------------------------

    @Test
    public void setDataTest() throws Exception {

        byte[] data1 = client.getData().forPath("/test1");
        System.out.println("【修改前】存储的数据为:" + new String(data1));

        //修改数据为当前时间毫秒值
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        String nowTime = sdf.format(new Date());
        client.setData().forPath("/test1", nowTime.getBytes());

        byte[] data2 = client.getData().forPath("/test1");
        System.out.println("【修改后】存储的数据为:" + new String(data2));
    }

    @Test
    public void SetDataTest2() throws Exception {

        //如果所要修改的数据,是公共资源,存在并发修改的情况
        //为了确保修改数据的安全性,可以采用版本号进行控制修改操作
        //也就是所谓的乐观锁机制
        //在修改数据时,如果发现数据版本号不是自己读取的版本号,则放弃修改

        Stat status = new Stat();
        client.getData().storingStatIn(status).forPath("/test2");
        //获取数据版本号
        int version1 = status.getVersion();
        System.out.println("【修改前】的数据版本号:" + version1);

        //要修改的节点版本号,与所提供的版本号,不一致时,会报异常
        client.setData().withVersion(version1).forPath("/test2", "任肥肥".getBytes());

        client.getData().storingStatIn(status).forPath("/test2");
        //获取数据版本号
        int version2 = status.getVersion();
        System.out.println("【修改后】的数据版本号:" + version2);
    }

    //----------------------------------------
    //----------------------------------------

    @Test
    public void deleteTest() throws Exception {

        //删除节点,如果节点下有子节点的话,无法删除
        //要删除的节点不存在时,会报异常
        client.delete().forPath("/test1");
    }

    @Test
    public void deleteAllTest() throws Exception {

        //删除节点,即使有子节点,连同子节点一起删除
        //要删除的节点不存在时,会报异常
        client.delete().deletingChildrenIfNeeded().forPath("/test4");
    }

    @Test
    public void testDelete3() throws Exception {

        //保证删除成功,如果网络有问题,则会定期重试
        //要删除的节点不存在时,会报异常
        client.delete().guaranteed().forPath("/test2");
    }

    @Test
    public void testDelete4() throws Exception {

        //节点删除成功后,执行回调方法
        //要删除的节点不存在时,会报异常

        client.delete().guaranteed().inBackground((client, event) -> {
            System.out.println("Lambda表达式写法:节点删除操作执行成功....");
            System.out.println(event);
        }).forPath("/test1");


        client.delete().guaranteed().inBackground(new BackgroundCallback() {
            @Override
            public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {

                //参数含义:
                //client 其实就是客户端连接对象,可以对 zookeeper 节点进行增删改查
                //event 事件状态对象

                System.out.println("匿名函数写法:节点删除操作执行成功....");
                System.out.println(event);
            }
        }).forPath("/test2");
    }
}

在运行以上单元测试方法时,可以使用上篇博客中所介绍的命令行操作 Zookeeper 进行操作结果的查看和验证。


到此为止,有关使用 Java 通过 Curator 组件的 API 对 Zookeeper 进行基本的增删改查操作,已经介绍完毕,非常简单。

本篇博客的源代码下载地址为:https://files.cnblogs.com/files/blogs/699532/zookeeper_curator.zip


这篇关于Zookeeper 使用 Java 进行增删改查操作的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!