多多色-多人伦交性欧美在线观看-多人伦精品一区二区三区视频-多色视频-免费黄色视屏网站-免费黄色在线

國內最全IT社區平臺 聯系我們 | 收藏本站
阿里云優惠2
您當前位置:首頁 > php開源 > php教程 > 線程池(一)

線程池(一)

來源:程序員人生   發布時間:2016-07-14 14:42:23 閱讀次數:5900次

甚么是線程池?為何要使用線程池?

進行數據庫開發的時候,我們應當都接觸過數據庫連接池,為了不每次進行數據庫連接的時候都重新新建和燒毀數據庫連接,我們可使用1個數據庫連接池來保護1些數據庫連接,讓他們長時間保持1個激活的狀態,當系統需要使用使用數據庫的時候,就從連接池中拿來1個可用的連接便可,而不是創建新的連接。反之,當我們需要關閉連接的時候,也不是真的關閉連接,而是將這個連接返還給連接池。通過這樣的方式,可以節儉很多的創建和燒毀對象的時間。
其實線程池也是類似的概念,線程池中總有那末幾個活躍的線程,當你需要的時候可以從線程池里隨意拿來1個空閑線程,當完成工作時其實不著急關閉線程,而是返回給線程池,方便其他人使用。
這里我們肯定還有疑問為何我們傳統的創建自定義線程有甚么問題?問題就是雖然線程是1種輕量級的工具,但它的創建和關閉都需要花費時間,如我們在程序中隨便的創建線程而不加控制其數量,反而會耗盡cpu和內存資源。即使沒有outofmemory異常,大量的回收線程也會致使GC停頓的時間延長。所以我們實際中可以優先斟酌使用線程池對線程進行控制和管理,更加有效的公道的使用線程進行提高程序的性能。

jdk對線程池的支持

Jdk提供了1套Executor框架,核心成員以下圖:

其中ThreadPoolExecutor是1個線程池,Executors扮演著1個線程工廠的角色,通過Executors類可以獲得1個具有特定功能的線程池。通過UML圖我們可以看到ThreadPoolExecutor實現Executor接口通過這個接口,任何Runable對象都可以被ThreadPoolExecutor線程池調度。
Executors主要提供以下工廠方法:
static ExecutorService newCachedThreadPool() static ExecutorService newFixedThreadPool(int nThreads) static ExecutorService newSingleThreadExecutor() static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) static ScheduledExecutorService newSingleThreadScheduledExecutor()
newCachedThreadPool()方法:該方法返回1個會根據實際情況進行調劑的線程池,線程數量不肯定。但如果有空閑的線程可復用,則優先選擇可復用的線程。若當先線程都在工作,同時又有新的任務提交,則會創建新的線程來處理。所有線程在當前任務完成的時候,將返回線程池進行復用。
newFixedThreadPool(int nThreads)方法:該方法返回1個具有固定數量的線程的線程池。該線程池中的線程數量始終不變。當1個任務提交時,若有空閑線程則履行,若沒有,會被交給1個任務隊列,當有空閑線程的時候,就會處理任務隊里的任務。
newScheduleThreadPool(int corePoolSize)方法:該方法會返回1個ScheduledExecutorService對象。ScheduledExecutorService接口在ExecutorService接口的基礎上擴大了在給定時間履行某任務的功能,如某個固定時間后開始履行,或周期性的履行某個任務。
newSingleThreadScheduleExecutor();也返回1個ScheduledExecutorService對象,不過線程池只有1個線程。
下面簡單演示下newFixedThreadPool(int nThreads)的簡單使用:
import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; /** * 簡單展現newFixedThreadPool * @author zdm * */ public class ThreadPoolDemo { public static class MyTask implements Runnable { @Override public void run() { System.out.println(System.currentTimeMillis() + ":ThreadId:" + Thread.currentThread().getId()); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } } } public static void main(String[] args) { ExecutorService es = Executors.newFixedThreadPool(5); for(int i = 0; i < 10; i++) { es.execute(new MyTask()); } } }
運行結果:

有運行結果可以知道具有5個線程的線程池的把10個任務分兩批完成,前5個和后5個任務恰好相差了1秒,因而可知上面程序符合newFixedThreadPool產的線程池的行動。

有時間計劃的任務:

