Zookeeper深入学习

2023/4/27

# 1、要学习zookeeper哪些内容

①zookeeper实现分布式锁

②分布式一致性算法,比如Paxos算法

③ZAB协议,zookeeper在Paxos协议上解决一致性问题的方法

④CAP和Base理论学习

⑤Zookeeper源码分析(zookeeper启动源码、服务端加载数据源码、选举算法、状态同步算法、Leader启动源码等等)

# 2、Leader选举过程

# 2.1 服务器启动过程中的Leader选举

假如有如下一个服务器集群:

image-20230220214658328

依次启动服务器A、B、C、D、E,(服务器myid分别为1,2,3,4,5)选举过程如下:

首先当A服务器启动的时候,发起一次选举,投自己一票,此时服务器的票数为1,整个服务器集群为5台服务器,票数没有达到半数,选举失败,服务器进入LOOKING状态。

当服务器B启动的时候,发起一次选举,服务器A,B都投自己一票,并互相交换选票,服务器A发现服务器B的myid比自己投票选举的大,于是将投票改为服务器2,这是服务器票数为2,没有达到半数,服务器A,B都进入LOOKING状态。

当服务器C启动的时候,发起一次选举,服务器A,B,C都投自己一票,并互相交换选票,服务器A,B发现自己投票的服务器myid都没有服务器C大,于是将自己的投票改为服务器C,这是整个服务器的票数为3,并且都是服务器C,票数超过一半,服务器当选成为Leader,服务器1,2更改服务器状态为FOLLOWING,服务器C的状态改为LEADING

当服务器D启动的时候,发起一次选举,这次服务器A,B,C的状态已经不是LOOKING,不会更改选票信息,交换选票的时候,服务器C有3票,服务器D只有一票,服务器D将自己的状态改为FOLLOWING.

当服务器E启动的时候,过程跟服务器D启动同样原理。

# 2.2 服务器非第一次启动过程选举

当集群中某一个节点出现下面两种情况的时候就会进入选举:

①:服务器初始化启动

②:服务器运行期间无法与Leader保持连接

当一个服务器发起选举的时候就会出现两种情况,第一种就是集群中已经存在Leader,这时候在发起选举的时候就会被告知Leader信息,当前服务器就要跟Leader建立连接,并同步Leader信息。第二种就是集群中Leader服务器真的宕机了或者出现网络波动下线了,这时候就会进行选举。

每一台服务器都有三个重要的标识,SID(服务器唯一ID,跟myid中的相同),ZXID事务ID,在某一时刻, 集群中的每台机器的ZXID值不一定完全一 致,这和ZooKeeper服务器对于客户端“更 新请求”的处理逻辑有关。Epoch:每个Leader任期的代号。没有 Leader时同一轮投票过程中的逻辑时钟值是 相同的。每投完一次票这个数据就会增加。

假如存在下面的情况,服务器A,B,C,D,E的SID分别1,2,3,4,5,ZXID分别为8,8,8,7,7并且服务器C为Leader,这是服务器C和E同事出现网络波动下线,那么整个Leader选举的过程如下:先比较Epoch,大的直接为Leader,相同则比较ZXID,相同则比较SID大小,大的为Leader.

# 3.ZooKeeper客户端操作方式

在Linux系统中安装了Zookeeper服务器,对Zookeeper命令有一些了解的情况下,学习如何在客户端操作Zookeeper。目前,Zookeeper服务器有三种Java客户端: Zookeeper、Zkclient和Curator

Zookeeper: Zookeeper是官方提供的原生java客户端(官方) Zkclient: 是在原生zookeeper客户端基础上进行扩展的开源第三方Java客户端 Curator: Netflix公司在原生zookeeper客户端基础上开源的第三方Java客户端

# 3.1 Zookeeper客户端

在接下来的学习中,主要是使用Zookeeper3.7.1版本来演示客户端操作,依赖引入:

<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.7.1</version>
</dependency>

# 3.1.1 创建客户端连接Create

