此文章是vip文章,如何查看?  

1,点击链接获取密钥 http://nicethemes.cn/product/view29882.html

2,在下方输入文章查看密钥即可立即查看当前vip文章


Zookeeper使用总结(进阶篇)

  • 时间:
  • 浏览:
  • 来源:互联网

创作不易,如果觉得这篇文章对你有帮助,欢迎各位老铁点个赞呗,您的支持是我创作的最大动力!

本系列主要总结下Zookeeper的基础使用,笔者准备写四篇文章:

博文内容 资源链接
Linux下搭建Zookeeper运行环境 https://blog.csdn.net/smilehappiness/article/details/105933433
Zookeeper入门,一篇就够啦 https://blog.csdn.net/smilehappiness/article/details/105933292
Zookeeper客户端ZkClient、Curator的使用,史上最详细的教程来啦~ https://blog.csdn.net/smilehappiness/article/details/105938058
Zookeeper使用总结(进阶篇) https://blog.csdn.net/smilehappiness/article/details/105938119

文章目录

  • 1 前言
  • 2 Zookeeper的Leader选举
  • 3 Zookeeper之ACL
    • 3.1 什么是ACL
    • 3.2 Zookeeper节点权限模式
    • 3.3 Zookeeper节点操作权限
    • 3.4 设置Zookeeper节点权限
      • 3.4.1 命令行操作方式
        • 3.4.1.1 命令行操作第一种方式(推荐)
        • 3.4.1.2 命令行操作第二种方式
      • 3.4.2 使用代码操作设置节点权限
        • 3.4.2.1 设置节点权限
        • 3.4.2.2 使用有权限的用户访问
    • 3.5 设置Zookeeper节点超级用户权限
      • 3.5.1 背景
      • 3.5.2 对超级用户的密码加密
      • 3.5.3 超级用户加入到服务端启动参数
      • 3.5.4 在命令行客户端使用超级用户登陆
  • 4 Zookeeper集群部署
  • 5 分布式系统生成全局唯一ID
    • 5.1 背景
    • 5.2 解决方案
      • 5.2.1 UUID方案
      • 5.2.2 数据库自增ID方案
      • 5.2.3 Redis方案(推荐)
      • 5.2.4 基于Twiitter的snowflake算法
      • 5.2.5 基于MongoDB的ObjectID
      • 5.2.6 基于Zookeeper的方案(推荐)
        • 5.2.6.1 通过持久化的顺序节点实现全局唯一Id
        • 5.2.6.2 基于Zookeeper节点版本号实现全局唯一Id
    • 5.3 小结
  • 6 基于Zookeeper实现分布式锁

1 前言

上一篇,主要介绍了几种zk客户端的使用,可以方便操作Zookeeper的zNode节点,本文是Zookeeper系列的最后一篇文章了,将主要介绍一些Zookeeper的高级使用,以及总结一些Zookeeper的使用场景,希望多老铁们有所帮助。

2 Zookeeper的Leader选举

更新中,请耐心等待…

3 Zookeeper之ACL

3.1 什么是ACL

访问控制列表(ACL)是一种基于包过滤的访问控制技术,它可以根据设定的条件对接口上的数据包进行过滤,允许其通过或丢弃。访问控制列表被广泛地应用于路由器和三层交换机,借助于访问控制列表,可以有效地控制用户对网络的访问,从而最大程度地保障网络安全。 百度百科

ACL (Access Control List),Zookeeper作为一个分布式协调框架,其内部存储的都是一些关于分布式系统运行时状态的元数据,默认情况下,所有应用都可以读写任何节点,在现如今复杂的网络应用中,这很明显不太安全,任何应用都会有一些用户权限,来保证系统的安全访问,所以,Zookeeper也通过ACL访问控制机制,来解决访问权限问题。

3.2 Zookeeper节点权限模式

Zookeeper节点权限模式,即是Scheme

开发中,最多使用的有以下四种节点权限模式:

  • ip: ip模式通过ip地址粒度进行权限控制模式,例如配置了:192.168.1.101,即表示权限控制都是针对这个ip地址的(实际开发中用的比较少)

  • digest:digest是最常用的权限控制模式,也是开发中用的最多的一种Scheme方式。
    该方式采用username:password形式的权限标识进行权限配置,ZK会对形成的权限标识先后进行两次加密处理,分别是SHA-1加密算法和Base64编码

  • world: world是一种最开放的权限控制模式,也是默认的权限模式,这种模式可以看做特殊的digest,它仅仅是一个标识而已,有个唯一的id。 anyone表示所有人。
    在这里插入图片描述

  • auth:不使用任何id,代表任何已认证的用户