newScheduleThreadPool(int corePoolSize)返回1個ScheduleExecutorService對象,可以根據時間需要對線程進行調度,它的主要方法以下:
ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) //創建并履行在給定延遲后啟用的1次性操作。 ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) //創建并履行1個在給定初始延遲后首次啟用的定期操作,后續操作具有給定的周期;也就是將在 initialDelay 后開始履行,然后在 initialDelay+period 后履行,接著在 initialDelay + 2 * period 后履行,依此類推。 ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) //創建并履行1個在給定初始延遲后首次啟用的定期操作,隨后,在每次履行終止和下1次履行開始之間都存在給定的延遲。
與其他線程池不同,ScheduledExecutorService其實不1定會立即安排履行任務,它會在指定的時間,對任務進行調度,起到的計劃任務的作用。
這里需要注意的就是scheduleAtFixedRate和scheduleWithFixedDelay這兩個方法都是對任務進行周期性的調度,但是又有1點不同。
對FixedRate的方式來講,任務調度的頻率是1定的,它是以上1個任務開始履行的時間為出發點,以后的period時間,調度下1次任務。而FixedDealy則是上1個任務結束后,在經過delay對下1次任務進行調度。
如果還有疑問,我們可以官方的文檔對它兩的解釋:
scheduleAtFixRate(Runnable command, long initialDealy, long period, TimeUnit unit):
Creates and executes a periodic action that becomes enabled first after the given initial delay, and subsequently with the given period; that is executions will commence after initialDelay  then initialDelay+period, then initialDelay + 2 * period, and so on.
翻譯:創建1個周期性的任務,任務開始于指定的初始延遲,后續的任務依照給定的周期履行:后續第1個任務將會在initialDelay+period,下1個任務將在initialDelay+2period履行。
scheduleWithFixedDelay(Runnable command, long initialDealy, long delay, TimeUnit unit):
Creates and executes a periodic action that becomes enabled first after the given initial delay, and subsequently with the given delay between the termination of one execution and the commencement of the next.  If any execution of the task encounters an exception, subsequent executions are suppressed. Otherwise, the task will only terminate via cancellation or termination of the executor.
翻譯:創建1個周期性的任務,任務開始于指定的初始延遲,后續的任務依照給定的延遲履行,即上1個任務結束的時間到下1個任務開始時間的時間差。
下面可以簡單演示下ScheduledExecutorService的scheduleAtFixedRate的方法:
import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; /** * ScheduledExecutorService的簡單示例 * @author zdm * */ public class ScheduleExecutorServiceDemo { public static void main(String[] args) { ScheduledExecutorService ses = Executors.newScheduledThreadPool(10); ses.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { TimeUnit.SECONDS.sleep(1); System.out.println(System.currentTimeMillis()/1000); } catch (InterruptedException e) { e.printStackTrace(); } } }, 0, 2, TimeUnit.SECONDS); } }
運行結果:

這個任務履行使用1秒,周期為2秒,也就是2秒履行1次任務,恰好運行結果符合我們的預期目標。
但是這里有1個問題就是如果任務的履行時間大于調度周期的時間會產生怎樣辦?這里我們將TimeUnit.SECONDS.sleep(5)嘗試1下

會發現任務的周期調度變成了5秒~~~
如果采取scheduledWithFixedDelay()調用會依照修改任務履行需要5秒,延遲為2秒,那末任務的實際間隔為7秒。
這里還需要注意1個問題:那就是調度程序其實不會無窮期的延續等待。如果任務本身產生了異常,那末后續的子任務都會停止調用。所以需要對異常進行及時的處理,以保證周期性任務的穩定性。

線程池的內部實現

對核心的那幾個線程池,雖然看上去功能各不相同其實內部都是使用了ThreadPoolExecutor實現:

public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); } public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
從上面我們可以發現它們都是ThreadPoolExecutor的封裝,為何功能如此強大?我們可以看1下ThreadPoolExecutor的構造方法:

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)

方法的參數含義以下:
corePoolSize:表示線程中的線程數量。
maximuumPoolSize:表示線程池中最大的線程數量。
keepAliveTime:當線程數量超過corePoolSize時,過剩的空閑線程的存活時間
unit:表示存活時間單位
workQueue:任務隊列,被提交但卻沒有沒履行的任務。
threadFactory:線程工廠,用于創建線程,1般用默許的便可。
handler:謝絕策略,當任務太多來不及處理時,如何謝絕。
其中的參數workQueue是1個BlockingQueue<Runnable>接口,它有幾種不同功能的子類隊列:

