并发编程 —— 深入理解线程池

概述

在程序中,我们会用各种池化技术来缓存创建昂贵的对象,比如线程池、连接池、内存池。一般是预先创建一些对象放入池中,使用的时候直接取出使用,用完归还以便复用,还会通过一定的策略调整池中缓存对象的数量,实现池的动态伸缩。

由于线程的创建比较昂贵,随意、没有控制地创建大量线程会造成性能问题,因此短平快的任务一般考虑使用线程池来处理,而不是直接创建线程。

那么,如何正确的创建并正确的使用线程池呢,这篇文章就来细看下。

线程池

虽然在 Java 语言中创建线程看上去就像创建一个对象一样简单,只需要 new Thread() 就可以了,但实际上创建线程远不是创建一个对象那么简单。

创建对象,仅仅是在 JVM 的堆里分配一块内存而已;而创建一个线程,却需要调用操作系统内核的 API,然后操作系统要为线程分配一系列的资源,这个成本就很高了。所以线程是一个重量级的对象,应该避免频繁创建和销毁,一般就是采用线程池来避免频繁的创建和销毁线程。

 

线程池原理

Java 通过用户线程与内核线程结合的 1:1 线程模型来实现,Java 将线程的调度和管理设置在了用户态。在 HotSpot VM 的线程模型中,Java 线程被一对一映射为内核线程。Java 在使用线程执行程序时,需要创建一个内核线程;当该 Java 线程被终止时,这个内核线程也会被回收。因此 Java 线程的创建与销毁将会消耗一定的计算机资源,从而增加系统的性能开销。

除此之外,大量创建线程同样会给系统带来性能问题,因为内存和 CPU 资源都将被线程抢占,如果处理不当,就会发生内存溢出、CPU 使用率超负荷等问题。

为了解决上述两类问题,Java 提供了线程池概念,对于频繁创建线程的业务场景,线程池可以创建固定的线程数量,并且在操作系统底层,轻量级进程将会把这些线程映射到内核。

线程池可以提高线程复用,又可以固定最大线程使用量,防止无限制地创建线程。当程序提交一个任务需要一个线程时,会去线程池中查找是否有空闲的线程,若有,则直接使用线程池中的线程工作,若没有,会去判断当前已创建的线程数量是否超过最大线程数量,如未超过,则创建新线程,如已超过,则进行排队等待或者直接抛出异常。

 

线程池是一种生产者 - 消费者模式

线程池的设计,普遍采用的都是生产者 - 消费者模式。线程池的使用方是生产者,线程池本身是消费者。

原理实现大致如下:

 1 package com.lyyzoo.test.concurrent.executor;
 2 
 3 import java.util.ArrayList;
 4  java.util.List;
 5  java.util.concurrent.BlockingQueue;
 6  java.util.concurrent.LinkedBlockingQueue;
 7 
 8 /**
 9  * @author bojiangzhou 2020/02/12
10  */
11 public class CustomThreadPool {
12 
13     static void main(String[] args) {
14         // 使用有界阻塞队列 创建线程池
15         CustomThreadPool pool = new CustomThreadPool(2,new LinkedBlockingQueue<>(10));
16         pool.execute(() -> {
17             System.out.println("提交了一个任务");
18         });
19     }
20 
21      利用阻塞队列实现生产者-消费者模式
22     final BlockingQueue<Runnable> workQueue;
23      保存内部工作线程
24     final List<Thread> threads = new ArrayList<>();
25 
26     public CustomThreadPool(int coreSize,BlockingQueue<Runnable> workQueue) {
27         this.workQueue =28          创建工作线程
29         for (int i = 0; i < coreSize; i++) {
30             WorkerThread work = new WorkerThread();
31             work.start();
32             threads.add(work);
33         }
34 35 
36      生产者 提交任务
37      execute(Runnable command) {
38         try39              队列已满,put 会一直等待
40             workQueue.put(command);
41         } catch (InterruptedException e) {
42             e.printStackTrace();
43 44 45 
46     47      * 工作线程负责消费任务,并执行任务
48      49     class WorkerThread extends Thread {
50         @Override
51          run() {
52              循环取任务并执行,take 取不到任务会一直等待
53             while (true54                 55                     Runnable runnable = workQueue.take();
56                     runnable.run();
57                 } 58                     e.printStackTrace();
59                 }
60             }
61 62 63 }

ThreadPoolExecutor

线程池参数说明

Java 提供的线程池相关的工具类中,最核心的是 ThreadPoolExecutor,通过名字也能看出来,它强调的是 Executor,而不是一般意义上的池化资源。

ThreadPoolExecutor 的构造函数非常复杂,这个最完备的构造函数有 7 个参数:

 

各个参数的含义如下:

  • corePoolSize:表示线程池保有的最小线程数。
  • maximumPoolSize:表示线程池创建的最大线程数。
  • keepAliveTime & unit:如果一个线程空闲了 keepAliveTime & unit 这么久,而且线程池的线程数大于 corePoolSize ,那么这个空闲的线程就要被回收了。
  • workQueue:工作队列,一般定义有界阻塞队列。
  • threadFactory:通过这个参数你可以自定义如何创建线程,例如你可以给线程指定一个有意义的名字。
  • handler:通过这个参数可以自定义任务的拒绝策略。如果线程池中所有的线程都在忙碌,并且工作队列也满了(前提是工作队列是有界队列),那么此时提交任务,线程池就会拒绝接收。ThreadPoolExecutor 已经提供了以下 4 种拒绝策略。
    •   CallerRunsPolicy:提交任务的线程自己去执行该任务。
    •   AbortPolicy:默认的拒绝策略,会 throws RejectedExecutionException。
    •   DiscardPolicy:直接丢弃任务,没有任何异常抛出。
    •   DiscardOldestPolicy:丢弃最老的任务,其实就是把最早进入工作队列的任务丢弃,然后把新任务加入到工作队列。

 

ThreadPoolExecutor 构造完成后,还可以通过如下方法定制默认行为:

  • executor.allowCoreThreadTimeOut(true):将包括“核心线程”在内的,没有任务分配的所有线程,在等待 keepAliveTime 时间后回收掉。
  • executor.prestartAllCoreThreads():创建线程池后,立即创建核心数个工作线程;线程池默认是在任务来时才创建工作线程。

 

创建线程池示例:

void test() throws InterruptedException {
 2     ThreadPoolExecutor poolExecutor =  ThreadPoolExecutor(
 3              核心线程数
 4             2, 5              最大线程数
 6             16 7              线程空闲时间
 8             60 9              使用有界阻塞队列
10             new LinkedBlockingQueue<>(1024),1)">11              定义线程创建方式,可自定线程名称
12             new ThreadFactoryBuilder().setNameFormat("executor-%d").build(),1)">13              自定义拒绝策略,一般和降级策略配合使用
14             (r,executor) ->15                  队列已满,拒绝执行
16                 throw new RejectedExecutionException("Task " + r.toString() +
17                         " rejected from " + executor.toString());
    );
21     poolExecutor.submit(() ->22         LOGGER.info("submit task"23     });
24 }

 

