⭐⭐⭐ Spring Boot 项目实战 ⭐⭐⭐ Spring Cloud 项目实战
《Dubbo 实现原理与源码解析 —— 精品合集》 《Netty 实现原理与源码解析 —— 精品合集》
《Spring 实现原理与源码解析 —— 精品合集》 《MyBatis 实现原理与源码解析 —— 精品合集》
《Spring MVC 实现原理与源码解析 —— 精品合集》 《数据库实体设计合集》
《Spring Boot 实现原理与源码解析 —— 精品合集》 《Java 面试题 + Java 学习指南》

摘要: 原创出处 guobinhit.blog.csdn.net/article/details/106592138 「CG国斌」欢迎转载,保留摘要,谢谢!


🙂🙂🙂关注**微信公众号:【芋道源码】**有福利:

  1. RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表
  2. RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
  3. 您对于源码的疑问每条留言将得到认真回复。甚至不知道如何读源码也可以请教噢
  4. 新的源码解析文章实时收到通知。每周更新一篇左右
  5. 认真的源码交流微信群。

CAS

什么是 CAS?

CAS(Compare And Swap),即比较并交换,是解决多线程并行情况下使用锁造成性能损耗的一种机制,CAS 操作包含三个操作数——内存位置V、预期原值A和新值B。如果内存位置的值与预期原值相匹配,那么处理器会自动将该位置值更新为新值;否则,处理器不做任何操作。无论哪种情况,它都会在 CAS 指令之前返回该位置的值。

CAS 有效地说明了“我认为位置V应该包含值A,如果包含该值,则将B放到这个位置;否则,不要更改该位置,只告诉我这个位置现在的值即可。”在 Java 中,sun.misc.Unsafe类提供了硬件级别的原子操作来实现这个 CAS,java.util.concurrent包下的大量类都使用了这个Unsafe类的 CAS 操作。

CAS 的应用

java.util.concurrent.atomic包下的类大多是使用 CAS 操作来实现的,如AtomicIntegerAtomicBooleanAtomicLong等。下面以AtomicInteger的部分实现来大致讲解下这些原子类的实现。

public class AtomicInteger extends Number implements java.io.Serializable {
private static final long serialVersionUID = 6214790243416807050L;

// setup to use Unsafe.compareAndSwapInt for updates
private static final Unsafe unsafe = Unsafe.getUnsafe();

private volatile int value;// 初始int大小
// 省略了部分代码...

// 带参数构造函数,可设置初始int大小
public AtomicInteger(int initialValue) {
value = initialValue;
}
// 不带参数构造函数,初始int大小为0
public AtomicInteger() {
}

// 获取当前值
public final int get() {
return value;
}

// 设置值为 newValue
public final void set(int newValue) {
value = newValue;
}

//返回旧值,并设置新值为 newValue
public final int getAndSet(int newValue) {
/**
* 这里使用for循环不断通过CAS操作来设置新值
* CAS实现和加锁实现的关系有点类似乐观锁和悲观锁的关系
* */
for (;;) {
int current = get();
if (compareAndSet(current, newValue))
return current;
}
}

// 原子的设置新值为update, expect为期望的当前的值
public final boolean compareAndSet(int expect, int update) {
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}

// 获取当前值current,并设置新值为current+1
public final int getAndIncrement() {
for (;;) {
int current = get();
int next = current + 1;
if (compareAndSet(current, next))
return current;
}
}

// 此处省略部分代码,余下的代码大致实现原理都是类似的
}

一般来说,在竞争不是特别激烈的时候,使用该包下的原子操作性能比使用synchronized关键字的方式高效的多。通过查看getAndSet()方法,可知如果资源竞争十分激烈的话,这个for循环可能换持续很久都不能成功跳出。

在这种情况下,我们可能需要考虑如何降低对资源的竞争。在较多的场景下,我们可能会使用到这些原子类操作。一个典型应用就是计数,在多线程的情况下需要考虑线程安全问题,示例代码如下:

public class Counter {
private int count;
public Counter(){}
public int getCount(){
return count;
}
public void increase(){
count++;
}
}

上面这个类在多线程环境下会有线程安全问题,要解决这个问题最简单的方式可能就是加锁,优化代码如下:

public class Counter {
private int count;
public Counter(){}
public synchronized int getCount(){
return count;
}
public synchronized void increase(){
count++;
}
}

这是悲观锁的实现,如果我们需要获取这个资源,那么我们就给它加锁,其他线程都无法访问该资源,直到我们操作完后释放对该资源的锁。我们知道,悲观锁的效率是不如乐观锁的,上面说了atomic包下的原子类的实现是乐观锁方式,因此其效率会比使用synchronized关键字更高一些,推荐使用这种方式,代码如下:

public class Counter {
private AtomicInteger count = new AtomicInteger();
public Counter(){}
public int getCount(){
return count.get();
}
public void increase(){
count.getAndIncrement();
}
}

CAS 的缺点

CAS 虽然能够很高效的实现原子操作,但是 CAS 仍然存在三大问题。

ABA 问题

因为 CAS 需要在操作值的时候检查下值有没有发生变化,如果没有发生变化则更新,但是如果一个值原来是A,变成了B,又变成了A,那么使用 CAS 进行检查时会发现它的值没有发生变化,但是实际上却变化了。ABA 问题的解决思路就是使用版本号,在变量前面追加上版本号,每次变量更新的时候把版本号加一,那么A-B-A就会变成1A-2B-3A

从 Java 1.5 开始 JDK 的atomic包里提供了一个类AtomicStampedReference来解决 ABA 问题。这个类的compareAndSet方法作用是首先检查当前引用是否等于预期引用,并且当前标志是否等于预期标志,如果全部相等,则以原子方式将该引用和该标志的值设置为给定的更新值。

循环时间长开销大

CAS 自旋如果长时间不成功,会给 CPU 带来非常大的执行开销。如果 JVM 能支持处理器提供的pause指令那么效率会有一定的提升,pause指令有两个作用,一是它可以延迟流水线执行指令,使 CPU 不会消耗过多的执行资源,延迟的时间取决于具体实现的版本,在一些处理器上延迟时间是零;二是它可以避免在退出循环的时候因内存顺序冲突而引起 CPU 流水线被清空,从而提高 CPU 的执行效率。

只能保证一个共享变量的原子操作

当对一个共享变量执行操作时,我们可以使用循环 CAS 的方式来保证原子操作,但是对多个共享变量操作时,循环 CAS 就无法保证操作的原子性,这个时候就需要用锁,或者有一个取巧的办法,就是把多个共享变量合并成一个共享变量来操作。

比如有两个共享变量i=2,j=a,合并一下ij=2a,然后用 CAS 来操作ij。从 Java 1.5 开始 JDK 提供了AtomicReference类来保证引用对象之间的原子性,我们可以把多个变量放在一个对象里来进行 CAS 操作。

AQS

什么是 AQS?

AQS(AbstractQueuedSynchronizer),即抽象队列同步器,是 JDK 下提供的一套用于实现基于 FIFO 等待队列的阻塞锁和相关的同步器的一个同步框架。这个抽象类被设计为作为一些可用原子int值来表示状态的同步器的基类。

如果我们看过类似CountDownLatch类的源码实现,会发现其内部有一个继承了AbstractQueuedSynchronizer的内部类Sync。可见CountDownLatch是基于 AQS 框架来实现的一个同步器,类似的同步器在 JUC 下还有不少,如Semaphore等。

AQS 的应用

如上所述,AQS 管理一个关于状态信息的单一整数,该整数可以表现任何状态。比如,Semaphore用它来表现剩余的许可数,ReentrantLock用它来表现拥有它的线程已经请求了多少次锁;FutureTask用它来表现任务的状态等。

/* To use this class as the basis of a synchronizer, redefine the
* following methods, as applicable, by inspecting and/or modifying
* the synchronization state using {@link #getState}, {@link
* #setState} and/or {@link #compareAndSetState}:
*
* <ul>
* <li> {@link #tryAcquire}
* <li> {@link #tryRelease}
* <li> {@link #tryAcquireShared}
* <li> {@link #tryReleaseShared}
* <li> {@link #isHeldExclusively}
* </ul>
* /

如 JDK 的文档中所说,使用 AQS 来实现一个同步器需要覆盖实现如下几个方法,并且使用getStatesetStatecompareAndSetState这三个方法来操作状态。

boolean tryAcquire(int arg)
boolean tryRelease(int arg)
int tryAcquireShared(int arg)
boolean tryReleaseShared(int arg)
boolean isHeldExclusively()

以上方法不需要全部实现,根据获取的锁的种类可以选择实现不同的方法,支持独占(排他)获取锁的同步器应该实现tryAcquiretryReleaseisHeldExclusively;而支持共享获取的同步器应该实现tryAcquireSharedtryReleaseSharedisHeldExclusively

下面以CountDownLatch举例说明基于 AQS 实现同步器,CountDownLatch用同步状态持有当前计数,countDown方法调用 release从而导致计数器递减;当计数器为 0 时,解除所有线程的等待;await调用acquire,如果计数器为 0,acquire会立即返回,否则阻塞。通常用于某任务需要等待其他任务都完成后才能继续执行的情景。源码如下:

public class CountDownLatch {
/**
* 基于AQS的内部Sync
* 使用AQS的state来表示计数count.
*/
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;

Sync(int count) {
// 使用AQS的getState()方法设置状态
setState(count);
}

int getCount() {
// 使用AQS的getState()方法获取状态
return getState();
}

// 覆盖在共享模式下尝试获取锁
protected int tryAcquireShared(int acquires) {
// 这里用状态state是否为0来表示是否成功,为0的时候可以获取到返回1,否则不可以返回-1
return (getState() == 0) ? 1 : -1;
}

// 覆盖在共享模式下尝试释放锁
protected boolean tryReleaseShared(int releases) {
// 在for循环中Decrement count直至成功;
// 当状态值即count为0的时候,返回false表示 signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}

private final Sync sync;

// 使用给定计数值构造CountDownLatch
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}

// 让当前线程阻塞直到计数count变为0,或者线程被中断
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}

// 阻塞当前线程,除非count变为0或者等待了timeout的时间。当count变为0时,返回true
public boolean await(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

// count递减
public void countDown() {
sync.releaseShared(1);
}

// 获取当前count值
public long getCount() {
return sync.getCount();
}

public String toString() {
return super.toString() + "[Count = " + sync.getCount() + "]";
}
}

AQS 实现原理浅析

AQS 的实现主要在于维护一个volatile int state(代表共享资源)和一个 FIFO 线程等待队列(多线程争用资源被阻塞时会进入此队列,此队列称之为CLH队列)。CLH 队列中的每个节点是对线程的一个封装,包含线程基本信息,状态,等待的资源类型等。

CLH结构如下:

*      +------+  prev +-----+       +-----+
* head | | <---- | | <---- | | tail
* +------+ +-----+ +-----+

下面简单看下获取资源的代码:

 public final void acquire(int arg) {
// 首先尝试获取,不成功的话则将其加入到等待队列,再for循环获取
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

// 从clh中选一个线程获取占用资源
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 当节点的先驱是head的时候,就可以尝试获取占用资源了tryAcquire
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
// 如果获取到资源,则将当前节点设置为头节点head
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 如果获取失败的话,判断是否可以休息,可以的话就进入waiting状态,直到被unpark()
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

private Node addWaiter(Node mode) {
// 封装当前线程和模式为新的节点,并将其加入到队列中
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}

private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) {
// tail为null,说明还没初始化,此时需进行初始化工作
if (compareAndSetHead(new Node()))
tail = head;
} else {
// 否则的话,将当前线程节点作为tail节点加入到CLH中去
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}

参考资料:

  • ava并发编程——CAS
  • ava并发之AQS详解
  • AVA并发编程: CAS和AQS
文章目录
  1. 1. CAS
    1. 1.1. 什么是 CAS?
    2. 1.2. CAS 的应用
    3. 1.3. CAS 的缺点
      1. 1.3.0.1. ABA 问题
      2. 1.3.0.2. 循环时间长开销大
      3. 1.3.0.3. 只能保证一个共享变量的原子操作
  • 2. AQS
    1. 2.1. 什么是 AQS?
    2. 2.2. AQS 的应用
    3. 2.3. AQS 实现原理浅析