前语

本文将从Kafka供给的扩展机制出发,结合OpentracingJaeger,完成Kafka的分布式链路追寻。

kafka-clients版别:3.1.2
spring-kafka版别:2.8.11

正文

一. Kafka的阻拦器和监听器

先看一下Kafka的阻拦器和监听器的接口界说。

首先是阻拦器,分为出产者阻拦器ProducerInterceptor和顾客阻拦器ConsumerInterceptor,接口界说如下。

public interface ProducerInterceptor<K, V> extends Configurable {
    // 音讯被发送前履行
    ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);
    // 音讯发送出现反常或许发送后触发回调时履行
    void onAcknowledgement(RecordMetadata metadata, Exception exception);
    void close();
}
public interface ConsumerInterceptor<K, V> extends Configurable, AutoCloseable {
    // 音讯拉取回来前履行
    ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records);
    // 提交offset后履行
    void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets);
    void close();
}

再看一下监听器的接口界说,如下所示。

public interface ProducerListener<K, V> {
    // 音讯发送成功后触发回调时履行
    default void onSuccess(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata) {
    }
    // 音讯发送有反常触发回调时履行
    default void onError(ProducerRecord<K, V> producerRecord, @Nullable RecordMetadata recordMetadata,
                         Exception exception) {
    }
}

那么出产者出产音讯和顾客拉取音讯这两个动作,结合阻拦器和监听器,一个简单的图示图下。

8. Kafka分布式链路追寻完成规划

二. Kafka分布式链路追寻Span模型规划

Kafka作为音讯中间件,支持音讯的pushpull,这姿态的信息传递方法,有别于运用RestTemplate恳求传递音讯,可是实质还是将音讯传递到了下流,那么咱们稍加笼统,就能够得到如下的Span模型。

8. Kafka分布式链路追寻完成规划

其实便是在出产者这里发送音讯时创立代表下流的Span,然后将Span经过RecordHeader来传递给到下流,这姿态全体的思路就和咱们运用RestTemplate恳求下流时,把Span放到HTTP恳求头里传递给下流是相同的。

三. Kafka出产者链路追寻规划和完成

1. Kafka出产者链路追寻规划

初步看好像能够根据出产者阻拦器来完成,例如在onSend() 办法中创立出来Span并加到ProducerRecordHeader中,然后在onAcknowledgement() 办法中调用Spanfinish() 办法来记载Span,可是实际是不能根据出产者阻拦器来完成的,理由如下。

  1. onSend()onAcknowledgement() 办法中,入参都只要发送的音讯,而咱们记载Span时需求的一些其它信息,很难拿到;
  2. 没有办法把Tracer和装修器等对象设置给出产者阻拦器,这是因为出产者阻拦器的初始化是在KafkaProducer中读取到阻拦器的全限定名然后再经过反射的方法创立出来的,咱们很难把Tracer和装修器等对象设置给出产者阻拦器。

鉴于上述的问题,咱们就不能根据出产者阻拦器来完成Kafka出产者链路追寻,有一个很好的替代的思路是,咱们在KafkaTemplate之上再包装一层,作为咱们自己的增强的KafkaTemplate,这里命名为HoneyKafkaTemplate,然后咱们再界说自己的阻拦器接口HoneyKafkaProducerInterceptor,当运用HoneyKafkaTemplate来发送音讯时,会先进入到HoneyKafkaProducerInterceptor,创立Span和记载Span的功用都写在HoneyKafkaProducerInterceptor中,终究发送音讯时,还是将这个发送的动作委派给原生的KafkaTemplate,全体的结构如下所示。

8. Kafka分布式链路追寻完成规划

2. Kafka出产者链路追寻完成

首先咱们完成HoneyKafkaTemplate来替代原生的KafkaTemplateHoneyKafkaTemplate如下所示。

