布景

项目中不同的事务可能会运用多个kafka,按默许的Kafka装备,最多是支撑顾客和生产者运用不同的Kafka,假如两个生产者运用不同的Kafka则需求自定义装备,生成对应的bean。

解决计划

多生产者,多顾客,运用不同的前缀来区别,依据前缀来区别装备,加载装备,实例化对应前缀的KafkaProperties kafkaListenerContainerFactory KafkaTemplate ,每个bean的名称都是带前缀的,运用的时分,依照需求注入对应的bean。

YML装备

spring:
  kafka:
    product:
      bootstrap-servers: 55.1.40.231:9091,55.6.70.231:9091,55.5.70.231:9091
      properties:
        sasl:
          mechanism: PLAIN
          jaas:
            config: org.apache.kafka.common.security.plain.PlainLoginModule required username="user" password="xxxx";
        security:
          protocol: SASL_PLAINTEXT
      producer:
        retries: 0
        acks: -1
        batch-size: 16384
        linger-ms: 0
        buffer-memory: 33554432
      consumer:
        group-id: consumer-group-id
        enable-auto-commit: true
        auto-commit-interval-ms: 1000
        auto-offset-reset: latest
        session-timeout-ms: 120000 
        request-timeout-ms: 180000
    order:
      bootstrap-servers: 55.10.33.132:9091,55.10.33.132:9092,55.10.33.132:9093,55.10.33.132:9094,55.10.33.132:9095,55.10.33.132:9096,55.10.33.132:9097,55.10.33.132:9098,55.10.33.132:9099,55.10.33.132:9100
      properties:
        sasl:
          mechanism: PLAIN
          jaas:
            config: org.apache.kafka.common.security.plain.PlainLoginModule required username="user_order" password="xxxxxxx";
        security:
          protocol: SASL_PLAINTEXT
      producer:
        retries: 3
        acks: -1
        batch-size: 16384
        linger-ms: 0
        buffer-memory: 33554432
      consumer:
        group-id: order-migration
        enable-auto-commit: true
        auto-commit-interval-ms: 1000
        auto-offset-reset: latest
        session-timeout-ms: 120000
        request-timeout-ms: 180000

自定义KafkaProperties

运用KafkaProperties接纳装备,可是需求修正下前缀,可是KafkaProperties源码改不了,新写一个类继承KafkaProperties

@Component
@Primary
@ConfigurationProperties(prefix = "spring.kafka.order")
public class OrderKafkaProperties extends KafkaProperties{
}

假如没有Kafka默许装备,Kafka会主动实例化默许的KafkaProperties,假如有多个KafkaProperties实例,需求指定一个首选的bean,否则KafkaAnnotationDrivenConfiguration类中结构函数会报错。

Kafka多生产者消费者自动配置

所以在其间一个加上@Primary注解

KafkaTemplate和KafkaListenerContainerFactory装备

有了KafkaProperties就能够生成KafkaTemplateKafkaListenerContainerFactory实例

@Configuration
public class KafkaConfig {
    @Autowired
    private OrderKafkaProperties orderKafkaProperties;
    @Bean("orderKafkaTemplate")
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
    private ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }
    private Map<String, Object> producerConfigs() {
        return contractKafkaProperties.buildProducerProperties();
    }
    @Bean("orderKafkaListenerContainerFactory")
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(10);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }
    private ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }
    private Map<String, Object> consumerConfigs() {
        return contractKafkaProperties.buildConsumerProperties();
    }
}

这样就能够在其他当地直接运用了,生产者就直接@Autowired orderKafkaTemplate,假如是顾客,直接在@KafkaListenercontainerFactory特点指定orderKafkaListenerContainerFactory

假如有多个生产者顾客,就添加对应的装备即可。这样简化了装备的读取,除了加了前缀,其他的装备都是和Kafka默许装备一样的,复用Springboot的特点绑定,后续假如有其他装备,加上后能直接生效,无需修正代码。假如修正装备的结构需求代码中读取,然后手动设置,后期修正YML装备和代码都需求修正,比较费事。

计划演进

上述计划,假如需求新增一个Kafka的装备,需求新增一个前缀,然后新增对应装备代码,来生成KafkaPropertiesKafkaTemplateKafkaListenerContainerFactory实例,可是不同的前缀生成不同的实例代码都是重复的,并且所有的前缀、特点值都由YML装备能够得到,所以代码中生成带前缀的bean能够由代码主动生成,并注册到spring容器中。依据这个思路,写一个BeanFactoryAware的实现类。(Aware接口是结构提供给用户用户获取结构中一些目标的接口,比如BeanFactoryAware便是获取BeanFactory,结构会调用重写的setBeanFactory办法,将BeanFactory传给我们的实现类)

@Component
@Slf4j
public class EmallBeanFactoryAware implements BeanFactoryAware {
    @Autowired
    private Environment environment;
    private static final String SPRING_KAFKA_PREFIX = "spring.kafka";
    @Override
    public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
        if (beanFactory instanceof DefaultListableBeanFactory) {
            DefaultListableBeanFactory defaultListableBeanFactory = (DefaultListableBeanFactory) beanFactory;
            Binder binder = Binder.get(environment);
            //将YML中特点值映射到MAP中,后边依据装备前缀生成bean并注册到容器中,TODO 绑定可能有反常,加try catch稳一点
            BindResult<Map> bindResultWithPrefix = binder.bind(SPRING_KAFKA_PREFIX, Bindable.of(Map.class));
            if (!bindResultWithPrefix.isBound()) {
                return;
            }
            Map map = bindResultWithPrefix.get();
            Set set = map.keySet();
            Set<String> kafkaPropertyFiledNames = getKafkaPropertyFiledNames();
            //假如装备多个primary, 只设置第一个,TODO项目发动过程中,这个变量是否有并发问题
            boolean hasSetPrimary = false;
            //实例化每个带前缀的KafkaProperties、KafkaTemplate、
            for (Object object : set) {
                String prefix = object.toString();
                if (kafkaPropertyFiledNames.contains(prefix)) {
                    //不带前缀的正常装备疏忽
                    continue;
                }
                String configPrefix = SPRING_KAFKA_PREFIX + "." + prefix;
                BindResult<KafkaProperties> kafkaPropertiesBindResult;
                try {
                    kafkaPropertiesBindResult = binder.bind(configPrefix, Bindable.of(KafkaProperties.class));
                    if (!kafkaPropertiesBindResult.isBound()) {
                        continue;
                    }
                } catch (Exception e) {
                    //一些装备不是在KafkaProperties特点,可是也不是前缀装备,在这一步会绑定失败,比如spring.kafka.topics装备,
                    //一些装备的名称是带-,KafkaProperties特点是驼峰,绑定是会出反常的,反常疏忽
                    log.error("auto register kafka properties error, prefix is: {}", configPrefix);
                    continue;
                }
                //注册生产者(TODO 没装备生产者是否会报错)
                KafkaProperties kafkaProperties = kafkaPropertiesBindResult.get();
                String propertiesBeanName = prefix + "KafkaProperties";
                boolean isBeanExist = defaultListableBeanFactory.containsBean(propertiesBeanName);
                if (!isBeanExist) {
                    String primaryConfig = configPrefix + ".primary";
                    //没有默许的kafka装备,需求设置下primary
                    BindResult<Boolean> primaryBindResult = binder.bind(primaryConfig, Bindable.of(Boolean.class));
                    if (primaryBindResult.isBound() && primaryBindResult.get() && !hasSetPrimary) {
                        BeanDefinitionBuilder beanDefinitionBuilder = BeanDefinitionBuilder.genericBeanDefinition(KafkaProperties.class);
                        defaultListableBeanFactory.registerBeanDefinition(propertiesBeanName, beanDefinitionBuilder.getBeanDefinition());
                        defaultListableBeanFactory.registerSingleton(propertiesBeanName, kafkaProperties);
                        defaultListableBeanFactory.getBeanDefinition(propertiesBeanName).setPrimary(true);
                        hasSetPrimary = true;
                    } else {
                        defaultListableBeanFactory.registerSingleton(propertiesBeanName, kafkaProperties);
                    }
                }
				//注册生产者KafkaTemplate
                String templateBeanName = prefix + "KafkaTemplate";
                if (!defaultListableBeanFactory.containsBean(templateBeanName)) {
                    KafkaTemplate kafkaTemplate = new KafkaTemplate<String, String>(
                            new DefaultKafkaProducerFactory<>(kafkaProperties.buildProducerProperties()));
                    defaultListableBeanFactory.registerSingleton(templateBeanName, kafkaTemplate);
                }
                String beanName = prefix + "KafkaListenerContainerFactory";
                if (!defaultListableBeanFactory.containsBean(beanName)) {
                    //注册顾客listener(TODO 没装备顾客是否会报错)
                    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                            new ConcurrentKafkaListenerContainerFactory<>();
                    factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties()));
                    factory.setConcurrency(10);
                    factory.getContainerProperties().setPollTimeout(3000);
                    defaultListableBeanFactory.registerSingleton(beanName, factory);
                }
            }
        }
    }
    private static Set<String> getKafkaPropertyFiledNames () {
        Set<String> names = new HashSet<>();
        Field[] declaredFields = KafkaProperties.class.getDeclaredFields();
        if (declaredFields.length == 0) {
            return names;
        }
        for (Field declaredField : declaredFields) {
            names.add(declaredField.getName());
        }
        return names;
    }
}

