前言
在应用进程中,链路信息的传递实际便是Span的传递,咱们之前是根据ThreadLocal将Span作为线程本地变量来传递的,这在同步的场景下是没有问题的,可是一旦涉及到异步场景例如异步调用下流或许异步查询数据库等,那么Span信息就丢了,原因便是ThreadLocal设置的本地变量无法跨线程传递,所以本文将根据TransmittableThreadLocal完成异步链路追寻以处理异步场景下Span信息无法传递的问题。
关于TransmittableThreadLocal,能够参阅图解Java线程间本地变量传递,本文将不再赘述TransmittableThreadLocal的原理。
transmittable-thread-local版本:2.11.4
github地址:honey-tracing
正文
1. 异步链路追寻改造设计与完成
其实要完成异步链路追寻很简单,回忆一下之前的完成中,Span会被包装为ThreadLocalScope,然后交由ThreadLocalScopeManager来进行传递,而ThreadLocalScopeManager传递ThreadLocalScope的方法便是将ThreadLocalScope经过ThreadLocal设置为线程的本地变量,那么问题就在这儿,ThreadLocal设置的线程本地变量无法跨线程传递,所以要处理这个问题,便是将能够跨线程传递本地变量的TransmittableThreadLocal替换掉ThreadLocal,问题就完美处理了。
首要引进TransmittableThreadLocal的依靠,如下所示。
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>transmittable-thread-local</artifactId>
<version>2.11.4</version>
<scope>provided</scope>
</dependency>
其次供给异步链路追寻的开关装备,修改后的特点装备类HoneyTracingProperties如下所示。
/**
* 分布式链路追寻装备特点类。
*/
@ConfigurationProperties("honey.tracing")
public class HoneyTracingProperties {
private boolean enabled;
private HttpUrlProperties httpUrl = new HttpUrlProperties();
private AsyncProperties async = new AsyncProperties();
public boolean isEnabled() {
return enabled;
}
public void setEnabled(boolean enabled) {
this.enabled = enabled;
}
public HttpUrlProperties getHttpUrl() {
return httpUrl;
}
public void setHttpUrl(HttpUrlProperties httpUrl) {
this.httpUrl = httpUrl;
}
public AsyncProperties getAsync() {
return async;
}
public void setAsync(AsyncProperties async) {
this.async = async;
}
public static class HttpUrlProperties {
/**
* 按照/url1,/url2这样装备。
*/
private String urlPattern = "/*";
/**
* 按照/url1|/honey.*这样装备。
*/
private String skipPattern = "";
public String getUrlPattern() {
return urlPattern;
}
public void setUrlPattern(String urlPattern) {
this.urlPattern = urlPattern;
}
public String getSkipPattern() {
return skipPattern;
}
public void setSkipPattern(String skipPattern) {
this.skipPattern = skipPattern;
}
}
public static class AsyncProperties {
private boolean enabled;
public boolean isEnabled() {
return enabled;
}
public void setEnabled(boolean enabled) {
this.enabled = enabled;
}
}
}
由于ThreadLocalScope和ThreadLocalScopeManager的内部完成已经写死,所以咱们需求自己供给完成类来替换掉它们,咱们自己的完成类如下所示。
/**
* 依靠{@link TransmittableThreadLocal}的{@link Scope}完成。
*/
public class HoneyTtlScope implements Scope {
private final HoneyTtlScopeManager scopeManager;
private final Span wrapped;
private final HoneyTtlScope toRestore;
HoneyTtlScope(HoneyTtlScopeManager scopeManager, Span wrapped) {
this.scopeManager = scopeManager;
this.wrapped = wrapped;
this.toRestore = scopeManager.tlsScope.get();
scopeManager.tlsScope.set(this);
}
@Override
public void close() {
if (scopeManager.tlsScope.get() != this) {
return;
}
scopeManager.tlsScope.set(toRestore);
}
Span span() {
return wrapped;
}
}
/**
* 根据{@link TransmittableThreadLocal}的{@link ScopeManager}完成。
*/
public class HoneyTtlScopeManager implements ScopeManager {
final InheritableThreadLocal<HoneyTtlScope> tlsScope = new TransmittableThreadLocal<>();
@Override
public Scope activate(Span span) {
return new HoneyTtlScope(this, span);
}
@Override
public Span activeSpan() {
HoneyTtlScope scope = tlsScope.get();
return scope == null ? null : scope.span();
}
}
然后咱们需求将HoneyTtlScopeManager替换掉ThreadLocalScopeManager,这一步在创立Tracer的时分完成,所以修改HoneyTracingConfig如下所示。
/**
* 分布式链路追寻装备类。
*/
@Configuration
@EnableConfigurationProperties({HoneyTracingProperties.class})
@ConditionalOnProperty(prefix = "honey.tracing", name = "enabled", havingValue = "true", matchIfMissing = true)
public class HoneyTracingConfig {
@Bean
@ConditionalOnMissingBean(Sampler.class)
public Sampler sampler() {
return new ProbabilisticSampler(DEFAULT_SAMPLE_RATE);
}
@Bean
@ConditionalOnMissingBean(Reporter.class)
public Reporter reporter() {
return new HoneySpanReporter();
}
@Bean
@ConditionalOnProperty(prefix = "honey.tracing.async", name = "enabled", havingValue = "true")
public ScopeManager honeyTtlScopeManager() {
return new HoneyTtlScopeManager();
}
@Bean
@ConditionalOnProperty(prefix = "honey.tracing.async", name = "enabled", havingValue = "false", matchIfMissing = true)
public ScopeManager threadLocalScopeManager() {
return new ThreadLocalScopeManager();
}
@Bean
@ConditionalOnMissingBean(Tracer.class)
public Tracer tracer(Sampler sampler, Reporter reporter, ScopeManager scopeManager) {
return new JaegerTracer.Builder(HONEY_TRACER_NAME)
.withTraceId128Bit()
.withZipkinSharedRpcSpan()
.withSampler(sampler)
.withReporter(reporter)
.withScopeManager(scopeManager)
.build();
}
}
那么至此,异步链路追寻改造完成。
二. 异步链路追寻运用说明
咱们对之前搭建好的example-service-1进行改造,来演示异步链路追寻的运用。
在开端演示前,需求先引进TransmittableThreadLocal的依靠,如下所示。
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>transmittable-thread-local</artifactId>
<version>2.11.4</version>
</dependency>
然后经过装备打开异步链路追寻,如下所示。
honey:
tracing:
async:
enabled: true
1. new Thread
首要演示直接经过new一个Thread的方法来异步履行使命,在RestTemplateController新增如下代码。
@RestController
public class RestTemplateController {
@Autowired
private RestTemplate restTemplate;
......
@GetMapping("/async/thread/send")
public void syncSendByThread(String url) throws Exception {
CountDownLatch countDownLatch = new CountDownLatch(1);
new Thread(new Runnable() {
@Override
public void run() {
restTemplate.getForEntity(url, Void.class);
countDownLatch.countDown();
}
}).start();
countDownLatch.await();
}
......
}
一起把example-service-1和example-service-2发动起来,并调用如下接口。
http://localhost:8080/async/thread/send?url=http://localhost:8081/receive
在example-service-1中打印如下链路日志。
{
"traceId": "96effadb892bd423c19d32ed70b9a406",
"spanId": "c19d32ed70b9a406",
"parentSpanId": "0000000000000000",
"timestamp": "1708599959132",
"duration": "5",
"httpCode": "200",
"host": "http://localhost:8080",
"requestStacks": [
{
"subSpanId": "43731381f988131a",
"subHttpCode": "200",
"subTimestamp": "1708599959133",
"subDuration": "3",
"subHost": "localhost:8081"
}
]
}
在example-service-2中打印如下链路日志。
{
"traceId": "96effadb892bd423c19d32ed70b9a406",
"spanId": "43731381f988131a",
"parentSpanId": "c19d32ed70b9a406",
"timestamp": "1708599959134",
"duration": "0",
"httpCode": "200",
"host": "http://localhost:8081",
"requestStacks": []
}
可见异步链路是正常作业的。
2. 直接运用ThreadPoolExecutor
通常咱们做异步操作,是不会直接new一个Thread来履行异步使命的,而是会把使命丢到线程池,让线程池来履行使命,假设咱们的使命是一个Runnable,那么应该像下面这样来运用。
@RestController
public class RestTemplateController {
@Autowired
private RestTemplate restTemplate;
@Autowired
private ThreadPoolExecutor threadPoolExecutor;
......
@GetMapping("/async/thread-pool/send")
public void asyncSendByThreadPool(String url) throws Exception {
CountDownLatch countDownLatch = new CountDownLatch(1);
threadPoolExecutor.execute(TtlRunnable.get(new Runnable() {
@Override
public void run() {
restTemplate.getForEntity(url, Void.class);
countDownLatch.countDown();
}
}));
countDownLatch.await();
}
......
}
其间运用的线程池由ThreadPoolExecutorConfig注册到了Spring容器中,如下所示。
@Configuration
public class ThreadPoolExecutorConfig {
@Bean
public ThreadPoolExecutor threadPoolExecutor() {
return new ThreadPoolExecutor(
1,
1,
60,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(),
new ThreadFactory() {
@Override
public Thread newThread(Runnable runnable) {
return new Thread(runnable, "Pool-Thread-1");
}
});
}
}
所以直接运用ThreadPoolExecutor来履行Runnable时,需求先将Runnable转换为TtlRunnable,然后再丢到ThreadPoolExecutor中履行,同理,如果是直接运用ThreadPoolExecutor来履行Callable,那么需求先将Callable转换为TtlCallable。
一起把example-service-1和example-service-2发动起来,并调用如下接口。
http://localhost:8080/async/thread-pool/send?url=http://localhost:8081/receive
在example-service-1中打印如下链路日志。
{
"traceId": "2a198cc0a513f2ca455c98d9a320ebc7",
"spanId": "455c98d9a320ebc7",
"parentSpanId": "0000000000000000",
"timestamp": "1708601760228",
"duration": "10",
"httpCode": "200",
"host": "http://localhost:8080",
"requestStacks": [
{
"subSpanId": "7332de988308e305",
"subHttpCode": "200",
"subTimestamp": "1708601760232",
"subDuration": "5",
"subHost": "localhost:8081"
}
]
}
在example-service-2中打印如下链路日志。
{
"traceId": "2a198cc0a513f2ca455c98d9a320ebc7",
"spanId": "7332de988308e305",
"parentSpanId": "455c98d9a320ebc7",
"timestamp": "1708601760235",
"duration": "1",
"httpCode": "200",
"host": "http://localhost:8081",
"requestStacks": []
}
可见异步链路是正常作业的。
3. 将ThreadPoolExecutor包装为ExecutorServiceTtlWrapper
如果每次异步履行使命时,都要将Runnable包装为TtlRunnable才干完成异步链路追寻,这样的代码写起来实在是太繁琐了,此时既然不想包装Runnable,那么咱们能够挑选包装ThreadPoolExecutor,就像下面这样。
@Configuration
public class ThreadPoolExecutorConfig {
@Bean
public ThreadPoolExecutor threadPoolExecutor() {
return new ThreadPoolExecutor(
1,
1,
60,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(),
new ThreadFactory() {
@Override
public Thread newThread(Runnable runnable) {
return new Thread(runnable, "Pool-Thread-1");
}
});
}
@Bean
public ExecutorService wrappedThreadPool() {
return TtlExecutors.getTtlExecutorService(new ThreadPoolExecutor(
1,
1,
60,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(),
new ThreadFactory() {
@Override
public Thread newThread(Runnable runnable) {
return new Thread(runnable, "Pool-Thread-1");
}
}));
}
}
经过TtlExecutors能够将咱们供给的ThreadPoolExecutor包装为ExecutorServiceTtlWrapper,后续直接将Runnable丢给ExecutorServiceTtlWrapper,也能完成异步链路追寻,如下所示。
@RestController
public class RestTemplateController {
@Autowired
private RestTemplate restTemplate;
@Autowired
private ExecutorService wrappedThreadPool;
......
@GetMapping("/async/wrapped-thread-pool/send")
public void asyncSendByWrappedThreadPool(String url) throws Exception {
CountDownLatch countDownLatch = new CountDownLatch(1);
wrappedThreadPool.execute(new Runnable() {
@Override
public void run() {
restTemplate.getForEntity(url, Void.class);
countDownLatch.countDown();
}
});
countDownLatch.await();
}
......
}
一起把example-service-1和example-service-2发动起来,并调用如下接口。
http://localhost:8080/async/wrapped-thread-pool/send?url=http://localhost:8081/receive
在example-service-1中打印如下链路日志。
{
"traceId": "075b13d4cb10d633fc7eb71ca8d0079a",
"spanId": "fc7eb71ca8d0079a",
"parentSpanId": "0000000000000000",
"timestamp": "1708602413533",
"duration": "7",
"httpCode": "200",
"host": "http://localhost:8080",
"requestStacks": [
{
"subSpanId": "eee3dae02dd9139e",
"subHttpCode": "200",
"subTimestamp": "1708602413535",
"subDuration": "4",
"subHost": "localhost:8081"
}
]
}
在example-service-2中打印如下链路日志。
{
"traceId": "075b13d4cb10d633fc7eb71ca8d0079a",
"spanId": "eee3dae02dd9139e",
"parentSpanId": "fc7eb71ca8d0079a",
"timestamp": "1708602413538",
"duration": "0",
"httpCode": "200",
"host": "http://localhost:8081",
"requestStacks": []
}
可见异步链路是正常作业的。
4. 将ScheduledThreadPoolExecutor包装为ScheduledExecutorServiceTtlWrapper
不单能够把ThreadPoolExecutor包装为ExecutorServiceTtlWrapper,也能将ScheduledThreadPoolExecutor包装为ScheduledExecutorServiceTtlWrapper,就像下面这样。
@Configuration
public class ThreadPoolExecutorConfig {
@Bean
public ThreadPoolExecutor threadPoolExecutor() {
return new ThreadPoolExecutor(
1,
1,
60,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(),
new ThreadFactory() {
@Override
public Thread newThread(Runnable runnable) {
return new Thread(runnable, "Pool-Thread-1");
}
});
}
@Bean
public ExecutorService wrappedThreadPool() {
return TtlExecutors.getTtlExecutorService(new ThreadPoolExecutor(
1,
1,
60,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(),
new ThreadFactory() {
@Override
public Thread newThread(Runnable runnable) {
return new Thread(runnable, "Pool-Thread-1");
}
}));
}
@Bean
public ScheduledExecutorService wrappedScheduledThreadPool() {
return TtlExecutors.getTtlScheduledExecutorService(new ScheduledThreadPoolExecutor(
1,
new ThreadFactory() {
@Override
public Thread newThread(Runnable runnable) {
return new Thread(runnable, "Pool-Thread-1");
}
}));
}
}
后续直接将Runnable丢给ScheduledExecutorServiceTtlWrapper,也是能完成异步链路追寻的,如下所示。
@RestController
public class RestTemplateController {
@Autowired
private RestTemplate restTemplate;
@Autowired
private ScheduledExecutorService wrappedScheduledThreadPool;
......
@GetMapping("/async/wrapped-scheduled-thread-pool/send")
public void asyncSendByWrappedScheduledThreadPool(String url) throws Exception {
CountDownLatch countDownLatch = new CountDownLatch(1);
wrappedScheduledThreadPool.schedule(new Runnable() {
@Override
public void run() {
restTemplate.getForEntity(url, Void.class);
countDownLatch.countDown();
}
}, 1, TimeUnit.SECONDS);
countDownLatch.await();
}
......
}
一起把example-service-1和example-service-2发动起来,并调用如下接口。
http://localhost:8080/async/wrapped-scheduled-thread-pool/send?url=http://localhost:8081/receive
在example-service-1中打印如下链路日志。
{
"traceId": "8726f2fcbe845aa884d1beee49950cdf",
"spanId": "84d1beee49950cdf",
"parentSpanId": "0000000000000000",
"timestamp": "1708602842299",
"duration": "1009",
"httpCode": "200",
"host": "http://localhost:8080",
"requestStacks": [
{
"subSpanId": "372acae257de2b85",
"subHttpCode": "200",
"subTimestamp": "1708602843302",
"subDuration": "4",
"subHost": "localhost:8081"
}
]
}
在example-service-2中打印如下链路日志。
{
"traceId": "8726f2fcbe845aa884d1beee49950cdf",
"spanId": "372acae257de2b85",
"parentSpanId": "84d1beee49950cdf",
"timestamp": "1708602843305",
"duration": "1",
"httpCode": "200",
"host": "http://localhost:8081",
"requestStacks": []
}
可见异步链路是正常作业的。
5. @Async注解
如果是运用@Async注解来履行异步操作,异步链路追寻同样也是支撑的,只需求把咱们注册到了Spring容器中的包装好的ExecutorServiceTtlWrapper设置给@Async注解,就能完成异步链路追寻。
首要编写一个AsyncService如下所示。
@Service
public class AsyncService {
@Async("wrappedThreadPool")
public void send(RestTemplate restTemplate, CountDownLatch countDownLatch, String url) {
restTemplate.getForEntity(url, Void.class);
countDownLatch.countDown();
}
}
然后RestTemplateController中添加如下接口。
@RestController
public class RestTemplateController {
@Autowired
private RestTemplate restTemplate;
@Autowired
private AsyncService asyncService;
@Autowired
private ExecutorService wrappedThreadPool;
......
@GetMapping("/async/annotation/send")
public void asyncSendByAsyncAnnotation(String url) throws Exception {
CountDownLatch countDownLatch = new CountDownLatch(1);
asyncService.send(restTemplate, countDownLatch, url);
countDownLatch.await();
}
}
最后需求在发动类上添加@EnableAsync注解来开启对@Async注解的支撑,如下所示。
@EnableAsync
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
一起把example-service-1和example-service-2发动起来,并调用如下接口。
http://localhost:8080/async/annotation/send?url=http://localhost:8081/receive
在example-service-1中打印如下链路日志。
{
"traceId": "026fb86839e40a54026924f8fa247e7b",
"spanId": "026924f8fa247e7b",
"parentSpanId": "0000000000000000",
"timestamp": "1708604677860",
"duration": "5",
"httpCode": "200",
"host": "http://localhost:8080",
"requestStacks": [
{
"subSpanId": "b90af03e0bed026c",
"subHttpCode": "200",
"subTimestamp": "1708604677860",
"subDuration": "3",
"subHost": "localhost:8081"
}
]
}
在example-service-2中打印如下链路日志。
{
"traceId": "026fb86839e40a54026924f8fa247e7b",
"spanId": "b90af03e0bed026c",
"parentSpanId": "026924f8fa247e7b",
"timestamp": "1708604677861",
"duration": "0",
"httpCode": "200",
"host": "http://localhost:8081",
"requestStacks": []
}
可见异步链路是正常作业的。
总结
异步链路追寻完成的要害,便是运用TransmittableThreadLocal来完成ScopeManager,然后替换掉默许的ThreadLocalScopeManager。异步链路追寻主要便是根据TransmittableThreadLocal供给的跨线程传递本地变量的方法来跨线程传递了Span,所以在实际运用时,也得遵从TransmittableThreadLocal的规矩来对线程池或许使命做一些改造包装。