文章后续于https://github.com/zgkaii/CS-Study-Notes更新,欢迎批评指正!
在 HotSpot VM
的线程模型中,Java 线程被一对一映射为内核线程。Java 在使用线程执行程序时,需要创建一个内核线程;当该 Java 线程被终止时,这个内核线程也会被回收。因此 Java 线程的创建与销毁将会消耗一定的计算机资源,从而增加系统的性能开销。
除此之外,大量创建线程同样也会给系统带来性能问题,因为内存和 CPU 资源都将被线程抢占,如果处理不当,就会发生内存溢出、CPU 使用率超负荷等问题。
为了解决上述两类问题,Java 提供了线程池概念,对于频繁创建线程的业务场景,线程池可以创建固定的线程数量,并且在操作系统底层,轻量级进程将会把这些线程映射到内核。
这里借用《Java 并发编程的艺术》提到的来说一下使用线程池的好处:
Java并发编程中,操作系统会调度所有线程并将它们分配给可用的CPU。 在上层,Java多线程程序通常把应用分解为若干个任务,然后使用用户级的调度器(Executor框架)将这些任务映射为固定数量的线程;在底层,操作系统内核将这些线程映射到硬件处理器上。
从图中可以看出,应用程序通过Executor框架控制上层的调度; 而下层的调度由操作系统内核控制,下层的调度不受应用程序的控 制。
Executor框架主要由3大部分组成如下:
Runnable
接口或Callable
接口。
Runnable
接口和Callable
接口的实现类,都可以被ThreadPoolExecutor
或ScheduledThreadPoolExecutor
执行.Executor
,以及继承自Executor的 ExecutorService
接口。Executor
框架有两个关键类实现了ExecutorService
接口 (ThreadPoolExecutor
和ScheduledThreadPoolExecutor
)。
ScheduledThreadPoolExecutor
用来定时执行任务; ThreadPoolExecutor
用来执行被提交的任务。Future
和 实现Future
接口的FutureTask
类。
Runnable
接口 或 Callable
接口 的实现类提交给 ThreadPoolExecutor
或 ScheduledThreadPoolExecutor
执行。(调用 submit()
方法时会返回一个 FutureTask
对象)(1)主线程首先要创建实现Runnable或者Callable接口的任务对象。工具类Executors
可以把 一个Runnable对象封装为一个Callable对象(Executors.callable(Runnable task)
或 Executors.callable(Runnable task,Object resule)
)。
(2)然后可以把Runnable对象直接交给ExecutorService
执行 (ExecutorService.execute(Runnable command)
);或者也可以把Runnable对象或Callable 对象提交给ExecutorService
执行(Executor-Service.submit(Runnable task)
或 ExecutorService.submit(Callabletask)
)。
(3)如果执行 ExecutorService.submit(...)
,ExecutorService
将返回一个实现Future
接口的对象。由于 FutureTask
实现了Runnable,我们也可以创建 FutureTask
,然后直接交给 ExecutorService
执行。
(4)最后,主线程可以执行 FutureTask.get()
方法来等待任务执行完成。主线程也可以执行 FutureTask.cancel(boolean mayInterruptIfRunning)
来取消此任务的执行。
这里先了解一下Executor框架的主要成员:Executor
、ExecutorService
、Future
接口、Runnable
接口、Callable
接口和工具类Executors
。至于核心实现类ThreadPoolExecutor
、 ScheduledThreadPoolExecutor
稍后讲解。
线程池从功能上看,Executor 就是一个任务执行器。它只有一个方法:void execute(Runnable command);
,用来执行可运行的任务。
ExecutorService
接口重要方法:
重要方法 | 说明 |
---|---|
void execute(Runnable command); |
执行可运行的任务 |
void shutdown(); |
关闭线程池 停止接收新任务,原来的任务继续执行 |
List<Runnable> shutdownNow(); |
立即关闭 停止接收新任务,原来的任务停止执行 |
Future<?> submit(Runnable task); |
提交任务; 允许获取执行结果 |
<T> Future<T> submit(Runnable task, T result); |
提交任务(指定结果); 控制|获取执行结果 |
<T> Future<T> submit(Callable<T> task); |
提交任务; 允许控制任务和获取执行结果 |
boolean awaitTermination(timeOut, unit); |
阻塞当前线程,返回是否线程都执行完 |
需要注意submit()与 execute()方法区别:
比较 | submit() | execute() |
---|---|---|
有无返回值 | 有返回值,用 Future 封装 根据返回值能判断任务是否被线程池成功执行 |
无返回值 无法判断任务是否被成功线程池执行 |
能否捕获异常 | 可在主线程中 get 捕获到 | 不能捕获 |
Future接口和实现Future接口的FutureTask
类用来表示异步计算的结果。当我们把 Runnable接口或Callable接口的实现类提交(submit)给ThreadPoolExecutor
或 ScheduledThreadPoolExecutor
时,ThreadPoolExecutor
或ScheduledThreadPoolExecutor
会向我们返回一个FutureTask
对象。下面是对应的API。
<T> Future<T> submit(Callable<T> task)
<T> Future<T> submit(Runnable task, T result)
Future<> submit(Runnable task)
到目前最新的JDK 8为止,Java通过上述API返回的是一个 FutureTask
对象。但从API可以看到,Java仅仅保证返回的是一个实现了Future
接口的对象。
Runnable接口和Callable接口的实现类,都可以被ThreadPoolExecutor
或ScheduledThreadPoolExecutor
执行。它们之间的区别是Runnable不会返回结果,而Callable可以返回结果。
除了可以自己创建实现Callable接口的对象外,还可以使用工厂类Executors来把一个 Runnable包装成一个Callable。 下面是Executors提供的,把一个Runnable包装成一个Callable的API。
public static Callable<Object> callable(Runnable task) // 假设返回对象Callable1
下面是Executors提供的,把一个Runnable和一个待返回的结果包装成一个Callable的 API。
public static <T> Callable<T> callable(Runnable task, T result) // 假设返回对象Callable2
当我们把一个Callable对象(比如上面的Callable1或Callable2)提交给 ThreadPoolExecutor
或ScheduledThreadPoolExecutor
执行时,submit()会向我们返回一 个FutureTask
对象。我们可以执行FutureTask.get()
方法来等待任务执行完成。当任务成功完 成后FutureTask.get()
将返回该任务的结果。例如,如果提交的是对象Callable1, FutureTask.get()
方法将返回null;如果提交的是对象Callable2,FutureTask.get()
方法将返回 result对象。
在java.util.concurrent.Executors
线程工厂类里面提供了一些静态工厂,实现了以下五种类型的 ThreadPoolExecutor
:
类型 | 特性 |
---|---|
newSingleThreadExecutor |
创建一个单线程的线程池。这个线程池只有一个线程在工作,也就是相当于单线程串行执行所有任务。如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它。此线程池保证所有任务的执行顺序按照任务的提交顺序执行。 |
newFixedThreadPool |
创建固定大小的线程池。每次提交一个任务就创建一个线程,直到线程达到线程池的最大大小。线程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程 |
newCachedThreadPool |
创建一个可缓存的线程池。如果线程池的大小超过了处理任务所需要的线程,那么就会回收部分空闲(60秒不执行任务)的线程,当任务数增加时,此线程池又可以智能的添加新线程来处理任务。 |
newScheduledThreadPool |
创建一个大小无限的线程池,此线程池支持定时以及周期性执行任务的需求。 |
newWorkStealingPool() |
Java 8 才加入这个线程池,其内部会构建ForkJoinPool ,利用Work-Stealing算法,并行地处理任务,不保证处理顺序。 |
在生产环境下的实际场景中,一般不太推荐使用它们。因为选择使用 Executors 提供的工厂类实现的五种线程池,将会忽略很多线程池的参数设置,工厂类一旦选择设置默认参数,就很容易导致无法调优参数设置,从而产生性能问题或者资源浪费。这里建议使用 ThreadPoolExecutor
自我定制一套线程池。
线程池实现类 ThreadPoolExecutor
是 Executor
框架最核心的类。在了解线程池参数之前,我们先来了解一下线程池状态转换。查看ThreadPoolExecutor
源码,发现定义有如下几类状态:
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
也就是说,线程池有一下5种状态:
shutdown()
方法,不再接受新任务了,但是队列里的任务得执行完毕。shutdownNow()
方法,不再接受新任务,同时抛弃阻塞队列里的所有任务并中断所有正在执行任务。
shutdownNow()
时,线程池由(RUNNING or SHUTDOWN ) -> STOP。ctl
记录的”任务数量”为0,线程池会变为TIDYING状态。在调用 shudown()/shutdownNow()
中都会尝试更新为这个状态。
terminated()
后会更新为这个状态。为了搞懂线程池的原理,我们需要首先分析一下 execute
方法。下面来看看它的源码:
// 存放线程池的运行状态 (runState) 和线程池内有效线程的数量 (workerCount)
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 任务队列
private final BlockingQueue<Runnable> workQueue;
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get(); // ------ 1
if (workerCountOf(c) < corePoolSize) {
// ------ 2
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
// ------ 3
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command)) // ------ 4
reject(command);
else if (workerCountOf(recheck) == 0) // ------ 5
addWorker(null, false);
}
else if (!addWorker(command, false)) // ------ 6
reject(command);
}
细化一下,我们可以把execute()
执行过程分为6步:
corePoolSize
核心线程数,通过addWorker(command, true)
新建一个线程用于执行任务。addWorker(command, false)
新建一个线程用于执行任务。addWorker(command, false)
新建一个线程用于执行任务。如果队列已 满,就执行拒绝策略。如官方代码注释所描述,整体上线程池的处理流程分为三步:
通过下图可以更好的对上面这 3 步做一个展示:
线程池为了掌握线程的状态并维护线程的生命周期,设计了线程池内的工作线程Worker。我们来看一下它的部分代码:
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
final Thread thread;//Worker持有的线程
Runnable firstTask;//初始化的任务,可以为null
}
Worker这个工作线程,实现了Runnable接口,并持有一个线程thread,一个初始化的任务firstTask
。thread是在调用构造方法时通过ThreadFactory
来创建的线程,可以用来执行任务;firstTask
用它来保存传入的第一个任务,这个任务可以有也可以为null。如果这个值是非空的,那么线程就会在启动初期立即执行这个任务,也就对应核心线程创建时的情况;如果这个值是null,那么就需要创建一个线程去执行任务列表(workQueue
)中的任务,也就是非核心线程的创建。
Worker执行任务的模型如下图所示:
线程池需要管理线程的生命周期,需要在线程长时间不运行的时候进行回收。线程池使用一张Hash表去持有线程的引用,这样可以通过添加引用、移除引用这样的操作来控制线程的生命周期。这个时候重要的就是如何判断线程是否在运行。
Worker是通过继承AQS,使用AQS来实现独占锁这个功能。没有使用可重入锁ReentrantLock
,而是使用AQS,为的就是实现不可重入的特性去反应线程现在的执行状态。
(1)lock方法一旦获取了独占锁,表示当前线程正在执行任务中。 (2)如果正在执行任务,则不应该中断线程。 (3)如果该线程现在不是独占锁的状态,也就是空闲的状态,说明它没有在处理任务,这时可以对该线程进行中断。(4)线程池在执行shutdown方法或tryTerminate
方法时会调用interruptIdleWorkers
方法来中断空闲的线程,interruptIdleWorkers
方法会使用tryLock
方法来判断线程池中的线程是否是空闲状态;如果线程是空闲状态则可以安全回收。
在线程回收过程中就使用到了这种特性,回收过程如下图所示:
增加线程是通过线程池中的addWorker
方法,该方法的功能就是增加一个线程,该方法不考虑线程池是在哪个阶段增加的该线程,这个分配线程的策略是在上个步骤完成的,该步骤仅仅完成增加线程,并使它运行,最后返回是否成功这个结果。addWorker
方法有两个参数:firstTask
、core。firstTask
参数用于指定新增的线程执行的第一个任务,该参数可以为空;core参数为true表示在新增线程时会判断当前活动线程数是否少于corePoolSize
,false表示新增线程前需要判断当前活动线程数是否少于maximumPoolSize
,其执行流程如下图所示:
线程池中线程的销毁依赖JVM自动的回收,线程池做的工作是根据当前线程池的状态维护一定数量的线程引用,防止这部分线程被JVM回收,当线程池决定哪些线程需要回收时,只需要将其引用消除即可。Worker被创建出来后,就会不断地进行轮询,然后获取任务去执行,核心线程可以无限等待获取任务,非核心线程要限时获取任务。当Worker无法获取到任务,也就是获取的任务为空时,循环会结束,Worker会主动消除自身在线程池内的引用。
try {
while (task != null || (task = getTask()) != null) {
//执行任务
}
} finally {
processWorkerExit(w, completedAbruptly);//获取不到任务时,主动回收自己
}
线程回收的工作是在processWorkerExit
方法完成的。
事实上,在这个方法中,将线程引用移出线程池就已经结束了线程销毁的部分。但由于引起线程销毁的可能性有很多,线程池还要判断是什么引发了这次销毁,是否要改变线程池的现阶段状态,是否要根据新状态,重新分配线程。
在Worker类中的run方法调用了runWorker
方法来执行任务,runWorker
方法的执行过程如下:
getTask()
方法获取任务。getTask()
方法从阻塞队列中取任务。getTask
结果为null则跳出循环,执行processWorkerExit()
方法,销毁线程。执行流程如下图所示:
查看ThreadPoolExecutor
的构造函数:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
// --- omit ---
}
ThreadPoolExecutor
3 个最重要的参数:
corePoolSize
: 线程池的核心线程数量。maximumPoolSize
: 线程池的最大线程数(核心线程数 + 非核心线程数 = 最大线程数量)。workQueue
: 任务队列,用来储存等待执行任务的队列。ThreadPoolExecutor
其他参数:
keepAliveTime
:当线程数大于核心线程数corePoolSize
时,多余出来的空闲线程存活的最长时间(多余出来的空闲线程不会立即销毁,而是会等待,直到等待的时间超过了 keepAliveTime
会被回收销毁)。
unit
: keepAliveTime
参数的时间单位(时、分、秒、毫秒等)。
threadFactory
:线程工厂,用来创建线程,一般默认即可。
handler
:拒绝策略,当提交的任务过多而不能及时处理时,我们可以定制策略来处理任务。
我们还可以通过下面这张图来了解下线程池中各个参数的相互关系:
线程池有两个线程数的设置,一个为核心线程数,一个为最大线程数。如果当前的线程总个数 < corePoolSize
,那么新建的线程为核心线程,如果当前线程总个数 >= corePoolSize
,那么新建的线程为非核心线程。 核心线程默认会一直存活下去,即便是空闲状态,但是如果设置了allowCoreThreadTimeOut(true)
的话,那么核心线程空闲时间达到keepAliveTime
也将关闭。
在创建完线程池之后,默认情况下,线程池中并没有任何线程,等到有任务来才创建线程去执行任务。但有一种情况排除在外,就是调用 prestartAllCoreThreads()
或者 prestartCoreThread()
方法的话,可以提前创建等于核心线程数的线程数量,这种方式被称为预热,在抢购系统中就经常被用到。
当创建的线程数等于 corePoolSize
时,提交的任务会被加入到设置的阻塞队列中。当队列满了,会创建线程执行任务,直到线程池中的数量等于 maximumPoolSize
。
当线程数量已经等于 maximumPoolSize
时, 新提交的任务无法加入到等待队列,也无法创建非核心线程直接执行,我们又没有为线程池设置拒绝策略,这时线程池就会抛出 RejectedExecutionException
异常,即线程池拒绝接受这个任务。
当线程池中创建的线程数量超过设置的 corePoolSize
,在某些线程处理完任务后,如果等待 keepAliveTime
时间后仍然没有新的任务分配给它,那么这个线程将会被回收。线程池回收线程时,会对所谓的“核心线程”和“非核心线程”一视同仁,直到线程池中线程的数量等于设置的 corePoolSize
参数,回收过程才会停止。
ThreadPoolExecutor
拒绝策略定义:
如果线程池中所有的线程都在忙碌,并且工作队列也满了(前提是工作队列是有界队列),那么此时提交任务,线程池就会拒绝接收。至于拒绝的策略,你可以通过 handler 这个参数来指定。
ThreadPoolExecutor
已经提供了以下 4 种策略。
CallerRunsPolicy
:提交任务的线程自己去执行该任务。也就是说,直接在调用execute
方法的线程中运行(run
)被拒绝的任务,如果执行程序已关闭,则会丢弃该任务。这种策略会降低对于新任务提交速度,影响程序的整体性能。如果您的应用程序可以承受此延迟并且你要求任何一个任务请求都要被执行的话,你可以选择这个策略。AbortPolicy
:默认的拒绝策略,会 throws RejectedExecutionException
拒绝新任务的处理。DiscardPolicy
:直接丢弃任务,没有任何异常抛出。DiscardOldestPolicy
:丢弃最老的任务,其实就是把最早进入工作队列的任务丢弃,然后把新任务加入到工作队列。BlockingQueue
是双缓冲队列。BlockingQueue
允许两个线程同时向队列一个存储,一个取出操作。在保证并发安全的同时,提高了队列的存取效率。常用的队列如下
ArrayBlockingQueue
:规定大小的 BlockingQueue
,其构造必须指定大小。其所含的对象是 FIFO 顺序排序的。
LinkedBlockingQueue
:大小不固定的 BlockingQueue
,若其构造时指定大小,生成的BlockingQueue
有大小限制,不指定大小,其大小有 Integer.MAX_VALUE
来决定。其所含的对象是 FIFO 顺序排序的。
PriorityBlockingQueue
:类似于 LinkedBlockingQueue
,但是其所含对象的排序不是 FIFO,而是依据对象的自然顺序或者构造函数的 Comparator 决定。
SynchronousQueue
:特殊的 BlockingQueue
,对其的操作必须是放和取交替完成,它支持公平访问队列。
ScheduledThreadPoolExecutor
主要用来在给定的延迟后运行任务,或者定期执行任务。ScheduledThreadPoolExecutor
使用的任务队列 DelayQueue
封装了一个 PriorityQueue
,PriorityQueue
会对队列中的任务进行排序,执行所需时间短的放在前面先被执行(ScheduledFutureTask
的 time
变量小的先执行),如果执行所需时间相同则先提交的任务将被先执行(ScheduledFutureTask
的 squenceNumber
变量小的先执行)。
ScheduledThreadPoolExecutor
和 Timer
的比较:
Timer
对系统时钟的变化敏感,ScheduledThreadPoolExecutor
不是;Timer
只有一个执行线程,因此长时间运行的任务可以延迟其他任务。 ScheduledThreadPoolExecutor
可以配置任意数量的线程。 此外,可以通过提供 ThreadFactory
完全控制创建的线程;TimerTask
中抛出的运行时异常会杀死一个线程,从而导致 Timer
死机:-( …即计划任务将不再运行。ScheduledThreadExecutor
不仅捕获运行时异常,还允许您在需要时处理它们(通过重写 afterExecute
方法ThreadPoolExecutor
)。抛出异常的任务将被取消,但其他任务将继续运行。在 JDK1.5 之后,你没有理由再使用 Timer 进行任务调度了。
ScheduledThreadPoolExecutor
的执行主要分为两大部分:
ScheduledThreadPoolExecutor
的 scheduleAtFixedRate()
方法或者**scheduleWirhFixedDelay()
** 方法时,会向 ScheduledThreadPoolExecutor
的 DelayQueue
添加一个实现了 RunnableScheduledFuture
接口的 ScheduledFutureTask
。DelayQueue
中获取 ScheduledFutureTask
,然后执行任务。ScheduledThreadPoolExecutor
为了实现周期性的执行任务,对 ThreadPoolExecutor
做了如下修改:
DelayQueue
作为任务队列;DelayQueue
中获取已到期的 ScheduledFutureTask(DelayQueue.take())
。到期任务是指 ScheduledFutureTask
的 time 大于等于当前系统的时间;ScheduledFutureTask
;ScheduledFutureTask
的 time 变量为下次将要被执行的时间;ScheduledFutureTask
放回 DelayQueue
中(DelayQueue.add()
)。FixedThreadPool
被称为可重用固定线程数的线程池。通过 Executors 类中的相关源代码来看一下相关实现:
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
从上面源代码可以看出新创建的 FixedThreadPool
的 corePoolSize
和 maximumPoolSize
都被设置为 nThreads
,这个 nThreads
参数是我们使用的时候自己传递的。
FixedThreadPool
的 execute()
方法运行示意图:
上图说明:
corePoolSize
, 如果再来新任务的话,就创建新的线程来执行任务;corePoolSize
后, 如果再来新任务的话,会将任务加入 LinkedBlockingQueue
;LinkedBlockingQueue
中获取任务来执行;FixedThreadPool
?FixedThreadPool
使用无界队列 LinkedBlockingQueue
(队列的容量为 Intger.MAX_VALUE
)作为线程池的工作队列会对线程池带来如下影响 :
corePoolSize
后,新任务将在无界队列中等待,因此线程池中的线程数不会超过 corePoolSize
;maximumPoolSize
将是一个无效参数,因为不可能存在任务队列满的情况。所以,通过创建 FixedThreadPool
的源码可以看出创建的 FixedThreadPool
的 corePoolSize
和 maximumPoolSize
被设置为同一个值。keepAliveTime
将是一个无效参数;FixedThreadPool
(未执行 shutdown()
或 shutdownNow()
)不会拒绝任务,在任务比较多的时候会导致 OOM(内存溢出)。SingleThreadExecutor
是只有一个线程的线程池。下面看看SingleThreadExecutor 的实现:
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
从上面源代码可以看出新创建的 SingleThreadExecutor
的 corePoolSize
和 maximumPoolSize
都被设置为 1。其他参数和 FixedThreadPool
相同。
SingleThreadExecutor
的运行示意图:
上图说明:
corePoolSize
,则创建一个新的线程执行任务;LinkedBlockingQueue
LinkedBlockingQueue
中获取任务来执行;SingleThreadExecutor
?SingleThreadExecutor
使用无界队列 LinkedBlockingQueue
作为线程池的工作队列(队列的容量为 Intger.MAX_VALUE
)。SingleThreadExecutor
使用无界队列作为线程池的工作队列会对线程池带来的影响与 FixedThreadPool
相同。说简单点就是可能会导致 OOM,
CachedThreadPool
是一个会根据需要创建新线程的线程池。下面通过源码来看看 CachedThreadPool
的实现:
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
CachedThreadPool
的corePoolSize
被设置为空(0),maximumPoolSize
被设置为 Integer.MAX.VALUE
,即它是无界的,这也就意味着如果主线程提交任务的速度高于 maximumPool
中线程处理任务的速度时,CachedThreadPool
会不断创建新的线程。极端情况下,这样会导致耗尽CPU和内存资源。
CachedThreadPool
的 execute()方法的执行示意图:
上图说明:
SynchronousQueue.offer(Runnable task)
提交任务到任务队列。如果当前 maximumPool
中有闲线程正在执行 SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS)
,那么主线程执行 offer 操作与空闲线程执行的 poll
操作配对成功,主线程把任务交给空闲线程执行,execute()
方法执行完成,否则执行下面的步骤 2;maximumPool
为空,或者 maximumPool
中没有空闲线程时,将没有线程执行 SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS)
。这种情况下,步骤 1 将失败,此时 CachedThreadPool
会创建新线程执行任务,execute 方法执行完成;CachedThreadPool
?CachedThreadPool
允许创建的线程数量为 Integer.MAX_VALUE
,可能会创建大量线程,从而导致 OOM。
线程池数量计算一直是困扰程序员的一个难题,创建线程池大小时,并不说越大越好,当然太小肯定也不好。
举个例子,假如要盖一套房子,1个人工作需要花100天。为了缩短工期,肯定要增加合作的人数。那直接增加到100人,就可以1天就盖好房子?肯定不是的,人越多沟通成本越高,消耗资源越多。就跟多线程并发编程场景一样,很多时候,过多的线程只会徒增资源的开销,增加了上下文切换成本。
当然,环境也具有多变性,设置一个绝对精准的线程数其实是不大可能的,但计算出一个合理的线程数,避免由于线程池设置不合理而导致的性能问题还是有迹可循的。下面我们就来看看具体的计算方法。
一般多线程执行的任务类型可以分为 CPU 密集型和 I/O 密集型,根据不同的任务类型,我们计算线程数的方法也不一样。
CPU 密集型任务:这种任务消耗的主要是 CPU 资源,可以将线程数设置为 N(CPU 核心数)+1,比 CPU 核心数多出来的一个线程是为了防止线程偶发的缺页中断,或者其它原因导致的任务暂停而带来的影响。一旦任务暂停,CPU 就会处于空闲状态,而在这种情况下多出来的一个线程就可以充分利用 CPU 的空闲时间。
I/O 密集型任务:这种任务应用起来,系统会用大部分的时间来处理 I/O 交互,而线程在处理 I/O 的时间段内不会占用 CPU 来处理,这时就可以将 CPU 交出给其它线程使用。因此在 I/O 密集型任务的应用中,我们可以多配置一些线程,具体的计算方法是 2N。
如何判断是 CPU 密集任务还是 IO 密集任务?
CPU 密集型简单理解就是利用 CPU 计算能力的任务比如你在内存中对大量数据进行排序。单凡涉及到网络读取,文件读取这类都是 IO 密集型,这类任务的特点是 CPU 计算耗费时间相比于等待 IO 操作完成的时间来说很少,大部分时间都花在了等待 IO 操作完成上。
在4 核 intel i5 CPU 机器上对Github代码进行了测试:
可以看到,在CPU密集型任务中,当线程数量太小,同一时间大量请求将被阻塞在线程队列中排队等待执行线程,此时 CPU 没有得到充分利用;当线程数量太大,被创建的执行线程同时在争取 CPU 资源,又会导致大量的上下文切换,从而增加线程的执行时间,影响了整体执行效率。通过测试可知,4~6 个线程数是最合适的。
在IO密集型任务中,看到每个线程所花费的时间。当线程数量在 8 时,线程平均执行时间是最佳的,这个线程数量和我们的计算公式所得的结果就差不多。
在平常的应用场景中,我们常常遇不到这两种极端情况,那么碰上一些常规的业务操作,比如,通过一个线程池实现向用户定时推送消息的业务,我们又该如何设置线程池的数量呢?
此时我们可以参考以下公式来计算线程数:
线程数=N(CPU核数)*(1+WT(线程等待时间)/ST(线程时间运行时间))
我们可以通过 JDK 自带的工具 VisualVM
来查看 WT/ST
比例,以下例子是基于运行纯 CPU
运算的例子,我们可以看到:
WT(线程等待时间)= 36788ms [线程运行总时间] - 36788ms[ST(线程时间运行时间)]= 0
线程数=N(CPU核数)*(1+ 0 [WT(线程等待时间)]/36788ms[ST(线程时间运行时间)])= N(CPU核数)
这跟之前通过 CPU 密集型的计算公式 N+1 所得出的结果差不多。
综合来看,我们可以根据自己的业务场景,从“N+1”和“2N”两个公式中选出一个适合的,计算出一个大概的线程数量,之后通过实际压测,逐渐往“增大线程数量”和“减小线程数量”这两个方向调整,然后观察整体的处理时间变化,最终确定一个具体的线程数量。
除了参数动态化之外,为了更好地使用线程池,我们需要对线程池的运行状况有感知,比如当前线程池的负载是怎么样的?分配的资源够不够用?任务的执行情况是怎么样的?是长任务还是短任务?基于对这些问题的思考,动态化线程池提供了多个维度的监控和告警能力,包括:线程池活跃度、任务的执行Transaction(频率、耗时)、Reject异常、线程池内部统计信息等等,既能帮助用户从多个维度分析线程池的使用情况,又能在出现问题第一时间通知到用户,从而避免故障或加速故障恢复。
线程池负载关注的核心问题是:基于当前线程池参数分配的资源够不够。对于这个问题,我们可以从事前和事中两个角度来看。事前,线程池定义了“活跃度”这个概念,来让用户在发生Reject异常之前能够感知线程池负载问题,线程池活跃度计算公式为:线程池活跃度 = activeCount/maximumPoolSize
。这个公式代表当活跃线程数趋向于maximumPoolSize
的时候,代表线程负载趋高。事中,也可以从两方面来看线程池的过载判定条件,一个是发生了Reject异常,一个是队列中有等待任务(支持定制阈值)。以上两种情况发生了都会触发告警,告警信息会通过大象推送给服务所关联的负责人。
在传统的线程池应用场景中,线程池中的任务执行情况对于用户来说是透明的。比如在一个具体的业务场景中,业务开发申请了一个线程池同时用于执行两种任务,一个是发消息任务、一个是发短信任务,这两类任务实际执行的频率和时长对于用户来说没有一个直观的感受,很可能这两类任务不适合共享一个线程池,但是由于用户无法感知,因此也无从优化。动态化线程池内部实现了任务级别的埋点,且允许为不同的业务任务指定具有业务含义的名称,线程池内部基于这个名称做Transaction打点,基于这个功能,用户可以看到线程池内部任务级别的执行情况,且区分业务,任务监控示意图如下图所示:
用户基于JDK原生线程池ThreadPoolExecutor
提供的几个public的getter方法,可以读取到当前线程池的运行状态以及参数,如下图所示:
动态化线程池基于这几个接口封装了运行时状态实时查看的功能,用户基于这个功能可以了解线程池的实时状态,比如当前有多少个工作线程,执行了多少个任务,队列中等待的任务数等等。效果如下图所示:
初始化线程池的时候需要显示命名(设置线程池名称前缀),有利于定位问题。默认情况下创建的线程名字类似 pool-1-thread-n
这样的,没有业务含义,不利于我们定位问题。
给线程池里的线程命名通常有下面两种方式:
**1. 利用 guava 的 ThreadFactoryBuilder
**
ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setNameFormat(threadNamePrefix + "-%d")
.setDaemon(true).build();
ExecutorService threadPool = new ThreadPoolExecutor(corePoolSize, maximumPoolSize
, keepAliveTime, TimeUnit.MINUTES
, workQueue, threadFactory)
2. 自己实现 ThreadFactor
。
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 线程工厂,它设置线程名称,有利于我们定位问题。
*/
public final class NamingThreadFactory implements ThreadFactory {
private final AtomicInteger threadNum = new AtomicInteger();
private final ThreadFactory delegate;
private final String name;
/**
* 创建一个带名字的线程池生产工厂
*/
public NamingThreadFactory(ThreadFactory delegate, String name) {
this.delegate = delegate;
this.name = name; // TODO consider uniquifying this
}
@Override
public Thread newThread(Runnable r) {
Thread t = delegate.newThread(r);
t.setName(name + " [#" + threadNum.incrementAndGet() + "]");
return t;
}
}
动态化线程池的核心设计包括以下三个方面:
corePoolSize
、maximumPoolSize
,workQueue
,它们最大程度地决定了线程池的任务分配和线程分配策略。考虑到在实际应用中我们获取并发性的场景主要是两种:(1)并行执行子任务,提高响应速度。这种情况下,应该使用同步队列,没有什么任务应该被缓存下来,而是应该立即执行。(2)并行执行大批次任务,提升吞吐量。这种情况下,应该使用有界队列,使用队列去缓冲大批量的任务,队列容量必须声明,防止任务无限制堆积。所以线程池只需要提供这三个关键参数的配置,并且提供两种队列的选择,就可以满足绝大多数的业务需求,Less is More。动态化线程池提供如下功能:
动态调参:支持线程池参数动态调整、界面化操作;包括修改线程池核心大小、最大核心大小、队列长度等;参数修改后及时生效。 任务监控:支持应用粒度、线程池粒度、任务粒度的Transaction监控;可以看到线程池的任务执行情况、最大任务执行时间、平均任务执行时间、95/99线等。 负载告警:线程池队列任务积压到一定值的时候会通过大象(美团内部通讯工具)告知应用开发负责人;当线程池负载数达到一定阈值的时候会通过大象告知应用开发负责人。 操作监控:创建/修改和删除线程池都会通知到应用的开发负责人。 操作日志:可以查看线程池参数的修改记录,谁在什么时候修改了线程池参数、修改前的参数值是什么。 权限校验:只有应用开发负责人才能够修改应用的线程池参数。
参数动态化
JDK原生线程池ThreadPoolExecutor
提供了如下几个public的setter方法,如下图所示:
JDK允许线程池使用方通过ThreadPoolExecutor
的实例来动态设置线程池的核心策略,以setCorePoolSize
为方法例,在运行期线程池使用方调用此方法设置corePoolSize
之后,线程池会直接覆盖原来的corePoolSize
值,并且基于当前值和原始值的比较结果采取不同的处理策略。对于当前值小于当前工作线程数的情况,说明有多余的worker线程,此时会向当前idle的worker线程发起中断请求以实现回收,多余的worker在下次idel
的时候也会被回收;对于当前值大于原始值且当前队列中有待执行任务,则线程池会创建新的worker线程来执行队列任务,setCorePoolSize
具体流程如下:
线程池内部会处理好当前状态做到平滑修改,其他几个方法限于篇幅,这里不一一介绍。重点是基于这几个public方法,我们只需要维护ThreadPoolExecutor
的实例,并且在需要修改的时候拿到实例修改其参数即可。基于以上的思路,我们实现了线程池参数的动态化、线程池参数在管理平台可配置可修改,其效果图如下图所示:
用户可以在管理平台上通过线程池的名字找到指定的线程池,然后对其参数进行修改,保存后会实时生效。目前支持的动态参数包括核心数、最大值、队列长度等。除此之外,在界面中,我们还能看到用户可以配置是否开启告警、队列等待任务告警阈值、活跃度告警等等。关于监控和告警,我们下面一节会对齐进行介绍。
在《阿里巴巴 Java 开发手册》“并发处理”这一章节,明确指出线程资源必须通过线程池提供,不允许在应用中自行显示创建线程。使用线程池会带来的好处有:(1)降低资源消耗;(2)提高响应速度;(3)提高线程的可管理性。
Executor框架中ScheduledThreadPoolExecutor
用来定时执行任务, ThreadPoolExecutor
用来执行被提交的任务。其中线程池实现类 ThreadPoolExecutor
是 Executor
框架最核心的类,在日常开发中应用较多。
另外,《阿里巴巴 Java 开发手册》中强制线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor
构造函数的方式。这时因为Executors创建线程池会有资源耗尽的风险。
FixedThreadPool
和SingleThreadExecutor
: 允许请求的队列长度为Integer.MAX_VALUE
,可能堆积大量的请求,从而导致 OOM。CachedThreadPool
和ScheduledThreadPool
: 允许创建的线程数量为Integer.MAX_VALUE
,可能会创建大量线程,从而导致 OOM。
因此,使用线程池时,我们一定要根据场景和需求配置合理的线程数、任务队列、拒绝策略、线程回收策略,并对线程进行明确的命名方便排查问题。
下拉刷新与上拉加载 在使用列表组件展示数据的时候,更新数据的交互曾经是一个没有定论的问题,有留一个刷新按钮的,有按时自动刷新的,还有根本不刷新的。但是随着移动平台的普及,移动应用的用户群越来越大,数据刷新的交互就慢慢固定下来了,而在各种交互方式中脱颖而出的一种就是人们熟悉的“下拉刷新”。 下拉刷新是个很简单也很友好的交互方式,列表滚动到顶端后可以强制下拉一段,拉出来的多余部分会显示一些提示,
这里写自定义目录标题欢迎使用Markdown编辑器新的改变功能快捷键合理的创建标题,有助于目录的生成如何改变文本的样式插入链接与图片如何插入一段漂亮的代码片生成一个适合你的列表创建一个表格设定内容居中、居左、居右SmartyPants创建一个自定义列表如何创建一个注脚注释也是必不可少的KaTeX数学公式新的甘特图功能,丰富你的文章UML 图表FLowchart流程图导出与导入导出导入欢迎使用Markdown编辑器你好! 这是你第一次使用 Markdown编辑器 所展示的欢迎页。如果你想学习如何使用Mar
这一节稍微复杂 angularjs 是用MVC 模式,sgit checkout -f step-2
CLASStorch.nn.Conv1d(in_channels: int, out_channels: int, kernel_size: Union[T, Tuple[T]], stride: Union[T, Tuple[T]] = 1, padding: Union[T, Tuple[T]] = 0, dilation: Union[T, Tuple[T]] = 1, groups: int = 1, bias: bool = True, padding_mode: str = ‘zeros’)
TCP/IP协议与UDP协议的区别
1.函数可以没有返回值案例,编写一个函数,从终端输入一个整数(层)打印出对应的金子塔。2.编写一个函数,从终端输入一个整数(1—9),打印出对应的乘法表3.定义函数,实现求两个double数字的最大值,并返回4.定义函数,求出三个int 类型数的和,并返回#include <stdio.h>//层数 totalLevel 是通过形参传入void printSta...
目录1.Objectives:2.Experiment Content:3.Experiment Principle:4.Experiment Steps Result and Conlusion:1、操作使用 mat2lpc, lpc2mat, entropy, imratio, compare 等函数,了解其作用和特点。2、Huffman 编码3、编写无损预测编解码(lossless pred...
一、 奇偶校验码、 二、 奇偶校验码 特点、 三、 奇偶校验码 示例、 四、 CRC 循环冗余码 ( 原理说明 )、 五、 CRC 循环冗余码 计算示例、
感谢武总:https://blog.csdn.net/wutong_login/article/details/90257697目前互联网上的视频直播有两种,一种是基于RTMP协议的直播,这种直播方式上行推流使用RTMP协议,下行播放使用RTMP,HTTP+FLV或者HLS,直播延时一般大于3秒,广泛应用秀场、游戏、赛事和事件直播,满足了对交互要求不高的场景;另一种是WebRTC协议的直播,这种直播方式使用UDP的协议进行流媒体的分发,直播延时小于1秒,同时连接数一般小于10个,主要应用在视频通话、秀场
配置数据库5.7安装资料8.0服务端下载mysql文件解压。管理员模式进入cmd,移动到目录mysql\bin安装命令mysqld -install如果出现 Install/Remove of the Service Denied! 则是没有使用管理员权限打开CMD初次使用配置mysqld --initialize运行完成后会在mysql路径下生成data文件夹,找到一个电脑用户名.err文件可以看到root和初始密码配置启动文件在mysql路径下新建my.
IDLE(An Integrated DeveLopment Environment for Python)是Python自带的编译器,在初学者,或写小程序,或用于验证的时候,经常用到!如果能熟练掌握其快捷键的话,无疑能提高工作效率。下面汇总下其常用的快捷键:------------------------------------------ALT+3 : 多行注释ALT+4 : 取消多行注释ALT+P : 翻出上一条命令, 类似于向上的箭头ALT+N : 翻出下一条命令, 类似于向下的箭
1.集成开发环境(IDE,Integrated Development Environment )是用于提供程序开发环境的应用程序,一般包括代码编辑器、编译器、调试器和图形用户界面等工具。集成了代码编写功能、分析功能、编译功能、调试功能等一体化的开发软件服务套。2.环境变量和JDK的关系 与JDK工具的关系:Path 变量中必须有一个值指向JDK的bin与JDK类库的关系:CLASSPATH 必须指向JDK的lib【但是前面要加.; 表示优先在当前目录找】Path是操作系统用的,用来找工具或程序