本文最后更新于 2026-01-08,文章内容可能已经过时。

Java JUC(java.util.concurrent)包通过线程池(ExecutorService)、高级锁(ReentrantLock/ReadWriteLock)、同步器(CountDownLatch/CyclicBarrier)、并发集合(ConcurrentHashMap/BlockingQueue)、原子变量(AtomicInteger/LongAdder)及异步工具(CompletableFuture/ForkJoinPool)等核心组件,为高并发场景提供高效、安全的并发编程解决方案。它解决了传统synchronized的性能瓶颈和复杂性问题,适用于线程池管理、资源竞争控制、多线程协调、高并发数据结构及异步任务处理等场景,显著提升系统吞吐量与可维护性,是构建高性能Java并发应用的基石。

Java JUC (Java Util Concurrent) 是Java标准库中用于并发编程的核心工具包,位于java.util.concurrent包及其子包内。它提供了丰富的并发工具类和框架,旨在简化多线程编程,提高程序的并发性能和线程安全性。以下将详细介绍JUC的各个功能模块,包括适用场景和代码示例。

一、Executor框架:线程池管理

1.1 核心概念

JUC的基石,将任务提交与执行解耦,提供了线程生命周期管理和任务调度功能。

1.2 主要实现类

  • ThreadPoolExecutor:可自定义参数的线程池
  • Executors:工厂类,提供常用线程池的便捷创建方法

1.3 线程池类型及适用场景

线程池类型创建方式适用场景优缺点
固定线程池Executors.newFixedThreadPool(n)负载稳定的任务,如Web服务器处理请求优点:线程数固定,资源可控;缺点:无法应对突发流量
缓存线程池Executors.newCachedThreadPool()短任务、突发流量场景,如HTTP请求处理优点:动态创建线程,高效处理短任务;缺点:线程数无上限,可能OOM
单线程池Executors.newSingleThreadExecutor()需要按序执行的任务,如日志写入优点:保证任务顺序执行;缺点:无法并行处理
定时/周期任务池Executors.newScheduledThreadPool(n)定时任务、周期性任务,如定时清理、定时报告优点:支持schedule和scheduleAtFixedRate;缺点:适合短任务,长任务可能导致线程堆积

1.4 代码示例

