Java线程池的使用
线程池的使用
1.创建线程池
Java线程池的实现是java.util.concurrentThreadPoolExecutor类。先看该类的构造函数,该类提供了4个构造函数,但最终都调用了参数最多的一个,也就是说如果你选择其他的构造函数,某些值会使用默认值。我们看最负责的一个构造函数:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; } |
corePoolSize:即使线程空闲也保留在线程池中的数目
maximumPoolSize:线程池中的线程最大数目
keepAliveTime:当线程数目大于corePoolSize时,在回收多余空闲线程前,等待新任务的最大时间。
unit: keepAliveTime的时间单位
workQueue: 任务被执行前的保存队列。该队列只保存execute方法提交的Runnable任务。
threadFactory:创建新线程的线程工厂。
handler:当执行阻塞时的处理器。在最大数目的线程都在执行任务,并且任务队列中未执行的任务数目达到队列容量时会出现执行阻塞。
2.添加任务和关闭线程池
使用public voidexecute(Runnable command)提交新的任务到线程池;
使用public voidshutdown()或者
public List<Runnable> shutdownNow()关闭线程池。
3.线程池的四种状态
volatile int runState; static final int RUNNING = 0; static final int SHUTDOWN = 1; static final int STOP = 2; static final int TERMINATED = 3; |
runState表示线程池的当前状态,有下面四个值:
RUNNING:可接受新的任务,并且处理队列中的任务
SHUTDOWN:不接受新的任务,但是处理队列中的任务
STOP:不接受新的任务,不处理队列中的任务,并且中断所有正在处理的任务。
TERMINATED:和STOP一样,另外所有的线程都已经被中断。
线程池状态的状态转移:
RUNNING-> SHUTDOWN:调用shutdown()方法,可能在finalize()方法中隐式调用。
(RUNNINGor SHUTDOWN) -> STOP:调用shutdownNow()方法。
SHUTDOWN-> TERMINATED:当队列和线程池都为空。
STOP-> TERMINATED:当线程池为空。
4.添加任务流程
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) { if (runState == RUNNING && workQueue.offer(command)) { if (runState != RUNNING || poolSize == 0) ensureQueuedTaskHandled(command); } else if (!addIfUnderMaximumPoolSize(command)) reject(command); // is shutdown or saturated } } |
poolSize表示线程池中当前线程的数目;corePoolSize是创建ThreadPoolExecutor对象时的入参,表示线程池的核心线程数目。
执行任务的步骤:
(1).当前线程数目小于核心线程数目corePoolSize,启动新线程并运行execute方法传入的任务作为该新线程的第一个任务。成功,结束;失败,至步骤(3)
(2).当前线程数目大于等于核心线程数目
(3).如果线程池在RUNNING状态,则将任务添加到任务队列中。
(4)、添加任务到任务队列成功,检查线程池的状态,如果在添加任务的时候调用了shutdownNow()则需保证该任务被移除;否则保证线程池中至少有一个线程会处理任务。结束
(5)、添加任务到任务队列失败(队列满等),在线程池线程数目小于maximumPoolSize时,启动新线程来执行任务。成功,结束;失败,至步骤6
(6)、拒绝任务(shutdown或者饱和)
总结:在线程数目小于corePoolSize时,优先创建启动新线程处理任务。
在线程数目大于等于corePoolSize时,优先将新任务放入队列中。
在队列满并且线程数目小于maximumPoolSize的情况下,启动新线程处理任务。
在线程池关闭或者任务饱和时,拒绝任务。
5.线程工厂threadFactory
可以实现此接口,创建自己的线程工厂。例如下面的例子中,我们将线程池中所有的线程设置为后台线程。
public class MyThreadFactory implements ThreadFactory { @Override public Thread newThread(Runnable r) { Thread t = new Thread(r); t.setDaemon(true); return t; } } |
6.拒绝任务处理
在第四小节《添加任务流程》中,我们看到新添加的任务在线程池被关闭或者线程池饱和的情况下,任务可能被拒绝。下面的异常就是任务被拒绝后抛出的异常:
java.util.concurrent.RejectedExecutionException Exception in thread "main" [2014-09-03 14:55:39,656] [main] () (ProxyInvocationHandler.java:144) ERROR com.ai.appframe2.complex.service.proxy.ProxyInvocationHandler - 方法异常 java.util.concurrent.RejectedExecutionException at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:1765) at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:767) at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:658) |
ThreadPoolExecutor对于被拒绝的任务的默认处理策略是抛出异常,如果我们不想这样怎么办?
ThreadPoolExecutor构造函数中的RejectedExecutionHandlerhandler的作用就是用来处理被拒绝的任务的。ThreadPoolExecutor预置了四种被拒绝任务被处理策略,当然你也可以实现RejectedExecutionHandler接口来做一些打印日志等任务。
我们来看线程池预置的四种被拒绝任务被处理策略:
1) ThreadPoolExecutor.AbortPolicy 处理程序遭到拒绝将抛出运行时 RejectedExecutionException;默认处理策略
2) ThreadPoolExecutor.CallerRunsPolicy 线程调用运行该任务的 execute 本身。此策略提供简单的反馈控制机制,能够减缓新任务的提交速度;
3) ThreadPoolExecutor.DiscardPolicy 不能执行的任务将被删除;
4) ThreadPoolExecutor.DiscardOldestPolicy 如果执行程序尚未关闭,则位于工作队列头部的任务将被删除,然后重试执行程序(如果再次失败,则重复此过程)。
7.任务队列选择workQueue
ThreadPoolExecutor构造函数接收一个BlockingQueue<Runnable>workQueue参数,作为未处理任务的保存队列。BlockingQueue有很多种,这里我们介绍常用的两种SynchronousQueue和LinkedBlockingQueue实现。
1) SynchronousQueue:其中每个插入操作必须等待另一个线程的对应移除操作 ,反之亦然。同步队列没有任何内部容量,甚至连一个队列的容量都没有。不能在同步队列上进行 peek,因为仅在试图要移除元素时,该元素才存在;除非另一个线程试图移除某个元素,否则也不能(使用任何方法)插入元素;也不能迭代队列,因为其中没有元素可用于迭代。队列的头 是尝试添加到队列中的首个已排队插入线程的元素;如果没有这样的已排队线程,则没有可用于移除的元素并且 poll() 将会返回 null。对于其他 Collection 方法(例如 contains),SynchronousQueue 作为一个空 collection。此队列不允许 null 元素。
2) LinkedBlockingQueue:一个基于已链接节点的、范围任意的 blocking queue。此队列按 FIFO(先进先出)排序元素。队列的头部 是在队列中时间最长的元素。队列的尾部 是在队列中时间最短的元素。新元素插入到队列的尾部,并且队列获取操作会获得位于队列头部的元素。链接队列的吞吐量通常要高于基于数组的队列,但是在大多数并发应用程序中,其可预知的性能要低。可选的容量范围构造方法参数作为防止队列过度扩展的一种方法。如果未指定容量,则它等于 Integer.MAX_VALUE。除非插入节点会使队列超出容量,否则每次插入后会动态地创建链接节点。
关于这两种队列的对比,个人总结的一点经验是:对于任务会大量积压的情况,应该优先使用LinkedBlockingQueue;而对于每个任务都能得到及时处理的情况,可以使用SynchronousQueue。这也是是Executors类中使用的方法。
另外,PriorityBlockingQueue:是一个优先级队列,在处理有优先级的队列时,可以使用该实现。它可以让优先级高的任务先得到执行,需要注意的是如果一直有优先级高的任务提交到队列里,那么优先级低的任务可能永远不能执行。
使用线程池的简单方法Executors
通过前面章节的介绍可见,直接构造ThreadPoolExecutor对象来创建线程池时,有很多精细的地方。JDK提供了java.util.concurrent.Executors类来简单生成线程池。如果你对ThreadPoolExecutor比较了解了的话,Executors的原理应该很简单了。我们看个例子吧:
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } |
这是Executors提供的生成固定数目线程的线程池的方法,它只是把ThreadPoolExecutor构造函数中的corePoolSize和maximumPoolSize设置为一样,然后替你选择了一个任务保存队列LinkedBlockingQueue而已。
如何合理设置线程池的大小
通常情况下,这是一个复杂的活。
1.根据任务性质设置
要想合理的配置线程池,就必须首先分析任务特性,可以从以下几个角度来进行分析:
1)任务的性质:CPU密集型任务,IO密集型任务和混合型任务。
2)任务的优先级:高,中和低。
3)任务的执行时间:长,中和短。
4)任务的依赖性:是否依赖其他系统资源,如数据库连接。
任务性质不同的任务可以用不同规模的线程池分开处理。CPU密集型任务配置尽可能小的线程,如配置CPU数+1个线程的线程池。IO密集型任务则由于线程并不是一直在执行任务,则配置尽可能多的线程,如2*CPU数。混合型的任务,如果可以拆分,则将其拆分成一个CPU密集型任务和一个IO密集型任务,只要这两个任务执行的时间相差不是太大,那么分解后执行的吞吐率要高于串行执行的吞吐率,如果这两个任务执行时间相差太大,则没必要进行分解。我们可以通过Runtime.getRuntime().availableProcessors()方法获得当前设备的CPU个数。
执行时间不同的任务可以交给不同规模的线程池来处理,或者也可以使用优先级队列,让执行时间短的任务先执行。
依赖数据库连接池的任务,因为线程提交SQL后需要等待数据库返回结果,如果等待的时间越长CPU空闲时间就越长,那么线程数应该设置越大,这样才能更好的利用CPU。
建议使用有界队列,有界队列能增加系统的稳定性和预警能力,可以根据需要设大一点,比如几千。我在测试一个线程池的时候,使用循环不断提交新的任务,造成任务积压在线程池,最后程序不断的抛出抛弃任务的异常。如果使用无界队列,线程池的队列就会越来越多,有可能会撑满内存,导致整个系统不可用,而不只是后台任务出现问题。
通常这种设置方式是比较粗略的方式。
2.利特尔法则
利特尔法则(Little’slaw)是说,一个系统请求数等于请求的到达率与平均每个单独请求花费的时间之乘积
我们可以使用利特尔法则(Little’slaw)来判定线程池大小。我们只需计算请求到达率和请求处理的平均时间。然后,将上述值放到利特尔法则(Little’s law)就可以算出系统平均请求数。若请求数小于我们线程池的大小,就相应地减小线程池的大小。与之相反,如果请求数大于线程池大小,事情就有点复杂了。
当遇到有更多请求待处理的情况时,我们首先需要评估系统是否有足够的能力支持更大的线程池。准确评估的前提是,我们必须评估哪些资源会限制应用程序的扩展能力。在本文中,我们将假定是CPU,而在实际中可能是其它资源。最简单的情况是,我们有足够的空间增加线程池的大小。若没有的话,你不得不考虑其它选项,如软件调优、增加硬件,或者调优并增加硬件。
具体的我们可以参考这篇文章:
3.配置文件中配置
如果是对系统性能非常重要的一个线程池,与其猜测该线程池的合理大小,不如将它的参数开放出来。因为线程池的合理大小和系统资源也是息息相关的,假设你在设备A上面的线程池大小已经是最优了,不见得把程序放到设备B上面同样是最优的。放在配置文件中,可以方便将来根据系统运行情况进行调整。
我们看看开源任务调度框架Quartz开放了哪些参数:
<!-- 线程执行器配置,用于任务注册 --> <bean id="executor"class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"> <property name="corePoolSize"value="6" /> <property name="maxPoolSize"value="16" /> <property name="queueCapacity"value="500" /> </bean> |
Quart开放了核心线程数目、最大线程数目、任务队列的容量这三个重要的参数,他们的设置都是和系统资源息息相关的。
线程池的监控
通过线程池提供的参数进行监控。线程池里有一些属性在监控线程池的时候可以使用:
taskCount:线程池需要执行的任务数量。
completedTaskCount:线程池在运行过程中已完成的任务数量。小于或等于taskCount。
largestPoolSize:线程池曾经创建过的最大线程数量。通过这个数据可以知道线程池是否满过。如等于线程池的最大大小,则表示线程池曾经满了。
getPoolSize:线程池的线程数量。如果线程池不销毁的话,池里的线程不会自动销毁,所以这个大小只增不+getActiveCount:获取活动的线程数。
通过扩展线程池进行监控。通过继承线程池并重写线程池的beforeExecute,afterExecute和terminated方法,我们可以在任务执行前,执行后和线程池关闭前干一些事情。如监控任务的平均执行时间,最大执行时间和最小执行时间等。
郑重声明:本站内容如果来自互联网及其他传播媒体,其版权均属原媒体及文章作者所有。转载目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。