一区二区三区日韩精品-日韩经典一区二区三区-五月激情综合丁香婷婷-欧美精品中文字幕专区

分享

Java并發(fā)異步編程,原來十個接口的活現(xiàn)在只需要一個接口就搞定!

 西北望msm66g9f 2019-08-20

技術文章第一時間送達!

作者:錦成同學

juejin.im/post/5d3c46d2f265da1b9163dbce

什么?對你沒有聽錯,也沒有看錯 ..多線程并發(fā)執(zhí)行任務,取結果歸集~~ 不再憂愁….

引言

先來看一些APP的獲取數(shù)據(jù),諸如此類,一個頁面獲取N多個,多達10個左右的一個用戶行為數(shù)據(jù),比如:點贊數(shù),發(fā)布文章數(shù),點贊數(shù),消息數(shù),關注數(shù),收藏數(shù),粉絲數(shù),卡券數(shù),紅包數(shù)……….. 真的是多~ 我們看些圖:

平時要10+接口的去獲取數(shù)據(jù)(因為當你10+個查詢寫一起,那估計到半分鐘才能響應了),一個頁面上N多接口,真是累死前端的寶寶了,前端開啟多線程也累啊,我們做后端的要體量一下前端的寶寶們,畢竟有句話叫'程序員何苦為難程序員~'

今天我們也可以一個接口將這些數(shù)據(jù)返回~ 還賊TM快,解決串行編程,阻塞編程帶來的苦惱~

多線程并發(fā)執(zhí)行任務,取結果歸集

今天豬腳就是:Future、FutureTask、ExecutorService…

  • 用上FutureTask任務獲取結果老少皆宜,就是CPU有消耗。FutureTask也可以做閉鎖(實現(xiàn)了Future的語義,表示一種抽象的可計算的結果)。通過把Callable(相當于一個可生成結果的Runnable)作為一個屬性,進而把它自己作為一個執(zhí)行器去繼承Runnable,FutureTask 實際上就是一個支持取消行為的異步任務執(zhí)行器。

  • Callable就是一個回調(diào)接口,可以泛型聲明返回類型,而Runnable是線程去執(zhí)行的方法.這個很簡單~大家想深入了解就進去看源碼好了~?因為真的很簡單~

  • FutureTask實現(xiàn)了Future,提供了start, cancel, query等功能,并且實現(xiàn)了Runnable接口,可以提交給線程執(zhí)行。

  • Java并發(fā)工具類的三板斧 狀態(tài),隊列,CAS

狀態(tài)

 /**
     * The run state of this task, initially NEW.  The run state
     * transitions to a terminal state only in methods set,
     * setException, and cancel.  During completion, state may take on
     * transient values of COMPLETING (while outcome is being set) or
     * INTERRUPTING (only while interrupting the runner to satisfy a
     * cancel(true)). Transitions from these intermediate to final
     * states use cheaper ordered/lazy writes because values are unique
     * and cannot be further modified.
     *
     * Possible state transitions:        //可能發(fā)生的狀態(tài)過度過程
     * NEW -> COMPLETING -> NORMAL        // 創(chuàng)建-->完成-->正常
     * NEW -> COMPLETING -> EXCEPTIONAL   // 創(chuàng)建-->完成-->異常
     * NEW -> CANCELLED                   // 創(chuàng)建-->取消
     * NEW -> INTERRUPTING -> INTERRUPTED // 創(chuàng)建-->中斷中-->中斷結束
     */


    private volatile int state;                  // 執(zhí)行器狀態(tài)

    private static final int NEW = 0;            // 初始值        由構造函數(shù)保證 
    private static final int COMPLETING = 1;     // 完成進行時    正在設置任務結果
    private static final int NORMAL = 2;         // 正常結束      任務正常執(zhí)行完畢
    private static final int EXCEPTIONAL = 3;    // 發(fā)生異常      任務執(zhí)行過程中發(fā)生異常
    private static final int CANCELLED = 4;      // 已經(jīng)取消      任務已經(jīng)取消
    private static final int INTERRUPTING = 5;   // 中斷進行時    正在中斷運行任務的線程
    private static final int INTERRUPTED = 6;    // 中斷結束      任務被中斷

    /** The underlying callable; nulled out after running */
    private Callable<V> callable;
    /** The result to return or exception to throw from get() */
    private Object outcome; // non-volatile, protected by state reads/writes
    /** The thread running the callable; CASed during run() */
    private volatile Thread runner;
    /** Treiber stack of waiting threads */
    private volatile WaitNode waiters;

