“我报名参加金石方案1期挑战——分割10万奖池,这是我的第1篇文章,点击检查活动概况”

在阅读本文之前,你能够经过 Creating Asynchronous Methods 指导来体验下创立异步办法的运用办法。

为什么要写这篇文章,本质上关于这些 Spring 已经封装好的才能,并不需求去重视它底层到底是怎样玩的,比方 @Async,你必定能够猜到关于打了这个注解的办法(或许类),在履行这个办法(或许类下一切办法)时,Spring 结构会将当时办法丢进到一个独自的线程池中去履行,以到达办法异步履行的目的。

本篇文章的原始诉求来自于需求对 @Async 描述的办法进行 trace 埋点,当时大多数基于线程上下文传递 traceContext 的办法显然关于跨线程问题是不能满足的,需求特别的处理;那么就需求对这些技术点进行剖析,以寻求切入点。

前言

@Async 是经过注解符号来敞开办法的异步履行的;关于注解的底层完结,除了 java 原生供给那种依赖编译期植入的之外,其他的根本都差不多,即运行时经过反射等办法阻拦到打了注解的类或许办法,然后履行时进行横切阻拦;别的这儿还有一个点便是办法异步履行,所以关于 @Async 的剖析,就必定绕不开两个根本的知识点,便是署理和线程池。
在了解到这些之后,我们来拆解下 @Async 的根本原理。

怎样敞开收效?

The @EnableAsync annotation switches on Spring’s ability to run @Async methods in a background thread pool.

经过 @EnableAsync 来敞开异步办法的才能。

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(AsyncConfigurationSelector.class)
public @interface EnableAsync {
// ...`
}

@EnableAsync 注解 Import 了 AsyncConfigurationSelector,这个在 SpringBoot 中是非常常见的一种写法,这儿需求重视的是挑选了哪个自动配置类;adviceMode 默许是 false,这儿就以 ProxyAsyncConfiguration 为例:

@Override
@Nullable
public String[] selectImports(AdviceMode adviceMode) {
    switch (adviceMode) {
        case PROXY:
            return new String[] {ProxyAsyncConfiguration.class.getName()};
        case ASPECTJ:
            return new String[] {ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME};
        default:
            return null;
    }
}

AsyncAnnotationBeanPostProcessor

org.springframework.scheduling.annotation.ProxyAsyncConfiguration中最首要的便是创立 AsyncAnnotationBeanPostProcessor,从姓名看,AsyncAnnotationBeanPostProcessor 便是来处理 @Async 注解的;目的很清晰,便是创立对应 bean 的署理方针,以便于履行办法时能够进行 AOP 阻拦(详细细节能够看 org.springframework.aop.framework.AbstractAdvisingBeanPostProcessor#postProcessAfterInitialization这个办法)。

ProxyFactory proxyFactory = prepareProxyFactory(bean, beanName);
if (!proxyFactory.isProxyTargetClass()) {
   evaluateProxyInterfaces(bean.getClass(), proxyFactory);
}
proxyFactory.addAdvisor(this.advisor);
customizeProxyFactory(proxyFactory);

AnnotationAsyncExecutionInterceptor

这儿涉及到 AOP 的一些基础知识,能够查阅之前写的 /post/684490… 这篇文章

AOP 中最外层的是署理类,然后是织入器(advisor),再接着是切面(advice he PointCut);前面已经将创立署理方针的逻辑进行了介绍,所以接下来是织入器(advisor)和切面的创立。实际上织入器(advisor)的创立逻辑也是在 AsyncAnnotationBeanPostProcessor 中完结的。

@Override
public void setBeanFactory(BeanFactory beanFactory) {
    super.setBeanFactory(beanFactory);
	// 创立  advisor
    AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler);
    if (this.asyncAnnotationType != null) {
        advisor.setAsyncAnnotationType(this.asyncAnnotationType);
    }
    advisor.setBeanFactory(beanFactory);
    this.advisor = advisor;
}

在 AsyncAnnotationAdvisor 的构造函数中,会构建 Advice 和 Pointcut

public AsyncAnnotationAdvisor(
        @Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
	// 省掉其他代码
	/// ...
    // 创立 advice
    this.advice = buildAdvice(executor, exceptionHandler);
    // 创立 pointcut
    this.pointcut = buildPointcut(asyncAnnotationTypes);
}

Advice 便是详细履行阻拦的逻辑,这儿的 advice 实际上 AnnotationAsyncExecutionInterceptor(why ? 因饰Advice 是 MethodInterceptor 的父类)。

protected Advice buildAdvice(
        @Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
	// 这儿
    AnnotationAsyncExecutionInterceptor interceptor = new AnnotationAsyncExecutionInterceptor(null);
    interceptor.configure(executor, exceptionHandler);
    return interceptor;
}

到这儿,关于 @EnableAsync 是怎样敞开创立异步办法的逻辑根本就介绍完了;本质上仍是 Spring AOP 的那套逻辑。

Tips

除了 adviceMode,一般状况下还会涉及到别的一个参数,即 proxyTargetClass;proxyTargetClass 在设置为 true 和 false 时,对应运用的署理机制大致如下:

  • true

    • 方针方针完结了接口 – 运用 CGLIB 署理机制
    • 方针方针没有接口(只要完结类) – 运用 CGLIB 署理机制
  • false

    • 方针方针完结了接口 – 运用 JDK 动态署理机制(署理一切完结了的接口)
    • 方针方针没有接口(只要完结类) – 运用 CGLIB 署理机制