先認識1下Blocking:

阻塞隊列,顧名思義,首先它是1個隊列,而1個隊列在數據結構中所起的作用大致以下圖所示:


從上圖我們可以很清楚看到,通過1個同享的隊列,可使得數據由隊列的1端輸入,從另外1端輸出;
經常使用的隊列主要有以下兩種:(固然通過不同的實現方式,還可以延伸出很多不同類型的隊列,DelayQueue就是其中的1種)
  先進先出(FIFO):先插入的隊列的元素也最早出隊列,類似于排隊的功能。從某種程度上來講這類隊列也體現了1種公平性。
  落后先出(LIFO):后插入隊列的元素最早出隊列,這類隊列優先處理最近產生的事件。

      多線程環境中,通過隊列可以很容易實現數據同享,比如經典的“生產者”和“消費者”模型中,通過隊列可以很便利地實現二者之間的數據同享。假定我們有若干生產者線程,另外又有若干個消費者線程。如果生產者線程需要把準備好的數據同享給消費者線程,利用隊列的方式來傳遞數據,就能夠很方便地解決他們之間的數據同享問題。但如果生產者和消費者在某個時間段內,萬1產生數據處理速度不匹配的情況呢?理想情況下,如果生產者產出數據的速度大于消費者消費的速度,并且當生產出來的數據積累到1定程度的時候,那末生產者必須暫停等待1下(阻塞生產者線程),以便等待消費者線程把積累的數據處理終了,反之亦然。但是,在concurrent包發布之前,在多線程環境下,我們每一個程序員都必須去自己控制這些細節,特別還要統籌效力和線程安全,而這會給我們的程序帶來不小的復雜度。好在此時,強大的concurrent包橫空出世了,而他也給我們帶來了強大的BlockingQueue。(在多線程領域:所謂阻塞,在某些情況下會掛起線程(即阻塞),1旦條件滿足,被掛起的線程又會自動被喚醒)

下面兩幅圖演示了BlockingQueue的兩個常見阻塞場景:


如上圖所示:當隊列中沒有數據的情況下,消費者真個所有線程都會被自動阻塞(掛起),直到有數據放入隊列。


如上圖所示:當隊列中填滿數據的情況下,生產者真個所有線程都會被自動阻塞(掛起),直到隊列中有空的位置,線程被自動喚醒。

 這也是我們在多線程環境下,為何需要BlockingQueue的緣由。作為BlockingQueue的使用者,我們不再需要關心甚么時候需要阻塞線程,甚么時候需要喚醒線程,由于這1切BlockingQueue都給你1手包辦了。既然BlockingQueue如此神通廣大,讓我們1起來見識下它的經常使用方法:

BlockingQueue的核心方法

放入數據:
offer(anObject):表示如果可能的話,將anObject加到BlockingQueue里,即如果BlockingQueue可以容納,則返回true,否則返回false.(本方法不阻塞當前履行方法的線程)
offer(E o, long timeout, TimeUnit unit),可以設定等待的時間,如果在指定的時間內,還不能往隊列中加入BlockingQueue,則返回失敗。
put(anObject):把anObject加到BlockingQueue里,如果BlockQueue沒有空間,則調用此方法的線程被阻塞直到BlockingQueue里面有空間再繼續.
獲得數據:
  poll(time):取走BlockingQueue里排在首位的對象,若不能立即取出,則可以等time參數規定的時間,取不到時返回null;
  poll(long timeout, TimeUnit unit):從BlockingQueue取出1個隊首的對象,如果在指定時間內,隊列1旦有數據可取,則立即返回隊列中的數據。否則知道時間超時還沒有數據可取,返回失敗。
  take():取走BlockingQueue里排在首位的對象,BlockingQueue為空,阻斷進入等待狀態直到BlockingQueue有新的數據被加入; 
  drainTo():1次性從BlockingQueue獲得所有可用的數據對象(還可以指定獲得數據的個數),通過該方法,可以提升獲得數據效力;不需要屢次分批加鎖或釋放鎖。