還不明白就看圖:

public interface Future<T{
    /**
    *取消任務
    *@param mayInterruptIfRunning
    *是否允許取消正在執(zhí)行卻沒有執(zhí)行完畢的任務,如果設置true,則表示可以取消正在執(zhí)行過程中的任務
    *如果任務正在執(zhí)行,則返回true
    *如果任務還沒有執(zhí)行,則無論mayInterruptIfRunning為true還是false,返回true
    *如果任務已經(jīng)完成,則無論mayInterruptIfRunning為true還是false,返回false
    */

    boolean cancel(boolean mayInterruptIfRunning);
    /**
    *任務是否被取消成功,如果在任務正常完成前被取消成功,則返回 true
    */

    boolean isCancelled();
    /**
    *任務是否完成
    */

    boolean isDone();
    /**
    *通過阻塞獲取執(zhí)行結果
    */

    get() throws InterruptedException, ExecutionException;
    /**
    *通過阻塞獲取執(zhí)行結果。如果在指定的時間內(nèi)沒有返回,則返回null
    */

    get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException
;
}

Future

  • cancle 可以停止任務的執(zhí)行 但不一定成功 看返回值true or false

  • get 阻塞獲取callable的任務結果,即get阻塞住調(diào)用線程,直至計算完成返回結果

  • isCancelled 是否取消成功

  • isDone 是否完成

重點說明:

Furture.get()獲取執(zhí)行結果的值,取決于執(zhí)行的狀態(tài),如果任務完成,會立即返回結果,否則一直阻塞直到任務進入完成狀態(tài),然后返回結果或者拋出異常。

“運行完成”表示計算的所有可能結束的狀態(tài),包含正常結束,由于取消而結束和由于異常而結束。當進入完成狀態(tài),他會停止在這個狀態(tài)上,只要state不處于 NEW 狀態(tài),就說明任務已經(jīng)執(zhí)行完畢。

FutureTask負責將計算結果從執(zhí)行任務的線程傳遞到調(diào)用這個線程的線程,而且確保了,傳遞過程中結果的安全發(fā)布

UNSAFE 無鎖編程技術,確保了線程的安全性~ 為了保持無鎖編程CPU的消耗,所以用狀態(tài)標記,減少空轉的時候CPU的壓力

  • 任務本尊:callable

  • 任務的執(zhí)行者:runner

  • 任務的結果:outcome

  • 獲取任務的結果:state + outcome + waiters

  • 中斷或者取消任務:state + runner + waiters

run方法

1、檢查state,非NEW,說明已經(jīng)啟動,直接返回;否則,設置runner為當前線程,成功則繼續(xù),否則,返回。

2、調(diào)用Callable.call()方法執(zhí)行任務,成功則調(diào)用set(result)方法,失敗則調(diào)用setException(ex)方法,最終都會設置state,并調(diào)用finishCompletion()方法,喚醒阻塞在get()方法上的線程們。

3、如注釋所示,如果省略ran變量,并把'set(result);' 語句移動到try代碼塊'ran = true;' 語句處,會怎樣呢?首先,從代碼邏輯上看,是沒有問題的,但是,考慮到'set(result);'方法萬一拋出異常甚至是錯誤了呢?set()方法最終會調(diào)用到用戶自定義的done()方法,所以,不可省略。

4、如果state為INTERRUPTING, 則主動讓出CPU,自旋等待別的線程執(zhí)行完中斷流程。見handlePossibleCancellationInterrupt(int s) 方法。

