Spring Boot 自界说线程池完结异步开发相信看过陈某的文章都了解,可是在实践开发中需求在父子线程之间传递一些数据,比方用户信息,链路信息等等

比方用户登录信息运用ThreadLocal存放保证线程阻隔,代码如下:

/**
 * @author 大众号:码猿技能专栏
 * @description 用户上下文信息
 */
public class OauthContext {
    private static  final  ThreadLocal<LoginVal> loginValThreadLocal=new ThreadLocal<>();
    public static  LoginVal get(){
        return loginValThreadLocal.get();
    }
    public static void set(LoginVal loginVal){
        loginValThreadLocal.set(loginVal);
    }
    public static void clear(){
        loginValThreadLocal.remove();
    }
}

那么子线程想要获取这个LoginVal怎么做呢?

今日就来介绍几种高雅的办法完结Spring Boot 内部的父子线程的数据传递。

用这4招 优雅的实现Spring Boot 异步线程间数据传递

文章首发大众号:码猿技能专栏;作者:不才陈某

1. 手动设置

每履行一次异步线程都要分为两步:

  1. 获取父线程的LoginVal
  2. 将LoginVal设置到子线程,到达复用

代码如下:

public void handlerAsync() {
        //1. 获取父线程的loginVal
        LoginVal loginVal = OauthContext.get();
        log.info("父线程的值:{}",OauthContext.get());
        CompletableFuture.runAsync(()->{
            //2. 设置子线程的值,复用
           OauthContext.set(loginVal);
           log.info("子线程的值:{}",OauthContext.get());
        });
    }

尽管能够完结目的,可是每次开异步线程都需求手动设置,重复代码太多,看了头疼,你认为高雅吗?

2. 线程池设置TaskDecorator

TaskDecorator是什么?官方api的大致意思:这是一个履行回调办法的装修器,主要使用于传递上下文,或许供给使命的监控/计算信息。

知道有这么一个东西,怎么去运用?

TaskDecorator是一个接口,首先需求去完结它,代码如下:

/**
 * @author 大众号:码猿技能专栏
 * @description 上下文装修器
 */
public class ContextTaskDecorator implements TaskDecorator {
    @Override
    public Runnable decorate(Runnable runnable) {
        //获取父线程的loginVal
        LoginVal loginVal = OauthContext.get();
        return () -> {
            try {
                // 将主线程的请求信息,设置到子线程中
                OauthContext.set(loginVal);
                // 履行子线程,这一步不要忘了
                runnable.run();
            } finally {
                // 线程结束,清空这些信息,否则可能造成内存泄漏
                OauthContext.clear();
            }
        };
    }
}

重视大众号:码猿技能专栏,回复关键词:1111 获取阿里内部Java性能调优手册!

这儿我仅仅设置了LoginVal,实践开发中其他的共享数据,比方SecurityContextRequestAttributes….

TaskDecorator需求结合线程池运用,实践开发中异步线程主张运用线程池,只需求在对应的线程池配置一下,代码如下:

@Bean("taskExecutor")
public ThreadPoolTaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor poolTaskExecutor = new ThreadPoolTaskExecutor();
        poolTaskExecutor.setCorePoolSize(xx);
        poolTaskExecutor.setMaxPoolSize(xx);
        // 设置线程活跃时间(秒)
        poolTaskExecutor.setKeepAliveSeconds(xx);
        // 设置行列容量
        poolTaskExecutor.setQueueCapacity(xx);
        //设置TaskDecorator,用于处理父子线程间的数据复用
        poolTaskExecutor.setTaskDecorator(new ContextTaskDecorator());
        poolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        // 等候一切使命结束后再封闭线程池
        poolTaskExecutor.setWaitForTasksToCompleteOnShutdown(true);
        return poolTaskExecutor;
    }

此刻事务代码就不需求去设置子线程的值,直接运用即可,代码如下:

public void handlerAsync() {
        log.info("父线程的用户信息:{}", OauthContext.get());
        //履行异步使命,需求指定的线程池
        CompletableFuture.runAsync(()-> log.info("子线程的用户信息:{}", OauthContext.get()),taskExecutor);
    }

来看一下成果,如下图:

用这4招 优雅的实现Spring Boot 异步线程间数据传递

这儿运用的是CompletableFuture履行异步使命,运用@Async这个注解同样是可行的。

留意:不管运用何种办法,都需求指定线程池

3. InheritableThreadLocal

