动态线程池的简略完成思路

什么是动态线程池?

在线程池日常实践中咱们常常会遇到以下问题:

  • 代码中创立了一个线程池却不知道中心参数设置多少比较合适。
  • 参数设置好后,上线发现需求调整,改代码重启服务非常麻烦。
  • 线程池相对于开发人员来说是个黑箱,运行状况在出现问题 前很难被感知。

因此,动态可监控线程池一种针对以上痛点开发的线程池办理工具。主要可完成功用有:供给对 Spring 应用内线程池实例的全局管控、应用运行时动态变更线程池参数以及线程池数据采集和监控阈值报警。

现已完成的优秀开源动态线程池

hippo4j、dynamic-tp…..

完成思路

中心办理类

需求能完成对线程池的

  • 服务注册

  • 获取现已注册好的线程池

  • 以及对注册号线程池参数的改写。

对于每一个线程池,咱们运用一个线程池姓名作为标识每个线程池的仅有ID。

伪代码完成

public class DtpRegistry {
    /**
     * 储存线程池
     */
    private static final Map<String, Executor> EXECUTOR_MAP = new ConcurrentHashMap<>();
    /**
     * 获取线程池
     * @param executorName 线程池姓名
     */
    public static Executor getExecutor(String executorName) {
        return EXECUTOR_MAP.get(executorName);
    }
    /**
     * 线程池注册
     * @param executorName 线程池姓名
     */
    public static void registry(String executorName, Executor executor) {
        //注册
        EXECUTOR_MAP.put(executorName, executorWrapper);
    }
    /**
     * 改写线程池参数
     * @param executorName 线程池姓名
     * @param properties 线程池参数
     */
    public static void refresh(String executorName, ThreadPoolProperties properties) {
        Executor executor = EXECUTOR_MAP.get(executorName)
        //改写参数
        //.......
    }
}

如何创立线程池?

STEP 1. 咱们能够运用yml装备文件的方式装备一个线程池,将线程池实例的创立交由Spring容器

相关装备

public class DtpProperties {
    private List<ThreadPoolProperties> executors;
}
public class ThreadPoolProperties {
    /**
     * 标识每个线程池的仅有姓名
     */
    private String poolName;
    private String poolType = "common";
    /**
     * 是否为守护线程
     */
    private boolean isDaemon = false;
    /**
     * 以下都是中心参数
     */
    private int corePoolSize = 1;
    private int maximumPoolSize = 1;
    private long keepAliveTime;
    private TimeUnit timeUnit = TimeUnit.SECONDS;
    private String queueType = "arrayBlockingQueue";
    private int queueSize = 5;
    private String threadFactoryPrefix = "-td-";
    private String RejectedExecutionHandler;
}

yml example:

spring:
  dtp:
    executors:
      # 线程池1
      - poolName: dtpExecutor1
        corePoolSize: 5
        maximumPoolSize: 10
      # 线程池2
      - poolName: dtpExecutor2
        corePoolSize: 2
        maximumPoolSize: 15
STEP 2 根据装备信息添加线程池的BeanDefinition

要害类

@Slf4j
public class DtpImportBeanDefinitionRegistrar implements ImportBeanDefinitionRegistrar, EnvironmentAware {
    private Environment environment;
    @Override
    public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
        log.info("注册");
        //绑定资源
        DtpProperties dtpProperties = new DtpProperties();
        ResourceBundlerUtil.bind(environment, dtpProperties);
        List<ThreadPoolProperties> executors = dtpProperties.getExecutors();
        if (Objects.isNull(executors)) {
            log.info("未检测本地到装备文件线程池");
            return;
        }
        //注册beanDefinition
        executors.forEach((executorProp) -> {
            BeanUtil.registerIfAbsent(registry, executorProp);
        });
    }
    @Override
    public void setEnvironment(Environment environment) {
        this.environment = environment;
    }
}
/**
 *
 * 工具类
 *
 */