BlockingQueue成員詳細介紹

1. ArrayBlockingQueue
      基于數組的阻塞隊列實現,在ArrayBlockingQueue內部,保護了1個定長數組,以便緩存隊列中的數據對象,這是1個經常使用的阻塞隊列,除1個定長數組外,ArrayBlockingQueue內部還保存著兩個整形變量,分別標識著隊列的頭部和尾部在數組中的位置。
  ArrayBlockingQueue在生產者放入數據和消費者獲得數據,都是共用同1個鎖對象,由此也意味著二者沒法真正并行運行,這點特別不同于LinkedBlockingQueue;依照實現原理來分析,ArrayBlockingQueue完全可以采取分離鎖,從而實現生產者和消費者操作的完全并行運行。Doug Lea之所以沒這樣去做,或許是由于ArrayBlockingQueue的數據寫入和獲得操作已足夠輕巧,以致于引入獨立的鎖機制,除給代碼帶來額外的復雜性外,其在性能上完全占不到任何便宜。 ArrayBlockingQueue和LinkedBlockingQueue間還有1個明顯的不同的地方在于,前者在插入或刪除元素時不會產生或燒毀任何額外的對象實例,而后者則會生成1個額外的Node對象。這在長時間內需要高效并發地處理大批量數據的系統中,其對GC的影響還是存在1定的區分。而在創建ArrayBlockingQueue時,我們還可以控制對象的內部鎖是不是采取公平鎖,默許采取非公平鎖。
2. LinkedBlockingQueue
      基于鏈表的阻塞隊列,同ArrayListBlockingQueue類似,其內部也保持著1個數據緩沖隊列(該隊列由1個鏈表構成),當生產者往隊列中放入1個數據時,隊列會從生產者手中獲得數據,并緩存在隊列內部,而生產者立即返回;只有當隊列緩沖區到達最大值緩存容量時(LinkedBlockingQueue可以通過構造函數指定該值),才會阻塞生產者隊列,直到消費者從隊列中消費掉1份數據,生產者線程會被喚醒,反之對消費者這真個處理也基于一樣的原理。而LinkedBlockingQueue之所以能夠高效的處理并發數據,還由于其對生產者端和消費者端分別采取了獨立的鎖來控制數據同步,這也意味著在高并發的情況下生產者和消費者可以并行地操作隊列中的數據,以此來提高全部隊列的并發性能。
作為開發者,我們需要注意的是,如果構造1個LinkedBlockingQueue對象,而沒有指定其容量大小,LinkedBlockingQueue會默許1個類似無窮大小的容量(Integer.MAX_VALUE),這樣的話,如果生產者的速度1旦大于消費者的速度,或許還沒有等到隊列滿阻塞產生,系統內存就有可能已被消耗殆盡了。
ArrayBlockingQueue和LinkedBlockingQueue是兩個最普通也是最經常使用的阻塞隊列,1般情況下,在處理多線程間的生產者消費者問題,使用這兩個類足以。
3. DelayQueue
      DelayQueue中的元素只有當其指定的延遲時間到了,才能夠從隊列中獲得到該元素。DelayQueue是1個沒有大小限制的隊列,因此往隊列中插入數據的操作(生產者)永久不會被阻塞,而只有獲得數據的操作(消費者)才會被阻塞。
使用處景:
  DelayQueue使用處景較少,但都相當奇妙,常見的例子比如使用1個DelayQueue來管理1個超時未響應的連接隊列。
4. PriorityBlockingQueue
      基于優先級的阻塞隊列(優先級的判斷通過構造函數傳入的Compator對象來決定),但需要注意的是PriorityBlockingQueue其實不會阻塞數據生產者,而只會在沒有可消費的數據時,阻塞數據的消費者。因此使用的時候要特別注意,生產者生產數據的速度絕對不能快于消費者消費數據的速度,否則時間1長,會終究耗盡所有的可用堆內存空間。在實現PriorityBlockingQueue時,內部控制線程同步的鎖采取的是公平鎖。
