前言

在应用进程中,链路信息的传递实际便是Span的传递,咱们之前是根据ThreadLocalSpan作为线程本地变量来传递的,这在同步的场景下是没有问题的,可是一旦涉及到异步场景例如异步调用下流或许异步查询数据库等,那么Span信息就丢了,原因便是ThreadLocal设置的本地变量无法跨线程传递,所以本文将根据TransmittableThreadLocal完成异步链路追寻以处理异步场景下Span信息无法传递的问题。

关于TransmittableThreadLocal,能够参阅图解Java线程间本地变量传递,本文将不再赘述TransmittableThreadLocal的原理。

transmittable-thread-local版本:2.11.4

github地址:honey-tracing

正文

1. 异步链路追寻改造设计与完成

其实要完成异步链路追寻很简单,回忆一下之前的完成中,Span会被包装为ThreadLocalScope,然后交由ThreadLocalScopeManager来进行传递,而ThreadLocalScopeManager传递ThreadLocalScope的方法便是将ThreadLocalScope经过ThreadLocal设置为线程的本地变量,那么问题就在这儿,ThreadLocal设置的线程本地变量无法跨线程传递,所以要处理这个问题,便是将能够跨线程传递本地变量的TransmittableThreadLocal替换掉ThreadLocal,问题就完美处理了。

首要引进TransmittableThreadLocal的依靠,如下所示。

<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>transmittable-thread-local</artifactId>
    <version>2.11.4</version>
    <scope>provided</scope>
</dependency>

其次供给异步链路追寻的开关装备,修改后的特点装备类HoneyTracingProperties如下所示。

/**
 * 分布式链路追寻装备特点类。
 */
@ConfigurationProperties("honey.tracing")
public class HoneyTracingProperties {
    private boolean enabled;
    private HttpUrlProperties httpUrl = new HttpUrlProperties();
    private AsyncProperties async = new AsyncProperties();
    public boolean isEnabled() {
        return enabled;
    }
    public void setEnabled(boolean enabled) {
        this.enabled = enabled;
    }
    public HttpUrlProperties getHttpUrl() {
        return httpUrl;
    }
    public void setHttpUrl(HttpUrlProperties httpUrl) {
        this.httpUrl = httpUrl;
    }
    public AsyncProperties getAsync() {
        return async;
    }
    public void setAsync(AsyncProperties async) {
        this.async = async;
    }
    public static class HttpUrlProperties {
        /**
         * 按照/url1,/url2这样装备。
         */
        private String urlPattern = "/*";
        /**
         * 按照/url1|/honey.*这样装备。
         */
        private String skipPattern = "";
        public String getUrlPattern() {
            return urlPattern;
        }
        public void setUrlPattern(String urlPattern) {
            this.urlPattern = urlPattern;
        }
        public String getSkipPattern() {
            return skipPattern;
        }
        public void setSkipPattern(String skipPattern) {
            this.skipPattern = skipPattern;
        }
    }
    public static class AsyncProperties {
        private boolean enabled;
        public boolean isEnabled() {
            return enabled;
        }
        public void setEnabled(boolean enabled) {
            this.enabled = enabled;
        }
    }
}

由于ThreadLocalScopeThreadLocalScopeManager的内部完成已经写死,所以咱们需求自己供给完成类来替换掉它们,咱们自己的完成类如下所示。

/**
 * 依靠{@link TransmittableThreadLocal}的{@link Scope}完成。
 */
public class HoneyTtlScope implements Scope {
    private final HoneyTtlScopeManager scopeManager;
    private final Span wrapped;
    private final HoneyTtlScope toRestore;
    HoneyTtlScope(HoneyTtlScopeManager scopeManager, Span wrapped) {
        this.scopeManager = scopeManager;
        this.wrapped = wrapped;
        this.toRestore = scopeManager.tlsScope.get();
        scopeManager.tlsScope.set(this);
    }
    @Override
    public void close() {
        if (scopeManager.tlsScope.get() != this) {
            return;
        }
        scopeManager.tlsScope.set(toRestore);
    }
    Span span() {
        return wrapped;
    }
}
/**
 * 根据{@link TransmittableThreadLocal}的{@link ScopeManager}完成。
 */