线程池

上一末节中,对 @EnableAsync 收效机制和对应的 AOP 方针创立逻辑进行了介绍;实际上 AOP 阻拦到详细的办法之后的首要目的便是将履行逻辑丢到线程池中去履行。那这儿就会涉及到本节的主题,即线程池。本节需求搞清楚几个问题:

  • 什么时候创立的线程池?
  • 创立的线程池类型是啥?
  • 办法履行任务是怎样被提交的?

创立 AnnotationAsyncExecutionInterceptor 时初始化线程池

线程池的创立是在创立 AnnotationAsyncExecutionInterceptor 方针时完结,代码如下:

public AnnotationAsyncExecutionInterceptor(@Nullable Executor defaultExecutor) {
    super(defaultExecutor);
}

在其父类 AsyncExecutionAspectSupport 中完结详细线程池创立

this.defaultExecutor = new SingletonSupplier<>(defaultExecutor, () -> getDefaultExecutor(this.beanFactory));

在 getDefaultExecutor 办法中, 会先从 Spring 容器找 TaskExecutor 类型的线程池 Bean,假如找不到,会扩大范围找 Executor 类型的线程池 Bean,假如找不到,则返回 null。

这儿是个推迟载入的操作,即只要当异步办法被调用时,才会触发 SingletonSupplier get 操作,从而触发 getBean 的逻辑,假如你在 debug 时出现没有正常走到断点的状况,能够重视下这个场景。

默许线程池 SimpleAsyncTaskExecutor

@Override
@Nullable
protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {
    Executor defaultExecutor = super.getDefaultExecutor(beanFactory);
    return (defaultExecutor != null ? defaultExecutor : new SimpleAsyncTaskExecutor());
}

从这段逻辑看,假如从 Spring 容器中没有找到对应的线程池 Bean,那么就创立 SimpleAsyncTaskExecutor 作为默许的线程池。

This class also customizes the Executor by defining a new bean. Here, the method is named taskExecutor, since this is the specific method name for which Spring searches. In our case, we want to limit the number of concurrent threads to two and limit the size of the queue to 500. There are many more things you can tune. If you do not define an Executor bean, Spring creates a SimpleAsyncTaskExecutor and uses that.

办法履行任务的提交

基于前面的剖析,办法履行任务的提交必定是发生在阻拦到 @Async 注解时,也便是 AnnotationAsyncExecutionInterceptor 中;经过剖析代码,在其父类 AsyncExecutionInterceptor
中,验证了剖析。下面是部分中心逻辑:

public Object invoke(final MethodInvocation invocation) throws Throwable {
    // 1、拿到 Method
    // 2、根据 Method 获取 executor
    AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);
    // 3、创立办法履行任务 task
    Callable<Object> task = () -> {
    // ...
    };
    // 4、提交 task
    return doSubmit(task, executor, invocation.getMethod().getReturnType());
}

determineAsyncExecutor 中说明了, executor 是和办法方针绑定的,即每个办法都有一个自己的 executor;异步办法在第一次履行的时候创立自己的 executor,然后缓存到内存中。在 doSubmit 中,会根据 returnType 的类型进行相应的处理

protected Object doSubmit(Callable<Object> task, AsyncTaskExecutor executor, Class<?> returnType) {
    // CompletableFuture
    if (CompletableFuture.class.isAssignableFrom(returnType)) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                return task.call();
            }
            catch (Throwable ex) {
                throw new CompletionException(ex);
            }
        }, executor);
    }
    // ListenableFuture
    else if (ListenableFuture.class.isAssignableFrom(returnType)) {
        return ((AsyncListenableTaskExecutor) executor).submitListenable(task);
    }
    // Future
    else if (Future.class.isAssignableFrom(returnType)) {
        return executor.submit(task);
    }
    // void
    else {
        executor.submit(task);
        return null;
    }
}

怎样自定义线程池

SpringBoot 供给了 org.springframework.scheduling.annotation.AsyncConfigurer 接口让开发人员能够自定义线程池履行器;结构默许供给了一个空的完结类 AsyncConfigurerSupport,两个办法体内部都是空完结。这部分逻辑在 org.springframework.scheduling.annotation.AbstractAsyncConfiguration#setConfigurers表现:

/**
* Collect any {@link AsyncConfigurer} beans through autowiring.
*/
@Autowired(required = false)
void setConfigurers(Collection<AsyncConfigurer> configurers) {
if (CollectionUtils.isEmpty(configurers)) {
    return;
}
if (configurers.size() > 1) {
    throw new IllegalStateException("Only one AsyncConfigurer may exist");
}
AsyncConfigurer configurer = configurers.iterator().next();
// for this
this.executor = configurer::getAsyncExecutor;
this.exceptionHandler = configurer::getAsyncUncaughtExceptionHandler;
}

AsyncConfigurer 在项目中只能有一个完结 Bean,假如超越一个,将会抛出 IllegalStateException 异常。

总结

本文经过对 @Async 注解的剖析,和你解释了 @Async 是怎样让办法异步履行的吗? 这个问题;从剖析过程中能够知道,关于绝大多数面向工程师运用的注解或许工具,本质上是离不开那些最最根本知识点的。当然,经过剖析代码,一方面是能够进一步识别作者的意图,更首要的是能够看到那些意料之外的“骚操作” coding。