JDK 并发机制演进
1. JDK1 时代
synchronized
关键字
JVM 通过 管程(Monitor) 实现,依赖于操作系统互斥锁,性能差,锁粒度大
wait()/notify()
基于 Object
类的线程协作方法;必须包裹在同步块内,仅能唤醒一个随机线程
java
synchronized (lock) {
if (condition) lock.wait(); // 条件不满足时阻塞(应使用 while 检查)
// 可能虚假唤醒后直接执行后续逻辑
}
2. JDK5 革命
JAVA 并发大神 Doug Lea 带来了 java.util.concurrent
包
ReentrantLock
可重入锁
替代 synchronized
的显式锁,支持公平锁、非公平锁、可中断锁、尝试获取锁超时
java
class Ticket {
private int number = 50;
//非公平锁
// ReentrantLock lock = new ReentrantLock();
//公平锁
ReentrantLock lock = new ReentrantLock(true);
public void sale() {
lock.lock();
try {
if (number > 0) {
System.out.println(Thread.currentThread().getName() +
"卖出了" + (number--) + "票,剩余" + number + "票");
}
}finally {
lock.unlock();
}
}
}
Condition
条件变量
与显示锁 ReentrantLock
配合使用,用来替代 wait()/notify()
机制
java
ReentrantLock lock = new ReentrantLock();
Condition condition = lock.newCondition();
new Thread(()->{
lock.lock();
try {
System.out.println(Thread.currentThread().getName() + "come in"); //1️⃣
condition.await(); //调用 await() 方法后,线程阻塞,释放锁
System.out.println(Thread.currentThread().getName() + "被唤醒"); //4️⃣
}catch (Exception e) {
e.printStackTrace();
}finally {
lock.unlock();
}},"t1").start();
new Thread(()->{
lock.lock();
try {
System.out.println(Thread.currentThread().getName()+ " come in "); //2️⃣
condition.signal(); //调用 signal() 方法后,线程恢复,获取锁
System.out.println(Thread.currentThread().getName() + " 唤醒"); //3️⃣
}catch (Exception e) {
e.printStackTrace();
}finally {
lock.unlock();
}},"t2").start();
LockSupport
LockSupport
是底层原语,直接操作线程的阻塞与唤醒,用于构建高级同步工具(AQS
,ReentrantLock
,甚至Condition
本身)
java
/**
* lockSupport 使用 park() 和 unpark() 来阻塞和唤醒线程
* park() 会消耗许可证,有则放行,没有则阻塞
* unpark() 会发放许可证
*
* 许可证不会累计,多次调用 unpark() 也只能发放一个 许可证
*/
public static void main(String[] args) {
Thread t1 = new Thread(() -> {
System.out.println(Thread.currentThread().getName() + " come in");
LockSupport.park(); // park() 与 unpark() 没有顺序要求
System.out.println(Thread.currentThread().getName() + " 被唤醒");
}, "t1").start();
new Thread(()->{
System.out.println(Thread.currentThread().getName() + " come in");
LockSupport.unpark(t1);
System.out.println(Thread.currentThread().getName() + " 唤醒");
},"t2").start();
}
- 线程池框架 (Executor Service)
java
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
// CPU 密集型 = CPU 核心数;IO 密集型 = CPU 核心数 * 2
4, //核心线程数(池中保持存活的最小线程数)
8, //最大线程数
60, TimeUnit.SECONDS, //空闲线程存活时间
new ArrayBlockingQueue<>(100) //任务队列容量
);
//ExecutorService threadPool = Executors.newFixedThreadPool(4);
//提交任务
threadPool.submit(() -> System.out.println("Runnable 任务"));
Future<String> future = threadPool.submit(() -> "Callable 任务返回结果");
try {
String result = future.get(); //结果
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
- 原子类
java.util.concurrent.atomic
包提供了原子操作类,基于 CPU 原子指令实现(CAS),无锁算法
- JMM 模型重写
JDK5 彻底重写了 JAVA 内存模型(JMM)
- volatile 的新语义:确保变量的可见性和禁止指令重排序
final
字段初始化安全:对象构造完成后,final
字段的值对所有线程可见- Happens-Before 规则:明确定义多线程操作的执行顺序
java
// volatile 的正确使用示例
class SharedData {
private volatile boolean flag = false;
void writer() { // Thread A
data = 42; // 普通写入可能被重排序
flag = true; // volatile 写入,强制刷新内存屏障
}
void reader() { // Thread B
if (flag) { // volatile 读取,强制加载最新值
System.out.println(data); // 保证看到 data=42
}
}
}
3. JDK6 的锁优化:synchronized 重生
JVM 层面通过偏向锁、轻量级锁、重量级锁等优化,对 synchronized
进行性能优化
4. JDK7 分治与动态同步
ForkJoinPool
java
class SumTask extends RecursiveTask<Long> {
private final int[] array;
private final int start;
private final int end;
private static final int THRESHOLD = 1000; // 当数组长度 ≤1000 时不再拆分
SumTask(int[] array, int start, int end) {
this.array = array;
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
if (end - start <= THRESHOLD) {
// 基准条件:小规模数组直接循环计算
long sum = 0;
for (int i = start; i < end; i++) {
sum += array[i];
}
return sum;
} else {
// 分治步骤:拆分任务
int mid = (start + end) / 2;
SumTask leftTask = new SumTask(array, start, mid);
SumTask rightTask = new SumTask(array, mid, end);
leftTask.fork(); // 将左任务提交到队列异步执行
long rightResult = rightTask.compute(); // 当前线程立即处理右任务
long leftResult = leftTask.join(); // 等待左任务结果
return leftResult + rightResult; // 合并结果
}
}
}
public static void main(String[] args) {
int[] data = new int[100_0000]; // 假设是一个大型数组
//填充数组
for (int i = 0; i < data.length ; i++) {
data[i] = new Random().nextInt(10);
}
SumTask task = new SumTask(data, 0, data.length);
ForkJoinPool pool = new ForkJoinPool();
long total = pool.invoke(task); // 触发并行计算
System.out.println("总和:" + total);
}
5. JDK8 异步革命
CompletableFuture
函数式异步链:支持组合、异常处理、超时控制
创建异步任务
java
// 使用默认线程池
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
return "Hello";
});
// 指定自定义线程池
ExecutorService executor = Executors.newFixedThreadPool(4);
CompletableFuture<String> futureWithExecutor = CompletableFuture.supplyAsync(
() -> "Hello", executor
);
thenApply / handle 链式处理结果
java
// thenApply 链式处理具有依赖性,下一步依赖上一步的结果
// thenApply 操作遇到异常就不会往下走 / handle 遇到异常也会把流程执行完
CompletableFuture<Integer> result = CompletableFuture.supplyAsync(() -> "10")
.thenApply(str -> {
// 将字符串转换为整数
return Integer.parseInt(str);
})
.thenApply(num -> {
// 计算平方
return num * num;
});
result.get(); // 返回 100
thenAccept/thenRun 消费结果
java
CompletableFuture.supplyAsync(() -> "Hello")
.thenAccept(s -> System.out.println(s + " World")); // 输出 "Hello World"
CompletableFuture.supplyAsync(() -> 42)
.thenRun(() -> System.out.println("Done!")); // 任务完成时通知
组合多个任务
java
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "A");
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "B");
//合并两个任务的结果
future1.thenCombine(future2, (a, b) -> a + b)
.thenAccept(System.out::println); // 输出 "AB"
- 并行流 (Parallel Stream)
隐式并行化:基于 ForkJoinPool.commonPool()
StampedLock
邮戳锁
乐观读模式:优化读多写少场景
java
private final StampedLock lock = new StampedLock();
Object read() {
long stamp = lock.tryOptimisticRead(); // 乐观读(不阻塞)
Object data = loadData(); // 业务操作
if (!lock.validate(stamp)) { // 检查是否有写操作
stamp = lock.readLock(); // 升级为悲观读锁
try { data = loadData(); }
finally { lock.unlockRead(stamp); }
}
return data;
}
6. JDK9 VarHandle
VarHandle
替代 Unsafe
的合法内存操作
7. JDK21 轻量级并发革命
- 虚拟线程 (适用于高并发 IO 操作)
虚拟线程是用户态线程,由 JVM 调度,没有内核态上下文切换,开销小,性能高
java
// 启动百万级虚拟线程(内存占用仅数GB)
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
for (int i = 0; i < 1_000_000; i++) {
executor.submit(() -> {
Thread.sleep(Duration.ofSeconds(1)); // 挂起时自动释放线程资源
return processRequest();
});
}
}
- 结构化并发 (替代传统
Future
)
强制作用域:确保子任务不会泄露到父任务之外
java
Response handleRequest() throws Exception {
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
Future<String> user = scope.fork(() -> queryUser());
Future<Order> order = scope.fork(() -> queryOrder());
scope.join(); // 等待所有子任务完成
scope.throwIfFailed(); // 任一子任务失败则整体失败
return new Response(user.resultNow(), order.resultNow());
} // 作用域结束:自动取消未完成子任务
}
- ScopedValue
虚拟线程场景下替代 ThreadLocal
,作用域结束后自动清理资源
java
// 声明作用域值
private static final ScopedValue<User> CURRENT_USER = ScopedValue.newInstance();
void processRequest(User user) {
ScopedValue.where(CURRENT_USER, user)
.run(() -> handleRequestImpl());
}
void handleRequestImpl() {
User user = CURRENT_USER.get(); // 仅在此作用域内有效
}