5. SynchronousQueue
      1種無緩沖的等待隊列,類似于無中介的直接交易,有點像原始社會中的生產者和消費者,生產者拿著產品去集市銷售給產品的終究消費者,而消費者必須親身去集市找到所要商品的直接生產者,如果1方沒有找到適合的目標,那末對不起,大家都在集市等待。相對有緩沖的BlockingQueue來講,少了1個中間經銷商的環節(緩沖區),如果有經銷商,生產者直接把產品批發給經銷商,而無需在乎經銷商終究會將這些產品賣給那些消費者,由于經銷商可以庫存1部份商品,因此相對直接交易模式,整體來講采取中間經銷商的模式會吞吐量高1些(可以批量買賣);但另外一方面,又由于經銷商的引入,使得產品從生產者到消費者中間增加了額外的交易環節,單個產品的及時響應性能可能會下降。
  聲明1個SynchronousQueue有兩種不同的方式,它們之間有著不太1樣的行動。公平模式和非公平模式的區分:
  如果采取公平模式:SynchronousQueue會采取公平鎖,并配合1個FIFO隊列來阻塞過剩的生產者和消費者,從而體系整體的公平策略;
  但如果是非公平模式(SynchronousQueue默許):SynchronousQueue采取非公平鎖,同時配合1個LIFO隊列來管理過剩的生產者和消費者,而后1種模式,如果生產者和消費者的處理速度有差距,則很容易出現饑渴的情況,便可能有某些生產者或是消費者的數據永久都得不到處理。所以在使用SynchronousQueue時會設置很大的maximumPoolSize,而否則會很容易履行謝絕策略。
  • 小結
      BlockingQueue不光實現了1個完全隊列所具有的基本功能,同時在多線程環境下,他還自動管理了多線間的自動等待于喚醒功能,從而使得程序員可以疏忽這些細節,關注更高級的功能。 
由此可以知道,在使用newCachedThreadPool時,當提交的任務過量時,沒有空閑的線程,使用SynchronousQueue,它是直接提交任務的隊列,從而迫使線程池創建新的線程來處理任務。當任務履行完成,它會被指定的時間內被回收。由于maximumPoolSize=0。所以當有大量任務提交,而任務的處理又不是很快的情況下,會致使系統資源的耗盡。
使用newFixedThreadPool和newSingleThreadExecutor時應當注意無界的LinkedBlockingQueue的增長。
下面給出線程池的核心調度任務的代碼:
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }
代碼第5行的workerCountOf()方法獲得當前線程池中的線程總數,當線程總數小于corePoolSize時,會將任務通過方法addWorker()直接調度。否則workQueue.offer()進入任務隊列,如果進入任務隊列失敗(有界隊列到達上限或使用SynchronousQueue),則會履行17行,將任務提交到線程池,當前線程數到達maximumPoolSize,則提交失敗,使用謝絕策略,未到達,則分配線程履行。






生活不易,碼農辛苦
如果您覺得本網站對您的學習有所幫助,可以手機掃描二維碼進行捐贈
程序員人生
------分隔線----------------------------
分享到:
------分隔線----------------------------
關閉
程序員人生
主站蜘蛛池模板: 亚洲欧美色视频 | 久久国产精品久久国产片 | 久草在线观看福利视频 | 国产精品2020观看久久 | 日本特一级毛片免费视频 | 91亚洲精品福利在线播放 | 亚洲精品一区二区中文 | 91精品国产91久久久久久最新 | 亚洲国产精品一区二区三区 | 国产在线观看不卡免费高清 | 欧美午夜视频在线 | 2018年成人精品| 亚洲乱码专区一区二区三区 | 国产色视频一区二区三区 | www日本免费| 亚洲区一| 碰超丶在线免费 | 美女上床视频 | 亚洲 成人 欧美 自拍 | 亚洲综合视频在线观看 | 美女福利在线 | 伊人影视 | 中文字幕 国产精品 | 亚洲女人的天堂 | 成人精品一区二区三区中文字幕 | 欧美另类一区 | 久久精品免看国产 | 久久一区二区精品综合 | 伊人www| 国产91区精品福利在线社区 | 福利精品一区 | 亚洲伊人久久网 | 免费片子| 性xxxx免费观看视频 | 日韩一区二区三区中文字幕 | 免费国产在线观看老王影院 | 欧美国产成人免费观看永久视频 | 国产成人无精品久久久 | 国产亚洲精品美女一区二区 | 青青青青在线成人视99 | 91综合网 |