1.使用線程池的意義何在?
項目開發中,為了統一管理線程,並有效精準地進行排錯,我們經常要求項目人員統一使用線程池去創建線程。因為我們是在受不了有些人動不動就去創建一個線程,使用的多了以後,一旦報錯就只有一個線程報錯信息,還是線程的共用信息,再加上如果你將異常吃了(捕獲後不做處理)的情況下,這個錯誤,我實在不知道去哪裡排查,不然你換個人試試吧。
2.線程池的重要參數----你真的了解嗎
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
1、corePoolSize:核心線程數。設置核心線程數的意義何在?通俗來講核心線程數就是正式員工,需要長期堅守崗位,有任務就需要執行。
2、maximumPoolSize:最大線程池個數。設置最大線程池數量的意義何在?其實就是一個容錯機制,當你的需要執行的線程個數已經爆滿並且超過的時候,提供了一個容錯機制,可以保證在短期內多餘的任務正常執行。相當於就是臨時工,臨時過來執行任務,任務結束後就可以走了。
3、keepAliveTime:保活的時間。設置的意義何在?當線程任務無劇增的情況下,維持在正常提亮。你無需那麼多臨時工來執行任務,所以規定時間,臨時工可以走人了,也即是除核心線程外的線程可以回收了。
4、TimeUnit:保活的時間單位。這個就不多贅述了。
5、BlockingQueue:阻塞隊列。設置阻塞隊列的意義何在?當所有核心線程都正在工作時,將其放入阻塞隊列,等待後續執行。也就是這個任務進行排隊,等正式工忙完了繼續做。
6、ThreadFactory:線程工廠。生產線程,由你自己去定義你想生產什麼樣的線程。
7、RejectedExecutionHandler:拒絕策略。當你的最大線程與阻塞隊列都滿了。這個時候,你已經接收不了新的任務進行處理了。所以設置拒絕策略。相當於就是我所有的員工和臨時工都在工作了,並且排隊的任務都滿了,應對這樣的情況,你打算如何做。
除此之外還有一個重要的參數:
/**
* If false (default), core threads stay alive even when idle.
* If true, core threads use keepAliveTime to time out waiting
* for work.
*/
private volatile boolean allowCoreThreadTimeOut;//是否允許核心線程數超時退出。
該參數有在特定的業務場景下有很大的意義。比如:你的業務只在晚上需要執行,其餘時間無需執行。那麼為何不把資源讓出來,白天的時候,可以讓其他業務占有這些資源去執行呢。
3.ThreadExecutorPool線程池重要源碼解析
由該類圖可知,Executor執行器定義執行方法,ExecutorService定義線程池操作的基本方法,AbstractExecutorService定義了線程池操作的方法模板。
ThreadPoolExecutor任務執行流程圖
1.首先是構造方法
基本的參數校驗與賦值,簡單代碼不過多贅述。
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
////基本的參數校驗
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
2.線程執行的方法
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<void> ftask = newTaskFor(task, null);//將線程對象封裝成RunnableFuture
execute(ftask);//任務執行
return ftask;
}
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);//將線程對象封裝成RunnableFuture
execute(ftask);//任務執行
return ftask;
}
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);//將線程對象封裝成RunnableFuture
execute(ftask);//任務執行
return ftask;
}
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();//獲取當前的線程池狀態。單個參數,保存了線程池的狀態以及線程數量
if (workerCountOf(c) < corePoolSize) { //當線程數量小於核心線程數
if (addWorker(command, true)) //直接添加任務,運行線程
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {//如果核心線程數已經滿了,那麼直接添加到阻塞隊列。
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))//線程池不是running狀態,執行拒絕策略。
reject(command);
else if (workerCountOf(recheck) == 0)//線程池線程數量不能為0,需要有一個線程對線程池的後續操作進行處理,比如關閉線程池
addWorker(null, false);
}
else if (!addWorker(command, false))//當核心線程與阻塞隊列都滿了的時候,直接添加任務到非核心線程運行。添加失敗直接執行拒絕策略
reject(command);
}
1.關於ctl.get()方法的解釋---利用了單個變量,保存了線程池狀態以及線程數量的值
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS; //運行狀態 正常執行任務
private static final int shutdown = 0 << COUNT_BITS; //關閉線程池,不再接收新任務
private static final int STOP = 1 << COUNT_BITS; //關閉線程池,所有任務停止
private static final int TIDYING = 2 << COUNT_BITS; //中間狀態
private static final int TERMINATED = 3 << COUNT_BITS; //線程池已經關閉
// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
2.addWorker方法
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();//獲取ctl的快照保存在棧上
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && //如果線程池已經關閉,或者(當前線程池關閉狀態當前任務是空且當前工作隊列不為空)不滿足的情況下直接返回
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))//CAS修改線程池ctl變量,增加線程數
break retry; //添加成功直接退出
c = ctl.get(); // 添加不成功,為了保證多線程運行的安全性,重新獲取
if (runStateOf(c) != rs)//當前線程池狀態發生改變
continue retry; //直接重新運行retry循環體
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask); //生成自定義的線程woker
final Thread t = w.thread;
if (t != null) {
final Reentrantlock mainLock = this.mainLock;//這個代碼沒有意義,mainLock定義的變量為final。可以直接使用
mainLock.lock();//添加work使用鎖,保證添加任務的原子性。
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN || //線程池處於running狀態
(rs == SHUTDOWN && firstTask == null)) {//線程池處於showdown狀態但是firstTask為空。
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)//保存當前線程池中線程的最大數量
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {//添加成功,運行線程
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)//線程啟動失敗
addWorkerFailed(w);//移除work,減少線程數量
}
return workerStarted;
}
t.start()執行線程任務
//Worker類中實際執行任務的方法
public void run() {
runWorker(this);
}
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts //將原始的線程狀態為-1修改為0,後續通過getState()>=0獲取線程是否已經運行的狀態,允許線程中斷。-1默認為初始化,此處需要進行處理
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {//task不等於空直接運行,task等於空從workerQueue阻塞隊列獲取任務
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||//線程池運行狀態大於等於STOP
(Thread.interrupted() && //線程是否已經被中斷了
runStateAtLeast(ctl.get(), STOP))) &&//鮮橙汁運行狀態大於等於STOP
!wt.isInterrupted())//判斷任務的線程如果沒有被中斷
wt.interrupt();//中斷當前任務線程
try {
beforeExecute(wt, task);//鉤子函數,實際任務運行之前做處理
Throwable thrown = null;
try {
task.run();//執行實際任務代碼
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);//鉤子函數,實際任務運行之後做處理
}
} finally {
task = null;//將任務置空
w.completedTasks++;//任務完成數加1
w.unlock();
}
}
completedAbruptly = false;//執行過程中是否發成異常
} finally {
processWorkerExit(w, completedAbruptly);
}
}
//執行任務退出操作
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // 如果有異常中斷導致任務結束
decrementWorkerCount();//將線程數量減1
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;//完成的任務數量累加
workers.remove(w);//從workers的任務集合中移除當前任務
} finally {
mainLock.unlock();
}
tryTerminate();//嘗試關閉線程池
int c = ctl.get();//獲取當前線程池的最新狀態
if (runStateLessThan(c, STOP)) {//如果當前任務狀態小於STOP
if (!completedAbruptly) {//當前任務執行無異常發生
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;//根據allowCoreThreadTimeOut參數獲取最小的線程數量
if (min == 0 && ! workQueue.isEmpty())//如果核心線程允許退出,並且工作隊列不為空
min = 1;//設置最小值為1,因為最後需要有線程去執行線程池的後續處理,所有線程都沒了,後續線程池退出無線程處理
if (workerCountOf(c) >= min)//如果工作的線程數量大於等最小值
return; // replacement not needed 直接返回
}
addWorker(null, false);//如果當前線程數已經小於最小線程數,那麼需要保證最小線程數在運行,所以需要有保證線程池的正常運行,添加一個空任務。
}
}
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();//獲取當前線程池狀態
int rs = runStateOf(c);//獲取當前運行狀態
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {//如果線程池狀態大於等於SHUTDOWN並且(線程數量大於等於STOP或者工作隊列為空)
decrementWorkerCount();//將線程池中線程數量減1
return null;
}
int wc = workerCountOf(c);//獲取當前線程池的線程數量
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;//判斷是否運行核心線程數超時,判斷是否需要超時機制
if ((wc > maximumPoolSize || (timed && timedOut))//工作線程大於最大線程池數量或者允許超時並且有超時的情況
&& (wc > 1 || workQueue.isEmpty())) {//並且線程池線程數量大於1或者阻塞隊列為空
if (compareAndDecrementWorkerCount(c))//CAS操作將線程池數量減1
return null;//返回空
continue;//CAS失敗繼續
}
try {
Runnable r = timed ?//允許超時從隊列中拿任務並等待keepAliveTime時間
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();阻塞等待
if (r != null)//獲取的任務不為空
return r;//直接返回
timedOut = true;//如果為空,超時標誌位為true
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
3.addWorkerFailed方法解析
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();//獲取鎖
try {
if (w != null)//work不是空
workers.remove(w);//直接從workers中移除當前任務
decrementWorkerCount();//加個ctl中的woker數量減少
tryTerminate();//如果線程池已經是showdown狀態,嘗試讓線程池停止。多線程協作的函數
} finally {
mainLock.unlock();
}
}
3.線程池關閉shutdown方法
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();//檢查關閉權限,可以忽略
advanceRunState(SHUTDOWN);//線程池狀態遞進,由running變為shutdown
interruptIdleWorkers();//中斷所有空閒線程
onShutdown(); // hook for ScheduledThreadPoolExecutor鉤子函數,調度線程池使用
} finally {
mainLock.unlock();
}
tryTerminate();//嘗試將線程池關閉。
}
1.advanceRunState方法解析
private void advanceRunState(int targetState) {
for (;;) {
int c = ctl.get();//獲取當前的線程狀態
if (runStateAtLeast(c, targetState) ||//當前狀態已經是大於等於shutdown直接退出
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))//cas操作將線程狀態改為targetState。
break;
}
}
2.interruptIdleWorkers方法解析
private void interruptIdleWorkers() {
interruptIdleWorkers(false);
}
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();//獲取鎖
try {
for (Worker w : workers) {//遍歷works中所有的工作任務
Thread t = w.thread;
if (!t.isInterrupted() && w.tryLock()) {//如果沒有被中斷過,並且可以獲得鎖,證明屬於空閒線程
try {
t.interrupt();//將線程中斷,打上中斷標誌位
} catch (SecurityException ignore) {
} finally {
w.unlock();//解鎖
}
}
if (onlyOne)//只中斷一個線程標識
break;
}
} finally {
mainLock.unlock();
}
}
4.shutdownNow方法解析
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();//權限檢查
advanceRunState(STOP);//狀態遞進 詳細方法見上面
interruptWorkers();//中斷所有啟動的work線程
tasks = drainQueue();//將所有未執行的任務出隊保存
} finally {
mainLock.unlock();
}
tryTerminate();//嘗試關閉線程池
return tasks;
}
1.interruptWorkers方法解析
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();//獲取鎖
try {
for (Worker w : workers)//遍歷所有woker進行處理
w.interruptIfStarted();
} finally {
mainLock.unlock();
}
}
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {//當前work的狀態大於0並且線程不為空且線程未被中斷
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
使用getState() >= 0表示當前線程已經啟動,runWorker方法中會將其狀態從-1改變。證明線程已經啟動
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
2.drainQueue方法解析
//標準的入隊和出隊功能不做過多注釋
private List<Runnable> drainQueue() {
BlockingQueue<Runnable> q = workQueue;
ArrayList<Runnable> taskList = new ArrayList<Runnable>();
q.drainTo(taskList);
if (!q.isEmpty()) {
for (Runnable r : q.toArray(new Runnable[0])) {
if (q.remove(r))
taskList.add(r);
}
}
return taskList;
}
5.tryTerminate方法解析
final void tryTerminate() {
for (;;) {
int c = ctl.get();//獲取當前線程狀態ctl
if (isRunning(c) ||//線程池正在運行
runStateAtLeast(c, TIDYING) ||//線程池狀態大於等於TIDYING,有其他線程已經改變線程池狀態為TIDYING或者TERMINATED了
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))//線程池狀態等於shutdown並且工作隊列不為空。
return;//以上三種情況線程池無法關閉,需要繼續處理
if (workerCountOf(c) != 0) { // Eligible to terminate//當前工作線程數量不等於0
interruptIdleWorkers(ONLY_ONE);//中斷線程且只中斷一個
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {//cas操作將線程池狀態置為TIDYING
try {
terminated();//線程池終止
} finally {
ctl.set(ctlOf(TERMINATED, 0));//設置線程池狀態為TERMINATED
termination.signalAll();//信號喚醒所有等待線程
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
4.總結
線程池的運用在項目中已經成為一種常態,作為一個開發人員最重要的了解其背後的設計原理以及流程,更好地運用線程池,方便提升項目程序的性能以及排查錯誤。在閱讀對應的線程池源碼時,我們只局限於單線程的思維,更多的是要去考慮當多線程並發執行時的臨界條件。了解設計者的設計初衷、以及設計意圖,能讓你更好地在項目中運用並設計符合自己項目的線程池。以上是我個人對於線程池ThreadPoolExecutor的理解,不足之處,請多多指教。