Java WorkStealingPool
本文最后更新于 2025-11-19,文章内容可能已经过时。
WorkStealingPool 指通过 Executors.newWorkStealingPool() 创建的线程池(Java 8 引入),底层基于 ForkJoinPool 实现,采用"工作窃取算法":每个工作线程维护自己的任务队列,空闲线程会从其他繁忙线程的任务队列中"窃取"任务执行,从而显著提高 CPU 利用率。它特别适合处理大量细粒度任务,下面详细列举其使用场景及对应示例。
1. 并行计算密集型任务
适用场景:需要大量独立计算的场景,如数据分析、科学计算、图像处理等CPU密集型任务。
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
/**
 * Fans a batch of independent CPU-bound computations out onto a work-stealing
 * pool and waits for all of them to finish.
 */
public class ParallelComputationExample {

    /**
     * Submits 1000 computation tasks, shuts the pool down, and blocks for up to
     * one hour while the submitted tasks drain.
     *
     * @param args unused
     * @throws InterruptedException if interrupted while waiting for termination
     */
    public static void main(String[] args) throws InterruptedException {
        // No-argument factory: the pool picks its own default parallelism.
        ExecutorService pool = Executors.newWorkStealingPool();

        for (int taskId = 0; taskId < 1000; taskId++) {
            final int input = taskId;
            pool.submit(() -> {
                double value = computeComplexFunction(input);
                String worker = Thread.currentThread().getName();
                System.out.println("任务 " + input + " 完成,结果: " + value + ",线程: " + worker);
                return value;
            });
        }

        // Stop accepting new work, then wait for what was already submitted.
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.HOURS);
        System.out.println("所有计算任务完成");
    }

    /**
     * Stand-in for an expensive computation: sums one million sine values whose
     * arguments depend on {@code input}.
     *
     * @param input seed that scales the argument of every sine term
     * @return the accumulated sum
     */
    private static double computeComplexFunction(int input) {
        double sum = 0;
        int step = 0;
        while (step < 1000000) {
            sum += Math.sin(input * step * 0.001);
            step++;
        }
        return sum;
    }
}
2. 递归任务分解(分治算法)
适用场景:可以递归分解的问题,如快速排序、归并排序、大规模数据搜索等。
import java.util.Arrays;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
/**
 * Demonstrates recursive divide-and-conquer on a work-stealing pool by sorting
 * a large {@code int} array with a parallel quicksort.
 */
public class RecursiveTaskExample {

    /**
     * Sorts the inclusive index range {@code [left, right]} of an array in
     * place and returns that same array instance.
     */
    static class QuickSortTask extends RecursiveTask<int[]> {
        /**
         * Ranges spanning fewer indices than this are sorted directly instead
         * of being split further, so the cost of forking is not paid for tiny
         * sub-ranges.
         */
        private static final int SEQUENTIAL_THRESHOLD = 1 << 13;

        private final int[] array;
        private final int left;
        private final int right;

        /**
         * @param array array to sort in place
         * @param left  first index of the range (inclusive)
         * @param right last index of the range (inclusive); a value below
         *              {@code left} denotes an empty range
         */
        public QuickSortTask(int[] array, int left, int right) {
            this.array = array;
            this.left = left;
            this.right = right;
        }

        /**
         * Sorts this task's range, forking the lower partition and processing
         * the upper partition on the current thread.
         *
         * @return the array passed to the constructor
         */
        @Override
        protected int[] compute() {
            if (left >= right) {
                return array;
            }
            if (right - left < SEQUENTIAL_THRESHOLD) {
                // Arrays.sort takes an exclusive upper bound.
                Arrays.sort(array, left, right + 1);
                return array;
            }

            int[] equalRange = partition(array, left, right);
            QuickSortTask leftTask = new QuickSortTask(array, left, equalRange[0] - 1);
            QuickSortTask rightTask = new QuickSortTask(array, equalRange[1] + 1, right);
            // Run the left half asynchronously, the right half on this thread,
            // then wait for the forked half.
            leftTask.fork();
            rightTask.compute();
            leftTask.join();
            return array;
        }

