关注:王有志,一个同享硬核Java技术的互金摸鱼侠。 欢迎你加入Java人的提桶跑路群:共同富裕的Java人

今日咱们来聊一聊AQS宗族中另一个重要成员Semaphore,我只收集到了一道关于Semaphore的面试题,问了问“是什么”和“怎样完成的”:

  • 什么是Semaphore?它是怎样完成的?

依照咱们的惯例,依旧是依照“是什么”,“怎样用”和“怎样完成的”这3步来剖析Semaphore。另外,今日供给了题解。

Semaphore的运用

Semaphore直译过来是信号量,是计算机科学中十分Old School的处理同步与互斥的机制**,与互斥锁不同的是它允许指定数量的线程或进程拜访同享资源

Semaphore处理同步与互斥的机制和咱们平时过地铁站的闸机十分类似。刷卡打开闸机(acquire操作),经过后(拜访临界区)闸机封闭(release操作),后边的人才能够持续刷卡,而在前一个人经过前,后边的人只能排队等候(行列机制)。当然,地铁站不可能只要一个闸机,拥有几个闸机,就允许几个人一起经过。

18.详解AQS家族的成员:Semaphore

信号量也是这样的,经过结构函数界说答应数量,运用时请求答应,处理完业务逻辑后开释答应:

// 信号量中界说1个答应
Semaphore semaphore = new Semaphore(1);
// 请求答应
semaphore.acquire();
......
// 开释答应
semaphore.release();

当咱们为Semaphore界说一个答应时,它和互斥锁相同,同一时间只允许一个线程进入临界区。可是当咱们界说了多个答应时,它与互斥锁的差异就体现出来了:

Semaphore semaphore = new Semaphore(3);
for(int i = 1; i < 5; i++) {
  int finalI = i;
  new Thread(()-> {
    try {
      semaphore.acquire();
      System.out.println("第[" + finalI + "]个线程获取到semaphore");
      TimeUnit.SECONDS.sleep(10);
      semaphore.release();
    } catch (InterruptedException e) {
      throw new RuntimeException(e);
    }
  }).start();
}

履行这段代码能够看到,同一时间3个线程都进入了临界区,只要第4个线程被挡在了临界区外。

Semaphore的完成原理

还记得在《AQS的此生,构建出JUC的根底》中提到的同步状况吗?咱们其时说它是某些同步器的计数器:

AQS中,state不只用作表示同步状况,也是某些同步器完成的计数器,如:Semaphore中允许经过的线程数量,ReentrantLock中可重入特性的完成,都依赖于state作为计数器的特性。

先来看Semaphore与AQS的联系:

18.详解AQS家族的成员:Semaphore

与ReentrantLock相同,Semaphore内部完成了继承自AQS的同步器抽象类Sync,并有FairSyncNonfairSync两个完成类。接下来咱们就经过剖析Semaphore的源码,来验证咱们之前的说法。

结构办法

Semaphore供给了两个结构办法:

public Semaphore(int permits) {
  sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) {
  sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

能够看到Semaphore和ReentrantLock的设计思路是共同的,Semaphore内部也完成了两个同步器FairSyncNonfairSync,别离完成公正形式和非公正形式,而Semaphore的结构本质上是结构同步器的完成。咱们以非公正形式的NonfairSync的完成为例:

public class Semaphore implements java.io.Serializable {
  static final class NonfairSync extends Sync {
    NonfairSync(int permits) {
      super(permits);
    }
  }
  abstract static class Sync extends AbstractQueuedSynchronizer {
    Sync(int permits) {
      setState(permits);
    }
  }
}
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
  protected final void setState(int newState) {
    state = newState;
  }
}

追根溯源,结构器的参数permits最终还是回归到了AQS的state身上,借助了state作为计数器的特性来完成Semaphore的功用。

acquire办法

现在咱们现已为Semaphore设置了一定数量的答应(permits),接下来咱们就需求经过Semaphore#acquire办法获取答应,进入Semaphore所“看护”的临界区:

public class Semaphore implements java.io.Serializable {
  public void acquire() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
  }
}
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
  public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
    if (Thread.interrupted()) {
      throw new InterruptedException();
    }
    if (tryAcquireShared(arg) < 0) {
      doAcquireSharedInterruptibly(arg);
    }
  }
}

这两步和ReentrantLock十分类似,先经过tryAcquireShared测验直接获取答应,失利后经过doAcquireSharedInterruptibly加入到等候行列中。

Semaphore中直接获取答应的逻辑十分简单:

static final class NonfairSync extends Sync {
  protected int tryAcquireShared(int acquires) {
    return nonfairTryAcquireShared(acquires);
  }
}
abstract static class Sync extends AbstractQueuedSynchronizer {
  final int nonfairTryAcquireShared(int acquires) {
    for (;;) {
      // 获取可用答应数量
      int available = getState();
      // 计算答应数量
      int remaining = available - acquires;
      if (remaining < 0 || compareAndSetState(available, remaining)) {
        return remaining;
      }
    }
  }
}

