多线程之生产者消费者模型
本文最后更新于 2025-06-06,文章内容可能已经过时。
生产者-消费者模型是多线程编程中的经典问题,用于解决多个线程之间协作和资源共享的问题。在 Java 中,我们可以使用多种方式实现该模型,包括
synchronized
、wait/notify
、ReentrantLock
以及BlockingQueue
等机制。
0、模型简介
生产者(Producer):负责生成数据并放入共享资源中。
消费者(Consumer):从共享资源中取出数据进行处理。
共享资源(Buffer / Queue):用于存放生产者生产的资源,供消费者消费。
以下随机采用JAVA语言做示例
一、使用 synchronized
+ wait()
/ notifyAll()
这是最基础的实现方式,使用 Java 内置的同步机制来控制线程之间的协作。
package com.nn3n.thread;
public class Buffer {
private int value;
private boolean isEmpty = true;
// 生产者方法
public synchronized void put(int value) {
//虚假唤醒(Spurious Wakeup):
//问题:线程可能在没有被 notify() 或 notifyAll() 的情况下被唤醒(如操作系统调度器导致)。
//解决:使用 while 循环而非 if 检查条件(如 while (!isEmpty))。
while (!isEmpty) {
try {
// 缓冲区满,等待消费者消费
wait(); // 必须在 synchronized 块中调用 wait(),否则抛出 IllegalMonitorStateException
} catch (InterruptedException e) {
//中断处理:
//问题:wait() 和 sleep() 可能被中断,导致线程提前退出。
//解决:捕获 InterruptedException 并重新设置中断状态(Thread.currentThread().interrupt())
Thread.currentThread().interrupt(); // 保留中断状态
return;
}
}
this.value = value;
isEmpty = false;
System.out.println("Produced: " + value);
//唤醒策略:
//问题:notify() 可能唤醒错误的线程(如唤醒另一个生产者而非消费者)。
//解决:使用 notifyAll() 确保所有等待线程有机会竞争锁。
notifyAll(); // 唤醒所有等待线程(生产者或消费者)
}
// 消费者方法
public synchronized int take() {
while (isEmpty) {
try {
// 缓冲区空,等待生产者生产
wait(); // 必须在 synchronized 块中调用 wait()
} catch (InterruptedException e) {
//中断处理:
//问题:wait() 和 sleep() 可能被中断,导致线程提前退出。
//解决:捕获 InterruptedException 并重新设置中断状态(Thread.currentThread().interrupt())
Thread.currentThread().interrupt(); // 保留中断状态
return -1;
}
}
isEmpty = true;
System.out.println("Consumed: " + value);
//唤醒策略:
//问题:notify() 可能唤醒错误的线程(如唤醒另一个生产者而非消费者)。
//解决:使用 notifyAll() 确保所有等待线程有机会竞争锁。
notifyAll(); // 唤醒所有等待线程
return value;
}
}
package com.nn3n.thread;
public class Producer implements Runnable {
private final Buffer buffer;
public Producer(Buffer buffer) {
this.buffer = buffer;
}
@Override
public void run() {
for (int i = 1; i <= 5; i++) {
buffer.put(i);
try {
Thread.sleep(500); // 模拟生产时间
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}
package com.nn3n.thread;
public class Consumer implements Runnable {
private final Buffer buffer;
public Consumer(Buffer buffer) {
this.buffer = buffer;
}
@Override
public void run() {
for (int i = 1; i <= 5; i++) {
int take = buffer.take();
System.out.println("take = " + take);
try {
Thread.sleep(800); // 模拟消费时间
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}
package com.nn3n.thread;
public class ProducerConsumerTest {
public static void main(String[] args) {
Buffer buffer = new Buffer();
Thread producerThread = new Thread(new Producer(buffer), "Producer");
Thread consumerThread = new Thread(new Consumer(buffer), "Consumer");
producerThread.start();
consumerThread.start();
}
}
Produced: 1
Consumed: 1
take = 1
Produced: 2
Consumed: 2
take = 2
Produced: 3
Consumed: 3
take = 3
Produced: 4
Consumed: 4
take = 4
Produced: 5
Consumed: 5
take = 5
二、使用 ReentrantLock
和 Condition
更灵活的锁机制,可以指定唤醒特定的线程。
package com.nn3n.thread;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class Buffer {
private Integer value;
private boolean isEmpty = true;
private final Lock lock = new ReentrantLock();
private final Condition notEmpty = lock.newCondition();
private final Condition notFull = lock.newCondition();
public void put(Integer value) {
lock.lock();
try {
while (!isEmpty) {
notFull.await();
}
this.value = value;
isEmpty = false;
//signal() vs signalAll():
//当前代码使用 signal(),仅唤醒一个等待线程。
//如果存在多个生产者或消费者线程,可能需要使用 signalAll() 来确保所有相关线程有机会被唤醒。
notEmpty.signalAll();
System.out.println("Produced: " + value);
} catch (InterruptedException e) {
//中断处理:
//问题:await() 和 sleep() 可能被中断。
//解决:捕获 InterruptedException 并处理中断状态。
Thread.currentThread().interrupt();
} finally {
lock.unlock();
}
}
public Integer take() {
lock.lock();
try {
while (isEmpty) {
notEmpty.await();
}
int val = this.value;
isEmpty = true;
notFull.signalAll();
System.out.println("Consumed: " + val);
return val;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return null;
} finally {
lock.unlock();
}
}
}
package com.nn3n.thread;
public class Producer implements Runnable {
private final Buffer buffer;
public Producer(Buffer buffer) {
this.buffer = buffer;
}
@Override
public void run() {
for (int i = 1; i <= 5; i++) {
buffer.put(i);
try {
Thread.sleep(500); // 模拟生产时间
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}
package com.nn3n.thread;
public class Consumer implements Runnable {
private final Buffer buffer;
public Consumer(Buffer buffer) {
this.buffer = buffer;
}
@Override
public void run() {
for (int i = 1; i <= 5; i++) {
int take = buffer.take();
System.out.println("take = " + take);
try {
Thread.sleep(800); // 模拟消费时间
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}
package com.nn3n.thread;
public class ProducerConsumerTest {
public static void main(String[] args) {
Buffer buffer = new Buffer();
Thread producerThread = new Thread(new Producer(buffer), "Producer");
Thread consumerThread = new Thread(new Consumer(buffer), "Consumer");
producerThread.start();
consumerThread.start();
}
}
Produced: 1
Consumed: 1
take = 1
Produced: 2
Consumed: 2
take = 2
Produced: 3
Consumed: 3
take = 3
Produced: 4
Consumed: 4
take = 4
Produced: 5
Consumed: 5
take = 5
三、使用 BlockingQueue
(推荐)
Java 提供了线程安全的阻塞队列接口 java.util.concurrent.BlockingQueue
,例如 ArrayBlockingQueue
、LinkedBlockingQueue
,非常适合用来实现生产者-消费者模型。
package com.nn3n.thread;
import java.util.concurrent.BlockingQueue;
public class BlockingQueueProducer implements Runnable {
private final BlockingQueue<Integer> queue;
public BlockingQueueProducer(BlockingQueue<Integer> queue) {
this.queue = queue;
}
@Override
public void run() {
try {
for (int i = 1; i <= 5; i++) {
//阻塞行为:
//问题:put() 和 take() 会阻塞线程,可能导致资源浪费。
//解决:使用带超时的 offer() 和 poll() 方法(如 queue.offer(value, timeout, unit))。
queue.put(i);
System.out.println("Produced: " + i);
Thread.sleep(500);
}
} catch (InterruptedException e) {
//异常处理:
//问题:put() 和 take() 方法可能抛出 InterruptedException。
//解决:捕获异常并处理中断状态。
Thread.currentThread().interrupt();
}
}
}
package com.nn3n.thread2;
import java.util.concurrent.BlockingQueue;
public class BlockingQueueConsumer implements Runnable {
private final BlockingQueue<Integer> queue;
public BlockingQueueConsumer(BlockingQueue<Integer> queue) {
this.queue = queue;
}
@Override
public void run() {
try {
for (int i = 1; i <= 5; i++) {
Integer value = queue.take();
System.out.println("Consumed: " + value);
Thread.sleep(800);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
package com.nn3n.thread;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
public class BlockingQueueTest {
public static void main(String[] args) {
//队列容量限制:
//问题:如果生产者速度远快于消费者,可能导致队列溢出。
//解决:设置合理容量(如 new LinkedBlockingQueue<>(10))。
BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(10);
new Thread(new BlockingQueueProducer(queue), "Producer").start();
new Thread(new BlockingQueueConsumer(queue), "Consumer").start();
}
}
Consumed: 1
Produced: 1
Produced: 2
Consumed: 2
Produced: 3
Produced: 4
Consumed: 3
Produced: 5
Consumed: 4
Consumed: 5
四、总结对比
实现方式 | 是否需要手动同步 | 是否支持超时 | 灵活性 | 推荐程度 |
---|---|---|---|---|
synchronized + wait/notify | 是 | 否 | 低 | ⭐⭐ |
ReentrantLock + Condition | 是 | 是 | 高 | ⭐⭐⭐ |
BlockingQueue | 否 | 是 | 高 | ⭐⭐⭐⭐⭐ |
实现方式 | 是否线程安全 | 是否推荐 | 适用场景 |
---|---|---|---|
synchronized + wait/notify | ✅ | ⭐⭐ | 学习基础同步机制 |
ReentrantLock + Condition | ✅ | ⭐⭐⭐ | 需要更细粒度的锁控制 |
BlockingQueue | ✅ | ⭐⭐⭐⭐⭐ | 推荐生产环境使用 |
- 感谢你赐予我前进的力量
赞赏者名单
因为你们的支持让我意识到写文章的价值🙏
本文是原创文章,采用 CC BY-NC-ND 4.0 协议,完整转载请注明来自 软件从业者Hort
评论
匿名评论
隐私政策
你无需删除空行,直接评论以获取最佳展示效果