概述

因为分布式体系节点很多,排查过错日志要涉及到多个节点,假如在多个节点中没有唯一的恳求id来把各个节点的恳求日志串联起来,那么查询起来就会耗时耗力,因此Spring Sleuth出现了(Spring Sleuth基于Google dapper论文完成,详细了解能够检查此论文),Sleuth会在接收恳求的入口经过Filter生成唯一的标识TraceId,这个TraceId会一向跟随恳求路径传递到各个节点,只需有日志输出就会把TraceId的值打印出来,如下图(正常还会生成SpanId,为了便于了解没展现)

Spring-Cloud如何异步跨线程查询链路日志(附实例)

假设线上产生问题,要排查日志,那么依据这个TraceId,就能够快速查询到各个节点对应的恳求日志,但是唯一的惋惜是异步履行会丢失TraceId,因此这儿介绍异步跨线程下如何确保TraceId不丢失的问题

咱们在官方文档中找到了异步传递Traceid阐明,如下图

Spring-Cloud如何异步跨线程查询链路日志(附实例)

大致意思Sleuth默许支撑@Async传递TraceId,并且支撑spring.sleuth.async.enabled进行操控,同时提供了

  • LazyTraceExecutor
  • TraceableExecutorService
  • TraceableScheduledExecutorService

线程包装类,来支撑跨线程传递TraceId,其中TraceableScheduledExecutorService是ScheduledExecutorService类的完成,用于完成守时使命触发,个人觉得这种需求不是特别多,所以只介绍常用的一些装备,比如@Async装备、线程池装备、EventBus装备,具体检查后续章节

Asnc装备

默许Sleuth是支撑@Async注解异步传递TraceId的,但是假如自定义线程池,装备不对的状况或许就会导致失效,因为Spring在这快有个bug,详细了解请检查以下链接:

github.com/spring-proj…

github.com/spring-proj…

所以正确装备办法有如下3种

装备办法

方法1(引荐)

这儿用到了Sleuth的LazyTraceExecutor包装了线程池,这样能够确保trace目标传到下一个线程中

@Configuration
@EnableAsync
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class SpringAsyncConfig extends AsyncConfigurerSupport {
    @Autowired
    private BeanFactory beanFactory;
    @Override
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(10);
        executor.setQueueCapacity(500);
        executor.setThreadNamePrefix("AsyncExecutor-");
        executor.initialize();
        return new LazyTraceExecutor(this.beanFactory, executor);
    }
}

方法2

Sleuth初始化时会默许查找TaskExecutor作为Async的线程池,假如查找不到会获取默许的线程池

@EnableAsync
@Configuration
public class WebConfig {
    @Bean
    public TaskExecutor getAsyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(10);
        executor.setQueueCapacity(500);
        executor.setThreadNamePrefix("AsyncExecutor-");
        executor.initialize();
        return executor;
    }
}

方法3

假如默许不装备任何线程池,只在工程中加了@EnableAsync 注解,那么Sleuth会运用自带的线程池SimpleAsyncTaskExecutor,这个线程池每次调用都会创立新线程,假如调用量比较多,创立的线程也会非常多,咱们知道体系资源是有限的,假如线程数过多,会导致程序内存吃紧,从而导致OOM,所以不引荐运用这种方法

测验验证

测验代码

Async装备

@Configuration
@EnableAsync
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class SpringAsyncConfig extends AsyncConfigurerSupport {
    @Autowired
    private BeanFactory beanFactory;
    @Override
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(10);
        executor.setQueueCapacity(500);
        executor.setThreadNamePrefix("AsyncExecutor-");
        executor.initialize();
        return new LazyTraceExecutor(this.beanFactory, executor);
    }
}

Service

@Service
@Slf4j
public class TestService {
@Async
public void printAsyncLog() {
    log.info("async log.....");
}
}

Controller

@Slf4j
@RestController
@RequestMapping("/test/async")
public class AsyncTestWeb {
    @Autowired
    private TestService testService;
   @RequestMapping(value = "/print/log", method = RequestMethod.GET)
   public String printLog() {
    log.info("sync log.....1.....");
    testService.printAsyncLog();
    log.info("sync log.....2.....");
    return "success";
  }
}

