原文: https://chenmingyu.top/concurrent-threadpool/
线程池用来处理异步任务或者并发执行的任务
优点:
java
中线程池使用ThreadPoolExecutor
实现
ThreadPoolExecutor
提供了四个构造函数,其他三个构造函数最终调用的都是下面这个构造函数
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
入参:
corePoolSize
:线程池的核心线程数量
线程池维护的核心线程数量,当线程池初始化后,核心线程数量为零,当有任务来到的时候才会创建线程去执行任务,当线程池中的工作线程数量等于核心线程数量时,新到的任务就会放到缓存队列中
maximumPoolSize
:线程池允许创建的最大线程数量
当阻塞队列满了的时候,并且线程池中创建的线程数量小于maximumPoolSize
,此时会创建新的线程执行任务
keepAliveTime
:线程活动保持时间
只有当线程池数量大于核心线程数量时,keepAliveTime
才会有效,如果当前线程数量大于核心线程数量时,并且线程的空闲时间达到keepAliveTime
,当前线程终止,直到线程池数量等于核心线程数
unit
:线程活动保持时间的单位
keepAliveTime
的单位,包括:TimeUnit.DAYS
天,TimeUnit.HOURS
小时,TimeUnit.MINUTES
分钟,TimeUnit.SECONDS
秒,TimeUnit.MILLISECONDS
毫秒,TimeUnit.MICROSECONDS
微秒,TimeUnit.NANOSECONDS
纳秒
workQueue
:任务队列,用来保存等待执行任务的阻塞队列
ArrayBlockingQueue
:是一个基于数组结构的有界队列
LinkedBlockingQueue
:是一个基于链表结构的阻塞队列
SynchronousQueue
:不存储元素的阻塞队列,每一个插入操作必须等到下一个线程调用移除操作,否则插入操作一直阻塞
PriorityBlockingQueue
:一个具有优先级的无线阻塞队列
threadFactory
:用来创建线程的工厂
handler
:饱和策略,当线程池和队列都满了的时候,必须要采取一种策略处理新的任务,默认策略是AbortPolicy
,根据自己需求选择合适的饱和策略
AbortPolicy
:直接抛出异常
CallerRunsPolicy
:用调用者所在的线程来运行当前任务
DiscardOldestPolicy
:丢弃队列里面最近的一个任务,并执行当前任务
DiscardPolicy
:不处理,丢弃掉
当然我们也可以通过实现RejectedExecutionHandler
去自定义实现处理策略
入参不同,线程池的运行机制也不同,了解每个入参的含义由于我们更透传的理解线程池的实现原理
线程池处理提交任务流程如下
处理流程:
ThreadPoolExecutor
使用execute(Runnable command)
和submit(Runnable task)
向线程池中提交任务,在submit(Runnable task)
方法中调用了execute(Runnable command)
,所以我们只要了解execute(Runnable command)
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
// 获取线程池状态,并且可以通过ctl获取到当前线程池数量及线程池状态
int c = ctl.get();
// 如果工作线程数小于核心线程数量,则创建一个新线程执行任务
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 如果不符合上面条件,当前线程处于运行状态并且写入阻塞队列成功
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 双重检查,再次获取线程状态,如果当前线程状态变为非运行状态,则从队列中移除任务,执行拒绝策略
if (! isRunning(recheck) && remove(command))
reject(command);
// 检查工作线程数量是否为0
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//创建线程执行任务,如果添加失败则执行拒绝策略
else if (!addWorker(command, false))
reject(command);
}
execute(Runnable command)
方法中我们比较关心的就是如何创建新的线程执行任务,就addWorker(command, true)
方法
workQueue.offer(command)
方法是用来向阻塞队列中添加任务的
reject(command)
方法会根据创建线程池时传入的饱和策略对任务进行处理,例如默认的AbortPolicy
,查看源码后知道就是直接抛了个RejectedExecutionException
异常,其他的饱和策略的源码也是特别简单
关于线程池状态与工作线程的数量是如何表示的
在ThreadPoolExecutor
中使用一个AtomicInteger
类型变量表示
/**
* ctl表示两个信息,一个是线程池的状态(高3位表示),一个是当前线程池的数量(低29位表示),这个跟我们前面 * 说过的读写锁的state变量是一样的,以一个变量记录两个信息,都是以利用int的32个字节,高十六位表述读,低十 * 六位表示写锁
*/
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//低29位保存线程池数量
private static final int COUNT_BITS = Integer.SIZE - 3;
//线程池最大容量
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// 运行状态存储在高3位
// 运行状态
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
addWorker(command, boolean)
创建工作线程,执行任务
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
// 线程池状态
int rs = runStateOf(c);
// 判断线程池状态,以及阻塞队列是否为空
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
// 获取线程工作线程数量
int wc = workerCountOf(c);
// 判断是否大于最大容量,以及根据传入的core判断是否大于核心线程数量还是最大线程数量
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 增加工作线程数量
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
//如果线程池状态改变,则重试
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 创建Worker,内部创建了一个新的线程
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
// 线程池状态判断
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// 将创建的线程添加到线程池
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
//执行任务,首先会执行Worker对象的firstTask
t.start();
workerStarted = true;
}
}
} finally {
//如果任务执行失败
if (! workerStarted)
//移除worker
addWorkerFailed(w);
}
return workerStarted;
}
ThreadPoolExecutor
中关闭线程池使用shutdown()
和shutdownNow()
方法,原理都是通过遍历线程池中的线程,对线程进行中断
for (Worker w : workers) {
Thread t = w.thread;
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
Executor
框架将任务的提交与任务的执行进行分离
Executors
提供了一系列工厂方法用于创先线程池,返回的线程池都实现了 ExecutorService
接口
工厂方法:
newFixedThreadPool
:用于创建固定数目线程的线程池newCachedThreadPool
:用于创建一个可缓存的线程池,调用execute将重用以前构造的线程,如果现有线程没有可用的,则创建一个新线 程并添加到池中。终止并从缓存中移除那些已有 60 秒钟未被使用的线程newSingleThreadExecutor
:用于创建只有一个线程的线程池newScheduledThreadPool
:用于创建一个支持定时及周期性的任务执行的线程池在阿里巴巴手册中强制要求禁止使用Executors
提供的工厂方法创建线程池
这个确实是一个很严重的问题,我们部门曾经就出现过使用FixedThreadPool
线程池,导致OOM,这是因为线程执行任务的时候被阻塞或耗时很长时间,导致阻塞队列一直在添加任务,直到内存被打满,报OOM
所以我们在使用线程池的时候要使用ThreadPoolExecutor
的构造函数去创建线程池,根据自己的任务类型来确定核心线程数和最大线程数,选择适合阻塞队列和阻塞队列的长度
合理的配置线程池需要分析一下任务的性质(使用ThreadPoolExecutor
创建线程池):
CPU密集型任务应配置竟可能小的线程,比如 cpu数量+1
IO密集型任务并不是一直在执行任务,应该配置尽可能多的线程,比如 cpu数量x2
可通过Runtime.getRuntime().availableProcessors()
获取cpu数量
执行的任务有调用外部接口比较费时的时候,这时cup空闲的时间就越长,可以将线程池数量设置大一些,这样cup空闲的时间就可以去执行别的任务
建议使用有界队列,可根据需要将长度设置大一些,防止OOM
参考:java并发编程的艺术
推荐阅读:
java并发编程 | 锁详解:AQS,Lock,ReentrantLock,ReentrantReadWriteLock
文章浏览阅读119次。该楼层疑似违规已被系统折叠隐藏此楼查看此楼/***Getaparametervalue**@paramkeyString*@paramdefString*@returnString*/publicStringgetParameter(Stringkey,Stringdef){returnisStandalone?System.getProperty(ke..._java http隧道
文章浏览阅读913次。IP主机名备注192.168.117.14keepalived-master主节点192.168.117.15keepalived-slaver备节点192.168.117.100VIP1.主备节点均安装keepalived# yum install -y keepalived httpd2.主备节点均修改keepalived日志存放路径..._keepalived sendmail
文章浏览阅读469次。--==========================================--SPFILE错误导致数据库无法启动(ORA-01565)--========================================== SPFILE错误导致数据库无法启动 SQL> startup ORA-01078: failurein proce_ora01565 ora27046
文章浏览阅读6.1k次,点赞2次,收藏54次。功能测试基础知识总结_功能测试
文章浏览阅读3.2k次,点赞3次,收藏2次。pg 中文首字母排序_pg中文排序
文章浏览阅读3.1w次,点赞23次,收藏109次。本文主要讲解CONVERT函数_mysql convert
文章浏览阅读8.6k次,点赞2次,收藏2次。HTML5 的视频播放事件想必大家已经期待很久了吧,在HTML4.1、4.0之前我们如果在网页上播放视频无外乎两种方法: 第一种:安装FLASH插件或者微软发布的插件 第二种:在本地安装播放器,在线播放组件之类的 因为并不是所有的浏览器都安装了FLASH插件,就算安装也不一定所有的都能安装成功。像苹果系统就是默认禁用FLASH的,安卓虽然一开始的时候支持FLASH,但是在安卓4.0以后也开始不_微信开发者工具视频快进
文章浏览阅读5.4k次,点赞3次,收藏4次。在使用redis的过程常见错误总结1.JedisConnectionException Connection Reset参考这边文章:Connection reset原因分析和解决方案https://blog.csdn.net/cwclw/article/details/527971311.1问题描述Exception in thread "main" redis.clients...._jedisconnectionexception: java.net.socketexception: connection reset
文章浏览阅读8.3k次,点赞8次,收藏42次。目录1.Lua垃圾回收算法原理简述2.Lua垃圾回收中的三种颜色3.Lua垃圾回收详细过程4.步骤源码详解4.1新建对象阶段4.2触发条件4.3 GC函数状态机4.4标记阶段4.5清除阶段5.总结参考资料lua垃圾回收(Garbage Collect)是lua中一个比较重要的部分。由于lua源码版本变迁,目前大多数有关这个方面的文章都还是基于lua5.1版本,有一定的滞后性。因此本文通过参考当前..._lua5.3 gc
文章浏览阅读511次。最近家中的潮人,老妈闲着没事干,开始学玩电脑,引起他的各种好奇心。如看看新闻,上上微信或做做其他的事情。但意料之中的是电脑上会莫名出现各种问题?不翼而飞的图标?照片又不见了?文件被删了,卡机或者黑屏,无声音了,等等问题。常常让她束手无策,求助于我,可惜在电话中说不清,往往只能苦等我回家后才能解决,那种开心乐趣一下子消失了。想想,这样也不是办法啊, 于是,我潜心寻找了两款优秀的远程控制软件。两款软件...
文章浏览阅读1.8k次。二.初始化工作空间三.设置下载地址四.下载功能包此处可能会报错,请看:rosdep update遇到ERROR: error loading sources list: The read operation timed out问题_DD᭄ꦿng的博客-程序员宅基地接下来一次安装所有功能包,注意对应ROS版本 五.编译功能包isolated:单独编译各个功能包,每个功能包之间不产生依赖。编译过程时间比较长,可能需要几分钟时间。此处可能会报错:缺少absl依赖包_ros18.04 安装ca
文章浏览阅读4.1k次,点赞3次,收藏7次。Haobor2.2.1配置(trivy扫描器、镜像签名)docker-compose下载https://github.com/docker/compose/releases安装cp docker-compose /usr/local/binchmod +x /usr/local/bin/docker-composeharbor下载https://github.com/goharbor/harbor/releases解压tar xf xxx.tgx配置harbor根下建立:mkd_init error: db error: failed to download vulnerability db: database download