AQS(AbstractQueuedSynchronizer)作为Java并发框架的核心基础,通过统一的状态管理与CLH队列机制,使开发者能高效构建如ReentrantLock(可重入锁)、Semaphore(信号量)、CountDownLatch(倒计时器)、ReentrantReadWriteLock(读写锁)等同步组件,避免重复实现同步逻辑,显著提升高并发场景下(如库存扣减、资源限流、多线程协调、缓存读写)的代码复用性、性能与可靠性,是解决复杂并发问题的基石性框架。

AQS(AbstractQueuedSynchronizer)是Java并发编程的基石框架,通过state状态管理和CLH队列机制,为构建各种同步组件提供底层支持。

一、ReentrantLock(可重入锁)

使用场景

  • 高并发场景下的线程安全控制
  • 需要可重入性(同一个线程可以多次获取同一把锁)
  • 需要公平锁或非公平锁的选择

项目示例:电商库存扣减

public class Inventory {
    private int stock;
    private final ReentrantLock lock = new ReentrantLock(); // 使用ReentrantLock

    public boolean deduct() {
        lock.lock(); // 获取锁
        try {
            if (stock > 0) {
                stock--;
                return true;
            }
            return false;
        } finally {
            lock.unlock(); // 确保锁一定会被释放
        }
    }
    
    // 使用示例
    public static void main(String[] args) {
        Inventory inventory = new Inventory();
        inventory.stock = 100; // 初始库存100
        
        // 模拟多个线程同时扣减库存
        for (int i = 0; i < 50; i++) {
            new Thread(() -> {
                boolean success = inventory.deduct();
                System.out.println(Thread.currentThread().getName() + 
                                  " 库存扣减" + (success ? "成功" : "失败"));
            }).start();
        }
    }
}

说明

  • 在高并发场景下保证库存扣减的原子性
  • 通过lock()和unlock()确保同一时刻只有一个线程可以修改库存
  • 使用try-finally确保锁一定会被释放,避免死锁

二、Semaphore(信号量)

使用场景

  • 资源池管理(如数据库连接池、线程池)
  • 限流控制(限制同时访问资源的线程数量)
  • 控制并发访问数量

项目示例:限流器实现

public class RateLimiter {
    private final Semaphore semaphore;
    
    public RateLimiter(int permits) {
        this.semaphore = new Semaphore(permits, true); // 公平模式
    }
    
    public void acquire() throws InterruptedException {
        semaphore.acquire(); // 获取许可
    }
    
    public void release() {
        semaphore.release(); // 释放许可
    }
    
    // 使用示例:模拟限流
    public void accessResource() {
        try {
            semaphore.acquire();
            // 访问受限资源(如数据库、API等)
            System.out.println(Thread.currentThread().getName() + " 正在访问资源");
            Thread.sleep(500); // 模拟资源访问时间
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            semaphore.release();
        }
    }
    
    public static void main(String[] args) {
        RateLimiter limiter = new RateLimiter(3); // 最多3个并发
        
        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                limiter.accessResource();
            }).start();
        }
    }
}

说明

  • 创建Semaphore时指定许可数量(如3个许可)
  • 每个线程调用acquire()获取许可,如果没有可用许可则阻塞
  • 操作完成后调用release()释放许可
  • 公平模式(true)确保线程按FIFO顺序获取许可

三、CountDownLatch(倒计时器)

使用场景

  • 多线程任务协调(等待所有线程完成)
  • 主线程等待多个子线程完成后再继续执行
  • 任务初始化阶段等待多个组件启动完成

项目示例:多线程数据加载

public class DataLoader {
    private final CountDownLatch latch;
    
    public DataLoader(int threadCount) {
        this.latch = new CountDownLatch(threadCount);
    }
    
    public void loadData() {
        // 启动多个数据加载线程
        for (int i = 0; i < 5; i++) {
            new Thread(() -> {
                try {
                    // 模拟数据加载过程
                    Thread.sleep(1000 + (int)(Math.random() * 500));
                    System.out.println(Thread.currentThread().getName() + " 数据加载完成");
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    latch.countDown(); // 计数器减1
                }
            }).start();
        }
    }
    
    public void waitForAllDataLoaded() throws InterruptedException {
        latch.await(); // 等待所有数据加载完成
        System.out.println("所有数据加载完成,可以继续处理");
    }
    