3.3 Zookeeper节点操作权限

Zookeeper节点操作权限:
权限,就是指那些通过权限校验后,才可以被允许执行的操作,在ZK中,对数据的操作权限分为以下五大类:createdeletereadwriteadmin
也就是 增、删、改、查、管理权限,这5种权限简写就是cdrwa(每个单词的首字符缩写)

各权限含义说明:

  • create: 创建子节点的权限
  • delete: 删除节点的权限
  • read: 读取节点数据的权限
  • write: 修改节点数据的权限
  • admin: 设置子节点权限的权限

3.4 设置Zookeeper节点权限

3.4.1 命令行操作方式

对命令行操作不熟悉的童鞋们,可以参考我的博文https://blog.csdn.net/smilehappiness/article/details/105933292 5.2节 命令行客户端 脑补一下。

首先需要使用命令行连接Zookeeper:
./zkCli.sh -server ip:2181

如果对节点设置了权限,访问时会没有权限,如:

Authentication is not valid : /root

所以需要以下方式,才能进行节点的操作。

3.4.1.1 命令行操作第一种方式(推荐)

这种方式,是使用明文的方式创建节点用户权限。

在命令行,使用以下命令:

  • 创建hello节点:
    create /hello

  • 增加一个认证用户
    语法:addauth scheme auth
    如:addauth digest zhaoliu:123456789

  • 设置权限
    语法:setAcl [-s] [-v version] [-R] path acl(setAcl /path auth:用户名:密码明文:权限)
    如: setAcl /hello auth:zhaoliu:123456789:cdrwa

  • 查看Acl设置
    语法: getAcl [-s] path
    如: getAcl /root

3.4.1.2 命令行操作第二种方式

这种方式,是使用密文的方式创建节点用户权限。

生成密文:

DigestAuthenticationProvider.generateDigest("wangwu:123456")//wangwu:HVn0+DK8rXzHTPwF2BhajA/Cn1M=

在命令行,使用以下命令:

  • 创建test节点:
    create /test

  • 设置权限
    setAcl /test digest:用户名:密码密文:权限
    如:setAcl /test digest:wangwu:HVn0+DK8rXzHTPwF2BhajA/Cn1M=:cdrwa
    这里的加密规则是先SHA1加密,然后base64编码

  • 使用wangwu用户,明文密码的方式登录
    addauth digest wangwu:123456

  • 查看Acl设置
    语法: getAcl [-s] path
    如: getAcl /test

3.4.2 使用代码操作设置节点权限

使用代码的方式,可以在创建节点的时候设置节点权限,也可以修改节点权限的归属,下面,举一些示例,帮助老铁们理解。

3.4.2.1 设置节点权限

开始连接Zookeeper客户端对象时,不设置用户权限,然后通过以下代码设置节点用户访问权限后,再次连接时,就不能访问了(NoAuthException: KeeperErrorCode = NoAuth for /root),此时需要设置权限,才可以连接客户端。

  • 在创建节点的时候设置节点权限:
    【代码示例】

    Id zhangsan = new Id("digest", DigestAuthenticationProvider.generateDigest("zhangsan:123456"));
    Id lisi = new Id("digest", DigestAuthenticationProvider.generateDigest("lisi:654321"));
    
    List<ACL> aclList = new ArrayList<>();
    //赋值权限(可选值:READ、WRITE、CREATE、DELETE、ADMIN、ALL,即:cdrwa)
    aclList.add(new ACL(ZooDefs.Perms.ADMIN, zhangsan));
    aclList.add(new ACL(ZooDefs.Perms.READ, lisi));
    
    //创建节点时,就指定节点所属用户和权限
    String node = client.create()
             .creatingParentsIfNeeded()
             .withACL(aclList)
             .forPath(nodePath, data.getBytes());
    System.out.println(node);
    
  • 修改节点权限的归属:
    【代码示例】

    Id wangwu= new Id("digest", DigestAuthenticationProvider.generateDigest("wangwu:123456"));
    Id zhaoliu= new Id("digest", DigestAuthenticationProvider.generateDigest("zhaoliu:654321"));
    
    List<ACL> aclList = new ArrayList<>();
    //赋值权限(可选值:READ、WRITE、CREATE、DELETE、ADMIN、ALL,即:cdrwa)
    aclList.add(new ACL(ZooDefs.Perms.ALL, wangwu));
    aclList.add(new ACL(ZooDefs.Perms.WRITE, zhaoliu));
    
    //把ROOT_NODE节点设置两个用户的权限,修改该节点所属用户以及权限
    client.setACL().withACL(aclList).forPath(ROOT_NODE);            
    

