本文正在参与「金石方案 . 瓜分6万现金大奖」

锁是为了在多线程的场景中保证数据安全而添加的一种手法,Java中常用的有CountdownLatch,ReentrantLock等单运用中的锁,在现在处处都是分布式的场景需求下就不能满意了,所以就出现了分布式锁。

不同的物理节点有各自的线程,可是他们会拜访同一个资源,可是不允许同一时刻拜访,所以就有了分布式锁

例如

咱们能够经过数据库编写sql来完成分布式锁,可是这种在高并发下功能会出问题,

还有常用的redis完成分布式锁,这个是咱们用的最多的一种高功能高并发的完成方式。

今日介绍的一种是经过中间件zookeeper完成分布式锁,也是支撑高功能高并发的。

想想完成一个锁想到哪些要害点 ?

争抢锁:只要一个人能够获取锁
取得锁的节点挂了,暂时节点 会主动开释
取得锁的人,能够主动开释锁
锁被开释,删除 其他人怎样知道
主动轮训,监听心跳:存在延迟,节点多的状况话压力很大。

依据zk的节点看看是否满意锁

创立耐久化节点

zk是创立节点保存数据的,相同节点只允许创立一次,所以咱们能够经过成功创立节点完成获取锁的状况。

要害代码

zk.create("/lock", threadId.getBytes() , ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

结果:

ZooKeeper系列:实现分布式锁

如图只要一个线程获取锁,其他线程都出现异常:NodeExists for /lock ,能够保证同一时刻只要一个线程获取锁。然后取得锁的线程逻辑履行结束后应该删除锁。

存在的问题:假如线程溃散了,锁就无法开释了,最终导致死锁

耐久化节点不行,耐久化次序节点自然也不行了

创立暂时节点

暂时节点:在客户端断开衔接的时分就会主动删除

要害代码:

zk.create("/lock", threadId.getBytes() , ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);

CreateMode.EPHEMERAL意思便是创立暂时节点 ,也便是当线程溃散,无法主动开释锁的时分,会主动删除,防止死锁。

可是

还存在的问题:没有获取锁的线程会出现错误,则需要不断重试,经过死循环直到获取锁。

暂时节点+watch

watch:设置监听回调,当监听的节点或其子节点有变更,则会告诉客户端,可参考上一篇

要害代码

zk.create("/lock", threadId.getBytes() , ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
zk.getChildren("/testLock",true, this,"ajisun");

getChildren 获取父节点/testlock下的子节点并设置监听。当子节点/lock被删除 就会触发回调,再次创立节点。

经过watch 防止运用死循环设置堵塞,看似还不错哦。

可是

还还存在问题:一切客户端都去监听同一个父节点,当锁开释的时分,也会告诉一切的客户端,带来的压力仍是很大。

创立暂时次序节点+watch

暂时次序节点:每个线程都会创立一个暂时且有序的节点,相互不抵触

代码如下,十个线程模仿十个客户端

public static void main(String[] args) throws KeeperException, InterruptedException {
    CountDownLatch countDownLatch = new CountDownLatch(1);
    ZooKeeper zk = null;
    try {
      zk = new ZooKeeper("127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183/testLock", 3000, new DefaultWatch().setCountDownLatch(countDownLatch));
      countDownLatch.await();
     } catch (Exception e) {
      e.printStackTrace();
     }
    CountDownLatch countDown = new CountDownLatch(10);
    for (int i = 0; i < 10; i++) {
      ZooKeeper finalZk = zk;
      new Thread() {
        @Override
        public void run() {
          String threadId = Thread.currentThread().getId() + "";
          try {
            finalZk.create("/lock", threadId.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
           } catch (Exception e) {
            e.printStackTrace();
           }
          countDown.countDown();
         }
       }.start();
     }
    try {
      countDown.await();
     } catch (InterruptedException e) {
      e.printStackTrace();
     }
    List<String> list = zk.getChildren("/", false);
    list.forEach(s -> System.out.println(s));
   }

CreateMode.EPHEMERAL_SEQUENTIAL意思便是创立暂时次序节点。

输出节点如下

lock0000000110
lock0000000114
lock0000000113
lock0000000112
lock0000000111
lock0000000107
lock0000000106
lock0000000105
lock0000000109
lock0000000108

每次只要一个客户端能够加锁成功,假如一起有100个客户端,当其中一个开释锁后,告诉剩余99个客户端,然后99个客户端一起抢锁,其实只要一个会成功,剩余的98个仅仅陪跑的,做无用功,白白浪费系统资源。

既然每次只要一个会加锁成功,当一个客户端开释锁的时分,只告诉一个客户端不就能够了吗。

怎样做到呢?

便是用到暂时次序节点这个特色,不在监听父节点,而是监听前一个节点。

首要创立节点成功后,获取父节点下的一切子节点,因为各个节点是有次序,能够依照从小到大的次序排列后,然后判断自己的节点是不是最小的,假如是则获取锁,不是则监听前一个节点。

要害代码如下

public class WatchCallBack implements Watcher, AsyncCallback.StringCallback, AsyncCallback.Children2Callback, AsyncCallback.StatCallback {
  ZooKeeper zk;
  String threadId;
  CountDownLatch cc = new CountDownLatch(1);
  String pathName;
 
 // set/get省掉
  public void tryLock() {
    try {
      //  创立暂时有序的锁
      zk.create("/lock", threadId.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, this, "abc");
      cc.await();
     } catch (InterruptedException e) {
      e.printStackTrace();
     }
   }
  public void unLock() {
    try {
      zk.delete(pathName, -1);
     } catch (InterruptedException e) {
      e.printStackTrace();
     } catch (KeeperException e) {
      e.printStackTrace();
     }
   }
​
  @Override
  public void process(WatchedEvent event) {
    // 假如第一个锁开释了,只要第二个收到回调事情
    // 假如是其他的挂了,对应的后一个也能收到告诉
    switch (event.getType()) {
      case None:
        break;
      case NodeCreated:
        break;
      case NodeDeleted:
        zk.getChildren("/", false, this, "bcd");
        break;
      case NodeDataChanged:
        break;
      case NodeChildrenChanged:
        break;
     }
   }
  // create call back
  @Override
  public void processResult(int rc, String path, Object ctx, String name) {
    if (name != null) {
      System.out.println(threadId + "create node:" + name);
      pathName = name;
      // 获取一切创立的目录,即参与锁争夺的线程
      zk.getChildren("/", false, this, "bcd");
     }
   }
​
  /**
   * getChildren call back
   * pathName= /lock00000000003
   * children=[lock0000000002,lock0000000008,lock0000000005,lock0000000003]
   * 进入这个回调之后 阐明已经创立节点成功,能够看的已经创立的一切节点
   */@Override
  public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
​
    Collections.sort(children);
    // 所在位置
    int i = children.indexOf(pathName.substring(1));
​
    // 判断是不是第一个
    if (i == 0) {
      System.out.println(threadId + " first");
      cc.countDown();
     } else {
      // 不是第一个则监控前一个是否存在,假如前一个删除了需要回调我这个session
      zk.exists("/" + children.get(i - 1), this, this, "xyz");
     }
   }
​
  /**
   * @param rc
   * @param path
   * @param ctx
   * @param stat
   */
  @Override
  public void processResult(int rc, String path, Object ctx, Stat stat) {
​
   }
}

调用方:

  public void lock() {
    for (int i = 0; i < 10; i++) {
      new Thread() {
        @Override
        public void run() {
          WatchCallBack watchCallBack = new WatchCallBack();
          watchCallBack.setZk(zk);
          String threadId = Thread.currentThread().getId() + "";
          watchCallBack.setThreadId(threadId);
          // 抢锁
          watchCallBack.tryLock();
          //干活
          System.out.println(threadId + " working......");
          try {
            TimeUnit.SECONDS.sleep(1);
           } catch (InterruptedException e) {
            e.printStackTrace();
           }
          //开释锁
          watchCallBack.unLock();
         }
       }.start();
     }
   }

总结总结

zk是经过暂时节点,防止死锁问题(session消失,节点消失,锁开释)

经过次序节点,完成阻塞功能(暂时次序的节点数据)。

经过watch,watch前一个节点,最小的取得锁,一旦最小的锁开释,zk只会给下一个节点回调。防止了抢锁带来的不必要的损耗和压力。

我是纪先生,用输出倒逼输入而继续学习,继续分享技能系列文章,以及全网值得保藏好文,欢迎关注大众号,做一个继续生长的技能人。