開源輕量級動態可監控線程池DynamicTp介紹

架構成長指南 發佈 2022-09-29T01:08:59.345406+00:00

前言使用線程池 ThreadPoolExecutor 過程中你是否有以下痛點呢?

前言

使用線程池 ThreadPoolExecutor 過程中你是否有以下痛點呢?

  1. 代碼中創建了一個 ThreadPoolExecutor,但是不知道那幾個核心參數設置多少比較合適
  2. 憑經驗設置參數值,上線後發現需要調整,改代碼重新發布服務,非常麻煩
  3. 線程池相對開發人員來說是個黑盒,運行情況不能及時感知到,直到出現問題

如果你有以上痛點,這塊開源的動態可監控線程池框架(DynamicTp)或許能幫助到你

此項目由Dromara社區開源,基於配置中心的輕量級動態可監控線程池,項目地址:https://dynamictp.cn/

功能特性

  • 代碼零侵入:所有配置都放在配置中心,對業務代碼零侵入
  • 輕量簡單:基於 springboot 實現,引入 starter,接入只需簡單4步就可完成,順利3分鐘搞定
  • 高可擴展:框架核心功能都提供 SPI 接口供用戶自定義個性化實現(配置中心、配置文件解析、通知告警、監控數據採集、任務包裝等等)
  • 線上大規模應用:參考美團線程池實踐open in new window,美團內部已經有該理論成熟的應用經驗
  • 多平台通知報警:提供多種報警維度(配置變更通知、活性報警、容量閾值報警、拒絕觸發報警、任務執行或等待超時報警),已支持企業微信、釘釘、飛書報警,同時提供 SPI 接口可自定義擴展實現
  • 監控:定時採集線程池指標數據,支持通過 MicroMeter、jsonLog 日誌輸出、Endpoint 三種方式,可通過 SPI 接口自定義擴展實現
  • 任務增強:提供任務包裝功能,實現TaskWrapper接口即可,如 MdcTaskWrapper、TtlTaskWrapper、SwTraceTaskWrapper,可以支持線程池上下文信息傳遞
  • 兼容性:JUC 普通線程池和 Spring 中的 ThreadPoolTaskExecutor 也可以被框架監控,@Bean 定義時加 @DynamicTp 註解即可
  • 可靠性:框架提供的線程池實現 Spring 生命周期方法,可以在 Spring 容器關閉前儘可能多的處理隊列中的任務
  • 多模式:參考Tomcat線程池提供了 IO 密集型場景使用的 EagerDtpExecutor 線程池
  • 支持多配置中心:基於主流配置中心實現線程池參數動態調整,實時生效,已支持 Nacos、Apollo、Zookeeper、Consul、Etcd,同時也提供 SPI 接口可自定義擴展實現
  • 中間件線程池管理:集成管理常用第三方組件的線程池,已集成Tomcat、Jetty、Undertow、Dubbo、RocketMq、Hystrix等組件的線程池管理(調參、監控報警)

系統架構圖

接入指南

Maven 依賴

下面只介紹幾個常用場景接入pom,zk、consul這些請查看官網

  • apollo 應用用接入用此依賴
<dependency>
<groupId>cn.dynamictp</groupId>
<artifactId>dynamic-tp-spring-boot-starter-apollo</artifactId>
<version>1.0.8</version>
</dependency>
  • spring-cloud 場景下的 nacos 應用接入用此依賴
 <dependency>
<groupId>cn.dynamictp</groupId>
<artifactId>dynamic-tp-spring-cloud-starter-nacos</artifactId>
<version>1.0.8</version>
</dependency>
  • 非 spring-cloud 場景下的 nacos 應用接入用此依賴
<dependency>
<groupId>cn.dynamictp</groupId>
<artifactId>dynamic-tp-spring-boot-starter-nacos</artifactId>
<version>1.0.8</version>
</dependency>

配置文件

線程池定義可以配置在文件中,然後在Spring應用中可以通過@Resource、@Autowire、或通過工具類來獲取實例

以下配置文件,除了維護公共屬性還定義了名稱為austin.im.notice、execute-xxl-thread-pool線程池,其中格式支持yml、properties 類型、json 類型、zk文件

