1、介绍

Apache Avro 是一种高效的数据序列化体系,用于在不同的应用程序和渠道之间传输和存储数据。它供给了一种紧凑且高效的二进制数据编码格局,比较其他常见的序列化方法,Avro能够完成更快的序列化和更小的数据存储。

而Confluent Schema Registry是由Confluent公司供给的一个开源组件,旨在处理分布式体系中的数据模式演化和兼容性的问题。它是建立在Apache Avro之上的一个服务,能够用于集中管理和存储Avro数据的模式(Schema),确保分布式体系中的数据一致性和兼容性。它广泛应用于事件流处理渠道(如Kafka),为数据流的可靠性和互操作性供给了支持。

本文将介绍如安在Spring Boot应用程序中整合Apache Avro和Confluent Schema Registry,以完成高效的数据序列化和管理。

本文代码示例: GitHub库房地址


2、Confluent Schema

1、下载

软件下载地址:Previous Versions – Confluent

本次运用:confluent-community-7.3.3 社区版,下载上传至Linux解压。

2、修正装备

修正装备文件:在 confluent-7.3.3/etc 文件夹下

confluent-7.3.3/etc/schema-registry/schema-registry.properties

# 装备Confluent Schema Registry 服务的访问IP和端口
listeners=http://0.0.0.0:8081
# 修正 Kafka集群指定引导服务器
kafkastore.bootstrap.servers=PLAINTEXT://xx.xx.xx.xx:9092,xx.xx.xx.xx:9192
# kafkastore.connection.url 装备zookeeper地址方法已弃用 
# 存储 schema 的 topic
kafkastore.topic=_schemas
# If true, API requests that fail will include extra debugging information, including stack traces
debug=false

3、发动

./bin/schema-registry-start ./etc/schema-registry/schema-registry.properties

4、验证

curl -X POST 'http://localhost:8081/subjects/topic-test/versions' \
	 -H "Content-Type: application/vnd.schemaregistry.v1+json" \
     -d '{"schema": "{\"type\": \"string\"}"}'
 返回成果:{"id":1}

2、Springboot整合

1、引进xml

    <dependencies>
		<!--kafka-->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>3.0.7</version>
        </dependency>
        <!--avro-->
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>1.11.1</version>
        </dependency>
        <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>kafka-avro-serializer</artifactId>
            <version>7.4.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/io.confluent/kafka-schema-registry-client -->
        <!--schema-registry-client-->
        <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>kafka-schema-registry-client</artifactId>
            <version>7.4.0</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-core</artifactId>
        </dependency>
    </dependencies>
    <!--schema-registry-client 远程库房-->
    <repositories>
        <!-- other maven repositories the project -->
        <repository>
            <id>confluent</id>
            <url>https://packages.confluent.io/maven/</url>
        </repository>
    </repositories>

2、导入Avro构建插件

  <plugin>
      <groupId>org.apache.avro</groupId>
      <artifactId>avro-maven-plugin</artifactId>
      <version>1.11.1</version>
      <executions>
          <execution>
              <phase>generate-sources</phase>
              <goals>
                  <goal>schema</goal>
              </goals>
              <configuration>
                  <!--schema 文件所在目录-->
                  <sourceDirectory>${project.basedir}/src/main/resources/avro/</sourceDirectory>
                  <!--依据schema 文件生成的类文件目录-->
                  <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
              </configuration>
          </execution>
      </executions>
  </plugin>

3、生成实体类

  • 在 ${project.basedir}/src/main/resources/avro/ 下创建 user.avsc文件。

  • 通过 Maven mvn compile 命令生产实体类

{
  "namespace": "com.jinunn.kraft.avro",  // 实体类寄存途径
  "type": "record",
  "name": "User",     // 实体类文件名
  "fields": [       // 实体类属性
    {
      "name": "id",  // 属性名
      "type": "int"  // 类型
    },
    {
      "name": "name",
      "type": "string"
    },
    {
      "name": "age",
      "type": "int"
    }
  ]
}

SpringBoot3 配置文件整合 Apache Avro

4、SpringBoot装备文件

server:
  port: 8086
spring:
  application:
    name: kafka
  kafka:
  	# 集群地址
    bootstrap-servers: xx.xx.xx.xx:9092,xx.xx.xx.xx:9192,xx.xx.xx.xx:9292
    producer:
      # 设置key的序列化类
      key-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
      # 设置value的序列化类
      value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
      # ack策略
      # 0:生产者发送音讯就不管了,效率高,但是容易丢数据,且没有重试机制
      # 1:音讯发送到Leader并落盘后就返回,假如Leader挂了而且Follower还没有同步数据就会丢失数据
      #-1:音讯要所有副本都拷贝才返回,确保数据不丢失(但是有或许重复消费)
      acks: 1
      # 失败重试次数
      retries: 3
      # 批量提交的数据巨细
      batch-size: 16384
      # 生产者暂存数据的缓冲区巨细
      buffer-memory: 33554432
    consumer:
      # key的反序列化类
      key-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
      value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
      # 是否主动提交偏移量,假如要手动承认音讯,就要设置为false
      enable-auto-commit: false
      # 消费音讯后距离多长时间提交偏移量(ms)
      auto-commit-interval: 100
      # 默许的顾客组,假如不指定就会用这个
      group-id: groupId
      # kafka意外宕机时的音讯消费策略
      # earliest:当各分区下有已提交的offset时,从提交的offset开端消费;无提交的offset时,从头开端消费
      # latest:当各分区下有已提交的offset时,从提交的offset开端消费;无提交的offset时,消费新产生的该分区下的数据
      # none:topic各分区都存在已提交的offset时,从offset后开端消费;只需有一个分区不存在已提交的offset,则抛出异常
      auto-offset-reset: latest
    listener:
      # 手动承认音讯
      ack-mode: manual_immediate
      # 顾客运行的线程数
      concurrency: 2
    properties:
      # confluent schema 地址
      schema:
        registry:
          url: http://xx.xx.xx.xx:8081

5、音讯生产者

@RestController
@RequestMapping("/send")
@RequiredArgsConstructor(onConstructor_ = @Autowired)
public class Producer {
    private final KafkaTemplate<String, User> kafkaTemplate;
    @GetMapping("/test")
    public void sendMsg() {
        for (int i = 0; i < 10; i++) {
            User user = new User();
            user.setId(i);
            user.setName("Jin" + i);
            user.setAge(35 + i);
            kafkaTemplate.send(AvroConsumer.TOPIC_NAME, user);
        }
    }
}

6、音讯顾客

@Slf4j
@Component
public class AvroConsumer {
    public static final String TOPIC_NAME = "test";
    @KafkaListener(topics = TOPIC_NAME, groupId = "test-group")
    public void consume(ConsumerRecord<String, User> record, Acknowledgment ack) {
        log.info("value #=>:{}", record.value());
        // 手动提交ack
        ack.acknowledge();
    }
}

7、成果

SpringBoot3 配置文件整合 Apache Avro