public void run({
        // UNSAFE.compareAndSwapObject, CAS保證Callable任務只被執(zhí)行一次 無鎖編程
        if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable; // 拿到執(zhí)行任務
            if (c != null && state == NEW) { // 任務不為空,并且執(zhí)行器狀態(tài)是初始值,才會執(zhí)行;如果取消就不執(zhí)行了
                V result;
                boolean ran; // 記錄是否執(zhí)行成功
                try {
                    result = c.call(); // 執(zhí)行任務
                    ran = true// 成功
                } catch (Throwable ex) {
                    result = null// 異常,清空結果
                    ran = false// 失敗
                    setException(ex); // 記錄異常
                }
                if (ran) // 問題:ran變量可以省略嗎,把set(result);移到try塊里面?
                    set(result); // 設置結果
            }
        } finally {
            runner = null// 直到set狀態(tài)前,runner一直都是非空的,為了防止并發(fā)調(diào)用run()方法。
            int s = state;
            if (s >= INTERRUPTING) // 有別的線程要中斷當前線程,把CPU讓出去,自旋等一下
                handlePossibleCancellationInterrupt(s);
        }
    }
      private void handlePossibleCancellationInterrupt(int s{
         if (s == INTERRUPTING) // 當state為INTERRUPTING時
             while (state == INTERRUPTING) // 表示有線程正在中斷當前線程
                 Thread.yield(); // 讓出CPU,自旋等待中斷
     }

再啰嗦下: run方法重點做了以下幾件事:

  • 將runner屬性設置成當前正在執(zhí)行run方法的線程

  • 調(diào)用callable成員變量的call方法來執(zhí)行任務

  • 設置執(zhí)行結果outcome, 如果執(zhí)行成功, 則outcome保存的就是執(zhí)行結果;如果執(zhí)行過程中發(fā)生了異常, 則outcome中保存的就是異常,設置結果之前,先將state狀態(tài)設為中間態(tài)

  • 對outcome的賦值完成后,設置state狀態(tài)為終止態(tài)(NORMAL或者EXCEPTIONAL)

  • 喚醒Treiber棧中所有等待的線程

  • 善后清理(waiters, callable,runner設為null)

  • 檢查是否有遺漏的中斷,如果有,等待中斷狀態(tài)完成。

怎么能少了get方法呢,一直阻塞獲取參見:awaitDone

    public V get() throws InterruptedException, ExecutionException {
        int s = state; // 執(zhí)行器狀態(tài)
         if (s <= COMPLETING) // 如果狀態(tài)小于等于COMPLETING,說明任務正在執(zhí)行,需要等待
             s = awaitDone(false0L); // 等待
         return report(s); // 報告結果
     }

順便偷偷看下get(long, TimeUnit),就是get的方法擴展,增加了超時時間,超時后我還沒拿到就生氣拋異?!?

public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        if (unit == null// 參數(shù)校驗
            throw new NullPointerException();
        int s = state; // 執(zhí)行器狀態(tài)
        if (s <= COMPLETING && (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING) // 如果狀態(tài)小于等于COMPLETING,說明任務正在執(zhí)行,需要等待;等待指定時間,state依然小于等于COMPLETING
            throw new TimeoutException(); // 拋出超時異常
        return report(s); // 報告結果
    }

那么再看awaitDone,要知道會寫死循環(huán)while(true)|for (;;)的都是高手~

private int awaitDone(boolean timed, long nanos) throws InterruptedException {
        final long deadline = timed ? System.nanoTime() + nanos : 0L// 計算deadline
        WaitNode q = null// 等待結點
        boolean queued = false// 是否已經(jīng)入隊
        for (;;) {
            if (Thread.interrupted()) { // 如果當前線程已經(jīng)標記中斷,則直接移除此結點,并拋出中斷異常
                removeWaiter(q);
                throw new InterruptedException();
            }

            int s = state; // 執(zhí)行器狀態(tài)
            if (s > COMPLETING) { // 如果狀態(tài)大于COMPLETING,說明任務已經(jīng)完成,或者已經(jīng)取消,直接返回
                if (q != null)
                    q.thread = null// 復位線程屬性
                return s; // 返回
            } else if (s == COMPLETING) // 如果狀態(tài)等于COMPLETING,說明正在整理結果,自旋等待一會兒
                Thread.yield();
            else if (q == null// 初始,構建結點
                q = new WaitNode();
            else if (!queued) // 還沒入隊,則CAS入隊
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q);
            else if (timed) { // 是否允許超時
                nanos = deadline - System.nanoTime(); // 計算等待時間
                if (nanos <= 0L) { // 超時
                    removeWaiter(q); // 移除結點
                    return state; // 返回結果
                }
                LockSupport.parkNanos(this, nanos); // 線程阻塞指定時間
            } else
                LockSupport.park(this); // 阻塞線程
        }
    }

至此,線程安排任務和獲取我就不啰嗦了~~~~還要很多探索的,畢竟帶薪聊天比較緊張,我就不多贅述了~

隊列

接著我們來看隊列,在FutureTask中,隊列的實現(xiàn)是一個單向鏈表,它表示所有等待任務執(zhí)行完畢的線程的集合。我們知道,F(xiàn)utureTask實現(xiàn)了Future接口,可以獲取“Task”的執(zhí)行結果,那么如果獲取結果時,任務還沒有執(zhí)行完畢怎么辦呢?那么獲取結果的線程就會在一個等待隊列中掛起,直到任務執(zhí)行完畢被喚醒。這一點有點類似于AQS中的sync queue,在下文的分析中,大家可以自己對照它們的異同點。

我們前面說過,在并發(fā)編程中使用隊列通常是將當前線程包裝成某種類型的數(shù)據(jù)結構扔到等待隊列中,我們先來看看隊列中的每一個節(jié)點是怎么個結構:

static final class WaitNode {
    volatile Thread thread;
    volatile WaitNode next;
    WaitNode() { thread = Thread.currentThread(); }
}

可見,相比于AQS的sync queue所使用的雙向鏈表中的Node,這個WaitNode要簡單多了,它只包含了一個記錄線程的thread屬性和指向下一個節(jié)點的next屬性。

值得一提的是,F(xiàn)utureTask中的這個單向鏈表是當做棧來使用的,確切來說是當做Treiber棧來使用的,不了解Treiber棧是個啥的可以簡單的把它當做是一個線程安全的棧,它使用CAS來完成入棧出棧操作(想進一步了解的話可以看這篇文章)。

為啥要使用一個線程安全的棧呢,因為同一時刻可能有多個線程都在獲取任務的執(zhí)行結果,如果任務還在執(zhí)行過程中,則這些線程就要被包裝成WaitNode扔到Treiber棧的棧頂,即完成入棧操作,這樣就有可能出現(xiàn)多個線程同時入棧的情況,因此需要使用CAS操作保證入棧的線程安全,對于出棧的情況也是同理。

由于FutureTask中的隊列本質上是一個Treiber(驅動)棧,那么使用這個隊列就只需要一個指向棧頂節(jié)點的指針就行了,在FutureTask中,就是waiters屬性:

/** Treiber stack of waiting threads */
private volatile WaitNode waiters;

事實上,它就是整個單向鏈表的頭節(jié)點。

綜上,F(xiàn)utureTask中所使用的隊列的結構如下:

CAS操作

CAS操作大多數(shù)是用來改變狀態(tài)的,在FutureTask中也不例外。我們一般在靜態(tài)代碼塊中初始化需要CAS操作的屬性的偏移量:

    // Unsafe mechanics
    private static final sun.misc.Unsafe UNSAFE;
    private static final long stateOffset;
    private static final long runnerOffset;
    private static final long waitersOffset;
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> k = FutureTask.class;
            stateOffset = UNSAFE.objectFieldOffset(k.getDeclaredField('state'));
            runnerOffset = UNSAFE.objectFieldOffset(k.getDeclaredField('runner'));
            waitersOffset = UNSAFE.objectFieldOffset(k.getDeclaredField('waiters'));
        } catch (Exception e) {
            throw new Error(e);
        }
    }

