帶你認識3個J.U.C組件擴展

華爲雲開發者聯盟 發佈 2024-04-29T08:05:16.430218+00:00

FutureTask是J.U.C下的,但不是AQS的子類。Java1.5開始提供了Callable和Future,通過它們可以在任務執行完畢之後,得到任務執行的結果。

本文分享自華為雲社區《【高並發】J.U.C組件擴展-雲社區-華為雲》,作者: 冰 河。

1.FutureTask

FutureTask是J.U.C(java.util.concurrent)下的,但不是AQS(AbstractQueuedSynchronizer)的子類。其對線程結果的處理值得借鑑和在項目中使用。

Thread和Runnable執行完任務無法獲取執行結果。java1.5開始提供了Callable和Future,通過它們可以在任務執行完畢之後,得到任務執行的結果。

Callable與Runnable接口對比

Callable:泛型接口,提供一個call()方法,支持拋出異常,並且執行後有返回值

Runnable:接口,提供一個run()方法,不支持拋出異常,執行後無返回值

Future接口

對於具體的Callable和Runnable任務,可以進行取消,查詢任務是否被取消,查詢是否完成以及獲取結果等。

Future可以監視目標線程調用call()的情況,當調用Future的get()方法時,就可以獲得結果。此時,執行任務的線程可能不會直接完成,當前線程就開始阻塞,直到call()方法結束返回結果,當前線程才會繼續執行。總之,Future可以得到別的線程任務方法的返回值。

FutureTask類

實現的接口為RunnableFuture,而RunnableFuture接口繼承了Runnable和Future兩個接口,所以FutureTask類最終也是執行Callable類型的任務。如果FutureTask類的構造方法參數是Runnable的話,會轉換成Callable類型。

FutureTask類實現了兩個接口:Runnable和Future。所以,它即可以作為Runnable被線程執行,又可以作為Future得到Callable的返回值,這樣設計的好處如下:

假設有一個很費時的邏輯,需要計算並且返回這個值,同時,這個值又不是馬上需要,則可以使用Runnable和Future的組合,用另外一個線程去計算返回值,而當前線程在使用這個返回值之前,可以做其他的操作,等到需要這個返回值時,再通過Future得到。

Future示例代碼如下:

package IO.binghe.concurrency.example.aqs;

import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@Slf4j
public class FutureExample {
    static class MyCallable implements Callable<String>{
        @Override
        public String call() throws exception {
            log.info("do something in callable");
            Thread.sleep(5000);
            return "Done";
        }
    }
    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newCachedThreadPool();
        Future<String> future = executorService.submit(new MyCallable());
        log.info("do something in main");
        Thread.sleep(1000);
        String result = future.get();
        log.info("result: {}", result);
        executorService.shutdown();
    }
}

FutureTask示例代碼如下:

package io.binghe.concurrency.example.aqs;

import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;
@Slf4j
public class FutureTaskExample {
    public static void main(String[] args) throws Exception{
        FutureTask<String> futureTask = new FutureTask<String>(new Callable<String>() {
            @Override
            public String call() throws Exception {
                log.info("do something in callable");
                Thread.sleep(5000);
                return "Done";
            }
        });
        new Thread(futureTask).start();
        log.info("do something in main");
        Thread.sleep(1000);
        String result = futureTask.get();
        log.info("result: {}", result);
    }
}

2.fork/Join框架

位於J.U.C(java.util.concurrent)中,是Java7中提供的用於執行並行任務的框架,其可以將大任務分割成若干個小任務,最終匯總每個小任務的結果後得到最終結果。基本思想和Hadoop的MapReduce思想類似。

主要採用的是工作竊取算法(某個線程從其他隊列里竊取任務來執行),並行分治計算中的一種Work-stealing策略

為什麼需要使用工作竊取算法呢?

假如我們需要做一個比較大的任務,我們可以把這個任務分割為若干互不依賴的子任務,為了減少線程間的競爭,於是把這些子任務分別放到不同的隊列里,並為每個隊列創建一個單獨的線程來執行隊列里的任務,線程和隊列一一對應,比如A線程負責處理A隊列里的任務。但是有的線程會先把自己隊列里的任務幹完,而其他線程對應的隊列里還有任務等待處理。幹完活的線程與其等著,不如去幫其他線程幹活,於是它就去其他線程的隊列里竊取一個任務來執行。

而在這時它們會訪問同一個隊列,所以為了減少竊取任務線程和被竊取任務線程之間的競爭,通常會使用雙端隊列,被竊取任務線程永遠從雙端隊列的頭部拿任務執行,而竊取任務的線程永遠從雙端隊列的尾部拿任務執行。

工作竊取算法的優點:

充分利用線程進行並行計算,並減少了線程間的競爭

工作竊取算法的缺點:

在某些情況下還是存在競爭,比如雙端隊列里只有一個任務時。並且該算法會消耗更多的系統資源,比如創建多個線程和多個雙端隊列。

對於Fork/Join框架而言,當一個任務正在等待它使用Join操作創建的子任務結束時,執行這個任務的工作線程查找其他未被執行的任務,並開始執行這些未被執行的任務,通過這種方式,線程充分利用它們的運行時間來提高應用程式的性能。為了實現這個目標,Fork/Join框架執行的任務有一些局限性。

Fork/Join框架局限性:

