前语

目前正在出一个Kafka专题系列教程, 篇幅会较多, 喜爱的话,给个关注❤️ ~

本节给咱们讲一下Kafka中偏移量(offset)的概念并结合经典面试题来看下它的实践运用场景~

好了, 废话不多说直接开整吧~

什么是分区 & Partition

在讲之前呢,先理一下什么是分区,在第一节的时分有给咱们提到过

Kafka中,一个主题(topic)能够分红多个分区。每个分区都是一个有序音讯行列,它们能够在不同的服务器上进行仿制,以提高可靠性可扩展性。每个分区都有一个仅有的标识符(partition ID),用于标识该分区。

什么是偏移量 & Offset

偏移量Kafka中用于标识音讯在分区中方位的一个数字。每个音讯都有一个仅有的偏移量,它是由Kafka分配的,而且在分区中是递增的。偏移量能够用于回溯分区中的音讯,也能够用于跟踪现已消费的音讯。

   +---------+          +---------+
   |         |          |         |
   |Topic    |          |Topic    |
   |         |          |         |
   +---------+          +---------+
   |         |          |         |
   |Partition 1          Partition 2
   |         |          |         |
   +---------+          +---------+
   |         |          |         |
   |Offset 1 |          |Offset 1 |
   |         |          |         |
   +---------+          +---------+
   |         |          |         |
   |Offset 2 |          |Offset 2 |
   |         |          |         |
   +---------+          +---------+
   |         |          |         |
   |   ...   |          |   ...   |
   |         |          |         |
   +---------+          +---------+

在这个暗示图中,Kafka中有两个主题(Topic),每个主题都有两个分区(Partition)。每个分区都有一个仅有的分区ID,而且包含一系列有序的音讯。每个音讯都有一个仅有偏移量(Offset),用于标识它在分区中的方位。

实践运用 & offset

Kafka中的偏移量(Offset)是一个非常重要的概念,它指的是顾客在一个特定分区的音讯中的方位Kafka运用偏移量来确保顾客能够从前次离开的地方持续消费,从而确保音讯的次序性可靠性

以下是一些实践运用场景,演示了怎么运用偏移量来处理不同的状况:

  • 从头消费音讯:假定一个顾客在处理音讯时发生了毛病或错误,导致它无法处理后续的音讯。在这种状况下,咱们能够将顾客的偏移量重置为较早的方位,以从头消费之前未能处理的音讯。

  • 手动提交偏移量Kafka顾客API支持手动提交偏移量的方法,这能够用于优化顾客的性能操控偏移量的提交。在手动提交偏移量时,顾客能够依据自己的需求决定何时提交偏移量,而且能够依据音讯的处理状况进行批量提交或独自提交。

  • 顾客组和谐器Kafka顾客API中的顾客组和谐器担任办理顾客组中的偏移量。当顾客参加或离开顾客组时,和谐器将从头分配偏移量,以确保顾客能够从正确的方位开端消费。

  • 并发消费:在某些状况下,咱们或许需求运用多个顾客来并发地消费同一个主题的音讯。在这种状况下,每个顾客都能够独登时办理自己的偏移量,并依据自己的需求进行提交,以确保每个顾客都能够独登时处理音讯。

总之,偏移量在Kafka中具有重要的作用,它能够协助咱们完成音讯的次序性可靠性,并供给了一些方便的方法来处理不同的运用场景。

Kafka 音讯丢掉

Kafka一定能确保音讯不丢掉吗?答案是否定的。前面几节在讲顾客消费音讯的时分都是主动提交偏移量,这儿说一下主动提交的概念,

默许状况下,Kafka会运用主动提交偏移量的方法来办理偏移量。这意味着,每当顾客Kafka中拉取一批音讯并消费结束后,它将主动提交偏移量,以便下一次顾客拉取音讯时能够从前次提交的偏移量处开端消费音讯。

接下来,咱们就简单的模仿一下音讯丢掉的场景~