public class HoneyKafkaTemplate<K, V> extends KafkaTemplate<K, V> {
    public HoneyKafkaTemplate(ProducerFactory<K, V> producerFactory) {
        super(producerFactory);
    }
    public HoneyKafkaTemplate(ProducerFactory<K, V> producerFactory, Map<String, Object> configOverrides) {
        super(producerFactory, configOverrides);
    }
    public HoneyKafkaTemplate(ProducerFactory<K, V> producerFactory, boolean autoFlush) {
        super(producerFactory, autoFlush);
    }
    public HoneyKafkaTemplate(ProducerFactory<K, V> producerFactory, boolean autoFlush, Map<String, Object> configOverrides) {
        super(producerFactory, autoFlush, configOverrides);
    }
}

然后供给一个主动安装的装备类HoneyKafkaTemplateConfig,如下所示。

@Configuration
@AutoConfigureBefore(KafkaAutoConfiguration.class)
public class HoneyKafkaTemplateConfig {
    @Bean
    public HoneyKafkaTemplate<?, ?> kafkaTemplate(ProducerFactory<Object, Object> kafkaProducerFactory) {
        return new HoneyKafkaTemplate<>(kafkaProducerFactory);
    }
}

因为KafkaAutoConfiguration在注册KafkaTemplate时,添加了@ConditionalOnMissingBean(KafkaTemplate.class),所以只需求保证咱们的HoneyKafkaTemplateConfigKafkaAutoConfiguration之前被处理,那么咱们注册的HoneyKafkaTemplate就能替代原生的KafkaTemplate来到达以假乱真的作用。

终究要在spring.factories文件中指定咱们的主动安装类以及在pom文件中添加spring-kafka依靠,如下所示。

org.springframework.boot.autoconfigure.EnableAutoConfiguration=
 com.honey.tracing.config.HoneyTracingConfig,
 com.honey.tracing.config.HoneyTracingFilterConfig,
 com.honey.tracing.config.HoneyRestTemplateTracingConfig,
 com.honey.tracing.config.HoneyKafkaTemplateConfig
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <scope>provided</scope>
</dependency>

现在再给出自界说阻拦器HoneyKafkaProducerInterceptor的界说,如下所示。

public interface HoneyKafkaProducerInterceptor<K, V> {
    /**
     * 阻拦Kafka出产者的音讯发送。
     *
     * @param producerRecord 出产者发送的音讯。
     */
    ListenableFuture<SendResult<K, V>> intercept(
            ProducerRecord<K, V> producerRecord,
            HoneyKafkaProducerInterceptorChain<K, V> kafkaProducerInterceptorChain);
}

咱们再供给一个完成类,如下所示。

