Java并发设计模式
本文全面系统地解析了Java并发编程中的五大核心设计模式——生产者-消费者模式、读写锁模式、线程本地存储(ThreadLocal)模式、Guarded Suspension模式与线程池模式,每种模式均深入阐述其设计原理、典型应用场景、可运行的完整代码示例(含异常处理与边界防护)、关键注意事项及常见陷阱,并辅以文本版模式选择决策指南与工程级黄金实践总结(如优先使用JUC工具类、防范内存泄漏、线程命名规范、监控可观测性等),强调“简单优于复杂”的设计哲学,同时前瞻性指出Java 21+虚拟线程等现代特性对传统并发模型的演进影响,为开发者提供从理论认知到生产落地的全链路并发解决方案,助力构建高效、安全、可维护的高并发系统。
一、并发设计模式概述
并发设计模式是专为解决多线程环境下的协作、同步、资源共享等问题而设计的模板化解决方案。它们结合锁机制、线程通信和资源管理技术,帮助开发者构建高效、安全、可维护的并发程序。
二、生产者-消费者模式
📌 模式原理
通过阻塞队列解耦数据生产与消费过程:生产者将数据放入队列,消费者从队列取出处理。自动平衡生产/消费速度,避免资源浪费或饥饿。
🌐 适用场景
- 消息队列系统(Kafka/RabbitMQ底层原理)
- Web服务器请求处理(Tomcat线程池)
- 日志异步收集、数据管道处理
- 任务调度系统(如定时任务分发)
💻 完整代码示例(含优雅终止)
package com.nn3n;
import java.util.concurrent.*;
public class AdvancedProducerConsumer {
private static final int QUEUE_CAPACITY = 10;
private static final BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(QUEUE_CAPACITY);
// 生产者
static class Producer implements Runnable {
private final String name;
private final int itemsToProduce;
Producer(String name, int items) {
this.name = name;
this.itemsToProduce = items;
}
@Override
public void run() {
try {
for (int i = 0; i < itemsToProduce; i++) {
queue.put(i);
System.out.printf("[%s] Produced: %d | Queue size: %d%n",
name, i, queue.size());
Thread.sleep(100); // 模拟生产耗时
}
queue.put(-1); // 发送结束信号
System.out.println("[" + name + "] Production completed");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
System.err.println("[" + name + "] Interrupted");
}
}
}
// 消费者
static class Consumer implements Runnable {
private final String name;
Consumer(String name) { this.name = name; }
@Override
public void run() {
try {
while (true) {
Integer item = queue.take();
if (item == -1) { // 检测结束信号
System.out.println("[" + name + "] Consumption terminated");
queue.put(-1); // 传递毒丸给其他消费者
break;
}
System.out.printf("[%s] Consumed: %d | Queue size: %d%n",
name, item, queue.size());
Thread.sleep(150); // 模拟消费耗时
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
System.err.println("[" + name + "] Interrupted");
}
}
}
public static void main(String[] args) throws InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(5);
// 启动2个生产者,3个消费者
executor.submit(new Producer("Producer-1", 15));
executor.submit(new Producer("Producer-2", 10));
executor.submit(new Consumer("Consumer-1"));
executor.submit(new Consumer("Consumer-2"));
executor.submit(new Consumer("Consumer-3"));
executor.shutdown();
if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
executor.shutdownNow();
}
System.out.println("\n=== All tasks completed ===");
}
}
⚠️ 关键注意事项
- 队列容量:根据系统负载合理设置,避免OOM或生产者长时间阻塞
- 毒丸机制:优雅终止多消费者场景(示例中使用-1作为终止信号)
- 异常处理:必须捕获InterruptedException并恢复中断状态
- 替代方案:高吞吐场景可考虑Disruptor无锁队列
三、读写锁模式(Read-Write Lock)
📌 模式原理
ReentrantReadWriteLock提供读共享、写独占机制:允许多线程并发读,写操作需独占锁。显著提升读多写少场景的吞吐量。
🌐 适用场景
- 缓存系统(如本地缓存更新)
- 配置管理中心(频繁读取,偶发更新)
- 统计指标收集(多线程上报,定时聚合)
- 金融行情数据(高频读取,低频更新)
💻 完整代码示例(含锁降级)
package com.nn3n;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class CacheWithReadWriteLock {
private final Map<String, String> cache = new HashMap<>();
private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
private final ReentrantReadWriteLock.ReadLock readLock = rwLock.readLock();
private final ReentrantReadWriteLock.WriteLock writeLock = rwLock.writeLock();
// 读操作(高并发安全)
public String get(String key) {
readLock.lock();
try {
return cache.get(key);
} finally {
readLock.unlock();
}
}
// 写操作(独占)
public void put(String key, String value) {
writeLock.lock();
try {
cache.put(key, value);
System.out.println("Updated: " + key + " = " + value);
} finally {
writeLock.unlock();
}
}
// 锁降级示例:先写后读(避免脏读)
public String getWithUpdate(String key, String newValue) {
writeLock.lock();
try {
cache.put(key, newValue);
// 降级为读锁:先获取读锁,再释放写锁
readLock.lock();
} finally {
writeLock.unlock(); // 释放写锁,保留读锁
}
try {
return cache.get(key); // 安全读取刚写入的值
} finally {
readLock.unlock();
}
}
// 模拟高并发场景
public static void main(String[] args) throws InterruptedException {
CacheWithReadWriteLock cache = new CacheWithReadWriteLock();
cache.put("config", "v1.0");
// 启动10个读线程 + 2个写线程
Runnable reader = () -> {
String val = cache.get("config");
System.out.println(Thread.currentThread().getName() + " read: " + val);
};
Runnable writer = () -> {
cache.put("config", "v2.0");
};
Thread[] threads = new Thread[12];
for (int i = 0; i < 10; i++) threads[i] = new Thread(reader, "Reader-" + i);
for (int i = 10; i < 12; i++) threads[i] = new Thread(writer, "Writer-" + (i-9));
for (Thread t : threads) t.start();
for (Thread t : threads) t.join();
}
}
⚠️ 关键注意事项
- 锁降级:写锁 → 读锁可行,反向会导致死锁
- 公平策略:new ReentrantReadWriteLock(true)可避免写线程饥饿
- 避免嵌套:不要在持有读锁时尝试获取写锁
- 性能权衡:写操作频繁时,性能可能低于synchronized
四、线程本地存储模式(ThreadLocal)
📌 模式原理
为每个线程提供独立变量副本,实现线程间数据隔离,避免同步开销。底层通过Thread内部的ThreadLocalMap实现。
🌐 适用场景
- Web应用:存储用户会话(UserContext)、事务ID(TraceId)
- 数据库连接:每个线程独占Connection(如MyBatis)
- SimpleDateFormat线程安全封装(避免创建开销)
- 跨方法链路传递上下文(如日志链路追踪)
💻 完整代码示例(含内存泄漏防护)
package com.nn3n;
public class SafeThreadLocalExample {
// 推荐:使用静态final + initialValue初始化
private static final ThreadLocal<UserContext> contextHolder =
ThreadLocal.withInitial(() -> new UserContext("default", "guest"));
// 用户上下文类(不可变对象更安全)
static class UserContext {
private final String userId;
private final String role;
UserContext(String userId, String role) {
this.userId = userId;
this.role = role;
}
@Override public String toString() {
return "UserContext{userId='" + userId + "', role='" + role + "'}";
}
}
// 设置上下文(通常在请求入口调用)
public static void setContext(String userId, String role) {
contextHolder.set(new UserContext(userId, role));
}
// 获取上下文
public static UserContext getContext() {
return contextHolder.get();
}
// 【关键】清理资源(在请求出口调用!)
public static void clearContext() {
contextHolder.remove(); // 防止内存泄漏
}
// 模拟Web请求处理流程
public static void handleRequest(String userId, String role) {
try {
setContext(userId, role);
System.out.println(Thread.currentThread().getName() +
" processing with: " + getContext());
// 模拟业务处理(跨多层方法调用仍可获取上下文)
serviceLayer();
daoLayer();
} finally {
clearContext(); // 必须在finally中清理!
}
}
private static void serviceLayer() {
System.out.println(" Service layer: " + getContext());
}
private static void daoLayer() {
System.out.println(" DAO layer: " + getContext());
}
public static void main(String[] args) {
// 模拟3个并发请求
for (int i = 1; i <= 3; i++) {
final int reqId = i;
new Thread(() ->
handleRequest("user-" + reqId, "role-" + reqId),
"Request-Thread-" + reqId
).start();
}
}
}
⚠️ 关键注意事项(必读!)
| 问题 | 解决方案 |
|---|---|
| 内存泄漏 | 线程池中必须调用remove()(示例中finally块) |
| 子线程继承 | 使用InheritableThreadLocal(慎用,有坑) |
| 线程复用污染 | Web容器线程池中,每次请求必须清理 |
| 过度使用 | 仅用于真正需要线程隔离的场景,避免滥用 |
五、Guarded Suspension模式(保护性暂挂)
📌 模式原理
当条件不满足时挂起线程,条件满足时唤醒。本质是wait()/notify()的经典应用,解决“等待-通知”协作问题。
🌐 适用场景
- 任务队列为空时消费者等待
- 资源池无可用资源时请求线程等待
- 异步结果未就绪时调用方等待(Future.get()底层)
- 事件驱动系统中的条件触发
💻 完整代码示例(含超时与虚假唤醒防护)
package com.nn3n;
import java.util.LinkedList;
import java.util.Queue;
public class GuardedTaskQueue {
private final Queue<String> taskQueue = new LinkedList<>();
private final int capacity;
public GuardedTaskQueue(int capacity) {
this.capacity = capacity;
}
// 提交任务(队列满时等待)
public synchronized void submit(String task) throws InterruptedException {
// 【关键】while循环防止虚假唤醒
while (taskQueue.size() >= capacity) {
System.out.println(Thread.currentThread().getName() +
" waiting: queue full");
wait(); // 释放锁并等待
}
taskQueue.offer(task);
System.out.println(Thread.currentThread().getName() +
" submitted: " + task + " | Size: " + taskQueue.size());
notifyAll(); // 唤醒等待的消费者
}
// 获取任务(队列空时等待,带超时)
public synchronized String take(long timeout) throws InterruptedException {
long startTime = System.currentTimeMillis();
long remaining = timeout;
while (taskQueue.isEmpty()) {
if (remaining <= 0) {
System.out.println(Thread.currentThread().getName() + " timeout!");
return null; // 超时返回
}
System.out.println(Thread.currentThread().getName() +
" waiting: queue empty");
wait(remaining); // 带超时等待
remaining = timeout - (System.currentTimeMillis() - startTime);
}
String task = taskQueue.poll();
System.out.println(Thread.currentThread().getName() +
" took: " + task + " | Size: " + taskQueue.size());
notifyAll(); // 唤醒等待的生产者
return task;
}
// 模拟生产者-消费者协作
public static void main(String[] args) {
GuardedTaskQueue queue = new GuardedTaskQueue(3);
// 生产者线程
Runnable producer = () -> {
for (int i = 1; i <= 5; i++) {
try {
queue.submit("Task-" + i);
Thread.sleep(300);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
};
// 消费者线程(带超时)
Runnable consumer = () -> {
for (int i = 0; i < 5; i++) {
try {
String task = queue.take(2000); // 2秒超时
if (task != null) Thread.sleep(500);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
};
new Thread(producer, "Producer").start();
new Thread(consumer, "Consumer-1").start();
new Thread(consumer, "Consumer-2").start();
}
}
⚠️ 关键注意事项
- 必须用while检查条件:防止虚假唤醒(spurious wakeup)
- 使用notifyAll():避免信号丢失(多个等待线程时)
- 超时机制:避免永久阻塞(示例中take方法)
- 锁对象选择:使用明确的对象锁,避免this锁污染
六、线程池模式(Executor Framework)
📌 模式原理
复用线程资源,避免频繁创建/销毁线程的开销。通过任务队列、核心/最大线程数、拒绝策略等参数精细控制并发行为。
🌐 适用场景
- Web服务器请求处理(Tomcat Connector)
- 异步任务(邮件发送、日志写入)
- 并行计算(大数据分片处理)
- 定时任务调度(ScheduledExecutorService)
💻 完整代码示例(含监控与拒绝策略)
package com.nn3n;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
public class AdvancedThreadPoolDemo {
public static void main(String[] args) throws InterruptedException {
// 自定义线程工厂(命名规范+监控)
ThreadFactory namedFactory = new ThreadFactory() {
private final AtomicInteger count = new AtomicInteger(1);
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, "MyPool-Thread-" + count.getAndIncrement());
t.setDaemon(true); // 设置为守护线程
return t;
}
};
// 自定义拒绝策略:记录日志后由调用者线程执行
RejectedExecutionHandler handler = (r, executor) -> {
System.err.println("Task rejected: " + r.toString() +
" | Active: " + ((ThreadPoolExecutor)executor).getActiveCount());
r.run(); // 降级处理:由提交线程执行
};
// 创建精细化配置的线程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2, // 核心线程数
4, // 最大线程数
60, // 空闲线程存活时间
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(3), // 有界队列(防OOM)
namedFactory,
handler
);
// 提交任务并监控
for (int i = 1; i <= 10; i++) {
final int taskId = i;
executor.submit(() -> {
System.out.printf("[%s] Start Task-%d | PoolSize: %d, Active: %d, Queue: %d%n",
Thread.currentThread().getName(),
taskId,
executor.getPoolSize(),
executor.getActiveCount(),
executor.getQueue().size());
try { Thread.sleep(500); }
catch (InterruptedException e) { Thread.currentThread().interrupt(); }
System.out.printf("[%s] Finish Task-%d%n",
Thread.currentThread().getName(), taskId);
});
}
// 关闭流程(优雅停机)
executor.shutdown();
if (!executor.awaitTermination(3, TimeUnit.SECONDS)) {
System.err.println("Force shutdown!");
executor.shutdownNow();
}
System.out.println("\n=== Thread pool terminated ===");
System.out.printf("Completed Tasks: %d, Rejected: %d%n",
executor.getCompletedTaskCount(), 0); // 实际拒绝数需自定义统计
}
}
⚠️ 关键注意事项
| 项目 | 建议 |
|---|---|
| 线程池大小 | CPU密集型:N+1;IO密集型:2N(N=CPU核心数) |
| 队列选择 | 有界队列(防OOM)+ 拒绝策略;避免无界队列 |
| 拒绝策略 | 自定义策略记录监控指标(示例中降级处理) |
| 线程命名 | 必须自定义命名(便于日志排查) |
| 资源清理 | finally中关闭(示例中shutdown流程) |
| 禁止使用 | Executors.newFixedThreadPool(队列无界风险) |
七、模式选择决策树
并发设计模式选择指南:
-
是否需要为每个线程提供独立的数据副本?
→ 是:选择 ThreadLocal模式
(场景:用户上下文传递、数据库连接隔离、SimpleDateFormat封装)
→ 否:进入下一步 -
是否需要解耦数据生产与消费过程?
→ 是:选择 生产者-消费者模式
(场景:消息队列、日志异步收集、任务管道处理)
→ 否:进入下一步 -
是否存在共享资源且读操作远多于写操作?
→ 是:选择 读写锁模式
(场景:配置缓存、统计指标、金融行情数据)
→ 否:进入下一步 -
是否需要线程在条件不满足时挂起等待?
→ 是:选择 Guarded Suspension模式
(场景:任务队列空时等待、异步结果就绪等待、资源池获取)
→ 否:进入下一步 -
是否需要高效复用线程资源处理大量短时任务?
→ 是:选择 线程池模式
(场景:Web请求处理、异步任务调度、并行计算)
💡 补充说明:
- 实际系统常组合使用多种模式(如线程池内部结合生产者-消费者+Guarded Suspension)
- 优先考虑JDK并发工具类(BlockingQueue、ExecutorService等)而非手写底层同步
- 选择时需综合评估:线程安全需求、吞吐量要求、资源消耗、可维护性
- 新项目可评估Java 21+虚拟线程(Virtual Threads)对传统模式的简化替代
八、黄金实践总结
-
优先使用JUC工具类
java.util.concurrent包已封装成熟模式(如BlockingQueue、ExecutorService),避免手写wait/notify -
内存泄漏防控
- ThreadLocal:线程池中必须remove()
- 线程池:避免无界队列,设置合理拒绝策略
-
监控与可观测性
- 线程池:监控活跃线程数、队列大小、拒绝次数
- 锁:使用Lock.getQueueLength()诊断阻塞
-
避免常见陷阱
- 死锁:按固定顺序获取锁
- 活锁:引入随机退避
- 饥饿:公平锁慎用(性能代价高)
-
现代替代方案
- CompletableFuture:链式异步编程
- 虚拟线程(Java 21+):极大简化高并发模型
核心思想:并发设计模式需结合业务场景、数据特性、性能要求综合选择。始终牢记——简单优于复杂,明确优于隐晦。在保证正确性的前提下,优先选择JDK提供的成熟组件,而非重复造轮子。
- 感谢你赐予我前进的力量