恳求测验

履行恳求test/async/print/log,输出以下信息,能够看到TraceId一样,只要Spanid产生了变化,线程称号前缀AsyncExecutor与设置前缀相同

19:44:54.818, [fae1c9449e12695f fae1c9449e12695f] [http-nio-8080-exec-8] INFO  [] com.example.elkdemo.web.AsyncTestWeb printLog:30 - sync log.....1.....
19:44:54.819, [fae1c9449e12695f fae1c9449e12695f] [http-nio-8080-exec-8] INFO  [] com.example.elkdemo.web.AsyncTestWeb printLog:32 - sync log.....2..... 
19:44:54.819, [fae1c9449e12695f 2d51edbb45896bd8] [AsyncExecutor-2] INFO  [] c.e.elkdemo.service.TestService printAsyncLog:50 - async log..... 

线程池装备

线程池履行是经过TraceableExecutorService包装了ExecutorService,而且在初始化的时分需要注入进去BeanFactory目标,所以线程池作为全局变量和局部变量装备稍有不同,注意下面线程池设置只是示例代码,实际运用中能够依据需求自行修正

全局变量装备

结构函数初始化(引荐)

@Service
@Slf4j
public class TestService{
final BeanFactory beanFactory;
private TraceableExecutorService traceableExecutorService;
public TestService(BeanFactory beanFactory1) {
    this.beanFactory = beanFactory1;
    this.traceableExecutorService = new TraceableExecutorService(beanFactory, Executors.newFixedThreadPool(10), "test");
}
/**
 * 异步输出线程池日志
 */
public void printThreadPoolLog() {
    traceableExecutorService.execute(() -> log.info("async thread pool log....."));
}
}

单例初始化

@Service
@Slf4j
public class TestService {
@Autowired
private BeanFactory beanFactory;
volatile TraceableExecutorService traceableExecutorService;
public TraceableExecutorService getTraceableExecutorService() {
    if (traceableExecutorService == null) {
        synchronized (TraceableExecutorService.class) {
            if (traceableExecutorService == null) {
                traceableExecutorService = new TraceableExecutorService(beanFactory, Executors.newFixedThreadPool(10), "test");
            }
        }
    }
    return traceableExecutorService;
}
/**
 * 异步输出线程池日志
 */
public void printThreadPoolLog() {
    TraceableExecutorService executorService = getTraceableExecutorService();
    executorService.execute(() -> log.info("async thread pool log....."));
}
}

经过InitializingBean的afterPropertiesSet进行初始化

@Service
@Slf4j
public class TestService implements InitializingBean {
@Autowired
private BeanFactory beanFactory;
private TraceableExecutorService traceableExecutorService;
@Override
public void afterPropertiesSet() {
    traceableExecutorService = new TraceableExecutorService(beanFactory, Executors.newFixedThreadPool(10), "test");
}
/**
 * 异步输出线程池日志
 */
public void printThreadPoolLog() {
    traceableExecutorService.execute(() -> log.info("async thread pool log....."));
}
}

局部变量装备

/**
 * 异步输出线程池日志
 */
public void printThreadPoolLog2() {
    TraceableExecutorService  executorService = new TraceableExecutorService(beanFactory, Executors.newFixedThreadPool(10), "test");
    executorService.execute(() -> log.info("async thread pool log....."));
}

测验验证

这儿选用全局变量装备方法测验

测验代码

Controller

@Slf4j
@RestController
@RequestMapping("/test/async")
public class AsyncTestWeb {
    @Autowired
    private TestService testService;
    @RequestMapping(value = "/print/threadPool/log", method = RequestMethod.GET)
    public String printThreadPoolLog() {
        log.info("sync log.....1.....");
        testService.printThreadPoolLog();
        log.info("sync log.....2.....");
        return "success";
    }
}

Service

service选用结构函数方法进行初始化

@Service
@Slf4j
public class TestService{
final BeanFactory beanFactory;
private TraceableExecutorService traceableExecutorService;
public TestService(BeanFactory beanFactory1) {
    this.beanFactory = beanFactory1;
    this.traceableExecutorService = new TraceableExecutorService(beanFactory, Executors.newFixedThreadPool(10), "test");
}
/**
 * 异步输出线程池日志
 */
public void printThreadPoolLog() {
    traceableExecutorService.execute(() -> log.info("async thread pool log....."));
}
}

