Java 線程 Executor 框架詳解與使用
在HotSpot VM的線程模型中,Java線程被一對一映射為本地操作系統線程。Java線程啟動時會創建一個本地操作系統線程;當該Java線程終止時,這個操作系統線程也會被回收,在JVM中我們可以通過-Xss設置每個線程的大小。操作系統會調度所有線程並將它們分配給可用的CPU。
在上層,java多線程程序通常把應用分解為若干個任務,然後使用用戶級的調度器(Executor框架)將這些任務映射為固定數量的線程;在底層,操作系統內核將這些線程映射到硬體處理器上。這種兩級調度模型的示意圖如下圖所示
通過上圖可以看出應用程序通過Executor控制上層調度,操作系統內核控制下層調度。
註:oskernel操作系統核心包括操作系統軟體和應用,只是操作系統最基本的功能,例如內存管理,進程管理,硬體驅動等
Executor結構
executor結構主要包括任務、任務的執行和非同步結果的計算。
任務
包括被執行任務需要實現的介面:Runnable介面或Callable介面
任務的執行
包括任務執行機制的核心介面Executor,以及繼承自Executor的ExecutorService介面。Executor框架有兩個關鍵類實現了ExecutorService介面(ThreadPoolExecutor和ScheduledThreadPoolExecutor)
非同步計算的結果
包括介面Future和實現Future介面的FutureTask類
下面我們來看看executor類圖
在Executor使用過程中,主線程首先要創建實現Runnable或者Callable介面的任務對象。工具類Executors可以把一個Runnable對象封裝為一個Callable對象(Executors.callable(Runnable task)或Executors.callable(Runnable task,Object resule))。
如果執行ExecutorService.submit(…),ExecutorService將返回一個實現Future介面的對象(FutureTask)。由於FutureTask實現了Runnable,我們也可以創建FutureTask,然後直接交給ExecutorService執行。最後,主線程可以執行FutureTask.get()方法來等待任務執行完成。主線程也可以執行FutureTask.cancel(boolean mayInterruptIfRunning)來取消此任務的執行。
FixedThreadPool
初始化
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()); } public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(), threadFactory); }
FixedThreadPool的corePoolSize和maximumPoolSize都被設置為創建FixedThreadPool時指定的參數nThreads。當線程池中的線程數大於corePoolSize時,keepAliveTime為多餘的空閑線程等待新任務的最長時間,超過這個時間後多餘的線程將被終止。這裡把keepAliveTime設置為0L,意味著多餘的空閑線程會被立即終止
運行過程
下面是FixedThreadPool運行過程示意圖
1、如果當前運行的線程數少於corePoolSize,則創建新線程來執行任務。
2、在線程池完成預熱之後(當前運行的線程數等於corePoolSize),將任務加入LinkedBlockingQueue。
3、線程執行完1中的任務後,會在循環中反覆從LinkedBlockingQueue獲取任務來執行。
FixedThreadPool使用無界隊列LinkedBlockingQueue作為線程池的工作隊列(隊列的容量為Integer.MAX_VALUE)。使用無界隊列作為工作隊列會對線程池帶來如下影響
1、當線程池中的線程數達到corePoolSize後,新任務將在無界隊列中等待,因此線程池中的線程數不會超過corePoolSize。
2、由於1,使用無界隊列時maximumPoolSize將是一個無效參數。
3、由於1和2,使用無界隊列時keepAliveTime將是一個無效參數。
4、由於使用無界隊列,運行中的FixedThreadPool(未執行方法shutdown()或shutdownNow())不會拒絕任務(不會調用RejectedExecutionHandler.rejectedExecution方法)。
使用場景
FixedThreadPool適用於為了滿足資源管理的需求,而需要限制當前線程數量的應用場景,它適用於負載比較重的伺服器
SingleThreadExecutor
初始化
創建使用單個線程的SingleThread-Executor
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue())); } public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(), threadFactory)); }
SingleThreadExecutor的corePoolSize和maximumPoolSize被設置為1。其他參數與FixedThreadPool相同。SingleThreadExecutor使用無界隊列LinkedBlockingQueue作為線程池的工作隊列(隊列的容量為Integer.MAX_VALUE)。SingleThreadExecutor使用無界隊列作為工作隊列對線程池帶來的影響與FixedThreadPool相同
運行過程
下圖是SingleThreadExecutor的運行過程示意圖
1、如果當前運行的線程數少於corePoolSize(即線程池中無運行的線程),則創建一個新線程來執行任務。
2、在線程池完成預熱之後(當前線程池中有一個運行的線程),將任務加入LinkedBlockingQueue。
3、線程執行完1中的任務後,會在一個無限循環中反覆從LinkedBlockingQueue獲取任務來執行。
使用場景
SingleThreadExecutor適用於需要保證順序地執行各個任務;並且在任意時間點,不會有多個線程是活動的應用場景。
CachedThreadPool
初始化
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue()); }public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue(), threadFactory); }
CachedThreadPool的corePoolSize被設置為0,即corePool為空;maximumPoolSize被設置為Integer.MAX_VALUE,即maximumPool是無界的。這裡把keepAliveTime設置為60L,意味著CachedThreadPool中的空閑線程等待新任務的最長時間為60秒,空閑線程超過60秒後將會被終止。
FixedThreadPool和SingleThreadExecutor使用無界隊列LinkedBlockingQueue作為線程池的工作隊列。CachedThreadPool使用沒有容量的SynchronousQueue作為線程池的工作隊列,但CachedThreadPool的maximumPool是無界的。這意味著,如果主線程提交任務的速度高於maximumPool中線程處理任務的速度時,CachedThreadPool會不斷創建新線程。極端情況下,CachedThreadPool會因為創建過多線程而耗盡CPU和內存資源。
運行過程
下圖是CachedThreadPool的運行過程示意圖
1、首先執行SynchronousQueue.offer(Runnable task)。如果當前maximumPool中有空閑線程正在執行SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS),那麼主線程執行offer操作與空閑線程執行的poll操作配對成功,主線程把任務交給空閑線程執行,execute()方法執行完成;否則執行下面的步驟2)。
2、當初始maximumPool為空,或者maximumPool中當前沒有空閑線程時,將沒有線程執行SynchronousQueue.poll
(keepAliveTime,TimeUnit.NANOSECONDS)。這種情況下,步驟1)將失敗。此時CachedThreadPool會創建一個新線程執行任務,execute()方法執行完成。
3、在步驟2)中新創建的線程將任務執行完後,會執行SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS)。這個poll操作會讓空閑線程最多在SynchronousQueue中等待60秒鐘。如果60秒鐘內主線程提交了一個新任務(主線程執行步驟1)),那麼這個空閑線程將執行主線程提交的新任務;否則,這個空閑線程將終止。由於空閑60秒的空閑線程會被終止,因此長時間保持空閑的CachedThreadPool不會使用任何資源。
前面提到過,SynchronousQueue是一個沒有容量的阻塞隊列。每個插入操作必須等待另一個線程的對應移除操作,反之亦然。CachedThreadPool使用SynchronousQueue,把主線程提交的任務傳遞給空閑線程執行。CachedThreadPool中任務傳遞的示意圖如下圖所示:
使用場景
看名字我們可以知道cached緩存,CachedThreadPool可以創建一個可根據需要創建新線程的線程池,但是在以前構造的線程可用時將重用它們,對於執行很多短期非同步任務的程序而言,這些線程池通常可提高程序性能。調用 execute 將重用以前構造的線程(如果線程可用)。如果現有線程沒有可用的,則創建一個新線程並添加到池中。
CachedThreadPool是大小無界的線程池,適用於執行很多的短期非同步任務的小程序,或者是負載較輕的伺服器。
ScheduledThreadPoolExecutor
ScheduledThreadPoolExecutor繼承自ThreadPoolExecutor。它主要用來在給定的延遲之後運行任務,或者定期執行任務。ScheduledThreadPoolExecutor的功能與Timer類似,但ScheduledThreadPoolExecutor功能更強大、更靈活。Timer對應的是單個後台線程,而ScheduledThreadPoolExecutor可以在構造函數中指定多個對應的後台線程數
初始化
ScheduledThreadPoolExecutor通常使用工廠類Executors來創建。Executors可以創建2種類型的ScheduledThreadPoolExecutor,如下。
ScheduledThreadPoolExecutor:包含若干個線程的ScheduledThreadPoolExecutor。
SingleThreadScheduledExecutor:只包含一個線程的ScheduledThreadPoolExecutor。
下面分別介紹這兩種ScheduledThreadPoolExecutor。
public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler) { super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS, new DelayedWorkQueue(), threadFactory, handler); } public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS, new DelayedWorkQueue()); }
直接調用父類ThreadPoolExecutor構造方法進行初始化。ScheduledThreadPoolExecutor適用於需要多個後台線程執行周期任務,同時為了滿足資源管理的需求而需要限制後台線程的數量的應用場景。
下面看看如何創建SingleThreadScheduledExecutor
public static ScheduledExecutorService newSingleThreadScheduledExecutor() { return new DelegatedScheduledExecutorService (new ScheduledThreadPoolExecutor(1)); } public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) { return new DelegatedScheduledExecutorService (new ScheduledThreadPoolExecutor(1, threadFactory)); }
運行過程
DelayQueue是一個無界隊列,所以ThreadPoolExecutor的maximumPoolSize在ScheduledThreadPoolExecutor中沒有什麼意義(設置maximumPoolSize的大小沒有什麼效果)。ScheduledThreadPoolExecutor的執行主要分為兩大部分。
1、當調用ScheduledThreadPoolExecutor的scheduleAtFixedRate()方法或者scheduleWithFixedDelay()方法時,會向ScheduledThreadPoolExecutor的DelayQueue添加一個實現了RunnableScheduledFutur介面的ScheduledFutureTask。
2、線程池中的線程從DelayQueue中獲取ScheduledFutureTask,然後執行任務。
下面看看ScheduedThreadPoolExecutor運行過程示意圖
ScheduledThreadPoolExecutor為了實現周期性的執行任務,對ThreadPoolExecutor做了如下
的修改。
1、使用DelayQueue作為任務隊列。
2、獲取任務的方式不同(後文會說明)。
3、執行周期任務後,增加了額外的處理(後文會說明)。
undefined實現過程分析
ScheduledThreadPoolExecutor會把待調度的任務(ScheduledFutureTask)放到一個DelayQueue中。ScheduledFutureTask主要包含3個成員變數,如下。
1、long time,表示這個任務將要被執行的具體時間。
2、long sequenceNumber,表示這個任務被添加到ScheduledThreadPoolExecutor中的序號。
3、long period,表示任務執行的間隔周期。
DelayQueue封裝了一個PriorityQueue,這個PriorityQueue會對隊列中的ScheduledFutureTask進行排序。排序時,time小的排在前面(時間早的任務將被先執行)。如果兩個ScheduledFutureTask的time相同,就比較sequenceNumber,sequenceNumber小的排在前面(也就是說,如果兩個任務的執行時間相同,那麼先提交的任務將被先執行)。首先,讓我們看看ScheduledThreadPoolExecutor中的線程執行周期任務的過程。如下圖所示
1、線程1從DelayQueue中獲取已到期的ScheduledFutureTask(DelayQueue.take())。到期任務是指ScheduledFutureTask的time大於等於當前時間。
2、線程1執行這個ScheduledFutureTask。
3、線程1修改ScheduledFutureTask的time變數為下次將要被執行的時間。
4、線程1把這個修改time之後的ScheduledFutureTask放回DelayQueue中(DelayQueue.add())。