public class BeanUtil{
    public static void registerIfAbsent(BeanDefinitionRegistry registry, ThreadPoolProperties executorProp) {
        register(registry, executorProp.getPoolName(), executorProp);
    }
    public static void register(BeanDefinitionRegistry registry, String beanName, ThreadPoolProperties executorProp) {
        Class<? extends Executor> executorType = ExecutorType.getClazz(executorProp.getPoolType());
        Object[] args = assembleArgs(executorProp);
        register(registry, beanName, executorType, args);
    }
    public static void register(BeanDefinitionRegistry registry, String beanName, Class<?> clazz, Object[] args) {
        BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(clazz);
        for (Object arg : args) {
            builder.addConstructorArgValue(arg);
        }
        registry.registerBeanDefinition(beanName, builder.getBeanDefinition());
    }
    private static Object[] assembleArgs(ThreadPoolProperties executorProp) {
        return new Object[]{
                executorProp.getCorePoolSize(),
                executorProp.getMaximumPoolSize(),
                executorProp.getKeepAliveTime(),
                executorProp.getTimeUnit(),
                QueueType.getInstance(executorProp.getQueueType(), executorProp.getQueueSize()),
                new NamedThreadFactory(
                        executorProp.getPoolName() + executorProp.getThreadFactoryPrefix(),
                        executorProp.isDaemon()
                ),
                //先默认不做设置
                RejectPolicy.ABORT.getValue()
        };
    }
}

下面解释一下这个类的效果,environment实例中储存着spring发动时解析的yml装备,所以咱们spring供给的Binder将装备绑定到咱们前面定义的DtpProperties类中,便利后续运用。接下来的比较简略,便是将线程池的BeanDefinition注册到IOC容器中,让spring去帮咱们实例化这个bean。

STEP 3. 将现已实例化的线程池注册到中心类 DtpRegistry 中

咱们注册了 beanDefinition 后,spring会帮咱们实例化出来, 在这之后咱们能够根据需求将这个bean进行进一步的处理,spring也供给了许多机制让咱们对bean的生命周期办理进行更多的扩展。对应到这儿咱们便是将实例化出来的线程池注册到中心类 DtpRegistry 中进行办理。

这儿咱们运用 BeanPostProcessor 进行处理。

@Slf4j
public class DtpBeanPostProcessor implements BeanPostProcessor {
    private DefaultListableBeanFactory beanFactory;
    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        if (bean instanceof DtpExecutor) {
            //直接归入办理
            DtpRegistry.registry(beanName, (DtpExecutor) bean);
        }
        return bean;
    }
}

这儿的逻辑很简略, 便是判断一下这个bean是不是线程池,是就统一办理起来。

STEP 4. 启用 BeanDefinitionRegistrar 和 BeanPostProcessor

在springboot程序中,只需加一个@MapperScan注解就能启用mybatis的功用,咱们能够学习其在spring中的启用方式,自定义一个注解:

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Import(DtpImportSelector.class)
public @interface EnableDynamicThreadPool {
}

其中,比较要害的是@Import注解,spring会导入注解中的类DtpImportSelector

DtpImportSelector这个类完成了:

public class DtpImportSelector implements DeferredImportSelector {
    @Override
    public String[] selectImports(AnnotationMetadata importingClassMetadata) {
        return new String[]{
                DtpImportBeanDefinitionRegistrar.class.getName(),
                DtpBeanPostProcessor.class.getName()
        };
    }
}

这样,只需咱们再发动类或许装备类上加上@EnableDynamicThreadPool这个注解,spring就会将DtpImportBeanDefinitionRegistrarDtpBeanPostProcessor这两个类参加spring容器办理,然后完成咱们的线程池的注册。

@SpringBootApplication
@EnableDynamicThreadPool
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

如何完成线程池装备的动态改写

首先明确一点,对于线程池的完成类,例如:ThreadPoolExecutor等,都有供给中心参数对应的 set 办法,让咱们完成参数修改。因此,在中心类DtpRegistry中的refresh办法,咱们能够这样写:

public class DtpRegistry {
    /**
     * 储存线程池
     */
    private static final Map<String, ThreadPoolExecutor> EXECUTOR_MAP = new ConcurrentHashMap<>();
    /**
     * 改写线程池参数
     * @param executorName 线程池姓名
     * @param properties 线程池参数
     */
    public static void refresh(String executorName, ThreadPoolProperties properties) {
        ThreadPoolExecutor executor = EXECUTOR_MAP.get(executorName)
        //设置参数
        executor.setCorePoolSize(...);
        executor.setMaximumPoolSize(...);
        ......
    }
}

而这些新参数怎样来呢?咱们能够引入Nacos、Apollo等装备中心,完成他们的监听器办法,在监听器办法里调用DtpRegistry的refresh办法改写即可。