3.4.2.2 使用有权限的用户访问

上面步骤设置完权限之后,不设置用户,就不能访问到刚才那个节点了

连接zk客户端对象时,需要设置节点用户权限:

	/**
     * <p>
     * 创建Curator连接对象
     * <p/>
     *
     * @param
     * @return
     * @Date 2020/6/21 12:29
     */
    public static void connectCuratorClient() {
        //创建zookeeper连接
        client = CuratorFrameworkFactory.builder()
                .connectString(ZK_ADDRESS)
                .sessionTimeoutMs(10000)
                .connectionTimeoutMs(15000)
                .retryPolicy(retry)
                //用户认证
				.authorization("digest", "zhangsan:123456".getBytes())
                .build();

        //启动客户端(Start the client. Most mutator methods will not work until the client is started)
        client.start();

        System.out.println("zookeeper初始化连接成功:" + client);
    }
  • 完整代码示例如下:
package cn.smilehappiness.acl;

import org.apache.commons.lang3.StringUtils;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryNTimes;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.server.auth.DigestAuthenticationProvider;

import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.List;

/**
 * <p>
 * Curator客户端ACl权限访问控制
 * <p/>
 *
 * @author smilehappiness
 * @Date 2020/6/21 20:58
 */
public class ZookeeperCuratorAcl {

    /**
     * 客户端连接地址
     */
    private static final String ZK_ADDRESS = "ip:2181";
    /**
     * 客户端根节点
     */
    private static final String ROOT_NODE = "/root";
    /**
     * 客户端子节点
     */
    private static final String ROOT_NODE_CHILDREN = "/hello/children";
    /**
     * 创建zookeeper连接实例
     */
    private static CuratorFramework client = null;
    /**
     * 重试策略
     * n:最多重试次数
     * sleepMsBetweenRetries:重试时间间隔,单位毫秒
     */
    private static final RetryPolicy retry = new RetryNTimes(3, 2000);

    static {
        // 创建Curator连接对象
        connectCuratorClient();
    }

    /**
     * <p>
     * 创建Curator连接对象
     * 针对设置过权限的节点,需要使用用户登录,否则会没有权限(org.apache.zookeeper.KeeperException$NoAuthException: KeeperErrorCode = NoAuth for /hello/children)
     * <p/>
     *
     * @param
     * @return
     * @Date 2020/6/21 12:29
     */
    public static void connectCuratorClient() {
        //创建zookeeper连接
        client = CuratorFrameworkFactory.builder()
                .connectString(ZK_ADDRESS)
                .sessionTimeoutMs(50000)
                .connectionTimeoutMs(100000)
                .retryPolicy(retry)
                //用户认证
                .authorization("digest", "zhangsan:123456789".getBytes())
                .build();

        //启动客户端(Start the client. Most mutator methods will not work until the client is started)
        client.start();

        System.out.println("zookeeper初始化连接成功:" + client);
    }

    public static void main(String[] args) throws Exception {
        //打印密文
        //printPwd();

        //创建节点
        createNode(ROOT_NODE_CHILDREN, "root data");

        //使用权限最大的zhangsan用户,把/hello/children节点,给zhaoliu用户设置读的权限,修改该节点所属用户以及权限,这样zhaoliu就可以读取节点了
        /*Id zhaoliu = new Id("digest", DigestAuthenticationProvider.generateDigest("zhaoliu:123456789"));
        List<ACL> aclList = new ArrayList<>();
        //赋值权限(可选值:READ、WRITE、CREATE、DELETE、ADMIN、ALL,即:cdrwa)
        aclList.add(new ACL(ZooDefs.Perms.READ, zhaoliu));
        aclList.add(new ACL(ZooDefs.Perms.WRITE, zhaoliu));
        client.setACL().withACL(aclList).forPath(ROOT_NODE_CHILDREN);*/

        //获取节点数据(zhaoliu只有写的权限,只有zhangsan用户,有所有的权限,可以读写改删)
        getNode(ROOT_NODE_CHILDREN);

        //设置(修改)节点数据(lisi用户没有修改权限:NoAuthException: KeeperErrorCode = NoAuth for /hello/children)
        updateNode(ROOT_NODE_CHILDREN, "update curator data");

        //删除指定节点(这个在原生zk里面,是不能直接删除有子节点的数据的)
        deleteNode(ROOT_NODE_CHILDREN);
    }