新建一个OffsetController,完成一个小需求,首要发送10条音讯到topic1,消费成功后将结果发送到topic2

@Slf4j
@RestController
public class OffsetController {
    @Autowired
    private KafkaTemplate<Object, Object> kafkaTemplate;
    @GetMapping("/hello")
    public String hello() throws Exception {
        // 发送音讯
        for (int i = 0; i < 10; i++) {
            String message = "Message " + i;
            kafkaTemplate.send("topic1", message);
            log.info("Sent message: {}", message);
            Thread.sleep(1000);
        }
        return "hello";
    }
    @KafkaListener(topics = "topic1", id = "to1")
    public void listen1(String message) {
        log.info("listen1 Received message >>> {}", message);
        kafkaTemplate.send("topic2", message);
    }
    @KafkaListener(topics = "topic2", id = "to2")
    public void listen2(String message) {
        log.info("listen2 Received message >>> {}", message);
    }
}

看下正常状况下的音讯消费:

2023-03-21 14:47:14.161  INFO 117020 --- [nio-8081-exec-1] c.k.study.controller.OffsetController    : Sent message: Message 0
2023-03-21 14:47:14.177  INFO 117020 --- [      to1-0-C-1] c.k.study.controller.OffsetController    : listen1 Received message >>> Message 0
2023-03-21 14:47:14.211  INFO 117020 --- [      to2-0-C-1] c.k.study.controller.OffsetController    : listen2 Received message >>> Message 0
2023-03-21 14:47:15.168  INFO 117020 --- [nio-8081-exec-1] c.k.study.controller.OffsetController    : Sent message: Message 1
2023-03-21 14:47:15.173  INFO 117020 --- [      to1-0-C-1] c.k.study.controller.OffsetController    : listen1 Received message >>> Message 1
2023-03-21 14:47:15.178  INFO 117020 --- [      to2-0-C-1] c.k.study.controller.OffsetController    : listen2 Received message >>> Message 1
2023-03-21 14:47:16.177  INFO 117020 --- [nio-8081-exec-1] c.k.study.controller.OffsetController    : Sent message: Message 2
2023-03-21 14:47:16.184  INFO 117020 --- [      to1-0-C-1] c.k.study.controller.OffsetController    : listen1 Received message >>> Message 2
2023-03-21 14:47:16.187  INFO 117020 --- [      to2-0-C-1] c.k.study.controller.OffsetController    : listen2 Received message >>> Message 2
2023-03-21 14:47:17.189  INFO 117020 --- [nio-8081-exec-1] c.k.study.controller.OffsetController    : Sent message: Message 3
2023-03-21 14:47:17.193  INFO 117020 --- [      to1-0-C-1] c.k.study.controller.OffsetController    : listen1 Received message >>> Message 3
2023-03-21 14:47:17.198  INFO 117020 --- [      to2-0-C-1] c.k.study.controller.OffsetController    : listen2 Received message >>> Message 3
2023-03-21 14:47:18.202  INFO 117020 --- [nio-8081-exec-1] c.k.study.controller.OffsetController    : Sent message: Message 4
2023-03-21 14:47:18.205  INFO 117020 --- [      to1-0-C-1] c.k.study.controller.OffsetController    : listen1 Received message >>> Message 4
2023-03-21 14:47:18.209  INFO 117020 --- [      to2-0-C-1] c.k.study.controller.OffsetController    : listen2 Received message >>> Message 4
2023-03-21 14:47:19.205  INFO 117020 --- [nio-8081-exec-1] c.k.study.controller.OffsetController    : Sent message: Message 5
2023-03-21 14:47:19.210  INFO 117020 --- [      to1-0-C-1] c.k.study.controller.OffsetController    : listen1 Received message >>> Message 5
2023-03-21 14:47:19.218  INFO 117020 --- [      to2-0-C-1] c.k.study.controller.OffsetController    : listen2 Received message >>> Message 5
2023-03-21 14:47:20.206  INFO 117020 --- [nio-8081-exec-1] c.k.study.controller.OffsetController    : Sent message: Message 6
2023-03-21 14:47:20.210  INFO 117020 --- [      to1-0-C-1] c.k.study.controller.OffsetController    : listen1 Received message >>> Message 6
2023-03-21 14:47:20.213  INFO 117020 --- [      to2-0-C-1] c.k.study.controller.OffsetController    : listen2 Received message >>> Message 6
2023-03-21 14:47:21.221  INFO 117020 --- [nio-8081-exec-1] c.k.study.controller.OffsetController    : Sent message: Message 7
2023-03-21 14:47:21.226  INFO 117020 --- [      to1-0-C-1] c.k.study.controller.OffsetController    : listen1 Received message >>> Message 7
2023-03-21 14:47:21.232  INFO 117020 --- [      to2-0-C-1] c.k.study.controller.OffsetController    : listen2 Received message >>> Message 7
2023-03-21 14:47:22.228  INFO 117020 --- [nio-8081-exec-1] c.k.study.controller.OffsetController    : Sent message: Message 8
2023-03-21 14:47:22.232  INFO 117020 --- [      to1-0-C-1] c.k.study.controller.OffsetController    : listen1 Received message >>> Message 8
2023-03-21 14:47:22.241  INFO 117020 --- [      to2-0-C-1] c.k.study.controller.OffsetController    : listen2 Received message >>> Message 8
2023-03-21 14:47:23.230  INFO 117020 --- [nio-8081-exec-1] c.k.study.controller.OffsetController    : Sent message: Message 9
2023-03-21 14:47:23.234  INFO 117020 --- [      to1-0-C-1] c.k.study.controller.OffsetController    : listen1 Received message >>> Message 9
2023-03-21 14:47:23.238  INFO 117020 --- [      to2-0-C-1] c.k.study.controller.OffsetController    : listen2 Received message >>> Message 9

