1.线程池基本概念
线程池(Thread Pool)是⼀种基于池化思想管理线程的共具
2.为什么引入线程池
线程过多会带来额外的开销,其中包括创建销毁线程的开销、调度线程的开销等等,同时也降低了计算机的整体性能。线程池维护多个线程,等待监督管理者分配可并发执行的任务。这种做法,一方面避免了处理任务时创建销毁线程开销的代价,另一方面避免了线程数量膨胀导致的过分调度问题,保证了对内核的充分利用。
2.1.池化思想
- 内存池(Memory Pooling):预先申请内存,提升申请内存速度,减少内存碎⽚。
- 连接池(Connection Pooling):预先申请数据库连接,提升申请连接的速度,降低系统的开销。
- 实例池(Object Pooling):循环使⽤对象,减少资源在初始化和释放时的昂贵损耗
3.ThreadPoolExecutor基本设置
3.1ThreadPoolExecutor创建时参数信息
-
corePoolSize(核心线程数):线程池维护线程的最少数量
-
maximumPoolSize(最大线程数):线程池中允许的最大线程数。
-
keepAliveTime(存活时间):当线程数大于核心线程数时,这是多余空闲线程在终止前等待新任务的最长时间。注意,只有当线程数大于
corePoolSize
时,这个参数才会生效。 -
unit(时间单位):
keepAliveTime
参数的时间单位,例如TimeUnit.SECONDS
、TimeUnit.MINUTES
等。 -
workQueue(工作队列):用于保存等待执行的任务的阻塞队列。这个队列的容量、是否有界等特性会直接影响线程池的行为。Java 提供了几种阻塞队列的实现,如
ArrayBlockingQueue
、LinkedBlockingQueue
、SynchronousQueue
等。 -
threadFactory(线程工厂):用于创建新线程的工厂,它允许应用程序定制线程的创建过程。通过自定义的
ThreadFactory
,你可以设置线程的优先级、守护线程状态、名称等。如果不指定,则使用默认的线程工厂。 -
handler(拒绝策略):当线程池和队列都满了,无法再处理新任务时,就会用到拒绝策略。Java 提供了四种拒绝策略的实现:
AbortPolicy
:直接抛出RejectedExecutionException
异常。CallerRunsPolicy
:使用调用者的线程来执行任务。DiscardOldestPolicy
:丢弃队列中最老的任务,然后尝试重新提交被拒绝的任务。DiscardPolicy
:直接丢弃无法处理的任务,不抛出异常也不执行。
3.2ThreadPoolExecutor执行情况
-
回收线程时机:线程数 > corePoolSize(核心线程数)
-
创建线程时机:队列满了,并且已创建的线程数 < maximumPoolSize(最大线程数)
-
存活时间参数生效:线程数 > corePoolSize(核心线程数)也就是线程销毁的死亡倒计时
3.3ThreadPoolExecutor运行状况
-
RUNNING
接受新提交任务 也能处理阻塞队列的任务 -
SHUTDOWN
不接受新提交的任务 但是可以处理阻塞队列已保存的任务 -
STOP
不接受新任务 也不处理队列中的任务 会中断处理任务的进程 -
TIDYING
所有任务都已终止 workerCount有效线程数为0 -
TERMINATED
执行terminated方法执行完成后进入该状态
4.ThreadPoolExecutor是如何运行
⛏任务区
-
提交任务,执行任务分配
-
选择任务分配方式
缓冲执行
直接执行
任务拒绝
⚙线程区
-
线程分配
-
线程回收
4.1线程池运行机制
线程池如何维护自身状态
线程池的运行状态是伴随线程池的运行,由内部进行维护的,线程池将运行状态
runState
和 线程数量workerCount
维护在一起private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0))
AtomicInteger
类型保存 线程池信息 高3位保存线程池运行状态 低29位保存线程数量private static int runStateOf(int c) { return c & ~CAPACITY; } //计算当前运⾏状态 private static int workerCountOf(int c) { return c & CAPACITY; } //计算当前线程数量 private static int ctlOf(int rs, int wc) { return rs | wc; } //通过状态和线程数⽣成ctl
线程池如何管理任务
任务执行机制
任务调度
💡所有的任务调度都是由execute()方法执行完成的 执行过程
-
检查线程池运行状态,如果不是RUNNING 直接拒绝,线程池要保证在RUNNING的状态下执行任务。
-
如果workerCount < corePoolSize,则创建并启动⼀个线程来执行新提交的任务。
-
如果workerCount >= corePoolSize,且线程池内的阻塞队列未满,则将任务添加到该阻塞队列中。
-
如果workerCount >= corePoolSize && workerCount < maximumPoolSize,且线程池内的阻塞队列已满,则创建并启动⼀个线程来执行新提交的任务。
-
如果workerCount >= maximumPoolSize,并且线程池内的阻塞队列已满, 则根据拒绝策略来处理该任务, 默认处理方式就是直接抛出异常
任务缓冲
使用阻塞队列作为缓冲区,生产者和消费者通过队列实现任务读取
ArrayBlockingQueue
:数组实现的有界阻塞队列,FIFO排序,支持公平锁和非公平锁
LinkedBlockingQueue
:链表实现 FIFO排序 默认长度为Integer.MAX_VALUE
PriorityBlockingQueue
:支持线程优先级队列,可以自定义实现compareTo()方法排序
DelayQueue
:基于PriorityBlockingQueue,创建元素时指定时间获取当前元素
SynchronousQueue
:不存储元素的阻塞队列,每一个put操作都需要等待take操作 否则不能添加元素,支持公平锁和非公平锁
LinkedTransferQueue
:链表构成的无界阻塞队列,相比其他队列多了transfer和tryTransfer方法
LinkedBlockingDeque
:一个由链表结构组成的双向阻塞队列,头尾都可以增删元素,多线程并发时,可以将锁的竞争最多降低到一般
任务申请
第一种情况:线程初始创建的时候, 是任务直接由新创建的线程执行
第二种情况:线程获取任务绝大多数情况, 线程从任务 队列中获取任务然后执行,执⾏完任务的空闲线程会再次去从队列中申请任务再去执行
线程池获取任务
getTask
这部分进行了多次判断,为的是控制线程的数量,使其符合线程池的状态。如果线程池现在不应该持有那么多线程,则会返回 null
值。工作线程 Worker
会不断接收新任务去执行,而当工作线程 Worker
接收不到任务的时候,就会开始被回收。
任务拒绝
拒绝策略
AbortPolicy
:丢弃任务并且抛出RejectedExecutionException异常,线程池默认拒绝策略,反馈程序运行状态
DiscardPolicy
:丢弃任务并且不会抛出异常
DiscardOldestPolicy
:丢弃队列最前面的任务,重新提交被拒绝的任务,就是是否丢弃老任务
CallerRunsPolicy
:当任务添加到线程池中被拒绝时,不是将任务丢弃,也不是抛出异常,而是将任务回退到调用者线程中执行。这种策略可以看作是一种调节机制,既不会抛弃任务,也不会因为线程池无法接受新任务而抛出异常,而是尝试在调用者线程中直接执行这些任务,从而减轻线程池的压力
线程池如何管理线程
Worker线程管理
Worker线程
: 线程池为了掌握线程的状态并维护线程的生命周期,设计了线程池内的工作线程Worker
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
final Thread thread;//Worker持有的线程
Runnable firstTask;//初始化的任务,可以为null
}
Worker线程初始化了一个 Thread
和 firstTask
,如果 firstTask
不为空,那么 Thread
线程就会立即执行初始化任务,对应核心线程创建,如果为空则会创建线程执行任务列表的任务,即非核心线程的创建
Worker是通过继承 AQS
,使⽤ AQS
来实现独占锁这个功能。没有使用可重入锁 ReentrantLock
,而是使用 AQS
,使用非可重入式锁的特性反映线程现在的执行状态
可重入性: 是指一个线程可以多次获取同一个锁。在Java的
ReentrantLock
中,就体现了这种可重入性。当一个线程已经持有锁时,它可以再次请求该锁而不会被阻塞,直到它释放锁为止。这种特性非常有用,因为它允许一个线程在执行过程中递归地调用需要相同锁保护的方法,而不会导致死锁
不可重入性:如果Worker
类中的AQS实现不是可重入的,那么这意味着一旦一个线程获得了锁,它就不能再次获得该锁,直到它释放了锁。这与ReentrantLock
的可重入性相反。在不可重入锁的情况下,如果线程尝试在已经持有锁的情况下再次获取锁,它将被阻塞或导致死锁(取决于具体的实现和锁的策略)。
线程池使用⼀张 Hash表
去持有线程的引用,这样可以通过添加引用、移除引用这样的操作来控制线程的生命周期。这个时候重要的就是如何判断线程 是否在运行,使用 AQS
获取不可重入锁,如果获取成功,即可进行线程回收操作
Worker线程的增加
增加Worker线程是通过 addWorker
方法,该方法只关心增加线程,不考虑哪个阶段增加的线程,该方法有两个参数:firstTask
、core
firstTask
:指定线程执行的第一个任务,可以为null
core
: true:新增线程时会判断当前活动线程数是否少于corePoolSize,表示创建的是核心线程,false:新增线程时判断当前线程数是否少于maximumPoolSize,表示创建的是非核心线程
Worker线程销毁
线程池中线程的销毁依赖 JVM
自动的回收,线程池做的工作是根据当前线程池的状态维护一定数量的线程引用,防止这部分线程被 JVM
回收,当线程池决定哪些线程需要回收时,只需要将其引用消除即可。Worker
被创建出来后,就会不断地进行轮询,然后获取任务去执行,核心线程可以无限等待获取任务,非核心线程要限时获取任务。当Worker
无法获取到任务,也就是获取的任务为空时,循环会结束,Worker会主动消除自身在线程池内的引用。调用 processWorkerExit
方法执行销毁过程
Worker执行任务过程
5.业务实现
5.1创建自定义线程池
@Bean("threadPoolExecutor01") //指定线程池名称
public ThreadPoolExecutor threadPoolExecutor01(ThreadPoolConfigProperties properties) {
RejectedExecutionHandler handler;
switch (properties.getPolicy()){ //选择策略
case "DiscardPolicy":
handler = new ThreadPoolExecutor.DiscardPolicy();
break;
case "DiscardOldestPolicy":
handler = new ThreadPoolExecutor.DiscardOldestPolicy();
break;
case "CallerRunsPolicy":
handler = new ThreadPoolExecutor.CallerRunsPolicy();
break;
default:
handler = new ThreadPoolExecutor.AbortPolicy();
break;
}
// 创建线程池
// 可以配置Properties参数并注入
// 记得添加@EnableConfigurationProperties(ThreadPoolConfigProperties.class)注解
return new ThreadPoolExecutor(properties.getCorePoolSize(),//核心线程数
properties.getMaxPoolSize(), //最大线程数
properties.getKeepAliveTime(), //最长等待任务时间
TimeUnit.SECONDS, //时间单位
new LinkedBlockingQueue<>(properties.getBlockQueueSize()), //等待队列
Executors.defaultThreadFactory(), //线程工厂
handler); //处理策略
}
ThreadPool这篇文章导入有问题,可以访问语雀获得清晰版本https://www.yuque.com/yuqueyonghusjpwwe/vd1w2z/qn606zbdrmzyxfzp
或者联系下方邮箱直接发源文件