public class HoneyTtlScopeManager implements ScopeManager {
    final InheritableThreadLocal<HoneyTtlScope> tlsScope = new TransmittableThreadLocal<>();
    @Override
    public Scope activate(Span span) {
        return new HoneyTtlScope(this, span);
    }
    @Override
    public Span activeSpan() {
        HoneyTtlScope scope = tlsScope.get();
        return scope == null ? null : scope.span();
    }
}

然后咱们需求将HoneyTtlScopeManager替换掉ThreadLocalScopeManager,这一步在创立Tracer的时分完成,所以修改HoneyTracingConfig如下所示。

/**
 * 分布式链路追寻装备类。
 */
@Configuration
@EnableConfigurationProperties({HoneyTracingProperties.class})
@ConditionalOnProperty(prefix = "honey.tracing", name = "enabled", havingValue = "true", matchIfMissing = true)
public class HoneyTracingConfig {
    @Bean
    @ConditionalOnMissingBean(Sampler.class)
    public Sampler sampler() {
        return new ProbabilisticSampler(DEFAULT_SAMPLE_RATE);
    }
    @Bean
    @ConditionalOnMissingBean(Reporter.class)
    public Reporter reporter() {
        return new HoneySpanReporter();
    }
    @Bean
    @ConditionalOnProperty(prefix = "honey.tracing.async", name = "enabled", havingValue = "true")
    public ScopeManager honeyTtlScopeManager() {
        return new HoneyTtlScopeManager();
    }
    @Bean
    @ConditionalOnProperty(prefix = "honey.tracing.async", name = "enabled", havingValue = "false", matchIfMissing = true)
    public ScopeManager threadLocalScopeManager() {
        return new ThreadLocalScopeManager();
    }
    @Bean
    @ConditionalOnMissingBean(Tracer.class)
    public Tracer tracer(Sampler sampler, Reporter reporter, ScopeManager scopeManager) {
        return new JaegerTracer.Builder(HONEY_TRACER_NAME)
                .withTraceId128Bit()
                .withZipkinSharedRpcSpan()
                .withSampler(sampler)
                .withReporter(reporter)
                .withScopeManager(scopeManager)
                .build();
    }
}

那么至此,异步链路追寻改造完成。

二. 异步链路追寻运用说明

咱们对之前搭建好的example-service-1进行改造,来演示异步链路追寻的运用。

在开端演示前,需求先引进TransmittableThreadLocal的依靠,如下所示。

<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>transmittable-thread-local</artifactId>
    <version>2.11.4</version>
</dependency>

然后经过装备打开异步链路追寻,如下所示。

honey:
  tracing:
    async:
      enabled: true

1. new Thread

首要演示直接经过new一个Thread的方法来异步履行使命,在RestTemplateController新增如下代码。

@RestController
public class RestTemplateController {
    @Autowired
    private RestTemplate restTemplate;
    ......
    @GetMapping("/async/thread/send")
    public void syncSendByThread(String url) throws Exception {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        new Thread(new Runnable() {
            @Override
            public void run() {
                restTemplate.getForEntity(url, Void.class);
                countDownLatch.countDown();
            }
        }).start();
        countDownLatch.await();
    }
    ......
}

一起把example-service-1example-service-2发动起来,并调用如下接口。

http://localhost:8080/async/thread/send?url=http://localhost:8081/receive

example-service-1中打印如下链路日志。

{
    "traceId": "96effadb892bd423c19d32ed70b9a406",
    "spanId": "c19d32ed70b9a406",
    "parentSpanId": "0000000000000000",
    "timestamp": "1708599959132",
    "duration": "5",
    "httpCode": "200",
    "host": "http://localhost:8080",
    "requestStacks": [
        {
            "subSpanId": "43731381f988131a",
            "subHttpCode": "200",
            "subTimestamp": "1708599959133",
            "subDuration": "3",
            "subHost": "localhost:8081"
        }
    ]
}

