今天跟大家聊一聊无论是在工作中常用还是在面试中常问的线程池,通过画图的方式来彻底弄懂线程池的工作原理,以及在实际项目中该如何制定适合业务的线程池。
一、什么是线程池
线程池其实是一种池化的技术的实现,池化技术的核心思想其实就是实现资源的一个复用,避免资源的重复创建和销毁带来的性能开销。在线程池中,线程池可以管理一堆线程,让线程执行完任务之后不会进行销毁,而是继续去处理其它线程已经提交的任务。
使用线程池的好处
- 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
- 提高响应速度。当任务到达时,任务可以不需要的等到线程创建就能立即执行。
- 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统 的稳定性,使用线程池可以进行统一的分配,调优和监控。
二、线程池的构造
Java中主要是通过构建ThreadPoolExecutor来创建线程池的。接下来我们看一下线程池是如何构造出来的
ThreadPoolExecutor的构造方法
- corePoolSize:线程池中用来工作的核心的线程数量。
- maximumPoolSize:最大线程数,线程池允许创建的最大线程数。
- keepAliveTime:超出 corePoolSize 后创建的线程存活时间或者是所有线程最大存活时间,取决于配置。
- unit:keepAliveTime 的时间单位。
- workQueue:任务队列,是一个阻塞队列,当线程数已达到核心线程数,会将任务存储在阻塞队列中。
- threadFactory :线程池内部创建线程所用的工厂。
- handler:拒绝策略;当队列已满并且线程数量达到最大线程数量时,会调用该方法处理该任务。
线程池的构造其实很简单,就是传入一堆参数,然后进行简单的赋值操作。
三、线程池的运行原理
说完线程池的核心构造参数的意思,接下来就来画图讲解这些参数在线程池中是如何工作的。
线程池刚创建出来是什么样子呢,如下图
不错,刚创建出来的线程池中只有一个构造时传入的阻塞队列而已,此时里面并没有的任何线程,但是如果你想要在执行之前已经创建好核心线程数,可以调用prestartAllCoreThreads方法来实现,默认是没有线程的。
当有线程通过execute方法提交了一个任务,会发生什么呢?
提交任务的时候,其实会去进行任务的处理
首先会去判断当前线程池的线程数是否小于核心线程数,也就是线程池构造时传入的参数corePoolSize。
如果小于,那么就直接通过ThreadFactory创建一个线程来执行这个任务,如图
当任务执行完之后,线程不会退出,而是会去从阻塞队列中获取任务,如下图
接下来如果又提交了一个任务,也会按照上述的步骤,去判断是否小于核心线程数,如果小于,还是会创建线程来执行任务,执行完之后也会从阻塞队列中获取任务。这里有个细节,就是提交任务的时候,就算有线程池里的线程从阻塞队列中获取不到任务,如果线程池里的线程数还是小于核心线程数,那么依然会继续创建线程,而不是复用已有的线程。
如果线程池里的线程数不再小于核心线程数呢?那么此时就会尝试将任务放入阻塞队列中,入队成功之后,如图
这样在阻塞的线程就可以获取到任务了。
但是,随着任务越来越多,队列已经满了,任务放入失败了,那怎么办呢?
此时就会判断当前线程池里的线程数是否小于最大线程数,也就是入参时的maximumPoolSize参数
如果小于最大线程数,那么也会创建非核心线程来执行提交的任务,如图
所以,从这里可以发现,就算队列中有任务,新创建的线程还是优先处理这个提交的任务,而不是从队列中获取已有的任务执行,从这可以看出,先提交的任务不一定先执行。
但是不幸的事发生了,线程数已经达到了最大线程数量,那么此时会怎么办呢?
此时就会执行拒绝策略,也就是构造线程池的时候,传入的RejectedExecutionHandler对象,来处理这个任务。
RejectedExecutionHandler的实现JDK自带的默认有4种
- AbortPolicy:丢弃任务,抛出运行时异常
- CallerRunsPolicy:由提交任务的线程来执行任务
- DiscardPolicy:丢弃这个任务,但是不抛异常
- DiscardOldestPolicy:从队列中剔除最先进入队列的任务,然后再次提交任务
线程池创建的时候,如果不指定拒绝策略就默认是AbortPolicy策略。当然,你也可以自己实现RejectedExecutionHandler接口,比如将任务存在数据库或者缓存中,这样就数据库或者缓存中获取到被拒绝掉的任务了。
到这里,我们发现,线程池构造的几个参数corePoolSize、maximumPoolSize、workQueue、threadFactory、handler我们都在上述的执行过程中讲到了,那么还差两个参数keepAliveTime和unit(unit是keepAliveTime的时间单位)没讲到,所以keepAliveTime是如何起到作用的呢,这个问题留到后面分析。
说完整个执行的流程,接下来看看execute方法代码是如何实现的。
- workerCountOf(c)<corePoolSize:这行代码就是判断是否小于核心线程数,是的话就通过addWorker方法,addWorker就是添加线程来执行任务。
- workQueue.offer(command):这行代码就表示尝试往阻塞队列中添加任务
- 添加失败之后就会再次调用addWorker方法尝试添加非核心线程来执行任务
- 如果还是添加非核心线程失败了,那么就会调用reject(command)来拒绝这个任务。
最后再来另画一张图总结execute执行流程
四、线程池中线程实现复用的原理
线程池的核心功能就是实现了线程的重复利用,那么线程池是如何实现线程的复用呢?
线程在线程池内部其实是被封装成一个Worker对象
Worker继承了AQS,也就是有一定锁的特性。
创建线程来执行任务的方法上面提到是通过addWorker方法创建的。在创建Worker对象的时候,会把线程和任务一起封装到Worker内部,然后调用runWorker方法来让线程执行任务,接下来我们就来看一下runWorker方法。
从这张图可以看出线程执行完任务不会退出的原因,runWorker内部使用了while死循环,当第一个任务执行完之后,会不断地通过getTask方法获取任务,只要能获取到任务,就会调用run方法,继续执行任务,这就是线程能够复用的主要原因。
但是如果从getTask获取不到方法的时候,最后就会调用finally中的processWorkerExit方法,来将线程退出。
这里有个一个细节就是,因为Worker继承了AQS,每次在执行任务之前都会调用Worker的lock方法,执行完任务之后,会调用unlock方法,这样做的目的就可以通过Woker的加锁状态就能判断出当前线程是否正在运行任务。如果想知道线程是否正在运行任务,只需要调用Woker的tryLock方法,根据是否加锁成功就能判断,加锁成功说明当前线程没有加锁,也就没有执行任务了,在调用shutdown方法关闭线程池的时候,就用这种方式来判断线程有没有在执行任务,如果没有的话,来尝试打断没有执行任务的线程。
五、线程是如何获取任务的以及如何实现超时的
上一节我们说到,线程在执行完任务之后,会继续从getTask方法中获取任务,获取不到就会退出。接下来我们就来看一看getTask方法的实现。
getTask方法,前面就是线程池的一些状态的判断,这里有一行代码
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
这行代码是判断,当前过来获取任务的线程是否可以超时退出。如果allowCoreThreadTimeOut设置为true或者线程池当前的线程数大于核心线程数,也就是corePoolSize,那么该获取任务的线程就可以超时退出。
那是怎么做到超时退出呢,就是这行核心代码
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
会根据是否允许超时来选择调用阻塞队列workQueue的poll方法或者take方法。如果允许超时,则会调用poll方法,传入keepAliveTime,也就是构造线程池时传入的空闲时间,这个方法的意思就是从队列中阻塞keepAliveTime时间来获取任务,获取不到就会返回null;如果不允许超时,就会调用take方法,这个方法会一直阻塞获取任务,直到从队列中获取到任务位置。从这里可以看到keepAliveTime是如何使用的了。
所以到这里应该就知道线程池中的线程为什么可以做到空闲一定时间就退出了吧。其实最主要的是利用了阻塞队列的poll方法的实现,这个方法可以指定超时时间,一旦线程达到了keepAliveTime还没有获取到任务,那么就会返回null,上一小节提到,getTask方法返回null,线程就会退出。
这里也有一个细节,就是判断当前获取任务的线程是否可以超时退出的时候,如果将allowCoreThreadTimeOut设置为true,那么所有线程走到这个timed都是true,那么所有的线程,包括核心线程都可以做到超时退出。如果你的线程池需要将核心线程超时退出,那么可以通过allowCoreThreadTimeOut方法将allowCoreThreadTimeOut变量设置为true。
整个getTask方法以及线程超时退出的机制如图所示
六、线程池的5种状态
线程池内部有5个常量来代表线程池的五种状态
- RUNNING:线程池创建时就是这个状态,能够接收新任务,以及对已添加的任务进行处理。
- SHUTDOWN:调用shutdown方法线程池就会转换成SHUTDOWN状态,此时线程池不再接收新任务,但能继续处理已添加的任务到队列中任务。
- STOP:调用shutdownNow方法线程池就会转换成STOP状态,不接收新任务,也不能继续处理已添加的任务到队列中任务,并且会尝试中断正在处理的任务的线程。
- TIDYING:
- SHUTDOWN 状态下,任务数为 0, 其他所有任务已终止,线程池会变为 TIDYING 状态。
- 线程池在 SHUTDOWN 状态,任务队列为空且执行中任务为空,线程池会变为 TIDYING 状态。
- 线程池在 STOP 状态,线程池中执行中任务为空时,线程池会变为 TIDYING 状态。
- TERMINATED:线程池彻底终止。线程池在 TIDYING 状态执行完 terminated() 方法就会转变为 TERMINATED 状态。
线程池状态具体是存在ctl成员变量中,ctl中不仅存储了线程池的状态还存储了当前线程池中线程数的大小
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
最后画个图来总结一下这5种状态的流转
其实,在线程池运行过程中,绝大多数操作执行前都得判断当前线程池处于哪种状态,再来决定是否继续执行该操作。
七、线程池的关闭
线程池提供了shutdown和shutdownNow两个方法来关闭线程池。
shutdown方法
就是将线程池的状态修改为SHUTDOWN,然后尝试打断空闲的线程(如何判断空闲,上面在说Worker继承AQS的时候说过),也就是在阻塞等待任务的线程。
shutdownNow方法
就是将线程池的状态修改为STOP,然后尝试打断所有的线程,从阻塞队列中移除剩余的任务,这也是为什么shutdownNow不能执行剩余任务的原因。
所以也可以看出shutdown方法和shutdownNow方法的主要区别就是,shutdown之后还能处理在队列中的任务,shutdownNow直接就将任务从队列中移除,线程池里的线程就不再处理了。
八、线程池的监控
在项目中使用线程池的时候,一般需要对线程池进行监控,方便出问题的时候进行查看。线程池本身提供了一些方法来获取线程池的运行状态。
- getCompletedTaskCount:已经执行完成的任务数量
- getLargestPoolSize:线程池里曾经创建过的最大的线程数量。这个主要是用来判断线程是否满过。
- getActiveCount:获取正在执行任务的线程数据
- getPoolSize:获取当前线程池中线程数量的大小
除了线程池提供的上述已经实现的方法,同时线程池也预留了很对扩展方法。比如在runWorker方法里面,在执行任务之前会回调beforeExecute方法,执行任务之后会回调afterExecute方法,而这些方法默认都是空实现,你可以自己继承ThreadPoolExecutor来扩展重写这些方法,来实现自己想要的功能。
九、Executors构建线程池以及问题分析
JDK内部提供了Executors这个工具类,来快速的创建线程池。
1)固定线程数量的线程池:核心线程数与最大线程数相等
2)单个线程数量的线程池
3)接近无限大线程数量的线程池
4)带定时调度功能的线程池
虽然JDK提供了快速创建线程池的方法,但是其实不推荐使用Executors来创建线程池,因为从上面构造线程池可以看出,newFixedThreadPool线程池,由于使用了LinkedBlockingQueue,队列的容量默认是无限大,实际使用中出现任务过多时会导致内存溢出;newCachedThreadPool线程池由于核心线程数无限大,当任务过多的时候,会导致创建大量的线程,可能机器负载过高,可能会导致服务宕机。
十、线程池的使用场景
在java程序中,其实经常需要用到多线程来处理一些业务,但是不建议单纯使用继承Thread或者实现Runnable接口的方式来创建线程,那样就会导致频繁创建及销毁线程,同时创建过多的线程也可能引发资源耗尽的风险。所以在这种情况下,使用线程池是一种更合理的选择,方便管理任务,实现了线程的重复利用。所以线程池一般适合那种需要异步或者多线程处理任务的场景。
十一、实际项目中如何合理的自定义线程池
通过上面分析提到,通过Executors这个工具类来创建的线程池其实都无法满足实际的使用场景,那么在实际的项目中,到底该如何构造线程池呢,该如何合理的设置参数?
1)线程数
线程数的设置主要取决于业务是IO密集型还是CPU密集型。
CPU密集型指的是任务主要使用来进行大量的计算,没有什么导致线程阻塞。一般这种场景的线程数设置为CPU核心数+1。
IO密集型:当执行任务需要大量的io,比如磁盘io,网络io,可能会存在大量的阻塞,所以在IO密集型任务中使用多线程可以大大地加速任务的处理。一般线程数设置为 2*CPU核心数
java中用来获取CPU核心数的方法是:Runtime.getRuntime().availableProcessors();
2)线程工厂
一般建议自定义线程工厂,构建线程的时候设置线程的名称,这样就在查日志的时候就方便知道是哪个线程执行的代码。
3)有界队列
一般需要设置有界队列的大小,比如LinkedBlockingQueue在构造的时候就可以传入参数,来限制队列中任务数据的大小,这样就不会因为无限往队列中扔任务导致系统的oom。
日常开发中,为了更好管理线程资源,减少创建线程和销毁线程的资源损耗,我们会使用线程池来执行一些异步任务。但是线程池使用不当,就可能会引发生产事故。今天田螺哥跟大家聊聊线程池的10个坑。大家看完肯定会有帮助的~
- 线程池默认使用无界队列,任务过多导致OOM
- 线程创建过多,导致OOM
- 共享线程池,次要逻辑拖垮主要逻辑
- 线程池拒绝策略的坑
- Spring内部线程池的坑
- 使用线程池时,没有自定义命名
- 线程池参数设置不合理
- 线程池异常处理的坑
- 使用完线程池忘记关闭
- ThreadLocal与线程池搭配,线程复用,导致信息错乱。
- github地址,麻烦给个star鼓励一下,感谢感谢
- 公众号:捡田螺的小男孩(欢迎关注,干货多多)
1.线程池默认使用无界队列,任务过多导致OOM
JDK开发者提供了线程池的实现类,我们基于Executors组件,就可以快速创建一个线程池。日常工作中,一些小伙伴为了开发效率,反手就用Executors新建个线程池。写出类似以下的代码:
/**
* 公众号;捡田螺的小男孩
*/
public class NewFixedTest {
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(10);
for (int i = 0; i < Integer.MAX_VALUE; i++) {
executor.execute(() -> {
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
//do nothing
}
});
}
}
}
复制代码
使用newFixedThreadPool创建的线程池,是会有坑的,它默认是无界的阻塞队列,如果任务过多,会导致OOM问题。 运行一下以上代码,出现了OOM。
Exception in thread "main" java.lang.OutOfMemoryError: GC overhead limit exceeded
at java.util.concurrent.LinkedBlockingQueue.offer(LinkedBlockingQueue.java:416)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1371)
at com.example.dto.NewFixedTest.main(NewFixedTest.java:14)
复制代码
这是因为newFixedThreadPool使用了无界的阻塞队列的LinkedBlockingQueue,如果线程获取一个任务后,任务的执行时间比较长(比如,上面demo代码设置了10秒),会导致队列的任务越积越多,导致机器内存使用不停飙升, 最终出现OOM。
看下newFixedThreadPool的相关源码,是可以看到一个无界的阻塞队列的,如下:
//阻塞队列是LinkedBlockingQueue,并且是使用的是无参构造函数
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
//无参构造函数,默认最大容量是Integer.MAX_VALUE,相当于无界的阻塞队列的了
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
复制代码
因此,工作中,建议大家自定义线程池,并使用指定长度的阻塞队列。
2. 线程池创建线程过多,导致OOM
有些小伙伴说,既然Executors组件创建出的线程池newFixedThreadPool,使用的是无界队列,可能会导致OOM。那么,Executors组件还可以创建别的线程池,如newCachedThreadPool,我们用它也不行嘛?
我们可以看下newCachedThreadPool的构造函数:
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
复制代码
它的最大线程数是Integer.MAX_VALUE。大家应该意识到使用它,可能会引发什么问题了吧。没错,如果创建了大量的线程也有可能引发OOM!
笔者在以前公司,遇到这么一个OOM问题:一个第三方提供的包,是直接使用new Thread实现多线程的。在某个夜深人静的夜晚,我们的监控系统报警了。。。这个相关的业务请求瞬间特别多,监控系统告警OOM了。
所以我们使用线程池的时候,还要当心线程创建过多,导致OOM问题。大家尽量不要使用newCachedThreadPool,并且如果自定义线程池时,要注意一下最大线程数。
3. 共享线程池,次要逻辑拖垮主要逻辑
要避免所有的业务逻辑共享一个线程池。比如你用线程池A来做登录异步通知,又用线程池A来做对账。如下图:
如果对账任务checkBillService响应时间过慢,会占据大量的线程池资源,可能直接导致没有足够的线程资源去执行loginNotifyService的任务,最后影响登录。就这样,因为一个次要服务,影响到重要的登录接口,显然这是绝对不允许的。因此,我们不能将所有的业务一锅炖,都共享一个线程池,因为这样做,风险太高了,犹如所有鸡蛋放到一个篮子里。应当做线程池隔离!
4. 线程池拒绝策略的坑,使用不当导致阻塞
我们知道线程池主要有四种拒绝策略,如下:
- AbortPolicy: 丢弃任务并抛出RejectedExecutionException异常。(默认拒绝策略)
- DiscardPolicy:丢弃任务,但是不抛出异常。
- DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务。
- CallerRunsPolicy:由调用方线程处理该任务。
如果线程池拒绝策略设置不合理,就容易有坑。我们把拒绝策略设置为DiscardPolicy或DiscardOldestPolicy并且在被拒绝的任务,Future对象调用get()方法,那么调用线程会一直被阻塞。
我们来看个demo:
/**
* 关注公众号:捡田螺的小男孩
*/
public class DiscardThreadPoolTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 一个核心线程,队列最大为1,最大线程数也是1.拒绝策略是DiscardPolicy
ThreadPoolExecutor executorService = new ThreadPoolExecutor(1, 1, 1L, TimeUnit.MINUTES,
new ArrayBlockingQueue<>(1), new ThreadPoolExecutor.DiscardPolicy());
Future f1 = executorService.submit(()-> {
System.out.println("提交任务1");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
Future f2 = executorService.submit(()->{
System.out.println("提交任务2");
});
Future f3 = executorService.submit(()->{
System.out.println("提交任务3");
});
System.out.println("任务1完成 " + f1.get());// 等待任务1执行完毕
System.out.println("任务2完成" + f2.get());// 等待任务2执行完毕
System.out.println("任务3完成" + f3.get());// 等待任务3执行完毕
executorService.shutdown();// 关闭线程池,阻塞直到所有任务执行完毕
}
}
复制代码
运行结果:一直在运行中。。。
这是因为DiscardPolicy拒绝策略,是什么都没做,源码如下:
public static class DiscardPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardPolicy}.
*/
public DiscardPolicy() { }
/**
* Does nothing, which has the effect of discarding task r.
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}
复制代码
我们再来看看线程池 submit 的方法:
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
//把Runnable任务包装为Future对象
RunnableFuture<Void> ftask = newTaskFor(task, null);
//执行任务
execute(ftask);
//返回Future对象
return ftask;
}
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; //Future的初始化状态是New
}
复制代码
我们再来看看Future的get() 方法
//状态大于COMPLETING,才会返回,要不然都会阻塞等待
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
FutureTask的状态枚举
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
复制代码
阻塞的真相水落石出啦,FutureTask的状态大于COMPLETING才会返回,要不然都会一直阻塞等待。又因为拒绝策略啥没做,没有修改FutureTask的状态,因此FutureTask的状态一直是NEW,所以它不会返回,会一直等待。
这个问题,可以使用别的拒绝策略,比如CallerRunsPolicy,它让主线程去执行拒绝的任务,会更新FutureTask状态。如果确实想用DiscardPolicy,则需要重写DiscardPolicy的拒绝策略。
温馨提示,日常开发中,使用 Future.get() 时,尽量使用带超时时间的,因为它是阻塞的。
future.get(1, TimeUnit.SECONDS);
复制代码
难道使用别的拒绝策略,就万无一失了嘛? 不是的,如果使用CallerRunsPolicy拒绝策略,它表示拒绝的任务给调用方线程用,如果这是主线程,那会不会可能也导致主线程阻塞呢?总结起来,大家日常开发的时候,多一份心眼把,多一点思考吧。
5. Spring内部线程池的坑
工作中,个别开发者,为了快速开发,喜欢直接用spring的@Async,来执行异步任务。
@Async
public void testAsync() throws InterruptedException {
System.out.println("处理异步任务");
TimeUnit.SECONDS.sleep(new Random().nextInt(100));
}
复制代码
Spring内部线程池,其实是SimpleAsyncTaskExecutor,这玩意有点坑,它不会复用线程的,它的设计初衷就是执行大量的短时间的任务。有兴趣的小伙伴,可以去看看它的源码:
/**
* {@link TaskExecutor} implementation that fires up a new Thread for each task,
* executing it asynchronously.
*
* <p>Supports limiting concurrent threads through the "concurrencyLimit"
* bean property. By default, the number of concurrent threads is unlimited.
*
* <p><b>NOTE: This implementation does not reuse threads!</b> Consider a
* thread-pooling TaskExecutor implementation instead, in particular for
* executing a large number of short-lived tasks.
*
* @author Juergen Hoeller
* @since 2.0
* @see #setConcurrencyLimit
* @see SyncTaskExecutor
* @see org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor
* @see org.springframework.scheduling.commonj.WorkManagerTaskExecutor
*/
@SuppressWarnings("serial")
public class SimpleAsyncTaskExecutor extends CustomizableThreadCreator implements AsyncListenableTaskExecutor, Serializable {
......
}
复制代码
也就是说来了一个请求,就会新建一个线程!大家使用spring的@Async时,要避开这个坑,自己再定义一个线程池。正例如下:
@Bean(name = "threadPoolTaskExecutor")
public Executor threadPoolTaskExecutor() {
ThreadPoolTaskExecutor executor=new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5);
executor.setMaxPoolSize(10);
executor.setThreadNamePrefix("tianluo-%d");
// 其他参数设置
return new ThreadPoolTaskExecutor();
}
复制代码
6. 使用线程池时,没有自定义命名
使用线程池时,如果没有给线程池一个有意义的名称,将不好排查回溯问题。这不算一个坑吧,只能说给以后排查埋坑,哈哈。我还是单独把它放出来算一个点,因为个人觉得这个还是比较重要的。反例如下:
/**
* 关注公众号:捡田螺的小男孩
*/
public class ThreadTest {
public static void main(String[] args) throws Exception {
ThreadPoolExecutor executorOne = new ThreadPoolExecutor(5, 5, 1,
TimeUnit.MINUTES, new ArrayBlockingQueue<Runnable>(20));
executorOne.execute(()->{
System.out.println("关注公众号:捡田螺的小男孩");
throw new NullPointerException();
});
}
}
复制代码
运行结果:
关注公众号:捡田螺的小男孩
Exception in thread "pool-1-thread-1" java.lang.NullPointerException
at com.example.dto.ThreadTest.lambda$main$0(ThreadTest.java:17)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
复制代码
可以发现,默认打印的线程池名字是pool-1-thread-1,如果排查问题起来,并不友好。因此建议大家给自己线程池自定义个容易识别的名字。其实用CustomizableThreadFactory即可,正例如下:
public class ThreadTest {
public static void main(String[] args) throws Exception {
ThreadPoolExecutor executorOne = new ThreadPoolExecutor(5, 5, 1,
TimeUnit.MINUTES, new ArrayBlockingQueue<Runnable>(20),new CustomizableThreadFactory("Tianluo-Thread-pool"));
executorOne.execute(()->{
System.out.println("关注公众号:捡田螺的小男孩");
throw new NullPointerException();
});
}
}
复制代码
7. 线程池参数设置不合理
线程池最容易出坑的地方,就是线程参数设置不合理。比如核心线程设置多少合理,最大线程池设置多少合理等等。当然,这块不是乱设置的,需要结合具体业务。
比如线程池如何调优,如何确认最佳线程数?
最佳线程数目 = ((线程等待时间+线程CPU时间)/线程CPU时间 )* CPU数目
复制代码
我们的服务器CPU核数为8核,一个任务线程cpu耗时为20ms,线程等待(网络IO、磁盘IO)耗时80ms,那最佳线程数目:( 80 + 20 )/20 * 8 = 40。也就是设置 40个线程数最佳。
有兴趣的小伙伴,也可以看这篇文章哈: 线程池到底设置多少线程比较合适?
对于线程池参数,如果小伙伴还有疑惑的话,可以看我之前这篇文章哈:Java线程池解析
8. 线程池异常处理的坑
我们来看段代码:
/**
* 关注公众号:捡田螺的小男孩
*/
public class ThreadTest {
public static void main(String[] args) throws Exception {
ThreadPoolExecutor executorOne = new ThreadPoolExecutor(5, 5, 1,
TimeUnit.MINUTES, new ArrayBlockingQueue<Runnable>(20),new CustomizableThreadFactory("Tianluo-Thread-pool"));
for (int i = 0; i < 5; i++) {
executorOne.submit(()->{
System.out.println("current thread name" + Thread.currentThread().getName());
Object object = null;
System.out.print("result## " + object.toString());
});
}
}
}
复制代码
按道理,运行这块代码应该抛空指针异常才是的,对吧。但是,运行结果却是这样的;
current thread nameTianluo-Thread-pool1
current thread nameTianluo-Thread-pool2
current thread nameTianluo-Thread-pool3
current thread nameTianluo-Thread-pool4
current thread nameTianluo-Thread-pool5
复制代码
这是因为使用submit提交任务,不会把异常直接这样抛出来。大家有兴趣的话,可以去看看源码。可以改为execute方法执行,当然最好就是try...catch捕获,如下:
/**
* 关注公众号:捡田螺的小男孩
*/
public class ThreadTest {
public static void main(String[] args) throws Exception {
ThreadPoolExecutor executorOne = new ThreadPoolExecutor(5, 5, 1,
TimeUnit.MINUTES, new ArrayBlockingQueue<Runnable>(20),new CustomizableThreadFactory("Tianluo-Thread-pool"));
for (int i = 0; i < 5; i++) {
executorOne.submit(()->{
System.out.println("current thread name" + Thread.currentThread().getName());
try {
Object object = null;
System.out.print("result## " + object.toString());
}catch (Exception e){
System.out.println("异常了"+e);
}
});
}
}
}
复制代码
其实,我们还可以为工作者线程设置UncaughtExceptionHandler,在uncaughtException方法中处理异常。大家知道这个坑就好啦。
9. 线程池使用完毕后,忘记关闭
如果线程池使用完,忘记关闭的话,有可能会导致内存泄露问题。所以,大家使用完线程池后,记得关闭一下。同时,线程池最好也设计成单例模式,给它一个好的命名,以方便排查问题。
public class ThreadTest {
public static void main(String[] args) throws Exception {
ThreadPoolExecutor executorOne = new ThreadPoolExecutor(5, 5, 1,
TimeUnit.MINUTES, new ArrayBlockingQueue<Runnable>(20), new CustomizableThreadFactory("Tianluo-Thread-pool"));
executorOne.execute(() -> {
System.out.println("关注公众号:捡田螺的小男孩");
});
//关闭线程池
executorOne.shutdown();
}
}
复制代码
10. ThreadLocal与线程池搭配,线程复用,导致信息错乱。
使用ThreadLocal缓存信息,如果配合线程池一起,有可能出现信息错乱的情况。先看下一下例子:
private static final ThreadLocal<Integer> currentUser = ThreadLocal.withInitial(() -> null);
@GetMapping("wrong")
public Map wrong(@RequestParam("userId") Integer userId) {
//设置用户信息之前先查询一次ThreadLocal中的用户信息
String before = Thread.currentThread().getName() + ":" + currentUser.get();
//设置用户信息到ThreadLocal
currentUser.set(userId);
//设置用户信息之后再查询一次ThreadLocal中的用户信息
String after = Thread.currentThread().getName() + ":" + currentUser.get();
//汇总输出两次查询结果
Map result = new HashMap();
result.put("before", before);
result.put("after", after);
return result;
}
复制代码
按理说,每次获取的before应该都是null,但是呢,程序运行在 Tomcat 中,执行程序的线程是Tomcat的工作线程,而Tomcat的工作线程是基于线程池的。
线程池会重用固定的几个线程,一旦线程重用,那么很可能首次从 ThreadLocal 获取的值是之前其他用户的请求遗留的值。这时,ThreadLocal 中的用户信息就是其他用户的信息。
把tomcat的工作线程设置为1
server.tomcat.max-threads=1
复制代码
用户1,请求过来,会有以下结果,符合预期:
用户2请求过来,会有以下结果,「不符合预期」:
因此,使用类似 ThreadLocal 工具来存放一些数据时,需要特别注意在代码运行完后,显式地去清空设置的数据,正例如下:
@GetMapping("right")
public Map right(@RequestParam("userId") Integer userId) {
String before = Thread.currentThread().getName() + ":" + currentUser.get();
currentUser.set(userId);
try {
String after = Thread.currentThread().getName() + ":" + currentUser.get();
Map result = new HashMap();
result.put("before", before);
result.put("after", after);
return result;
} finally {
//在finally代码块中删除ThreadLocal中的数据,确保数据不串
currentUser.remove();
}
}