    public static void main(String[] args) throws InterruptedException {
        DataLoader loader = new DataLoader(5);
        loader.loadData();
        loader.waitForAllDataLoaded();
        System.out.println("开始处理数据...");
    }
}

说明

  • CountDownLatch初始化时指定计数器值(如5个线程)
  • 每个线程完成工作后调用countDown()减少计数
  • 主线程调用await()等待计数器归零

四、ReentrantReadWriteLock(读写锁)

使用场景

  • 读多写少的并发访问场景
  • 读操作频繁,写操作较少的场景
  • 需要提高读操作的并发性能

项目示例:缓存系统

public class Cache {
    private final Map<String, Object> cache = new HashMap<>();
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private final Lock readLock = lock.readLock();
    private final Lock writeLock = lock.writeLock();
    
    public Object get(String key) {
        readLock.lock(); // 获取读锁
        try {
            return cache.get(key);
        } finally {
            readLock.unlock();
        }
    }
    
    public void put(String key, Object value) {
        writeLock.lock(); // 获取写锁
        try {
            cache.put(key, value);
        } finally {
            writeLock.unlock();
        }
    }
    
    public void remove(String key) {
        writeLock.lock();
        try {
            cache.remove(key);
        } finally {
            writeLock.unlock();
        }
    }
    
    // 使用示例
    public static void main(String[] args) {
        Cache cache = new Cache();
        
        // 模拟读操作线程
        for (int i = 0; i < 5; i++) {
            new Thread(() -> {
                for (int j = 0; j < 10; j++) {
                    cache.get("key" + j);
                }
            }).start();
        }
        
        // 模拟写操作线程
        new Thread(() -> {
            for (int i = 0; i < 5; i++) {
                cache.put("key" + i, "value" + i);
            }
        }).start();
    }
}

说明

  • 读锁(readLock)是共享的,允许多个线程同时读
  • 写锁(writeLock)是独占的,同一时间只能有一个线程写
  • 读操作不会阻塞其他读操作,但会阻塞写操作
  • 写操作会阻塞所有读操作和写操作

五、自定义同步器

使用场景

  • 需要实现特定的同步逻辑
  • 需要定制化同步行为
  • 无法直接使用现有同步器满足需求

项目示例:简易互斥锁

public class MyMutex {
    private static class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int acquires) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }
        
        @Override
        protected boolean tryRelease(int releases) {
            if (getState() == 0)
                throw new IllegalMonitorStateException();
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }
        
        @Override
        protected boolean isHeldExclusively() {
            return getState() == 1;
        }
    }
    
    private final Sync sync = new Sync();
    
    public void lock() {
        sync.acquire(1);
    }
    
    public void unlock() {
        sync.release(1);
    }
    
    public boolean isLocked() {
        return sync.isHeldExclusively();
    }
    
    // 使用示例
    public static void main(String[] args) {
        MyMutex mutex = new MyMutex();
        
        // 模拟多个线程竞争锁
        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                mutex.lock();
                try {
                    System.out.println(Thread.currentThread().getName() + " 获取到锁");
                    Thread.sleep(1000); // 模拟操作
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    mutex.unlock();
                }
            }).start();
        }
    }
}

说明

  • 通过继承AQS并重写tryAcquire和tryRelease方法实现同步逻辑
  • 重写tryAcquire:尝试获取资源,成功返回true,失败返回false
  • 重写tryRelease:尝试释放资源,成功返回true,失败返回false
  • 通过acquire和release方法调用AQS的模板方法

六、AQS在实际项目中的价值总结

  1. 代码复用性:AQS提供了一套标准的同步框架,避免了重复实现同步逻辑
  2. 性能优化:通过CLH队列和CAS操作,实现了高效的线程调度和状态管理
  3. 灵活性:支持独占模式和共享模式,满足不同场景的同步需求
  4. 可扩展性:开发者可以基于AQS轻松实现自定义同步器
  5. 可维护性:将同步逻辑与业务逻辑分离,提高了代码的可读性和可维护性

在实际项目开发中,正确理解和使用AQS能够有效解决多线程并发问题,提高系统的稳定性和性能。通过以上示例,你可以根据具体业务需求选择合适的AQS实现,或者基于AQS实现自定义的同步器。