        /**
         * Three-way partition around the middle element of the range.
         *
         * <p>A middle pivot keeps already-sorted input balanced, and grouping
         * every element equal to the pivot removes duplicates from further
         * recursion. The previous last-element two-way partition shrank the
         * range by only one element per level on such inputs.
         *
         * @return a two-element array {@code {lt, gt}}: afterwards
         *         {@code arr[low..lt-1] < pivot}, {@code arr[lt..gt] == pivot}
         *         and {@code arr[gt+1..high] > pivot}
         */
        private int[] partition(int[] arr, int low, int high) {
            int pivot = arr[low + ((high - low) >>> 1)];
            int lt = low;
            int gt = high;
            int i = low;
            while (i <= gt) {
                int value = arr[i];
                if (value < pivot) {
                    swap(arr, lt++, i++);
                } else if (value > pivot) {
                    // The element swapped in from the right is unexamined, so
                    // i is deliberately not advanced here.
                    swap(arr, i, gt--);
                } else {
                    i++;
                }
            }
            return new int[] {lt, gt};
        }

        /** Exchanges {@code arr[a]} and {@code arr[b]}. */
        private static void swap(int[] arr, int a, int b) {
            int tmp = arr[a];
            arr[a] = arr[b];
            arr[b] = tmp;
        }
    }

    /**
     * Fills a one-million-element array with random values, sorts it on a
     * work-stealing pool, and prints the elapsed time and the first ten
     * elements.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // NOTE(review): newWorkStealingPool() is declared to return
        // ExecutorService; this cast assumes the JDK hands back a ForkJoinPool.
        ForkJoinPool pool = (ForkJoinPool) Executors.newWorkStealingPool();
        try {
            int[] largeArray = new int[1000000];
            for (int i = 0; i < largeArray.length; i++) {
                largeArray[i] = (int) (Math.random() * 1000000);
            }

            System.out.println("开始排序...");
            long startTime = System.currentTimeMillis();
            // invoke() blocks until the root task, and therefore the whole sort, is done.
            int[] sortedArray = pool.invoke(new QuickSortTask(largeArray, 0, largeArray.length - 1));
            long endTime = System.currentTimeMillis();

            System.out.println("排序完成,耗时: " + (endTime - startTime) + " 毫秒");
            // Spot-check the first few elements.
            System.out.println("前10个元素: " + Arrays.toString(Arrays.copyOfRange(sortedArray, 0, 10)));
        } finally {
            pool.shutdown();
        }
    }
}
3. 异步事件处理
适用场景:需要快速响应大量短小的异步事件,如日志处理、监控数据收集、用户行为分析等。
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;
/**
 * Pushes a burst of short asynchronous events through a work-stealing pool and
 * reports progress every 1000 processed events.
 */
public class AsyncEventProcessingExample {
    /** Number of events whose processing has completed, shared by all workers. */
    private static final AtomicLong eventsProcessed = new AtomicLong(0);

    /**
     * Submits 10000 events to a pool created with parallelism 8, waits up to
     * one minute for them to drain, and prints the totals.
     *
     * @param args unused
     * @throws InterruptedException if interrupted while waiting for termination
     */
    public static void main(String[] args) throws InterruptedException {
        ExecutorService executor = Executors.newWorkStealingPool(8);
        System.out.println("开始处理异步事件...");
        long startTime = System.currentTimeMillis();

        for (int i = 0; i < 10000; i++) {
            int eventId = i;
            executor.submit(() -> {
                processEvent(eventId);
                recordEventProcessed();
            });
        }

        // Stop accepting new work, then wait for what was already submitted.
        executor.shutdown();
        // TimeUnit is fully qualified because this example's import list does not include it.
        boolean finished = executor.awaitTermination(1, java.util.concurrent.TimeUnit.MINUTES);
        long endTime = System.currentTimeMillis();

        if (!finished) {
            // Do not claim completion when the wait timed out.
            System.err.println("等待超时,仍有事件尚未处理完成");
        }
        System.out.println("所有事件处理完成,总事件数: " + eventsProcessed.get() +
                ",总耗时: " + (endTime - startTime) + " 毫秒");
    }