    /**
     * <p>
     * 打印密文
     * <p/>
     *
     * @param
     * @return void
     * @Date 2020/6/21 22:13
     */
    private static void printPwd() throws NoSuchAlgorithmException {
        System.out.println(DigestAuthenticationProvider.generateDigest("wangwu:123456"));//wangwu:HVn0+DK8rXzHTPwF2BhajA/Cn1M=
    }

    /**
     * <p>
     * 创建节点,并支持赋值数据内容
     * <p/>
     *
     * @param nodePath
     * @param data
     * @return void
     * @Date 2020/6/21 12:39
     */
    private static void createNode(String nodePath, String data) throws Exception {
        if (StringUtils.isBlank(nodePath)) {
            System.out.println("节点【" + nodePath + "】不能为空");
            return;
        }

        //对节点是否存在进行判断,否则会报错:【NodeExistsException: KeeperErrorCode = NodeExists for /root】
        Stat exists = client.checkExists().forPath(nodePath);
        if (null != exists) {
            System.out.println("节点【" + nodePath + "】已存在,不能新增");
            return;
        } else {
            System.out.println(StringUtils.join("节点【", nodePath, "】不存在,可以新增节点!"));
        }

        //创建节点,并为当前节点赋值内容
        if (StringUtils.isNotBlank(data)) {
            List<ACL> aclList = new ArrayList<>();
            Id zhangsan = new Id("digest", DigestAuthenticationProvider.generateDigest("zhangsan:123456789"));
            Id lisi = new Id("digest", DigestAuthenticationProvider.generateDigest("lisi:123456789"));
            Id zhaoliu = new Id("digest", DigestAuthenticationProvider.generateDigest("zhaoliu:123456789"));

            //赋值权限(可选值:READ、WRITE、CREATE、DELETE、ADMIN、ALL,即:cdrwa)
            aclList.add(new ACL(ZooDefs.Perms.ALL, zhangsan));
            aclList.add(new ACL(ZooDefs.Perms.READ, lisi));
            aclList.add(new ACL(ZooDefs.Perms.WRITE, zhaoliu));

            //创建节点时,就指定节点所属用户和权限
            String node = client.create()
                    .creatingParentsIfNeeded()
                    .withACL(aclList)
                    .forPath(nodePath, data.getBytes());
            System.out.println(node);
        }

    }

    /**
     * <p>
     * 获取节点数据
     * <p/>
     *
     * @param nodePath
     * @return void
     * @Date 2020/6/21 13:13
     */
    private static void getNode(String nodePath) throws Exception {
        //获取某个节点数据
        byte[] bytes = client.getData().forPath(nodePath);
        System.out.println(StringUtils.join("节点:【", nodePath, "】,数据:", new String(bytes)));
    }

    /**
     * <p>
     * 设置(修改)节点数据
     * <p/>
     *
     * @param nodePath
     * @param data
     * @return void
     * @Date 2020/6/21 13:46
     */
    private static void updateNode(String nodePath, String data) throws Exception {
        //指定版本号,更新节点,更新的时候如果指定数据版本的话,那么需要和zookeeper中当前数据的版本要一致,-1表示匹配任何版本
        Stat stat = client.setData().withVersion(-1).forPath(nodePath, data.getBytes());
        System.out.println(stat);
    }

    /**
     * <p>
     * 删除指定节点
     * <p>
     * <p/>
     *
     * @param nodePath
     * @return void
     * @Date 2020/6/21 13:50
     */
    private static void deleteNode(String nodePath) throws Exception {
        //级联删除节点(如果当前节点有子节点,子节点也可以一同删除)
        client.delete().deletingChildrenIfNeeded().forPath(nodePath);
    }

}

3.5 设置Zookeeper节点超级用户权限

因为测试的时候,有个/root节点,设置了权限,然后忘记了是哪个用户了,死活就删除不掉了,只能设置一个超级用户权限,来进行节点的操作。

3.5.1 背景

Zookeeper管理员会因为某些客户端对某些节点设置了权限,而导致在紧急的情况下无法修改这些节点感到困扰。在这种情况下,管理员可以通过Zookeeper超级用户模式访问这些节点,一旦设置了超级权限访问节点,使用超级用户登录后,后续的操作就不需要Check ACL了。

使用超级用户模式,可以通过Zookeeper的zookeeper.DigestAuthenticationProvider.superDigest参数开启。