(1)任務只能使用Fork和Join操作來進行同步機制,如果使用了其他同步機制,則在同步操作時,工作線程就不能執行其他任務了。比如,在Fork/Join框架中,使任務進行了睡眠,那麼,在睡眠期間內,正在執行這個任務的工作線程將不會執行其他任務了。

(2)在Fork/Join框架中,所拆分的任務不應該去執行IO操作,比如:讀寫數據文件

(3)任務不能拋出檢查異常,必須通過必要的代碼來出來這些異常

Fork/Join框架的核心類

Fork/Join框架的核心是兩個類:ForkJoinPool和ForkJoinTask。ForkJoinPool負責實現工作竊取算法、管理工作線程、提供關於任務的狀態以及執行信息。ForkJoinTask主要提供在任務中執行Fork和Join操作的機制。

示例代碼如下:

package io.binghe.concurrency.example.aqs;

import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;
@Slf4j
public class ForkJoinTaskExample extends RecursiveTask<Integer> {
    public static final int threshold = 2;
    private int start;
    private int end;
    public ForkJoinTaskExample(int start, int end) {
        this.start = start;
        this.end = end;
    }
    @Override
    protected Integer compute() {
        int sum = 0;
        //如果任務足夠小就計算任務
        boolean canCompute = (end - start) <= threshold;
        if (canCompute) {
            for (int i = start; i <= end; i++) {
                sum += i;
            }
        } else {
            // 如果任務大於閾值,就分裂成兩個子任務計算
            int middle = (start + end) / 2;
            ForkJoinTaskExample leftTask = new ForkJoinTaskExample(start, middle);
            ForkJoinTaskExample rightTask = new ForkJoinTaskExample(middle + 1, end);

            // 執行子任務
            leftTask.fork();
            rightTask.fork();

            // 等待任務執行結束合併其結果
            int leftResult = leftTask.join();
            int rightResult = rightTask.join();

            // 合併子任務
            sum = leftResult + rightResult;
        }
        return sum;
    }
    public static void main(String[] args) {
        ForkJoinPool forkjoinPool = new ForkJoinPool();

        //生成一個計算任務,計算1+2+3+4
        ForkJoinTaskExample task = new ForkJoinTaskExample(1, 100);

        //執行一個任務
        Future<Integer> result = forkjoinPool.submit(task);

        try {
            log.info("result:{}", result.get());
        } catch (Exception e) {
            log.error("exception", e);
        }
    }
}

3.BlockingQueue

阻塞隊列,是線程安全的。

被阻塞的情況如下:

(1)當隊列滿時,進行入隊列操作

(2)當隊列空時,進行出隊列操作

使用場景如下:

主要在生產者和消費者場景

BlockingQueue的方法

BlockingQueue 具有 4 組不同的方法用於插入、移除以及對隊列中的元素進行檢查。如果請求的操作不能得到立即執行的話,每個方法的表現也不同。這些方法如下:

四組不同的行為方式解釋:

  • 拋出異常

如果試圖的操作無法立即執行,拋一個異常。

  • 特殊值

如果試圖的操作無法立即執行,返回一個特定的值(常常是 true / false)。

  • 阻塞

如果試圖的操作無法立即執行,該方法調用將會發生阻塞,直到能夠執行。

  • 超時

如果試圖的操作無法立即執行,該方法調用將會發生阻塞,直到能夠執行,但等待時間不會超過給定值。返回一個特定值以告知該操作是否成功(典型的是 true / false)。

BlockingQueue的實現類如下:

  • ArrayBlockingQueue:有界的阻塞隊列(容量有限,必須在初始化的時候指定容量大小,容量大小指定後就不能再變化),內部實現是一個數組,以FIFO的方式存儲數據,最新插入的對象是尾部,最新移除的對象是頭部。
  • DelayQueue:阻塞的是內部元素,DelayQueue中的元素必須實現一個接口——Delayed(存在於J.U.C下)。Delayed接口繼承了Comparable接口,這是因為Delayed接口中的元素需要進行排序,一般情況下,都是按照Delayed接口中的元素過期時間的優先級進行排序。應用場景主要有:定時關閉連接、緩存對象、超時處理等。內部實現使用PriorityQueue和ReentrantLock。
  • LinkedBlockingQueue:大小配置是可選的,如果初始化時指定了大小,則是有邊界的;如果初始化時未指定大小,則是無邊界的(其實默認大小是Integer類型的最大值)。內部實現時一個鍊表,以FIFO的方式存儲數據,最新插入的對象是尾部,最新移除的對象是頭部。
  • PriorityBlockingQueue:帶優先級的阻塞隊列,無邊界,但是有排序規則,允許插入空對象(也就是null)。所有插入的對象必須實現Comparable接口,隊列優先級的排序規則就是按照對Comparable接口的實現來定義的。可以從PriorityBlockingQueue中獲得一個疊代器Iterator,但這個疊代器並不保證按照優先級的順序進行疊代。
  • SynchronousQueue:隊列內部僅允許容納一個元素,當一個線程插入一個元素後,就會被阻塞,除非這個元素被另一個線程消費。因此,也稱SynchronousQueue為同步隊列。SynchronousQueue是一個無界非緩存的隊列。準確的說,它不存儲元素,放入元素只有等待取走元素之後,才能再次放入元素,

點擊下方,第一時間了解華為雲新鮮技術~

華為雲博客_大數據博客_AI博客_雲計算博客_開發者中心-華為雲

關鍵字: