这个demo没有实质意义,只是简单模拟一下任务积压的场景。
java"> private static final ExecutorService PRODUCER = Executors.newFixedThreadPool(1);
private static final ThreadPoolExecutor CONSUMER = new ThreadPoolExecutor(3, 3, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
private static final LinkedBlockingQueue<Runnable> QUEUE = new LinkedBlockingQueue<>();
public static void main(String[] args) throws InterruptedException {
// 启动生产者线程
produce();
// 启动消费者线程
consume();
}
private static void produce() {
// 启动一个新的线程作为生产者
PRODUCER.execute(() -> {
// 使用AtomicInteger来管理任务编号,确保线程安全
AtomicInteger i = new AtomicInteger();
while (true) {
// 将一个任务添加到队列中
QUEUE.add(() -> {
// 模拟任务需要长时间执行
try {
Thread.sleep(8000);
} catch (InterruptedException e) {
// 将中断异常转换为运行时异常,以便抛出
throw new RuntimeException(e);
}
// 执行任务,并输出任务编号
log.info("执行任务 {}" , i.getAndIncrement());
});
// 模拟生产者添加任务的频率,间隔2秒
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
// 将中断异常转换为运行时异常,以便抛出
throw new RuntimeException(e);
}
}
});
}
private static void consume() throws InterruptedException {
// 无限循环,用于持续处理队列中的任务
while (true){
// 从队列中获取一个任务,该任务是待执行的Runnable对象
Runnable task = QUEUE.take();
// 获取队列当前的大小,用于监控队列的状态
int size = QUEUE.size();
// 输出队列当前的大小,用于日志记录或调试
log.info("当前队列大小 {}",size);
// 执行获取到的任务
CONSUMER.execute(task);
try {
// 使当前线程休眠8秒,目的是控制任务处理的频率
// 或者说,给队列填充新任务留下时间
Thread.sleep(8000);
} catch (InterruptedException e) {
// 线程被中断时,抛出运行时异常
// 这样做是为了让上层调用者能够处理中断情况
throw new RuntimeException(e);
}
}
}
代码总结
这段代码的主要功能是实现一个简单的生产者-消费者模型:
- 生产者:每隔2秒向队列中添加一个任务,任务执行时会休眠8秒并输出任务编号。
- 消费者:每隔8秒从队列中取出一个任务并执行,同时输出当前队列的大小。
通过这种方式,代码模拟了一个生产者和消费者之间的任务处理流程,生产者以固定频率生成任务,消费者以固定频率处理任务,并通过队列进行任务的传递和缓冲。