    /**
     * Counts one finished event and prints a progress line on every 1000th.
     *
     * <p>The milestone test uses the value returned by the increment itself.
     * Re-reading the counter afterwards lets other workers advance it in
     * between, so a milestone could be skipped or reported twice.
     */
    private static void recordEventProcessed() {
        long processed = eventsProcessed.incrementAndGet();
        if (processed % 1000 == 0) {
            System.out.println("已处理事件数: " + processed);
        }
    }

    /**
     * Simulates handling one event by sleeping for a random 1-9 ms. If the
     * sleep is interrupted, the thread's interrupt flag is restored.
     *
     * @param eventId identifier of the event (not used by the simulation)
     */
    private static void processEvent(int eventId) {
        try {
            // Random delay stands in for varying processing times.
            Thread.sleep(ThreadLocalRandom.current().nextInt(1, 10));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        // Real handling would go here, e.g. logging, updating metrics, sending notifications.
    }
}
4. 批量数据处理
适用场景:需要高效处理大量数据,如ETL过程、数据转换、批量导入导出等。
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.stream.Collectors;
/**
 * Processes a batch of records in parallel on a work-stealing pool and collects
 * the results in submission order.
 */
public class BatchDataProcessingExample {

    /** Simulated processing service that turns {@link RawData} into {@link ProcessedData}. */
    static class DataProcessor {
        /**
         * Sleeps 50 ms to simulate work, then upper-cases the content and
         * appends {@code "_PROCESSED"} to the timestamp. If the sleep is
         * interrupted, the interrupt flag is restored and the record is still
         * converted.
         *
         * @param rawData record to convert
         * @return the converted record carrying the same id
         */
        public ProcessedData process(RawData rawData) {
            try {
                Thread.sleep(50); // simulated processing time
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return new ProcessedData(
                    rawData.getId(),
                    // Locale.ROOT keeps the result independent of the JVM's default
                    // locale (a Turkish default would map 'i' to a dotted capital I).
                    rawData.getContent().toUpperCase(java.util.Locale.ROOT),
                    rawData.getTimestamp() + "_PROCESSED"
            );
        }
    }

    /** Immutable input record. */
    static class RawData {
        private final int id;
        private final String content;
        private final String timestamp;

        public RawData(int id, String content, String timestamp) {
            this.id = id;
            this.content = content;
            this.timestamp = timestamp;
        }

        public int getId() { return id; }
        public String getContent() { return content; }
        public String getTimestamp() { return timestamp; }
    }

    /** Immutable output record; exposes its fields only through {@link #toString()}. */
    static class ProcessedData {
        private final int id;
        private final String processedContent;
        private final String processedTimestamp;

        public ProcessedData(int id, String processedContent, String processedTimestamp) {
            this.id = id;
            this.processedContent = processedContent;
            this.processedTimestamp = processedTimestamp;
        }

        @Override
        public String toString() {
            return "ProcessedData{" +
                    "id=" + id +
                    ", content='" + processedContent + '\'' +
                    ", timestamp='" + processedTimestamp + '\'' +
                    '}';
        }
    }

