本文最后更新于 2025-06-06,文章内容可能已经过时。

生产者-消费者模型是多线程编程中的经典问题,用于解决多个线程之间协作和资源共享的问题。在 Java 中,我们可以使用多种方式实现该模型,包括 synchronizedwait/notifyReentrantLock 以及 BlockingQueue 等机制。

0、模型简介

生产者(Producer):负责生成数据并放入共享资源中。

消费者(Consumer):从共享资源中取出数据进行处理。

共享资源(Buffer / Queue):用于存放生产者生产的资源,供消费者消费。

以下随机采用JAVA语言做示例

一、使用 synchronized + wait() / notifyAll()

这是最基础的实现方式,使用 Java 内置的同步机制来控制线程之间的协作。

package com.nn3n.thread;

public class Buffer {

    private int value;
    private boolean isEmpty = true;

    // 生产者方法
    public synchronized void put(int value) {
        //虚假唤醒(Spurious Wakeup):
        //问题:线程可能在没有被 notify() 或 notifyAll() 的情况下被唤醒(如操作系统调度器导致)。
        //解决:使用 while 循环而非 if 检查条件(如 while (!isEmpty))。
        while (!isEmpty) {
            try {
                // 缓冲区满,等待消费者消费
                wait(); // 必须在 synchronized 块中调用 wait(),否则抛出 IllegalMonitorStateException
            } catch (InterruptedException e) {
                //中断处理:
                //问题:wait() 和 sleep() 可能被中断,导致线程提前退出。
                //解决:捕获 InterruptedException 并重新设置中断状态(Thread.currentThread().interrupt())
                Thread.currentThread().interrupt(); // 保留中断状态
                return;
            }
        }
        this.value = value;
        isEmpty = false;
        System.out.println("Produced: " + value);
        //唤醒策略:
        //问题:notify() 可能唤醒错误的线程(如唤醒另一个生产者而非消费者)。
        //解决:使用 notifyAll() 确保所有等待线程有机会竞争锁。
        notifyAll(); // 唤醒所有等待线程(生产者或消费者)
    }

    // 消费者方法
    public synchronized int take() {
        while (isEmpty) {
            try {
                // 缓冲区空,等待生产者生产
                wait(); // 必须在 synchronized 块中调用 wait()
            } catch (InterruptedException e) {
                //中断处理:
                //问题:wait() 和 sleep() 可能被中断,导致线程提前退出。
                //解决:捕获 InterruptedException 并重新设置中断状态(Thread.currentThread().interrupt())
                Thread.currentThread().interrupt(); // 保留中断状态
                return -1;
            }
        }
        isEmpty = true;
        System.out.println("Consumed: " + value);
        //唤醒策略:
        //问题:notify() 可能唤醒错误的线程(如唤醒另一个生产者而非消费者)。
        //解决:使用 notifyAll() 确保所有等待线程有机会竞争锁。
        notifyAll(); // 唤醒所有等待线程
        return value;
    }
}
package com.nn3n.thread;

public class Producer implements Runnable {

    private final Buffer buffer;

    public Producer(Buffer buffer) {
        this.buffer = buffer;
    }