能够看到音讯是正常消费, 而且是次序消费

下面来改造一下顾客,假定顾客to1在某些状况下发生了反常或者宕机了

 @KafkaListener(topics = "topic1", id = "to1")
    public void listen1(String message) {
        if(message.contains("6"))
            throw new RuntimeException("体系反常");
        log.info("listen1 Received message >>> {}", message);
        kafkaTemplate.send("topic2", message);
    }
    @KafkaListener(topics = "topic2", id = "to2")
    public void listen2(String message) {
        if(String.format("Message %d", 6 + 1).equals(message)) {
            log.error("音讯丢掉, 音讯为 >>> {}", 6);
        }
        log.info("listen2 Received message >>> {}", message);
    }

看下反常状况下的输出

2023-03-21 15:21:28.405  INFO 130336 --- [nio-8081-exec-1] c.k.study.controller.OffsetController    : Sent message: Message 0
2023-03-21 15:21:28.421  INFO 130336 --- [      to1-0-C-1] c.k.study.controller.OffsetController    : listen1 Received message >>> Message 0
2023-03-21 15:21:28.430  INFO 130336 --- [      to2-0-C-1] c.k.study.controller.OffsetController    : listen2 Received message >>> Message 0
2023-03-21 15:21:29.409  INFO 130336 --- [nio-8081-exec-1] c.k.study.controller.OffsetController    : Sent message: Message 1
2023-03-21 15:21:29.413  INFO 130336 --- [      to1-0-C-1] c.k.study.controller.OffsetController    : listen1 Received message >>> Message 1
2023-03-21 15:21:29.417  INFO 130336 --- [      to2-0-C-1] c.k.study.controller.OffsetController    : listen2 Received message >>> Message 1
2023-03-21 15:21:30.415  INFO 130336 --- [nio-8081-exec-1] c.k.study.controller.OffsetController    : Sent message: Message 2
2023-03-21 15:21:30.420  INFO 130336 --- [      to1-0-C-1] c.k.study.controller.OffsetController    : listen1 Received message >>> Message 2
2023-03-21 15:21:30.424  INFO 130336 --- [      to2-0-C-1] c.k.study.controller.OffsetController    : listen2 Received message >>> Message 2
2023-03-21 15:21:31.416  INFO 130336 --- [nio-8081-exec-1] c.k.study.controller.OffsetController    : Sent message: Message 3
2023-03-21 15:21:31.420  INFO 130336 --- [      to1-0-C-1] c.k.study.controller.OffsetController    : listen1 Received message >>> Message 3
2023-03-21 15:21:31.424  INFO 130336 --- [      to2-0-C-1] c.k.study.controller.OffsetController    : listen2 Received message >>> Message 3
2023-03-21 15:21:32.430  INFO 130336 --- [nio-8081-exec-1] c.k.study.controller.OffsetController    : Sent message: Message 4
2023-03-21 15:21:32.434  INFO 130336 --- [      to1-0-C-1] c.k.study.controller.OffsetController    : listen1 Received message >>> Message 4
2023-03-21 15:21:32.438  INFO 130336 --- [      to2-0-C-1] c.k.study.controller.OffsetController    : listen2 Received message >>> Message 4
2023-03-21 15:21:33.432  INFO 130336 --- [nio-8081-exec-1] c.k.study.controller.OffsetController    : Sent message: Message 5
2023-03-21 15:21:33.436  INFO 130336 --- [      to1-0-C-1] c.k.study.controller.OffsetController    : listen1 Received message >>> Message 5
2023-03-21 15:21:33.441  INFO 130336 --- [      to2-0-C-1] c.k.study.controller.OffsetController    : listen2 Received message >>> Message 5
2023-03-21 15:21:34.435  INFO 130336 --- [nio-8081-exec-1] c.k.study.controller.OffsetController    : Sent message: Message 6
2023-03-21 15:21:34.445 ERROR 130336 --- [      to1-0-C-1] o.s.kafka.listener.LoggingErrorHandler   : Error while processing: ConsumerRecord(topic = topic1, partition = 0, offset = 73, CreateTime = 1679383294435, serialized key size = -1, serialized value size = 9, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = Message 6)
org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void com.kafka.study.controller.OffsetController.listen1(java.lang.String)' threw exception; nested exception is java.lang.RuntimeException: 体系反常; nested exception is java.lang.RuntimeException: 体系反常
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:1272) [spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeErrorHandler(KafkaMessageListenerContainer.java:1261) [spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1188) [spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:1159) [spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1099) [spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:934) [spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:750) [spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:699) [spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_191]
	at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_191]
	at java.lang.Thread.run(Thread.java:748) [na:1.8.0_191]