example-service-2中打印如下链路日志。

{
    "traceId": "96effadb892bd423c19d32ed70b9a406",
    "spanId": "43731381f988131a",
    "parentSpanId": "c19d32ed70b9a406",
    "timestamp": "1708599959134",
    "duration": "0",
    "httpCode": "200",
    "host": "http://localhost:8081",
    "requestStacks": []
}

可见异步链路是正常作业的。

2. 直接运用ThreadPoolExecutor

通常咱们做异步操作,是不会直接new一个Thread来履行异步使命的,而是会把使命丢到线程池,让线程池来履行使命,假设咱们的使命是一个Runnable,那么应该像下面这样来运用。

@RestController
public class RestTemplateController {
    @Autowired
    private RestTemplate restTemplate;
    @Autowired
    private ThreadPoolExecutor threadPoolExecutor;
    ......
    @GetMapping("/async/thread-pool/send")
    public void asyncSendByThreadPool(String url) throws Exception {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        threadPoolExecutor.execute(TtlRunnable.get(new Runnable() {
            @Override
            public void run() {
                restTemplate.getForEntity(url, Void.class);
                countDownLatch.countDown();
            }
        }));
        countDownLatch.await();
    }
    ......
}

其间运用的线程池由ThreadPoolExecutorConfig注册到了Spring容器中,如下所示。

@Configuration
public class ThreadPoolExecutorConfig {
    @Bean
    public ThreadPoolExecutor threadPoolExecutor() {
        return new ThreadPoolExecutor(
                1,
                1,
                60,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(),
                new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable runnable) {
                        return new Thread(runnable, "Pool-Thread-1");
                    }
                });
    }
}

所以直接运用ThreadPoolExecutor来履行Runnable时,需求先将Runnable转换为TtlRunnable,然后再丢到ThreadPoolExecutor中履行,同理,如果是直接运用ThreadPoolExecutor来履行Callable,那么需求先将Callable转换为TtlCallable

一起把example-service-1example-service-2发动起来,并调用如下接口。

http://localhost:8080/async/thread-pool/send?url=http://localhost:8081/receive

example-service-1中打印如下链路日志。

{
    "traceId": "2a198cc0a513f2ca455c98d9a320ebc7",
    "spanId": "455c98d9a320ebc7",
    "parentSpanId": "0000000000000000",
    "timestamp": "1708601760228",
    "duration": "10",
    "httpCode": "200",
    "host": "http://localhost:8080",
    "requestStacks": [
        {
            "subSpanId": "7332de988308e305",
            "subHttpCode": "200",
            "subTimestamp": "1708601760232",
            "subDuration": "5",
            "subHost": "localhost:8081"
        }
    ]
}

example-service-2中打印如下链路日志。

{
    "traceId": "2a198cc0a513f2ca455c98d9a320ebc7",
    "spanId": "7332de988308e305",
    "parentSpanId": "455c98d9a320ebc7",
    "timestamp": "1708601760235",
    "duration": "1",
    "httpCode": "200",
    "host": "http://localhost:8081",
    "requestStacks": []
}

可见异步链路是正常作业的。

3. 将ThreadPoolExecutor包装为ExecutorServiceTtlWrapper

如果每次异步履行使命时,都要将Runnable包装为TtlRunnable才干完成异步链路追寻,这样的代码写起来实在是太繁琐了,此时既然不想包装Runnable,那么咱们能够挑选包装ThreadPoolExecutor,就像下面这样。

@Configuration
public class ThreadPoolExecutorConfig {
    @Bean
    public ThreadPoolExecutor threadPoolExecutor() {
        return new ThreadPoolExecutor(
                1,
                1,
                60,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(),
                new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable runnable) {
                        return new Thread(runnable, "Pool-Thread-1");
                    }
                });
    }
    @Bean
    public ExecutorService wrappedThreadPool() {
        return TtlExecutors.getTtlExecutorService(new ThreadPoolExecutor(
                1,
                1,
                60,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(),
                new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable runnable) {
                        return new Thread(runnable, "Pool-Thread-1");
                    }
                }));
    }
}