public class HoneyKafkaProducerTracingInterceptor<K, V> implements HoneyKafkaProducerInterceptor<K, V> {
    private final Tracer tracer;
    private final List<HoneyKafkaTracingProducerDecorator<K, V>> kafkaTracingProducerDecorators;
    public HoneyKafkaProducerTracingInterceptor(Tracer tracer, List<HoneyKafkaTracingProducerDecorator<K, V>> kafkaTracingProducerDecorators) {
        this.tracer = tracer;
        this.kafkaTracingProducerDecorators = kafkaTracingProducerDecorators;
    }
    @Override
    public ListenableFuture<SendResult<K, V>> intercept(ProducerRecord<K, V> producerRecord,
                                                        HoneyKafkaProducerInterceptorChain<K, V> kafkaProducerInterceptorChain) {
        if (tracer.activeSpan() == null) {
            return kafkaProducerInterceptorChain.intercept(producerRecord);
        }
        // 生成Kafka出产者对应的Span
        // 类似于RestTemplate调用前的Span
        Span span = tracer.buildSpan(HONEY_KAFKA_NAME)
                .withTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_CLIENT)
                .start();
        for (HoneyKafkaTracingProducerDecorator<K, V> kafkaTracingProducerDecorator : kafkaTracingProducerDecorators) {
            try {
                kafkaTracingProducerDecorator.onSend(span, producerRecord);
            } catch (Exception e) {
                // do nothing
            }
        }
        ListenableFuture<SendResult<K, V>> result;
        try (Scope scope = tracer.activateSpan(span)) {
            // 设置Kafka服务端的host
            String hostString = kafkaProducerInterceptorChain.getHoneyKafkaTemplate().getProducerFactory()
                    .getConfigurationProperties().get(BOOTSTRAP_SERVERS_CONFIG).toString();
            span.setTag(FIELD_HOST, hostString.substring(1, hostString.length() - 1));
            // host需求传递给顾客
            span.setBaggageItem(FIELD_HOST, hostString.substring(1, hostString.length() - 1));
            // 把SpanContext注入到ProducerRecord的Headers中
            tracer.inject(span.context(), Format.Builtin.HTTP_HEADERS, new HoneyKafkaCarrier(producerRecord.headers()));
            try {
                result = kafkaProducerInterceptorChain.intercept(producerRecord);
            } catch (Exception e1) {
                for (HoneyKafkaTracingProducerDecorator<K, V> kafkaTracingProducerDecorator : kafkaTracingProducerDecorators) {
                    try {
                        kafkaTracingProducerDecorator.onError(span, producerRecord);
                    } catch (Exception e2) {
                        // do nothing
                    }
                }
                throw e1;
            }
            for (HoneyKafkaTracingProducerDecorator<K, V> kafkaTracingProducerDecorator : kafkaTracingProducerDecorators) {
                try {
                    kafkaTracingProducerDecorator.onSuccess(span, producerRecord);
                } catch (Exception e) {
                    // do nothing
                }
            }
        } finally {
            span.finish();
            tracer.activeSpan().log(RequestStackUtil.assembleRequestStack((JaegerSpan) span));
        }
        return result;
    }
}

和之前完成的RestTemplate的阻拦器逻辑几乎是一模相同,区别就在于一个是Span放在HTTP恳求头,一个是Span放在音讯的Header中,以及咱们在Inject时运用了一个自界说的HoneyKafkaCarrier,如下所示。

public class HoneyKafkaCarrier implements TextMap {
    private final Headers headers;
    public HoneyKafkaCarrier(Headers headers) {
        this.headers = headers;
    }
    @NotNull
    @Override
    public Iterator<Map.Entry<String, String>> iterator() {
        Map<String, String> headerMap = new HashMap<>();
        for (Header header : headers) {
            headerMap.put(header.key(), new String(header.value()));
        }
        return headerMap.entrySet().iterator();
    }
    @Override
    public void put(String key, String value) {
        headers.add(key, value.getBytes());
    }
}

特别注意到HoneyKafkaProducerInterceptorChain,咱们将所有的HoneyKafkaProducerInterceptor创立为了一个连接器链,如下所示。

public class HoneyKafkaProducerInterceptorChain<K, V> {
    private final Iterator<HoneyKafkaProducerInterceptor<K, V>> kafkaProducerInterceptors;
    private final HoneyKafkaTemplate<K, V> honeyKafkaTemplate;
    public HoneyKafkaProducerInterceptorChain(Iterator<HoneyKafkaProducerInterceptor<K, V>> kafkaProducerInterceptors,
                                              HoneyKafkaTemplate<K, V> honeyKafkaTemplate) {
        this.kafkaProducerInterceptors = kafkaProducerInterceptors;
        this.honeyKafkaTemplate = honeyKafkaTemplate;
    }
    public HoneyKafkaTemplate<K, V> getHoneyKafkaTemplate() {
        return honeyKafkaTemplate;
    }
    public ListenableFuture<SendResult<K, V>> intercept(ProducerRecord<K, V> producerRecord) {
        if (kafkaProducerInterceptors.hasNext()) {
            // 阻拦器没履行完则先履行阻拦器
            return kafkaProducerInterceptors.next().intercept(producerRecord, this);
        }
        // 阻拦器悉数履行完后才发送音讯
        return honeyKafkaTemplate.actuallySend(producerRecord);
    }
}