遇到的问题

手动注册的bean代码中@Autowire无法注入

手动注册的无法@Autowire,直接加@Lazy注解,先疏忽bean注册的先后顺序

多个KafkaProperties实例,无法确定运用哪一个

因为运用前缀的装备方法,bean名称也是带前缀的,没有默许的Kafka装备,结构会主动生成对应的bean,KafkaAnnotationDrivenConfiguration中的KafkaProperties 特点是依据类型注入的,假如装备有多个前缀,注入的时分无法确定运用哪一个,所以添加一个primary装备,主动生成的时分设置下。

既有带前缀,又有不带前缀运用默许装备的

主动装备代码中有一段是依据yml中装备的key,判别是否是KafkaProperties类中的字段,假如是就疏忽,让结构主动按默许装备,有些字段yml中是带-,如bootstrap-serversKafkaProperties中是驼峰命名bootstrapServers,绑定的时分会抛反常,影响应用发动,这种反常能够疏忽,直接用try catch捕获。

设置Bean为Primary

第二个问题中,多个相同类型的Bean如何设置其间一个bean为Primary,手动注册bean,假如有实例目标,能够直接运用BeanFactoryregisterSingleton(beanName, object),假如没有实例目标,能够直接运用类名,经过BeanFactoryregisterBeanDefinition(beanName, beanDefinition)来注册,假如要设置bean为Primary,必须经过BeanDefinition来设置,可是经过结构的绑定是直接生成实例目标的,假如经过registerSingleton来注册,经过beanName获取BeanDefinition是会抛反常的,因为没有BeanDefinition,所以需求将目标实例和BeanDefinition相关起来,便是上面这段代码

//注册BeanDefinition
defaultListableBeanFactory.registerBeanDefinition(propertiesBeanName, beanDefinitionBuilder.getBeanDefinition());
//注册目标实例,运用相同的bean名称
defaultListableBeanFactory.registerSingleton(propertiesBeanName, kafkaProperties);
//再获取BeanDefinition就能获取到,并且这个bean便是上面注册的实例目标
defaultListableBeanFactory.getBeanDefinition(propertiesBeanName).setPrimary(true);