@@ -342,6 +342,305 @@ public static class CallerRunsPolicy implements RejectedExecutionHandler {
342
342
}
343
343
```
344
344
345
+ ### 如果不允许丢弃任务任务,应该选择哪个拒绝策略
346
+
347
+ 答案是` CallerRunsPolicy ` ,这一点我们从注释和源码都可以看出,只要当前程序不关闭就会使用执行` execute ` 方法的线程执行该任务:
348
+
349
+
350
+ ``` java
351
+ public static class CallerRunsPolicy implements RejectedExecutionHandler {
352
+
353
+ public CallerRunsPolicy () { }
354
+
355
+
356
+ public void rejectedExecution (Runnable r , ThreadPoolExecutor e ) {
357
+ // 只要当前程序没有关闭,就用执行execute方法的线程执行该任务
358
+ if (! e. isShutdown()) {
359
+
360
+ r. run();
361
+ }
362
+ }
363
+ }
364
+ ```
365
+
366
+ ### CallerRunsPolicy 拒绝策略有什么风险?如何解决?
367
+
368
+ 默认情况下,我们都会为了保证任务不被丢弃都优先考虑` CallerRunsPolicy ` ,这也是相对维稳的做法,这种做法的隐患是假设走到` CallerRunsPolicy ` 的任务是个非常耗时的任务,就会导致主线程就很卡死。
369
+
370
+ 下面就是笔者通过主线程使用线程池的方法,该线程池限定了最大线程数为2还有阻塞队列大小为1,这意味着第4个任务就会走到拒绝策略:
371
+
372
+ ``` java
373
+ Logger log = LoggerFactory . getLogger(ThreadPoolTest . class);
374
+ // 创建一个线程池,核心线程数为1,最大线程数为2
375
+ // 当线程数大于核心线程数时,多余的空闲线程存活的最长时间为60秒,
376
+ // 任务队列为容量为1的ArrayBlockingQueue,饱和策略为CallerRunsPolicy。
377
+ ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor (1 ,
378
+ 2 ,
379
+ 60 ,
380
+ TimeUnit . SECONDS ,
381
+ new ArrayBlockingQueue<> (1 ),
382
+ new ThreadPoolExecutor .CallerRunsPolicy ());
383
+
384
+ // 提交第一个任务,由核心线程执行
385
+ threadPoolExecutor. execute(() - > {
386
+ log. info(" 核心线程执行第一个任务" );
387
+ ThreadUtil . sleep(1 , TimeUnit . MINUTES );
388
+ });
389
+
390
+ // 提交第二个任务,由于核心线程被占用,任务将进入队列等待
391
+ threadPoolExecutor. execute(() - > {
392
+ log. info(" 处理入队的第二个任务" );
393
+ ThreadUtil . sleep(1 , TimeUnit . MINUTES );
394
+ });
395
+
396
+ // 提交第三个任务,由于核心线程被占用且队列已满,创建非核心线程处理
397
+ threadPoolExecutor. execute(() - > {
398
+ log. info(" 非核心线程处理第三个任务" );
399
+ ThreadUtil . sleep(1 , TimeUnit . MINUTES );
400
+ });
401
+
402
+ // 提交第四个任务,由于核心线程和非核心线程都被占用,队列也满了,根据CallerRunsPolicy策略,任务将由提交任务的线程(即主线程)来执行
403
+ threadPoolExecutor. execute(() - > {
404
+ log. info(" 主线程处理第四个任务" );
405
+ ThreadUtil . sleep(2 , TimeUnit . MINUTES );
406
+ });
407
+
408
+ // 提交第五个任务,主线程被第四个任务卡住,该任务必须等到主线程执行完才能提交
409
+ threadPoolExecutor. execute(() - > {
410
+ log. info(" 核心线程执行第五个任务" );
411
+ });
412
+ ```
413
+
414
+
415
+ 从输出结果可以看出,因为` CallerRunsPolicy ` 这个拒绝策略,导致耗时的任务用了主线程执行,导致线程池阻塞,进而导致后续任务无法及时执行,严重的情况下很可能导致` OOM ` :
416
+
417
+
418
+ ``` bash
419
+ 2024-04-03 00:08:12.617 INFO 20804 --- [ main] com.sharkChili.ThreadPoolApplication : 启动成功!!
420
+ 2024-04-03 00:08:15.739 INFO 20804 --- [pool-1-thread-1] com.sharkChili.ThreadPoolApplication : 核心线程执行
421
+ 2024-04-03 00:08:36.768 INFO 20804 --- [pool-1-thread-2] com.sharkChili.ThreadPoolApplication : 应急线程处理
422
+ 2024-04-03 00:08:49.333 INFO 20804 --- [ main] com.sharkChili.ThreadPoolApplication : CallerRunsPolicy task
423
+ ```
424
+
425
+
426
+ 我们从问题的本质入手,调用者采用` CallerRunsPolicy ` 是希望所有的任务都能够被执行,按照笔者的经验,假如我们的场景是偶发这种突发场景,在内存允许的情况下,我们建议增加阻塞队列` BlockingQueue ` 的大小并调整堆内存以容纳更多的任务,确保任务能够被准确执行。
427
+
428
+
429
+ ![ ] ( https://qiniuyun.sharkchili.com/202404141059210.png )
430
+
431
+
432
+
433
+
434
+ 若当前服务器内存资源紧张,但我们配置线程池还为尽可能利用到` CPU ` ,我们建议调整线程中` maximumPoolSize ` 以保证尽可能压榨` CPU ` 资源:
435
+
436
+ ![ ] ( https://qiniuyun.sharkchili.com/202404141059257.png )
437
+
438
+
439
+
440
+ 如果服务器资源以达到可利用的极限,这就意味我们要在设计策略上改变线程池的调度了,我们都知道,导致主线程卡死的本质就是因为我们不希望任何一个任务被丢弃。换个思路,有没有办法既能保证任务不被丢弃且在服务器有余力时及时处理呢?这里笔者提供的一种思路,即任务持久化,注意这里笔者更多强调的是思路而不是实现,这里所谓的任务持久化,包括但不限于:
441
+ 1 . 设计一张任务表间任务存储到` MySQL ` 数据库中。
442
+ 2 . ` Redis ` 缓存任务。
443
+ 3 . 将任务提交到消息队列中。
444
+
445
+ 笔者以方案二为例,通过继承` BlockingQueue ` 实现一个混合式阻塞队列,该队列包含` JDK ` 自带的` ArrayBlockingQueue ` 和一个自定义的队列(数据存入` mysql ` 中对应` DDL ` 语句如下),通过魔改队列的添加逻辑达到任务可以存入` ArrayBlockingQueue ` 或者数据表的目的。
446
+
447
+ ``` sql
448
+
449
+ CREATE TABLE `task_info ` (
450
+ ` id` int NOT NULL AUTO_INCREMENT,
451
+ ` data` varchar (100 ) DEFAULT NULL ,
452
+ PRIMARY KEY (` id` )
453
+ ) ENGINE= InnoDB AUTO_INCREMENT= 6 DEFAULT CHARSET= utf8mb4 COLLATE= utf8mb4_0900_ai_ci;
454
+ ```
455
+
456
+
457
+
458
+ 如此一来,一旦我们的线程池中线程以达到满载时,我们就可以通过拒绝策略将最新任务持久化到` MySQL ` 数据库中,等到线程池有了有余力处理所有任务时,让其优先处理数据库中的任务以避免"饥饿"问题。
459
+
460
+ ![ ] ( https://qiniuyun.sharkchili.com/202404141059303.png )
461
+
462
+
463
+
464
+ 这里笔者也给出混合队列实现的核心源码,即通过继承` BlockingQueue ` 魔改了入队和出队的逻辑:
465
+
466
+
467
+ ``` java
468
+ public class HybridBlockingQueue <E> implements BlockingQueue<E > {
469
+
470
+ private Object mysqlLock = new Object ();
471
+
472
+
473
+ private ArrayBlockingQueue<E > arrayBlockingQueue;
474
+
475
+ // 构造方法初始化阻塞队列大小
476
+ public HybridBlockingQueue (int maxSize ) {
477
+ arrayBlockingQueue = new ArrayBlockingQueue<> (maxSize);
478
+ }
479
+
480
+
481
+ /**
482
+ * 线程池会调用的入队方法
483
+ * @param e
484
+ * @return
485
+ */
486
+ @Override
487
+ public boolean offer (E e ) {
488
+ return arrayBlockingQueue. offer(e);
489
+ }
490
+
491
+
492
+ /**
493
+ * 取任务时,优先从数据库中读取最早的任务
494
+ *
495
+ * @return
496
+ * @throws InterruptedException
497
+ */
498
+ @Override
499
+ public E take () throws InterruptedException {
500
+
501
+ synchronized (mysqlLock) {
502
+ // 从数据库中读取任务,通过上锁读取避免重复消费
503
+ TaskInfoMapper taskMapper = SpringUtil . getBean(TaskInfoMapper . class);
504
+ TaskInfo taskInfo = taskMapper. selectByExample(null ). stream()
505
+ .findFirst()
506
+ .orElse(null );
507
+
508
+
509
+ // 若数据库存在该任务,则先删后返回
510
+ if (ObjUtil . isNotEmpty(taskInfo)) {
511
+ taskMapper. deleteByPrimaryKey(taskInfo. getId());
512
+ Task task = new Task (taskInfo. getData());
513
+ return (E ) task;
514
+ }
515
+ }
516
+
517
+ // 若数据库没有要处理的任务则从内存中获取
518
+ return arrayBlockingQueue. poll();
519
+ }
520
+
521
+ /**
522
+ * 带有时间限制的任务获取
523
+ *
524
+ * @param timeout
525
+ * @param unit
526
+ * @return
527
+ * @throws InterruptedException
528
+ */
529
+ @Override
530
+ public E poll (long timeout , TimeUnit unit ) throws InterruptedException {
531
+ // 从数据库中读取任务,通过上锁读取避免重复消费
532
+ synchronized (mysqlLock) {
533
+ // 从数据库中读取任务,
534
+ TaskInfoMapper taskMapper = SpringUtil . getBean(TaskInfoMapper . class);
535
+ TaskInfo taskInfo = taskMapper. selectByExample(null ). stream()
536
+ .findFirst()
537
+ .orElse(null );
538
+
539
+
540
+ // 若数据库存在该任务,则先删后返回
541
+ if (ObjUtil . isNotEmpty(taskInfo)) {
542
+ taskMapper. deleteByPrimaryKey(taskInfo. getId());
543
+ Task task = new Task (taskInfo. getData());
544
+ return (E ) task;
545
+ }
546
+ }
547
+ // 若数据库没有要处理的任务则从内存中获取
548
+ return arrayBlockingQueue. poll(timeout, unit);
549
+
550
+ }
551
+
552
+ // ......
553
+ }
554
+ ```
555
+
556
+ 接下来就是自定义拒绝策略了,很明显我们的拒绝策略就叫持久化策略(注:这里序列化用的是` hutool ` 的` JSONUtil ` ):
557
+
558
+ ``` java
559
+ public class PersistentTaskPolicy implements RejectedExecutionHandler {
560
+
561
+ @Override
562
+ public void rejectedExecution (Runnable r , ThreadPoolExecutor executor ) {
563
+ // 任务入库
564
+ TaskInfoMapper taskMapper = SpringUtil . getBean(TaskInfoMapper . class);
565
+ Task task = (Task ) r;
566
+ TaskInfo taskInfo = new TaskInfo ();
567
+ taskInfo. setData(JSONUtil . toJsonStr(task. getTaskInfo()));
568
+ taskMapper. insertSelective(taskInfo);
569
+ }
570
+ }
571
+
572
+ ```
573
+
574
+
575
+ 最终我们的使用示例如下:
576
+
577
+ ``` java
578
+ ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor (1 ,
579
+ 2 ,
580
+ 60 , TimeUnit . SECONDS ,
581
+ new HybridBlockingQueue<> (1 ),
582
+ new PersistentTaskPolicy ());
583
+
584
+ threadPoolExecutor. execute(new Task (" core thread" ));
585
+
586
+ threadPoolExecutor. execute(new Task (" queueTask" ));
587
+
588
+ threadPoolExecutor. execute(new Task (" max thread" ));
589
+
590
+ threadPoolExecutor. execute(new Task (" insert into mysql database" ));
591
+ ```
592
+
593
+ 最终我们的` insert into mysql database ` 因为线程池无法及时处理而走了我们自定义的拒绝策略而持久化入库,等待线程池中其他任务完成后被取出执行:
594
+
595
+ ``` bash
596
+ 2024-04-14 11:30:16.865 INFO 1052 --- [ main] com.sharkChili.PersistentTaskPolicy : 任务持久化,taskInfo:{" data" :" insert into mysql database" }
597
+ 2024-04-14 11:31:08.516 INFO 1052 --- [pool-1-thread-2] com.sharkChili.Task : task execution completed,task info:max thread
598
+ 2024-04-14 11:31:08.516 INFO 1052 --- [pool-1-thread-1] com.sharkChili.Task : task execution completed,task info:core thread
599
+ 2024-04-14 11:32:08.563 INFO 1052 --- [pool-1-thread-1] com.sharkChili.Task : task execution completed,task info:queueTask
600
+ 2024-04-14 11:32:08.563 INFO 1052 --- [pool-1-thread-2] com.sharkChili.Task : task execution completed,task info:insert into mysql database
601
+ ```
602
+
603
+
604
+
605
+ 当然,对于这个问题,我们也可以参考其他主流框架的做法,以` Netty ` 为例,它的拒绝策略则是直接创建一个线程池以外的线程处理这些任务,为了保证任务的实时处理,这种做法可能需要良好的硬件设备且临时创建的线程无法做到准确的监控:
606
+
607
+ ``` java
608
+ private static final class NewThreadRunsPolicy implements RejectedExecutionHandler {
609
+ NewThreadRunsPolicy () {
610
+ super ();
611
+ }
612
+ public void rejectedExecution (Runnable r , ThreadPoolExecutor executor ) {
613
+ try {
614
+ // 创建一个临时线程处理任务
615
+ final Thread t = new Thread (r, " Temporary task executor" );
616
+ t. start();
617
+ } catch (Throwable e) {
618
+ throw new RejectedExecutionException (
619
+ " Failed to start a new thread" , e);
620
+ }
621
+ }
622
+ }
623
+ ```
624
+
625
+ ` ActiveMq ` 则是尝试在指定的时效内尽可能的争取将任务入队,以保证最大交付:
626
+
627
+ ``` java
628
+ new RejectedExecutionHandler () {
629
+ @Override
630
+ public void rejectedExecution (final Runnable r , final ThreadPoolExecutor executor ) {
631
+ try {
632
+ // 限时阻塞等待,实现尽可能交付
633
+ executor. getQueue(). offer(r, 60 , TimeUnit . SECONDS );
634
+ } catch (InterruptedException e) {
635
+ throw new RejectedExecutionException (" Interrupted waiting for BrokerService.worker" );
636
+ }
637
+ throw new RejectedExecutionException (" Timed Out while attempting to enqueue Task." );
638
+ }
639
+ });
640
+ ```
641
+
642
+
643
+
345
644
### 线程池常用的阻塞队列有哪些?
346
645
347
646
新任务来的时候会先判断当前运行的线程数量是否达到核心线程数,如果达到的话,新任务就会被存放在队列中。
0 commit comments