本文最后更新于 2025-11-19,文章内容可能已经过时。

WorkStealingPool 是 Java 8 引入的线程池,基于 ForkJoinPool 实现,采用"工作窃取算法"。空闲线程会从其他繁忙线程的任务队列中"窃取"任务执行,显著提高 CPU 利用率。

它特别适合处理大量细粒度任务,下面详细列举其使用场景及对应示例。

1. 并行计算密集型任务

适用场景:需要大量独立计算的场景,如数据分析、科学计算、图像处理等CPU密集型任务。

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

public class ParallelComputationExample {
    public static void main(String[] args) throws InterruptedException {
        // 创建WorkStealingPool,使用默认的并行度(CPU核心数)
        ExecutorService executor = Executors.newWorkStealingPool();
        
        // 模拟1000个独立计算任务
        IntStream.range(0, 1000).forEach(i -> executor.submit(() -> {
            // 模拟计算密集型任务
            double result = computeComplexFunction(i);
            System.out.println("任务 " + i + " 完成,结果: " + result + 
                              ",线程: " + Thread.currentThread().getName());
            return result;
        }));
        
        // 关闭线程池
        executor.shutdown();
        // 等待所有任务完成
        executor.awaitTermination(1, TimeUnit.HOURS);
        System.out.println("所有计算任务完成");
    }
    
    private static double computeComplexFunction(int input) {
        // 模拟复杂计算
        double result = 0;
        for (int i = 0; i < 1000000; i++) {
            result += Math.sin(input * i * 0.001);
        }
        return result;
    }
}

2. 递归任务分解(分治算法)

适用场景:可以递归分解的问题,如快速排序、归并排序、大规模数据搜索等。

import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class RecursiveTaskExample {
    static class QuickSortTask extends RecursiveTask<int[]> {
        private final int[] array;
        private final int left;
        private final int right;
        
        public QuickSortTask(int[] array, int left, int right) {
            this.array = array;
            this.left = left;
            this.right = right;
        }
        
        @Override
        protected int[] compute() {
            if (left < right) {
                int pivotIndex = partition(array, left, right);
                
                // 创建子任务
                QuickSortTask leftTask = new QuickSortTask(array, left, pivotIndex - 1);
                QuickSortTask rightTask = new QuickSortTask(array, pivotIndex + 1, right);
                
                // 异步执行左子任务
                leftTask.fork();
                // 同步执行右子任务
                rightTask.compute();
                // 等待左子任务完成
                leftTask.join();
            }
            return array;
        }
        
        private int partition(int[] arr, int low, int high) {
            int pivot = arr[high];
            int i = low - 1;
            for (int j = low; j < high; j++) {
                if (arr[j] <= pivot) {
                    i++;
                    int temp = arr[i];
                    arr[i] = arr[j];
                    arr[j] = temp;
                }
            }
            int temp = arr[i + 1];
            arr[i + 1] = arr[high];
            arr[high] = temp;
            return i + 1;
        }
    }
    
    public static void main(String[] args) {
        // 创建WorkStealingPool
        ForkJoinPool pool = (ForkJoinPool) Executors.newWorkStealingPool();
        
        // 生成一个大数组
        int[] largeArray = new int[1000000];
        for (int i = 0; i < largeArray.length; i++) {
            largeArray[i] = (int) (Math.random() * 1000000);
        }
        
        System.out.println("开始排序...");
        long startTime = System.currentTimeMillis();
        
        // 提交任务
        int[] sortedArray = pool.invoke(new QuickSortTask(largeArray, 0, largeArray.length - 1));
        
        long endTime = System.currentTimeMillis();
        System.out.println("排序完成,耗时: " + (endTime - startTime) + " 毫秒");
        
        // 验证前几个元素
        System.out.println("前10个元素: " + Arrays.toString(Arrays.copyOfRange(sortedArray, 0, 10)));
    }
}

3. 异步事件处理

适用场景:需要快速响应大量短小的异步事件,如日志处理、监控数据收集、用户行为分析等。

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;

