Skip to content

JDK 并发机制演进

1. JDK1 时代

  • synchronized 关键字

JVM 通过 管程(Monitor) 实现,依赖于操作系统互斥锁,性能差,锁粒度大

  • wait()/notify()

基于 Object 类的线程协作方法;必须包裹在同步块内,仅能唤醒一个随机线程

java
synchronized (lock) {
    if (condition) lock.wait(); // 条件不满足时阻塞(应使用 while 检查)
    // 可能虚假唤醒后直接执行后续逻辑
}

2. JDK5 革命

JAVA 并发大神 Doug Lea 带来了 java.util.concurrent

道格李.png

  • ReentrantLock 可重入锁

替代 synchronized 的显式锁,支持公平锁、非公平锁、可中断锁、尝试获取锁超时

java
class Ticket {

    private int number = 50;
    
    //非公平锁
//    ReentrantLock lock = new ReentrantLock();
    //公平锁
    ReentrantLock lock = new ReentrantLock(true);

    public void sale() {
        lock.lock();
        try {
            if (number > 0) {
                System.out.println(Thread.currentThread().getName() + 
                        "卖出了" + (number--) + "票,剩余" + number + "票");
            }
        }finally {
            lock.unlock();
        }
    }
}
  • Condition 条件变量

与显示锁 ReentrantLock 配合使用,用来替代 wait()/notify() 机制

java
ReentrantLock lock = new ReentrantLock();
Condition condition = lock.newCondition();

new Thread(()->{
    lock.lock();
    try {
        System.out.println(Thread.currentThread().getName() + "come in"); //1️⃣
        condition.await(); //调用 await() 方法后,线程阻塞,释放锁
        System.out.println(Thread.currentThread().getName() + "被唤醒"); //4️⃣
    }catch (Exception e) {
        e.printStackTrace();
    }finally {
        lock.unlock();
    }},"t1").start();

new Thread(()->{
    lock.lock();
    try {
           System.out.println(Thread.currentThread().getName()+ " come in "); //2️⃣
    condition.signal(); //调用 signal() 方法后,线程恢复,获取锁
       System.out.println(Thread.currentThread().getName() + " 唤醒"); //3️⃣
    }catch (Exception e) {
    e.printStackTrace();
   }finally {
           lock.unlock();
   }},"t2").start();
  • LockSupport

LockSupport底层原语,直接操作线程的阻塞与唤醒,用于构建高级同步工具(AQS,ReentrantLock,甚至Condition本身)

java
/**
 *  lockSupport 使用 park() 和 unpark() 来阻塞和唤醒线程
 *  park() 会消耗许可证,有则放行,没有则阻塞
 *  unpark() 会发放许可证
 *
 *  许可证不会累计,多次调用 unpark() 也只能发放一个 许可证
 */
public static void main(String[] args) {
    Thread t1 = new Thread(() -> {
        System.out.println(Thread.currentThread().getName() + " come in");
        LockSupport.park(); // park() 与 unpark() 没有顺序要求
        System.out.println(Thread.currentThread().getName() + " 被唤醒");
    }, "t1").start();

    new Thread(()->{
        System.out.println(Thread.currentThread().getName() + " come in");
        LockSupport.unpark(t1);
        System.out.println(Thread.currentThread().getName() + " 唤醒");
    },"t2").start();
}
  • 线程池框架 (Executor Service)
java
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
        // CPU 密集型 = CPU 核心数;IO 密集型 = CPU 核心数 * 2
        4,  //核心线程数(池中保持存活的最小线程数) 
        8, //最大线程数
        60, TimeUnit.SECONDS, //空闲线程存活时间
        new ArrayBlockingQueue<>(100) //任务队列容量
);

//ExecutorService threadPool = Executors.newFixedThreadPool(4);

//提交任务
threadPool.submit(() -> System.out.println("Runnable 任务"));

Future<String> future = threadPool.submit(() -> "Callable 任务返回结果");
try {
    String result = future.get(); //结果
} catch (InterruptedException | ExecutionException e) {
    throw new RuntimeException(e);
}
  • 原子类

java.util.concurrent.atomic 包提供了原子操作类,基于 CPU 原子指令实现(CAS),无锁算法

  • JMM 模型重写

JDK5 彻底重写了 JAVA 内存模型(JMM)

  1. volatile 的新语义:确保变量的可见性和禁止指令重排序
  2. final 字段初始化安全:对象构造完成后,final 字段的值对所有线程可见
  3. Happens-Before 规则:明确定义多线程操作的执行顺序
java
// volatile 的正确使用示例
class SharedData {
    private volatile boolean flag = false;

    void writer() { // Thread A
        data = 42;     // 普通写入可能被重排序
        flag = true;   // volatile 写入,强制刷新内存屏障
    }

    void reader() { // Thread B
        if (flag) {  // volatile 读取,强制加载最新值
            System.out.println(data); // 保证看到 data=42
        }
    }
}

3. JDK6 的锁优化:synchronized 重生

JVM 层面通过偏向锁、轻量级锁、重量级锁等优化,对 synchronized 进行性能优化

