前语

目前正在出一个Kafka专题系列教程, 篇幅会较多, 喜欢的话,给个关注❤️ ~

本节给咱们讲一下Kafka怎么整合SpringBoot以及它的根本运用~

好了, 废话不多说直接开整吧~

项目建立

相同的,需求咱们建立一个maven工程,整合十分的简略,需求用到:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

来一同看下完好的pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.example</groupId>
    <artifactId>springboot-kafka-all</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.3.RELEASE</version>
    </parent>
    <dependencies>
        <!--web-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!--test-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>
        <!-- kafka-->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <!--Hutool依赖-->
        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.8.4</version>
        </dependency>
        <!--fast-json-->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.58</version>
        </dependency>
        <dependency>
            <groupId> org.slf4j </groupId>
            <artifactId> slf4j-api </artifactId>
            <version> 1.6.4 </version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.7.25</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <version>2.1.3.RELEASE</version>
            </plugin>
        </plugins>
    </build>
</project>

配置也很简略 application.yml

server:
  port: 8081
spring:
  kafka:
    producer:
      bootstrap-servers: 127.0.0.1:9092

然后新建一个发动类,看下控制台是否成功链接了Kafka,在发动之前别忘了敞开咱们的Kafka集群哦~

根本运用

先从一个简略的例子,来快速体验一下Kafka,新建HelloController

@Slf4j
@RestController
public class HelloController {
    private static final String topic = "test";
    @Autowired
    private KafkaTemplate<Object, Object> kafkaTemplate;
    // 接纳音讯
    @KafkaListener(id = "helloGroup", topics = topic)
    public void listen(String msg) {
        log.info("hello receive value: {}" , msg);
        // hello receive value: hello kafka
    }
    @GetMapping("/hello")
    public String hello() {
        // 发送音讯
        kafkaTemplate.send(topic, "hello kafka");
        return "hello";
    }
}

咱们通过KafkaTemplate进行音讯的发送, 通过@KafkaListener进行音讯的消费,咱们能够指定顾客ID以及监听的topic,请求localhost:8081/hello调查控制台的改变。请求后,发现音讯发送和接纳的十分快,咱们也能够调查UI后台的音讯详情,同步对比

topic创立

之前咱们的topic是在UI后台创立的,那么在SpringBoot中怎么创立呢? 下面咱们试着发送一个不存在的topic

 // 当topic不存在时 会默许创立一个topic
    // num.partitions = 1 #默许Topic分区数
    // num.replica.fetchers = 1 #默许副本数
    @GetMapping("/hello1")
    public String hello1() {
        // 发送音讯
        kafkaTemplate.send("hello1", "hello1");
        return "hello1";
    }
    // 接纳音讯
    @KafkaListener(id = "hello1Group", topics = "hello1")
    public void listen1(String msg) {
        log.info("hello1 receive value: {}" , msg);
        // hello1 receive value: hello1
    }

请求之后,调查控制台以及办理后台,发现并没有报错,而且给咱们主动创立了一个topic,在主动创立下,默许的参数是:

  num.partitions = 1 #默许Topic分区数
  num.replica.fetchers = 1 #默许副本数

假如我想手动创立呢?咱们能够通过NewTopic来手动创立:

@Configuration
public class KafkaConfig {
    @Bean
    public KafkaAdmin admin(KafkaProperties properties){
        KafkaAdmin admin = new KafkaAdmin(properties.buildAdminProperties());
        // 默许False,在Broker不行用时,假如你觉得Broker不行用影响正常业务需求显示的将这个值设置为True
        admin.setFatalIfBrokerNotAvailable(true);
        // setAutoCreate(false) : 默许值为True,也就是Kafka实例化后会主动创立已经实例化的NewTopic对象
        // initialize():当setAutoCreate为false时,需求咱们程序显示的调用admin的initialize()方法来初始化NewTopic对象
        return admin;
    }
    /**
     * 创立指定参数的 topic
     * @return
     */
    @Bean
    public NewTopic topic() {
        return new NewTopic("hello2", 0, (short) 0);
    }
}

假如要更新呢?也十分的简略

 /**
     * 更新 topic
     * @return
     */
    @Bean
    public NewTopic topicUpdate() {
        return new NewTopic("hello2", 1, (short) 1);
    }

留意这儿的参数只能+不能-

那么又有小伙伴问了,这种方法太简略了,假如我想在代码逻辑中来创立呢?咱们能够通过AdminClient来手动创立

  /**
     * AdminClient 创立
     */
    @Autowired
    private KafkaProperties properties;
    @GetMapping("/create/{topicName}")
    public String createTopic(@PathVariable String topicName) {
        AdminClient client = AdminClient.create(properties.buildAdminProperties());
        if(client !=null){
            try {
                Collection<NewTopic> newTopics = new ArrayList<>(1);
                newTopics.add(new NewTopic(topicName,1,(short) 1));
                client.createTopics(newTopics);
            }catch (Throwable e){
                e.printStackTrace();
            }finally {
                client.close();
            }
        }
        return topicName;
    }

调查下办理后台,发现topic都创立成功了~

获取音讯发送的成果

有时候咱们发送音讯不知道是不是发成功了,需求有一个成果告知。有两种方法,一种是同步一种是异步

同步获取成果

/**
     * 获取告知成果
     * @return
     */
    @GetMapping("/hello2")
    public String hello2() {
        // 同步获取成果
        ListenableFuture<SendResult<Object,Object>> future = kafkaTemplate.send("hello2","hello2");
        try {
            SendResult<Object,Object> result = future.get();
            log.info("success >>> {}", result.getRecordMetadata().topic()); // success >>> hello2
        }catch (Throwable e){
            e.printStackTrace();
        }
        return "hello2";
    }