public class AsyncEventProcessingExample {
    private static final AtomicLong eventsProcessed = new AtomicLong(0);
    
    public static void main(String[] args) throws InterruptedException {
        // 创建具有固定并行度的WorkStealingPool
        ExecutorService executor = Executors.newWorkStealingPool(8);
        
        System.out.println("开始处理异步事件...");
        long startTime = System.currentTimeMillis();
        
        // 模拟10000个异步事件
        for (int i = 0; i < 10000; i++) {
            int eventId = i;
            executor.submit(() -> {
                // 模拟事件处理
                processEvent(eventId);
                eventsProcessed.incrementAndGet();
                
                // 每处理1000个事件,打印进度
                if (eventsProcessed.get() % 1000 == 0) {
                    System.out.println("已处理事件数: " + eventsProcessed.get());
                }
            });
        }
        
        // 关闭线程池
        executor.shutdown();
        // 等待所有任务完成
        executor.awaitTermination(1, TimeUnit.MINUTES);
        
        long endTime = System.currentTimeMillis();
        System.out.println("所有事件处理完成,总事件数: " + eventsProcessed.get() + 
                          ",总耗时: " + (endTime - startTime) + " 毫秒");
    }
    
    private static void processEvent(int eventId) {
        // 模拟事件处理逻辑
        try {
            // 随机延迟,模拟不同的处理时间
            Thread.sleep(ThreadLocalRandom.current().nextInt(1, 10));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        
        // 实际处理逻辑
        // 例如:记录日志、更新指标、发送通知等
    }
}

4. 批量数据处理

适用场景:需要高效处理大量数据,如ETL过程、数据转换、批量导入导出等。

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.stream.Collectors;

public class BatchDataProcessingExample {
    // 模拟数据处理服务
    static class DataProcessor {
        public ProcessedData process(RawData rawData) {
            // 模拟数据处理
            try {
                Thread.sleep(50); // 模拟处理时间
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            
            // 实际处理逻辑
            return new ProcessedData(
                rawData.getId(),
                rawData.getContent().toUpperCase(),
                rawData.getTimestamp() + "_PROCESSED"
            );
        }
    }
    
    static class RawData {
        private final int id;
        private final String content;
        private final String timestamp;
        
        public RawData(int id, String content, String timestamp) {
            this.id = id;
            this.content = content;
            this.timestamp = timestamp;
        }
        
        public int getId() { return id; }
        public String getContent() { return content; }
        public String getTimestamp() { return timestamp; }
    }
    
    static class ProcessedData {
        private final int id;
        private final String processedContent;
        private final String processedTimestamp;
        
        public ProcessedData(int id, String processedContent, String processedTimestamp) {
            this.id = id;
            this.processedContent = processedContent;
            this.processedTimestamp = processedTimestamp;
        }
        
        @Override
        public String toString() {
            return "ProcessedData{" +
                    "id=" + id +
                    ", content='" + processedContent + '\'' +
                    ", timestamp='" + processedTimestamp + '\'' +
                    '}';
        }
    }
    
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        // 创建WorkStealingPool
        ExecutorService executor = Executors.newWorkStealingPool();
        DataProcessor processor = new DataProcessor();
        
        // 生成模拟数据
        List<RawData> rawDataList = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            rawDataList.add(new RawData(i, "data-item-" + i, "2023-06-" + (i % 30 + 1)));
        }
        
        System.out.println("开始批量处理数据,总记录数: " + rawDataList.size());
        long startTime = System.currentTimeMillis();
        
        // 提交所有任务
        List<Future<ProcessedData>> futures = rawDataList.stream()
                .map(rawData -> executor.submit(() -> processor.process(rawData)))
                .collect(Collectors.toList());
        
        // 收集结果
        List<ProcessedData> results = new ArrayList<>();
        for (Future<ProcessedData> future : futures) {
            results.add(future.get());
        }
        
        // 关闭线程池
        executor.shutdown();
        
        long endTime = System.currentTimeMillis();
        System.out.println("数据处理完成,总耗时: " + (endTime - startTime) + " 毫秒");
        System.out.println("处理结果示例 (前5条):");
        results.stream().limit(5).forEach(System.out::println);
    }
}