Caused by: java.lang.RuntimeException: 体系反常
	at com.kafka.study.controller.OffsetController.listen1(OffsetController.java:36) ~[classes/:na]
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_191]
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_191]
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_191]
	at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_191]
	at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:170) ~[spring-messaging-5.1.5.RELEASE.jar:5.1.5.RELEASE]
	at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:120) ~[spring-messaging-5.1.5.RELEASE.jar:5.1.5.RELEASE]
	at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:48) ~[spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
	at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:283) ~[spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
	at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:79) ~[spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
	at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:50) ~[spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:1224) [spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:1217) [spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1178) [spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
	... 8 common frames omitted
2023-03-21 15:21:35.439  INFO 130336 --- [nio-8081-exec-1] c.k.study.controller.OffsetController    : Sent message: Message 7
2023-03-21 15:21:35.442  INFO 130336 --- [      to1-0-C-1] c.k.study.controller.OffsetController    : listen1 Received message >>> Message 7
2023-03-21 15:21:35.445 ERROR 130336 --- [      to2-0-C-1] c.k.study.controller.OffsetController    : 音讯丢掉, 音讯为 >>> 6
2023-03-21 15:21:35.445  INFO 130336 --- [      to2-0-C-1] c.k.study.controller.OffsetController    : listen2 Received message >>> Message 7
2023-03-21 15:21:36.448  INFO 130336 --- [nio-8081-exec-1] c.k.study.controller.OffsetController    : Sent message: Message 8
2023-03-21 15:21:36.451  INFO 130336 --- [      to1-0-C-1] c.k.study.controller.OffsetController    : listen1 Received message >>> Message 8
2023-03-21 15:21:36.455  INFO 130336 --- [      to2-0-C-1] c.k.study.controller.OffsetController    : listen2 Received message >>> Message 8
2023-03-21 15:21:37.449  INFO 130336 --- [nio-8081-exec-1] c.k.study.controller.OffsetController    : Sent message: Message 9
2023-03-21 15:21:37.455  INFO 130336 --- [      to1-0-C-1] c.k.study.controller.OffsetController    : listen1 Received message >>> Message 9
2023-03-21 15:21:37.460  INFO 130336 --- [      to2-0-C-1] c.k.study.controller.OffsetController    : listen2 Received message >>> Message 9

从结果来看当顾客to1消费到Message 6这条音讯的时分报了反常导致音讯没有被消费成功,但是仍是正常提交了offset,接着持续消费,这就导致了音讯的丢掉。理论上讲没有消费成功的音讯应当从头消费,然后提交offset

那么怎么不主动提交offset呢? 这是一道比较经典的面试题

Kafka 手动提交 offset

由于咱们运用了主动提交偏移量的方法,而这个新的音讯是在顾客提交偏移量之后发送的,因而顾客不会收到这条新的音讯,这就导致了音讯丢掉的状况。为了避免这种状况,咱们应该运用手动提交偏移量的方法,以便在顾客完成一切音讯的消费后手动提交偏移量。这样一来,即便在顾客消费音讯的进程中出现反常或者顾客运用程序被封闭,咱们也能够确保音讯的完整性和可靠性。

接下来,咱们就去解决上述的问题。

首要更改一下KafkaConfig, 上节给咱们讲过,直接在这上边改, 在kafkaListenerContainerFactory()中增加如下代码

// 开启手动提交偏移量
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);