4. JDK7 分治与动态同步

  • ForkJoinPool
java
class SumTask extends RecursiveTask<Long> {
    private final int[] array;
    private final int start;
    private final int end;
    private static final int THRESHOLD = 1000; // 当数组长度 ≤1000 时不再拆分

    SumTask(int[] array, int start, int end) {
        this.array = array;
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {
        if (end - start <= THRESHOLD) {
            // 基准条件:小规模数组直接循环计算
            long sum = 0;
            for (int i = start; i < end; i++) {
                sum += array[i];
            }
            return sum;
        } else {
            // 分治步骤:拆分任务
            int mid = (start + end) / 2;
            SumTask leftTask = new SumTask(array, start, mid);
            SumTask rightTask = new SumTask(array, mid, end);

            leftTask.fork(); // 将左任务提交到队列异步执行
            long rightResult = rightTask.compute(); // 当前线程立即处理右任务
            long leftResult = leftTask.join(); // 等待左任务结果

            return leftResult + rightResult; // 合并结果
        }
    }
}

public static void main(String[] args) {
    int[] data = new int[100_0000]; // 假设是一个大型数组
    //填充数组
    for (int i = 0; i < data.length ; i++) {
        data[i] = new Random().nextInt(10);
    }

    SumTask task = new SumTask(data, 0, data.length);
    ForkJoinPool pool = new ForkJoinPool();
    long total = pool.invoke(task); // 触发并行计算
    System.out.println("总和:" + total);
}

5. JDK8 异步革命

  • CompletableFuture

函数式异步链:支持组合、异常处理、超时控制

创建异步任务

java
// 使用默认线程池
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    return "Hello"; 
});

// 指定自定义线程池
ExecutorService executor = Executors.newFixedThreadPool(4);
CompletableFuture<String> futureWithExecutor = CompletableFuture.supplyAsync(
    () -> "Hello", executor
);

thenApply / handle 链式处理结果

java
// thenApply 链式处理具有依赖性,下一步依赖上一步的结果
// thenApply 操作遇到异常就不会往下走 / handle 遇到异常也会把流程执行完
CompletableFuture<Integer> result = CompletableFuture.supplyAsync(() -> "10")
    .thenApply(str -> {
        // 将字符串转换为整数
        return Integer.parseInt(str);
    })
    .thenApply(num -> {
        // 计算平方
        return num * num;
    });

result.get(); // 返回 100

thenAccept/thenRun 消费结果

java
CompletableFuture.supplyAsync(() -> "Hello")
    .thenAccept(s -> System.out.println(s + " World")); // 输出 "Hello World"

CompletableFuture.supplyAsync(() -> 42)
    .thenRun(() -> System.out.println("Done!")); // 任务完成时通知

组合多个任务

java
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "A");
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "B");

//合并两个任务的结果
future1.thenCombine(future2, (a, b) -> a + b)
       .thenAccept(System.out::println); // 输出 "AB"
  • 并行流 (Parallel Stream)

隐式并行化:基于 ForkJoinPool.commonPool()

  • StampedLock 邮戳锁

乐观读模式:优化读多写少场景

java
private final StampedLock lock = new StampedLock();

Object read() {
    long stamp = lock.tryOptimisticRead(); // 乐观读(不阻塞)
    Object data = loadData();              // 业务操作
    if (!lock.validate(stamp)) {           // 检查是否有写操作
        stamp = lock.readLock();           // 升级为悲观读锁
        try { data = loadData(); } 
        finally { lock.unlockRead(stamp); }
    }
    return data;
}

6. JDK9 VarHandle

VarHandle 替代 Unsafe 的合法内存操作

7. JDK21 轻量级并发革命

  • 虚拟线程 (适用于高并发 IO 操作)

虚拟线程是用户态线程,由 JVM 调度,没有内核态上下文切换,开销小,性能高

java
// 启动百万级虚拟线程(内存占用仅数GB)
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
    for (int i = 0; i < 1_000_000; i++) {
        executor.submit(() -> {
            Thread.sleep(Duration.ofSeconds(1)); // 挂起时自动释放线程资源
            return processRequest();
        });
    }
}
  • 结构化并发 (替代传统 Future )

强制作用域:确保子任务不会泄露到父任务之外

java
Response handleRequest() throws Exception {
    try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
        Future<String> user = scope.fork(() -> queryUser());
        Future<Order> order = scope.fork(() -> queryOrder());
        
        scope.join();             // 等待所有子任务完成
        scope.throwIfFailed();    // 任一子任务失败则整体失败
        
        return new Response(user.resultNow(), order.resultNow());
    } // 作用域结束:自动取消未完成子任务
}
  • ScopedValue

虚拟线程场景下替代 ThreadLocal,作用域结束后自动清理资源

java
// 声明作用域值
private static final ScopedValue<User> CURRENT_USER = ScopedValue.newInstance();

void processRequest(User user) {
    ScopedValue.where(CURRENT_USER, user)
               .run(() -> handleRequestImpl());
}

void handleRequestImpl() {
    User user = CURRENT_USER.get(); // 仅在此作用域内有效
}