5. 图像/视频处理

适用场景:需要并行处理多个图像或视频帧,如图像滤镜应用、视频转码、缩略图生成等。

import java.awt.image.BufferedImage;
import java.util.concurrent.*;
import javax.imageio.ImageIO;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

public class ImageProcessingExample {
    // 模拟图像处理
    static class ImageProcessor {
        public BufferedImage applyFilter(BufferedImage image, String filterType) {
            // 实际应用中这里会有真正的图像处理逻辑
            System.out.println("正在应用 " + filterType + " 滤镜到图像,线程: " + 
                              Thread.currentThread().getName());
            
            // 模拟处理时间
            try {
                Thread.sleep(200 + (int)(Math.random() * 300));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            
            // 简单返回原图,实际应用中会返回处理后的图像
            return image;
        }
        
        public boolean saveImage(BufferedImage image, String outputPath) {
            try {
                ImageIO.write(image, "jpg", new File(outputPath));
                return true;
            } catch (IOException e) {
                e.printStackTrace();
                return false;
            }
        }
    }
    
    static class ImageTask implements Callable<String> {
        private final String inputPath;
        private final String outputPath;
        private final String filterType;
        private final ImageProcessor processor;
        
        public ImageTask(String inputPath, String outputPath, String filterType, ImageProcessor processor) {
            this.inputPath = inputPath;
            this.outputPath = outputPath;
            this.filterType = filterType;
            this.processor = processor;
        }
        
        @Override
        public String call() throws Exception {
            try {
                // 读取图像
                BufferedImage image = ImageIO.read(new File(inputPath));
                
                // 应用滤镜
                BufferedImage processedImage = processor.applyFilter(image, filterType);
                
                // 保存图像
                if (processor.saveImage(processedImage, outputPath)) {
                    return "成功处理: " + inputPath + " -> " + outputPath;
                } else {
                    return "保存失败: " + outputPath;
                }
            } catch (Exception e) {
                return "处理失败: " + inputPath + " - " + e.getMessage();
            }
        }
    }
    
    public static void main(String[] args) {
        // 创建WorkStealingPool
        ExecutorService executor = Executors.newWorkStealingPool();
        ImageProcessor processor = new ImageProcessor();
        
        // 模拟需要处理的图像列表
        List<String> imageFiles = List.of(
            "/path/to/image1.jpg",
            "/path/to/image2.jpg",
            "/path/to/image3.jpg",
            "/path/to/image4.jpg",
            "/path/to/image5.jpg",
            "/path/to/image6.jpg",
            "/path/to/image7.jpg",
            "/path/to/image8.jpg"
        );
        
        System.out.println("开始并行处理图像...");
        long startTime = System.currentTimeMillis();
        
        // 为每张图像创建处理任务
        List<Future<String>> futures = new ArrayList<>();
        for (int i = 0; i < imageFiles.size(); i++) {
            String inputPath = imageFiles.get(i);
            String outputPath = "/path/to/output/image" + (i + 1) + "_filtered.jpg";
            String filterType = i % 2 == 0 ? "grayscale" : "sepia";
            
            futures.add(executor.submit(new ImageTask(inputPath, outputPath, filterType, processor)));
        }
        
        // 等待所有任务完成并获取结果
        List<String> results = futures.stream()
                .map(future -> {
                    try {
                        return future.get();
                    } catch (Exception e) {
                        return "任务执行异常: " + e.getMessage();
                    }
                })
                .collect(Collectors.toList());
        
        // 关闭线程池
        executor.shutdown();
        
        long endTime = System.currentTimeMillis();
        System.out.println("图像处理完成,总耗时: " + (endTime - startTime) + " 毫秒");
        results.forEach(System.out::println);
        
        System.out.println("注意:此示例使用了模拟路径,实际使用时需要替换为有效的图像路径");
    }
}

6. Web爬虫/数据抓取

适用场景:需要同时抓取多个网页内容,解析HTML,提取数据的场景。

import java.io.IOException;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import org.jsoup.Jsoup;
import org.jsoup.nodes.Document;
import org.jsoup.select.Elements;

public class WebCrawlerExample {
    static class WebPageFetcher {
        public String fetchContent(String url) throws IOException {
            System.out.println("正在抓取: " + url + ",线程: " + Thread.currentThread().getName());
            // 模拟网络延迟
            try {
                Thread.sleep(300 + (int)(Math.random() * 500));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            
            // 实际使用Jsoup获取网页内容
            // Document doc = Jsoup.connect(url).get();
            // return doc.html();
            
            // 模拟返回内容
            return "<html><body><h1>模拟内容 - " + url + "</h1></body></html>";
        }
        
        public List<String> extractLinks(String htmlContent) {
            // 实际使用Jsoup解析HTML提取链接
            // Document doc = Jsoup.parse(htmlContent);
            // Elements links = doc.select("a[href]");
            // return links.stream().map(link -> link.attr("href")).collect(Collectors.toList());
            
            // 模拟返回链接
            List<String> links = new ArrayList<>();
            for (int i = 0; i < 3; i++) {
                links.add("https://example.com/page/" + (int)(Math.random() * 100));
            }
            return links;
        }
    }
    