这种计划不主张运用,InheritableThreadLocal尽管能够完结父子线程间的复用,可是在线程池中运用会存在复用的问题,详细的能够看陈某之前的文章:微服务中运用阿里开源的TTL,高雅的完结身份信息的线程间复用

这种计划运用也是十分简略,直接用InheritableThreadLocal替换ThreadLocal即可,代码如下:

/**
 * @author 大众号:码猿技能专栏
 * @description 用户上下文信息
 */
public class OauthContext {
    private static  final  InheritableThreadLocal<LoginVal> loginValThreadLocal=new InheritableThreadLocal<>();
    public static  LoginVal get(){
        return loginValThreadLocal.get();
    }
    public static void set(LoginVal loginVal){
        loginValThreadLocal.set(loginVal);
    }
    public static void clear(){
        loginValThreadLocal.remove();
    }
}

4. TransmittableThreadLocal

TransmittableThreadLocal是阿里开源的东西,弥补了InheritableThreadLocal的缺陷,在运用线程池等会池化复用线程的履行组件情况下,供给ThreadLocal值的传递功能,处理异步履行时上下文传递的问题。

运用起来也是十分简略,增加依赖如下:

<dependency>
	<groupId>com.alibaba</groupId>
	<artifactId>transmittable-thread-local</artifactId>
	<version>2.14.2</version>
</dependency>

OauthContext改造代码如下:

/**
 * @author 大众号:码猿技能专栏
 * @description 用户上下文信息
 */
public class OauthContext {
    private static  final TransmittableThreadLocal<LoginVal> loginValThreadLocal=new TransmittableThreadLocal<>();
    public static  LoginVal get(){
        return loginValThreadLocal.get();
    }
    public static void set(LoginVal loginVal){
        loginValThreadLocal.set(loginVal);
    }
    public static void clear(){
        loginValThreadLocal.remove();
    }
}

关于TransmittableThreadLocal想深化了解其原理能够看陈某之前的文章:微服务中运用阿里开源的TTL,高雅的完结身份信息的线程间复用,使用还是十分广泛的

TransmittableThreadLocal原理

从界说来看,TransimittableThreadLocal继承于InheritableThreadLocal,并完结TtlCopier接口,它里边只有一个copy办法。所以主要是对InheritableThreadLocal的扩展。

public class TransmittableThreadLocal<T> extends InheritableThreadLocal<T> implements TtlCopier<T>

TransimittableThreadLocal中增加holder特点。这个特点的效果便是被符号为具有线程传递资格的目标都会被增加到这个目标中。

要符号一个类,比较简略想到的办法,便是给这个类新增一个Type字段,还有一个办法便是将具有这种类型的的目标都增加到一个静态大局调集中。之后运用时,这个调集里的一切值都具有这个符号。

// 1. holder本身是一个InheritableThreadLocal目标
// 2. 这个holder目标的value是WeakHashMap<TransmittableThreadLocal<Object>, ?>
//   2.1 WeekHashMap的value总是null,且不可能被运用。
//    2.2 WeekHasshMap支撑value=null
private static InheritableThreadLocal<WeakHashMap<TransmittableThreadLocal<Object>, ?>> holder = new InheritableThreadLocal<WeakHashMap<TransmittableThreadLocal<Object>, ?>>() {
  @Override
  protected WeakHashMap<TransmittableThreadLocal<Object>, ?> initialValue() {
    return new WeakHashMap<TransmittableThreadLocal<Object>, Object>();
  }
  /**
   * 重写了childValue办法,完结上直接将父线程的特点作为子线程的本地变量目标。
   */
  @Override
  protected WeakHashMap<TransmittableThreadLocal<Object>, ?> childValue(WeakHashMap<TransmittableThreadLocal<Object>, ?> parentValue) {
    return new WeakHashMap<TransmittableThreadLocal<Object>, Object>(parentValue);
  }
};

使用代码是经过TtlExecutors东西类对线程池目标进行包装。东西类仅仅简略的判别,输入的线程池是否现已被包装过、非空校验等,然后回来包装类ExecutorServiceTtlWrapper。根据不同的线程池类型,有不同和的包装类。

@Nullable
public static ExecutorService getTtlExecutorService(@Nullable ExecutorService executorService) {
  if (TtlAgent.isTtlAgentLoaded() || executorService == null || executorService instanceof TtlEnhanced) {
    return executorService;
  }
  return new ExecutorServiceTtlWrapper(executorService);
}