    /**
     * Builds 1000 sample records, processes each as its own pool task, waits
     * for every result, and prints the elapsed time plus the first five results.
     *
     * @param args unused
     * @throws InterruptedException if interrupted while waiting for a result
     * @throws ExecutionException   if any processing task threw
     */
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newWorkStealingPool();
        // try/finally so the pool is shut down even when a task failure
        // propagates out of Future.get().
        try {
            DataProcessor processor = new DataProcessor();

            // Generate the sample input.
            List<RawData> rawDataList = new ArrayList<>();
            for (int i = 0; i < 1000; i++) {
                rawDataList.add(new RawData(i, "data-item-" + i, "2023-06-" + (i % 30 + 1)));
            }

            System.out.println("开始批量处理数据,总记录数: " + rawDataList.size());
            long startTime = System.currentTimeMillis();

            // Submit one task per record.
            List<Future<ProcessedData>> futures = rawDataList.stream()
                    .map(rawData -> executor.submit(() -> processor.process(rawData)))
                    .collect(Collectors.toList());

            // Collect results in submission order; get() blocks until each task is done.
            List<ProcessedData> results = new ArrayList<>();
            for (Future<ProcessedData> future : futures) {
                results.add(future.get());
            }
            long endTime = System.currentTimeMillis();

            System.out.println("数据处理完成,总耗时: " + (endTime - startTime) + " 毫秒");
            System.out.println("处理结果示例 (前5条):");
            results.stream().limit(5).forEach(System.out::println);
        } finally {
            executor.shutdown();
        }
    }
}
5. 图像/视频处理
适用场景:需要并行处理多个图像或视频帧,如图像滤镜应用、视频转码、缩略图生成等。
import java.awt.image.BufferedImage;
import java.util.concurrent.*;
import javax.imageio.ImageIO;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
/**
 * Applies a (simulated) filter to several image files in parallel on a
 * work-stealing pool and reports one status line per file.
 */
public class ImageProcessingExample {

    /** Simulated image-processing operations. */
    static class ImageProcessor {
        /**
         * Pretends to apply a filter: logs the filter and worker thread, sleeps
         * 200-499 ms, and returns the input image unchanged.
         *
         * @param image      image to filter
         * @param filterType filter name, used only in the log line
         * @return the same {@code image} instance
         */
        public BufferedImage applyFilter(BufferedImage image, String filterType) {
            System.out.println("正在应用 " + filterType + " 滤镜到图像,线程: " +
                    Thread.currentThread().getName());
            try {
                Thread.sleep(200 + (int) (Math.random() * 300)); // simulated processing time
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            // A real implementation would return the filtered image instead.
            return image;
        }

        /**
         * Writes the image to {@code outputPath} in JPEG format.
         *
         * @param image      image to write
         * @param outputPath destination file path
         * @return {@code true} only if a writer actually wrote the image;
         *         {@code false} if no suitable writer was found or an
         *         {@link IOException} occurred
         */
        public boolean saveImage(BufferedImage image, String outputPath) {
            try {
                // ImageIO.write signals "no appropriate writer" by returning false
                // rather than throwing, so its result must be propagated.
                return ImageIO.write(image, "jpg", new File(outputPath));
            } catch (IOException e) {
                e.printStackTrace();
                return false;
            }
        }
    }

    /** Reads one image, filters it, saves it, and returns a status message. */
    static class ImageTask implements Callable<String> {
        private final String inputPath;
        private final String outputPath;
        private final String filterType;
        private final ImageProcessor processor;

        public ImageTask(String inputPath, String outputPath, String filterType, ImageProcessor processor) {
            this.inputPath = inputPath;
            this.outputPath = outputPath;
            this.filterType = filterType;
            this.processor = processor;
        }

        /**
         * Runs the read / filter / save pipeline for this task's file.
         *
         * @return a success, save-failure, or processing-failure message; any
         *         exception raised inside the pipeline is converted into a
         *         failure message instead of being thrown
         */
        @Override
        public String call() throws Exception {
            try {
                BufferedImage image = ImageIO.read(new File(inputPath));
                if (image == null) {
                    // ImageIO.read returns null when no registered reader can decode
                    // the file; report that directly instead of failing later.
                    return "处理失败: " + inputPath + " - 无法解码图像(不支持的格式或文件已损坏)";
                }
                BufferedImage processedImage = processor.applyFilter(image, filterType);
                if (processor.saveImage(processedImage, outputPath)) {
                    return "成功处理: " + inputPath + " -> " + outputPath;
                } else {
                    return "保存失败: " + outputPath;
                }
            } catch (Exception e) {
                return "处理失败: " + inputPath + " - " + e.getMessage();
            }
        }
    }