意图如下。

  1. 支持后续添加更多的阻拦器;
  2. 让连接器链持有HoneyKafkaTemplate从而所有阻拦器都持有了HoneyKafkaTemplate

至于装修器,咱们暂时就界说一个空接口出来,后边按需添加完成类,如下所示。

/**
 * Kafka出产者链路追寻装修器。
 */
public interface HoneyKafkaTracingProducerDecorator<K, V> {
    void onSend(Span span, ProducerRecord<K, V> producerRecord);
    void onSuccess(Span span, ProducerRecord<K, V> producerRecord);
    void onError(Span span, ProducerRecord<K, V> producerRecord);
}

现在已经有阻拦器链了,接下来要做的事情便是把阻拦器链给到咱们的HoneyKafkaTemplate,如下所示。

public class HoneyKafkaTemplate<K, V> extends KafkaTemplate<K, V> {
    private List<HoneyKafkaProducerInterceptor<K, V>> kafkaProducerInterceptors;
    public HoneyKafkaTemplate(ProducerFactory<K, V> producerFactory,
                              List<HoneyKafkaProducerInterceptor<K, V>> kafkaProducerInterceptors) {
        super(producerFactory);
        this.kafkaProducerInterceptors = kafkaProducerInterceptors;
    }
    public HoneyKafkaTemplate(ProducerFactory<K, V> producerFactory, Map<String, Object> configOverrides) {
        super(producerFactory, configOverrides);
    }
    public HoneyKafkaTemplate(ProducerFactory<K, V> producerFactory, boolean autoFlush) {
        super(producerFactory, autoFlush);
    }
    public HoneyKafkaTemplate(ProducerFactory<K, V> producerFactory, boolean autoFlush, Map<String, Object> configOverrides) {
        super(producerFactory, autoFlush, configOverrides);
    }
    public ListenableFuture<SendResult<K, V>> actuallySend(ProducerRecord<K, V> producerRecord) {
        return super.doSend(producerRecord);
    }
    @NotNull
    @Override
    public ListenableFuture<SendResult<K, V>> doSend(@NotNull ProducerRecord<K, V> producerRecord) {
        HoneyKafkaProducerInterceptorChain<K, V> kafkaProducerInterceptorChain
                = new HoneyKafkaProducerInterceptorChain<>(kafkaProducerInterceptors.iterator(), this);
        return kafkaProducerInterceptorChain.intercept(producerRecord);
    }
}

这样在运用HoneyKafkaTemplate来发送音讯时,能够先让阻拦器链履行,然后再把音讯委派给原生KafkaTemplate来发送。

现在需求修正注册HoneyKafkaTemplate的主动安装类,如下所示。

@Configuration
@ConditionalOnBean(KafkaTemplate.class)
@AutoConfigureBefore(KafkaAutoConfiguration.class)
public class HoneyKafkaTemplateConfig {
    @Bean
    public HoneyKafkaTemplate<?, ?> kafkaTemplate(ProducerFactory<Object, Object> kafkaProducerFactory,
                                                  List<HoneyKafkaProducerInterceptor<Object, Object>> kafkaProducerInterceptors) {
        return new HoneyKafkaTemplate<>(kafkaProducerFactory, kafkaProducerInterceptors);
    }
}

即在创立HoneyKafkaTemplate时需求把注册到容器中的HoneyKafkaProducerInterceptor设置进去。一起还要为咱们的阻拦器的完成HoneyKafkaProducerTracingInterceptor供给一个主动安装类,如下所示。

@Configuration
@ConditionalOnBean(KafkaTemplate.class)
@AutoConfigureAfter(HoneyTracingConfig.class)
public class HoneyKafkaTracingConfig {
    @Bean
    public HoneyKafkaProducerInterceptor kafkaProducerTracingInterceptor(
            Tracer tracer, List<HoneyKafkaTracingProducerDecorator<Object, Object>> kafkaTracingProducerDecorators) {
        return new HoneyKafkaProducerTracingInterceptor<>(tracer, kafkaTracingProducerDecorators);
    }
}