    static class CrawlTask implements Callable<String> {
        private final String url;
        private final WebPageFetcher fetcher;
        
        public CrawlTask(String url, WebPageFetcher fetcher) {
            this.url = url;
            this.fetcher = fetcher;
        }
        
        @Override
        public String call() throws Exception {
            try {
                // 获取网页内容
                String content = fetcher.fetchContent(url);
                
                // 提取链接
                List<String> links = fetcher.extractLinks(content);
                
                return "成功抓取: " + url + ",发现链接数: " + links.size();
            } catch (Exception e) {
                return "抓取失败: " + url + " - " + e.getMessage();
            }
        }
    }
    
    public static void main(String[] args) {
        // 创建WorkStealingPool
        ExecutorService executor = Executors.newWorkStealingPool();
        WebPageFetcher fetcher = new WebPageFetcher();
        
        // 起始URL列表
        List<String> seedUrls = List.of(
            "https://example.com",
            "https://example.org",
            "https://example.net",
            "https://news.example.com",
            "https://blog.example.com",
            "https://shop.example.com",
            "https://forum.example.com",
            "https://wiki.example.com"
        );
        
        System.out.println("开始并行爬取网页...");
        long startTime = System.currentTimeMillis();
        
        // 提交所有爬取任务
        List<Future<String>> futures = new ArrayList<>();
        for (String url : seedUrls) {
            futures.add(executor.submit(new CrawlTask(url, fetcher)));
        }
        
        // 收集结果
        for (Future<String> future : futures) {
            try {
                System.out.println(future.get());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        
        // 关闭线程池
        executor.shutdown();
        
        long endTime = System.currentTimeMillis();
        System.out.println("爬取任务完成,总耗时: " + (endTime - startTime) + " 毫秒");
        System.out.println("注意:此示例使用了模拟爬取,实际应用需要处理URL去重、深度控制、robots.txt等");
    }
}

最佳实践与注意事项

  1. CPU核心数设置:默认使用Runtime.getRuntime().availableProcessors()作为并行度,可手动指定:

    Executors.newWorkStealingPool(4); // 指定4个线程
    
  2. 内存管理:WorkStealingPool使用无界队列,大量任务可能导致内存溢出,应合理控制任务数量。

  3. 异常处理:在提交的任务中添加异常处理,避免单个任务失败影响整个线程池。

  4. 资源释放:使用完成后务必调用shutdown(),避免资源泄露。

  5. 任务粒度:任务应足够细粒度但不应过小,避免线程切换开销超过任务执行时间。

  6. I/O密集型任务:对于I/O密集型任务,可能需要更多线程,考虑使用带限制的线程池。

  7. 监控与调优:监控线程池状态,根据实际应用场景调整参数。

通过以上示例和最佳实践,可以充分发挥WorkStealingPool在多核CPU环境下的并行处理能力,显著提升应用性能。