3.5.2 对超级用户的密码加密

使用org.apache.zookeeper.server.auth.DigestAuthenticationProvider生成superDigest:

public void generate() {  
    try {  
       System.out.println(DigestAuthenticationProvider.generateDigest("super:superpw"));  
    } catch (NoSuchAlgorithmException e) {  
        // TODO Auto-generated catch block  
        e.printStackTrace();  
    }  
}  

输出super:superpw对应的加密参数为: super:g9oN2HttPfn8MMWJZ2r45Np/LIA=

3.5.3 超级用户加入到服务端启动参数

在zookeeper服务端的zkEnv.sh环境变量中加入以下参数,开启超级用户,重启zookeeper服务端:

SERVER_JVMFLAGS="-Dzookeeper.DigestAuthenticationProvider.superDigest=super:g9oN2HttPfn8MMWJZ2r45Np/LIA= $SERVER_JVMFLAGS"  

3.5.4 在命令行客户端使用超级用户登陆

使用客户端登陆到zookeeper服务端:
zkCli.sh -server ip:2181

然后执行命令切换到超级用户:
addauth digest super:superpw

获取Acl设置:
getAcl /root

'digest,'zhangsan:jA/7JI9gsuLp0ZQn5J5dcnDQkHA=
: a
'digest,'lisi:c86tNrln9GqhiYsCu/4W34U1vt4=
: r

这样就可以删除带有权限控制的节点了:
delete /root 或者 deleteall /root
注意: 这两种方式,都不能删除拥有子节点的那个节点,如果有子节点,必须先删除子节点,再删除父节点。

原文链接:https://www.jianshu.com/p/373d52375a65

4 Zookeeper集群部署

5 分布式系统生成全局唯一ID

5.1 背景

在大型分布式系统中,我们经常需要生成全局唯一标识,来保证系统单号的唯一性,比如:银行中商户订单号,银行支付接口时唯一编号、红包、优惠券等等。

系统中生成分布式ID的要求:

  • 全局唯一,不能重复(基本要求)
  • 递增,下一个ID大于上一个ID(有些需求)
  • 信息安全,非连续ID,避免恶意用户/竞争对手发现ID规则,从而猜出下一个ID或者根据ID总量猜出业务总量
  • 高可用,不能故障,可用性4个9或者5个9(99.99%、99.999%)
  • 高QPS,性能不能太差,否则容易造成线程堵塞
  • 平均延迟尽可能低

5.2 解决方案

解决方案有很多,本文主要介绍下基于Zookeeper的解决方案,其他的几种,后续有机会再详细介绍。

5.2.1 UUID方案

可以使用UUID的方式,生成全局唯一Id,可以线程安全的。

使用UUID的方法:

UUID.randomUUID()

这种方式虽然可以生成安全的全局唯一Id,但是,也只能满足一般的场景:

  • UUID太长,很多场景不适用
  • 有些场景希望id是数字的,UUID就不适用(有时候需要全局唯一编号是有序的)
  • 可读性不好

5.2.2 数据库自增ID方案

  • 数据库自增ID
  • 在创建表结构的时候,可以使用auto_increment标识生成自增的主键id(MySQL数据库)
  • 可以基于序列,生成自增的主键id(Oracle数据库)

这种方式也是可以的,但是有一些弊端:

  • 这种基于数据库的ID生成方案,比较依赖数据库单机的读写性能;(高并发条件下性能不是很好,数据库服务响应慢)
  • 对数据库依赖较大,数据库易发生性能瓶颈问题

5.2.3 Redis方案(推荐)

这种方案还是不错的,目前互联网公司使用的还是比较多的,后续会详细介绍这种方案,本文主要还是以Zookeeper为主线,实现全局唯一Id。

简单说下思路:
通过Redis原子操作命令INCRINCRBY(redis自增)实现递增,同时可使用Redis集群提高吞吐量,集群后每台Redis的初始值为1,2,3,4,5,步长为5

A:16111621
B:27121722
C:38131823
D:49141924
E:510152025

5.2.4 基于Twiitter的snowflake算法

https://github.com/twitter/snowflake
这里不详细介绍,有兴趣的童鞋们,可以去看一些,网上有教程,可以百度一下。

5.2.5 基于MongoDB的ObjectID

基于MongoDB,也可以实现,后续有机会,会更新完善下。

5.2.6 基于Zookeeper的方案(推荐)

ZooKeeper作为一个分布式的,开放源码的分布式应用程序协调服务,实现这种全局唯一Id,自然是比较容易的。