從這個靜態(tài)代碼塊中我們也可以看出,CAS操作主要針對3個屬性,包括state、runner和waiters,說明這3個屬性基本是會被多個線程同時訪問的。其中state屬性代表了任務的狀態(tài),waiters屬性代表了指向棧頂節(jié)點的指針,這兩個我們上面已經(jīng)分析過了。

runner屬性代表了執(zhí)行FutureTask中的“Task”的線程。為什么需要一個屬性來記錄執(zhí)行任務的線程呢?這是為了中斷或者取消任務做準備的,只有知道了執(zhí)行任務的線程是誰,我們才能去中斷它。

定義完屬性的偏移量之后,接下來就是CAS操作本身了。在FutureTask,CAS操作最終調(diào)用的還是Unsafe類的compareAndSwapXXX方法,關于Unsafe,由于帶薪碼文這里不再贅述。

實戰(zhàn)演練

一切沒有例子的講解都是耍流氓 >>> 蔥姜切沫~~加入生命的源泉….

實戰(zhàn)項目以springboot為項目腳手架,github地址:

https://github.com/leaJone/mybot

1.MyFutureTask實現(xiàn)類

內(nèi)部定義一個線程池進行任務的調(diào)度和線程的管理以及線程的復用,大家可以根據(jù)自己的實際項目情況進行配置