首先是获取并减少可用答应的数量,当答应数量小于0时回来一个负数,或经过CAS更新答应数量成功后,回来一个正数。此刻doAcquireSharedInterruptibly会将当时的请求Semaphore答应的线程添加到AQS的等候行列中。

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
 private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
   // 创建同享形式的等候节点
   final Node node = addWaiter(Node.SHARED);
   try {
     for (;;) {
       final Node p = node.predecessor();
       if (p == head) {
         // 再次测验获取答应,并回来剩下答应数量
         int r = tryAcquireShared(arg);
         if (r >= 0) {
           // 获取成功,更新头节点
           setHeadAndPropagate(node, r);
           p.next = null;
           return;
         }
       }
       // 获取失利进入等候状况
       if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) {
         throw new InterruptedException();
       }
     }
   } catch (Throwable t) {
     cancelAcquire(node);
     throw t;
   }
 }
}

Semaphore的运用的doAcquireSharedInterruptiblyReentrantLock运用的acquireQueued办法中心逻辑一向,可是有纤细的完成不同:

  • 创建节点运用Node.SHARED形式;

  • 更新头节点运用了setHeadAndPropagate办法。

private void setHeadAndPropagate(Node node, int propagate) {
  Node h = head;
  setHead(node);
  // 是否要唤醒等候中的节点
  if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) {
    Node s = node.next;
    if (s == null || s.isShared()) {
      // 唤醒等候中的节点
      doReleaseShared();
    }
  }
}

咱们知道在ReentrantLock中履行acquireQueued,当成功获取锁后,只需求履行setHead(node)即可,那么为什么Semaphore还要再进行唤醒?

假定有3个答应的Semaphore一起有T1,T2,T3和T4总计4个线程竞赛:

  • 它们一起进入nonfairTryAcquireShared办法,假定只要T1经过compareAndSetState(available, remaining)成功修正有用的答应数量,T1进入临界区;

  • T2,T3和T4进入doAcquireSharedInterruptibly办法,经过addWaiter(Node.SHARED)构建出AQS的等候行列(参阅AQS的此生中关于addWaiter办法的剖析);

  • 假定T2成为了头节点的直接后继节点,T2再次履行tryAcquireShared测验获取答应,T3和T4履行parkAndCheckInterrupt

  • T2成功获取答应并进入临界区,此刻Semaphore剩下1个答应,而T3和T4处于暂停状况中。

这种场景中,只要两个答应产生了作用,明显不符合咱们对的初衷,因此在履行setHeadAndPropagate更新头节点时,判别剩下答应的数量,当数量大于0时持续唤醒后继节点。

Tips

  • Semaphore在获取答应的流程与ReentrantLock加锁的进程高度类似~~

  • 下文剖析doReleaseShared是怎样唤醒等候中节点的。

release办法

Semaphore的release办法就十分简单了:

public class Semaphore implements java.io.Serializable {
  public void release() {
    sync.releaseShared(1);
  }
  abstract static class Sync extends AbstractQueuedSynchronizer {
    protected final boolean tryReleaseShared(int releases) {
      for (;;) {
        int current = getState();
        // 计算答应数量
        int next = current + releases;
        if (next < current) {
          throw new Error("Maximum permit count exceeded");
        }
        // 经过CAS更新答应数量
        if (compareAndSetState(current, next)) {
            return true;
        }
      }
    }
  }
}
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
  public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
      doReleaseShared();
      return true;
    }
    return false;
  }
  private void doReleaseShared() {
    for (;;) {
      Node h = head;
      // 判别AQS的等候行列是否为空
      if (h != null && h != tail) {
        int ws = h.waitStatus;
        // 判别当时节点是否处于待唤醒的状况
        if (ws == Node.SIGNAL) {
          if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0)){
            continue;
          }
          unparkSuccessor(h);
        } else if (ws == 0 && !h.compareAndSetWaitStatus(0, Node.PROPAGATE)) {
          // 状况为0时,更新节点的状况为无条件传达
          continue;
        }
      }
      if (h == head) {
        break;
      }
    }
  }
}

咱们能够看到Semaphore的release办法分了两部分:

  • tryReleaseShared办法更新Semaphore的有用答应数量;

  • doReleaseShared唤醒处于等候中的节点。

唤醒的逻辑并不杂乱,依旧是对节点状况waitStatus的判别,来确认是否需求履行unparkSuccessor,当状况为ws == 0,会将节点的状况更新为Node.PROPAGAT,即无条件传达。

Tips:与ReentrantLock所不同的是,Semaphore并不支持Node.CONDITION状况,相同的ReentrantLock也不支持Node.PROPAGATE状况。

结语

关于Semaphore的内容到这儿就结束了,今日咱们只具体剖析了非公正形式下中心办法的完成,至于公正形式的完成,以及其它办法的完成,就留个咱们自行探索了。

好了,期望本文能够带给你一些协助,咱们下次再见!最终欢迎咱们关注王有志的专栏《Java面试都问啥?》。