批量数据简单分次处理
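一、实现思路
用一个共享游标记录已经分发到的位置:每次调用 getNextBatch() 时,从游标处截取最多 batchSize 个元素作为一个批次,并把游标推进到该批次的末尾;数据取完后返回空列表。取批方法加了同步锁,因此多个线程可以并发领取互不重叠的批次。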
package com.nn3n.batchexec;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Hands out consecutive, non-overlapping slices of a list, one batch per call.
 *
 * <p>Batch hand-out is serialized because {@link #getNextBatch()} is {@code synchronized}. The
 * cursor lives in an {@link AtomicInteger} so that {@link #hasMore()} can read it without taking
 * the lock. Returned batches are {@link List#subList(int, int)} views of the supplied list, not
 * copies.
 */
public final class ListBatchProcessor {
    private final List<Long> dataList;
    /** Index of the first element that has not been handed out yet. */
    private final AtomicInteger currentIndex = new AtomicInteger(0);
    private final int batchSize;

    /**
     * Creates a processor over {@code dataList} that yields at most {@code batchSize} elements
     * per batch.
     *
     * @param dataList  the data to split into batches; must not be {@code null}
     * @param batchSize maximum number of elements per batch; must be greater than zero
     * @throws NullPointerException     if {@code dataList} is {@code null}
     * @throws IllegalArgumentException if {@code batchSize} is zero or negative
     */
    public ListBatchProcessor(List<Long> dataList, int batchSize) {
        if (dataList == null) {
            throw new NullPointerException("dataList must not be null");
        }
        // A zero batch size would never advance the cursor, so hasMore() would stay true forever.
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive, got " + batchSize);
        }
        this.dataList = dataList;
        this.batchSize = batchSize;
    }

    /**
     * Returns the next batch and advances the cursor past it.
     *
     * @return a view of the next at most {@code batchSize} elements, or an empty list once every
     *         element has been handed out
     */
    public synchronized List<Long> getNextBatch() {
        int start = currentIndex.get();
        int size = dataList.size();
        if (start >= size) {
            return Collections.emptyList(); // everything has already been handed out
        }
        // Clamp against the remaining count first so start + batchSize cannot overflow int.
        int end = start + Math.min(batchSize, size - start);
        List<Long> batch = dataList.subList(start, end);
        currentIndex.set(end); // publish the new cursor for lock-free readers in hasMore()
        return batch;
    }

    /**
     * Reports whether any elements are still waiting to be handed out.
     *
     * <p>This reads the cursor without locking, so under concurrent use another thread may take
     * the last batch between this call and a following {@link #getNextBatch()}; callers should
     * still be prepared for an empty batch.
     *
     * @return {@code true} if the cursor is before the end of the list
     */
    public boolean hasMore() {
        return currentIndex.get() < dataList.size();
    }
}
二、验证测试
package com.nn3n.bootdemo;
import com.nn3n.batchexec.ListBatchProcessor;
import org.junit.jupiter.api.Test;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import static org.junit.jupiter.api.Assertions.*;
/**
 * Tests for {@link ListBatchProcessor}: sequential batching and concurrent consumption.
 */
public class BatchTest {

    /**
     * Walks a ten-element list in batches of three and checks each batch, the short final batch,
     * and the empty result once the data is exhausted.
     */
    @Test
    void testSingleThreadProcessing() {
        // Prepare the test data.
        List<Long> testData = Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L);
        ListBatchProcessor processor = new ListBatchProcessor(testData, 3);

        // Verify each batch in order.
        assertTrue(processor.hasMore());
        assertEquals(Arrays.asList(1L, 2L, 3L), processor.getNextBatch());
        assertTrue(processor.hasMore());
        assertEquals(Arrays.asList(4L, 5L, 6L), processor.getNextBatch());
        assertTrue(processor.hasMore());
        assertEquals(Arrays.asList(7L, 8L, 9L), processor.getNextBatch());
        assertTrue(processor.hasMore());
        assertEquals(Collections.singletonList(10L), processor.getNextBatch());
        assertFalse(processor.hasMore());
        // Typed empty list instead of the raw Collections.EMPTY_LIST constant.
        assertEquals(Collections.emptyList(), processor.getNextBatch());
    }

    /**
     * Drains one processor from sixteen threads and checks that every element was handed out
     * exactly once.
     */
    @Test
    void testMultiThreadProcessing() throws InterruptedException {
        // Prepare a large data set.
        List<Long> bigData = generateTestData(100000);
        ListBatchProcessor processor = new ListBatchProcessor(bigData, 1000);

        int threadCount = 16;
        ExecutorService executor = Executors.newFixedThreadPool(threadCount);
        CountDownLatch latch = new CountDownLatch(threadCount);
        AtomicLong totalProcessed = new AtomicLong(0);
        // Every value in the generated data is unique, so a failed add means a repeated element.
        Set<Long> seen = ConcurrentHashMap.newKeySet();
        AtomicLong duplicates = new AtomicLong(0);

        try {
            for (int i = 0; i < threadCount; i++) {
                executor.submit(() -> {
                    try {
                        // A batch may come back empty if another thread took the last one
                        // between hasMore() and getNextBatch(); that adds nothing and is fine.
                        while (processor.hasMore()) {
                            List<Long> batch = processor.getNextBatch();
                            totalProcessed.addAndGet(batch.size());
                            for (Long value : batch) {
                                if (!seen.add(value)) {
                                    duplicates.incrementAndGet();
                                }
                            }
                        }
                    } finally {
                        // Count down even if the worker throws, so the test cannot hang.
                        latch.countDown();
                    }
                });
            }
            assertTrue(latch.await(30, TimeUnit.SECONDS), "workers did not finish in time");
        } finally {
            executor.shutdownNow();
        }

        // Verify that all data was processed and nothing was handed out twice.
        assertEquals(bigData.size(), totalProcessed.get());
        assertEquals(0, duplicates.get());
        assertEquals(bigData.size(), seen.size());
    }

    /**
     * Builds a fixed-size list holding the values {@code 0} to {@code size - 1} in order.
     *
     * @param size number of elements to generate
     * @return the generated list
     */
    private List<Long> generateTestData(int size) {
        Long[] data = new Long[size];
        for (int i = 0; i < size; i++) {
            data[i] = (long) i;
        }
        return Arrays.asList(data);
    }
}
三、总结
好好学习,天天向上。
- 感谢你赐予我前进的力量
赞赏者名单
因为你们的支持让我意识到写文章的价值🙏
本文是原创文章,采用 CC BY-NC-ND 4.0 协议,完整转载请注明来自 软件从业者Hort
评论
匿名评论
隐私政策
你无需删除空行,直接评论以获取最佳展示效果