AQS作为Java并发编程的基石,在Java同步工具中有广泛应用,例如:ReentrantLock
, Semaphore
, CountDownLatch
ReentrantReadWriteLock
, ThreadPoolExecutor
一般来说,自定义同步器要么是独占方式,要么是共享方式,它们也只需实现 tryAcquire / tryRelease
, tryAcquireShared / tryReleaseShared
中的一组即可。AQS也支持自定义同步器同时实现独占和共享两种方式,如ReentrantReadWriteLock。ReentrantLock是独占锁。
AQS核心思想是:如果被请求的共享资源(state)空闲,那么就将当前请求资源的线程设置为有效的工作线程,将共享资源设置为锁定状态;如果共享资源被占用,就需要一定的阻塞等待唤醒机制来保证锁分配。这个机制主要用的是CLH队列的变体实现的,将暂时获取不到锁的线程加入到队列中。
CLH:Craig, Landin, and Hagersten队列,是单向链表,AQS中的队列是CLH变体的虚拟双向队列(FIFO),AQS是通过将每条请求共享资源的线程封装成一个节点来实现锁的分配。
主要原理图如下:
AQS使用一个Volatile的int类型的成员变量state
来表示同步状态,通过内置的FIFO队列来完成资源获取的排队工作,通过CAS完成对State值的修改。
获得锁
acquire()是独占模式下线程获取共享资源的顶层入口。如果获取到资源,线程直接返回,否则进入等待队列,直到获取到资源为止,且整个过程忽略中断的影响。获取到资源后,线程就可以去执行其临界区代码了。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
如果再有线程要获取锁,依次在队列中往后排队即可。
tryAcquire()方法尝试去获取独占资源。如果获取成功,则直接返回true,否则直接返回false。需自定义同步器去实现
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
AQS只是一个框架,具体资源的获取/释放方式交由自定义同步器去实现吗。AQS这里只定义了一个接口,具体资源的获取交由自定义同步器去实现了(通过state的get/set/CAS)!!!至于能不能重入,能不能加塞,那就看具体的自定义同步器怎么去设计了!!
addWaiter(Node)此方法用于将当前线程加入到等待队列的队尾,并返回当前线程所在的结点。
private Node addWaiter(Node mode) {
//以给定模式构造结点。mode有两种:EXCLUSIVE(独占)和SHARED(共享)
Node node = new Node(Thread.currentThread(), mode);
//尝试快速方式直接放到队尾。
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//上一步失败则通过enq入队。
enq(node);
return node;
}
enq(Node)方法用于将node加入队尾。
private Node enq(final Node node) {
//CAS"自旋",直到成功加入队尾
for (;;) {
Node t = tail;
if (t == null) { // 队列为空,创建一个空的标志结点作为head结点,并将tail也指向它。
if (compareAndSetHead(new Node()))
tail = head;
} else {//正常流程,放入队尾
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
通过tryAcquire()和addWaiter(),该线程获取资源失败,已经被放入等待队列尾部了。聪明的你立刻应该能想到该线程下一部该干什么了吧:进入等待状态休息,直到其他线程彻底释放资源后唤醒自己,自己再拿到资源,然后就可以去干自己想干的事了。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;//标记是否成功拿到资源
try {
boolean interrupted = false;//标记等待过程中是否被中断过
//又是一个“自旋”!
for (;;) {
final Node p = node.predecessor();//拿到前驱
//如果前驱是head,即该结点已成老二,那么便有资格去尝试获取资源(可能是老大释放完资源唤醒自己的,当然也可能被interrupt了)。
if (p == head && tryAcquire(arg)) {
setHead(node);//拿到资源后,将head指向该结点。所以head所指的标杆结点,就是当前获取到资源的那个结点或null。
p.next = null; // setHead中node.prev已置为null,此处再将head.next置为null,就是为了方便GC回收以前的head结点。也就意味着之前拿完资源的结点出队了!
failed = false; // 成功获取资源
return interrupted;//返回等待过程中是否被中断过
}
//如果自己可以休息了,就通过park()进入waiting状态,直到被unpark()。如果不可中断的情况下被中断了,那么会从park()中醒过来,发现拿不到资源,从而继续进入park()等待。
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;//如果等待过程中被中断过,哪怕只有那么一次,就将interrupted标记为true
}
} finally {
if (failed) // 如果等待过程中没有成功获取资源(如timeout,或者可中断的情况下被中断了),那么取消结点在队列中的等待。
cancelAcquire(node);
}
}
Node2自旋查看前置节点,直到前置节点执行完成状态修改为CANCELLED,然后断开前置节点的链接,获取资源开始执行。
再来总结下它的流程吧:
- 调用自定义同步器的tryAcquire()尝试直接去获取资源,如果成功则直接返回;
- 没成功,则addWaiter()将该线程加入等待队列的尾部,并标记为独占模式;
- acquireQueued()使线程在等待队列中休息,有机会时(轮到自己,会被unpark())会去尝试获取资源。获取到资源后才返回。如果在整个等待过程中被中断过,则返回true,否则返回false。
- 如果线程在等待过程中被中断过,它是不响应的。只是获取资源后才再进行自我中断selfInterrupt(),将中断补上。
由于此函数是重中之重,我再用流程图总结一下:
释放锁
release()方法是独占模式下线程释放共享资源的顶层入口。它会释放指定量的资源,如果彻底释放了(即state=0),它会唤醒等待队列里的其他线程来获取资源。
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;//找到头结点
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);//唤醒等待队列里的下一个线程
return true;
}
return false;
}
逻辑并不复杂。它调用tryRelease()来释放资源。有一点需要注意的是,它是根据tryRelease()的返回值来判断该线程是否已经完成释放掉资源了!所以自定义同步器在设计tryRelease()的时候要明确这一点!!
tryRelease()方法尝试去释放指定量的资源。需自定义同步器去实现
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
跟tryAcquire()一样,这个方法是需要独占模式的自定义同步器去实现的。正常来说,tryRelease()都会成功的,因为这是独占模式,该线程来释放资源,那么它肯定已经拿到独占资源了,直接减掉相应量的资源即可(state-=arg),也不需要考虑线程安全的问题。但要注意它的返回值,上面已经提到了,release()是根据tryRelease()的返回值来判断该线程是否已经完成释放掉资源了!所以自义定同步器在实现时,如果已经彻底释放资源(state=0),要返回true,否则返回false。
private void unparkSuccessor(Node node) {
//这里,node一般为当前线程所在的结点。
int ws = node.waitStatus;
if (ws < 0)//置零当前线程所在的结点状态,允许失败。
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;//找到下一个需要唤醒的结点s
if (s == null || s.waitStatus > 0) {//如果为空或已取消
s = null;
for (Node t = tail; t != null && t != node; t = t.prev) // 从后向前找。
if (t.waitStatus <= 0)//从这里可以看出,<=0的结点,都是还有效的结点。
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);//唤醒
}
这个函数并不复杂。一句话概括:用unpark()唤醒等待队列中最前边的那个未放弃线程。
AQS实现非公平和公平
公平锁和非公平锁在于hasQueuedPredecessors()这个方法。hasQueuedPredecessors是公平锁加锁时判断等待队列中是否存在有效节点的方法。如果返回False,说明当前线程可以争取共享资源;如果返回True,说明队列中存在有效节点,当前线程必须加入到等待队列中。
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
// 头节点不是尾节点
// 第一个节点不为空
// 当前节点是头节点
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
线程在doAcquireNanos
方法中获取锁时,会先加入同步队列,之后根据情况再陷入阻塞。当阻塞后的节点一段时间后醒来时,这时候来了更多的新线程来抢锁,这些新线程还没有加入到同步队列中去,也就是在tryAcquire方法中获取锁。
在公平锁
下,这些新线程会发现同步队列中存在节点等待,那么这些新线程将无法获取到锁,去排队;
而在非公平锁
下,这些新线程会跟排队苏醒的线程进行锁争抢,失败的去同步队列中排队。因此这里的公平与否,针对的其实是苏醒线程与还未加入同步队列的线程,而对于已经在同步队列中阻塞的线程而言,它们内部自身其实是公平的,因为它们是按顺序被唤醒的,这是根据AQS节点唤醒机制和同步队列的FIFO特性决定的。
自定义同步工具
了解AQS基本原理以后,按照上面所说的AQS知识点,自己实现一个同步工具。
public class LeeLock {
private static class Sync extends AbstractQueuedSynchronizer {
@Override
protected boolean tryAcquire (int arg) {
return compareAndSetState(0, 1);
}
@Override
protected boolean tryRelease (int arg) {
setState(0);
return true;
}
@Override
protected boolean isHeldExclusively () {
return getState() == 1;
}
}
private Sync sync = new Sync();
public void lock () {
sync.acquire(1);
}
public void unlock () {
sync.release(1);
}
}
通过我们自己定义的Lock完成一定的同步功能。
public class LeeMain {
static int count = 0;
static LeeLock leeLock = new LeeLock();
public static void main (String[] args) throws InterruptedException {
Runnable runnable = new Runnable() {
@Override
public void run () {
try {
leeLock.lock();
for (int i = 0; i < 10000; i++) {
count++;
}
} catch (Exception e) {
e.printStackTrace();
} finally {
leeLock.unlock();
}
}
};
Thread thread1 = new Thread(runnable);
Thread thread2 = new Thread(runnable);
thread1.start();
thread2.start();
thread1.join();
thread2.join();
System.out.println(count);
}
}
上述代码每次运行结果都会是20000。通过简单的几行代码就能实现同步功能,这就是AQS的强大之处。
评论