多多色-多人伦交性欧美在线观看-多人伦精品一区二区三区视频-多色视频-免费黄色视屏网站-免费黄色在线

國內最全IT社區平臺 聯系我們 | 收藏本站
阿里云優惠2
您當前位置:首頁 > php開源 > php教程 > 線程池(二)

線程池(二)

來源:程序員人生   發布時間:2016-07-01 08:58:45 閱讀次數:2417次

線程池超負載了怎樣辦?都有哪些謝絕策略?

在ThreadPoolExecutor的構造方法里有1個這樣的參數,RejectedExecutionHandler handler通過查看Jdk我們可以知道這是1個接口,而且jdk內置實現了4種謝絕策略,它們都是ThreadPoolExecutor的public static class。

CallerRunsPolicy策略:只要線程池未關閉,該策略直接在調用者線程中,運行當前被拋棄的任務,雖然這樣其實不會真的拋棄任務,但是相應的被提交任務的線程性能肯定會急劇降落。
AbortPolicy策略:該策略會直接拋出異常,禁止系統正常工作。
DiscardPolicy策略:直接拋棄任務,不給予任何處理。
DiscardOldestPolicy策略:該策略會拋棄最古老的要求(也就是任務隊列中最早進入的任務),行將被履行的策略,并嘗試再次提交當前任務。
下面可以用1個小Demo來演示1下:
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class ExecutorServiceDemo { static void log(String msg) { System.out.println(System.currentTimeMillis() + " -> " + msg); } public static void main(String[] args) throws Exception { ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1)); pool.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy()); for (int i = 0; i < 10; i++) { final int index = i; pool.submit(new Runnable() { public void run() { log("run task:" + index + " -> " + Thread.currentThread().getName()); try { Thread.sleep(1000L); } catch (Exception e) { e.printStackTrace(); } log("run over:" + index + " -> " + Thread.currentThread().getName()); } }); } log("before sleep"); Thread.sleep(4000L); log("before shutdown()"); pool.shutdown(); log("after shutdown(),pool.isTerminated=" + pool.isTerminated()); pool.awaitTermination(1, TimeUnit.SECONDS); log("now,pool.isTerminated=" + pool.isTerminated()); } }
程序設置的線程池只有1個線程,并且允許最大的線程數量也為1,同時任務隊列的最大界限為1。
當采取DisCardOldestPolicy()時:

可以知道在workQueue中的task1-task81直被拋棄(由于任務隊列只有1個容量,而線程池里的唯1線程處理的速度不是很快,而程序又不停的往線程池里提交任務)。直到最后1個task9任務才被履行。
當采取DisCardPolicy()時:

可以看見剛除開始線程履行的task0和任務隊列里的task1,其余的都被默默地拋棄了。
當采取AbortPolicy()策略時:

系統直接拋出異常。。。
當采取CallerRunsPolicy()策略時就有點不同的了:

可以看見在main線程1直幫忙處理不能被線程池處理同時也不能進入任務隊列的任務。
若以上的策略還是沒法滿足實際利用的需要。我們還可以自己擴大RejectedExecutionHandler {
void rejectedExecution(Runnable r,ThreadPoolExecutor executor);
}   
在ThreadPoolExecutor的Execute代碼里可以看見當由于超越其界限而沒有更多可用的線程或隊列槽時,或關閉 Executor 時便可能產生這類情況
 final void reject(Runnable command) {
        handler.rejectedExecution(command, this);
 }

自己定義線程創建:ThreadFactory