对应修正spring.factories文件如下所示。

org.springframework.boot.autoconfigure.EnableAutoConfiguration=
 com.honey.tracing.config.HoneyTracingConfig,
 com.honey.tracing.config.HoneyTracingFilterConfig,
 com.honey.tracing.config.HoneyRestTemplateTracingConfig,
 com.honey.tracing.config.HoneyKafkaTemplateConfig,
 com.honey.tracing.config.HoneyKafkaTracingConfig

至此咱们的具有分布式链路追寻功用的HoneyKafkaTemplate就能够运用啦,终究给出上述运用到的常量类,如下所示。

public class CommonConstants {
    public static final double DEFAULT_SAMPLE_RATE = 1.0;
    public static final String HONEY_TRACER_NAME = "HoneyTracer";
    public static final String HONEY_REST_TEMPLATE_NAME = "HoneyRestTemplate";
    public static final String HONEY_KAFKA_NAME = "HoneyKafka";
    public static final String FIELD_HOST = "host";
    public static final String FIELD_API = "api";
    public static final String FIELD_HTTP_CODE = "httpCode";
    public static final String FIELD_SUB_SPAN_ID = "subSpanId";
    public static final String FIELD_SUB_HTTP_CODE = "subHttpCode";
    public static final String FIELD_SUB_TIMESTAMP = "subTimestamp";
    public static final String FIELD_SUB_DURATION = "subDuration";
    public static final String FIELD_SUB_HOST = "subHost";
    public static final String HOST_PATTERN_STR = "(?<=(https://|http://)).*?(?=/)";
    public static final String SLASH = "/";
    public static final String LOG_EVENT_KIND = "logEventKind";
    public static final String LOG_EVENT_KIND_REQUEST_STACK = "requestStack";
}

四. Kafka顾客链路追寻规划和完成

1. Kafka顾客链路追寻规划

同样的,Kafka顾客阻拦器也不适合来完成分布式链路追寻,理由和第三节第1末节中基本共同。

通常在Spring中完成Kafka音讯消费时,咱们运用@KafkaListener注解,被该注解修饰的办法用于消费音讯并处理,入参是ConsumerRecord,所以咱们能够挑选供给切面来切这些被@KafkaListener注解修饰的办法并在办法履行前后操作Span,一起为了和@KafkaListener注解解耦,咱们能够专门界说一个注解来作为咱们的切点,那么现在的音讯消费流程就成下面这样了。

8. Kafka分布式链路追寻完成规划

2. Kafka顾客链路追寻完成

首先界说作为切点的注解,如下所示。

/**
 * Kafka顾客办法运用该注解。
 */
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface HoneyKafkaTracing {
}

对应的切面如下所示。

@Aspect
public class HoneyKafkaConsumerTracingAspect {
    private final HoneyKafkaConsumerInterceptor kafkaConsumerTracingInterceptor;
    public HoneyKafkaConsumerTracingAspect(HoneyKafkaConsumerInterceptor kafkaConsumerTracingInterceptor) {
        this.kafkaConsumerTracingInterceptor = kafkaConsumerTracingInterceptor;
    }
    @Pointcut("@annotation(com.honey.tracing.kafka.consumer.interceptor.HoneyKafkaTracing)")
    private void kafkaTracing() {
    }
    @Around("kafkaTracing()")
    public Object intercept(ProceedingJoinPoint joinPoint) throws Throwable {
        return kafkaConsumerTracingInterceptor.intercept(joinPoint);
    }
}

在盘绕告诉中,会调用到咱们自界说的Kafka顾客阻拦器,接口界说如下。

public interface HoneyKafkaConsumerInterceptor {
    Object intercept(ProceedingJoinPoint joinPoint) throws Throwable;
}

具体的完成类,如下所示。