接着修正consumerConfigs(), 增加如下代码, 将主动提交封闭

props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

然后更改下咱们的controller代码


 /**
    * 手动提交偏移量
    */
@GetMapping("/hello1")
public String hello1() throws Exception {
    // 发送音讯
    for (int i = 0; i < 10; i++) {
        String message = "Message " + i;
        kafkaTemplate.send("topic3", message);
        log.info("Sent message: {}", message);
        Thread.sleep(1000);
    }
    return "hello1";
}
@KafkaListener(topics = "topic3", groupId = "my-group", containerFactory = "kafkaListenerContainerFactory")
public void onMessage(String message, Acknowledgment acknowledgment) {
    log.info("listen1 Received message >>> {}", message);
}

这个时分我并没有在监听器里手动提交offset, 运行之后恳求下,看下操控台信息

2023-03-21 16:45:38.408  INFO 135916 --- [nio-8081-exec-1] c.k.study.controller.OffsetController    : Sent message: Message 1
2023-03-21 16:45:38.411  INFO 135916 --- [ntainer#0-0-C-1] c.k.study.controller.OffsetController    : listen1 Received message >>> Message 1
2023-03-21 16:45:39.415  INFO 135916 --- [nio-8081-exec-1] c.k.study.controller.OffsetController    : Sent message: Message 2
2023-03-21 16:45:39.418  INFO 135916 --- [ntainer#0-0-C-1] c.k.study.controller.OffsetController    : listen1 Received message >>> Message 2
2023-03-21 16:45:40.419  INFO 135916 --- [nio-8081-exec-1] c.k.study.controller.OffsetController    : Sent message: Message 3
2023-03-21 16:45:40.423  INFO 135916 --- [ntainer#0-0-C-1] c.k.study.controller.OffsetController    : listen1 Received message >>> Message 3
2023-03-21 16:45:41.434  INFO 135916 --- [nio-8081-exec-1] c.k.study.controller.OffsetController    : Sent message: Message 4
2023-03-21 16:45:41.438  INFO 135916 --- [ntainer#0-0-C-1] c.k.study.controller.OffsetController    : listen1 Received message >>> Message 4
2023-03-21 16:45:42.447  INFO 135916 --- [nio-8081-exec-1] c.k.study.controller.OffsetController    : Sent message: Message 5
2023-03-21 16:45:42.452  INFO 135916 --- [ntainer#0-0-C-1] c.k.study.controller.OffsetController    : listen1 Received message >>> Message 5
2023-03-21 16:45:43.460  INFO 135916 --- [nio-8081-exec-1] c.k.study.controller.OffsetController    : Sent message: Message 6
2023-03-21 16:45:43.463  INFO 135916 --- [ntainer#0-0-C-1] c.k.study.controller.OffsetController    : listen1 Received message >>> Message 6
2023-03-21 16:45:44.462  INFO 135916 --- [nio-8081-exec-1] c.k.study.controller.OffsetController    : Sent message: Message 7
2023-03-21 16:45:44.465  INFO 135916 --- [ntainer#0-0-C-1] c.k.study.controller.OffsetController    : listen1 Received message >>> Message 7
2023-03-21 16:45:45.474  INFO 135916 --- [nio-8081-exec-1] c.k.study.controller.OffsetController    : Sent message: Message 8
2023-03-21 16:45:45.478  INFO 135916 --- [ntainer#0-0-C-1] c.k.study.controller.OffsetController    : listen1 Received message >>> Message 8
2023-03-21 16:45:46.484  INFO 135916 --- [nio-8081-exec-1] c.k.study.controller.OffsetController    : Sent message: Message 9
2023-03-21 16:45:46.488  INFO 135916 --- [ntainer#0-0-C-1] c.k.study.controller.OffsetController    : listen1 Received message >>> Message 9

假如契合预期的话,顾客再次发动的时分,应该从前次消费的方位开端消费,下面咱们重启运用,过几秒后看下操控台信息

2023-03-21 16:47:57.286  INFO 104896 --- [ntainer#0-0-C-1] c.k.study.controller.OffsetController    : listen1 Received message >>> Message 0
2023-03-21 16:47:57.286  INFO 104896 --- [ntainer#0-0-C-1] c.k.study.controller.OffsetController    : listen1 Received message >>> Message 1
2023-03-21 16:47:57.286  INFO 104896 --- [ntainer#0-0-C-1] c.k.study.controller.OffsetController    : listen1 Received message >>> Message 2
2023-03-21 16:47:57.286  INFO 104896 --- [ntainer#0-0-C-1] c.k.study.controller.OffsetController    : listen1 Received message >>> Message 3
2023-03-21 16:47:57.286  INFO 104896 --- [ntainer#0-0-C-1] c.k.study.controller.OffsetController    : listen1 Received message >>> Message 4
2023-03-21 16:47:57.286  INFO 104896 --- [ntainer#0-0-C-1] c.k.study.controller.OffsetController    : listen1 Received message >>> Message 5
2023-03-21 16:47:57.286  INFO 104896 --- [ntainer#0-0-C-1] c.k.study.controller.OffsetController    : listen1 Received message >>> Message 6
2023-03-21 16:47:57.286  INFO 104896 --- [ntainer#0-0-C-1] c.k.study.controller.OffsetController    : listen1 Received message >>> Message 7
2023-03-21 16:47:57.286  INFO 104896 --- [ntainer#0-0-C-1] c.k.study.controller.OffsetController    : listen1 Received message >>> Message 8
2023-03-21 16:47:57.286  INFO 104896 --- [ntainer#0-0-C-1] c.k.study.controller.OffsetController    : listen1 Received message >>> Message 9

从结果来看,契合咱们的预期,由于Message 0这条音讯并没有手动提交offset所以下次进来的时分仍是从这个方位开端消费音讯

那怎么手动提交呢?其实很简单,增加如下代码即可

acknowledgment.acknowledge();

这样,在音讯被成功消费后,由运用本身手动提交offset这样能够确保咱们的音讯不会被丢掉

结束语

本节提到了ack的概念,它是kafka的一种承认机制,它用于确定音讯是否现已被成功处理,下节就结合实践运用场景给咱们顺一下这个概念~

本着把自己知道的都告知咱们,假如本文对您有所协助,点赞+关注鼓舞一下呗~

相关文章

  • 一起来学kafka之Kafka集群建立
  • 一起来学kafka之整合SpringBoot根本运用
  • 一起来学kafka之整合SpringBoot深入运用(一)

项目源码(源码已更新 欢迎star⭐️)

  • springboot-kafka-all: https://github.com/qiuChengleiy/springboot-kafka-all

ElasticSearch 专题学习

  • 运用docker建立es集群

  • 一起来学ElasticSearch(一)

  • 一起来学ElasticSearch(二)

  • 一起来学ElasticSearch(三)

  • 一起来学ElasticSearch(四)

  • 一起来学ElasticSearch(五)

  • 一起来学ElasticSearch(六)

  • 一起来学ElasticSearch(七)

  • 一起来学ElasticSearch(八)

  • 一起来学ElasticSearch(九)

  • 一起来学ElasticSearch(十)

  • 一起来学ElasticSearch之整合SpringBoot(一)

  • 一起来学ElasticSearch之整合SpringBoot(二)

  • 一起来学ElasticSearch之整合SpringBoot(三)

项目源码(源码已更新 欢迎star⭐️)

  • springboot-es-all: https://github.com/qiuChengleiy/springboot-es-all

往期并发编程内容引荐

  • Java多线程专题之线程与进程概述
  • Java多线程专题之线程类和接口入门
  • Java多线程专题之进阶学习Thread(含源码剖析)
  • Java多线程专题之Callable、Future与FutureTask(含源码剖析)
  • 面试官: 有了解过线程组和线程优先级吗
  • 面试官: 说一下线程的生命周期进程
  • 面试官: 说一下线程间的通讯
  • 面试官: 说一下Java的共享内存模型
  • 面试官: 有了解过指令重排吗,什么是happens-before
  • 面试官: 有了解过volatile关键字吗 说说看
  • 面试官: 有了解过Synchronized吗 说说看
  • Java多线程专题之Lock锁的运用
  • 面试官: 有了解过ReentrantLock的底层完成吗?说说看
  • 面试官: 有了解过CAS和原子操作吗?说说看
  • Java多线程专题之线程池的根本运用
  • 面试官: 有了解过线程池的作业原理吗?说说看
  • 面试官: 线程池是怎么做到线程复用的?有了解过吗,说说看
  • 面试官: 阻塞行列有了解过吗?说说看
  • 面试官: 阻塞行列的底层完成有了解过吗? 说说看
  • 面试官: 同步容器和并发容器有用过吗? 说说看
  • 面试官: CopyOnWrite容器有了解过吗? 说说看
  • 面试官: Semaphore在项目中有运用过吗?说说看(源码剖析)
  • 面试官: Exchanger在项目中有运用过吗?说说看(源码剖析)
  • 面试官: CountDownLatch有了解过吗?说说看(源码剖析)
  • 面试官: CyclicBarrier有了解过吗?说说看(源码剖析)
  • 面试官: Phaser有了解过吗?说说看
  • 面试官: Fork/Join 有了解过吗?说说看(含源码剖析)
  • 面试官: Stream并行流有了解过吗?说说看

引荐 SpringBoot & SpringCloud (源码已更新 欢迎star⭐️)

  • springboot-all

  • 地址: github.com/qiuChenglei…

  • SpringBoot系列教程合集

  • 一起来学SpringCloud合集

  • SpringCloud整合 Oauth2+Gateway+Jwt+Nacos 完成授权码形式的服务认证(一)

  • SpringCloud整合 Oauth2+Gateway+Jwt+Nacos 完成授权码形式的服务认证(二)

博客(阅览体会较佳)

  • 我的博客(阅览体会较佳)

  • 地址: github.com/qiuChenglei…