    /**
     * Submits one {@link ImageTask} per placeholder path, waits for all of
     * them, and prints the elapsed time and every status line.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        ExecutorService executor = Executors.newWorkStealingPool();
        // try/finally so the pool is always shut down.
        try {
            ImageProcessor processor = new ImageProcessor();
            // Placeholder list of images to process.
            List<String> imageFiles = List.of(
                    "/path/to/image1.jpg",
                    "/path/to/image2.jpg",
                    "/path/to/image3.jpg",
                    "/path/to/image4.jpg",
                    "/path/to/image5.jpg",
                    "/path/to/image6.jpg",
                    "/path/to/image7.jpg",
                    "/path/to/image8.jpg"
            );
            System.out.println("开始并行处理图像...");
            long startTime = System.currentTimeMillis();

            // One task per image, alternating between the two filter names.
            List<Future<String>> futures = new ArrayList<>();
            for (int i = 0; i < imageFiles.size(); i++) {
                String inputPath = imageFiles.get(i);
                String outputPath = "/path/to/output/image" + (i + 1) + "_filtered.jpg";
                String filterType = i % 2 == 0 ? "grayscale" : "sepia";
                futures.add(executor.submit(new ImageTask(inputPath, outputPath, filterType, processor)));
            }

            // Block on every task and turn unexpected failures into status lines.
            List<String> results = futures.stream()
                    .map(future -> {
                        try {
                            return future.get();
                        } catch (Exception e) {
                            return "任务执行异常: " + e.getMessage();
                        }
                    })
                    .collect(Collectors.toList());
            long endTime = System.currentTimeMillis();

            System.out.println("图像处理完成,总耗时: " + (endTime - startTime) + " 毫秒");
            results.forEach(System.out::println);
            System.out.println("注意:此示例使用了模拟路径,实际使用时需要替换为有效的图像路径");
        } finally {
            executor.shutdown();
        }
    }
}
6. Web爬虫/数据抓取
适用场景:需要同时抓取多个网页内容,解析HTML,提取数据的场景。
import java.io.IOException;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import org.jsoup.Jsoup;
import org.jsoup.nodes.Document;
import org.jsoup.select.Elements;
/**
 * Crawls a fixed list of seed URLs in parallel on a work-stealing pool. Both
 * fetching and link extraction are simulated; no network access takes place.
 */
public class WebCrawlerExample {

    /** Simulated page fetcher and link extractor. */
    static class WebPageFetcher {

        /**
         * Logs the URL and worker thread, pauses 300-799 ms to mimic network
         * latency, and returns a canned HTML page that embeds the URL.
         *
         * @param url address being "fetched"
         * @return simulated HTML content
         * @throws IOException declared for a real fetch; the simulation never throws it
         */
        public String fetchContent(String url) throws IOException {
            String worker = Thread.currentThread().getName();
            System.out.println("正在抓取: " + url + ",线程: " + worker);
            pauseForSimulatedLatency();
            // A real implementation would fetch with Jsoup instead:
            // Document doc = Jsoup.connect(url).get();
            // return doc.html();
            return "<html><body><h1>模拟内容 - " + url + "</h1></body></html>";
        }

        /**
         * Returns three randomly numbered example.com page links; the supplied
         * HTML is not inspected.
         *
         * @param htmlContent page content (ignored by the simulation)
         * @return a list of three link strings
         */
        public List<String> extractLinks(String htmlContent) {
            // A real implementation would parse the HTML with Jsoup instead:
            // Document doc = Jsoup.parse(htmlContent);
            // Elements links = doc.select("a[href]");
            // return links.stream().map(link -> link.attr("href")).collect(Collectors.toList());
            List<String> found = new ArrayList<>();
            while (found.size() < 3) {
                int pageNumber = (int) (Math.random() * 100);
                found.add("https://example.com/page/" + pageNumber);
            }
            return found;
        }