异步获取

/**
     * 获取告知成果
     * @return
     */
    @GetMapping("/hello2")
    public String hello2() {
        // 发送音讯 - 异步获取告知成果
        kafkaTemplate.send("hello2", "async hello2").addCallback(new ListenableFutureCallback<SendResult<Object, Object>>() {
            @Override
            public void onFailure(Throwable throwable) {
                log.error("fail >>>>{}", throwable.getMessage());
            }
            @Override
            public void onSuccess(SendResult<Object, Object> objectObjectSendResult) {
                log.info("async success >>> {}", objectObjectSendResult.getRecordMetadata().topic()); // async success >>> hello2
            }
        });
        return "hello2";
    }

Kafka业务

相同的,音讯也会存在业务,假如第一条音讯发送成功,再发第二条音讯的时候出现反常,那么就会抛出反常并回滚第一条音讯,下面通过一个简略的例子领会一下

@GetMapping("/hello3")
public String hello3() {
    kafkaTemplate.executeInTransaction(t -> {
        t.send("hello3","msg1");
        if(true)
            throw new RuntimeException("failed");
        t.send("hello3","msg2");
        return true;
    });
    return "hello3";
}
// 接纳音讯
@KafkaListener(id = "hello3Group", topics = "hello3")
public void listen3(String msg) {
    log.info("hello3 receive value: {}" , msg);
}

默许情况下,Spring-kafka主动生成的KafkaTemplate实例,是不具有业务音讯发送才能的。咱们需求添加transaction-id-prefix来激活它

spring:
  kafka:
    producer:
      bootstrap-servers: 127.0.0.1:9092
      transaction-id-prefix: kafka_.

发动之后,调查控制台的改变~ ,除此之外,还能够运用注解的方法@Transactional来敞开业务

// 注解方法
    @Transactional(rollbackFor = RuntimeException.class)
    @GetMapping("/hello4")
    public String hello4() {
        kafkaTemplate.send("hello3","msg1");
        if(true)
            throw new RuntimeException("failed");
        kafkaTemplate.send("hello3","msg2");
        return "hello4";
    }

结束语

下节带咱们看下SpringBoot整合Kafka深入运用~

本着把自己知道的都告知咱们,假如本文对您有所帮助,点赞+关注鼓励一下呗~

相关文章

  • 一同来学kafka之Kafka集群建立

ElasticSearch 专题学习

  • 利用docker建立es集群

  • 一同来学ElasticSearch(一)

  • 一同来学ElasticSearch(二)

  • 一同来学ElasticSearch(三)

  • 一同来学ElasticSearch(四)

  • 一同来学ElasticSearch(五)

  • 一同来学ElasticSearch(六)

  • 一同来学ElasticSearch(七)

  • 一同来学ElasticSearch(八)

  • 一同来学ElasticSearch(九)

  • 一同来学ElasticSearch(十)

  • 一同来学ElasticSearch之整合SpringBoot(一)

  • 一同来学ElasticSearch之整合SpringBoot(二)

  • 一同来学ElasticSearch之整合SpringBoot(三)

项目源码(源码已更新 欢迎star⭐️)

  • springboot-es-all: https://github.com/qiuChengleiy/springboot-es-all

往期并发编程内容推荐

  • Java多线程专题之线程与进程概述
  • Java多线程专题之线程类和接口入门
  • Java多线程专题之进阶学习Thread(含源码剖析)
  • Java多线程专题之Callable、Future与FutureTask(含源码剖析)
  • 面试官: 有了解过线程组和线程优先级吗
  • 面试官: 说一下线程的生命周期进程
  • 面试官: 说一下线程间的通信
  • 面试官: 说一下Java的同享内存模型
  • 面试官: 有了解过指令重排吗,什么是happens-before
  • 面试官: 有了解过volatile关键字吗 说说看
  • 面试官: 有了解过Synchronized吗 说说看
  • Java多线程专题之Lock锁的运用
  • 面试官: 有了解过ReentrantLock的底层完成吗?说说看
  • 面试官: 有了解过CAS和原子操作吗?说说看
  • Java多线程专题之线程池的根本运用
  • 面试官: 有了解过线程池的工作原理吗?说说看
  • 面试官: 线程池是怎么做到线程复用的?有了解过吗,说说看
  • 面试官: 阻塞队列有了解过吗?说说看
  • 面试官: 阻塞队列的底层完成有了解过吗? 说说看
  • 面试官: 同步容器和并发容器有用过吗? 说说看
  • 面试官: CopyOnWrite容器有了解过吗? 说说看
  • 面试官: Semaphore在项目中有运用过吗?说说看(源码剖析)
  • 面试官: Exchanger在项目中有运用过吗?说说看(源码剖析)
  • 面试官: CountDownLatch有了解过吗?说说看(源码剖析)
  • 面试官: CyclicBarrier有了解过吗?说说看(源码剖析)
  • 面试官: Phaser有了解过吗?说说看
  • 面试官: Fork/Join 有了解过吗?说说看(含源码剖析)
  • 面试官: Stream并行流有了解过吗?说说看

推荐 SpringBoot & SpringCloud (源码已更新 欢迎star⭐️)

  • springboot-all

  • 地址: github.com/qiuChenglei…

  • SpringBoot系列教程合集

  • 一同来学SpringCloud合集

  • SpringCloud整合 Oauth2+Gateway+Jwt+Nacos 完成授权码形式的服务认证(一)

  • SpringCloud整合 Oauth2+Gateway+Jwt+Nacos 完成授权码形式的服务认证(二)

博客(阅览体验较佳)

  • 我的博客(阅览体验较佳)

  • 地址: github.com/qiuChenglei…