经过TtlExecutors能够将咱们供给的ThreadPoolExecutor包装为ExecutorServiceTtlWrapper,后续直接将Runnable丢给ExecutorServiceTtlWrapper,也能完成异步链路追寻,如下所示。

@RestController
public class RestTemplateController {
    @Autowired
    private RestTemplate restTemplate;
    @Autowired
    private ExecutorService wrappedThreadPool;
    ......
    @GetMapping("/async/wrapped-thread-pool/send")
    public void asyncSendByWrappedThreadPool(String url) throws Exception {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        wrappedThreadPool.execute(new Runnable() {
            @Override
            public void run() {
                restTemplate.getForEntity(url, Void.class);
                countDownLatch.countDown();
            }
        });
        countDownLatch.await();
    }
    ......
}

一起把example-service-1example-service-2发动起来,并调用如下接口。

http://localhost:8080/async/wrapped-thread-pool/send?url=http://localhost:8081/receive

example-service-1中打印如下链路日志。

{
    "traceId": "075b13d4cb10d633fc7eb71ca8d0079a",
    "spanId": "fc7eb71ca8d0079a",
    "parentSpanId": "0000000000000000",
    "timestamp": "1708602413533",
    "duration": "7",
    "httpCode": "200",
    "host": "http://localhost:8080",
    "requestStacks": [
        {
            "subSpanId": "eee3dae02dd9139e",
            "subHttpCode": "200",
            "subTimestamp": "1708602413535",
            "subDuration": "4",
            "subHost": "localhost:8081"
        }
    ]
}

example-service-2中打印如下链路日志。

{
    "traceId": "075b13d4cb10d633fc7eb71ca8d0079a",
    "spanId": "eee3dae02dd9139e",
    "parentSpanId": "fc7eb71ca8d0079a",
    "timestamp": "1708602413538",
    "duration": "0",
    "httpCode": "200",
    "host": "http://localhost:8081",
    "requestStacks": []
}

可见异步链路是正常作业的。

4. 将ScheduledThreadPoolExecutor包装为ScheduledExecutorServiceTtlWrapper

不单能够把ThreadPoolExecutor包装为ExecutorServiceTtlWrapper,也能将ScheduledThreadPoolExecutor包装为ScheduledExecutorServiceTtlWrapper,就像下面这样。

@Configuration
public class ThreadPoolExecutorConfig {
    @Bean
    public ThreadPoolExecutor threadPoolExecutor() {
        return new ThreadPoolExecutor(
                1,
                1,
                60,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(),
                new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable runnable) {
                        return new Thread(runnable, "Pool-Thread-1");
                    }
                });
    }
    @Bean
    public ExecutorService wrappedThreadPool() {
        return TtlExecutors.getTtlExecutorService(new ThreadPoolExecutor(
                1,
                1,
                60,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(),
                new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable runnable) {
                        return new Thread(runnable, "Pool-Thread-1");
                    }
                }));
    }
    @Bean
    public ScheduledExecutorService wrappedScheduledThreadPool() {
        return TtlExecutors.getTtlScheduledExecutorService(new ScheduledThreadPoolExecutor(
                1,
                new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable runnable) {
                        return new Thread(runnable, "Pool-Thread-1");
                    }
                }));
    }
}

后续直接将Runnable丢给ScheduledExecutorServiceTtlWrapper,也是能完成异步链路追寻的,如下所示。

@RestController
public class RestTemplateController {
    @Autowired
    private RestTemplate restTemplate;
    @Autowired
    private ScheduledExecutorService wrappedScheduledThreadPool;
    ......
    @GetMapping("/async/wrapped-scheduled-thread-pool/send")
    public void asyncSendByWrappedScheduledThreadPool(String url) throws Exception {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        wrappedScheduledThreadPool.schedule(new Runnable() {
            @Override
            public void run() {
                restTemplate.getForEntity(url, Void.class);
                countDownLatch.countDown();
            }
        }, 1, TimeUnit.SECONDS);
        countDownLatch.await();
    }
    ......
}