        /** Sleeps for the simulated latency, restoring the interrupt flag if interrupted. */
        private static void pauseForSimulatedLatency() {
            long millis = 300 + (int) (Math.random() * 500);
            try {
                Thread.sleep(millis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Fetches one URL, extracts its links, and returns a status message. */
    static class CrawlTask implements Callable<String> {
        private final String url;
        private final WebPageFetcher fetcher;

        public CrawlTask(String url, WebPageFetcher fetcher) {
            this.url = url;
            this.fetcher = fetcher;
        }

        /**
         * @return a success message with the number of links found, or a
         *         failure message if fetching or extraction threw
         */
        @Override
        public String call() throws Exception {
            try {
                String html = fetcher.fetchContent(url);
                int linkCount = fetcher.extractLinks(html).size();
                return "成功抓取: " + url + ",发现链接数: " + linkCount;
            } catch (Exception e) {
                return "抓取失败: " + url + " - " + e.getMessage();
            }
        }
    }

    /**
     * Submits one {@link CrawlTask} per seed URL, prints each result in
     * submission order, then shuts the pool down and prints the elapsed time.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        ExecutorService pool = Executors.newWorkStealingPool();
        WebPageFetcher pageFetcher = new WebPageFetcher();
        List<String> seeds = List.of(
                "https://example.com",
                "https://example.org",
                "https://example.net",
                "https://news.example.com",
                "https://blog.example.com",
                "https://shop.example.com",
                "https://forum.example.com",
                "https://wiki.example.com"
        );

        System.out.println("开始并行爬取网页...");
        long startedAt = System.currentTimeMillis();

        List<Future<String>> pending = new ArrayList<>();
        for (int idx = 0; idx < seeds.size(); idx++) {
            Callable<String> task = new CrawlTask(seeds.get(idx), pageFetcher);
            pending.add(pool.submit(task));
        }

        pending.forEach(WebCrawlerExample::printOutcome);

        pool.shutdown();
        long elapsed = System.currentTimeMillis() - startedAt;
        System.out.println("爬取任务完成,总耗时: " + elapsed + " 毫秒");
        System.out.println("注意:此示例使用了模拟爬取,实际应用需要处理URL去重、深度控制、robots.txt等");
    }

    /** Blocks for one task's result and prints it, or prints the stack trace on failure. */
    private static void printOutcome(Future<String> outcome) {
        try {
            System.out.println(outcome.get());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
最佳实践与注意事项
- CPU核心数设置:默认使用Runtime.getRuntime().availableProcessors()作为并行度,可手动指定:
  Executors.newWorkStealingPool(4); // 指定并行度为4(这是目标并行度,实际线程数会动态增减)
- 内存管理:WorkStealingPool使用无界队列,大量任务可能导致内存溢出,应合理控制任务数量。
- 异常处理:通过submit()提交的任务抛出的异常会被封装进Future,若不调用get()就会被静默丢弃;应在任务内部捕获并处理异常,或检查Future的结果,避免失败被忽略。
- 资源释放:使用完成后务必调用shutdown(),避免资源泄露。注意其工作线程是守护线程,主线程退出前需通过awaitTermination()或Future.get()等待任务完成,否则未完成的任务会随JVM退出而丢失。
- 任务粒度:任务应足够细粒度但不应过小,避免任务调度与线程切换的开销超过任务本身的执行时间。
- I/O密集型任务:对于I/O密集型任务,可能需要更多线程,考虑使用带限制的线程池。
- 监控与调优:监控线程池状态,根据实际应用场景调整参数。
通过以上示例和最佳实践,可以充分发挥WorkStealingPool在多核CPU环境下的并行处理能力,显著提升应用性能。
- 感谢你赐予我前进的力量

