Java JUC包功能
本文最后更新于 2026-01-08,文章内容可能已经过时。
Java JUC(java.util.concurrent)包通过线程池(ExecutorService)、高级锁(ReentrantLock/ReadWriteLock)、同步器(CountDownLatch/CyclicBarrier)、并发集合(ConcurrentHashMap/BlockingQueue)、原子变量(AtomicInteger/LongAdder)及异步工具(CompletableFuture/ForkJoinPool)等核心组件,为高并发场景提供高效、安全的并发编程解决方案。它解决了传统synchronized的性能瓶颈和复杂性问题,适用于线程池管理、资源竞争控制、多线程协调、高并发数据结构及异步任务处理等场景,显著提升系统吞吐量与可维护性,是构建高性能Java并发应用的基石。
Java JUC (Java Util Concurrent) 是Java标准库中用于并发编程的核心工具包,位于java.util.concurrent包及其子包内。它提供了丰富的并发工具类和框架,旨在简化多线程编程,提高程序的并发性能和线程安全性。以下将详细介绍JUC的各个功能模块,包括适用场景和代码示例。
一、Executor框架:线程池管理
1.1 核心概念
JUC的基石,将任务提交与执行解耦,提供了线程生命周期管理和任务调度功能。
1.2 主要实现类
- ThreadPoolExecutor:可自定义参数的线程池
- Executors:工厂类,提供常用线程池的便捷创建方法
1.3 线程池类型及适用场景
| 线程池类型 | 创建方式 | 适用场景 | 优缺点 |
|---|---|---|---|
| 固定线程池 | Executors.newFixedThreadPool(n) | 负载稳定的任务,如Web服务器处理请求 | 优点:线程数固定,资源可控;缺点:无法应对突发流量 |
| 缓存线程池 | Executors.newCachedThreadPool() | 短任务、突发流量场景,如HTTP请求处理 | 优点:动态创建线程,高效处理短任务;缺点:线程数无上限,可能OOM |
| 单线程池 | Executors.newSingleThreadExecutor() | 需要按序执行的任务,如日志写入 | 优点:保证任务顺序执行;缺点:无法并行处理 |
| 定时/周期任务池 | Executors.newScheduledThreadPool(n) | 定时任务、周期性任务,如定时清理、定时报告 | 优点:支持schedule和scheduleAtFixedRate;缺点:适合短任务,长任务可能导致线程堆积 |
1.4 代码示例
// 创建固定大小线程池(5个核心线程)
ExecutorService fixedPool = Executors.newFixedThreadPool(5);
for (int i = 0; i < 10; i++) {
final int taskId = i;
fixedPool.execute(() -> {
System.out.println("Task " + taskId + " executed by " + Thread.currentThread().getName());
try {
Thread.sleep(1000); // 模拟工作
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
fixedPool.shutdown(); // 优雅关闭
1.5 注意事项
- 避免使用无界队列(如new LinkedBlockingQueue()),可能导致OOM
- 推荐使用有界队列 + 拒绝策略(如AbortPolicy)
- 线程池大小建议:CPU密集型任务 = 核心数;I/O密集型任务 = 核心数 × 2
二、锁机制:超越synchronized
2.1 ReentrantLock
功能:可重入锁,支持公平/非公平、超时获取、可中断等特性。
适用场景:
- 需要控制锁的公平性:new ReentrantLock(true)
- 需要尝试获取锁:tryLock()
- 需要可中断的锁获取:lockInterruptibly()
- 需要条件等待:Condition
public class ReentrantLockExample {
private int count = 0;
private final ReentrantLock lock = new ReentrantLock();
private final Condition condition = lock.newCondition();
public void increment() {
lock.lock();
try {
count++;
System.out.println("Count: " + count);
condition.signal(); // 唤醒等待线程
} finally {
lock.unlock();
}
}
public void waitForCount(int target) throws InterruptedException {
lock.lock();
try {
while (count < target) {
condition.await(); // 等待条件满足
}
} finally {
lock.unlock();
}
}
public static void main(String[] args) {
ReentrantLockExample example = new ReentrantLockExample();
// 线程1:增加计数
new Thread(() -> {
for (int i = 0; i < 5; i++) {
example.increment();
}
}).start();
// 线程2:等待计数达到5
new Thread(() -> {
try {
example.waitForCount(5);
System.out.println("Count reached 5!");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).start();
}
}
2.2 ReadWriteLock
功能:读写锁,允许多个读线程同时访问,但写操作独占。
适用场景:读多写少的场景,如缓存系统、配置管理。
public class ReadWriteLockExample {
private final Map<String, String> cache = new HashMap<>();
private final ReadWriteLock lock = new ReentrantReadWriteLock();
public String get(String key) {
lock.readLock().lock();
try {
return cache.get(key);
} finally {
lock.readLock().unlock();
}
}
public void put(String key, String value) {
lock.writeLock().lock();
try {
cache.put(key, value);
} finally {
lock.writeLock().unlock();
}
}
}
2.3 StampedLock
功能:带版本戳的锁,结合了乐观读锁、悲观读锁和写锁。
适用场景:高并发读多写少的场景,性能优于ReadWriteLock。
public class StampedLockExample {
private final StampedLock lock = new StampedLock();
private int value;
public int read() {
long stamp = lock.tryOptimisticRead(); // 乐观读
int result = value;
if (!lock.validate(stamp)) { // 检查是否被修改
stamp = lock.readLock(); // 悲观读
try {
result = value;
} finally {
lock.unlockRead(stamp);
}
}
return result;
}
public void write(int newValue) {
long stamp = lock.writeLock();
try {
value = newValue;
} finally {
lock.unlockWrite(stamp);
}
}
}
三、同步器(Synchronizers)
3.1 CountDownLatch
功能:允许一个或多个线程等待其他一组线程完成操作。
适用场景:多线程初始化、批量任务完成等待。
public class CountDownLatchExample {
public static void main(String[] args) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(3);
ExecutorService executor = Executors.newFixedThreadPool(3);
for (int i = 0; i < 3; i++) {
executor.submit(() -> {
System.out.println("Task started");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("Task completed");
latch.countDown(); // 减少计数
});
}
latch.await(); // 等待所有任务完成
System.out.println("All tasks completed");
executor.shutdown();
}
}
3.2 CyclicBarrier
功能:让一组线程相互等待,到达一个公共屏障点后继续执行。
适用场景:并行计算、多阶段任务。
public class CyclicBarrierExample {
public static void main(String[] args) {
int parties = 3;
CyclicBarrier barrier = new CyclicBarrier(parties, () -> {
System.out.println("All threads have reached the barrier, starting next phase");
});
for (int i = 0; i < parties; i++) {
new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName() + " is waiting at barrier");
barrier.await(); // 等待其他线程到达
System.out.println(Thread.currentThread().getName() + " passed the barrier");
} catch (InterruptedException | BrokenBarrierException e) {
Thread.currentThread().interrupt();
}
}).start();
}
}
}
3.3 Semaphore
功能:控制同时访问某一资源的线程数量,类似信号量。
适用场景:限制并发资源访问,如数据库连接池、线程池、停车场管理。
public class SemaphoreExample {
public static void main(String[] args) {
// 限制同时最多2个线程访问资源
Semaphore semaphore = new Semaphore(2);
for (int i = 0; i < 5; i++) {
new Thread(() -> {
try {
semaphore.acquire(); // 获取许可
System.out.println(Thread.currentThread().getName() + " is using the resource");
Thread.sleep(2000); // 模拟使用资源
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
semaphore.release(); // 释放许可
System.out.println(Thread.currentThread().getName() + " released the resource");
}
}, "Thread-" + i).start();
}
}
}
3.4 Exchanger
功能:严格2个线程之间交换数据对象,避免共享队列。
适用场景:双线程数据交换,如生产者-消费者缓冲区交换。
public class ExchangerExample {
public static void main(String[] args) {
Exchanger<String> exchanger = new Exchanger<>();
new Thread(() -> {
try {
String data = "Data from Thread-A";
String received = exchanger.exchange(data);
System.out.println("Thread-A received: " + received);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).start();
new Thread(() -> {
try {
String data = "Data from Thread-B";
String received = exchanger.exchange(data);
System.out.println("Thread-B received: " + received);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).start();
}
}
3.5 Phaser
功能:灵活的同步屏障,支持动态注册和注销参与者。
适用场景:动态参与者数量的同步场景。
public class PhaserExample {
public static void main(String[] args) {
Phaser phaser = new Phaser(3); // 3个参与者
for (int i = 0; i < 3; i++) {
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + " is waiting");
phaser.arriveAndAwaitAdvance(); // 等待所有线程到达
System.out.println(Thread.currentThread().getName() + " passed the barrier");
}, "Thread-" + i).start();
}
// 动态添加参与者
phaser.register();
new Thread(() -> {
System.out.println("Thread-3 is waiting");
phaser.arriveAndAwaitAdvance();
System.out.println("Thread-3 passed the barrier");
}).start();
}
}
四、并发集合(Concurrent Collections)
4.1 ConcurrentHashMap
功能:高性能线程安全的Map实现,采用分段锁机制。
适用场景:高并发读写Map,如缓存、配置管理。
public class ConcurrentHashMapExample {
public static void main(String[] args) {
ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
// 多线程写入
ExecutorService executor = Executors.newFixedThreadPool(4);
for (int i = 0; i < 10; i++) {
final int index = i;
executor.submit(() -> {
map.put("key-" + index, index);
});
}
executor.shutdown();
try {
executor.awaitTermination(1, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
// 读取数据
System.out.println("Map size: " + map.size());
System.out.println("Value for key-5: " + map.get("key-5"));
}
}
4.2 CopyOnWriteArrayList
功能:写时复制的List,读操作不加锁,写操作复制整个数组。
适用场景:读多写少的List,如事件监听器列表。
public class CopyOnWriteArrayListExample {
public static void main(String[] args) {
CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();
// 读操作(无需锁)
new Thread(() -> {
for (int i = 0; i < list.size(); i++) {
System.out.println("Reading: " + list.get(i));
}
}).start();
// 写操作(复制数组)
new Thread(() -> {
for (int i = 0; i < 10; i++) {
list.add("Item " + i);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}).start();
}
}
4.3 BlockingQueue
功能:线程安全的队列,支持阻塞操作(put/take)。
适用场景:生产者-消费者模式,线程间通信。
public class BlockingQueueExample {
public static void main(String[] args) {
BlockingQueue<String> queue = new LinkedBlockingQueue<>(10); // 有界队列
// 生产者
new Thread(() -> {
for (int i = 0; i < 5; i++) {
try {
queue.put("Item " + i);
System.out.println("Produced: Item " + i);
Thread.sleep(500);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}).start();
// 消费者
new Thread(() -> {
for (int i = 0; i < 5; i++) {
try {
String item = queue.take();
System.out.println("Consumed: " + item);
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}).start();
}
}
4.4 ConcurrentLinkedQueue
功能:无锁的队列实现,适用于高并发场景。
适用场景:高并发生产者-消费者模式,如日志处理。
public class ConcurrentLinkedQueueExample {
public static void main(String[] args) {
ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
// 生产者
new Thread(() -> {
for (int i = 0; i < 10; i++) {
queue.add("Item " + i);
System.out.println("Produced: Item " + i);
}
}).start();
// 消费者
new Thread(() -> {
while (true) {
String item = queue.poll();
if (item != null) {
System.out.println("Consumed: " + item);
} else {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}).start();
}
}
五、原子变量(Atomic Variables)
5.1 AtomicInteger/AtomicLong
功能:提供原子整数/长整数操作,避免锁的开销。
适用场景:高并发计数器,如请求计数、ID生成器。
public class AtomicExample {
public static void main(String[] args) {
AtomicInteger counter = new AtomicInteger(0);
// 多线程增加计数
ExecutorService executor = Executors.newFixedThreadPool(4);
for (int i = 0; i < 1000; i++) {
executor.submit(() -> {
counter.incrementAndGet();
});
}
executor.shutdown();
try {
executor.awaitTermination(1, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("Final count: " + counter.get());
}
}
5.2 AtomicReference
功能:对引用类型进行原子更新。
适用场景:原子更新对象引用,如状态管理。
public class AtomicReferenceExample {
public static void main(String[] args) {
AtomicReference<String> reference = new AtomicReference<>("Initial");
// 原子更新
boolean success = reference.compareAndSet("Initial", "Updated");
System.out.println("Update successful: " + success);
System.out.println("Current value: " + reference.get());
}
}
5.3 AtomicStampedReference
功能:带版本戳的原子引用,解决ABA问题。
适用场景:需要版本控制的引用更新。
public class AtomicStampedReferenceExample {
public static void main(String[] args) {
AtomicStampedReference<String> reference = new AtomicStampedReference<>("Initial", 0);
// 模拟ABA问题
boolean success = reference.compareAndSet("Initial", "New", 0, 1);
System.out.println("First update: " + success);
// 恢复为初始值
reference.compareAndSet("New", "Initial", 1, 2);
// 尝试再次更新,因为版本不同而失败
success = reference.compareAndSet("Initial", "Final", 2, 3);
System.out.println("Second update: " + success);
System.out.println("Current value: " + reference.getReference());
System.out.println("Current stamp: " + reference.getStamp());
}
}
六、高级并发工具
6.1 CompletableFuture
功能:异步编程,支持链式调用,处理非阻塞I/O。
适用场景:异步任务处理、多任务组合、回调处理。
public class CompletableFutureExample {
public static void main(String[] args) {
// 异步任务
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return "Hello";
});
// 链式调用
CompletableFuture<String> result = future
.thenApply(s -> s + " World")
.thenApply(s -> s + " from CompletableFuture");
// 处理结果
result.thenAccept(System.out::println); // 输出: Hello World from CompletableFuture
// 等待完成
result.join();
}
}
6.2 ForkJoinPool
功能:工作窃取算法的线程池,适用于递归任务。
适用场景:分治算法,如归并排序、并行计算。
public class ForkJoinPoolExample {
public static void main(String[] args) {
ForkJoinPool pool = new ForkJoinPool();
// 任务:计算1到1000的和
SumTask task = new SumTask(1, 1000);
int result = pool.invoke(task);
System.out.println("Sum: " + result);
}
static class SumTask extends RecursiveTask<Integer> {
private final int start;
private final int end;
public SumTask(int start, int end) {
this.start = start;
this.end = end;
}
@Override
protected Integer compute() {
if (end - start <= 100) {
// 基本情况:直接计算
int sum = 0;
for (int i = start; i <= end; i++) {
sum += i;
}
return sum;
} else {
// 分治:拆分任务
int mid = (start + end) / 2;
SumTask leftTask = new SumTask(start, mid);
SumTask rightTask = new SumTask(mid + 1, end);
leftTask.fork(); // 异步执行左子任务
int rightResult = rightTask.compute(); // 同步执行右子任务
int leftResult = leftTask.join(); // 等待左子任务完成
return leftResult + rightResult;
}
}
}
}
6.3 LongAdder
功能:高并发下性能更好的计数器,采用分段锁机制。
适用场景:高并发计数场景,如日志统计、请求计数。
public class LongAdderExample {
public static void main(String[] args) {
LongAdder counter = new LongAdder();
ExecutorService executor = Executors.newFixedThreadPool(4);
for (int i = 0; i < 1000; i++) {
executor.submit(() -> {
for (int j = 0; j < 100; j++) {
counter.increment();
}
});
}
executor.shutdown();
try {
executor.awaitTermination(1, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("Final count: " + counter.sum());
}
}
七、JUC最佳实践
- 避免死锁:使用tryLock()和超时机制
- 性能调优:
- CPU密集型任务:线程数 = 核心数
- I/O密集型任务:线程数 = 核心数 × 2
- 资源管理:使用try-finally确保锁和资源正确释放
- 错误处理:在并发操作中处理InterruptedException
- 避免无界队列:使用有界队列+拒绝策略
- 选择合适工具:
- 高并发计数:LongAdder > AtomicInteger
- 读多写少:ConcurrentHashMap > Collections.synchronizedMap
- 任务分治:ForkJoinPool > ThreadPoolExecutor
八、总结
Java JUC包提供了全面的并发编程工具,涵盖了从线程池管理、锁机制到高级同步器和并发集合的各个方面。正确使用这些工具可以显著提高程序的并发性能和可维护性,同时减少死锁、竞态条件等并发问题。
理解JUC的核心组件和适用场景是编写高效、安全并发程序的关键。在实际开发中,应根据具体业务需求选择合适的并发工具,避免过度设计或简单问题复杂化。
JUC的设计目标是提供更高级别的并发原语,减少因直接使用低级别并发机制(如synchronized关键字、wait()和notify()方法)带来的复杂性和风险,使并发编程更加简单、高效和安全。
- 感谢你赐予我前进的力量

