布景
运用线程池 ThreadPoolExecutor 过程中你是否有以下痛点呢?
1.代码中创建了一个 ThreadPoolExecutor,但是不知道那几个中心参数设置多少比较适宜
2.凭经历设置参数值,上线后发现需求调整,改代码从头发布服务,非常费事
3.线程池相对开发人员来说是个黑盒,运转状况不能及时感知到,直到出现问题
如果你有以上痛点,动态可监控线程池结构(DynamicTp)或许能帮助到你。
如果看过 ThreadPoolExecutor 的源码,大约能够知道它对中心参数根本都有供给 set / get 办法以及一些扩展办法,能够在运转时动态修正、获取相应的值,这些办法有:
public void setCorePoolSize(int corePoolSize);
public void setMaximumPoolSize(int maximumPoolSize);
public void setKeepAliveTime(long time, TimeUnit unit);
public void setThreadFactory(ThreadFactory threadFactory);
public void setRejectedExecutionHandler(RejectedExecutionHandler handler);
public void allowCoreThreadTimeOut(boolean value);
public int getCorePoolSize();
public int getMaximumPoolSize();
public long getKeepAliveTime(TimeUnit unit);
public BlockingQueue<Runnable> getQueue();
public RejectedExecutionHandler getRejectedExecutionHandler();
public boolean allowsCoreThreadTimeOut();
protected void beforeExecute(Thread t, Runnable r);
protected void afterExecute(Runnable r, Throwable t);
现在大多数的互联网项目其实都会微服务化布置,有一套自己的服务办理体系,微服务组件中的分布式装备中心扮演的便是动态修正装备, 实时收效的人物。那么咱们是否能够结合装备中心来做运转时线程池参数的动态调整呢?答案是肯定的,并且装备中心相对都是高可用的, 运用它也不必过于忧虑装备推送出现问题这类事儿,并且也能削减研制动态线程池组件本身的难度和工作量。
综上,能够总结出以下的布景
-
广泛性:在 Java 开发中,想要进步体系功用,线程池已经是一个 90%以上的人都会选择运用的基础工具
-
不确认性:项目中或许会创建很多线程池,既有 IO 密集型的,也有 CPU 密集型的,但线程池的参数并不好确认;需求有套机制在运转过程中动态去调整参数
-
无感知性:线程池运转过程中的各项方针一般感知不到;需求有套监控报警机制在事前、事中就能让开发人员感知到线程池的运转状况,及时处理
-
高可用性:装备改变需求及时推送到客户端,需求有高可用的装备办理推送服务,装备中心是现在大多数互联网体系都会运用的组件,与之结合能够极大进步体系可用性
特性
根据以上布景分析,咱们对线程池 ThreadPoolExecutor 做一些扩展增强,首要完成以下方针
1.完成对运转中线程池参数的动态修正,实时收效
2.实时监控线程池的运转状态,触发设置的报警战略时报警,报警信息推送办公渠道
3.守时收集线程池方针数据,配合像 grafana 这种可视化监控渠道做大盘监控
经过多个版别的迭代,现在最新版别 v1.0.9 具有以下特性 ✅
-
代码零侵入:一切装备都放在装备中心,对事务代码零侵入
-
轻量简略:根据 SpringBoot 完成,引进 starter,接入只需简略 4 步就可完成,顺利 3 分钟搞定
-
高可扩展:结构中心功用都供给 SPI 接口供用户自定义个性化完成(装备中心、装备文件解析、告诉告警、监控数据收集、使命包装等等)
-
线上大规模运用:参阅美团线程池实践,美团内部已经有该理论老练的运用经历
-
多渠道告诉报警:供给多种报警维度(装备改变告诉、活性报警、容量阈值报警、回绝触发报警、使命执行或等待超时报警),已支撑企业微信、钉钉、飞书、邮件报警,一起供给 SPI 接口可自定义扩展完成
-
监控:守时收集线程池方针数据,支撑经过 MicroMeter、JsonLog 日志输出、Endpoint 三种方法,可经过 SPI 接口自定义扩展完成
-
使命增强:供给使命包装功用,完成 TaskWrapper 接口即可,如 MdcTaskWrapper、TtlTaskWrapper、SwTraceTaskWrapper,能够支撑线程池上下文信息传递
-
兼容性:JUC 一般线程池和 Spring 中的 ThreadPoolTaskExecutor 也能够被结构监控,@Bean 定义时加 @DynamicTp 注解即可
-
可靠性:结构供给的线程池完成 Spring 生命周期办法,能够在 Spring 容器关闭前尽或许多的处理行列中的使命
-
多形式:参阅 Tomcat 线程池供给了 IO 密集型场景运用的 EagerDtpExecutor 线程池
-
支撑多装备中心:根据主流装备中心完成线程池参数动态调整,实时收效,已支撑 Nacos、Apollo、Zookeeper、Consul、Etcd,一起也供给 SPI 接口可自定义扩展完成
-
中间件线程池办理:集成办理常用第三方组件的线程池,已集成Tomcat、Jetty、Undertow、Dubbo、RocketMq、Hystrix、Grpc、Motan、Okhttp3、Brpc、Tars 等组件的线程池办理(调参、监控报警)
架构设计
结构功用大体能够分为以下几个模块
1.装备改变监听模块
2.服务内部线程池办理模块
3.三方组件线程池办理模块
4.监控模块
5.告诉告警模块
代码结构