一起把example-service-1example-service-2发动起来,并调用如下接口。

http://localhost:8080/async/wrapped-scheduled-thread-pool/send?url=http://localhost:8081/receive

example-service-1中打印如下链路日志。

{
    "traceId": "8726f2fcbe845aa884d1beee49950cdf",
    "spanId": "84d1beee49950cdf",
    "parentSpanId": "0000000000000000",
    "timestamp": "1708602842299",
    "duration": "1009",
    "httpCode": "200",
    "host": "http://localhost:8080",
    "requestStacks": [
        {
            "subSpanId": "372acae257de2b85",
            "subHttpCode": "200",
            "subTimestamp": "1708602843302",
            "subDuration": "4",
            "subHost": "localhost:8081"
        }
    ]
}

example-service-2中打印如下链路日志。

{
    "traceId": "8726f2fcbe845aa884d1beee49950cdf",
    "spanId": "372acae257de2b85",
    "parentSpanId": "84d1beee49950cdf",
    "timestamp": "1708602843305",
    "duration": "1",
    "httpCode": "200",
    "host": "http://localhost:8081",
    "requestStacks": []
}

可见异步链路是正常作业的。

5. @Async注解

如果是运用@Async注解来履行异步操作,异步链路追寻同样也是支撑的,只需求把咱们注册到了Spring容器中的包装好的ExecutorServiceTtlWrapper设置给@Async注解,就能完成异步链路追寻。

首要编写一个AsyncService如下所示。

@Service
public class AsyncService {
    @Async("wrappedThreadPool")
    public void send(RestTemplate restTemplate, CountDownLatch countDownLatch, String url) {
        restTemplate.getForEntity(url, Void.class);
        countDownLatch.countDown();
    }
}

然后RestTemplateController中添加如下接口。

@RestController
public class RestTemplateController {
    @Autowired
    private RestTemplate restTemplate;
    @Autowired
    private AsyncService asyncService;
    @Autowired
    private ExecutorService wrappedThreadPool;
    ......
    @GetMapping("/async/annotation/send")
    public void asyncSendByAsyncAnnotation(String url) throws Exception {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        asyncService.send(restTemplate, countDownLatch, url);
        countDownLatch.await();
    }
}

最后需求在发动类上添加@EnableAsync注解来开启对@Async注解的支撑,如下所示。

@EnableAsync
@SpringBootApplication
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

一起把example-service-1example-service-2发动起来,并调用如下接口。

http://localhost:8080/async/annotation/send?url=http://localhost:8081/receive

example-service-1中打印如下链路日志。

{
    "traceId": "026fb86839e40a54026924f8fa247e7b",
    "spanId": "026924f8fa247e7b",
    "parentSpanId": "0000000000000000",
    "timestamp": "1708604677860",
    "duration": "5",
    "httpCode": "200",
    "host": "http://localhost:8080",
    "requestStacks": [
        {
            "subSpanId": "b90af03e0bed026c",
            "subHttpCode": "200",
            "subTimestamp": "1708604677860",
            "subDuration": "3",
            "subHost": "localhost:8081"
        }
    ]
}

example-service-2中打印如下链路日志。

{
    "traceId": "026fb86839e40a54026924f8fa247e7b",
    "spanId": "b90af03e0bed026c",
    "parentSpanId": "026924f8fa247e7b",
    "timestamp": "1708604677861",
    "duration": "0",
    "httpCode": "200",
    "host": "http://localhost:8081",
    "requestStacks": []
}

可见异步链路是正常作业的。

总结

异步链路追寻完成的要害,便是运用TransmittableThreadLocal来完成ScopeManager,然后替换掉默许的ThreadLocalScopeManager。异步链路追寻主要便是根据TransmittableThreadLocal供给的跨线程传递本地变量的方法来跨线程传递了Span,所以在实际运用时,也得遵从TransmittableThreadLocal的规矩来对线程池或许使命做一些改造包装。