一、ThreadPoolExcutors的作用
java提供了ThreadPoolExcutors來(lái)創(chuàng)建一個(gè)線程池,我們?yōu)槭裁匆镁€程池呢?
1.降低資源的消耗:通過(guò)重復(fù)利用已經(jīng)創(chuàng)建好的線程降低線程的創(chuàng)建和銷毀帶來(lái)的損耗
2.提高響應(yīng)速度:因?yàn)榫€程池中的線程處于等待分配任務(wù)的狀態(tài),當(dāng)任務(wù)來(lái)時(shí)無(wú)需創(chuàng)建新的線程就能執(zhí)行
3.提高線程的可管理性
二、ThreadPoolExecutor構(gòu)造函數(shù)參數(shù)詳細(xì)介紹
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
1.int corePoolSize(核心線程數(shù))
線程池新建線程的時(shí)候,如果當(dāng)前線程總數(shù)小于corePoolSize,則新建的是核心線程,如果超過(guò)corePoolSize,則新建的是非核心線程;核心線程默認(rèn)情況下會(huì)一直存活在線程池中,即使這個(gè)核心線程啥也不干(閑置狀態(tài));如果設(shè)置了 allowCoreThreadTimeOut 為 true,那么核心線程如果不干活(閑置狀態(tài))的話,超過(guò)一定時(shí)間(時(shí)長(zhǎng)下面參數(shù)決定),就會(huì)被銷毀掉。
2.int maximumPoolSize(線程池能容納的最大線程數(shù)量)
線程總數(shù) = 核心線程數(shù) + 非核心線程數(shù)。
3.long keepAliveTime(非核心線程空閑存活時(shí)長(zhǎng))
非核心線程空閑時(shí)長(zhǎng)超過(guò)該時(shí)長(zhǎng)將會(huì)被回收,主要應(yīng)用在緩存線程池中,當(dāng)設(shè)置了 allowCoreThreadTimeOut 為 true 時(shí),對(duì)核心線程同樣起作用。
4.TimeUnit unit?
空閑線程的存活時(shí)間(keepAliveTime 的單位)它是一個(gè)枚舉類型,常用的如:TimeUnit.SECONDS(秒)、TimeUnit.MILLISECONDS(毫秒)。
5.BlockingQueue workQueue(任務(wù)隊(duì)列)
當(dāng)所有的核心線程都在干活時(shí),新添加的任務(wù)會(huì)被添加到這個(gè)隊(duì)列中等待處理,如果隊(duì)列滿了,則新建非核心線程執(zhí)行任務(wù)
常用的workQueue類型:
????(1)SynchornousQueue:這隊(duì)列接收到任務(wù)時(shí)會(huì)直接交給線程處理,而不保留它,如果所有的線程都在工作,那就創(chuàng)建一???個(gè)新的線程來(lái)處理這個(gè)任務(wù),為了保證不出現(xiàn)線程數(shù)達(dá)到maxnumPoolSize而不能新建線程的錯(cuò)誤,所以使用這個(gè)類型 的隊(duì)列時(shí),maxnumPoolSize一般指定成Integer.MAX_VALUE,即無(wú)限大,然后核心線程數(shù)corePoolSize一般是0.
????(2)LinkedBlockingQueue:這個(gè)隊(duì)列接收到任務(wù)的時(shí)候,如果當(dāng)前線程數(shù)小于核心線程數(shù),則新建線程(核心線程)處理任務(wù);如果當(dāng)前線程數(shù)等于核心線程數(shù),則進(jìn)入隊(duì)列等待。由于這個(gè)隊(duì)列沒(méi)有最大值限制,即所有超過(guò)核心線程數(shù)的任務(wù)都將被添加到隊(duì)列中,這也就導(dǎo)致了 maximumPoolSize 的設(shè)定失效,因?yàn)榭偩€程數(shù)永遠(yuǎn)不會(huì)超過(guò) corePoolSize。
????(3)ArrayBlockingQueue:可以限定隊(duì)列的長(zhǎng)度,接收到任務(wù)的時(shí)候,如果沒(méi)有達(dá)到 corePoolSize 的值,則新建線程(核心線程)執(zhí)行任務(wù),如果達(dá)到了,則入隊(duì)等候,如果隊(duì)列已滿,則新建線程(非核心線程)執(zhí)行任務(wù),又如果總線程數(shù)到了 maximumPoolSize,并且隊(duì)列也滿了,則發(fā)生錯(cuò)誤。
????(4)DelayQueue:隊(duì)列內(nèi)元素必須實(shí)現(xiàn) Delayed 接口,這就意味著你傳進(jìn)去的任務(wù)必須先實(shí)現(xiàn) Delayed 接口。這個(gè)隊(duì)列接收到任務(wù)時(shí),首先先入隊(duì),只有達(dá)到了指定的延時(shí)時(shí)間,才會(huì)執(zhí)行任務(wù)。
6.ThreadFactory threadFactory(線程工廠):用來(lái)創(chuàng)建線程池中的線程,通常用默認(rèn)的即可
7.RejectedExecutionHandler handler(拒絕策略):在線程池已經(jīng)關(guān)閉的情況下和任務(wù)太多導(dǎo)致最大線程數(shù)和任務(wù)隊(duì)列已經(jīng)飽和,無(wú)法再接收新的任務(wù),在上面兩種情況下,只要滿足其中一種時(shí),在使用 execute() 來(lái)提交新的任務(wù)時(shí)將會(huì)拒絕,線程池提供了以下 4 種策略:
AbortPolicy:默認(rèn)策略,在拒絕任務(wù)時(shí),會(huì)拋出RejectedExecutionException。
CallerRunsPolicy:只要線程池未關(guān)閉,該策略直接在調(diào)用者線程中,運(yùn)行當(dāng)前的被丟棄的任務(wù)。
DiscardOldestPolicy:該策略將丟棄最老的一個(gè)請(qǐng)求,也就是即將被執(zhí)行的任務(wù),并嘗試再次提交當(dāng)前任務(wù)。
DiscardPolicy:該策略默默的丟棄無(wú)法處理的任務(wù),不予任何處理。
線程池遵循的原則:
其會(huì)優(yōu)先創(chuàng)建核心線程,執(zhí)行任務(wù),當(dāng)核心線程增加CorePoolSize后,我們會(huì)把任務(wù)添加到work Queue中,當(dāng)work Queue里面的任務(wù)也塞滿了,線程池就會(huì)創(chuàng)建非核心線程執(zhí)行去執(zhí)行任務(wù),當(dāng)線程達(dá)到maximumPoolSize時(shí)候和work queue也達(dá)最大值時(shí)候我們會(huì)執(zhí)行對(duì)應(yīng)的拒絕策略
三、4類常見(jiàn)的線程池
newCachedThreadPool創(chuàng)建一個(gè)可緩存線程池,如果線程池長(zhǎng)度超過(guò)處理需要,可靈活回收空閑線程,若無(wú)可回收,則新建線程。
newFixedThreadPool 創(chuàng)建一個(gè)定長(zhǎng)線程池,可控制線程最大并發(fā)數(shù),超出的線程會(huì)在隊(duì)列中等待。
newScheduledThreadPool 創(chuàng)建一個(gè)定長(zhǎng)線程池,支持定時(shí)及周期性任務(wù)執(zhí)行。
newSingleThreadExecutor 創(chuàng)建一個(gè)單線程化的線程池,它只會(huì)用唯一的工作線程來(lái)執(zhí)行任務(wù),保證所有任務(wù)按照指定順序(FIFO, LIFO, 優(yōu)先級(jí))執(zhí)行。
其實(shí)他們都是通過(guò)ThreadPoolExcutors創(chuàng)建的
1.CachedThreadPool
由Executors的newCachedThreadPool方法創(chuàng)建,不存在核心線程,只存在數(shù)量不定的非核心線程,而且其數(shù)量最大值為Integer.MAX_VALUE。當(dāng)線程池中的線程都處于活動(dòng)時(shí)(全滿),線程池會(huì)創(chuàng)建新的線程來(lái)處理新的任務(wù),否則就會(huì)利用新的線程來(lái)處理新的任務(wù),線程池中的空閑線程都有超時(shí)機(jī)制,默認(rèn)超時(shí)時(shí)長(zhǎng)為60s,超過(guò)60s的空閑線程就會(huì)被回收。和FixedThreadPool不同的是,CachedThreadPool的任務(wù)隊(duì)列其實(shí)相當(dāng)于一個(gè)空的集合,這將導(dǎo)致任何任務(wù)都會(huì)被執(zhí)行,因?yàn)樵谶@種場(chǎng)景下SynchronousQueue是不能插入任務(wù)的,SynchronousQueue是一個(gè)特殊的隊(duì)列,在很多情況下可以理解為一個(gè)無(wú)法儲(chǔ)存元素的隊(duì)列。從CachedThreadPool的特性看,這類線程比較適合執(zhí)行大量耗時(shí)較小的任務(wù)。當(dāng)整個(gè)線程池都處于閑置狀態(tài)時(shí),線程池中的線程都會(huì)因?yàn)槌瑫r(shí)而被停止回收,幾乎是不占任何系統(tǒng)資源。實(shí)現(xiàn)方式如下:
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
2.FixedThreadPool
由Executors的newFixedThreadPool方法創(chuàng)建。它是一種線程數(shù)量固定的線程池,當(dāng)線程處于空閑狀態(tài)時(shí),他們并不會(huì)被回收,除非線程池被關(guān)閉。當(dāng)所有的線程都處于活動(dòng)狀態(tài)時(shí),新的任務(wù)都會(huì)處于等待狀態(tài),直到有線程空閑出來(lái)。FixedThreadPool只有核心線程,且該核心線程都不會(huì)被回收,這意味著它可以更快地響應(yīng)外界的請(qǐng)求。jdk實(shí)現(xiàn)如下:FixedThreadPool沒(méi)有額外線程,只存在核心線程,而且核心線程沒(méi)有超時(shí)機(jī)制,而且任務(wù)隊(duì)列沒(méi)有長(zhǎng)度的限制。
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
3.ScheduledThreadPool
通過(guò)Executors的newScheduledThreadPool方式創(chuàng)建,核心線程數(shù)量是固定的,而非核心線程是沒(méi)有限制的,并且當(dāng)非核心線程閑置時(shí)它會(huì)被立即回收,ScheduledThreadPool這類線程池主要用于執(zhí)行定時(shí)任務(wù)和具有固定時(shí)期的重復(fù)任務(wù),實(shí)現(xiàn)方法如下:
?
public static ScheduledExecutorService newScheduledThreadPool(
int corePoolSize, ThreadFactory threadFactory) {
return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory) {
super(corePoolSize, Integer.MAX_VALUE,
DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
new DelayedWorkQueue(), threadFactory);
}
4.SingleThreadPool
通過(guò)Executors的newSingleThreadExecutor方法來(lái)創(chuàng)建。這類線程池內(nèi)部只有一個(gè)核心線程,它確保所有的任務(wù)都在同一個(gè)線程中按順序執(zhí)行。SingleThreadExecutor的意義在于統(tǒng)一所有外界任務(wù)一個(gè)線程中,這使得這些任務(wù)之間不需要處理線程同步的問(wèn)題,實(shí)現(xiàn)方式如下:
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}
四、為什么《阿里巴巴Java開(kāi)發(fā)手冊(cè)》上要禁止使用Executors來(lái)創(chuàng)建線程池
?Executors創(chuàng)建出來(lái)的線程池使用的全都是無(wú)界隊(duì)列,而使用無(wú)界隊(duì)列會(huì)帶來(lái)很多弊端,最重要的就是,它可以無(wú)限保存任務(wù),因此很有可能造成OOM異常。同時(shí)在某些類型的線程池里面,使用無(wú)界隊(duì)列還會(huì)導(dǎo)致maxinumPoolSize、keepAliveTime、handler等參數(shù)失效?
五、我們用代碼測(cè)試下
import java.util.*;
import java.lang.*;
import java.io.Serializable;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
class Rextester
{
private static int produceTaskSleepTime = 5;
private static int consumeTaskSleepTime = 1000;
private static int taskMaxNumber = 10; //定義最大添加10個(gè)線程到線程池中
public static void main(String args[]) {
//構(gòu)造一個(gè)線程池
//該策略默默的丟棄無(wú)法處理的任務(wù),不予任何處理。
ThreadPoolExecutor threadPool = getTpe("DiscardPolicy");
//該策略將丟棄最老的一個(gè)請(qǐng)求,也就是即將被執(zhí)行的任務(wù),并嘗試再次提交當(dāng)前任務(wù)。
//ThreadPoolExecutor threadPool = getTpe("DiscardOldestPolicy");
for (int i = 1; i <= taskMaxNumber; i++) {
try {
//添加任務(wù)到線程池,我們要知道添加的任務(wù)是Runnable
String value = "value is: " + i;
System.out.println("put:" + value);
threadPool.execute(new ThreadPoolTask(value));
//線程休息一段時(shí)間,方便我們分析結(jié)果
Thread. sleep(produceTaskSleepTime);
} catch (Exception e) {
e.printStackTrace();
}
}
}
/**
* 線程池執(zhí)行的任務(wù)
*/
public static class ThreadPoolTask implements Runnable {
//保存任務(wù)所需要的數(shù)據(jù)
private String value;
public ThreadPoolTask(String value) {
this.value = value;
}
public void run() {
//打印語(yǔ)句
System. out.println("start------" + value);
try {
Thread. sleep(consumeTaskSleepTime);
} catch (Exception e) {
e.printStackTrace();
}
value = "";
}
}
/**
*初始化線程池
*/
public static ThreadPoolExecutor getTpe(String type) {
if ("DiscardOldestPolicy".equals(type)) {
return new ThreadPoolExecutor(2, 4, 3,
TimeUnit. SECONDS, new ArrayBlockingQueue<Runnable>(3),
new ThreadPoolExecutor.DiscardOldestPolicy());
} else {
return new ThreadPoolExecutor(2, 4, 3,
TimeUnit. SECONDS, new ArrayBlockingQueue<Runnable>(3),
new ThreadPoolExecutor.DiscardPolicy());
}
//DiscardPolicy DiscardOldestPolicy
}
}
運(yùn)行結(jié)果:
put:value is: 1
start------value is: 1
put:value is: 2
start------value is: 2
put:value is: 3
put:value is: 4
put:value is: 5
put:value is: 6
start------value is: 6
put:value is: 7
start------value is: 7
put:value is: 8
put:value is: 9
put:value is: 10
start------value is: 3
start------value is: 4
start------value is: 5
我們把上面的生成線程池的代碼不//
ThreadPoolExecutor threadPool = getTpe("DiscardOldestPolicy");
運(yùn)行結(jié)果如下
put:value is: 1
start------value is: 1
put:value is: 2
start------value is: 2
put:value is: 3
put:value is: 4
put:value is: 5
put:value is: 6
start------value is: 6
put:value is: 7
start------value is: 7
put:value is: 8
put:value is: 9
put:value is: 10
start------value is: 8
start------value is: 9
start------value is: 10
部分內(nèi)容參考了博客:https://blog.csdn.net/qq_34835058/article/details/80497602
?