public class ZookeeperUtils {
    private static CountDownLatch countDownLatch = new CountDownLatch(1);
    public static void init(String connection,int timeout){
        try{
            ZooKeeper zooKeeper = new ZooKeeper(connection,timeout,watcher -> {
                //获取监听事件状态
                Watcher.Event.KeeperState state = watcher.getState();
                //获取事件类型
                Watcher.Event.EventType type = watcher.getType();

                //如果连接已经建立
                if(Watcher.Event.KeeperState.SyncConnected == state){
                    if(Watcher.Event.EventType.None == type){
                        System.out.println("ZooKeeper连接已经建立");
                        countDownLatch.countDown();
                    }
                }
                
                if(Watcher.Event.EventType.NodeCreated == type){
                    System.out.println("Zookeeper有新的节点" + watcher.getPath() + "创建");
                }

                if(Watcher.Event.EventType.NodeDataChanged == type){
                    System.out.println("Zookeeper节点" + watcher.getPath() + "中数据已经发生变化");
                }

                if(Watcher.Event.EventType.NodeDeleted == type){
                    System.out.println("Zookeeper节点" + watcher.getPath() + "被删除");
                }

                if(Watcher.Event.EventType.NodeChildrenChanged == type){
                    System.out.println("Zookeeper节点" + watcher.getPath() + "的子节点已经发生变化");
                }
            });
            //必须等Zookeeper连接成功之后才能进行后续操作,否则一直等待
            countDownLatch.await();
            System.out.println("init connection success " + zooKeeper);

        } catch (IOException e) {
            throw new RuntimeException(e);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}

CountDownLatch是一个同步工具类,用来协调多个线程之间的同步,或者说起到线程之间的通信(而不是用作互斥的作用)。CountDownLatch能够使一个线程在等待另外一些线程完成各自工作之后,再继续执行。使用一个计数器进行实现。计数器初始值为线程的数量。当每一个线程完成自己任务后,计数器的值就会减一。当计数器的值为0时,表示所有的线程都已经完成一些任务,然后在CountDownLatch上等待的线程就可以恢复执行接下来的任务。

# 3.1.2 创建节点Node

	/**
     * 根据路径和值创建Zookeeper节点
     * @param path
     * @param value
     */
    public static void createNode(String path,String value) throws InterruptedException, KeeperException {
        if("".equals(path)){
            System.out.println("节点路径不能为空");
            return;
        }

        ZooKeeper zooKeeper =  ZookeeperUtils.init("*.*.*.*:2181",60000);

        //如果路径已经存在不能创建
        if(zooKeeper.exists(path,false) != null){
            System.out.println("节点:" + path + "已经存在。不能创建");
            return;
        }

        String result = zooKeeper.create(path,value.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        System.out.println("Zookeeper已经创建路径为:" + path + ",值为:" + value + "的节点");
    }

    /**
     * 递归创建节点
     * @param path
     * @param value
     */
    public static void createNodeRecursion(String path,String value) throws InterruptedException, KeeperException {
        if("".equals(path)){
            System.out.println("创建节点的路径不能为空");
            return;
        }

        String[] paths = path.substring(1).split("/");
        for(int i = 0; i < paths.length; i++){
            String childPath = "";
            for(int j = 0; j <= i; j++){
                childPath += "/" + paths[j];
            }
            createNode(childPath,value);
        }
    }

# 3.1.3 查询节点

	/**
     * 根据路径查询节点
     * @param path
     */
    public static void queryNode(String path) throws InterruptedException, KeeperException {
        if("".equals(path)){
            System.out.println("查询节点的路径不能为空!");
            return;
        }
        ZooKeeper zooKeeper =  ZookeeperUtils.init("*.*.*.*:2181",60000);
        byte[] result = zooKeeper.getData(path,false,null);
        System.out.println(new String(result));

        Stat stat = new Stat();
        byte[] result1 = zooKeeper.getData(path,true,stat);
        System.out.println("节点:" + path + "的值为:" + new String(result1) + "Stat:" + stat);
    }

# 3.1.4 更新节点

public static void setData(String path,String value) throws InterruptedException, KeeperException {
        ZooKeeper zooKeeper =  ZookeeperUtils.init("*.*.*.*:2181",60000);
        zooKeeper.setData(path,value.getBytes(),-1);

    }

# 3.1.5删除节点

public static void deleteNode(String path) throws InterruptedException, KeeperException {
        ZooKeeper zooKeeper =  ZookeeperUtils.init("*.*.*.*:2181",60000);
        if(zooKeeper.exists(path,false) == null){
            System.out.println("要删除的节点不存在!");
        }
        zooKeeper.delete(path,-1);

    }

# 4.使用Zookeeper实现分布式锁

在单体项目中JVM中的锁就可以完成大部分需求,但是在分布式项目中,项目被拆分成各个模块,分别部署在不同的服务器中,比如一个完成一个订单可以需要多个操作,订单模块生成订单信息,支付模块完成支付,库存模块进行相关商品信息增减等等,往往一个业务需要多个模块共同完成操作,这个时候需要分布式锁对我们要操作的资源进行锁定,方便我们进行业务操作。分布式锁实现方式有很多种,有数据库实现,有缓存实现,也可以使用Zookeeper,在Zookeeper中实现分布式锁,最重要的特点是利用临时序号节点来实现。

初始代码实现:

/**
 * 原生Zookeeper实现分布式锁,原理利用zookeeper临时序号节点的特点
 */
public class DistributedLock1 {
    private static final String connectionLocalhost = "*.*.*.*:2181";
    private static final int timeout = 60000;
    private ZooKeeper zooKeeper;
    private CountDownLatch connectionLatch = new CountDownLatch(1);
    private CountDownLatch isLock = new CountDownLatch(1);
    private String preNode;
    private String currentNode;

    public DistributedLock1(){
        try{
            zooKeeper = new ZooKeeper(connectionLocalhost,timeout,(watcher) -> {
                //只有与zookeeper集群建立连接之后才能进行后续操作
                if(watcher.getState() == Watcher.Event.KeeperState.SyncConnected){
                    if(watcher.getType() == Watcher.Event.EventType.None){
                        System.out.println("成功与Zookeeper集群连接");
                        connectionLatch.countDown();
                    }
                }

                //当前节点的前一个临时序号节点删除的时候会触发当前节点获取分布式锁
                if(watcher.getType() == Watcher.Event.EventType.NodeDeleted && watcher.getPath().equals(preNode)){
                    isLock.countDown();
                }
            });
            //成功建立连接之后才能进行后续操作
            connectionLatch.await();
            Stat stat = zooKeeper.exists("/locks", false);
            if(stat == null){
                //根节点不存在的时候先建立根节点
                zooKeeper.create("/locks","locks".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }

        }catch (Exception e){
            e.printStackTrace();
        }
    }

    /**
     * 上锁
     */

    public void lock(){
        try {
            //创建临时序号节点
            currentNode = zooKeeper.create("/locks/sub-","sub".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);

            List<String> children = zooKeeper.getChildren("/locks", false);
            if(children.size() == 1){
                //只有一个子节点,肯定是当前创建的临时序号节点,直接获取锁
                return;
            } else {
                //获取当前节点在集合中的次序
                Collections.sort(children);
                String thisPath = currentNode.substring("/locks/".length());
                int index = children.indexOf(thisPath);
                if(index == 0){
                    //当前节点就是最小的节点,直接获取锁
                    return;
                } else if(index == -1){
                    System.out.println("获取节点索引错误");
                }else{
                    this.preNode = "/locks/" + children.get(index-1);
                    zooKeeper.getData(preNode,true,new Stat());
                    isLock.await();
                    return;
                }
            }
        } catch (KeeperException e) {
            throw new RuntimeException(e);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
    /**
     * 释放锁
     */
    public void unlock(){
        try{
            zooKeeper.delete(currentNode,-1);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } catch (KeeperException e) {
            throw new RuntimeException(e);
        }
    }

}

创建两个线模拟:

image-20230225191829516

上面的实现方式虽然实现分布式锁相关内容,但是整个架构设计不太好,不利于扩展,假如我现在要切换到Redis的实现方式,那么整个项目代码又得重复开发,不能复用,所以,我们对上面代码使用模板设计模式进行重新优化:

image-20230225201509385

先看看顶层接口基类:

/**
 * 分布式基类接口
 */
public interface Lock {
    /**
     * 获取锁
     */
    void requireLock();

    /**
     * 释放锁
     */
    void releaseLock();
}

只定义了两个最基本的方法,获取锁和释放锁

再看看抽象类:

/**
 *模板抽象类
 */
public abstract class AbstractTemplateLock implements Lock{

    protected abstract boolean tryLock();
    protected abstract void waitLock();
    protected abstract void unLock();

    @Override
    public void releaseLock() {
        unLock();
        System.out.println(Thread.currentThread().getName() + "成功释放分布式锁");
    }

    @Override
    public void requireLock() {
        if(tryLock()){
            System.out.println(Thread.currentThread().getName() + "成功获取到分布式锁");
        }else{
            //等待并监听
            waitLock();
            //尝试重新获取锁
            requireLock();
        }
    }
}

在获取锁的时候,先去判断是否成功获取到锁,如果成功获取到锁就继续,没有获取到锁就等待并监听,并再次尝试获取锁。

再看看基于Zookeeper中临时顺序节点原理实现方式:

/**
 * 基于Zookeeper实现分布式锁
 */
public class ZookeeperTemplateLock extends AbstractTemplateLock{
    private static final String connectionLocalhost = "119.96.93.67:2181";
    private static final int connectionTimeout = 60000;
    private ZooKeeper zooKeeper;
    private String currentNode;
    private String preNode;
    private CountDownLatch connectionLatch = new CountDownLatch(1);
    private CountDownLatch isLock = new CountDownLatch(1);


    public ZookeeperTemplateLock(){
        try {
            zooKeeper = new ZooKeeper(connectionLocalhost,connectionTimeout,watcher -> {
                //连接成功建立的时候触发监听
                if(watcher.getState() == Watcher.Event.KeeperState.SyncConnected){
                    if(watcher.getType() == Watcher.Event.EventType.None){
                        System.out.println("与Zookeeper连接建立成功");
                        connectionLatch.countDown();
                    }
                }
                //当前节点的前一个节点被删除的时候触发监听
                if(watcher.getType() == Watcher.Event.EventType.NodeDeleted && watcher.getPath().equals(preNode)){
                    isLock.countDown();
                }
            });
            //只有当连接成功建立的时候才能继续后续操作,否则一直阻塞等待
            connectionLatch.await();
            Stat stat = zooKeeper.exists("/zkLocks", false);
            if(stat == null){
                zooKeeper.create("/zkLocks","zkLocks".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }

        } catch (IOException e) {
            throw new RuntimeException(e);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } catch (KeeperException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    protected boolean tryLock() {
        try {
            if(currentNode == null){
                currentNode = zooKeeper.create("/zkLocks/sub-", "locks".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
            }

            List<String> children = zooKeeper.getChildren("/zkLocks", false);
            if(children.size() == 1){
                //只有一个子节点,肯定是刚刚创建的子节点,获取锁
                return true;
            }else{
                Collections.sort(children);
                String thisPath = currentNode.substring("/zkLocks/".length());
                int index = children.indexOf(thisPath);

                if(index == 0){
                    //当前节点是第一个节点获取锁
                    return true;
                }else{
                    preNode = "/zkLocks/" + children.get(index-1);
                    return false;
                }
            }

        } catch (KeeperException e) {
            throw new RuntimeException(e);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    protected void waitLock() {
        try {
            zooKeeper.getData(preNode,true,new Stat());
            //阻塞并等待前一个节点删除
            isLock.await();
        } catch (KeeperException e) {
            throw new RuntimeException(e);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }

    }

    @Override
    protected void unLock() {
        try {
            zooKeeper.delete(currentNode,-1);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } catch (KeeperException e) {
            throw new RuntimeException(e);
        }
    }
}

测试:创建6个线程并发执行

image-20230225201949644

上面的设计架构有很好的扩展型,当我们想要切换到Redis或者其它实现方式的时候,只需要根据抽象类重写相应方式即可,整体架构不会变。

# 5.Kafka学习

# 1.什么是Kafka

Kafka传统定义:Kafka是一个分布式的基于发布/订阅模式的消息队列(Message Queue),主要应用于大数据实时处理领域。

**Kafka最新定义 **: Kafka是 一个开源的分布式事件流平台 (Event Streaming Platform),被数千家公司用于高性能数据管道、流分析、数据集成和关键任务应用。

目 前企业中比较常见的消息队列产品 要有Kafka、ActiveMQ 、RabbitMQ 、 RocketMQ 等。 在大数据场景主要采用 Kafka作为消息队列。在 JavaEE 开发中主要采用 ActiveMQ、RabbitMQ、RocketMQ。

传统的消息队列的主要应用场景包括:缓存/消峰解耦异步通信

# 2.KafKa安装

在官网下载Kafka,地址:https://kafka.apache.org/downloads

image-20230226225825352

将安装包上传到服务器中,我习惯于将软件安装到/usr/local/目录中,将kafka压缩包上传至目录中,进行解压:

tar -zxvf kafka_2.12-3.0.0.tgz
#重命名
mv kafka_2.12-3.0.0 kafka

进入kafka目录查看

image-20230226230145281

bin:主要是kafka的可执行文件,一些常用脚本,比如启动,关闭
config:主要存放配置文件
datas:我们自己新建的目录,主要用于存放kafka数据
libs:kafka依赖的一些文件
logs:日志

进入配置文件目录

image-20230226230519148

上面三个配置文件是我们关注的重点,我们先进入server.properties查看,进行第一次启动准备

image-20230226230640435

第一次主要进行几个配置的设置:
broker.id #broker 的全局唯一编号,不能重复,只能是数字
log.dirs=/usr/local/kafka/datas 运行日志(数据)存放的路径,路径不需要提前创建,kafka 自动帮你创建,可以
配置多个磁盘路径,路径与路径之间可以用","分隔
zookeeper.connect#配置连接 Zookeeper 集群地址

启动:./bin/kafka-server-start.sh -daemon ./config/server.properties,使用jps查看

image-20230226231118762

注意:启动之前,一定要先启动Zookeeper集群