进入包装类ExecutorServiceTtlWrapper。能够留意到不论是经过ExecutorServiceTtlWrapper#submit办法或许是ExecutorTtlWrapper#execute办法,都会将线程目标包装成TtlCallable或许TtlRunnable,用于在真实履行run办法前做一些事务逻辑。

/**
 * 在ExecutorServiceTtlWrapper完结submit办法
 */
@NonNull
@Override
public <T> Future<T> submit(@NonNull Callable<T> task) {
  return executorService.submit(TtlCallable.get(task));
}
/**
 * 在ExecutorTtlWrapper完结execute办法
 */
@Override
public void execute(@NonNull Runnable command) {
  executor.execute(TtlRunnable.get(command));
}

所以,重点的核心逻辑应该是在TtlCallable#call()或许TtlRunnable#run()中。以下以TtlCallable为例,TtlRunnable同理类似。在剖析call()办法之前,先看一个类Transmitter

public static class Transmitter {
  /**
    * 捕获当时线程中的是一切TransimittableThreadLocal和注册ThreadLocal的值。
    */
  @NonNull
  public static Object capture() {
    return new Snapshot(captureTtlValues(), captureThreadLocalValues());
  }
    /**
    * 捕获TransimittableThreadLocal的值,将holder中的一切值都增加到HashMap后回来。
    */
  private static HashMap<TransmittableThreadLocal<Object>, Object> captureTtlValues() {
    HashMap<TransmittableThreadLocal<Object>, Object> ttl2Value = 
      new HashMap<TransmittableThreadLocal<Object>, Object>();
    for (TransmittableThreadLocal<Object> threadLocal : holder.get().keySet()) {
      ttl2Value.put(threadLocal, threadLocal.copyValue());
    }
    return ttl2Value;
  }
  /**
    * 捕获注册的ThreadLocal的值,也便是本来线程中的ThreadLocal,能够注册到TTL中,在
    * 进行线程池本地变量传递时也会被传递。
    */
  private static HashMap<ThreadLocal<Object>, Object> captureThreadLocalValues() {
    final HashMap<ThreadLocal<Object>, Object> threadLocal2Value = 
      new HashMap<ThreadLocal<Object>, Object>();
    for(Map.Entry<ThreadLocal<Object>,TtlCopier<Object>>entry:threadLocalHolder.entrySet()){
      final ThreadLocal<Object> threadLocal = entry.getKey();
      final TtlCopier<Object> copier = entry.getValue();
      threadLocal2Value.put(threadLocal, copier.copy(threadLocal.get()));
    }
    return threadLocal2Value;
  }
  /**
    * 将捕获到的本地变量进行替换子线程的本地变量,而且回来子线程现有的本地变量副本backup。
    * 用于在履行run/call办法之后,将本地变量副本康复。
    */
  @NonNull
  public static Object replay(@NonNull Object captured) {
    final Snapshot capturedSnapshot = (Snapshot) captured;
    return new Snapshot(replayTtlValues(capturedSnapshot.ttl2Value), 
                        replayThreadLocalValues(capturedSnapshot.threadLocal2Value));
  }
  /**
    * 替换TransmittableThreadLocal
    */
  @NonNull
  private static HashMap<TransmittableThreadLocal<Object>, Object> replayTtlValues(@NonNull HashMap<TransmittableThreadLocal<Object>, Object> captured) {
    // 创立副本backup
    HashMap<TransmittableThreadLocal<Object>, Object> backup = 
      new HashMap<TransmittableThreadLocal<Object>, Object>();
    for (final Iterator<TransmittableThreadLocal<Object>> iterator = holder.get().keySet().iterator(); iterator.hasNext(); ) {
      TransmittableThreadLocal<Object> threadLocal = iterator.next();
      // 对当时线程的本地变量进行副本复制
      backup.put(threadLocal, threadLocal.get());
      // 若出现调用线程中不存在某个线程变量,而线程池中线程有,则删除线程池中对应的本地变量
      if (!captured.containsKey(threadLocal)) {
        iterator.remove();
        threadLocal.superRemove();
      }
    }
    // 将捕获的TTL值打入线程池获取到的线程TTL中。
    setTtlValuesTo(captured);
    // 是一个扩展点,调用TTL的beforeExecute办法。默许完结为空
    doExecuteCallback(true);
    return backup;
  }
  private static HashMap<ThreadLocal<Object>, Object> replayThreadLocalValues(@NonNull HashMap<ThreadLocal<Object>, Object> captured) {
    final HashMap<ThreadLocal<Object>, Object> backup = 
      new HashMap<ThreadLocal<Object>, Object>();
    for (Map.Entry<ThreadLocal<Object>, Object> entry : captured.entrySet()) {
      final ThreadLocal<Object> threadLocal = entry.getKey();
      backup.put(threadLocal, threadLocal.get());
      final Object value = entry.getValue();
      if (value == threadLocalClearMark) threadLocal.remove();
      else threadLocal.set(value);
    }
    return backup;
  }
  /**
    * 铲除单线线程的一切TTL和TL,并回来铲除之气的backup
    */
  @NonNull
  public static Object clear() {
    final HashMap<TransmittableThreadLocal<Object>, Object> ttl2Value = 
      new HashMap<TransmittableThreadLocal<Object>, Object>();
    final HashMap<ThreadLocal<Object>, Object> threadLocal2Value = 
      new HashMap<ThreadLocal<Object>, Object>();
    for(Map.Entry<ThreadLocal<Object>,TtlCopier<Object>>entry:threadLocalHolder.entrySet()){
      final ThreadLocal<Object> threadLocal = entry.getKey();
      threadLocal2Value.put(threadLocal, threadLocalClearMark);
    }
    return replay(new Snapshot(ttl2Value, threadLocal2Value));
  }
  /**
    * 还原
    */
  public static void restore(@NonNull Object backup) {
    final Snapshot backupSnapshot = (Snapshot) backup;
    restoreTtlValues(backupSnapshot.ttl2Value);
    restoreThreadLocalValues(backupSnapshot.threadLocal2Value);
  }
  private static void restoreTtlValues(@NonNull HashMap<TransmittableThreadLocal<Object>, Object> backup) {
    // 扩展点,调用TTL的afterExecute
    doExecuteCallback(false);
    for (final Iterator<TransmittableThreadLocal<Object>> iterator = holder.get().keySet().iterator(); iterator.hasNext(); ) {
      TransmittableThreadLocal<Object> threadLocal = iterator.next();
      if (!backup.containsKey(threadLocal)) {
        iterator.remove();
        threadLocal.superRemove();
      }
    }
    // 将本地变量康复成备份版别
    setTtlValuesTo(backup);
  }
  private static void setTtlValuesTo(@NonNull HashMap<TransmittableThreadLocal<Object>, Object> ttlValues) {
    for (Map.Entry<TransmittableThreadLocal<Object>, Object> entry : ttlValues.entrySet()) {
      TransmittableThreadLocal<Object> threadLocal = entry.getKey();
      threadLocal.set(entry.getValue());
    }
  }
  private static void restoreThreadLocalValues(@NonNull HashMap<ThreadLocal<Object>, Object> backup) {
    for (Map.Entry<ThreadLocal<Object>, Object> entry : backup.entrySet()) {
      final ThreadLocal<Object> threadLocal = entry.getKey();
      threadLocal.set(entry.getValue());
    }
  }
  /**
   * 快照类,保存TTL和TL
   */
  private static class Snapshot {
    final HashMap<TransmittableThreadLocal<Object>, Object> ttl2Value;
    final HashMap<ThreadLocal<Object>, Object> threadLocal2Value;
    private Snapshot(HashMap<TransmittableThreadLocal<Object>, Object> ttl2Value,
                     HashMap<ThreadLocal<Object>, Object> threadLocal2Value) {
      this.ttl2Value = ttl2Value;
      this.threadLocal2Value = threadLocal2Value;
    }
  }

进入TtlCallable#call()办法。

@Override
public V call() throws Exception {
  Object captured = capturedRef.get();
  if (captured == null || releaseTtlValueReferenceAfterCall && 
      !capturedRef.compareAndSet(captured, null)) {
    throw new IllegalStateException("TTL value reference is released after call!");
  }
  // 调用replay办法将捕获到的当时线程的本地变量,传递给线程池线程的本地变量,
  // 而且获取到线程池线程掩盖之前的本地变量副本。
  Object backup = replay(captured);
  try {
    // 线程办法调用
    return callable.call();
  } finally {
    // 运用副本进行康复。
    restore(backup);
  }
}

到这基本上线程池办法传递本地变量的核心代码现已大概看完了。总的来说在创立TtlCallable目标是,调用capture()办法捕获调用方的本地线程变量,在call()履行时,将捕获到的线程变量,替换到线程池所对应获取到的线程的本地变量中,而且在履行完结之后,将其本地变量康复到调用之前。

总结

上述列举了4种计划,陈某这儿推荐计划2和计划4,其间两种计划的缺陷十分显着,实践开发中也是采用的计划2或许计划4