// 创建固定大小线程池(5个核心线程)
ExecutorService fixedPool = Executors.newFixedThreadPool(5);
for (int i = 0; i < 10; i++) {
    final int taskId = i;
    fixedPool.execute(() -> {
        System.out.println("Task " + taskId + " executed by " + Thread.currentThread().getName());
        try {
            Thread.sleep(1000); // 模拟工作
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    });
}
fixedPool.shutdown(); // 优雅关闭

1.5 注意事项

  • 避免使用无界队列(如new LinkedBlockingQueue()),可能导致OOM
  • 推荐使用有界队列 + 拒绝策略(如AbortPolicy)
  • 线程池大小建议:CPU密集型任务 = 核心数;I/O密集型任务 = 核心数 × 2

二、锁机制:超越synchronized

2.1 ReentrantLock

功能:可重入锁,支持公平/非公平、超时获取、可中断等特性。

适用场景

  • 需要控制锁的公平性:new ReentrantLock(true)
  • 需要尝试获取锁:tryLock()
  • 需要可中断的锁获取:lockInterruptibly()
  • 需要条件等待:Condition
public class ReentrantLockExample {
    private int count = 0;
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition condition = lock.newCondition();

    public void increment() {
        lock.lock();
        try {
            count++;
            System.out.println("Count: " + count);
            condition.signal(); // 唤醒等待线程
        } finally {
            lock.unlock();
        }
    }

    public void waitForCount(int target) throws InterruptedException {
        lock.lock();
        try {
            while (count < target) {
                condition.await(); // 等待条件满足
            }
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        ReentrantLockExample example = new ReentrantLockExample();
        
        // 线程1:增加计数
        new Thread(() -> {
            for (int i = 0; i < 5; i++) {
                example.increment();
            }
        }).start();
        
        // 线程2:等待计数达到5
        new Thread(() -> {
            try {
                example.waitForCount(5);
                System.out.println("Count reached 5!");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }).start();
    }
}

2.2 ReadWriteLock

功能:读写锁,允许多个读线程同时访问,但写操作独占。

适用场景:读多写少的场景,如缓存系统、配置管理。

public class ReadWriteLockExample {
    private final Map<String, String> cache = new HashMap<>();
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    
    public String get(String key) {
        lock.readLock().lock();
        try {
            return cache.get(key);
        } finally {
            lock.readLock().unlock();
        }
    }
    
    public void put(String key, String value) {
        lock.writeLock().lock();
        try {
            cache.put(key, value);
        } finally {
            lock.writeLock().unlock();
        }
    }
}

2.3 StampedLock

功能:带版本戳的锁,结合了乐观读锁、悲观读锁和写锁。

适用场景:高并发读多写少的场景,性能优于ReadWriteLock。

public class StampedLockExample {
    private final StampedLock lock = new StampedLock();
    private int value;
    
    public int read() {
        long stamp = lock.tryOptimisticRead(); // 乐观读
        int result = value;
        if (!lock.validate(stamp)) { // 检查是否被修改
            stamp = lock.readLock(); // 悲观读
            try {
                result = value;
            } finally {
                lock.unlockRead(stamp);
            }
        }
        return result;
    }
    
    public void write(int newValue) {
        long stamp = lock.writeLock();
        try {
            value = newValue;
        } finally {
            lock.unlockWrite(stamp);
        }
    }
}

三、同步器(Synchronizers)

3.1 CountDownLatch

功能:允许一个或多个线程等待其他一组线程完成操作。

适用场景:多线程初始化、批量任务完成等待。

public class CountDownLatchExample {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(3);
        ExecutorService executor = Executors.newFixedThreadPool(3);
        
        for (int i = 0; i < 3; i++) {
            executor.submit(() -> {
                System.out.println("Task started");
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                System.out.println("Task completed");
                latch.countDown(); // 减少计数
            });
        }
        
        latch.await(); // 等待所有任务完成
        System.out.println("All tasks completed");
        executor.shutdown();
    }
}

3.2 CyclicBarrier

功能:让一组线程相互等待,到达一个公共屏障点后继续执行。

适用场景:并行计算、多阶段任务。

public class CyclicBarrierExample {
    public static void main(String[] args) {
        int parties = 3;
        CyclicBarrier barrier = new CyclicBarrier(parties, () -> {
            System.out.println("All threads have reached the barrier, starting next phase");
        });
        
        for (int i = 0; i < parties; i++) {
            new Thread(() -> {
                try {
                    System.out.println(Thread.currentThread().getName() + " is waiting at barrier");
                    barrier.await(); // 等待其他线程到达
                    System.out.println(Thread.currentThread().getName() + " passed the barrier");
                } catch (InterruptedException | BrokenBarrierException e) {
                    Thread.currentThread().interrupt();
                }
            }).start();
        }
    }
}

3.3 Semaphore

功能:控制同时访问某一资源的线程数量,类似信号量。

适用场景:限制并发资源访问,如数据库连接池、线程池、停车场管理。

public class SemaphoreExample {
    public static void main(String[] args) {
        // 限制同时最多2个线程访问资源
        Semaphore semaphore = new Semaphore(2);
        
        for (int i = 0; i < 5; i++) {
            new Thread(() -> {
                try {
                    semaphore.acquire(); // 获取许可
                    System.out.println(Thread.currentThread().getName() + " is using the resource");
                    Thread.sleep(2000); // 模拟使用资源
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    semaphore.release(); // 释放许可
                    System.out.println(Thread.currentThread().getName() + " released the resource");
                }
            }, "Thread-" + i).start();
        }
    }
}

3.4 Exchanger

功能:严格2个线程之间交换数据对象,避免共享队列。

适用场景:双线程数据交换,如生产者-消费者缓冲区交换。

public class ExchangerExample {
    public static void main(String[] args) {
        Exchanger<String> exchanger = new Exchanger<>();
        
        new Thread(() -> {
            try {
                String data = "Data from Thread-A";
                String received = exchanger.exchange(data);
                System.out.println("Thread-A received: " + received);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }).start();
        
        new Thread(() -> {
            try {
                String data = "Data from Thread-B";
                String received = exchanger.exchange(data);
                System.out.println("Thread-B received: " + received);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }).start();
    }
}

3.5 Phaser

功能:灵活的同步屏障,支持动态注册和注销参与者。

适用场景:动态参与者数量的同步场景。

public class PhaserExample {
    public static void main(String[] args) {
        Phaser phaser = new Phaser(3); // 3个参与者
        
        for (int i = 0; i < 3; i++) {
            new Thread(() -> {
                System.out.println(Thread.currentThread().getName() + " is waiting");
                phaser.arriveAndAwaitAdvance(); // 等待所有线程到达
                System.out.println(Thread.currentThread().getName() + " passed the barrier");
            }, "Thread-" + i).start();
        }
        
        // 动态添加参与者
        phaser.register();
        new Thread(() -> {
            System.out.println("Thread-3 is waiting");
            phaser.arriveAndAwaitAdvance();
            System.out.println("Thread-3 passed the barrier");
        }).start();
    }
}

四、并发集合(Concurrent Collections)

4.1 ConcurrentHashMap

功能:高性能线程安全的Map实现,采用分段锁机制。

适用场景:高并发读写Map,如缓存、配置管理。

public class ConcurrentHashMapExample {
    public static void main(String[] args) {
        ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
        
        // 多线程写入
        ExecutorService executor = Executors.newFixedThreadPool(4);
        for (int i = 0; i < 10; i++) {
            final int index = i;
            executor.submit(() -> {
                map.put("key-" + index, index);
            });
        }
        
        executor.shutdown();
        try {
            executor.awaitTermination(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        
        // 读取数据
        System.out.println("Map size: " + map.size());
        System.out.println("Value for key-5: " + map.get("key-5"));
    }
}

4.2 CopyOnWriteArrayList

功能:写时复制的List,读操作不加锁,写操作复制整个数组。

适用场景:读多写少的List,如事件监听器列表。

public class CopyOnWriteArrayListExample {
    public static void main(String[] args) {
        CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();
        
        // 读操作(无需锁)
        new Thread(() -> {
            for (int i = 0; i < list.size(); i++) {
                System.out.println("Reading: " + list.get(i));
            }
        }).start();
        
        // 写操作(复制数组)
        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                list.add("Item " + i);
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }).start();
    }
}

4.3 BlockingQueue

功能:线程安全的队列,支持阻塞操作(put/take)。

适用场景:生产者-消费者模式,线程间通信。

public class BlockingQueueExample {
    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(10); // 有界队列
        
        // 生产者
        new Thread(() -> {
            for (int i = 0; i < 5; i++) {
                try {
                    queue.put("Item " + i);
                    System.out.println("Produced: Item " + i);
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }).start();
        
        // 消费者
        new Thread(() -> {
            for (int i = 0; i < 5; i++) {
                try {
                    String item = queue.take();
                    System.out.println("Consumed: " + item);
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }).start();
    }
}

4.4 ConcurrentLinkedQueue

功能:无锁的队列实现,适用于高并发场景。

适用场景:高并发生产者-消费者模式,如日志处理。

public class ConcurrentLinkedQueueExample {
    public static void main(String[] args) {
        ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
        
        // 生产者
        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                queue.add("Item " + i);
                System.out.println("Produced: Item " + i);
            }
        }).start();
        
        // 消费者
        new Thread(() -> {
            while (true) {
                String item = queue.poll();
                if (item != null) {
                    System.out.println("Consumed: " + item);
                } else {
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            }
        }).start();
    }
}

五、原子变量(Atomic Variables)

5.1 AtomicInteger/AtomicLong

功能:提供原子整数/长整数操作,避免锁的开销。

适用场景:高并发计数器,如请求计数、ID生成器。

public class AtomicExample {
    public static void main(String[] args) {
        AtomicInteger counter = new AtomicInteger(0);
        
        // 多线程增加计数
        ExecutorService executor = Executors.newFixedThreadPool(4);
        for (int i = 0; i < 1000; i++) {
            executor.submit(() -> {
                counter.incrementAndGet();
            });
        }
        
        executor.shutdown();
        try {
            executor.awaitTermination(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        
        System.out.println("Final count: " + counter.get());
    }
}

5.2 AtomicReference

功能:对引用类型进行原子更新。

适用场景:原子更新对象引用,如状态管理。

public class AtomicReferenceExample {
    public static void main(String[] args) {
        AtomicReference<String> reference = new AtomicReference<>("Initial");
        
        // 原子更新
        boolean success = reference.compareAndSet("Initial", "Updated");
        System.out.println("Update successful: " + success);
        System.out.println("Current value: " + reference.get());
    }
}

5.3 AtomicStampedReference

功能:带版本戳的原子引用,解决ABA问题。

适用场景:需要版本控制的引用更新。

public class AtomicStampedReferenceExample {
    public static void main(String[] args) {
        AtomicStampedReference<String> reference = new AtomicStampedReference<>("Initial", 0);
        
        // 模拟ABA问题
        boolean success = reference.compareAndSet("Initial", "New", 0, 1);
        System.out.println("First update: " + success);
        
        // 恢复为初始值
        reference.compareAndSet("New", "Initial", 1, 2);
        
        // 尝试再次更新,因为版本不同而失败
        success = reference.compareAndSet("Initial", "Final", 2, 3);
        System.out.println("Second update: " + success);
        
        System.out.println("Current value: " + reference.getReference());
        System.out.println("Current stamp: " + reference.getStamp());
    }
}

六、高级并发工具

6.1 CompletableFuture

功能:异步编程,支持链式调用,处理非阻塞I/O。

适用场景:异步任务处理、多任务组合、回调处理。

public class CompletableFutureExample {
    public static void main(String[] args) {
        // 异步任务
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "Hello";
        });
        
        // 链式调用
        CompletableFuture<String> result = future
            .thenApply(s -> s + " World")
            .thenApply(s -> s + " from CompletableFuture");
        
        // 处理结果
        result.thenAccept(System.out::println); // 输出: Hello World from CompletableFuture
        
        // 等待完成
        result.join();
    }
}

6.2 ForkJoinPool

功能:工作窃取算法的线程池,适用于递归任务。

适用场景:分治算法,如归并排序、并行计算。

public class ForkJoinPoolExample {
    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool();
        
        // 任务:计算1到1000的和
        SumTask task = new SumTask(1, 1000);
        int result = pool.invoke(task);
        
        System.out.println("Sum: " + result);
    }
    
    static class SumTask extends RecursiveTask<Integer> {
        private final int start;
        private final int end;
        
        public SumTask(int start, int end) {
            this.start = start;
            this.end = end;
        }
        
        @Override
        protected Integer compute() {
            if (end - start <= 100) {
                // 基本情况:直接计算
                int sum = 0;
                for (int i = start; i <= end; i++) {
                    sum += i;
                }
                return sum;
            } else {
                // 分治:拆分任务
                int mid = (start + end) / 2;
                SumTask leftTask = new SumTask(start, mid);
                SumTask rightTask = new SumTask(mid + 1, end);
                
                leftTask.fork(); // 异步执行左子任务
                int rightResult = rightTask.compute(); // 同步执行右子任务
                int leftResult = leftTask.join(); // 等待左子任务完成
                
                return leftResult + rightResult;
            }
        }
    }
}

6.3 LongAdder

功能:高并发下性能更好的计数器,采用分段锁机制。

适用场景:高并发计数场景,如日志统计、请求计数。

public class LongAdderExample {
    public static void main(String[] args) {
        LongAdder counter = new LongAdder();
        
        ExecutorService executor = Executors.newFixedThreadPool(4);
        for (int i = 0; i < 1000; i++) {
            executor.submit(() -> {
                for (int j = 0; j < 100; j++) {
                    counter.increment();
                }
            });
        }
        
        executor.shutdown();
        try {
            executor.awaitTermination(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        
        System.out.println("Final count: " + counter.sum());
    }
}

七、JUC最佳实践

  1. 避免死锁:使用tryLock()和超时机制
  2. 性能调优
    • CPU密集型任务:线程数 = 核心数
    • I/O密集型任务:线程数 = 核心数 × 2
  3. 资源管理:使用try-finally确保锁和资源正确释放
  4. 错误处理:在并发操作中处理InterruptedException
  5. 避免无界队列:使用有界队列+拒绝策略
  6. 选择合适工具
    • 高并发计数:LongAdder > AtomicInteger
    • 读多写少:ConcurrentHashMap > Collections.synchronizedMap
    • 任务分治:ForkJoinPool > ThreadPoolExecutor

八、总结

Java JUC包提供了全面的并发编程工具,涵盖了从线程池管理、锁机制到高级同步器和并发集合的各个方面。正确使用这些工具可以显著提高程序的并发性能和可维护性,同时减少死锁、竞态条件等并发问题。

理解JUC的核心组件和适用场景是编写高效、安全并发程序的关键。在实际开发中,应根据具体业务需求选择合适的并发工具,避免过度设计或简单问题复杂化。

JUC的设计目标是提供更高级别的并发原语,减少因直接使用低级别并发机制(如synchronized关键字、wait()和notify()方法)带来的复杂性和风险,使并发编程更加简单、高效和安全。