其中線程調(diào)度示例:核心線程 8 最大線程 20 ?;顣r間30s 存儲隊列 10 有守護線程 拒絕策略:將超負荷任務回退到調(diào)用者

說明 :

默認使用核心線程(8)數(shù)執(zhí)行任務,任務數(shù)量超過核心線程數(shù)就丟到隊列,隊列(10)滿了就再開啟新的線程,新的線程數(shù)最大為20,當任務執(zhí)行完,新開啟的線程將存活30s,若沒有任務就消亡,線程池回到核心線程數(shù)量.

import com.boot.lea.mybot.dto.UserBehaviorDataDTO;
import com.boot.lea.mybot.service.UserService;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.concurrent.*;


/**
 * @author Lijing
 * @date 2019年7月29日
 */

@Slf4j
@Component
public class MyFutureTask {


    @Resource
    UserService userService;

    /**
     * 核心線程 8 最大線程 20 ?;顣r間30s 存儲隊列 10 有守護線程 拒絕策略:將超負荷任務回退到調(diào)用者
     */

    private static ExecutorService executor = new ThreadPoolExecutor(820,
            30L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(10),
            new ThreadFactoryBuilder().setNameFormat('User_Async_FutureTask-%d').setDaemon(true).build(),
            new ThreadPoolExecutor.CallerRunsPolicy());


    @SuppressWarnings('all')
    public UserBehaviorDataDTO getUserAggregatedResult(final Long userId) {

        System.out.println('MyFutureTask的線程:' + Thread.currentThread());


        long fansCount = 0, msgCount = 0, collectCount = 0,
                followCount = 0, redBagCount = 0, couponCount = 0;

//        fansCount = userService.countFansCountByUserId(userId);
//        msgCount = userService.countMsgCountByUserId(userId);
//        collectCount = userService.countCollectCountByUserId(userId);
//        followCount = userService.countFollowCountByUserId(userId);
//        redBagCount = userService.countRedBagCountByUserId(userId);
//        couponCount = userService.countCouponCountByUserId(userId);

        try {

            Future<Long> fansCountFT = executor.submit(() -> userService.countFansCountByUserId(userId));
            Future<Long> msgCountFT = executor.submit(() -> userService.countMsgCountByUserId(userId));
            Future<Long> collectCountFT = executor.submit(() -> userService.countCollectCountByUserId(userId));
            Future<Long> followCountFT = executor.submit(() -> userService.countFollowCountByUserId(userId));
            Future<Long> redBagCountFT = executor.submit(() -> userService.countRedBagCountByUserId(userId));
            Future<Long> couponCountFT = executor.submit(() -> userService.countCouponCountByUserId(userId));

            //get阻塞
            fansCount = fansCountFT.get();
            msgCount = msgCountFT.get();
            collectCount = collectCountFT.get();
            followCount = followCountFT.get();
            redBagCount = redBagCountFT.get();
            couponCount = couponCountFT.get();

        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
            log.error('>>>>>>聚合查詢用戶聚合信息異常:' + e + '<<<<<<<<<');
        }
        UserBehaviorDataDTO userBehaviorData =
                UserBehaviorDataDTO.builder().fansCount(fansCount).msgCount(msgCount)
                        .collectCount(collectCount).followCount(followCount)
                        .redBagCount(redBagCount).couponCount(couponCount).build();
        return userBehaviorData;
    }


}

2.service業(yè)務方法

常規(guī)業(yè)務查詢方法,為了特效,以及看出實際的效果,我們每個方法做了延時

import com.boot.lea.mybot.mapper.UserMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.concurrent.TimeUnit;

@Service
public class UserServiceImpl implements UserService {

    @Autowired
    UserMapper userMapper;