    @Override
    public void run() {
        for (int i = 1; i <= 5; i++) {
            buffer.put(i);
            try {
                Thread.sleep(500); // 模拟生产时间
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}
package com.nn3n.thread;

public class Consumer implements Runnable {

    private final Buffer buffer;

    public Consumer(Buffer buffer) {
        this.buffer = buffer;
    }

    @Override
    public void run() {
        for (int i = 1; i <= 5; i++) {
            int take = buffer.take();
            System.out.println("take = " + take);
            try {
                Thread.sleep(800); // 模拟消费时间
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}
package com.nn3n.thread;

public class ProducerConsumerTest {

    public static void main(String[] args) {
        Buffer buffer = new Buffer();

        Thread producerThread = new Thread(new Producer(buffer), "Producer");
        Thread consumerThread = new Thread(new Consumer(buffer), "Consumer");

        producerThread.start();
        consumerThread.start();
    }
}
Produced: 1
Consumed: 1
take = 1
Produced: 2
Consumed: 2
take = 2
Produced: 3
Consumed: 3
take = 3
Produced: 4
Consumed: 4
take = 4
Produced: 5
Consumed: 5
take = 5

二、使用 ReentrantLockCondition

更灵活的锁机制,可以指定唤醒特定的线程。

package com.nn3n.thread;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class Buffer {

    private Integer value;
    private boolean isEmpty = true;
    private final Lock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();
    private final Condition notFull = lock.newCondition();

    public void put(Integer value) {
        lock.lock();
        try {
            while (!isEmpty) {
                notFull.await();
            }
            this.value = value;
            isEmpty = false;
            //signal() vs signalAll():
            //当前代码使用 signal(),仅唤醒一个等待线程。
            //如果存在多个生产者或消费者线程,可能需要使用 signalAll() 来确保所有相关线程有机会被唤醒。
            notEmpty.signalAll();
            System.out.println("Produced: " + value);
        } catch (InterruptedException e) {
            //中断处理:
            //问题:await() 和 sleep() 可能被中断。
            //解决:捕获 InterruptedException 并处理中断状态。
            Thread.currentThread().interrupt();
        } finally {
            lock.unlock();
        }
    }

    public Integer take() {
        lock.lock();
        try {
            while (isEmpty) {
                notEmpty.await();
            }
            int val = this.value;
            isEmpty = true;
            notFull.signalAll();
            System.out.println("Consumed: " + val);
            return val;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } finally {
            lock.unlock();
        }
    }
}
package com.nn3n.thread;

public class Producer implements Runnable {

    private final Buffer buffer;

    public Producer(Buffer buffer) {
        this.buffer = buffer;
    }

    @Override
    public void run() {
        for (int i = 1; i <= 5; i++) {
            buffer.put(i);
            try {
                Thread.sleep(500); // 模拟生产时间
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}
package com.nn3n.thread;

public class Consumer implements Runnable {

    private final Buffer buffer;

    public Consumer(Buffer buffer) {
        this.buffer = buffer;
    }

    @Override
    public void run() {
        for (int i = 1; i <= 5; i++) {
            int take = buffer.take();
            System.out.println("take = " + take);
            try {
                Thread.sleep(800); // 模拟消费时间
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}
package com.nn3n.thread;

public class ProducerConsumerTest {

    public static void main(String[] args) {
        Buffer buffer = new Buffer();

        Thread producerThread = new Thread(new Producer(buffer), "Producer");
        Thread consumerThread = new Thread(new Consumer(buffer), "Consumer");

        producerThread.start();
        consumerThread.start();
    }
}
Produced: 1
Consumed: 1
take = 1
Produced: 2
Consumed: 2
take = 2
Produced: 3
Consumed: 3
take = 3
Produced: 4
Consumed: 4
take = 4
Produced: 5
Consumed: 5
take = 5

三、使用 BlockingQueue(推荐)

Java 提供了线程安全的阻塞队列接口 java.util.concurrent.BlockingQueue,例如 ArrayBlockingQueueLinkedBlockingQueue,非常适合用来实现生产者-消费者模型。

package com.nn3n.thread;

import java.util.concurrent.BlockingQueue;

public class BlockingQueueProducer implements Runnable {

    private final BlockingQueue<Integer> queue;

    public BlockingQueueProducer(BlockingQueue<Integer> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            for (int i = 1; i <= 5; i++) {
                //阻塞行为:
                //问题:put() 和 take() 会阻塞线程,可能导致资源浪费。
                //解决:使用带超时的 offer() 和 poll() 方法(如 queue.offer(value, timeout, unit))。
                queue.put(i);
                System.out.println("Produced: " + i);
                Thread.sleep(500);
            }
        } catch (InterruptedException e) {
            //异常处理:
            //问题:put() 和 take() 方法可能抛出 InterruptedException。
            //解决:捕获异常并处理中断状态。
            Thread.currentThread().interrupt();
        }
    }
}
package com.nn3n.thread2;

import java.util.concurrent.BlockingQueue;

public class BlockingQueueConsumer implements Runnable {

    private final BlockingQueue<Integer> queue;

    public BlockingQueueConsumer(BlockingQueue<Integer> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            for (int i = 1; i <= 5; i++) {
                Integer value = queue.take();
                System.out.println("Consumed: " + value);
                Thread.sleep(800);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
package com.nn3n.thread;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class BlockingQueueTest {

    public static void main(String[] args) {
        //队列容量限制:
        //问题:如果生产者速度远快于消费者,可能导致队列溢出。
        //解决:设置合理容量(如 new LinkedBlockingQueue<>(10))。
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(10);

        new Thread(new BlockingQueueProducer(queue), "Producer").start();
        new Thread(new BlockingQueueConsumer(queue), "Consumer").start();
    }
}
Consumed: 1
Produced: 1
Produced: 2
Consumed: 2
Produced: 3
Produced: 4
Consumed: 3
Produced: 5
Consumed: 4
Consumed: 5

四、总结对比

实现方式是否需要手动同步是否支持超时灵活性推荐程度
synchronized + wait/notify⭐⭐
ReentrantLock + Condition⭐⭐⭐
BlockingQueue⭐⭐⭐⭐⭐
实现方式是否线程安全是否推荐适用场景
synchronized + wait/notify⭐⭐学习基础同步机制
ReentrantLock + Condition⭐⭐⭐需要更细粒度的锁控制
BlockingQueue⭐⭐⭐⭐⭐推荐生产环境使用