教程将介绍如何在 Spring Boot 应用程序中运用 Kafka。Kafka 是一个分布式的发布-订阅音讯系统,它能够处理大量数据并提供高吞吐量。

在本教程中,咱们将运用 Spring Boot 2.5.4Kafka 2.8.0

过程一:增加依靠项


在 pom.xml 中增加以下依靠项:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.8.0</version>
</dependency>

过程二:装备 Kafka


application.yml 文件中增加以下装备:

sping:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: my-group
      auto-offset-reset: earliest
    producer:
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      key-serializer: org.apache.kafka.common.serialization.StringSerializer

这里咱们装备了 Kafka 的服务地址为 localhost:9092,装备了一个顾客组 ID 为 my-group,并设置了一个最早的偏移量来读取音讯。在生产者方面,咱们装备了音讯序列化程序为 StringSerializer

过程三:创立一个生产者


现在,咱们将创立一个 Kafka 生产者,用于发送音讯到 Kafka 服务器。在这里,咱们将创立一个 RESTful 端点,用于接纳 POST 恳求并将音讯发送到 Kafka。

首要,咱们将创立一个 KafkaProducerConfig 类,用于装备 Kafka 生产者:

@Configuration
public class KafkaProducerConfig {
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;
    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

在上面的代码中,咱们运用 @Configuration 注解将 KafkaProducerConfig 类声明为装备类。然后,咱们运用 @Value 注解注入装备文件中的 bootstrap-servers 特点。

接下来,咱们创立了一个 producerConfigs 办法,用于设置 Kafka 生产者的装备。在这里,咱们设置了 BOOTSTRAP_SERVERS_CONFIGKEY_SERIALIZER_CLASS_CONFIGVALUE_SERIALIZER_CLASS_CONFIG 三个特点。

然后,咱们创立了一个 producerFactory 办法,用于创立 Kafka 生产者工厂。在这里,咱们运用了 DefaultKafkaProducerFactory 类,并传递了咱们的装备。

最后,咱们创立了一个 kafkaTemplate 办法,用于创立 KafkaTemplate 实例。在这里,咱们运用了刚刚创立的生产者工厂作为参数,然后回来 KafkaTemplate 实例。

接下来,咱们将创立一个 RESTful 端点,用于接纳 POST 恳求并将音讯发送到 Kafka。在这里,咱们将运用 @RestController 注解创立一个 RESTful 操控器:

@RestController
public class KafkaController {
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
    @PostMapping("/send")
    public void sendMessage(@RequestBody String message) {
        kafkaTemplate.send("my-topic", message);
    }
}

在上面的代码中,咱们运用 @Autowired 注解将 KafkaTemplate 实例注入到 KafkaController 类中。然后,咱们创立了一个 sendMessage 办法,用于发送音讯到 Kafka。

在这里,咱们运用 kafkaTemplate.send 办法发送音讯到 my-topic 主题。send 办法回来一个 ListenableFuture 对象,用于异步处理结果。

过程四:创立一个顾客


现在,咱们将创立一个 Kafka 顾客,用于从 Kafka 服务器接纳音讯。在这里,咱们将创立一个顾客组,并将其装备为从 my-topic 主题读取音讯。

首要,咱们将创立一个 KafkaConsumerConfig 类,用于装备 Kafka 顾客:

@Configuration
@EnableKafka
public class KafkaConsumerConfig {
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;
    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;
    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

在上面的代码中,咱们运用 @Configuration 注解将 KafkaConsumerConfig 类声明为装备类,并运用 @EnableKafka 注解启用 Kafka。

然后,咱们运用 @Value 注解注入装备文件中的 bootstrap-serversconsumer.group-id 特点。

接下来,咱们创立了一个 consumerConfigs 办法,用于设置 Kafka 顾客的装备。在这里,咱们设置了 BOOTSTRAP_SERVERS_CONFIG、GROUP_ID_CONFIGAUTO_OFFSET_RESET_CONFIGKEY_DESERIALIZER_CLASS_CONFIGVALUE_DESERIALIZER_CLASS_CONFIG 五个特点。

然后,咱们创立了一个 consumerFactory 办法,用于创立 Kafka 顾客工厂。在这里,咱们运用了 DefaultKafkaConsumerFactory 类,并传递了咱们的装备。

最后,咱们创立了一个 kafkaListenerContainerFactory 办法,用于创立一个 ConcurrentKafkaListenerContainerFactory 实例。在这里,咱们将顾客工厂注入到 kafkaListenerContainerFactory 实例中。

接下来,咱们将创立一个 Kafka 顾客类 KafkaConsumer,用于监听 my-topic 主题并接纳音讯:

@Service
public class KafkaConsumer {
    @KafkaListener(topics = "my-topic", groupId = "my-group-id")
    public void consume(String message) {
        System.out.println("Received message: " + message);
    }
}

在上面的代码中,咱们运用 @KafkaListener 注解声明了一个顾客办法,用于接纳从 my-topic 主题中读取的音讯。在这里,咱们将顾客组 ID 设置为 my-group-id

现在,咱们现已完成了 Kafka 生产者和顾客的设置。咱们能够运用 mvn spring-boot:run 命令发动应用程序,并运用 curl 命令发送 POST 恳求到 http://localhost:8080/send 端点,以将音讯发送到 Kafka。然后,咱们能够在操控台上查看顾客接纳到的音讯。

这就是运用 Spring Boot 和 Kafka 的根本设置。咱们能够根据需要进行更改和扩展,以满意特定的需求。