@@ -344,265 +344,114 @@ public static class CallerRunsPolicy implements RejectedExecutionHandler {
344
344
345
345
### 如果不允许丢弃任务任务,应该选择哪个拒绝策略
346
346
347
- 答案是 ` CallerRunsPolicy ` ,这一点我们从注释和源码都可以看出,只要当前程序不关闭就会使用执行 ` execute ` 方法的线程执行该任务:
347
+ 根据上面对线程池拒绝策略的介绍,相信大家很容易能够得出答案是: ` CallerRunsPolicy ` 。
348
348
349
+ 这里我们再来结合` CallerRunsPolicy ` 的源码来看看:
349
350
350
351
``` java
351
352
public static class CallerRunsPolicy implements RejectedExecutionHandler {
352
-
353
+
353
354
public CallerRunsPolicy () { }
354
355
355
-
356
+
356
357
public void rejectedExecution (Runnable r , ThreadPoolExecutor e ) {
357
358
// 只要当前程序没有关闭,就用执行execute方法的线程执行该任务
358
359
if (! e. isShutdown()) {
359
-
360
+
360
361
r. run();
361
362
}
362
363
}
363
364
}
364
365
```
365
366
367
+ 从源码可以看出,只要当前程序不关闭就会使用执行` execute ` 方法的线程执行该任务。
368
+
366
369
### CallerRunsPolicy 拒绝策略有什么风险?如何解决?
367
370
368
- 默认情况下,我们都会为了保证任务不被丢弃都优先考虑` CallerRunsPolicy ` ,这也是相对维稳的做法,这种做法的隐患是假设走到` CallerRunsPolicy ` 的任务是个非常耗时的任务,就会导致主线程就很卡死。
371
+ 我们上面也提到了:如果想要保证任何一个任务请求都要被执行的话,那选择 CallerRunsPolicy 拒绝策略更合适一些。
372
+
373
+ 不过,如果走到` CallerRunsPolicy ` 的任务是个非常耗时的任务,且处理提交任务的线程是主线程,可能会导致主线程阻塞,影响程序的正常运行。
369
374
370
- 下面就是笔者通过主线程使用线程池的方法,该线程池限定了最大线程数为2还有阻塞队列大小为1,这意味着第4个任务就会走到拒绝策略 :
375
+ 这里简单举一个例子,该线程池限定了最大线程数为 2,还阻塞队列大小为 1(这意味着第 4 个任务就会走到拒绝策略), ` ThreadUtil ` 为 Hutool 提供的工具类 :
371
376
372
377
``` java
373
- Logger log = LoggerFactory . getLogger(ThreadPoolTest . class);
374
- // 创建一个线程池,核心线程数为1,最大线程数为2
375
- // 当线程数大于核心线程数时,多余的空闲线程存活的最长时间为60秒,
376
- // 任务队列为容量为1的ArrayBlockingQueue,饱和策略为CallerRunsPolicy。
377
- ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor (1 ,
378
- 2 ,
379
- 60 ,
380
- TimeUnit . SECONDS ,
381
- new ArrayBlockingQueue<> (1 ),
382
- new ThreadPoolExecutor .CallerRunsPolicy ());
383
-
384
- // 提交第一个任务,由核心线程执行
385
- threadPoolExecutor. execute(() - > {
386
- log. info(" 核心线程执行第一个任务" );
387
- ThreadUtil . sleep(1 , TimeUnit . MINUTES );
388
- });
389
-
390
- // 提交第二个任务,由于核心线程被占用,任务将进入队列等待
391
- threadPoolExecutor. execute(() - > {
392
- log. info(" 处理入队的第二个任务 " );
393
- ThreadUtil . sleep(1 , TimeUnit . MINUTES );
394
- });
395
-
396
- // 提交第三个任务,由于核心线程被占用且队列已满,创建非核心线程处理
397
- threadPoolExecutor. execute(() - > {
398
- log. info(" 非核心线程处理第三个任务" );
399
- ThreadUtil . sleep(1 , TimeUnit . MINUTES );
400
- });
401
-
402
- // 提交第四个任务,由于核心线程和非核心线程都被占用,队列也满了,根据CallerRunsPolicy策略,任务将由提交任务的线程(即主线程)来执行
403
- threadPoolExecutor. execute(() - > {
404
- log. info(" 主线程处理第四个任务" );
405
- ThreadUtil . sleep(2 , TimeUnit . MINUTES );
406
- });
407
-
408
- // 提交第五个任务,主线程被第四个任务卡住,该任务必须等到主线程执行完才能提交
409
- threadPoolExecutor. execute(() - > {
410
- log. info(" 核心线程执行第五个任务" );
411
- });
378
+ Logger log = LoggerFactory . getLogger(ThreadPoolTest . class);
379
+ // 创建一个线程池,核心线程数为1,最大线程数为2
380
+ // 当线程数大于核心线程数时,多余的空闲线程存活的最长时间为60秒,
381
+ // 任务队列为容量为1的ArrayBlockingQueue,饱和策略为CallerRunsPolicy。
382
+ ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor (1 ,
383
+ 2 ,
384
+ 60 ,
385
+ TimeUnit . SECONDS ,
386
+ new ArrayBlockingQueue<> (1 ),
387
+ new ThreadPoolExecutor .CallerRunsPolicy ());
388
+
389
+ // 提交第一个任务,由核心线程执行
390
+ threadPoolExecutor. execute(() - > {
391
+ log. info(" 核心线程执行第一个任务" );
392
+ ThreadUtil . sleep(1 , TimeUnit . MINUTES );
393
+ });
394
+
395
+ // 提交第二个任务,由于核心线程被占用,任务将进入队列等待
396
+ threadPoolExecutor. execute(() - > {
397
+ log. info(" 非核心线程处理入队的第二个任务 " );
398
+ ThreadUtil . sleep(1 , TimeUnit . MINUTES );
399
+ });
400
+
401
+ // 提交第三个任务,由于核心线程被占用且队列已满,创建非核心线程处理
402
+ threadPoolExecutor. execute(() - > {
403
+ log. info(" 非核心线程处理第三个任务" );
404
+ ThreadUtil . sleep(1 , TimeUnit . MINUTES );
405
+ });
406
+
407
+ // 提交第四个任务,由于核心线程和非核心线程都被占用,队列也满了,根据CallerRunsPolicy策略,任务将由提交任务的线程(即主线程)来执行
408
+ threadPoolExecutor. execute(() - > {
409
+ log. info(" 主线程处理第四个任务" );
410
+ ThreadUtil . sleep(2 , TimeUnit . MINUTES );
411
+ });
412
+
413
+ // 提交第五个任务,主线程被第四个任务卡住,该任务必须等到主线程执行完才能提交
414
+ threadPoolExecutor. execute(() - > {
415
+ log. info(" 核心线程执行第五个任务" );
416
+ });
412
417
```
413
418
419
+ 输出:
414
420
415
- 从输出结果可以看出,因为` CallerRunsPolicy ` 这个拒绝策略,导致耗时的任务用了主线程执行,导致线程池阻塞,进而导致后续任务无法及时执行,严重的情况下很可能导致` OOM ` :
416
-
417
-
418
- ``` bash
419
- 2024-04-03 00:08:12.617 INFO 20804 --- [ main] com.sharkChili.ThreadPoolApplication : 启动成功!!
420
- 2024-04-03 00:08:15.739 INFO 20804 --- [pool-1-thread-1] com.sharkChili.ThreadPoolApplication : 核心线程执行
421
- 2024-04-03 00:08:36.768 INFO 20804 --- [pool-1-thread-2] com.sharkChili.ThreadPoolApplication : 应急线程处理
422
- 2024-04-03 00:08:49.333 INFO 20804 --- [ main] com.sharkChili.ThreadPoolApplication : CallerRunsPolicy task
421
+ ``` ba
422
+ 18:19:48.203 INFO [pool-1-thread-1] c.j.concurrent.ThreadPoolTest - 核心线程执行第一个任务
423
+ 18:19:48.203 INFO [pool-1-thread-2] c.j.concurrent.ThreadPoolTest - 非核心线程处理第三个任务
424
+ 18:19:48.203 INFO [main] c.j.concurrent.ThreadPoolTest - 主线程处理第四个任务
425
+ 18:20:48.212 INFO [pool-1-thread-2] c.j.concurrent.ThreadPoolTest - 非核心线程处理入队的第二个任务
426
+ 18:21:48.219 INFO [pool-1-thread-2] c.j.concurrent.ThreadPoolTest - 核心线程执行第五个任务
423
427
```
424
428
429
+ 从输出结果可以看出,因为` CallerRunsPolicy ` 这个拒绝策略,导致耗时的任务用了主线程执行,导致线程池阻塞,进而导致后续任务无法及时执行,严重的情况下很可能导致 OOM。
425
430
426
- 我们从问题的本质入手,调用者采用` CallerRunsPolicy ` 是希望所有的任务都能够被执行,按照笔者的经验,假如我们的场景是偶发这种突发场景,在内存允许的情况下,我们建议增加阻塞队列` BlockingQueue ` 的大小并调整堆内存以容纳更多的任务,确保任务能够被准确执行。
427
-
428
-
429
- ![ ] ( https://qiniuyun.sharkchili.com/202404141059210.png )
430
-
431
+ 我们从问题的本质入手,调用者采用` CallerRunsPolicy ` 是希望所有的任务都能够被执行,暂时无法处理的任务又被保存在阻塞队列` BlockingQueue ` 中。这样的话,在内存允许的情况下,我们可以增加阻塞队列` BlockingQueue ` 的大小并调整堆内存以容纳更多的任务,确保任务能够被准确执行。
431
432
433
+ 为了充分利用 CPU,我们还可以调整线程池的` maximumPoolSize ` (最大线程数)参数,这样可以提高任务处理速度,避免累计在 ` BlockingQueue ` 的任务过多导致内存用完。
432
434
435
+ ![ 调整阻塞队列大小和最大线程数] ( https://oss.javaguide.cn/github/javaguide/java/concurrent/threadpool-reject-2-threadpool-reject-01.png )
433
436
434
- 若当前服务器内存资源紧张,但我们配置线程池还为尽可能利用到 ` CPU ` ,我们建议调整线程中 ` maximumPoolSize ` 以保证尽可能压榨 ` CPU ` 资源:
437
+ 如果服务器资源以达到可利用的极限,这就意味我们要在设计策略上改变线程池的调度了,我们都知道,导致主线程卡死的本质就是因为我们不希望任何一个任务被丢弃。换个思路,有没有办法既能保证任务不被丢弃且在服务器有余力时及时处理呢?
435
438
436
- ![ ] ( https://qiniuyun.sharkchili.com/202404141059257.png )
439
+ 这里提供的一种 ** 任务持久化 ** 的思路,这里所谓的任务持久化,包括但不限于:
437
440
438
-
439
-
440
- 如果服务器资源以达到可利用的极限,这就意味我们要在设计策略上改变线程池的调度了,我们都知道,导致主线程卡死的本质就是因为我们不希望任何一个任务被丢弃。换个思路,有没有办法既能保证任务不被丢弃且在服务器有余力时及时处理呢?这里笔者提供的一种思路,即任务持久化,注意这里笔者更多强调的是思路而不是实现,这里所谓的任务持久化,包括但不限于:
441
- 1 . 设计一张任务表间任务存储到` MySQL ` 数据库中。
441
+ 1 . 设计一张任务表间任务存储到 MySQL 数据库中。
442
442
2 . ` Redis ` 缓存任务。
443
443
3 . 将任务提交到消息队列中。
444
444
445
- 笔者以方案二为例,通过继承` BlockingQueue ` 实现一个混合式阻塞队列,该队列包含` JDK ` 自带的` ArrayBlockingQueue ` 和一个自定义的队列(数据存入` mysql ` 中对应` DDL ` 语句如下),通过魔改队列的添加逻辑达到任务可以存入` ArrayBlockingQueue ` 或者数据表的目的。
446
-
447
- ``` sql
448
-
449
- CREATE TABLE `task_info ` (
450
- ` id` int NOT NULL AUTO_INCREMENT,
451
- ` data` varchar (100 ) DEFAULT NULL ,
452
- PRIMARY KEY (` id` )
453
- ) ENGINE= InnoDB AUTO_INCREMENT= 6 DEFAULT CHARSET= utf8mb4 COLLATE= utf8mb4_0900_ai_ci;
454
- ```
455
-
456
-
457
-
458
- 如此一来,一旦我们的线程池中线程以达到满载时,我们就可以通过拒绝策略将最新任务持久化到` MySQL ` 数据库中,等到线程池有了有余力处理所有任务时,让其优先处理数据库中的任务以避免"饥饿"问题。
445
+ 这里以方案一为例,简单介绍一下实现逻辑:
459
446
460
- ![ ] ( https://qiniuyun.sharkchili.com/202404141059303.png )
447
+ 1 . 实现` RejectedExecutionHandler ` 接口自定义拒绝策略,自定义拒绝策略负责将线程池暂时无法处理(此时阻塞队列已满)的任务入库(保存到 MySQL 中)。注意:线程池暂时无法处理的任务会先被放在阻塞队列中,阻塞队列满了才会触发拒绝策略。
448
+ 2 . 继承` BlockingQueue ` 实现一个混合式阻塞队列,该队列包含` JDK ` 自带的` ArrayBlockingQueue ` 。另外,该混合式阻塞队列需要修改取任务处理的逻辑,也就是重写` take() ` 方法,取任务时优先从数据库中读取最早的任务,数据库中无任务时再从 ` ArrayBlockingQueue ` 中去取任务。
461
449
450
+ ![ 将一部分任务保存到MySQL中] ( https://oss.javaguide.cn/github/javaguide/java/concurrent/threadpool-reject-2-threadpool-reject-02.png )
462
451
452
+ 整个实现逻辑还是比较简单的,核心在于自定义拒绝策略和阻塞队列。如此一来,一旦我们的线程池中线程以达到满载时,我们就可以通过拒绝策略将最新任务持久化到 MySQL 数据库中,等到线程池有了有余力处理所有任务时,让其优先处理数据库中的任务以避免"饥饿"问题。
463
453
464
- 这里笔者也给出混合队列实现的核心源码,即通过继承` BlockingQueue ` 魔改了入队和出队的逻辑:
465
-
466
-
467
- ``` java
468
- public class HybridBlockingQueue <E> implements BlockingQueue<E > {
469
-
470
- private Object mysqlLock = new Object ();
471
-
472
-
473
- private ArrayBlockingQueue<E > arrayBlockingQueue;
474
-
475
- // 构造方法初始化阻塞队列大小
476
- public HybridBlockingQueue (int maxSize ) {
477
- arrayBlockingQueue = new ArrayBlockingQueue<> (maxSize);
478
- }
479
-
480
-
481
- /**
482
- * 线程池会调用的入队方法
483
- * @param e
484
- * @return
485
- */
486
- @Override
487
- public boolean offer (E e ) {
488
- return arrayBlockingQueue. offer(e);
489
- }
490
-
491
-
492
- /**
493
- * 取任务时,优先从数据库中读取最早的任务
494
- *
495
- * @return
496
- * @throws InterruptedException
497
- */
498
- @Override
499
- public E take () throws InterruptedException {
500
-
501
- synchronized (mysqlLock) {
502
- // 从数据库中读取任务,通过上锁读取避免重复消费
503
- TaskInfoMapper taskMapper = SpringUtil . getBean(TaskInfoMapper . class);
504
- TaskInfo taskInfo = taskMapper. selectByExample(null ). stream()
505
- .findFirst()
506
- .orElse(null );
507
-
508
-
509
- // 若数据库存在该任务,则先删后返回
510
- if (ObjUtil . isNotEmpty(taskInfo)) {
511
- taskMapper. deleteByPrimaryKey(taskInfo. getId());
512
- Task task = new Task (taskInfo. getData());
513
- return (E ) task;
514
- }
515
- }
516
-
517
- // 若数据库没有要处理的任务则从内存中获取
518
- return arrayBlockingQueue. poll();
519
- }
520
-
521
- /**
522
- * 带有时间限制的任务获取
523
- *
524
- * @param timeout
525
- * @param unit
526
- * @return
527
- * @throws InterruptedException
528
- */
529
- @Override
530
- public E poll (long timeout , TimeUnit unit ) throws InterruptedException {
531
- // 从数据库中读取任务,通过上锁读取避免重复消费
532
- synchronized (mysqlLock) {
533
- // 从数据库中读取任务,
534
- TaskInfoMapper taskMapper = SpringUtil . getBean(TaskInfoMapper . class);
535
- TaskInfo taskInfo = taskMapper. selectByExample(null ). stream()
536
- .findFirst()
537
- .orElse(null );
538
-
539
-
540
- // 若数据库存在该任务,则先删后返回
541
- if (ObjUtil . isNotEmpty(taskInfo)) {
542
- taskMapper. deleteByPrimaryKey(taskInfo. getId());
543
- Task task = new Task (taskInfo. getData());
544
- return (E ) task;
545
- }
546
- }
547
- // 若数据库没有要处理的任务则从内存中获取
548
- return arrayBlockingQueue. poll(timeout, unit);
549
-
550
- }
551
-
552
- // ......
553
- }
554
- ```
555
-
556
- 接下来就是自定义拒绝策略了,很明显我们的拒绝策略就叫持久化策略(注:这里序列化用的是` hutool ` 的` JSONUtil ` ):
557
-
558
- ``` java
559
- public class PersistentTaskPolicy implements RejectedExecutionHandler {
560
-
561
- @Override
562
- public void rejectedExecution (Runnable r , ThreadPoolExecutor executor ) {
563
- // 任务入库
564
- TaskInfoMapper taskMapper = SpringUtil . getBean(TaskInfoMapper . class);
565
- Task task = (Task ) r;
566
- TaskInfo taskInfo = new TaskInfo ();
567
- taskInfo. setData(JSONUtil . toJsonStr(task. getTaskInfo()));
568
- taskMapper. insertSelective(taskInfo);
569
- }
570
- }
571
-
572
- ```
573
-
574
-
575
- 最终我们的使用示例如下:
576
-
577
- ``` java
578
- ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor (1 ,
579
- 2 ,
580
- 60 , TimeUnit . SECONDS ,
581
- new HybridBlockingQueue<> (1 ),
582
- new PersistentTaskPolicy ());
583
-
584
- threadPoolExecutor. execute(new Task (" core thread" ));
585
-
586
- threadPoolExecutor. execute(new Task (" queueTask" ));
587
-
588
- threadPoolExecutor. execute(new Task (" max thread" ));
589
-
590
- threadPoolExecutor. execute(new Task (" insert into mysql database" ));
591
- ```
592
-
593
- 最终我们的` insert into mysql database ` 因为线程池无法及时处理而走了我们自定义的拒绝策略而持久化入库,等待线程池中其他任务完成后被取出执行:
594
-
595
- ``` bash
596
- 2024-04-14 11:30:16.865 INFO 1052 --- [ main] com.sharkChili.PersistentTaskPolicy : 任务持久化,taskInfo:{" data" :" insert into mysql database" }
597
- 2024-04-14 11:31:08.516 INFO 1052 --- [pool-1-thread-2] com.sharkChili.Task : task execution completed,task info:max thread
598
- 2024-04-14 11:31:08.516 INFO 1052 --- [pool-1-thread-1] com.sharkChili.Task : task execution completed,task info:core thread
599
- 2024-04-14 11:32:08.563 INFO 1052 --- [pool-1-thread-1] com.sharkChili.Task : task execution completed,task info:queueTask
600
- 2024-04-14 11:32:08.563 INFO 1052 --- [pool-1-thread-2] com.sharkChili.Task : task execution completed,task info:insert into mysql database
601
- ```
602
-
603
-
604
-
605
- 当然,对于这个问题,我们也可以参考其他主流框架的做法,以` Netty ` 为例,它的拒绝策略则是直接创建一个线程池以外的线程处理这些任务,为了保证任务的实时处理,这种做法可能需要良好的硬件设备且临时创建的线程无法做到准确的监控:
454
+ 当然,对于这个问题,我们也可以参考其他主流框架的做法,以 Netty 为例,它的拒绝策略则是直接创建一个线程池以外的线程处理这些任务,为了保证任务的实时处理,这种做法可能需要良好的硬件设备且临时创建的线程无法做到准确的监控:
606
455
607
456
``` java
608
457
private static final class NewThreadRunsPolicy implements RejectedExecutionHandler {
@@ -622,7 +471,7 @@ private static final class NewThreadRunsPolicy implements RejectedExecutionHandl
622
471
}
623
472
```
624
473
625
- ` ActiveMq ` 则是尝试在指定的时效内尽可能的争取将任务入队,以保证最大交付:
474
+ ActiveMQ 则是尝试在指定的时效内尽可能的争取将任务入队,以保证最大交付:
626
475
627
476
``` java
628
477
new RejectedExecutionHandler () {
@@ -639,8 +488,6 @@ new RejectedExecutionHandler() {
639
488
});
640
489
```
641
490
642
-
643
-
644
491
### 线程池常用的阻塞队列有哪些?
645
492
646
493
新任务来的时候会先判断当前运行的线程数量是否达到核心线程数,如果达到的话,新任务就会被存放在队列中。
0 commit comments