    @Override
    public long countFansCountByUserId(Long userId) {
        try {
            Thread.sleep(10000);
            System.out.println('獲取FansCount===睡眠:' + 10 + 's');
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println('UserService獲取FansCount的線程  ' + Thread.currentThread().getName());
        return 520;
    }

    @Override
    public long countMsgCountByUserId(Long userId) {
        System.out.println('UserService獲取MsgCount的線程  ' + Thread.currentThread().getName());
        try {
            Thread.sleep(10000);
            System.out.println('獲取MsgCount===睡眠:' + 10 + 's');
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return 618;
    }

    @Override
    public long countCollectCountByUserId(Long userId) {
        System.out.println('UserService獲取CollectCount的線程  ' + Thread.currentThread().getName());
        try {
            Thread.sleep(10000);
            System.out.println('獲取CollectCount==睡眠:' + 10 + 's');
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return 6664;
    }

    @Override
    public long countFollowCountByUserId(Long userId) {
        System.out.println('UserService獲取FollowCount的線程  ' + Thread.currentThread().getName());
        try {
            Thread.sleep(10000);
            System.out.println('獲取FollowCount===睡眠:' + 10's');
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return userMapper.countFollowCountByUserId(userId);
    }

    @Override
    public long countRedBagCountByUserId(Long userId) {
        System.out.println('UserService獲取RedBagCount的線程  ' + Thread.currentThread().getName());
        try {
            TimeUnit.SECONDS.sleep(4);
            System.out.println('獲取RedBagCount===睡眠:' + 4 + 's');
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return 99;
    }

    @Override
    public long countCouponCountByUserId(Long userId) {
        System.out.println('UserService獲取CouponCount的線程  ' + Thread.currentThread().getName());
        try {
            TimeUnit.SECONDS.sleep(8);
            System.out.println('獲取CouponCount===睡眠:' + 8's');
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return 66;
    }
}

3.controller調(diào)用

/**
 * @author LiJing
 * @ClassName: UserController
 * @Description: 用戶控制器
 * @date 2019/7/29 15:16
 */

@RestController
@RequestMapping('user/')
public class UserController {


    @Autowired
    private UserService userService;


    @Autowired
    private MyFutureTask myFutureTask;


    @GetMapping('/index')
    @ResponseBody
    public String index() {
        return '啟動用戶模塊成功~~~~~~~~';
    }

    //http://localhost:8080/api/user/get/data?userId=4

    @GetMapping('/get/data')
    @ResponseBody
    public UserBehaviorDataDTO getUserData(Long userId) {
        System.out.println('UserController的線程:' + Thread.currentThread());
        long begin = System.currentTimeMillis();
        UserBehaviorDataDTO userAggregatedResult = myFutureTask.getUserAggregatedResult(userId);
        long end = System.currentTimeMillis();
        System.out.println('===============總耗時:' + (end - begin) /1000.0000'秒');
        return userAggregatedResult;
    }


}

我們啟動項目:開啟調(diào)用 http://localhost:8080/api/user/get/data?userId=4

當我們線程池配置為:核心線程 8 最大線程 20 ?;顣r間30s 存儲隊列 10 的時候,我們測試的結果如下:

結果:我們看到每個server method的執(zhí)行線程都是從線程池中發(fā)起的線程名:User_Async_FutureTask-%d, 總耗時從累計的52秒縮短到10秒,即取決于最耗時的方法查詢時間.

那我們再將注釋代碼放開,進行串行查詢進行測試:

結果:我們使用串行的方式進行查詢,結果匯總將達到52秒,那太可怕了~~

總結

使用FutureTask的時候,就是將任務runner以caller的方式進行回調(diào),阻塞獲取,最后我們將結果匯總,即完成了開啟多線程異步調(diào)用我們的業(yè)務方法.

            Future<Long> fansCountFT = executor.submit(new Callable<Long>() {
                @Override
                public Long call() throws Exception {
                    return userService.countFansCountByUserId(userId);
                }
            });

這里使用的只是一個簡單的例子,具體項目可以定義具體的業(yè)務方法進行歸并處理,其實在JDK1.8以后,又有了ExecutorCompletionService,ForkJoinTask,CompletableFuture這些都可以實現(xiàn)上述的方法,我們后續(xù)會做一些這些方法使用的案例,期望大家的關注,文章中有不足之處,歡迎指正~

小甜點

所以:我們要用到親愛的Spring的異步編程,異步編程有很多種方式:比如常見的Future的sync,CompletableFuture.supplyAsync,@Async,哈哈 其實都離不開Thread.start()…,等等我說個笑話:

老爸有倆孩子:小紅和小明。老爸想喝酒了,他讓小紅去買酒,小紅出去了。然后老爸突然想吸煙了,于是老爸讓小明去買煙。在面對對象的思想中,一般會把買東西,然后買回來這件事作為一個方法,如果按照順序結構或者使用多線程同步的話,小明想去買煙就必須等小紅這個買東西的操作進行完。這樣無疑增加了時間的開銷(萬一老爸尿憋呢?)。異步就是為了解決這樣的問題。你可以分別給小紅小明下達指令,讓他們?nèi)ベI東西,然后你就可以自己做自己的事,等他們買回來的時候接收結果就可以了。

package com.boot.lea.mybot.futrue;

/**
 * @ClassName: TestFuture
 * @Description: 演示異步編程
 * @author LiJing
 * @date 2019/8/5 15:16
 */

@SuppressWarnings('all')
public class TestFuture {
    static ExecutorService executor = Executors.newFixedThreadPool(2);

    public static void main(String[] args) throws InterruptedException {
        //兩個線程的線程池
        //小紅買酒任務,這里的future2代表的是小紅未來發(fā)生的操作,返回小紅買東西這個操作的結果
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
            System.out.println('爸:小紅你去買瓶酒!');
            try {
                System.out.println('小紅出去買酒了,女孩子跑的比較慢,估計5s后才會回來...');
                Thread.sleep(5000);
                return '我買回來了!';
            } catch (InterruptedException e) {
                System.err.println('小紅路上遭遇了不測');
                return '來世再見!';
            }
        }, executor);

        //小明買煙任務,這里的future1代表的是小明未來買東西會發(fā)生的事,返回值是小明買東西的結果
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
            System.out.println('爸:小明你去買包煙!');
            try {
                System.out.println('小明出去買煙了,可能要3s后回來...');
                Thread.sleep(3000);

                throw new InterruptedException();
//                return '我買回來了!';
            } catch (InterruptedException e) {
                System.out.println('小明路上遭遇了不測!');
                return '這是我托人帶來的口信,我已經(jīng)不在了。';
            }
        }, executor);

        //獲取小紅買酒結果,從小紅的操作中獲取結果,把結果打印
        future2.thenAccept((e) -> {
            System.out.println('小紅說:' + e);
        });
        //獲取小明買煙的結果
        future1.thenAccept((e) -> {
            System.out.println('小明說:' + e);
        });

        System.out.println('爸:等啊等 西湖美景三月天嘞......');
        System.out.println('爸: 我覺得無聊甚至去了趟廁所。');
        Thread.currentThread().join(9 * 1000);
        System.out.println('爸:終于給老子買來了......huo 酒');
        //關閉線程池
        executor.shutdown();
    }
}

運行結果:

    本站是提供個人知識管理的網(wǎng)絡存儲空間,所有內(nèi)容均由用戶發(fā)布,不代表本站觀點。請注意甄別內(nèi)容中的聯(lián)系方式、誘導購買等信息,謹防詐騙。如發(fā)現(xiàn)有害或侵權內(nèi)容,請點擊一鍵舉報。
    轉藏 分享 獻花(0

    0條評論

    發(fā)表

    請遵守用戶 評論公約

    類似文章 更多

    日韩精品视频香蕉视频| 亚洲国产丝袜一区二区三区四| 亚洲深夜精品福利一区| 日本免费一本一二区三区| 亚洲中文字幕亲近伦片| 欧美亚洲综合另类色妞| 精品日韩中文字幕视频在线| 国产成人午夜在线视频| 欧美日韩亚洲国产av| 少妇熟女精品一区二区三区| 女厕偷窥一区二区三区在线| 欧美亚洲国产日韩一区二区| 蜜桃臀欧美日韩国产精品| 久久经典一区二区三区| 亚洲精品中文字幕在线视频| 亚洲国产性生活高潮免费视频| 国产成人亚洲综合色就色| 激情丁香激情五月婷婷| 国产午夜福利在线免费观看| 99精品国产一区二区青青| 欧美日韩亚洲精品内裤| 精品久久综合日本欧美| 日本女优一色一伦一区二区三区| 日韩欧美国产高清在线| 久久精品亚洲精品一区| 视频一区二区 国产精品| 午夜福利视频六七十路熟女| 黄片三级免费在线观看| 欧美精品激情视频一区| 日韩中文高清在线专区| 成人午夜激情免费在线| 欧美精品中文字幕亚洲| 黄片美女在线免费观看| 久草视频在线视频在线观看| 久久夜色精品国产高清不卡| 国产又大又猛又粗又长又爽| 97人摸人人澡人人人超碰| 欧美黑人巨大一区二区三区| 91欧美激情在线视频| 国产性情片一区二区三区| 欧美人妻少妇精品久久性色|