以為很熟悉CountDownLatch的使用了,沒想到在生產環境翻車了

馬士兵教育cto 發佈 2022-08-13T21:25:45.190990+00:00

前言大家好,我是小郭,之前分享了CountDownLatch的使用,我們知道用來控制並發流程的同步工具,主要的作用是為了等待多個線程同時完成任務後,在進行主線程任務。

前言

大家好,我是小郭,之前分享了CountDownLatch的使用,我們知道用來控制並發流程的同步工具,主要的作用是為了等待多個線程同時完成任務後,在進行主線程任務。

萬萬沒想到,在生產環境中竟然翻車了,因為沒有考慮到一些場景,導致了CountDownLatch出現了問題,接下來來分享一下由於CountDownLatch導致的問題。

需求背景

先簡單介紹下業務場景,針對用戶批量下載的文件進行修改上傳

為了提高執行的速度,所以在採用線程池去執行 下載-修改-上傳 的操作,並在全部執行完之後統一提交保存文件地址到資料庫,於是加入了CountDownLatch來進行控制。

具體實現

根據服務本身情況,自定義一個線程池

public static ExecutorService testExtcutor() {
        return new ThreadPoolExecutor(
                2,
                2,
                0L,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(1));
    }
複製代碼

模擬執行

public static void main(String[] args) {
        // 下載文件總數
        List<Integer> resultList = new ArrayList<>(100);
        IntStream.range(0,100).forEach(resultList::add);
        // 下載文件分段
        List<List<Integer>> split = CollUtil.split(resultList, 10);

        ExecutorService executorService = BaseThreadPoolExector.testExtcutor();
        CountDownLatch countDownLatch = new CountDownLatch(100);
        for (List<Integer> list : split) {
            executorService.execute(() -> {
                list.forEach(i ->{
                    try {
                        // 模擬業務操作
                        Thread.sleep(500);
                        System.out.println("任務進入");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                        System.out.println(e.getMessage());
                    } finally {
                        System.out.println(countDownLatch.getCount());
                        countDownLatch.countDown();
                    }
                });
            });
        }
        try {
            countDownLatch.await();
            System.out.println("countDownLatch.await()");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
複製代碼

一開始我個人感覺沒有什麼問題,反正finally都能夠做減一的操作,到最後調用await方法,進行主線程任務

Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@300ffa5d rejected from java.util.concurrent.ThreadPoolExecutor@1f17ae12[Running, pool size = 2, active threads = 2, queued tasks = 1, completed tasks = 0]
  at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
  at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
  at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
  at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)
  at Thread.executor.executorTestBlock.main(executorTestBlock.java:28)
任務進入
countDownLatch.countDown
任務進入
countDownLatch.countDown
任務進入
countDownLatch.countDown
任務進入
countDownLatch.countDown
任務進入
countDownLatch.countDown
任務進入
countDownLatch.countDown
任務進入
countDownLatch.countDown
任務進入
countDownLatch.countDown
任務進入
countDownLatch.countDown
任務進入
countDownLatch.countDown
任務進入
countDownLatch.countDown
任務進入
countDownLatch.countDown
任務進入
countDownLatch.countDown
任務進入
countDownLatch.countDown
任務進入
countDownLatch.countDown
複製代碼

由於任務數量較多,阻塞隊列中已經塞滿了,所以默認的拒絕策略,當隊列滿時,處理策略報錯異常,

要注意這個異常是線程池,自己拋出的,不是我們循環裡面列印出來的,

這也造成了,線上這個線程池被阻塞了,他永遠也調用不到await方法,

利用jstack,我們就能夠看到有問題

"pool-1-thread-2" #12 prio=5 os_prio=31 tid=0x00007ff6198b7000 nid=0xa903 waiting on condition [0x0000700001c64000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x000000076b2283f8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

"pool-1-thread-1" #11 prio=5 os_prio=31 tid=0x00007ff6198b6800 nid=0x5903 waiting on condition [0x0000700001b61000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x000000076b2283f8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

解決方案

  1. 調大阻塞隊列,但是問題來了,到底多少阻塞隊列才是大呢,如果太大了會不由又造成內存溢出等其他的問題
  2. 在第一個的基礎上,我們修改了拒絕策略,當觸發拒絕策略的時候,用調用者所在的線程來執行任務
  3. public static ThreadPoolExecutor queueExecutor(BlockingQueue<Runnable> workQueue){ return new ThreadPoolExecutor( size, size, 0L, TimeUnit.SECONDS, workQueue, new ThreadPoolExecutor.CallerRunsPolicy()); } 複製代碼
  4. 你可能又會想說,會不會任務數量太多,導致調用者所在的線程執行不過來,任務提交的性能急劇下降
  5. 那我們就應該自定義拒絕策略,將這下排隊的消息記錄下來,採用補償機制的方式去執行
  6. 同時也要注意上面的那個異常是線程池拋出來的,我們自己也需要將線程池進行try catch,記錄問題數據,並且在finally中執行countDownLatch.countDown來避免,線程池的使用

總結

目前根據業務部門的反饋,業務實際中任務數不很特別多的情況,所以暫時先採用了第二種方式去解決這個線上問題

在這裡我們也可以看到,如果沒有正確的關閉countDownLatch,可能會導致一直等待,這也是我們需要注意的。

工具雖然好,但是依然要注意他帶來的問題,沒有正確的去處理好,引發的一系列連鎖反應。

關鍵字: