锁机制全景
操作系统的锁
要想吃透 Java 中的锁,就必须先了解操作系统的锁。
互斥量 mutex
操作系统使用 互斥量(mutex)
来防止多个线程同时进入临界区,确保共享资源的线程安全。
Linux 系统使用 pthread_mutex_t
实现:
- 加锁(lock):如果
pthread_mutex_t
值为 1 ,获取锁,并将值设为0 ; 如果锁被占用,线程阻塞,加入等待队列 - 解锁(unlock):将
pthread_mutex_t
值设为 1,唤醒等待队列中的一个线程(或所有线程,取决于实现)
加锁 、 解锁示例
// Linux pthread 示例
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
void critical_section() {
pthread_mutex_lock(&mutex); // 阻塞直到获取锁
// ... 临界区代码 ...
pthread_mutex_unlock(&mutex);
}
条件变量 cond
条件变量 cond
必须与互斥量配合使用,允许线程在某个条件不满足时主动阻塞
,并在条件满足时被唤醒。
操作系统提供如下 api:
pthread_cond_wait(&cond,&mtx);//---内部处理流程:解锁—>阻塞休眠—>唤醒—>加锁
pthread_cond_signal(&cond);//--激活一个等待线程。
pthread_cond_broadcast(&cond);//--则激活所有等待线程
操作示例
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
bool condition = false;
// 等待线程
void* waiter(void* arg) {
pthread_mutex_lock(&mutex);
while (!condition) { // ⚠️ 必须用循环检查条件(防止虚假唤醒)
pthread_cond_wait(&cond, &mutex); // 自动释放锁并阻塞
}
// 条件满足后重新持有锁
pthread_mutex_unlock(&mutex);
return NULL;
}
// 通知线程
void* notifier(void* arg) {
pthread_mutex_lock(&mutex);
condition = true;
pthread_cond_signal(&cond); // 唤醒一个等待线程
// pthread_cond_broadcast(&cond); // 唤醒所有等待线程
pthread_mutex_unlock(&mutex);
return NULL;
}
Synchronized
Synchronized
关键字是 JDK1.0 就自带的内置锁,利用 monitor
管程模式 将同步机制(加锁/解锁)和线程通信(等待/通知)统一封装,避免开发者直接操作底层系统的API(如 pthread_mutex_t
),降低复杂性。
当创建一个重量级锁时(jdk 1.6 引入了锁升级),JVM 创建 ObjectMonitor
对象,将对象的 Mark Word 替换为指向 ObjectMonitor
的指针。
// JVM 底层 C++ 代码
class ObjectMonitor {
void* volatile _header; // 存储对象头的 Mark Word(在重量级锁时被覆盖)
volatile intptr_t _count; // 锁重入次数
void* volatile _owner; // 持有锁的线程指针
ObjectWaiter* volatile _EntryList; // 竞争锁的阻塞线程队列(Contending Threads)
ObjectWaiter* volatile _WaitSet; // 因 wait() 而等待的线程队列(Waiting Threads)
volatile intptr_t _waiters; // 等待线程数
// 其他字段...
};
Monitor 与 synchronized 的隐式绑定
//随便创建一个对象
Object object = new Object();
public void m1() {
//对象锁
synchronized (object) {
System.out.println("hello synchronized lock");
}
}
这段代码反编译后,可以看到 JVM 通过 monitorenter
和 monitorexit
来实现同步(即 Monitor
机制)。
monitorenter
的底层逻辑
重量级锁阶段,调用 ObjectMonitor::enter()
,将线程加入 _EntryList
阻塞队列
// HotSpot enter() 简化逻辑
void ObjectMonitor::enter(TRAPS) {
Thread * const Self = THREAD;
if (TryFastEnter(Self)) return; // 尝试快速获取(偏向锁/轻量级锁路径)
slow_enter(Self); // 进入重量级锁逻辑(最终创建或绑定 ObjectMonitor)
}
Monitor 的线程协作
当调用 Java 中的 Object.wait()
方法时,JVM 会调用 ObjectMonitor::wait()
方法,将线程加入 _WaitSet
等待队列。
本质是调用操作系统的 pthread_cond_wait()
让出 CPU,线程状态转换为 WAITING
。
synchronized (lock) {
while (!condition) {
lock.wait(); // ⚠️ 必须在 synchronized 块内调用
}
}
// 对应的 JVM 实现
void ObjectSynchronizer::wait(Handle obj, jlong millis, TRAPS) {
ObjectMonitor* monitor = get_monitor(obj());
monitor->wait(millis, false, CHECK); // 线程进入 _WaitSet
}
当调用 Object.notify()
方法时, JVM 会调用 ObjectMonitor::notify()
方法,将线程从 _WaitSet
移到 _EntryList
参与锁竞争。
本质是调用操作系统的 pthread_cond_signal()
,唤醒一个等待线程,参与锁的竞争。
lock.notify();
// JVM 具体实现
void ObjectMonitor::notify(TRAPS) {
if (_WaitSet == NULL) return;
DequeueWaiter(THREAD); // 从 _WaitSet 移动到 _EntryList,等待竞争
}
对象内存布局
想要理解 synchronized
的锁升级过程,必须先了解 JVM 对象的内存布局。
Java 对象的结构分为:
- 对象头
- 对象标记 (MarkWord) :包含对象哈希码、GC标记、GC次数、同步锁标记等
- 类型指针 :指向方法区类模板的类元信息
- 实例数据 : 存放类的属性(field),也包含父类的属性
- 对齐填充 : 虚拟机要求对象的起始地址必须是 8 字节的整数倍,如果不是进行补充
JOL (Java Object Layout) 是 openjdk 团队提供的一个查看 JVM 对象内存布局的工具
JOL maven 依赖
<dependency>
<groupId>org.openjdk.jol</groupId>
<artifactId>jol-core</artifactId>
<version>0.17</version>
</dependency>
使用 JOL 查看对象内存布局
public static void main(String[] args) {
Object object = new Object();
System.out.println(ClassLayout.parseInstance(object).toPrintable());
MyObject myObject = new MyObject();
System.out.println(ClassLayout.parseInstance(myObject).toPrintable());
}
// 对象头 16 字节 + 实例数据 4 + 1 字节 = 21 个字节 + 对齐填充 = 24 个字节 (忽略压缩指针的影响)
class MyObject {
int age;
boolean flag;
}
//java.lang.Object object internals:
//OFF SZ TYPE DESCRIPTION VALUE
// 0 8 (object header: mark) 0x0000000000000001 (non-biasable; age: 0)
// 8 4 (object header: class) 0x00000e80
// 12 4 (object alignment gap)
//Instance size: 16 bytes
//Space losses: 0 bytes internal + 4 bytes external = 4 bytes total
//com.ttdxg.juc.MyObject object internals:
//OFF SZ TYPE DESCRIPTION VALUE
// 0 8 (object header: mark) 0x0000000000000001 (non-biasable; age: 0)
// 8 4 (object header: class) 0x0109c400
// 12 4 int MyObject.age 0
// 16 1 boolean MyObject.flag false
// 17 7 (object alignment gap)
//Instance size: 24 bytes
- 发现
Object
对象头标记为 8 个字节,对象头类型指针为 4 个字节,最终对齐填充为 16 个字节 - 为什么类型指针为 4 个字节而不是 8 个字节? (JVM 默认开启压缩指针)
使用 -XX:+PrintCommandLineFlags
将会打印显示设置或修改默认值的 JVM 参数
PS E:\venti-java> java -XX:+PrintCommandLineFlags -version
-XX:ConcGCThreads=2 -XX:G1ConcRefinementThreads=8 -XX:GCDrainStackTargetSize=64 -XX:InitialHeapSize=265261568
-XX:MarkStackSize=4194304 -XX:MaxHeapSize=4244185088 -XX:MinHeapSize=6815736
-XX:+PrintCommandLineFlags -XX:ReservedCodeCacheSize=251658240 -XX:+SegmentedCodeCache
-XX:+UseCompressedOops -XX:+UseG1GC -XX:-UseLargePagesIndividualAllocation
java version "21.0.6" 2025-01-21 LTS
Java(TM) SE Runtime Environment (build 21.0.6+8-LTS-188)
Java HotSpot(TM) 64-Bit Server VM (build 21.0.6+8-LTS-188, mixed mode, sharing)
- 发现
-XX:+UseCompressedOops
默认启用对象压缩指针 - 当
UseCompressedOops
开启时,UseCompressedClassPointers
通常会隐式启用
// 对象压缩指针
-XX:+UseCompressedOops
// 类压缩指针
-XX:+UseCompressedClassPointers
Synchronized 锁升级
- jdk8 中锁转换的流程:
无锁(001) -> 偏向锁(101) -> 轻量级锁(00) -> 重量级锁(10)
- jdk21 中已经禁用了偏向锁:
无锁 -> 轻量级锁 -> 重量级锁
1. 无锁
- 场景:对象未发送过同步操作,不涉及锁竞争
- 表现:对象头的 MarkWord 记录对象的哈希码(调用才产生)或锁状态
2. 轻量级锁
- 触发条件:线程首次尝试通过
synchronized
进入同步块时 - 实现机制:
- CAS 自旋:JVM 在线程栈帧中创建锁记录(Lock Record),并尝试通过 CAS 操作将对象头中的 MarkWord 指向该锁记录
- 成功:线程获得锁,对象头的 MarkWord 更新为指向锁记录的指针,锁标记设置为轻量级锁状态 (
00
) - 失败:自旋尝试获取锁,若失败达到设置上限,升级为重量级锁
3. 重量级锁
- 触发条件:当轻量级锁自旋次数达到上限时,会升级为重量级锁;对象头的 MarkWord 更新为指向互斥量的指针,锁标记设置为重量级锁状态 (
10
) - 实现机制:在编译时会将同步块的开始位置插入
monitor enter
指令,在结束位置插入monitor exit
指令。Monitor
的owner
会存放占有线程的 Id
为什么废弃偏向锁
- 多线程竞争常态化:现代应用的多核并行特性使无竞争场景极少,偏向锁的优化收益降低
- 撤销成本高:偏向锁需在发生竞争时撤销状态,频繁撤销会导致性能倒退
锁升级的实际示例
Object lock = new Object();
// 线程1
new Thread(() -> {
synchronized (lock) { // 首次进入,升级为轻量级锁
// 业务逻辑
}
}).start();
// 线程2
new Thread(() -> {
synchronized (lock) { // 轻量级锁自旋失败后,膨胀为重量级锁
// 业务逻辑
}
}).start();
锁升级过程中 hashCode 去哪儿了?
- 无锁状态下:当对象的
hashCode()
方法第一次被调用时,hashCode
会存储在对象头的MarkWord
中;计算过hashCode
的对象无法升级为偏向锁,直接升级为轻量级锁 - 偏向锁状态下:在偏向锁状态下调用
hashCode()
,会立即撤销偏向锁,升级为重量级锁 - 轻量级锁状态下:
MarkWord
存储指向栈帧中的Lock Record
的指针,Lock Record
中存储了hashCode
- 重量级锁状态下:
MarkWord
存储的重量级锁指针,指向的Monitor
有字段记录非加锁状态下的MarkWord
,锁释放后会写回对象头
CAS 比较并交换
CAS (Compare And Swap) 比较并交换,是一种无锁原子操作,不需要传统锁限制,是 CPU 提供的原语指令 cmpxchg
。
CAS (compare and swap) 比较并交换,它包含三个操作数
- 内存位置 (V)
- 旧的值 (A)
- 更新值 (B)
CAS 是将工作内存的值和旧的值作比较
,如果相同则更新内存位置的值为更新值,否则不更新或重试,重试的行为称为自旋
JAVA 通过Unsafe
类实现 CAS ,其内部方法是 native
方法,使用 C 的指针,可以直接操作内存。不过 JDK 23 就要弃用了,改为 VarHandle
类实现 CAS:
Unsafe 类的源码 : 比较并交换 自旋
@IntrinsicCandidate
public final int getAndAddInt(Object o, long offset, int delta) {
int v;
do {
v = getIntVolatile(o, offset);
} while (!weakCompareAndSetInt(o, offset, v, v + delta));
return v;
}
自旋锁
CAS 实现自旋锁示例
public class SpinLock {
// 原子引用(指向当前持有锁的线程,用于实现自旋锁)
private AtomicReference<Thread> owner = new AtomicReference<>();
/**
* 尝试获取锁(自旋)
*/
public void lock() {
Thread current = Thread.currentThread();
// 🔥 自旋:通过 CAS 尝试将 owner 从 null 置为当前线程
while (!owner.compareAndSet(null, current)) {
// 自旋中(CPU 忙等,适合短时间锁竞争)
}
// 成功获取锁后退出循环
}
/**
* 释放锁
*/
public void unlock() {
Thread current = Thread.currentThread();
// 只有锁持有者才能解锁(防止非法释放)
owner.compareAndSet(current, null);
}
}
ABA 问题
CAS 自旋比传统锁更高效,其高效性来自 CPU 硬件的直接支持,但也带来了 ABA
问题
ABA 问题 (结果不变,过程有问题)
- 线程 1 从内存中取出 A
- 这时候线程2也从内存中取出 A ,并且线程2进行一些操作将值变为 B ,然后线程2 又将内存中的数据变为 A
- 线程 1 进行 CAS 操作发现内存中仍然是 A ,预期以为数据没问题
使用
AtomicStampedReference
解决 ABA 问题
// 使用AtomicStampedReference
AtomicStampedReference<String> ref = new AtomicStampedReference<>("初始值", 0);
// 更新时检查值和版本号
int[] stampHolder = new int[1];
String current = ref.get(stampHolder);
if (ref.compareAndSet(current, "新值", stampHolder[0], stampHolder[0] + 1)) {
System.out.println("更新成功");
}
AQS 抽象队列同步器
AQS
(AbstractQueuedSynchronizer
) 抽象的队列同步器,是 JUC 的基石,学 JUC 不知道 AQS
犹如学 JAVA 不知道 JVM !
Doug Lee 提出统一规范并简化了锁的实现,将其抽象出来
,屏蔽了同步状态管理、同步队列的管理和维护、阻塞线程排队和通知、唤醒机制等,是 JUC 中锁和同步组件实现的公共基础部分。
ReentrantLock
、ReentrantReadWriteLock
、Semaphore
、CountDownLatch
等都依赖 AQS
。
阻塞线程需要放入等待队列,等待线程释放然后竞争锁。 AQS 中使用改进的 CLH 队列作为等待队列,是一种特殊的双向链表
。
AQS 通过 state
状态位表示同步状态,通过 CAS 实现原子更新, AQS 的实质就是 state
状态位 + CLH 双端队列
// 状态操作方法
protected final int getState() { return state; }
protected final void setState(int newState) { state = newState; }
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
AQS 源码分析
接下来通过 ReentrantLock
的非公平锁为例,分析源码过程
总体流程
ReentrantLock.lock()
-> NonfairSync.lock()
-> 直接尝试 CAS 抢锁(非公平性)
-> 若抢锁成功:设置当前线程为独占所有者
-> 若失败:调用 AQS.acquire(1)
-> tryAcquire(): 非公平方式再次尝试获取锁
-> addWaiter(): 将线程封装为 Node 加入 CLH 队列
-> acquireQueued(): 自旋或阻塞线程,等待唤醒
NonfairSync.lock()
方法
final void lock() {
if (compareAndSetState(0, 1)) // 直接 CAS 尝试修改锁状态(0 → 1)
setExclusiveOwnerThread(Thread.currentThread()); // 成功则设置独占线程
else
acquire(1); // 失败则进入 AQS 队列流程
}
AQS.acquire(1)
流程
public final void acquire(int arg) {
if (!tryAcquire(arg) && //尝试获取锁
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) //加入队列并阻塞
selfInterrupt();
}
NonfairSync
中的tryAcquire(1)
实现
// AQS 中使用模板模式不提供实现
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
//ReentrantLock 进行提供具体实现
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState(); // 获取当前锁状态
if (c == 0) { //如果锁未被持有,再次尝试 CAS 获取锁
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
} //如果锁已被持有,判断当前线程是否为锁持有者 (可重入锁逻辑)
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
addWaiter()
:线程封装为 Node 并入队
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
// 入队 双向链表结构
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { //头节点未初始化
// 初始化头节点 哨兵节点
if (compareAndSetHead(new Node())) /
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) { // CAS 插入队尾
t.next = node;
return t;
}
}
}
}
acquireQueued()
:自旋或阻塞线程,等待唤醒
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 获得当前节点的前置节点
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
shouldParkAfterFailedAcquire()
:判断当前线程是否需要阻塞
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus; //获取前置节点的等待状态 1 取消 -1 等待 -2 阻塞
if (ws == Node.SIGNAL) // SIGNAL (-1)状态,线程已经准备好,就等资源释放了
return true;
if (ws > 0) { // 前置节点状态为取消,则从队列中删除该节点
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else { // CAS 设置前置节点为 SIGNAL (-1)状态
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
parkAndCheckInterrupt()
:阻塞当前线程,等待唤醒
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
至此,其他线程已经放入 CLH 队列,并被 LockSupport.park(this)
阻塞
unlock()
释放资源
public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
// h 头节点此时为哨兵节点 waitStatus = -1 true
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread); //哨兵节点的 next 节点被唤醒
}
tryRelease()
protected final boolean tryRelease(int releases) {
int c = getState() - releases; // 1 - 1 = 0
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null); //释放锁
}
setState(c); // 锁状态重新设置为 0
return free;
}
非公平锁情况
按照上面的流程, CLH 队列中的线程会排队 FIFO 唤醒,但是这是非公平锁,允许插队
线程取消流程
/**
* 核心作用:将节点标记为取消状态,并尝试将其移出同步队列。
* 应用场景:线程因中断或超时放弃等待时触发。
*/
private void cancelAcquire(Node node) {
// 边界检查:节点为空直接返回
if (node == null)
return;
// 清空节点关联的线程,便于GC回收
node.thread = null;
// --- 步骤1:找到有效前驱节点 ---
Node pred = node.prev;
// 跳过所有已被取消的前驱节点(waitStatus>0表示CANCELLED状态)
while (pred.waitStatus > 0) {
// 更新当前节点的前驱指针,形成新的链表关系
node.prev = pred = pred.prev;
}
// predNext是前驱节点原来的下一跳(可能是当前节点)
Node predNext = pred.next;
// --- 步骤2:标记当前节点为取消状态 ---
// 直接写入,无需CAS,因为必须确保标记成功
node.waitStatus = Node.CANCELLED;
// --- 步骤3:处理当前节点是队尾的情况 ---
// 如果当前节点是队列尾部,尝试通过CAS更新尾指针到前驱节点
if (node == tail && compareAndSetTail(node, pred)) {
// 更新前驱节点的next指针为null(移除当前节点)
compareAndSetNext(pred, predNext, null);
} else {
// --- 步骤4:非队尾节点的处理逻辑 ---
int ws; // 前驱节点的状态
if (pred != head && // 前驱不是头节点(头节点为虚节点)
( // 检查前驱状态是否有效:
(ws = pred.waitStatus) == Node.SIGNAL || // 前驱状态已是SIGNAL
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL)) // 尝试设置为SIGNAL成功
) &&
pred.thread != null) { // 前驱关联的线程存活
// 获取当前节点的后继
Node next = node.next;
if (next != null && next.waitStatus <= 0) {
// 将前驱的next指针直接绕过当前节点,指向其后继
compareAndSetNext(pred, predNext, next);
}
} else {
// 不满足条件时,唤醒后继线程(确保状态传播)
unparkSuccessor(node);
}
// --- 步骤5:清理操作帮助GC ---
// 将节点的next指向自己,断开外部引用链(防止内存泄漏)
node.next = node; // help GC
}
}
ReentrantLock
ReentrantLock
是 JUC 提供的可重入互斥锁,它提供了比 synchronized
关键字更灵活、更强大的锁操作能力。
- 可重入性:同一个线程可以多次获取同一把锁
lock.lock();
try {
// 在锁内部可以再次获取同一把锁
lock.lock();
try {
// 临界区代码
} finally {
lock.unlock();
}
} finally {
lock.unlock();
}
- 公平性选择:支持公平和非公平两种模式
// 公平锁
ReentrantLock fairLock = new ReentrantLock(true);
// 非公平锁
ReentrantLock unfairLock = new ReentrantLock(false); // 默认
- 尝试获取锁:
tryLock()
// tryLock() 实现轮询
public void pollingTask() {
while (!Thread.currentThread().isInterrupted()) {
if (lock.tryLock()) {
try {
// 执行需要加锁的任务
executeTask();
break; // 任务完成后退出循环
} finally {
lock.unlock();
}
} else {
// 锁被占用,执行其他工作或等待
doOtherWork();
Thread.sleep(100); // 适当休眠避免忙等待
}
}
}
- 可中断获取
lockInterruptibly()
长时间等待的任务需要被外部取消
public class CancellableTask {
private final ReentrantLock taskLock = new ReentrantLock();
public void executeTask() throws InterruptedException {
taskLock.lockInterruptibly(); // 可中断获取锁
try {
while (!Thread.currentThread().isInterrupted()) {
// 执行任务逻辑
doWork();
}
} finally {
if (taskLock.isHeldByCurrentThread()) {
taskLock.unlock();
}
}
}
// 在另一个线程中可以调用thread.interrupt()来取消任务
}
- 条件变量
Condition
Condition notEmpty = lock.newCondition();
// 等待方
lock.lock();
try {
while (conditionNotMet) {
notEmpty.await(); // 释放锁并等待
}
// 条件满足后继续执行
} finally {
lock.unlock();
}
// 通知方
lock.lock();
try {
// 改变条件
notEmpty.signal(); // 或 signalAll()
} finally {
lock.unlock();
}
ReentrantReadWriteLock 可重入读写锁
读写锁(ReentrantReadWriteLock
) 是普通锁 (ReentryLock
)的一种演进,主要在读多写少场景下实现更高的吞吐量。它通过读锁(共享锁)
和 写锁(排他锁)
实现更高的并发性:
- 读锁(ReadLock):允许多个线程同时读取共享资源,但阻塞所有写操作
- 写锁(WriteLock):独占资源,同一时间仅允许一个线程执行写操作,阻塞所有读/写操作
在持有写锁
的情况下,主动获取读锁
,随后释放写锁的过程,将锁的权限从写锁降级为读锁的过程,称为锁降级
,保证后续读到的数据是自己最新更新的数据。
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
lock.writeLock().lock(); // 1. 获取写锁
try {
// 更新数据...
lock.readLock().lock(); // 2. 在持有写锁时获取读锁(锁降级关键步骤)
} finally {
lock.writeLock().unlock(); // 3. 释放写锁(此时仍持有读锁)
}
// 如果不在释放写锁之前获取读锁,则没办法确定释放写锁之后自己能第一时间抢到读锁
// 如果自己没有第一时间抢到,则无法保证后续读到的数据是自己最新更新的数据
try {
// 安全读取数据...
} finally {
lock.readLock().unlock(); // 4. 释放读锁
}
为什么不能锁升级?
线程A 持有读锁,尝试获取写锁,需等待所有读锁释放;若线程B也如此,会导致死锁!
锁饥饿
读多写少的情况下,大量线程尝试获取读锁,读写互斥导致读锁没有释放完的情况下,也许写锁永远无法获取到。
StampedLock 邮戳锁
邮戳锁是一种更高效的读写锁,在ReentrantReadWriteLock
的基础上添加了乐观读模式,专为极端读多写少场景设计。
邮戳锁通过乐观读机制将读操作性能推向极致,但也增加了复杂性,建议优先使用传统读写锁,仅在性能瓶颈明确时考虑 StampedLock
核心特性
- 写锁: 独占锁,阻塞所有读/写操作
- 悲观读锁: 类似传统读锁,读操作共享,阻塞写锁
- 乐观读锁: 无锁化读,通过检查邮戳(时间戳)验证数据是否被修改
邮戳锁示例
StampedLock lock = new StampedLock();
// 获取写锁(返回一个邮戳标识)
long writeStamp = lock.writeLock();
try {
// 执行写操作
} finally {
lock.unlockWrite(writeStamp); // 释放写锁需要对应邮戳
}
// 获取悲观读锁(阻塞写锁)
long readStamp = lock.readLock();
try {
// 执行读操作
} finally {
lock.unlockRead(readStamp);
}
// 尝试乐观读(不阻塞写锁)
long optimisticStamp = lock.tryOptimisticRead();
// 读取数据...
if (!lock.validate(optimisticStamp)) { // 验证邮戳是否过期
// 数据被修改,乐观读锁升级为悲观读锁
readStamp = lock.readLock();
try {
// 重新读取数据
} finally {
lock.unlockRead(readStamp);
}
}
Semaphore 信号量
Semphore
信号量可以控制同时访问特定资源的线程数量,比如非常适合限制数据库连接数、文件操作数(防止内存溢出)等场景
Semaphore sem = new Semaphore(3); // 允许3个线程同时访问
sem.acquire(); // 获取许可
try {
// 临界区代码
} finally {
sem.release(); // 释放许可
}
CountDownLatch 计数器
CountDownLatch
计数器允许一个或多个线程等待,直到其他线程完成操作。
大数据分片处理
public class DataProcessor {
private final CountDownLatch taskLatch;
private final ResultAggregator aggregator;
public Result process(DataChunk[] chunks) {
taskLatch = new CountDownLatch(chunks.length);
aggregator = new ResultAggregator();
ExecutorService executor = Executors.newFixedThreadPool(8);
for (DataChunk chunk : chunks) {
executor.submit(() -> {
try {
ChunkResult result = processChunk(chunk);
aggregator.collect(result);
} finally {
taskLatch.countDown();
}
});
}
taskLatch.await(); // 等待所有分片处理完成
return aggregator.getFinalResult();
}
}
LockSupport 锁支持
LockSupport
是底层原语,直接操作线程的阻塞与唤醒,用于构建高级同步工具(AQS
,ReentrantLock
,甚至Condition
本身)
/**
* lockSupport 使用 park() 和 unpark() 来阻塞和唤醒线程
* park() 会消耗许可证,有则放行,没有则阻塞
* unpark() 会发放许可证
*
* 许可证不会累计,多次调用 unpark() 也只能发放一个 许可证
*/
public static void main(String[] args) {
Thread t1 = new Thread(() -> {
System.out.println(Thread.currentThread().getName() + " come in");
LockSupport.park(); // park() 与 unpark() 没有顺序要求
System.out.println(Thread.currentThread().getName() + " 被唤醒");
}, "t1").start();
new Thread(()->{
System.out.println(Thread.currentThread().getName() + " come in");
LockSupport.unpark(t1);
System.out.println(Thread.currentThread().getName() + " 唤醒");
},"t2").start();
}