public class HoneyKafkaConsumerTracingInterceptor<K, V> implements HoneyKafkaConsumerInterceptor {
    private final Tracer tracer;
    private final List<HoneyKafkaTracingConsumerDecorator<K, V>> kafkaTracingConsumerDecorators;
    public HoneyKafkaConsumerTracingInterceptor(
            Tracer tracer, List<HoneyKafkaTracingConsumerDecorator<K, V>> kafkaTracingConsumerDecorators) {
        this.tracer = tracer;
        this.kafkaTracingConsumerDecorators = kafkaTracingConsumerDecorators;
    }
    @Override
    public Object intercept(ProceedingJoinPoint joinPoint) throws Throwable {
        ConsumerRecord consumerRecord = null;
        // 找到Kafka音讯
        Object[] args = joinPoint.getArgs();
        for (Object arg : args) {
            if (arg instanceof ConsumerRecord) {
                consumerRecord = (ConsumerRecord) arg;
            }
        }
        if (consumerRecord == null) {
            // 没有获取到Kafka音讯则不处理链路
            return joinPoint.proceed();
        }
        SpanContext extractSpanContext = tracer.extract(Format.Builtin.HTTP_HEADERS,
                new HoneyKafkaCarrier(consumerRecord.headers()));
        Span span = tracer.buildSpan(HONEY_KAFKA_NAME)
                .asChildOf(extractSpanContext)
                .withTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_SERVER)
                .start();
        span.setTag(FIELD_HOST, span.getBaggageItem(FIELD_HOST));
        for (HoneyKafkaTracingConsumerDecorator<K, V> kafkaTracingConsumerDecorator : kafkaTracingConsumerDecorators) {
            try {
                kafkaTracingConsumerDecorator.onReceive(span, consumerRecord);
            } catch (Exception e) {
                // do nothing
            }
        }
        Object result;
        try (Scope scope = tracer.activateSpan(span)) {
            try {
                result = joinPoint.proceed();
            } catch (Exception e1) {
                for (HoneyKafkaTracingConsumerDecorator<K, V> kafkaTracingConsumerDecorator : kafkaTracingConsumerDecorators) {
                    try {
                        kafkaTracingConsumerDecorator.onError(span, consumerRecord);
                    } catch (Exception e2) {
                        // do nothing
                    }
                }
                throw e1;
            }
            for (HoneyKafkaTracingConsumerDecorator<K, V> kafkaTracingConsumerDecorator : kafkaTracingConsumerDecorators) {
                try {
                    kafkaTracingConsumerDecorator.onFinished(span, consumerRecord);
                } catch (Exception e) {
                    // do nothing
                }
            }
        } finally {
            span.finish();
        }
        return result;
    }
}

同样的咱们也界说了一个装修器接口,如下所示。

/**
 * Kafka顾客链路追寻装修器。
 */
public interface HoneyKafkaTracingConsumerDecorator<K, V> {
    void onReceive(Span span, ConsumerRecord<K, V> consumerRecord);
    void onFinished(Span span, ConsumerRecord<K, V> consumerRecord);
    void onError(Span span, ConsumerRecord<K, V> consumerRecord);
}

然后需求修正一下主动安装类HoneyKafkaTracingConfig来将咱们自界说的Kafka顾客阻拦器和切面注册到容器中,修正如下。

@Configuration
@AutoConfigureAfter(HoneyTracingConfig.class)
public class HoneyKafkaTracingConfig {
    @Bean
    public HoneyKafkaProducerInterceptor kafkaProducerTracingInterceptor(
            Tracer tracer, List<HoneyKafkaTracingProducerDecorator<Object, Object>> kafkaTracingProducerDecorators) {
        return new HoneyKafkaProducerTracingInterceptor<>(tracer, kafkaTracingProducerDecorators);
    }
    @Bean
    public HoneyKafkaConsumerInterceptor honeyKafkaConsumerInterceptor(
            Tracer tracer, List<HoneyKafkaTracingConsumerDecorator<Object, Object>> kafkaTracingProducerDecorators) {
        return new HoneyKafkaConsumerTracingInterceptor(tracer, kafkaTracingProducerDecorators);
    }
    @Bean
    public HoneyKafkaConsumerTracingAspect honeyKafkaConsumerTracingAspect(
            HoneyKafkaConsumerInterceptor kafkaConsumerTracingInterceptor) {
        return new HoneyKafkaConsumerTracingAspect(kafkaConsumerTracingInterceptor);
    }
}