恳求测验

履行恳求/test/async/print/threadPool/log,输出以下信息,能够看到Traceid一样,只要Spanid产生了变化

19:35:13.799, [884212fb58c658c5 884212fb58c658c5] [http-nio-8080-exec-5] INFO  [] com.example.elkdemo.web.AsyncTestWeb printThreadPoolLog:38 - sync log.....1.....
19:35:13.801, [884212fb58c658c5 884212fb58c658c5] [http-nio-8080-exec-5] INFO  [] com.example.elkdemo.web.AsyncTestWeb printThreadPoolLog:40 - sync log.....2..... 
19:35:13.801, [884212fb58c658c5 70008b8d3a97602d] [pool-4-thread-2] INFO  [] c.e.elkdemo.service.TestService lambda$printThreadPoolLog$0:37 - async thread pool log..... 

EventBus装备

EventBus装备与线程池装备相似,把TraceableExecutorService注入到AsyncEventBus中即可,因TraceableExecutorService类引用了BeanFactory实例,所以比原生方法杂乱了一点,以下只介绍结构函数的初始化方法,其他初始化方法与线程池装备相似,所以这儿就不再举例阐明

结构函数进行初始化

@Component
@Slf4j
public class PushEventBus {
    private EventBus eventBus;
    public PushEventBus(BeanFactory beanFactory) {
        Executor traceableExecutorService = new TraceableExecutorService(beanFactory, Executors.newFixedThreadPool(10), "test");
        this.eventBus = new AsyncEventBus(traceableExecutorService);
    }
    public void register(Object obj) {
        eventBus.register(obj);
    }
    public void post(Object obj) {
        eventBus.post(obj);
    }
}

测验验证

测验代码

EventBus

@Component
@Slf4j
public class PushEventBus {
    private EventBus eventBus;
    public PushEventBus(BeanFactory beanFactory) {
        Executor traceableExecutorService = new TraceableExecutorService(beanFactory, Executors.newFixedThreadPool(10), "test");
        this.eventBus = new AsyncEventBus(traceableExecutorService);
    }
    public void register(Object obj) {
        eventBus.register(obj);
    }
    public void post(Object obj) {
        eventBus.post(obj);
    }
}

监听类

@Slf4j
public class EventListener {
  /**
   * 监听 Integer 类型的消息
   */
  @Subscribe
  public void listenInteger(Integer param) {
    log.info("EventListener#listenInteger->{}",param);
  }
  /**
   * 监听 String 类型的消息
   */
  @Subscribe
  public void listenString(String param) {
    log.info("EventListener#listenString->{}",param);
  }
}

controller

@Slf4j
@RestController
@RequestMapping("/test/async")
public class AsyncTestWeb {
@Autowired
private PushEventBus pushEventBus;
@RequestMapping(value = "/print/guava/log", method = RequestMethod.GET)
public String printGuavaLog() {
    pushEventBus.register(new EventListener());
    log.info("sync log.....1.....");
    pushEventBus.post("11");
    log.info("sync log.....2.....");
    return "success";
}
}

恳求测验

履行恳求/test/async/print/guava/log,输出以下信息,能够看到Traceid一样,只要Spanid产生了变化

19:27:44.234, [50844e0d3909868c 50844e0d3909868c] [http-nio-8080-exec-3] INFO  [] com.example.elkdemo.web.AsyncTestWeb printGuavaLog:48 - sync log.....1.....
19:27:44.236, [50844e0d3909868c 50844e0d3909868c] [http-nio-8080-exec-3] INFO  [] com.example.elkdemo.web.AsyncTestWeb printGuavaLog:50 - sync log.....2..... 
19:27:44.236, [50844e0d3909868c 702bf55c84873f17] [pool-3-thread-1] INFO  [] c.e.elkdemo.service.EventListener listenString:21 - EventListener#listenString->11 

作者公众号《架构生长攻略》欢迎重视!