1.adapter 模块:首要是适配一些第三方组件的线程池办理,现在已经完成的有 SpringBoot 内置的三大 web 容器(TTomcat、Jetty、Undertow)、Dubbo、RocketMq、Hystrix、Grpc、Motan、Okhttp3、Brpc、Tars 的线程池办理,后续会接入其他常用组件的线程池办理。
2.common 模块:首要是一些各个模板都会用到的类,解耦依靠,复用代码,大家日常开发中或许也经常会这样做。
3.core 模块:该结构的中心代码都在这个模块里,包含动态调整参数,监控报警,以及串联整个项目流程都在此。
4.example 模块:供给一个简略运用示例,便利运用者参照
5.extension 模块:放一些扩展功用完成,比方根据 redis 的流控扩展、邮件发送扩展、skywalking 上下文传递扩展等
6.logging 模块:用于装备结构内部日志的输出,现在首要用于输出线程池监控方针数据到指定文件
7.starter模块:供给独立功用模块的依靠封装、主动装备等相关。
装备改变监听模块
1.监听特定装备中心的指定装备文件(已完成 Nacos、Apollo、Zookeeper、Consul、Etcd),可经过内部供给的SPI接口扩展其他完成
2.解析装备文件内容,内置完成 yml、properties、json 装备文件的解析,可经过内部供给的 SPI 接口扩展其他完成
3.告诉线程池办理模块完成参数的改写
服务内部线程池办理模块
1.服务发动时从装备中心拉取装备,生成线程池实例注册到内部线程池注册中心以及 Spring 容器中
2.承受装备监听模块的改写事情,完成线程池参数的改写
3.代码中经过依靠注入(推荐)或者 DtpRegistry.getDtpExecutor() 办法根据线程池名称来获取线程池实例
三方组件线程池办理
1.服务发动获取第三方中间件的线程池,被结构办理起来
2.承受参数改写、方针收集、告诉报警事情,进行相应的处理
监控模块
完成监控方针收集以及输出,默认供给以下三种方法,也可经过内部供给的 SPI 接口扩展其他完成
1.默认完成 JsonLog 输出到磁盘,能够自己收集解析日志,存储展现
2.MicroMeter收集,引进 MicroMeter 相关依靠,暴露相关端点,收集方针数据,结合 Grafana 做监控大盘
3.暴雷自定义 Endpoint 端点(dynamic-tp),可经过 http 方法实时拜访
告诉告警模块
对接办公渠道,完成告诉告警功用,已支撑钉钉、企微、飞书、邮件,可经过内部供给的 SPI 接口扩展其他完成,告诉告警类型如下
1.线程池首要参数改变告诉
2.堵塞行列容量到达设置的告警阈值
3.线程池活性到达设置的告警阈值
4.触发回绝战略告警,格局:A/B,A:该报警项前后两次报警区间累加数量,B:该报警项累计总数
5.使命执行超时告警,格局:A/B,A:该报警项前后两次报警区间累加数量,B:该报警项累计总数
6.使命等待超时告警,格局:A/B,A:该报警项前后两次报警区间累加数量,B:该报警项累计总数

接入运用
运用步骤
1.引进相应装备中心的依靠,具体见官网 maven 依靠
2.装备中心装备线程池实例,装备示例见官网(给出的是全装备项,不必的能够删去)
3.发动类加 @EnableDynamicTp 注解
4.运用 @Resource 或 @Autowired 进行依靠注入,或经过 DtpRegistry.getDtpExecutor(“name”) 获取
5.经过以上4步就能够运用了,是不是感觉很简略啊
服务发动看到有如下日志输出说明接入成功
| __ \ (_) |__ __|
| | | |_ _ _ __ __ _ _ __ ___ _ ___| |_ __
| | | | | | | '_ \ / _` | '_ ` _ | |/ __| | '_ \
| |__| | |_| | | | | (_| | | | | | | | (__| | |_) |
|_____/ __, |_| |_|__,_|_| |_| |_|_|___|_| .__/
__/ | | |
|___/ |_|
:: Dynamic Thread Pool ::
DynamicTp register dtpExecutor, source: beanPostProcessor, executor: DtpMainPropWrapper(dtpName=dynamic-tp-test-1, corePoolSize=6, maxPoolSize=8, keepAliveTime=50, queueType=VariableLinkedBlockingQueue, queueCapacity=200, rejectType=RejectedCountableCallerRunsPolicy, allowCoreThreadTimeOut=false)
DtpRegistry initialization end, remote dtpExecutors: [dtpExecutor1, dtpExecutor2], local dtpExecutors: [ioIntensiveExecutor], local commonExecutors: [commonExecutor]
项目地址
现在累计 2.5k star,欢迎了解运用。
官网:dynamictp.cn
gitee地址:gitee.com/dromara/dyn…
github地址:github.com/dromara/dyn…