线程池的线程分配流程

任务提交后的大致流程如下图所示。提交任务后,如果线程数小于 corePoolSize,则创建新线程执行任务,无论当前线程池的线程是否空闲都会创建新的线程。

当创建的线程数等于 corePoolSize 时,提交的任务会被加入到设置的阻塞队列中。

当队列满了,则会创建非核心线程执行任务,直到线程池中的线程数量等于 maximumPoolSize。

当线程数量已经等于 maximumPoolSize 时, 新提交的任务无法加入到等待队列,也无法创建非核心线程直接执行,如果没有为线程池设置拒绝策略,这时线程池就会抛出 RejectedExecutionException 异常,即默认拒绝接受任务。

 

线程池默认的拒绝策略就是丢弃任务,所以我们在设置有界队列时,需要考虑设置合理的拒绝策略,要考虑到高峰时期任务的数量,避免任务被丢弃而影响业务流程。

 

强烈建议使用有界队列

创建 ThreadPoolExecutor 时强烈建议使用有界队列。如果设置为无界队列,那么一般最大线程数的设置是不起作用的,而且遇到任务高峰时,如果一直往队列添加任务,容易出现OOM,抛出如下异常。

Exception in thread "http-nio-45678-ClientPoller" 
    java.lang.OutOfMemoryError: GC overhead limit exceeded

 

使用有界队列时,需要注意,当任务过多时,线程池会触发执行拒绝策略,线程池默认的拒绝策略会抛出 RejectedExecutionException,这是个运行时异常,对于运行时异常编译器并不强制 catch 它,所以开发人员很容易忽略,因此默认拒绝策略要慎重使用。如果线程池处理的任务非常重要,建议自定义自己的拒绝策略;并且在实际工作中,自定义的拒绝策略往往和降级策略配合使用。

 

监控线程池的状态

建议用一些监控手段来观察线程池的状态。线程池这个组件往往会表现得任劳任怨、默默无闻,除非是出现了拒绝策略,否则压力再大都不会抛出一个异常。如果我们能提前观察到线程池队列的积压,或者线程数量的快速膨胀,往往可以提早发现并解决问题。

可以通过日志定时展示线程池的状态:

void displayThreadPoolStatus(ThreadPoolExecutor threadPool,String threadPoolName,1)">long period,TimeUnit unit) {
 2     Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> 3         LOGGER.info("[>>ExecutorStatus<<] ThreadPool Name: [{}],Pool Status: [shutdown={},Terminated={}],Pool Thread Size: {},Active Thread Count: {},Task Count: {},Tasks Completed: {},Tasks in Queue: {}"                threadPoolName,1)"> 5                 threadPool.isShutdown(),threadPool.isTerminated(),1)"> 线程是否被终止
 6                 threadPool.getPoolSize(),1)"> 线程池线程数量
 7                 threadPool.getActiveCount(),1)"> 工作线程数
 8                 threadPool.getTaskCount(),1)"> 总任务数
 9                 threadPool.getCompletedTaskCount(),1)"> 已完成的任务数
10                 threadPool.getQueue().size());  线程池中线程的数量
11     },012 }

 