※Java編程入門學習開發的10個必備技術點!
※java第三季1-9
※JAVA中方法的調用 詳細整理
※QQ 音樂播放器簡易開發
TAG:java學習吧 |
※SpringMVC + security模塊 框架整合詳解
※Python Web 應用程序 Tornado 框架簡介
※JFinal框架學習——EhCachePlugin
※筆記應用GoodNotes將推出macOS版本,基於Project Catalyst框架
※JFinal框架學習——cache的簡單使用
※圍觀丨Google 的 Mobile UI 框架 Flutter Preview 1 發布!
※Spring Cache 框架
※網路爬蟲框架Scrapy詳解之Request
※Scrounger:iOS和Android移動應用程序滲透測試框架
※淺談Metasploit框架中的Payload
※Jmeter+Ant+Jenkins介面自動化測試框架搭建for Windows
※基於Asyncio的Python微框架:Quart
※Electron 軟體框架漏洞影響眾多熱門應用:Skype、Signal、Slack、Twitch……
※Facebook旗下Oculus VR團隊開源了DeepFocus框架
※Wasserstein is all you need:構建無監督表示的統一框架
※不需要 Root,也能用上強大的 Xposed 框架:VirtualXposed
※Coinbase推出新取證框架Dexter
※2億用戶背後的Flutter應用框架Fish Redux
※Altova跨平台移動應用框架MobileTogether發新版
※Google 跨平台 UI 框架-Flutter