基于Zookeeper,大概有两种方案:

  • 通过持久化的顺序节点实现
  • 通过节点版本号

下面,基于Zookeeper,详细介绍下如何实现全局唯一Id的生成。

5.2.6.1 通过持久化的顺序节点实现全局唯一Id

思路: 每次创建节点时,创建一个带有序列号的,持久化的节点,这样,就可以实现全局惟一的编号。

代码示例如下:

package cn.smilehappiness.generatorID;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryNTimes;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;

import java.util.concurrent.*;

/**
 * <p>
 * 分布式下,基于Zookeeper生成全局唯一Id(基于Zookeeper序列号,通过持久化的顺序节点实现)
 * <p/>
 *
 * @author smilehappiness
 * @Date 2020/6/21 17:14
 */
public class GeneratorIdSequential {

    /**
     * 客户端连接地址
     */
    private static final String ZK_ADDRESS = "ip:2181";
    /**
     * 创建zookeeper连接实例
     */
    private static CuratorFramework client = null;
    /**
     * 客户端根节点
     */
    private static final String ROOT_NODE = "/root";
    /**
     * 客户端子节点
     */
    private static final String ROOT_NODE_CHILDREN = "/root/uniqueId/id";
    /**
     * 重试策略
     * n:最多重试次数
     * sleepMsBetweenRetries:重试时间间隔,单位毫秒
     */
    private static final RetryPolicy retry = new RetryNTimes(3, 2000);

    static {
        // 创建Curator连接对象
        connectCuratorClient();
    }

    /**
     * <p>
     * 创建Curator连接对象
     * <p/>
     *
     * @param
     * @return
     * @Date 2020/6/21 12:29
     */
    public static void connectCuratorClient() {
        //创建zookeeper连接
        client = CuratorFrameworkFactory.newClient(ZK_ADDRESS, retry);

        //启动客户端(Start the client. Most mutator methods will not work until the client is started)
        client.start();

        System.out.println("zookeeper初始化连接成功:" + client);
    }

    private static String idGenerator() throws Exception {
        Stat stat = client.checkExists().forPath(ROOT_NODE_CHILDREN);
        //如果节点不存在
        if (null == stat) {
            String node = client.create()
                    .creatingParentsIfNeeded()
                    .withMode(CreateMode.PERSISTENT_SEQUENTIAL)
                    .forPath(ROOT_NODE_CHILDREN);

            //System.out.println("node = " + node);

            //避免占用zookeeper空间,可以再返回有序节点,并处理完之后,把当前节点删除一下
            //client.delete().guaranteed().forPath(node);

            return node.replace(ROOT_NODE_CHILDREN, "");
        }

        return null;
    }

    /**
     * <p>
     * 全局唯一id生成方法测试-单线程
     * <p/>
     *
     * @param
     * @return void
     * @Date 2020/6/21 18:12
     */
    private static void testGeneratorIdSingle() throws Exception {
        for (int i = 0; i < 10; i++) {
            //通过生成器,生成唯一id
            String uniqueId = idGenerator();
            System.out.println("uniqueId = " + uniqueId);
        }
    }

