一、实现思路

package com.nn3n.batchexec;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public final class ListBatchProcessor {

    private final List<Long> dataList;
    private final AtomicInteger currentIndex = new AtomicInteger(0);
    private final int batchSize;

    public ListBatchProcessor(List<Long> dataList, int batchSize) {
        this.dataList = dataList;
        this.batchSize = batchSize;
    }

    public synchronized List<Long> getNextBatch() {
        int start = currentIndex.get();
        if (start >= dataList.size()) {
            return Collections.emptyList(); // 已处理完所有数据
        }
        int end = Math.min(start + batchSize, dataList.size());
        List<Long> batch = dataList.subList(start, end);
        currentIndex.set(end); // 原子更新索引
        return batch;
    }

    public boolean hasMore() {
        return currentIndex.get() < dataList.size();
    }
}

二、验证测试

package com.nn3n.bootdemo;

import com.nn3n.batchexec.ListBatchProcessor;
import org.junit.jupiter.api.Test;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;

import static org.junit.jupiter.api.Assertions.*;

public class BatchTest {

    @Test
    void testSingleThreadProcessing() {
        // 准备测试数据
        List<Long> testData = Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L);
        ListBatchProcessor processor = new ListBatchProcessor(testData, 3);

        // 验证批次处理
        assertTrue(processor.hasMore());
        assertEquals(Arrays.asList(1L, 2L, 3L), processor.getNextBatch());

        assertTrue(processor.hasMore());
        assertEquals(Arrays.asList(4L, 5L, 6L), processor.getNextBatch());

        assertTrue(processor.hasMore());
        assertEquals(Arrays.asList(7L, 8L, 9L), processor.getNextBatch());

        assertTrue(processor.hasMore());
        assertEquals(Collections.singletonList(10L), processor.getNextBatch());

        assertFalse(processor.hasMore());
        assertEquals(Collections.EMPTY_LIST, processor.getNextBatch());
    }

    @Test
    void testMultiThreadProcessing() throws InterruptedException {
        // 准备大数据集
        List<Long> bigData = generateTestData(100000);
        ListBatchProcessor processor = new ListBatchProcessor(bigData, 1000);

        // 多线程测试
        int threadCount = 16;
        ExecutorService executor = Executors.newFixedThreadPool(threadCount);
        CountDownLatch latch = new CountDownLatch(threadCount);
        AtomicLong totalProcessed = new AtomicLong(0);

        for (int i = 0; i < threadCount; i++) {
            executor.submit(() -> {
                while (processor.hasMore()) {
                    List<Long> batch = processor.getNextBatch();
                    totalProcessed.addAndGet(batch.size());
                }
                latch.countDown();
            });
        }

        latch.await();
        executor.shutdown();

        // 验证所有数据都被处理且没有重复
        assertEquals(bigData.size(), totalProcessed.get());
    }

    private List<Long> generateTestData(int size) {
        Long[] data = new Long[size];
        for (int i = 0; i < size; i++) {
            data[i] = (long) i;
        }
        return Arrays.asList(data);
    }
}

三、总结

好好学习,天天向上。