看到這里我們可能有疑問那就是線程池的線程是從哪里來的?答案就是:ThreadFactory
它是1個接口,只有1個方法
public Thread newThread(Runnable r);
自定義線程池可以更加自由的設置線程的狀態,下面任性的設置線程全為后臺線程:
import java.util.concurrent.ExecutorService; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; /** * 自定義線程池 :public interface ThreadFactory * 根據需要創建新線程的對象 * Thread newThread(Runnable r) * @author Administrator * */ public class MyThreadFactory { public static class MyTask implements Runnable { @Override public void run() { System.out.println( System.currentTimeMillis() + " Thread id:" + Thread.currentThread().getId()); try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } } public static void main(String[] args) throws InterruptedException { MyTask task = new MyTask(); ExecutorService exec = new ThreadPoolExecutor(5,5,0L,TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory(){ @Override public Thread newThread(Runnable r) { Thread t = new Thread(r,"后臺線程"); t.setDaemon(true); System.out.println("create " + t.getName()+ " " + t.getId()); return t; } }); for(int i = 0; i < 5; i++) { exec.submit(task); } TimeUnit.SECONDS.sleep(2); } }

對線程池的擴大

上面僅僅只是我們自定義了創建線程時的狀態,但是有時候,我們需要對線程履行的任務進行監控,比如說任務的開始時間和結束時間。榮幸的是,ThreadPoolExecutor是1個可擴大的線程池。提供了3個方法:
protected void beforeExecute(Thread t, Runnable r) { } protected void afterExecute(Runnable r, Throwable t) { } protected void terminated() { }
同時再看ThreadPoolExecutor中有1個這樣的類, private final class Worker extends AbstractQueuedSynchronizer implements Runnable它里面有1個方法runWorker
final void runWorker(Worker w) { Runnable task = w.firstTask; w.firstTask = null; boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { w.lock(); clearInterruptsForTaskRun(); try { beforeExecute(w.thread, task);<span style="white-space:pre"> </span>//運行前 Throwable thrown = null; try { task.run();<span style="white-space:pre"> </span>//運行任務 } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown);<span style="white-space:pre"> </span>//運行結束后 } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }
ThreadPoolExecutor中工作線程正是Worker的實例(它是把Runnable對象進行了包裝),Worker.runWorker()會被線程池以多線程模式異步調用,即它會被多個線程訪問,因此其beforeExecute()它們也會被多線程同時訪問。
在默許的ThreadExecutor實現中,提供了beforeExecute(),afterExecute(),terminated()空的實現。在實際的利用中可以通過對其進行擴大實現多線程池運行狀態的跟蹤。
import java.text.SimpleDateFormat; import java.util.Date; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class ExtThreadPool { public static class MyTask implements Runnable { private String name; public MyTask(String name) { this.name = name; } @Override public void run() { System.out.println("正在履行線程id:" + Thread.currentThread().getId() + " Task name " + name); try { TimeUnit.MILLISECONDS.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } } public static void main(String[] args) throws InterruptedException { ExecutorService exec = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()) //直接取得ThreadPoolExecutor的子類,并且重寫protect鉤子方法 { @Override protected synchronized void beforeExecute(Thread t, Runnable r) { System.out.print(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SS").format(new Date())); System.out.println("準備履行:" + ((MyTask)r).name); } @Override protected synchronized void afterExecute(Runnable r, Throwable t) { System.out.print(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SS").format(new Date())); System.out.println("履行完成:" + ((MyTask)r).name); } @Override protected synchronized void terminated() { System.out.print(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SS").format(new Date())); System.out.println("線程池退出..."); } }; for(int i = 0; i < 5; i++) { MyTask task = new MyTask("Task" + i); exec.execute(task); Thread.sleep(10); } exec.shutdown(); } }
這里需要注意打印時間這句會產生線程安全。用Synchronized監視器讓線程在臨界區進行互斥的履行。
運行結果:


這里可以清楚地看到線程池中履行任務的創建和結束信息。

在線程池中尋覓堆棧異常信息

import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * 演示線程池中的異常 * @author Administrator * */ public class DivTask implements Runnable { int a,b; public DivTask(int a, int b) { super(); this.a = a; this.b = b; } @Override public void run() { double res = a/b; System.out.println(res); } public static void main(String[] args) throws InterruptedException, ExecutionException { ExecutorService es = Executors.newCachedThreadPool(); for(int i = 0; i < 5; i++) { es.execute(new DivTask(100, i)); } es.shutdown(); } }
運行結果有異常拋出:
Exception in thread "pool⑴-thread⑴" java.lang.ArithmeticException: / by zero at ch3.DivTask.run(DivTask.java:25) at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) at java.lang.Thread.run(Unknown Source) 50.0 33.0 25.0 100.0



但是上面的異常棧的信息還不夠全,最少我們不知道是任務到底在哪里提交的?這時候候采取可以對線程池進行擴大。
import java.util.concurrent.BlockingQueue; import java.util.concurrent.Future; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class TraceThreadPoolExecutor extends ThreadPoolExecutor { public TraceThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); } @Override public void execute(Runnable command) { super.execute(wrap(command,clientStack(),Thread.currentThread().getName())); } @Override public Future<?> submit(Runnable task) { return super.submit(wrap(task,clientStack(),Thread.currentThread().getName())); } public Exception clientStack() { return new Exception("Client stack trace"); } private Runnable wrap(final Runnable task,final Exception clientStack,String clientThreadName) { return new Runnable() { @Override public void run() { try { task.run(); } catch(Exception e) { clientStack.printStackTrace(); throw e; } } }; } }
使用這個擴大的線程池運行的結果:
100.0 25.0 33.0 50.0 java.lang.Exception: Client stack trace at DivTask$TraceThreadPoolExecutor.clientStack(DivTask.java:53) at DivTask$TraceThreadPoolExecutor.execute(DivTask.java:46) at DivTask.main(DivTask.java:75) Exception in thread "pool⑴-thread⑴" java.lang.ArithmeticException: / by zero at DivTask.run(DivTask.java:32) at DivTask$TraceThreadPoolExecutor$1.run(DivTask.java:61) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745)


這樣就能夠看到完全的堆棧信息,對毛病的定位也就更準確了。
不過,有時候使用submit取得異常信息更加方便。jdk的定義:Callable 接口類似于Runnable,二者都是為那些其實例可能被另外一個線程履行的類設計的。但是Runnable 不會返回結果,并且沒法拋出經過檢查的異常。
這篇博客介紹關于submit和execute的區分,地址:http://blog.csdn.net/peachpi/article/details/6771946#

3個區分:

1、接收的參數不1樣

2、submit有返回值,而execute沒有

Method submit extends base method Executor.execute by creating and returning a Future that can be used to cancel execution and/or wait for completion. 

用到返回值的例子,比如說我有很多個做validation的task,我希望所有的task履行完,然后每一個task告知我它的履行結果,是成功還是失敗,如果是失敗,緣由是甚么。然后我就能夠把所有失敗的緣由綜合起來發給調用者。

個人覺得cancel execution這個用途不大,很少有需要去取消履行的。

而最大的用途應當是第2點。

3、submit方便Exception處理

There is a difference when looking at exception handling. If your tasks throws an exception and if it was submitted with execute this exception will go to the uncaught exception handler (when you don't have provided one explicitly, the default one will just print the stack trace to System.err). If you submitted the task with submit any thrown exception, checked or not, is then part of the task's return status. For a task that was submitted with submit and that terminates with an exception, the Future.get will rethrow this exception, wrapped in an ExecutionException.

意思就是如果你在你的task里會拋出checked或unchecked exception,而你又希望外面的調用者能夠感知這些exception并做出及時的處理,那末就需要用到submit,通過捕獲Future.get拋出的異常。

比如說,我有很多更新各種數據的task,我希望如果其中1個task失敗,其它的task就不需要履行了。那我就需要catch Future.get拋出的異常,然后終止其它task的履行,代碼以下:

import java.util.ArrayList; import java.util.List; import java.util.Random; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; public class ExecutorServiceTest { public static void main(String[] args) { ExecutorService executorService = Executors.newCachedThreadPool(); List<Future<String>> resultList = new ArrayList<Future<String>>(); // 創建10個任務并履行 for (int i = 0; i < 10; i++) { // 使用ExecutorService履行Callable類型的任務,并將結果保存在future變量中 Future<String> future = executorService.submit(new TaskWithResult(i)); // 將任務履行結果存儲到List中 resultList.add(future); } executorService.shutdown(); // 遍歷任務的結果 for (Future<String> fs : resultList) { try { System.out.println(fs.get()); // 打印各個線程(任務)履行的結果 } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { executorService.shutdownNow(); e.printStackTrace(); return; } } } } class TaskWithResult implements Callable<String> { private int id; public TaskWithResult(int id) { this.id = id; } /** * 任務的具體進程,1旦任務傳給ExecutorService的submit方法,則該方法自動在1個線程上履行。 * * @return * @throws Exception */ public String call() throws Exception { System.out.println("call()方法被自動調用,干活!!! " + Thread.currentThread().getName()); if (new Random().nextBoolean()) throw new TaskException("Meet error in task." + Thread.currentThread().getName()); // 1個摹擬耗時的操作 for (int i = 999999999; i > 0; i--) ; return "call()方法被自動調用,任務的結果是:" + id + " " + Thread.currentThread().getName(); } } class TaskException extends Exception { public TaskException(String message) { super(message); } }
上面的代碼解釋的也比較詳細了。。。。運行的結果和我們在上面自己擴大的1樣都是完全的棧信息。

call()方法被自動調用,干活!!! pool⑴-thread⑵ call()方法被自動調用,干活!!! pool⑴-thread⑶ call()方法被自動調用,干活!!! pool⑴-thread⑴ call()方法被自動調用,干活!!! pool⑴-thread⑹ call()方法被自動調用,干活!!! pool⑴-thread⑷ call()方法被自動調用,干活!!! pool⑴-thread⑸ call()方法被自動調用,干活!!! pool⑴-thread⑼ call()方法被自動調用,干活!!! pool⑴-thread⑴0 call()方法被自動調用,干活!!! pool⑴-thread⑻ call()方法被自動調用,干活!!! pool⑴-thread⑺ java.util.concurrent.ExecutionException: threadTest.TaskException: Meet error in task.pool⑴-thread⑴ at java.util.concurrent.FutureTask.report(FutureTask.java:122) at java.util.concurrent.FutureTask.get(FutureTask.java:192) at threadTest.ExecutorServiceTest.main(ExecutorServiceTest.java:29) Caused by: threadTest.TaskException: Meet error in task.pool⑴-thread⑴ at threadTest.TaskWithResult.call(ExecutorServiceTest.java:57) at threadTest.TaskWithResult.call(ExecutorServiceTest.java:41) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745)

生活不易,碼農辛苦
如果您覺得本網站對您的學習有所幫助,可以手機掃描二維碼進行捐贈
程序員人生
------分隔線----------------------------
分享到:
------分隔線----------------------------
關閉
程序員人生
主站蜘蛛池模板: 国产精品入口免费麻豆 | 欧美三级在线观看视频 | 亚洲精品一二三 | 亚洲爱爱图片 | 欧美日本视频一区 | 一区二区在线视频免费观看 | 日本不卡一区二区三区在线观看 | wwwxxxx欧美| 亚洲欧美日韩一区二区 | 亚洲国产99在线精品一区二区 | 精品欧美一区二区三区精品久久 | 久热在线视频精品网站 | 都市 校园 春色 亚洲 | 日本欧美一区二区三区乱码 | 亚洲国产日韩综合久久精品 | 欧美理论片在线观看一区二区 | 国产二区视频在线观看 | 影音先锋色成人资源网站 | 久久久久久国产精品免费免 | 欧美日韩性视频一区二区三区 | 国产欧美日韩在线人成aaaa | 中文字幕2021| 日韩亚洲欧美性感视频影片免费看 | 日韩欧美一区二区精品久久 | 国产毛片a | 三级小视频在线观看 | h毛片| 久久精品国产亚洲aa | 2020久久精品国产免费 | 中文字幕无线 | 国产成年网站v片在线观看 国产成人 免费观看 | 在线视频 亚洲 | 最近中文字幕mv手机免费高清 | 亚洲精品一区二区三区网址 | 中文字幕 亚洲 一区二区三区 | 午夜免费福利 | 91一区二区三区四区五区 | 一区二区三区 | 中文字幕在线观看 | www.国产福利 | 最近最新中文字幕免费大全 |