终究,因为运用到了切面,所以需求引进SpringAOP的依靠,如下所示。

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-aop</artifactId>
</dependency>

五. 功用验证

现在运用一个简单的Kafka的例子来验证Kafka分布式链路追寻的功用。

咱们让example-service-1充任出产者,pom中引进Kafka依靠如下所示。

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

然后添加出产者的装备,如下所示。

spring:
  kafka:
    bootstrap-servers: 127.0.0.1:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

终究添加一个KafkaController来发送Kafka音讯,如下所示。

@RestController
public class KafkaController {
    private static final String TEST_TOPIC = "testTopic";
    private static final String TEST_MESSAGE = "testMessage";
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
    @GetMapping("/kafka/send")
    public void send(String url) {
        kafkaTemplate.send(TEST_TOPIC, TEST_MESSAGE);
    }
}

咱们再让example-service-2充任顾客,pom中引进Kafka依靠如下所示。

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

然后添加顾客的装备,如下所示。

spring:
  kafka:
    bootstrap-servers: 127.0.0.1:9092
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      group-id: kafka-tracing

终究添加一个KafkaService来消费音讯,如下所示。

@Service
public class KafkaService {
    private static final String TEST_TOPIC = "testTopic";
    @HoneyKafkaTracing
    @KafkaListener(topics = TEST_TOPIC)
    public void onMessage(ConsumerRecord<String, String> consumerRecord) {
        System.out.println(consumerRecord.value());
    }
}

终究example-service-1打印链路如下所示。

{
	"traceId": "904360d3ae85faa7ce5d698debda55fd",
	"spanId": "ce5d698debda55fd",
	"parentSpanId": "0000000000000000",
	"timestamp": "1708067308855",
	"duration": "306",
	"httpCode": "200",
	"host": "http://localhost:8080",
	"requestStacks": [{
		"subSpanId": "7cb4062ca0fa7cdd",
		"subHttpCode": "null",
		"subTimestamp": "1708067308876",
		"subDuration": "262",
		"subHost": "127.0.0.1:9092"
	}]
}

example-service-2打印链路如下所示。

{
	"traceId": "904360d3ae85faa7ce5d698debda55fd",
	"spanId": "7cb4062ca0fa7cdd",
	"parentSpanId": "ce5d698debda55fd",
	"timestamp": "1708067309171",
	"duration": "5",
	"httpCode": "null",
	"host": "127.0.0.1:9092",
	"requestStacks": []
}

可见链路信息经过Kafka音讯得到了传递。

总结

Kafka尽管是经过pushpull方法来传递音讯,可是咱们能够将这种方法笼统为出产者直接将音讯传递给到了顾客,传递介质便是Record,一起Record中预留了Header,咱们就能够将Span经过Header来传递,整个模式就和经过RestTemplate恳求下流一模相同了。

Kafka分布式链路追寻的完成,实质是阻拦Kafka的音讯发送和接纳,尽管Kafka供给了阻拦器,可是因为Kafka阻拦器的加载机制问题,运用起来并不是很方便,所以咱们挑选自界说阻拦器和切面的方法来阻拦Kafka的音讯发送和接纳,一起为了方便的运用咱们自界说的阻拦器,咱们还自界说了KafkaTemplate来对原生KafkaTemplate进行了包装,运用自界说KafkaTemplate发送音讯时,能够先在自界说KafkaTemplate中先应用自界说阻拦器的逻辑,然后真实发送音讯的动作就委派给原生KafkaTemplate

本文中,honey-starter-tracing的工程目录结构如下所示。

8. Kafka分布式链路追寻完成规划

测试demo的工程目录结构如下所示。

8. Kafka分布式链路追寻完成规划