    /**
     * <p>
     * 全局唯一id生成方法测试-多线程
     * <p/>
     *
     * @param
     * @return void
     * @Date 2020/6/21 18:30
     */
    private static void testGeneratorIdMultiThread() {
        //循环次数
        int count = 10;
        ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("uniqueId-generator-%d").build();
        ExecutorService executorService = new ThreadPoolExecutor(4, 2 * 4 + 1, 30, TimeUnit.SECONDS, new LinkedBlockingDeque<>(1000), threadFactory);
        //设置倒计数器
        CountDownLatch downLatch = new CountDownLatch(count);
        for (int i = 0; i < count; i++) {
            executorService.execute(() -> {
                try {
                    //通过生成器,生成唯一id
                    String uniqueId = idGenerator();
                    System.out.println("当前线程名称:【" + Thread.currentThread().getName() + "】,生成的全局唯一uniqueId:【" + uniqueId + "】");

                    //每生成一次,就减一
                    downLatch.countDown();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
        }

        try {
            downLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println("全局唯一id生成成功啦...");
        executorService.shutdown();
    }

    public static void main(String[] args) throws Exception {
        //testGeneratorIdSingle();

        long startTime = System.currentTimeMillis();
        testGeneratorIdMultiThread();
        long endTime = System.currentTimeMillis();
        System.out.println("耗时:【" + (endTime - startTime) / 1000 + "】秒");
    }

}

5.2.6.2 基于Zookeeper节点版本号实现全局唯一Id

思路: 每次为节点赋值的时候,可以拿到Stat对象,通过该对象可以获取节点版本号,这样也可以实现全局唯一Id,因为没操作一次节点,这个版本号也是自增的。(基于当前时间+版本号实现

【代码示例】

package cn.smilehappiness.generatorID;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.commons.lang3.StringUtils;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryNTimes;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.*;

/**
 * <p>
 * 分布式下,基于Zookeeper生成全局唯一Id(基于Zookeeper版本号实现)
 * <p/>
 *
 * @author smilehappiness
 * @Date 2020/6/21 19:01
 */
public class GeneratorIdVersion {

    /**
     * 客户端连接地址
     */
    private static final String ZK_ADDRESS = "ip:2181";
    /**
     * 创建zookeeper连接实例
     */
    private static CuratorFramework client = null;
    /**
     * 客户端子节点
     */
    private static final String ROOT_NODE_CHILDREN = "/root/uniqueId-version";
    /**
     * 重试策略
     * n:最多重试次数
     * sleepMsBetweenRetries:重试时间间隔,单位毫秒
     */
    private static final RetryPolicy retry = new RetryNTimes(3, 2000);
    /**
     * 线程池对象
     */
    private static ThreadPoolExecutor threadPoolExecutor;

    static {
        //初始化线程池对象
        ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("uniqueId-generator-%d").build();
        threadPoolExecutor = new ThreadPoolExecutor(8, 2 * 8 + 1, 30, TimeUnit.SECONDS, new LinkedBlockingDeque<>(1000), threadFactory);

        //当前队列任务执行情况
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
        executor.scheduleAtFixedRate(() -> {
            System.out.println("任务总线程数:【" + threadPoolExecutor.getTaskCount() + "】");
            System.out.println("执行完成线程数:【" + threadPoolExecutor.getCompletedTaskCount() + "】");
            System.out.println("当前活动线程数:【" + threadPoolExecutor.getActiveCount() + "】");
            System.out.println("当前剩余线程数:【" + threadPoolExecutor.getQueue().size() + "】");
        }, 1, 2, TimeUnit.SECONDS);

        // 创建Curator连接对象
        connectCuratorClient();
    }

    public static void main(String[] args) throws Exception {
        //testGeneratorIdSingle();

        long startTime = System.currentTimeMillis();
        testGeneratorIdMultiThread();
        long endTime = System.currentTimeMillis();
        System.out.println("耗时:【" + (endTime - startTime) / 1000 + "】秒");
    }

    /**
     * <p>
     * 创建Curator连接对象
     * <p/>
     *
     * @param
     * @return
     * @Date 2020/6/21 12:29
     */
    public static void connectCuratorClient() {
        //创建zookeeper连接
        client = CuratorFrameworkFactory.newClient(ZK_ADDRESS, retry);

        //启动客户端(Start the client. Most mutator methods will not work until the client is started)
        client.start();

        System.out.println("zookeeper初始化连接成功:" + client);
    }

    private static String idGenerator() throws Exception {
        //如果节点不存在
        if (null == client.checkExists().forPath(ROOT_NODE_CHILDREN)) {
            String node = client.create()
                    .creatingParentsIfNeeded()
                    //创建持久化节点,可以省略
                    .withMode(CreateMode.PERSISTENT)
                    .forPath(ROOT_NODE_CHILDREN);

            System.out.println("node = " + node);
        }

        //基于日期+zk版本号,生成全局唯一Id
        String DATE_DAY_PATTERN = "yyyyMMdd";
        String basePrefix = format(new Date(), DATE_DAY_PATTERN);

        Stat stat = client.setData().withVersion(-1).forPath(ROOT_NODE_CHILDREN);
        //返回赋值操作节点后,当前节点的版本号
        return basePrefix + stat.getVersion();
    }

    /**
     * <p>
     * 全局唯一id生成方法测试-单线程
     * <p/>
     *
     * @param
     * @return void
     * @Date 2020/6/21 18:12
     */
    private static void testGeneratorIdSingle() throws Exception {
        for (int i = 0; i < 10; i++) {
            //通过生成器,生成唯一id
            String uniqueId = idGenerator();
            System.out.println("生成的序列号uniqueId = " + uniqueId);
        }
    }

    /**
     * <p>
     * 全局唯一id生成方法测试-多线程
     * <p/>
     *
     * @param
     * @return void
     * @Date 2020/6/21 18:30
     */
    private static void testGeneratorIdMultiThread() {
        //等待时间
        final long awaitTime = 10 * 1000;
        //循环次数
        int count = 20;
        //设置倒计数器
        CountDownLatch countDownLatch = new CountDownLatch(1);

        for (int i = 0; i < count; i++) {
            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            //提交线程到线程池去执行(这里可以替换为lambda表达式)
            threadPoolExecutor.submit(new Runnable() {
                @Override
                public void run() {
                    try {
                        //等待,保证所有的线程就位,但是不运行,
                        countDownLatch.await();
                        System.out.println("Thread Name:【" + Thread.currentThread().getName() + "】, current time: " + System.currentTimeMillis());

                        //执行业务代码
                        try {
                            //Thread.sleep(1000);

                            //System.currentTimeMillis()或者i++,++i等多种写法,在多线程环境下生成惟一的Id都是有问题的,所以这里使用zk的版本号作为唯一编号
                            String uniqueId = idGenerator();
                            System.out.println("生成的序列号uniqueId = " + uniqueId);

                        } catch (Throwable e) {
                            System.out.println("Thread:" + Thread.currentThread().getName() + e.getMessage());
                        }

                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
        }

        // countDown()表示倒计算器-1,这里8个线程同时开始执行,那么就可以达到并发效果
        countDownLatch.countDown();

        // 关闭线程池的代码
        try {
            // 传达完毕信号
            threadPoolExecutor.shutdown();
            // 所有的任务都结束的时候,返回TRUE,如果未执行完毕,处于阻塞状态(注意:这里的awaitTime要根据实际情况设置合适的等待时间)
            if (!threadPoolExecutor.awaitTermination(awaitTime, TimeUnit.MILLISECONDS)) {
                // 超时的时候向线程池中所有的线程发出中断(interrupted)
                threadPoolExecutor.shutdownNow();
            }
        } catch (InterruptedException e) {
            // awaitTermination方法被中断的时候,也中止线程池中全部线程的执行,
            System.out.println("awaitTermination interrupted: " + e);
            threadPoolExecutor.shutdownNow();
        }
    }

    /**
     * <p>
     * 将日期格式化为String
     * <p/>
     *
     * @param date
     * @param pattern
     * @return java.lang.String
     * @Date 2020/6/21 19:08
     */
    public static String format(Date date, String pattern) {
        if (date == null) {
            return null;
        } else {
            if (StringUtils.isBlank(pattern)) {
                pattern = "yyyy-MM-dd HH:mm:ss";
            }

            SimpleDateFormat dateFormat = new SimpleDateFormat(pattern);
            return dateFormat.format(date);
        }
    }

    /**
     * <p>
     * 将日期格式化为Date
     * <p/>
     *
     * @param strDate
     * @param pattern
     * @return java.lang.String
     * @Date 2020/6/21 19:08
     */
    public static Date parse(String strDate, String pattern) {
        if (StringUtils.isBlank(strDate)) {
            return null;
        } else {
            try {
                return (new SimpleDateFormat(pattern)).parse(strDate);
            } catch (ParseException var3) {
                return null;
            }
        }
    }

}

单线程执行结果:

zookeeper初始化连接成功:org.apache.curator.framework.imps.CuratorFrameworkImpl@754ba872
生成的序列号uniqueId = 2020062111
生成的序列号uniqueId = 2020062112
生成的序列号uniqueId = 2020062113
生成的序列号uniqueId = 2020062114
生成的序列号uniqueId = 2020062115
生成的序列号uniqueId = 2020062116
生成的序列号uniqueId = 2020062117
生成的序列号uniqueId = 2020062118
生成的序列号uniqueId = 2020062119
生成的序列号uniqueId = 2020062120

5.3 小结

以上主要介绍了几种分布式环境下,全局唯一Id的生成方式。详细介绍了基于Zookeeper的方案,其中也使用了两种多线程的方式,来模拟Id的唯一性,以上测试实例说明,在多线程环境下,基于Zookeeper的两种方案,都是可靠的。

6 基于Zookeeper实现分布式锁

更新中,请耐心等待…

写博客是为了记住自己容易忘记的东西,另外也是对自己工作的总结,希望尽自己的努力,做到更好,大家一起努力进步!

如果有什么问题,欢迎大家评论,一起探讨,代码如有问题,欢迎各位大神指正!

给自己的梦想添加一双翅膀,让它可以在天空中自由自在的飞翔!

本文链接http://element-ui.cn/news/show-387307.aspx