技術文章第一時間送達! 作者:錦成同學 juejin.im/post/5d3c46d2f265da1b9163dbce
什么?對你沒有聽錯,也沒有看錯 ..多線程并發(fā)執(zhí)行任務,取結果歸集~~ 不再憂愁…. 引言先來看一些APP的獲取數(shù)據(jù),諸如此類,一個頁面獲取N多個,多達10個左右的一個用戶行為數(shù)據(jù),比如:點贊數(shù),發(fā)布文章數(shù),點贊數(shù),消息數(shù),關注數(shù),收藏數(shù),粉絲數(shù),卡券數(shù),紅包數(shù)……….. 真的是多~ 我們看些圖: ![](http://image109.360doc.com/DownloadImg/2019/08/2018/168934835_1_20190820061902332)
![](http://image109.360doc.com/DownloadImg/2019/08/2018/168934835_2_20190820061902472)
平時要10+接口的去獲取數(shù)據(jù)(因為當你10+個查詢寫一起,那估計到半分鐘才能響應了),一個頁面上N多接口,真是累死前端的寶寶了,前端開啟多線程也累啊,我們做后端的要體量一下前端的寶寶們,畢竟有句話叫'程序員何苦為難程序員~' 今天我們也可以一個接口將這些數(shù)據(jù)返回~ 還賊TM快,解決串行編程,阻塞編程帶來的苦惱~ 多線程并發(fā)執(zhí)行任務,取結果歸集今天豬腳就是:Future、FutureTask、ExecutorService… 用上FutureTask任務獲取結果老少皆宜,就是CPU有消耗。FutureTask也可以做閉鎖(實現(xiàn)了Future的語義,表示一種抽象的可計算的結果)。通過把Callable(相當于一個可生成結果的Runnable)作為一個屬性,進而把它自己作為一個執(zhí)行器去繼承Runnable,FutureTask 實際上就是一個支持取消行為的異步任務執(zhí)行器。 Callable就是一個回調(diào)接口,可以泛型聲明返回類型,而Runnable是線程去執(zhí)行的方法.這個很簡單~大家想深入了解就進去看源碼好了~?因為真的很簡單~ FutureTask實現(xiàn)了Future,提供了start, cancel, query等功能,并且實現(xiàn)了Runnable接口,可以提交給線程執(zhí)行。 Java并發(fā)工具類的三板斧 狀態(tài),隊列,CAS
狀態(tài) /** * The run state of this task, initially NEW. The run state * transitions to a terminal state only in methods set, * setException, and cancel. During completion, state may take on * transient values of COMPLETING (while outcome is being set) or * INTERRUPTING (only while interrupting the runner to satisfy a * cancel(true)). Transitions from these intermediate to final * states use cheaper ordered/lazy writes because values are unique * and cannot be further modified. * * Possible state transitions: //可能發(fā)生的狀態(tài)過度過程 * NEW -> COMPLETING -> NORMAL // 創(chuàng)建-->完成-->正常 * NEW -> COMPLETING -> EXCEPTIONAL // 創(chuàng)建-->完成-->異常 * NEW -> CANCELLED // 創(chuàng)建-->取消 * NEW -> INTERRUPTING -> INTERRUPTED // 創(chuàng)建-->中斷中-->中斷結束 */
private volatile int state; // 執(zhí)行器狀態(tài)
private static final int NEW = 0; // 初始值 由構造函數(shù)保證 private static final int COMPLETING = 1; // 完成進行時 正在設置任務結果 private static final int NORMAL = 2; // 正常結束 任務正常執(zhí)行完畢 private static final int EXCEPTIONAL = 3; // 發(fā)生異常 任務執(zhí)行過程中發(fā)生異常 private static final int CANCELLED = 4; // 已經(jīng)取消 任務已經(jīng)取消 private static final int INTERRUPTING = 5; // 中斷進行時 正在中斷運行任務的線程 private static final int INTERRUPTED = 6; // 中斷結束 任務被中斷
/** The underlying callable; nulled out after running */ private Callable<V> callable; /** The result to return or exception to throw from get() */ private Object outcome; // non-volatile, protected by state reads/writes /** The thread running the callable; CASed during run() */ private volatile Thread runner; /** Treiber stack of waiting threads */ private volatile WaitNode waiters;
還不明白就看圖: ![](http://image109.360doc.com/DownloadImg/2019/08/2018/168934835_3_20190820061902629)
![](http://image109.360doc.com/DownloadImg/2019/08/2018/168934835_4_20190820061902722)
public interface Future<T> { /** *取消任務 *@param mayInterruptIfRunning *是否允許取消正在執(zhí)行卻沒有執(zhí)行完畢的任務,如果設置true,則表示可以取消正在執(zhí)行過程中的任務 *如果任務正在執(zhí)行,則返回true *如果任務還沒有執(zhí)行,則無論mayInterruptIfRunning為true還是false,返回true *如果任務已經(jīng)完成,則無論mayInterruptIfRunning為true還是false,返回false */ boolean cancel(boolean mayInterruptIfRunning); /** *任務是否被取消成功,如果在任務正常完成前被取消成功,則返回 true */ boolean isCancelled(); /** *任務是否完成 */ boolean isDone(); /** *通過阻塞獲取執(zhí)行結果 */ T get() throws InterruptedException, ExecutionException; /** *通過阻塞獲取執(zhí)行結果。如果在指定的時間內(nèi)沒有返回,則返回null */ T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
Future 重點說明:Furture.get()獲取執(zhí)行結果的值,取決于執(zhí)行的狀態(tài),如果任務完成,會立即返回結果,否則一直阻塞直到任務進入完成狀態(tài),然后返回結果或者拋出異常。 “運行完成”表示計算的所有可能結束的狀態(tài),包含正常結束,由于取消而結束和由于異常而結束。當進入完成狀態(tài),他會停止在這個狀態(tài)上,只要state不處于 NEW 狀態(tài),就說明任務已經(jīng)執(zhí)行完畢。 FutureTask負責將計算結果從執(zhí)行任務的線程傳遞到調(diào)用這個線程的線程,而且確保了,傳遞過程中結果的安全發(fā)布 UNSAFE 無鎖編程技術,確保了線程的安全性~ 為了保持無鎖編程CPU的消耗,所以用狀態(tài)標記,減少空轉的時候CPU的壓力 run方法1、檢查state,非NEW,說明已經(jīng)啟動,直接返回;否則,設置runner為當前線程,成功則繼續(xù),否則,返回。 2、調(diào)用Callable.call()方法執(zhí)行任務,成功則調(diào)用set(result)方法,失敗則調(diào)用setException(ex)方法,最終都會設置state,并調(diào)用finishCompletion()方法,喚醒阻塞在get()方法上的線程們。 3、如注釋所示,如果省略ran變量,并把'set(result);' 語句移動到try代碼塊'ran = true;' 語句處,會怎樣呢?首先,從代碼邏輯上看,是沒有問題的,但是,考慮到'set(result);'方法萬一拋出異常甚至是錯誤了呢?set()方法最終會調(diào)用到用戶自定義的done()方法,所以,不可省略。 4、如果state為INTERRUPTING, 則主動讓出CPU,自旋等待別的線程執(zhí)行完中斷流程。見handlePossibleCancellationInterrupt(int s) 方法。 public void run() { // UNSAFE.compareAndSwapObject, CAS保證Callable任務只被執(zhí)行一次 無鎖編程 if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; // 拿到執(zhí)行任務 if (c != null && state == NEW) { // 任務不為空,并且執(zhí)行器狀態(tài)是初始值,才會執(zhí)行;如果取消就不執(zhí)行了 V result; boolean ran; // 記錄是否執(zhí)行成功 try { result = c.call(); // 執(zhí)行任務 ran = true; // 成功 } catch (Throwable ex) { result = null; // 異常,清空結果 ran = false; // 失敗 setException(ex); // 記錄異常 } if (ran) // 問題:ran變量可以省略嗎,把set(result);移到try塊里面? set(result); // 設置結果 } } finally { runner = null; // 直到set狀態(tài)前,runner一直都是非空的,為了防止并發(fā)調(diào)用run()方法。 int s = state; if (s >= INTERRUPTING) // 有別的線程要中斷當前線程,把CPU讓出去,自旋等一下 handlePossibleCancellationInterrupt(s); } } private void handlePossibleCancellationInterrupt(int s) { if (s == INTERRUPTING) // 當state為INTERRUPTING時 while (state == INTERRUPTING) // 表示有線程正在中斷當前線程 Thread.yield(); // 讓出CPU,自旋等待中斷 }
再啰嗦下: run方法重點做了以下幾件事: 將runner屬性設置成當前正在執(zhí)行run方法的線程 調(diào)用callable成員變量的call方法來執(zhí)行任務 設置執(zhí)行結果outcome, 如果執(zhí)行成功, 則outcome保存的就是執(zhí)行結果;如果執(zhí)行過程中發(fā)生了異常, 則outcome中保存的就是異常,設置結果之前,先將state狀態(tài)設為中間態(tài) 對outcome的賦值完成后,設置state狀態(tài)為終止態(tài)(NORMAL或者EXCEPTIONAL) 喚醒Treiber棧中所有等待的線程 善后清理(waiters, callable,runner設為null) 檢查是否有遺漏的中斷,如果有,等待中斷狀態(tài)完成。
怎么能少了get方法呢,一直阻塞獲取參見:awaitDone public V get() throws InterruptedException, ExecutionException { int s = state; // 執(zhí)行器狀態(tài) if (s <= COMPLETING) // 如果狀態(tài)小于等于COMPLETING,說明任務正在執(zhí)行,需要等待 s = awaitDone(false, 0L); // 等待 return report(s); // 報告結果 }
順便偷偷看下get(long, TimeUnit),就是get的方法擴展,增加了超時時間,超時后我還沒拿到就生氣拋異?!? public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { if (unit == null) // 參數(shù)校驗 throw new NullPointerException(); int s = state; // 執(zhí)行器狀態(tài) if (s <= COMPLETING && (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING) // 如果狀態(tài)小于等于COMPLETING,說明任務正在執(zhí)行,需要等待;等待指定時間,state依然小于等于COMPLETING throw new TimeoutException(); // 拋出超時異常 return report(s); // 報告結果 }
那么再看awaitDone,要知道會寫死循環(huán)while(true)|for (;;)的都是高手~ private int awaitDone(boolean timed, long nanos) throws InterruptedException { final long deadline = timed ? System.nanoTime() + nanos : 0L; // 計算deadline WaitNode q = null; // 等待結點 boolean queued = false; // 是否已經(jīng)入隊 for (;;) { if (Thread.interrupted()) { // 如果當前線程已經(jīng)標記中斷,則直接移除此結點,并拋出中斷異常 removeWaiter(q); throw new InterruptedException(); }
int s = state; // 執(zhí)行器狀態(tài) if (s > COMPLETING) { // 如果狀態(tài)大于COMPLETING,說明任務已經(jīng)完成,或者已經(jīng)取消,直接返回 if (q != null) q.thread = null; // 復位線程屬性 return s; // 返回 } else if (s == COMPLETING) // 如果狀態(tài)等于COMPLETING,說明正在整理結果,自旋等待一會兒 Thread.yield(); else if (q == null) // 初始,構建結點 q = new WaitNode(); else if (!queued) // 還沒入隊,則CAS入隊 queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); else if (timed) { // 是否允許超時 nanos = deadline - System.nanoTime(); // 計算等待時間 if (nanos <= 0L) { // 超時 removeWaiter(q); // 移除結點 return state; // 返回結果 } LockSupport.parkNanos(this, nanos); // 線程阻塞指定時間 } else LockSupport.park(this); // 阻塞線程 } }
至此,線程安排任務和獲取我就不啰嗦了~~~~還要很多探索的,畢竟帶薪聊天比較緊張,我就不多贅述了~ 隊列接著我們來看隊列,在FutureTask中,隊列的實現(xiàn)是一個單向鏈表,它表示所有等待任務執(zhí)行完畢的線程的集合。我們知道,F(xiàn)utureTask實現(xiàn)了Future接口,可以獲取“Task”的執(zhí)行結果,那么如果獲取結果時,任務還沒有執(zhí)行完畢怎么辦呢?那么獲取結果的線程就會在一個等待隊列中掛起,直到任務執(zhí)行完畢被喚醒。這一點有點類似于AQS中的sync queue,在下文的分析中,大家可以自己對照它們的異同點。 我們前面說過,在并發(fā)編程中使用隊列通常是將當前線程包裝成某種類型的數(shù)據(jù)結構扔到等待隊列中,我們先來看看隊列中的每一個節(jié)點是怎么個結構: static final class WaitNode { volatile Thread thread; volatile WaitNode next; WaitNode() { thread = Thread.currentThread(); } }
可見,相比于AQS的sync queue所使用的雙向鏈表中的Node,這個WaitNode要簡單多了,它只包含了一個記錄線程的thread屬性和指向下一個節(jié)點的next屬性。 值得一提的是,F(xiàn)utureTask中的這個單向鏈表是當做棧來使用的,確切來說是當做Treiber棧來使用的,不了解Treiber棧是個啥的可以簡單的把它當做是一個線程安全的棧,它使用CAS來完成入棧出棧操作(想進一步了解的話可以看這篇文章)。 為啥要使用一個線程安全的棧呢,因為同一時刻可能有多個線程都在獲取任務的執(zhí)行結果,如果任務還在執(zhí)行過程中,則這些線程就要被包裝成WaitNode扔到Treiber棧的棧頂,即完成入棧操作,這樣就有可能出現(xiàn)多個線程同時入棧的情況,因此需要使用CAS操作保證入棧的線程安全,對于出棧的情況也是同理。 由于FutureTask中的隊列本質上是一個Treiber(驅動)棧,那么使用這個隊列就只需要一個指向棧頂節(jié)點的指針就行了,在FutureTask中,就是waiters屬性: /** Treiber stack of waiting threads */ private volatile WaitNode waiters;
事實上,它就是整個單向鏈表的頭節(jié)點。 綜上,F(xiàn)utureTask中所使用的隊列的結構如下: ![](http://image109.360doc.com/DownloadImg/2019/08/2018/168934835_5_20190820061902785)
CAS操作CAS操作大多數(shù)是用來改變狀態(tài)的,在FutureTask中也不例外。我們一般在靜態(tài)代碼塊中初始化需要CAS操作的屬性的偏移量: // Unsafe mechanics private static final sun.misc.Unsafe UNSAFE; private static final long stateOffset; private static final long runnerOffset; private static final long waitersOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<?> k = FutureTask.class; stateOffset = UNSAFE.objectFieldOffset(k.getDeclaredField('state')); runnerOffset = UNSAFE.objectFieldOffset(k.getDeclaredField('runner')); waitersOffset = UNSAFE.objectFieldOffset(k.getDeclaredField('waiters')); } catch (Exception e) { throw new Error(e); } }
從這個靜態(tài)代碼塊中我們也可以看出,CAS操作主要針對3個屬性,包括state、runner和waiters,說明這3個屬性基本是會被多個線程同時訪問的。其中state屬性代表了任務的狀態(tài),waiters屬性代表了指向棧頂節(jié)點的指針,這兩個我們上面已經(jīng)分析過了。 runner屬性代表了執(zhí)行FutureTask中的“Task”的線程。為什么需要一個屬性來記錄執(zhí)行任務的線程呢?這是為了中斷或者取消任務做準備的,只有知道了執(zhí)行任務的線程是誰,我們才能去中斷它。 定義完屬性的偏移量之后,接下來就是CAS操作本身了。在FutureTask,CAS操作最終調(diào)用的還是Unsafe類的compareAndSwapXXX方法,關于Unsafe,由于帶薪碼文這里不再贅述。 實戰(zhàn)演練一切沒有例子的講解都是耍流氓 >>> 蔥姜切沫~~加入生命的源泉…. 實戰(zhàn)項目以springboot為項目腳手架,github地址: https://github.com/leaJone/mybot
1.MyFutureTask實現(xiàn)類內(nèi)部定義一個線程池進行任務的調(diào)度和線程的管理以及線程的復用,大家可以根據(jù)自己的實際項目情況進行配置 其中線程調(diào)度示例:核心線程 8 最大線程 20 ?;顣r間30s 存儲隊列 10 有守護線程 拒絕策略:將超負荷任務回退到調(diào)用者 說明 : 默認使用核心線程(8)數(shù)執(zhí)行任務,任務數(shù)量超過核心線程數(shù)就丟到隊列,隊列(10)滿了就再開啟新的線程,新的線程數(shù)最大為20,當任務執(zhí)行完,新開啟的線程將存活30s,若沒有任務就消亡,線程池回到核心線程數(shù)量.
import com.boot.lea.mybot.dto.UserBehaviorDataDTO; import com.boot.lea.mybot.service.UserService; import com.google.common.util.concurrent.ThreadFactoryBuilder; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component;
import javax.annotation.Resource; import java.util.concurrent.*;
/** * @author Lijing * @date 2019年7月29日 */ @Slf4j @Component public class MyFutureTask {
@Resource UserService userService;
/** * 核心線程 8 最大線程 20 ?;顣r間30s 存儲隊列 10 有守護線程 拒絕策略:將超負荷任務回退到調(diào)用者 */ private static ExecutorService executor = new ThreadPoolExecutor(8, 20, 30L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(10), new ThreadFactoryBuilder().setNameFormat('User_Async_FutureTask-%d').setDaemon(true).build(), new ThreadPoolExecutor.CallerRunsPolicy());
@SuppressWarnings('all') public UserBehaviorDataDTO getUserAggregatedResult(final Long userId) {
System.out.println('MyFutureTask的線程:' + Thread.currentThread());
long fansCount = 0, msgCount = 0, collectCount = 0, followCount = 0, redBagCount = 0, couponCount = 0;
// fansCount = userService.countFansCountByUserId(userId); // msgCount = userService.countMsgCountByUserId(userId); // collectCount = userService.countCollectCountByUserId(userId); // followCount = userService.countFollowCountByUserId(userId); // redBagCount = userService.countRedBagCountByUserId(userId); // couponCount = userService.countCouponCountByUserId(userId);
try {
Future<Long> fansCountFT = executor.submit(() -> userService.countFansCountByUserId(userId)); Future<Long> msgCountFT = executor.submit(() -> userService.countMsgCountByUserId(userId)); Future<Long> collectCountFT = executor.submit(() -> userService.countCollectCountByUserId(userId)); Future<Long> followCountFT = executor.submit(() -> userService.countFollowCountByUserId(userId)); Future<Long> redBagCountFT = executor.submit(() -> userService.countRedBagCountByUserId(userId)); Future<Long> couponCountFT = executor.submit(() -> userService.countCouponCountByUserId(userId));
//get阻塞 fansCount = fansCountFT.get(); msgCount = msgCountFT.get(); collectCount = collectCountFT.get(); followCount = followCountFT.get(); redBagCount = redBagCountFT.get(); couponCount = couponCountFT.get();
} catch (InterruptedException | ExecutionException e) { e.printStackTrace(); log.error('>>>>>>聚合查詢用戶聚合信息異常:' + e + '<<<<<<<<<'); } UserBehaviorDataDTO userBehaviorData = UserBehaviorDataDTO.builder().fansCount(fansCount).msgCount(msgCount) .collectCount(collectCount).followCount(followCount) .redBagCount(redBagCount).couponCount(couponCount).build(); return userBehaviorData; }
}
2.service業(yè)務方法常規(guī)業(yè)務查詢方法,為了特效,以及看出實際的效果,我們每個方法做了延時 import com.boot.lea.mybot.mapper.UserMapper; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service;
import java.util.concurrent.TimeUnit;
@Service public class UserServiceImpl implements UserService {
@Autowired UserMapper userMapper;
@Override public long countFansCountByUserId(Long userId) { try { Thread.sleep(10000); System.out.println('獲取FansCount===睡眠:' + 10 + 's'); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println('UserService獲取FansCount的線程 ' + Thread.currentThread().getName()); return 520; }
@Override public long countMsgCountByUserId(Long userId) { System.out.println('UserService獲取MsgCount的線程 ' + Thread.currentThread().getName()); try { Thread.sleep(10000); System.out.println('獲取MsgCount===睡眠:' + 10 + 's'); } catch (InterruptedException e) { e.printStackTrace(); } return 618; }
@Override public long countCollectCountByUserId(Long userId) { System.out.println('UserService獲取CollectCount的線程 ' + Thread.currentThread().getName()); try { Thread.sleep(10000); System.out.println('獲取CollectCount==睡眠:' + 10 + 's'); } catch (InterruptedException e) { e.printStackTrace(); } return 6664; }
@Override public long countFollowCountByUserId(Long userId) { System.out.println('UserService獲取FollowCount的線程 ' + Thread.currentThread().getName()); try { Thread.sleep(10000); System.out.println('獲取FollowCount===睡眠:' + 10+ 's'); } catch (InterruptedException e) { e.printStackTrace(); } return userMapper.countFollowCountByUserId(userId); }
@Override public long countRedBagCountByUserId(Long userId) { System.out.println('UserService獲取RedBagCount的線程 ' + Thread.currentThread().getName()); try { TimeUnit.SECONDS.sleep(4); System.out.println('獲取RedBagCount===睡眠:' + 4 + 's'); } catch (InterruptedException e) { e.printStackTrace(); } return 99; }
@Override public long countCouponCountByUserId(Long userId) { System.out.println('UserService獲取CouponCount的線程 ' + Thread.currentThread().getName()); try { TimeUnit.SECONDS.sleep(8); System.out.println('獲取CouponCount===睡眠:' + 8+ 's'); } catch (InterruptedException e) { e.printStackTrace(); } return 66; } }
3.controller調(diào)用/** * @author LiJing * @ClassName: UserController * @Description: 用戶控制器 * @date 2019/7/29 15:16 */ @RestController @RequestMapping('user/') public class UserController {
@Autowired private UserService userService;
@Autowired private MyFutureTask myFutureTask;
@GetMapping('/index') @ResponseBody public String index() { return '啟動用戶模塊成功~~~~~~~~'; }
//http://localhost:8080/api/user/get/data?userId=4
@GetMapping('/get/data') @ResponseBody public UserBehaviorDataDTO getUserData(Long userId) { System.out.println('UserController的線程:' + Thread.currentThread()); long begin = System.currentTimeMillis(); UserBehaviorDataDTO userAggregatedResult = myFutureTask.getUserAggregatedResult(userId); long end = System.currentTimeMillis(); System.out.println('===============總耗時:' + (end - begin) /1000.0000+ '秒'); return userAggregatedResult; }
}
我們啟動項目:開啟調(diào)用 http://localhost:8080/api/user/get/data?userId=4 當我們線程池配置為:核心線程 8 最大線程 20 ?;顣r間30s 存儲隊列 10 的時候,我們測試的結果如下: ![](http://image109.360doc.com/DownloadImg/2019/08/2018/168934835_6_20190820061902894)
![](http://image109.360doc.com/DownloadImg/2019/08/2018/168934835_7_20190820061902957)
結果:我們看到每個server method的執(zhí)行線程都是從線程池中發(fā)起的線程名:User_Async_FutureTask-%d, 總耗時從累計的52秒縮短到10秒,即取決于最耗時的方法查詢時間. 那我們再將注釋代碼放開,進行串行查詢進行測試: ![](http://image109.360doc.com/DownloadImg/2019/08/2018/168934835_8_2019082006190335)
![](http://image109.360doc.com/DownloadImg/2019/08/2018/168934835_9_20190820061903113)
![](http://image109.360doc.com/DownloadImg/2019/08/2018/168934835_10_20190820061903176)
結果:我們使用串行的方式進行查詢,結果匯總將達到52秒,那太可怕了~~ 總結使用FutureTask的時候,就是將任務runner以caller的方式進行回調(diào),阻塞獲取,最后我們將結果匯總,即完成了開啟多線程異步調(diào)用我們的業(yè)務方法. Future<Long> fansCountFT = executor.submit(new Callable<Long>() { @Override public Long call() throws Exception { return userService.countFansCountByUserId(userId); } });
這里使用的只是一個簡單的例子,具體項目可以定義具體的業(yè)務方法進行歸并處理,其實在JDK1.8以后,又有了ExecutorCompletionService,ForkJoinTask,CompletableFuture這些都可以實現(xiàn)上述的方法,我們后續(xù)會做一些這些方法使用的案例,期望大家的關注,文章中有不足之處,歡迎指正~ 小甜點![](http://pubimage.360doc.com/wz/default.gif)
所以:我們要用到親愛的Spring的異步編程,異步編程有很多種方式:比如常見的Future的sync,CompletableFuture.supplyAsync,@Async,哈哈 其實都離不開Thread.start()…,等等我說個笑話: 老爸有倆孩子:小紅和小明。老爸想喝酒了,他讓小紅去買酒,小紅出去了。然后老爸突然想吸煙了,于是老爸讓小明去買煙。在面對對象的思想中,一般會把買東西,然后買回來這件事作為一個方法,如果按照順序結構或者使用多線程同步的話,小明想去買煙就必須等小紅這個買東西的操作進行完。這樣無疑增加了時間的開銷(萬一老爸尿憋呢?)。異步就是為了解決這樣的問題。你可以分別給小紅小明下達指令,讓他們?nèi)ベI東西,然后你就可以自己做自己的事,等他們買回來的時候接收結果就可以了。
package com.boot.lea.mybot.futrue;
/** * @ClassName: TestFuture * @Description: 演示異步編程 * @author LiJing * @date 2019/8/5 15:16 */ @SuppressWarnings('all') public class TestFuture { static ExecutorService executor = Executors.newFixedThreadPool(2);
public static void main(String[] args) throws InterruptedException { //兩個線程的線程池 //小紅買酒任務,這里的future2代表的是小紅未來發(fā)生的操作,返回小紅買東西這個操作的結果 CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> { System.out.println('爸:小紅你去買瓶酒!'); try { System.out.println('小紅出去買酒了,女孩子跑的比較慢,估計5s后才會回來...'); Thread.sleep(5000); return '我買回來了!'; } catch (InterruptedException e) { System.err.println('小紅路上遭遇了不測'); return '來世再見!'; } }, executor);
//小明買煙任務,這里的future1代表的是小明未來買東西會發(fā)生的事,返回值是小明買東西的結果 CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> { System.out.println('爸:小明你去買包煙!'); try { System.out.println('小明出去買煙了,可能要3s后回來...'); Thread.sleep(3000);
throw new InterruptedException(); // return '我買回來了!'; } catch (InterruptedException e) { System.out.println('小明路上遭遇了不測!'); return '這是我托人帶來的口信,我已經(jīng)不在了。'; } }, executor);
//獲取小紅買酒結果,從小紅的操作中獲取結果,把結果打印 future2.thenAccept((e) -> { System.out.println('小紅說:' + e); }); //獲取小明買煙的結果 future1.thenAccept((e) -> { System.out.println('小明說:' + e); });
System.out.println('爸:等啊等 西湖美景三月天嘞......'); System.out.println('爸: 我覺得無聊甚至去了趟廁所。'); Thread.currentThread().join(9 * 1000); System.out.println('爸:終于給老子買來了......huo 酒'); //關閉線程池 executor.shutdown(); } }
運行結果: ![](http://pubimage.360doc.com/wz/default.gif)
|