还可以通过 Spring Boot Actuator 的 InfoContributor 功能通过 info 接口暴露线程池状态

 2  * 暴露线程池状态
 *
 bojiangzhou 2020/06/30
 5  class ExecutorInfoContributor implements InfoContributor {
    @Override
 9      contribute(Info.Builder builder) {
10         Map<String,ThreadPoolExecutor> executorMap = CommonExecutor.ExecutorManager.getAllThreadPoolExecutor();
11         if (MapUtils.isEmpty(executorMap)) {
return;
13 14 
15         executorMap.forEach((executorName,1)">16             builder.withDetail(executorName,threadPoolInfo(executor));
17 19 
20     private static Map<String,Object> threadPoolInfo(ThreadPoolExecutor threadPool) {
21         Map<String,Object> info = new HashMap<>(822         info.put("Terminated",threadPool.isTerminated());23         info.put("PoolSize",threadPool.getPoolSize()); 线程池工作线程数
24         info.put("CorePoolSize",threadPool.getCorePoolSize()); 线程池核心线程数
25         info.put("MaximumPoolSize",threadPool.getMaximumPoolSize()); 线程池最大线程数
26         info.put("LargestPoolSize",threadPool.getLargestPoolSize()); 最大达到过的线程数
27         info.put("CompletedTaskCount",threadPool.getCompletedTaskCount());28         info.put("TaskCount",threadPool.getTaskCount());29         info.put("QueueSize",threadPool.getQueue().size()); 队列大小
30         info.put("QueueRemainingCapacity",threadPool.getQueue().remainingCapacity()); 队列剩余容量
31          info;
33 }

线程池任务提交方式

提交任务可以通过 execute 和 submit 方法提交任务,下面就来看下它们的区别。

submit 方法签名:

execute 方法签名:

 

使用 execute 提交任务

使用 execute 提交任务,线程池内抛出异常会导致线程退出,线程池只能重新创建一个线程。如果每个异步任务都以异常结束,那么线程池可能完全起不到线程重用的作用。

而且主线程无法捕获(catch)到线程池内抛出的异常。因为没有手动捕获异常进行处理,ThreadGroup 帮我们进行了未捕获异常的默认处理,向标准错误输出打印了出现异常的线程名称和异常信息。显然,这种没有以统一的错误日志格式记录错误信息打印出来的形式,对生产级代码是不合适的。

 

如下,execute 提交任务,抛出异常后,从线程名称可以看出,老线程退出,创建了新的线程。

ThreadGroup 处理未捕获异常:直接输出到 System.err

 

解决方式:

  • 以 execute 方法提交到线程池的异步任务,最好在任务内部做好异常处理;
  • 设置自定义的异常处理程序作为保底,比如在声明线程池时自定义线程池的未捕获异常处理程序。或者设置全局的默认未捕获异常处理程序。
 自定义线程池的未捕获异常处理程序
 2 ThreadPoolExecutor executor = new ThreadPoolExecutor(8,8 3         30 4         new LinkedBlockingQueue<>(),1)"> 5          ThreadFactoryBuilder()
 6                 .setNameFormat("pool-%d")
 7                 .setUncaughtExceptionHandler((Thread t,Throwable e) -> 8                     log.error("pool happen exception,thread is {}"                })
10                 .build());
11                 
12  设置全局的默认未捕获异常处理程序
static14     Thread.setDefaultUncaughtExceptionHandler((thread,throwable)->15         log.error("Thread {} got exception"17 }  

定义的异常处理程序将未捕获的异常信息打印到标准日志中了,老线程同样会退出。如果要避免这个问题,就需要使用 submit 方法提交任务。

 

使用 submit 提交任务

使用 submit,线程不会退出,但是异常不会记录,会被生吞掉。查看 FutureTask 源码可以发现,在执行任务出现异常之后,异常存到了一个 outcome 字段中,只有在调用 get 方法获取 FutureTask 结果的时候,才会以 ExecutionException 的形式重新抛出异常。所以我们可以通过捕获 get 方法抛出的异常来判断线程的任务是否抛出了异常。

 

submit 提交任务,可以通过 Future 获取返回结果,如果抛出异常,可以捕获 ExecutionException 得到异常栈信息。通过线程名称可以看出,老线程也没有退出。

需要注意的是,使用 submit 时,setUncaughtExceptionHandler 设置的异常处理器不会生效。

 

submit 与 execute 的区别

execute提交的是Runnable类型的任务,而submit提交的是Callable或者Runnable类型的任务;

execute的提交没有返回值,而submit的提交会返回一个Future类型的对象;

execute提交的时候,如果有异常,就会直接抛出异常,而submit在遇到异常的时候,通常不会立马抛出异常,而是会将异常暂时存储起来,等待你调用Future.get()方法的时候,才会抛出异常;

execute 提交的任务抛出异常,老线程会退出,线程池会立即创建一个新的线程。submit 提交的任务抛出异常,老线程不会退出;

线程池设置的 UncaughtExceptionHandler 对 execute 提交的任务生效,对 submit 提交的任务不生效。

线程数设置多少合适

创建多少线程合适,要看多线程具体的应用场景。我们的程序一般都是 CPU 计算和 I/O 操作交叉执行的,由于 I/O 设备的速度相对于 CPU 来说都很慢,所以大部分情况下,I/O 操作执行的时间相对于 CPU 计算来说都非常长,这种场景我们一般都称为 I/O 密集型计算;和 I/O 密集型计算相对的就是 CPU 密集型计算了,CPU 密集型计算大部分场景下都是纯 CPU 计算。I/O 密集型程序和 CPU 密集型程序,计算最佳线程数的方法是不同的。

 

CPU 密集型计算

多线程本质上是提升多核 CPU 的利用率,所以对于一个 4 核的 CPU,每个核一个线程,理论上创建 4 个线程就可以了,再多创建线程也只是增加线程切换的成本。所以,对于 CPU 密集型的计算场景,理论上“线程的数量 = CPU 核数”就是最合适的。不过在工程上,线程的数量一般会设置为“CPU 核数 +1”,这样的话,当线程因为偶尔的内存页失效或其他原因导致阻塞时,这个额外的线程可以顶上,从而保证 CPU 的利用率。

 

I/O 密集型的计算场景

如果 CPU 计算和 I/O 操作的耗时是 1:1,那么 2 个线程是最合适的。如果 CPU 计算和 I/O 操作的耗时是 1:2,那设置 3 个线程是合适的,如下图所示:CPU 在 A、B、C 三个线程之间切换,对于线程 A,当 CPU 从 B、C 切换回来时,线程 A 正好执行完 I/O 操作。这样 CPU 和 I/O 设备的利用率都达到了 100%。

会发现,对于 I/O 密集型计算场景,最佳的线程数是与程序中 CPU 计算和 I/O 操作的耗时比相关的,可以总结出这样一个公式:最佳线程数 =1 +(I/O 耗时 / CPU 耗时)

对于多核 CPU,需要等比扩大,计算公式如下:最佳线程数 =CPU 核数 * [ 1 +(I/O 耗时 / CPU 耗时)]

 

线程池线程数设置 

可通过如下方式获取CPU核数:

1 2  * 获取返回CPU核数
3 4 @return 返回CPU核数,默认为8
5  6 int getCpuProcessors() {
7     return Runtime.getRuntime() != null && Runtime.getRuntime().availableProcessors() > 0 ?
8             Runtime.getRuntime().availableProcessors() : 89 }

 

在一些非核心业务中,我们可以将核心线程数设置小一些,最大线程数量设置为CPU核心数量,阻塞队列大小根据具体场景设置;不要过大,防止大量任务进入等待队列而超时,应尽快创建非核心线程执行任务;也不要过小,避免队列满了任务被拒绝丢弃。

public ThreadPoolExecutor executor() {
 2     int coreSize = getCpuProcessors();
 3     ThreadPoolExecutor executor =  5             10 6             new LinkedBlockingQueue<>(512 ThreadFactoryBuilder().setNameFormat("executor-%d").build(),1)"> ThreadPoolExecutor.AbortPolicy()
    );15      executor;
}

 

在一些核心业务中,核心线程数设置为CPU核心数,最大线程数可根据公式 最佳线程数 =CPU 核数 * [ 1 +(I/O 耗时 / CPU 耗时)] 来计算。阻塞队列可以根据具体业务场景设置,如果线程处理业务非常迅速,我们可以考虑将阻塞队列设置大一些,处理的请求吞吐量会大些;如果线程处理业务非常耗时,阻塞队列设置小些,防止请求在阻塞队列中等待过长时间而导致请求已超时。

 ThreadPoolExecutor executor() {
     getCpuProcessors();
    ThreadPoolExecutor executor =  ThreadPoolExecutor(
            coreSize,coreSize * 8 ThreadFactoryBuilder().setNameFormat("executor-%d" ThreadPoolExecutor.AbortPolicy()
    ); executor;
}

 

注意:一般不要将 corePoolSize 设置为 0,例如下面的线程池,使用了无界队列,虽 maximumPoolSize > 0,但实际上只会有一个工作线程,因为其它任务都加入等待队列了。

1 ThreadPoolExecutor executor = new ThreadPoolExecutor(0,5,30TimeUnit.SECONDS,1)">3         4         new ThreadFactoryBuilder().setNameFormat("test-%d").build()
5 );

 

线程池如何优先启用非核心线程

如果想让线程池激进一点,优先开启更多的线程,而把队列当成一个后备方案,可以自定义队列,重写 offer 方法,因为线程池是通过 offer 方法将任务放入队列。

 

通过重写队列的 offer 方法,直接返回 false,造成这个队列已满的假象,线程池在工作队列满了无法入队的情况下会扩容线程池。直到线程数达到最大线程数,就会触发拒绝策略,此时再通过自定义的拒绝策略将任务通过队列的 put 方法放入队列中。这样就可以优先开启更多线程,而不是进入队列了。

 ThreadPoolExecutor 通过 offer 将元素放入队列,重载队列的 offer 方法,直接返回 false,造成队列已满的假象
 3      队列满时,会创建新的线程直到达到 maximumPoolSize,之后会触发执行拒绝策略
 4     LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>() {
final long serialVersionUID = 8303142475890427046L 6 
 7  8         boolean offer(Runnable e) {
return false    };
 当线程达到 maximumPoolSize 时会触发拒绝策略,此时将任务 put 到队列中
14     RejectedExecutionHandler rejectedExecutionHandler =  RejectedExecutionHandler() {
15 16          rejectedExecution(Runnable r,ThreadPoolExecutor executor) {
17             18                  任务拒绝时,通过 put 放入队列
                queue.put(r);
20             } 21                 Thread.currentThread().interrupt();
22 24  构造线程池
27     ThreadPoolExecutor executor = new ThreadPoolExecutor(2,428             60029             queue,1)">30             new ThreadFactoryBuilder().setNameFormat("demo-%d"            rejectedExecutionHandler);
32 
33     IntStream.rangeClosed(1,50).forEach(i ->34         executor.submit(() ->35             log.info("start..."36             sleep(900037 38 39 }

优雅的终止线程和线程池

优雅地终止线程

在程序中,我们不能随便中断一个线程,因为这是极其不安全的操作,我们无法知道这个线程正运行在什么状态,它可能持有某把锁,强行中断可能导致锁不能释放的问题;或者线程可能在操作数据库,强行中断导致数据不一致混乱的问题。正因此,JAVA里将Thread的stop方法设置为过时,以禁止大家使用。

优雅地终止线程,不是自己终止自己,而是在一个线程 T1 中,终止线程 T2;这里所谓的“优雅”,指的是给 T2 一个机会料理后事,而不是被一剑封喉。两阶段终止模式,就是将终止过程分成两个阶段,其中第一个阶段主要是线程 T1 向线程 T2发送终止指令,而第二阶段则是线程 T2响应终止指令。

Java 线程进入终止状态的前提是线程进入 RUNNABLE 状态,而实际上线程也可能处在休眠状态,也就是说,我们要想终止一个线程,首先要把线程的状态从休眠状态转换到 RUNNABLE 状态。如何做到呢?这个要靠 Java Thread 类提供的 interrupt() 方法,它可以将休眠状态的线程转换到 RUNNABLE 状态。

线程转换到 RUNNABLE 状态之后,我们如何再将其终止呢?RUNNABLE 状态转换到终止状态,优雅的方式是让 Java 线程自己执行完 run() 方法,所以一般我们采用的方法是设置一个标志位,然后线程会在合适的时机检查这个标志位,如果发现符合终止条件,则自动退出 run() 方法。这个过程其实就是第二阶段:响应终止指令。终止指令,其实包括两方面内容:interrupt() 方法和线程终止的标志位。

如果我们在线程内捕获中断异常(如Thread.sleep()抛出了中断一次)之后,需通过 Thread.currentThread().interrupt() 重新设置线程的中断状态,因为 JVM 的异常处理会清除线程的中断状态。

 

建议自己设置线程终止标志位,避免线程内调用第三方类库的方法未处理线程中断状态,如下所示。

 InterruptDemo {
     * 输出:调用 interrupt() 时,只是设置了线程中断标识,线程依旧会继续执行当前方法,执行完之后再退出线程。
     * do something...
     * continue do something...
     * 线程被中断...
12      void main(String[] args) 14         Proxy proxy =  Proxy();
        proxy.start();
16 
17         Thread.sleep(6000        proxy.stop();
 Proxy {
22          自定义线程终止标志位
23         volatile boolean terminated = 24 
25         boolean started = 26 
27         Thread t;
28 
synchronized  start() {
 (started) {
31                 33             started = 34             terminated = 36             t = new Thread(() ->37                 while (!terminated) {  取代 while (true)
38                     System.out.println("do something..."39                     40                         Thread.sleep(200041                     } 42                          如果其它线程中断此线程,抛出异常时,需重新设置线程中断状态,因为 JVM 的异常处理会清除线程的中断状态。
43                         System.out.println("线程被中断..."                        Thread.currentThread().interrupt();
45                     }
46                     System.out.println("continue do something..."48                 started = 49             });
            t.start();
51 52 
53          stop() {
54              设置中断标志
55             terminated =             t.interrupt();
57 59 
60 }

 

优雅的终止线程池

线程池提供了两个方法来中断线程池:shutdown() 和 shutdownNow()。

shutdown():是一种很保守的关闭线程池的方法。线程池执行 shutdown() 后,就会拒绝接收新的任务,但是会等待线程池中正在执行的任务和已经进入阻塞队列的任务都执行完之后才最终关闭线程池。

shutdownNow():相对激进一些,线程池执行 shutdownNow() 后,会拒绝接收新的任务,同时还会中断线程池中正在执行的任务,已经进入阻塞队列的任务也被剥夺了执行的机会,不过这些被剥夺执行机会的任务会作为 shutdownNow() 方法的返回值返回。因为 shutdownNow() 方法会中断正在执行的线程,所以提交到线程池的任务,如果需要优雅地结束,就需要正确地处理线程中断。如果提交到线程池的任务不允许取消,那就不能使用 shutdownNow() 方法终止线程池。

 

如果想在jvm关闭的时候进行内存清理、对象销毁等操作,或者仅仅想起个线程然后这个线程不会退出,可以使用Runtime.addShutdownHook。

这个方法的作用就是在JVM中增加一个关闭的钩子。当程序正常退出、系统调用 System.exit 方法或者虚拟机被关闭时才会执行系统中已经设置的所有钩子,当系统执行完这些钩子后,JVM才会关闭。

利用这个性质,就可以在这个最后执行的线程中把线程池优雅的关闭掉。虽然jvm关闭了,但优雅关闭线程池总是好的,特别是涉及到服务端的 tcp 连接。

 * 添加Hook在Jvm关闭时优雅的关闭线程池
@param threadPool     线程池
 threadPoolName 线程池名称
 6   hookShutdownThreadPool(ExecutorService threadPool,String threadPoolName) {
 8     Runtime.getRuntime().addShutdownHook( 9         LOGGER.info("[>>ExecutorShutdown<<] Start to shutdown the thead pool: [{}]"10          使新任务无法提交
        threadPool.shutdown();
12          等待未完成任务结束
14             if (!threadPool.awaitTermination(6015                 threadPool.shutdownNow();  取消当前执行的任务
16                 LOGGER.warn("[>>ExecutorShutdown<<] Interrupt the worker,which may cause some task inconsistent. Please check the biz logs."17 
 等待任务取消的响应
19                 20                     LOGGER.error("[>>ExecutorShutdown<<] Thread pool can't be shutdown even with interrupting worker threads,1)">23         }  (InterruptedException ie) {
24              重新取消当前线程进行中断
25             threadPool.shutdownNow();
26             LOGGER.error("[>>ExecutorShutdown<<] The current server thread is interrupted when it is trying to stop the worker threads. This may leave an inconsistent state. Please check the biz logs."27 
28              保留中断状态
            Thread.currentThread().interrupt();
30 31 
32         LOGGER.info("[>>ExecutorShutdown<<] Finally shutdown the thead pool: [{}]"    }));
34 }

Executors

考虑到 ThreadPoolExecutor 的构造函数实在是有些复杂,所以 Java 并发包里提供了一个线程池的静态工厂类 Executors,利用 Executors 你可以快速创建线程池。

但《阿里巴巴 Java 开发手册》中提到,禁止使用这些方法来创建线程池,而应该手动 new ThreadPoolExecutor 来创建线程池。最重要的原因是:Executors 提供的很多方法默认使用的都是无界的 LinkedBlockingQueue,高负载情境下,无界队列很容易导致 OOM,而 OOM 会导致所有请求都无法处理,这是致命问题。最典型的就是 newFixedThreadPool 和 newCachedThreadPool,可能因为资源耗尽导致 OOM 问题。

 

newCachedThreadPool

具有缓存性质的线程池,线程最大空闲时间60s,线程可重复利用,没有最大线程数限制。使用的是 SynchronousQueue 无容量阻塞队列,没有最大线程数限制。这意味着,只要有请求到来,就必须找到一条工作线程来处理,如果当前没有空闲的线程就再创建一条新的。

高并发情况下,大量的任务进来后会创建大量的线程,导致OOM(无法创建本地线程):

1 [11:30:30.487] [http-nio-45678-exec-1] [ERROR] [.a.c.c.C.[.[.[/].[dispatcherServlet]:175 ] - Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Handler dispatch failed; 
2     nested exception is java.lang.OutOfMemoryError: unable to create new native thread] with root cause
3 java.lang.OutOfMemoryError: unable to create native thread 

 

newFixedThreadPool

具有固定数量的线程池,核心线程数等于最大线程数,超出最大线程数进行等待。使用的是 LinkedBlockingQueue 无界阻塞队列。虽然使用 newFixedThreadPool 可以把工作线程控制在固定的数量上,但任务队列是无界的。如果任务较多并且执行较慢的话,队列可能会快速积压,撑爆内存导致 OOM。

如果一直往这个无界队列中添加任务,不久就会出现OOM异常(内存占满):

1 Exception in thread "http-nio-45678-ClientPoller" 
2     java.lang.OutOfMemoryError: GC overhead limit exceeded

 

newSingleThreadExecutor

核心线程数与最大线程数均为1,可用于当锁控制同步。使用的是 LinkedBlockingQueue 无界阻塞队列。

 

newScheduledThreadPool

具有时间调度性的线程池,必须初始化核心线程数。

没有最大线程数限制,线程最大空闲时间为0,空闲线程执行完即销毁。底层使用 DelayedWorkQueue 实现延迟特性。

线程池创建正确姿势

最后,总结一下,从如下的一些方面考虑如何正确地创建线程池。

线程池配置

我们需要根据自己的场景、并发情况来评估线程池的几个核心参数,包括核心线程数、最大线程数、线程回收策略、工作队列的类型,以及拒绝策略,确保线程池的工作行为符合需求,一般都需要设置有界的工作队列和可控的线程数。

要根据任务的“轻重缓急”来指定线程池的核心参数,包括线程数、回收策略和任务队列:

  • 对于执行比较慢、数量不大的 IO 任务,要考虑更多的线程数,而不需要太大的队列。
  • 对于吞吐量较大的计算型任务,线程数量不宜过多,可以是 CPU 核数或核数 *2(理由是,线程一定调度到某个 CPU 进行执行,如果任务本身是 CPU 绑定的任务,那么过多的线程只会增加线程切换的开销,并不能提升吞吐量),但可能需要较长的队列来做缓冲。

 

任何时候,都应该为自定义线程池指定有意义的名称,以方便排查问题。当出现线程数量暴增、线程死锁、线程占用大量 CPU、线程执行出现异常等问题时,我们往往会抓取线程栈。此时,有意义的线程名称,就可以方便我们定位问题。

除了建议手动声明线程池以外,还建议用一些监控手段来观察线程池的状态。如果我们能提前观察到线程池队列的积压,或者线程数量的快速膨胀,往往可以提早发现并解决问题。

 

确认线程池本身是不是复用的

既然使用了线程池就需要确保线程池是在复用的,每次 new 一个线程池出来可能比不用线程池还糟糕。如果你没有直接声明线程池而是使用其他同学提供的类库来获得一个线程池,请务必查看源码,以确认线程池的实例化方式和配置是符合预期的。

 

斟酌线程池的混用策略

不要盲目复用线程池,别人定义的线程池属性不一定适合你的任务,而且混用会相互干扰。

另外,Java 8 的 parallel stream 背后是共享同一个 ForkJoinPool,默认并行度是 CPU 核数 -1。对于 CPU 绑定的任务来说,使用这样的配置比较合适,但如果集合操作涉及同步 IO 操作的话(比如数据库操作、外部服务调用等),建议自定义一个 ForkJoinPool(或普通线程池)。因此在使用 Java8 的并行流时,建议只用在计算密集型的任务,IO密集型的任务建议自定义线程池来提交任务,避免影响其它业务。

 

CommonExecutor

如下是我自己封装的一个线程池工具类,还提供了执行批量任务的方法,关于批量任务后面再单独写篇文章来介绍。

  1  org.hzero.core.util;
  2 
  3   4  java.util.Collections;
  5   6  java.util.Map;
  7 import java.util.concurrent.*  8  java.util.stream.Collectors;
  9  javax.annotation.Nonnull;
 10 
 11  com.google.common.collect.ImmutableMap;
 12  com.google.common.util.concurrent.ThreadFactoryBuilder;
 13  org.apache.commons.collections4.CollectionUtils;
 14  org.apache.commons.lang3.RandomUtils;
 15  org.slf4j.Logger;
 16  org.slf4j.LoggerFactory;
 17  org.springframework.dao.DuplicateKeyException;
 18 
 19  io.choerodon.core.exception.CommonException;
 20 
 21  org.hzero.core.base.BaseConstants;
 22 
 23  24  bojiangzhou 2020/02/24
 25   26  CommonExecutor {
 27 
 28     final Logger LOGGER = LoggerFactory.getLogger(CommonExecutor. 29 
 30     final ThreadPoolExecutor BASE_EXECUTOR;
 31 
 32      33         String executorName = "BaseExecutor" 34         BASE_EXECUTOR = buildThreadFirstExecutor(executorName);
 35         ExecutorManager.registerThreadPoolExecutor(executorName,BASE_EXECUTOR);
 36  37 
 38 
 39 
 40      41      * 获取默认构造的通用线程池,线程池核心是为 CPU 核数,最大线程数为 8倍 CPU 核数
 42      *
 43      *  ThreadPoolExecutor
 44       45      ThreadPoolExecutor getCommonExecutor() {
 46          BASE_EXECUTOR;
 47  48 
 49      50      * 构建线程优先的线程池
 51      * <p>
 52      * 线程池默认是当核心线程数满了后,将任务添加到工作队列中,当工作队列满了之后,再创建线程直到达到最大线程数。
 53  54  55      * 线程优先的线程池,就是在核心线程满了之后,继续创建线程,直到达到最大线程数之后,再把任务添加到工作队列中。
 56  57  58      * 此方法默认设置核心线程数为 CPU 核数,最大线程数为 8倍 CPU 核数,空闲线程超过 5 分钟销毁,工作队列大小为 65536。
 59  60  poolName        线程池名称
 61  62       63      ThreadPoolExecutor buildThreadFirstExecutor(String poolName) {
 64          CommonExecutor.getCpuProcessors();
 65         int maxSize = coreSize * 8 66         return buildThreadFirstExecutor(coreSize,maxSize,1 << 16 67  68 
 69      70  71  72  73  74  75  76  77  corePoolSize    核心线程数
 78  maximumPoolSize 最大线程数
 79  keepAliveTime   空闲线程的空闲时间
 80  unit            时间单位
 81  workQueueSize   工作队列容量大小
 82  83  84       85     static ThreadPoolExecutor buildThreadFirstExecutor( corePoolSize,1)"> 86                                                                maximumPoolSize,1)"> 87                                                                keepAliveTime,1)"> 88                                                               TimeUnit unit,1)"> 89                                                                workQueueSize,1)"> 90                                                               String poolName) {
 91          自定义队列,优先开启更多线程,而不是放入队列
 92         LinkedBlockingQueue<Runnable> queue = (workQueueSize) {
 93             long serialVersionUID = 5075561696269543041L 94 
 95             @Override
 96              offer(@Nonnull Runnable o) {
 97                 false;  造成队列已满的假象
 98  99         };
100 
101         102         RejectedExecutionHandler rejectedExecutionHandler = (runnable,1)">103             104                  任务拒绝时,通过 offer 放入队列
105                 queue.put(runnable);
106             } 107                 LOGGER.warn("{} Queue offer interrupted. "108 109 110 111 
112         ThreadPoolExecutor executor = 113                 corePoolSize,maximumPoolSize,1)">114                 keepAliveTime,unit,1)">115                 queue,1)">116                 117                         .setNameFormat(poolName + "-%d"118                         .setUncaughtExceptionHandler((Thread thread,Throwable throwable) ->119                             LOGGER.error("{} catching the uncaught exception,ThreadName: [{}]"120                         })
121                         .build(),1)">122                 rejectedExecutionHandler
123         );
124 
125         executor.allowCoreThreadTimeOut(126 
127         CommonExecutor.displayThreadPoolStatus(executor,1)">128         CommonExecutor.hookShutdownThreadPool(executor,1)">129 
130         ExecutorManager.registerThreadPoolExecutor(poolName,executor);
131 
132         133 134 
135     136      * 批量提交异步任务,使用默认的线程池
137 138  tasks 将任务转化为 AsyncTask 批量提交
139      140     static <T> List<T> batchExecuteAsync(List<AsyncTask<T>> tasks,@Nonnull String taskName) {
141          batchExecuteAsync(tasks,BASE_EXECUTOR,taskName);
142 143 
144     145      * 批量提交异步任务,执行失败可抛出异常或返回异常编码即可 <br>
146 147      * 需注意提交的异步任务无法控制事务,一般需容忍产生一些垃圾数据的情况下才能使用异步任务,异步任务执行失败将抛出异常,主线程可回滚事务.
148 149      * 异步任务失败后,将取消剩余的任务执行.
150 151  tasks    将任务转化为 AsyncTask 批量提交
152  executor 线程池,需自行根据业务场景创建相应的线程池
153  返回执行结果
154      155     static <T> List<T> batchExecuteAsync(@Nonnull List<AsyncTask<T>>156          (CollectionUtils.isEmpty(tasks)) {
157              Collections.emptyList();
158 159 
160         int size = tasks.size();
161 
162         List<Callable<T>> callables = tasks.stream().map(t -> (Callable<T>) () ->163             164                 T r = t.doExecute();
165 
166                 LOGGER.debug("[>>Executor<<] Async task execute success. ThreadName: [{}],BatchTaskName: [{}],SubTaskName: [{}]"167                         Thread.currentThread().getName(),taskName,t.taskName());
168                  r;
169             }  (Throwable e) {
170                 LOGGER.warn("[>>Executor<<] Async task execute error. ThreadName: [{}],SubTaskName: [{}],exception: {}"171 172                 throw e;
173 174         }).collect(Collectors.toList());
175 
176         CompletionService<T> cs = new ExecutorCompletionService<>(executor,1)">(size));
177         List<Future<T>> futures = (size);
178         LOGGER.info("[>>Executor<<] Start async tasks,TaskSize: [{}]"179 
180         for (Callable<T> task : callables) {
181             futures.add(cs.submit(task));
182 183 
184         List<T> resultList = 185         int i = 0; i < size; i++186             187                 Future<T> future = cs.poll(6188                 if (future != null189                     T result = future.get();
190                     resultList.add(result);
191                     LOGGER.debug("[>>Executor<<] Async task [{}] - [{}] execute success,result: {}"192                 } else193                     cancelTask(futures);
194                     LOGGER.error("[>>Executor<<] Async task [{}] - [{}] execute timeout,then cancel other tasks."195                      CommonException(BaseConstants.ErrorCode.TIMEOUT);
196 197             }  (ExecutionException e) {
198                 LOGGER.warn("[>>Executor<<] Async task [{}] - [{}] execute error,1)">199                 cancelTask(futures);
200                 Throwable throwable = e.getCause();
201                 if (throwable instanceof CommonException) {
202                      (CommonException) throwable;
203                 } else  DuplicateKeyException) {
204                      (DuplicateKeyException) throwable;
205                 } 206                     new CommonException("error.executorError"207 208             } 209 210                 Thread.currentThread().interrupt();  重置中断标识
211                 LOGGER.error("[>>Executor<<] Async task [{}] - [{}] were interrupted."212                  CommonException(BaseConstants.ErrorCode.ERROR);
213 214 215         LOGGER.info("[>>Executor<<] Finish async tasks,1)">216          resultList;
217 218 
219     220      * 根据一定周期输出线程池的状态
221 222 223 224      225      displayThreadPoolStatus(ThreadPoolExecutor threadPool,1)">226         displayThreadPoolStatus(threadPool,threadPoolName,RandomUtils.nextInt(60,600227 228 
229     230 231 232 233 234  period         周期
235  unit           时间单位
236      237     238         Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() ->239             String payload = "[>>ExecutorStatus<<] ThreadPool Name: [{}],Largest Pool Size: {},1)">240             Object[] params =  Object[]{threadPoolName,1)">241                     threadPool.isShutdown(),1)">242                     threadPool.getPoolSize(),1)">243                     threadPool.getLargestPoolSize(),1)"> 线程最大达到的数量
244                     threadPool.getActiveCount(),1)">245                     threadPool.getTaskCount(),1)">246                     threadPool.getCompletedTaskCount(),1)">247                     threadPool.getQueue().size()};
248 
249             if (threadPool.getQueue().remainingCapacity() < 64250                 LOGGER.warn(payload,params);
251             } 252                 LOGGER.info(payload,1)">253 254         },1)">255 256 
257     258      * 添加Hook在Jvm关闭时优雅的关闭线程池
259 260 261 262      263     264         Runtime.getRuntime().addShutdownHook(265             LOGGER.info("[>>ExecutorShutdown<<] Start to shutdown the thead pool: [{}]"266             267             threadPool.shutdown();
268             269                 270                 271                     threadPool.shutdownNow(); 272                     LOGGER.warn("[>>ExecutorShutdown<<] Interrupt the worker,1)">273 
274                     275                     276                         LOGGER.error("[>>ExecutorShutdown<<] Thread pool can't be shutdown even with interrupting worker threads,1)">277 278 279             } 280                 281                 threadPool.shutdownNow();
282                 LOGGER.error("[>>ExecutorShutdown<<] The current server thread is interrupted when it is trying to stop the worker threads. This may leave an inconsistent state. Please check the biz logs."283 
284                 285 286 287 
288             LOGGER.info("[>>ExecutorShutdown<<] Finally shutdown the thead pool: [{}]"289         }));
290 291 
292     293      * 获取返回CPU核数
294 295 296      297     298         299                 Runtime.getRuntime().availableProcessors() : 8300 301 
302     static <T> void cancelTask(List<Future<T>> futures) {
303         for (Future<T> future : futures) {
304             if (!future.isDone()) {
305                 future.cancel(306 307 308 309 
310      ExecutorManager {
311 
312         final ConcurrentHashMap<String,ThreadPoolExecutor> EXECUTORS = new ConcurrentHashMap<>(8313 
314         315          * 向管理器注册线程池
316          *
317          * 318  executor       ThreadPoolExecutor
319          320          registerThreadPoolExecutor(String threadPoolName,1)">321             EXECUTORS.put(threadPoolName,1)">322 323 
324         325          * 根据名称获取线程池
326 327 328          329          ThreadPoolExecutor getThreadPoolExecutor(String threadPoolName) {
330              EXECUTORS.get(threadPoolName);
331 332 
333         334          * 获取所有已注册的线程池
335 336 337          338          getAllThreadPoolExecutor() {
339              ImmutableMap.copyOf(EXECUTORS);
340 341 
342         343          * 根据名称移除已注册的线程池
344 345 346          347          removeThreadPoolExecutor(String threadPoolName) {
348             EXECUTORS.remove(threadPoolName);
349 350 351 
352 }

AsyncTask:

 java.util.UUID;
 4 
interface AsyncTask<T> 7     default String taskName() {
 UUID.randomUUID().toString();
10 
    T doExecute();
12 }

 

--------------------------------------------------------------------------------------------------------------

 

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


本文从从Bitcask存储模型讲起,谈轻量级KV系统设计与实现。从来没有最好的K-V系统,只有最适合应用业务实际场景的系统,做任何的方案选择,要结合业务当前的实际情况综合权衡,有所取有所舍。
内部的放到gitlab pages的博客,需要统计PV,不蒜子不能准确统计,原因在于gitlab的host设置了strict-origin-when-cross-origin, 导致不蒜子不能正确获取referer,从而PV只能统计到网站的PV。 为了方便统计页面的PV,这里简单的写了一个java程
PCM 自然界中的声音非常复杂,波形极其复杂,通常我们采用的是脉冲代码调制编码,即PCM编码。PCM通过抽样、量化、编码三个步骤将连续变化的模拟信号转换为数字编码。 采样率 采样频率,也称为采样速度或者采样率,定义了每秒从连续信号中提取并组成离散信号的采样个数,它用赫兹(Hz)来表示。采样频率的倒数
本文介绍如何离线生成sst并在线加载,提供一种用rocksdb建立分布式kv系统替换mongodb的思路
验证用户输入是否正确是我们应用程序中的常见功能。Spring提供了`@Valid`和@`Validated`两个注解来实现验证功能,本文详细介绍 [@Valid]和[@Validated]注解的区别 。
引入pdf2dom &lt;dependency&gt; &lt;groupId&gt;net.sf.cssbox&lt;/groupId&gt; &lt;artifactId&gt;pdf2dom&lt;/artifactId&gt; &lt;version&gt;1.8&lt;/version&
grafana 是一款非常优秀的可视化报表工具,有设计精良的可视化工具,今天来聊一聊如何将grafana集成到自己的应用中。 原理是: grafana允许iframe访问,开启auth.proxy, java 后端鉴权后代理grafana 前端通过iframe访问后端代理过的grafana graf
介绍 Call Graph是一款IDEA插件,用于可视化基于IntelliJ平台的IDE的函数调用图。 这个插件的目标是让代码更容易理解,有助于读懂和调试代码。当前只支持Java。针对Typescript、Javascript或Python工具,可以使用作者的另外一款工具Codemap(https:
原理 通过线程安全findAndModify 实现锁 实现 定义锁存储对象: /** * mongodb 分布式锁 */ @Data @NoArgsConstructor @AllArgsConstructor @Document(collection = &quot;distributed-loc
Singleton 单例模式 单例模式是确保每个应用程序只存在一个实例的机制。默认情况下,Spring将所有bean创建为单例。 你用@Autowired获取的bean,全局唯一。 @RestController public class LibraryController { @Autowired
pipeline 分布式任务调度器 目标: 基于docker的布式任务调度器, 比quartzs,xxl-job 更强大的分布式任务调度器。 可以将要执行的任务打包为docker镜像,或者选择已有镜像,自定义脚本程序,通过pipeline框架来实现调度。 开源地址: https://github.c
python训练的模型,转换为onnx模型后,用python代码可以方便进行推理,但是java代码如何实现呢? 首先ONNX 推理,可以使用`onnxruntime` ```xml com.microsoft.onnxruntime onnxruntime 1.15.1 ``` 另外,训练的模型需要
要获取内网地址,可以尝试连接到10.255.255.255:1。如果连接成功,获取本地套接字的地址信息就是当前的内网IP。 python实现: ```python import socket def extract_ip(): st = socket.socket(socket.AF_INET, s
为什么要有索引 gremlin 其实是一个逐级过滤的运行机制,比如下面的一个简单的gremlin查询语句: g.V().hasLabel(&quot;label&quot;).has(&quot;prop&quot;,&quot;value&quot;) 运行原理就是: 找出所有的顶点V 然后过滤出
最近在分析一个应用中的某个接口的耗时情况时,发现一个看起来极其普通的对象创建操作,竟然每次需要消耗 8ms 左右时间,分析后发现这个对象可以通过对象池模式进行优化,优化后此步耗时仅有 0.01ms。
点赞再看,动力无限。Hello world : ) 微信搜「 程序猿阿朗 」。 本文 Github.com/niumoo/JavaNotes 和 未读代码网站 已经收录,有很多知识点和系列文章。 此篇文章介绍 Java JMX 技术的相关概念和具体的使用方式。 当前文章属于Java 性能分析优化系列
如何将Java JAR 转化为 win/mac/linux 独立可执行程序?不需要预装 JRE 运行?
点赞再看,动力无限。 微信搜「 程序猿阿朗 」。 本文 Github.com/niumoo/JavaNotes 和 未读代码博客 已经收录,有很多知识点和系列文章。 Java 19 在2022 年 9 月 20 日正式发布,Java 19 不是一个长期支持版本,直到 2023 年 3 月它将被 JD
点赞再看,动力无限。Hello world : ) 微信搜「 程序猿阿朗 」。 本文 Github.com/niumoo/JavaNotes 和 未读代码博客 已经收录,有很多知识点和系列文章。 前言 Java 反编译,一听可能觉得高深莫测,其实反编译并不是什么特别高级的操作,Java 对于 Cla
JSON 对于开发者并不陌生,如今的 WEB 服务、移动应用、甚至物联网大多都是以 **JSON** 作为数据交换的格式。学习 JSON 格式的操作工具对开发者来说是必不可少的。这篇文章将介绍如何使用 **Jackson** 开源工具库对 JSON 进行常见操作。