spring:
  dynamic:
    tp:
      enabled: true
      enabledBanner: true           # 是否開啟banner列印,默認true
      enabledCollect: true          # 是否開啟監控指標採集,默認false
      collectorType: micrometer     # 監控數據採集器類型(JsonLog | MicroMeter),默認logging
      monitorInterval: 5            # 監控時間間隔(報警判斷、指標採集),默認5s
      apollo:                       # apollo配置,不配置默認拿apollo配置第一個namespace
        namespace: dynamic-tp-apollo-dtp.yml
      configType: yml
      platforms:
        - platform: wechat
          urlKey: 38aa7eff500-1287
          receivers: apollo
        - platform: ding
          urlKey: f80dad441fcd65bac48473d4a88dcd6a
          secret: SECb544445a6a34f0315d08b17de41
          receivers: 18888888888
      executors:
        - threadPoolName: austin.im.notice
          corePoolSize: 6
          maximumPoolSize: 8
          queueCapacity: 200
          queueType: VariableLinkedBlockingQueue   # 任務隊列,查看源碼QueueTypeEnum枚舉類
          rejectedHandlerType: CallerRunsPolicy    # 拒絕策略,查看RejectedTypeEnum枚舉類
          keepAliveTime: 50
          allowCoreThreadTimeOut: false
          threadNamePrefix: austin-           # 線程名前綴
        - threadPoolName: execute-xxl-thread-pool
          corePoolSize: 3
          maximumPoolSize: 3
          queueCapacity: 200
          queueType: VariableLinkedBlockingQueue   # 任務隊列,查看源碼QueueTypeEnum枚舉類
          rejectedHandlerType: CallerRunsPolicy    # 拒絕策略,查看RejectedTypeEnum枚舉類
          keepAliveTime: 50
          allowCoreThreadTimeOut: false
          threadNamePrefix: austin-           # 線程名前綴
          notifyItems: # 報警項,不配置自動會配置(變更通知、容量報警、活性報警、拒絕報警)
            - type: capacity               # 報警項類型,查看源碼 NotifyTypeEnum枚舉類
              enabled: true
              threshold: 80                # 報警閾值
              platforms: [ding,wechat]     # 可選配置,不配置默認拿上層platforms配置的所以平台
              interval: 120                # 報警間隔(單位:s)
            - type: change
              enabled: true
            - type: liveness
              enabled: true
              threshold: 80
            - type: reject
              enabled: true
              threshold: 1

代碼使用

線程池實例定義

如果不再配置文件中進行定義,也可以在代碼中直接定義

@Configuration
public class DtpConfig {  
  
  /**
   * 通過{@link DynamicTp} 註解定義普通juc線程池,會享受到該框架監控功能,註解名稱優先級高於方法名
   *
   * @return 線程池實例
   */
  @DynamicTp("commonExecutor")
  @Bean
  public ThreadPoolExecutor commonExecutor() {
      return (ThreadPoolExecutor) Executors.newFixedThreadPool(1);
  }

  /**
   * 通過{@link ThreadPoolCreator} 快速創建一些簡單配置的動態線程池
   * tips: 建議直接在配置中心配置就行,不用@Bean聲明
   *
   * @return 線程池實例
   */
  @Bean
  public DtpExecutor dtpExecutor1() {
      return ThreadPoolCreator.createDynamicFast("dtpExecutor1");
  }

  /**
   * 通過{@link ThreadPoolBuilder} 設置詳細參數創建動態線程池(推薦方式),
   * ioIntensive,參考tomcat線程池設計,實現了處理io密集型任務的線程池,具體參數可以看代碼注釋
   *
   * tips: 建議直接在配置中心配置就行,不用@Bean聲明
   * @return 線程池實例
   */
  @Bean
  public DtpExecutor ioIntensiveExecutor() {
      return ThreadPoolBuilder.newBuilder()
              .threadPoolName("ioIntensiveExecutor")
              .corePoolSize(20)
              .maximumPoolSize(50)
              .queueCapacity(2048)
              .ioIntensive(true)
              .buildDynamic();
  }

  /**
   * tips: 建議直接在配置中心配置就行,不用@Bean聲明
   * @return 線程池實例
   */
  @Bean
  public ThreadPoolExecutor dtpExecutor2() {
      return ThreadPoolBuilder.newBuilder()
              .threadPoolName("dtpExecutor2")
              .corePoolSize(10)
              .maximumPoolSize(15)
              .keepAliveTime(50)
              .timeUnit(TimeUnit.MILLISECONDS)
              .workQueue(QueueTypeEnum.SYNCHRONOUS_QUEUE.getName(), null, false)
              .waitForTasksToCompleteOnShutdown(true)
              .awaitTerminationSeconds(5)
              .buildDynamic();
  }
}

代碼調用

從DtpRegistry中根據線程池名稱獲取,或者通過依賴注入方式(推薦,更優雅)

  1. 依賴注入方式使用,優先推薦依賴注入方式,不能使用依賴注入的場景可以使用方式2
@Resource
private ThreadPoolExecutor dtpExecutor1;

public void exec() {
   dtpExecutor1.execute(() -> System.out.println("test"));
}
  1. 通過DtpRegistry註冊器獲取
public static void main(String[] args) {
   DtpExecutor dtpExecutor = DtpRegistry.getDtpExecutor("dtpExecutor1");
   dtpExecutor.execute(() -> System.out.println("test"));
}

通知報警

調參通知

運行報警

線程池活躍度告警

活躍度 = activeCount / maximumPoolSize

隊列容量告警

容量使用率 = queueSize / queueCapacity

拒絕策略告警

線程池線程數達到配置的最大線程數,且任務隊列已滿,再提交任務會觸發拒絕策略

任務隊列超時告警

重寫ThreadPoolExecutor的execute()方法和beforeExecute()方法,如果配置了執行超時或排隊超時值,則會進行報警

任務執行超時告警

重寫ThreadPoolExecutor的afterExecute()方法,根據當前時間和beforeExecute()中設置的startTime的差值即可算出任務的實際執行時間,然後判斷如果差值大於配置的runTimeout則累加排隊超時任務數量,則會進行告警

監控

支持接入prometheus+grafana做監控,效果如下

總結

DynamicTcp是一個功能實用,上手簡單動態線程池組件,很輕量對業務無侵入,目前我們業務系統已經開始介入,讀者朋友們趕緊使用起來吧

關鍵字: