前提
公司在做一些金融相关事务,某些时候由于数据供给商定时维护或许特别原因需求暂停某些服务的顾客。之前选用的音讯行列技能栈是RabbitMQ,用于微服务之间的音讯投递,关于这类需求暂停顾客的场景是选用注释掉顾客Bean中的相应Spring(Boot)注解从头发布来完成,后边需求从头发动消费便是解开对应的注释再发布一次。这样的处理流程既繁琐,也显得没有技能含量,所以笔者就这个问题结合已有的装备中心Nacos集群做了一个计划,运用Nacos的装备准实时改写功用去控制某个微服务实例的一切RabbitMQ顾客(容器)的停止和发动。
计划原理
下面讨论一下计划的原理和可行性,首要包括:
- RabbitMQ顾客生命周期办理
- Nacos长轮询与装备改写
由于工作中的首要技能栈是SpringBoot + RabbitMQ,下文是讨论场景针对spring-boot-starter-amqp(下面简称amqp)展开。
运用SpringBoot版别为2.3.0.RELEASE,spring-cloud-alibaba-nacos-config的版别为2.2.0.RELEASE
RabbitMQ顾客生命周期办理
查看RabbitAnnotationDrivenConfiguration的源码:
amqp中默许启用spring.rabbitmq.listener.type=simple,运用的RabbitListenerContainerFactory(音讯监听器容器工厂)完成为SimpleRabbitListenerContainerFactory,运用的MessageListenerContainer(音讯监听器容器)完成为SimpleMessageListenerContainer。在amqp中,不管注解声明式或许编程式注册的顾客终究都会封装为MessageListenerContainer实例,因而顾客生命周期能够直接经过MessageListenerContainer进行办理,MessageListenerContainer的生命周期办理API会直接作用于最底层的实在顾客完成BlockingQueueConsumer。几者的联系如下:
一般声明式顾客注册方法如下:
@Slf4j
@RabbitListener(id = "SingleAnnoMethodDemoConsumer", queues = "srd->srd.demo")
@Component
public class SingleAnnoMethodDemoConsumer {
@RabbitHandler
public void onMessage(Message message) {
log.info("SingleAnnoMethodDemoConsumer.onMessage => {}", new String(message.getBody(), StandardCharsets.UTF_8));
}
}
@RabbitListener(id = "MultiAnnoMethodDemoConsumer", queues = "srd->srd.demo")
@Component
@Slf4j
public class MultiAnnoMethodDemoConsumer {
@RabbitHandler
public void firstOnMessage(Message message) {
log.info("MultiAnnoMethodDemoConsumer.firstOnMessage => {}", new String(message.getBody(), StandardCharsets.UTF_8));
}
@RabbitHandler
public void secondOnMessage(Message message) {
log.info("MultiAnnoMethodDemoConsumer.secondOnMessage => {}", new String(message.getBody(), StandardCharsets.UTF_8));
}
}
@Component
@Slf4j
public class MultiAnnoInstanceDemoConsumer {
@RabbitListener(id = "MultiAnnoInstanceDemoConsumer-firstOnInstanceMessage", queues = "srd->srd.demo")
public void firstOnInstanceMessage(Message message) {
log.info("MultiAnnoInstanceDemoConsumer.firstOnInstanceMessage => {}", new String(message.getBody(), StandardCharsets.UTF_8));
}
@RabbitListener(id = "MultiAnnoInstanceDemoConsumer-secondOnInstanceMessage", queues = "srd->srd.sec")
public void secondOnInstanceMessage(Message message) {
log.info("MultiAnnoInstanceDemoConsumer.secondOnInstanceMessage => {}", new String(message.getBody(), StandardCharsets.UTF_8));
}
}
关于根据@RabbitListener进行声明式注册的顾客,每个被@RabbitListener修饰的Bean或许办法终究都会独自生成一个SimpleMessageListenerContainer实例,这些SimpleMessageListenerContainer实例的唯一标识由@RabbitListener的id特点指定,缺省值为org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#N,主张在运用时候经过规范束缚有必要界说此id特点。分析源码能够得知这类型的顾客经过RabbitListenerAnnotationBeanPostProcessor进行发现和主动注册,而且在RabbitListenerEndpointRegistry缓存了注册信息,因而能够经过RabbitListenerEndpointRegistry直接获取这些声明式的顾客容器实例:
RabbitListenerEndpointRegistry endpointRegistry = configurableListableBeanFactory.getBean(
RabbitListenerConfigUtils.RABBIT_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME,
RabbitListenerEndpointRegistry.class);
Set<String> listenerContainerIds = endpointRegistry.getListenerContainerIds();
for (String containerId : listenerContainerIds) {
MessageListenerContainer messageListenerContainer = endpointRegistry.getListenerContainer(containerId);
// do something with messageListenerContainer
}
一般编程式顾客注册方法如下:
// MessageListenerDemoConsumer
@Component
@Slf4j
public class MessageListenerDemoConsumer implements MessageListener {
@Override
public void onMessage(Message message) {
log.info("MessageListenerDemoConsumer.onMessage => {}", new String(message.getBody(), StandardCharsets.UTF_8));
}
}
// CustomMethodDemoConsumer
@Component
@Slf4j
public class CustomMethodDemoConsumer {
public void customOnMessage(Message message) {
log.info("CustomMethodDemoConsumer.customOnMessage => {}", new String(message.getBody(), StandardCharsets.UTF_8));
}
}
// configuration class
// 经过现存的MessageListener实例进行消费
@Bean
public SimpleMessageListenerContainer messageListenerDemoConsumerContainer(
ConnectionFactory connectionFactory,
@Qualifier("messageListenerDemoConsumer") MessageListener messageListener) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setListenerId("MessageListenerDemoConsumer");
container.setConnectionFactory(connectionFactory);
container.setConcurrentConsumers(1);
container.setMaxConcurrentConsumers(1);
container.setQueueNames("srd->srd.demo");
container.setAcknowledgeMode(AcknowledgeMode.AUTO);
container.setPrefetchCount(10);
container.setAutoStartup(true);
container.setMessageListener(messageListener);
return container;
}
// 经过IOC容器中某个Bean的详细办法进行消费
@Bean
public SimpleMessageListenerContainer customMethodDemoConsumerContainer(
ConnectionFactory connectionFactory,
CustomMethodDemoConsumer customMethodDemoConsumer) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setListenerId("CustomMethodDemoConsumer");
container.setConnectionFactory(connectionFactory);
container.setConcurrentConsumers(1);
container.setMaxConcurrentConsumers(1);
container.setQueueNames("srd->srd.demo");
container.setAcknowledgeMode(AcknowledgeMode.AUTO);
container.setPrefetchCount(10);
container.setAutoStartup(true);
MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter();
messageListenerAdapter.setDelegate(customMethodDemoConsumer);
messageListenerAdapter.setDefaultListenerMethod("customOnMessage");
container.setMessageListener(messageListenerAdapter);
return container;
}
编程式注册的SimpleMessageListenerContainer能够直接从IOC容器中获取:
Map<String, MessageListenerContainer> messageListenerContainerBeans
= configurableListableBeanFactory.getBeansOfType(MessageListenerContainer.class);
if (!CollectionUtils.isEmpty(messageListenerContainerBeans)) {
messageListenerContainerBeans.forEach((beanId, messageListenerContainer) -> {
// do something with messageListenerContainer
});
}
至此,咱们知道能够比较轻松地拿到服务中一切的MessageListenerContainer的实例,从而能够办理服务内一切顾客的生命周期。
Nacos长轮询与装备改写
Nacos的客户端经过LongPolling(长轮询)的方法监听Nacos服务端集群对应dataId和group的装备数据改变,详细能够参阅ClientWorker的源码完成,完成的进程大致如下:
在非Spring(Boot)体系中,能够经过ConfigService#addListener()进行装备改变监听,示例代码如下:
Properties properties = new Properties();
properties.put(PropertyKeyConst.SERVER_ADDR, "127.0.0.1:8848");
properties.put(PropertyKeyConst.NAMESPACE, "LOCAL");
ConfigService configService = NacosFactory.createConfigService(properties);
Executor executor = Executors.newSingleThreadExecutor(runnable -> {
Thread thread = new Thread(runnable);
thread.setDaemon(true);
thread.setName("NacosConfigSyncWorker");
return thread;
});
configService.addListener("application-aplha.properties", "customer-service", new Listener() {
@Override
public Executor getExecutor() {
return executor;
}
@Override
public void receiveConfigInfo(String configInfo) {
// do something with 'configInfo'
}
});
这种LongPolling的方法现在来看可靠性是比较高,由于Nacos服务端集群一般在出产布置是大于3的奇数个实例节点,而且底层根据raft一致算法完成集群通讯,只需不是同一时间超越对折节点宕机集群仍是能正常供给服务。可是从完成上来看会有一些局限性:
- 假如注册过多的装备改变监听器有或许会对
Nacos服务端形成比较大的压力,毕竟是多个客户端进行轮询 - 装备改变是由
Nacos客户端向Nacos服务端发起请求,因而监听器回调有或许不是实时的(有或许延迟到客户端下一轮的LongPolling提交) -
Nacos客户端会缓存每次从Nacos服务端拉取的装备内容,假如要改变装备文件过大有或许导致缓存的数据占用大量内存,影响客户端所在服务的性能
关于装备改变监听其实有其他候选的计划,例如Redis的发布订阅,Zookeeper的节点途径改变监听乃至是运用音讯行列进行告诉,本文运用Nacos装备改变监听的原因是更好的划分不同运用装备文件的修改查看权限方便进行办理,其他候选计划要完成分权限办理需求二次开发
运用SpringCloudAlibaba供给的spring-cloud-alibaba-nacos-config能够愈加简洁地运用Nacos装备改写监听,而且会把改变的PropertySource从头绑定到对应的装备特点Bean。引进依赖:
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-alibaba-nacos-config</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.nacos</groupId>
<artifactId>nacos-client</artifactId>
</dependency>
详细的装备类是NacosConfigProperties:
红圈中是需求重视的装备项,refreshEnabled是装备改写的开关,默许是开启的。sharedConfigs和extensionConfigs虽然命名不同,可是两者完成和功用没有差异,都是相似于同享或许说扩展装备,每个同享(扩展)装备支撑独自装备改写开关。举个比如,在Nacos服务端的某个装备如下图:
为了支撑装备改变和对应的实体类成员变量更新,对应客户端的装备文件是这样的:
spring.cloud.nacos.config.refresh-enabled=true
spring.cloud.nacos.config.shared-configs[0].data-id=shared.properties
spring.cloud.nacos.config.shared-configs[0].group=shared-conf
spring.cloud.nacos.config.shared-configs[0].refresh=true
对应的装备特点Bean如下:
@Data
@ConfigurationProperties(prefix = "shared")
public class SharedProperties {
private String foo;
}
只需客户端所在SpringBoot服务发动完成后,修改Nacos服务端对应dataId为shared.properties的shared.foo特点值,那边SharedProperties的foo特点就会准实时改写。能够在SharedProperties增加一个@PostConstruct来调查这个特点更新的进程:
@Slf4j
@Data
@ConfigurationProperties(prefix = "shared")
public class SharedProperties {
private final AtomicBoolean firstInit = new AtomicBoolean();
private String foo;
@PostConstruct
public void postConstruct() {
if (!firstInit.compareAndSet(false, true)) {
log.info("SharedProperties refresh...");
} else {
log.info("SharedProperties first init...");
}
}
}
计划实施
整个计划实施包括下面几步:
- 装备改变告诉与装备类改写
- 发现一切顾客容器
- 办理顾客容器生命周期
初始化一个Maven项目,引进下面的依赖:
org.projectlombok:lombok:1.18.12org.springframework.boot:spring-boot-starter-web:2.3.0.RELEASEorg.springframework.boot:spring-boot-starter-amqp:2.3.0.RELEASEcom.alibaba.cloud:spring-cloud-alibaba-nacos-config:2.2.0.RELEASEcom.alibaba.nacos:nacos-client:1.4.4
下载Nacos服务而且发动一个单机实例(当前2023-02的最新稳定版为2.2.0),新建命名空间LOCAL而且增加四份装备文件:
能够运用1.x的Nacos客户端去衔接2.x的Nacos服务端,这个是Nacos做的向下兼容,反过来不行
前文说到的Nacos客户端中,ConfigService是经过dataId和group定位到详细的装备文件,一般dataId依照装备文件的内容命名,关于SpringBoot的运用装备文件一般命名为application-${profile}.[properties,yml],group是装备文件的分组,关于SpringBoot的运用装备文件一般命名为${spring.application.name}。笔者在在这份SpringBoot的运用装备文件中只增加了RabbitMQ的装备:
保证本地或许长途有一个可用的RabbitMQ服务,接下来往下开端实施计划。
装备改变告诉与装备类改写
前面已经说到过SpringBoot结合Nacos进行装备特点Bean的成员变量改写,在项目的Classpath(resources文件夹)增加bootstrap.properties文件,内容如下:
spring.application.name=rabbitmq-rocketmq-demo
spring.profiles.active=default
# nacos装备
spring.cloud.nacos.config.enabled=true
spring.cloud.nacos.config.server-addr=127.0.0.1:8848
spring.cloud.nacos.config.namespace=LOCAL
spring.cloud.nacos.config.group=rabbitmq-rocketmq-demo
spring.cloud.nacos.config.prefix=application
spring.cloud.nacos.config.file-extension=properties
spring.cloud.nacos.config.refresh-enabled=true
spring.cloud.nacos.config.shared-configs[0].data-id=shared.properties
spring.cloud.nacos.config.shared-configs[0].group=shared-conf
spring.cloud.nacos.config.shared-configs[0].refresh=true
spring.cloud.nacos.config.extension-configs[0].data-id=extension.properties
spring.cloud.nacos.config.extension-configs[0].group=extension-conf
spring.cloud.nacos.config.extension-configs[0].refresh=true
spring.cloud.nacos.config.extension-configs[1].data-id=rabbitmq-toggle.properties
spring.cloud.nacos.config.extension-configs[1].group=rabbitmq-rocketmq-demo
spring.cloud.nacos.config.extension-configs[1].refresh=true
这儿profile界说为default也便是会关联到Nacos中dataId = 'application.properties', group = 'rabbitmq-rocketmq-demo'那份装备文件,首要是用于界说amqp需求的装备特点。关于RabbitMQ顾客的开关,界说在dataId = 'rabbitmq-toggle.properties', group = 'rabbitmq-rocketmq-demo'的文件中。增加RabbitmqToggleProperties:
// RabbitmqToggleProperties
@Slf4j
@Data
@ConfigurationProperties(prefix = "rabbitmq.toggle")
public class RabbitmqToggleProperties {
private final AtomicBoolean firstInit = new AtomicBoolean();
private List<RabbitmqConsumer> consumers;
@PostConstruct
public void postConstruct() {
if (!firstInit.compareAndSet(false, true)) {
StaticEventPublisher.publishEvent(new RabbitmqToggleRefreshEvent(this));
log.info("RabbitmqToggleProperties refresh, publish RabbitmqToggleRefreshEvent...");
} else {
log.info("RabbitmqToggleProperties first init...");
}
}
@Data
public static class RabbitmqConsumer {
private String listenerId;
private Integer concurrentConsumers;
private Integer maxConcurrentConsumers;
private Boolean enable;
}
}
// RabbitmqToggleRefreshEvent
@Getter
public class RabbitmqToggleRefreshEvent extends ApplicationEvent {
private final RabbitmqToggleProperties rabbitmqToggleProperties;
public RabbitmqToggleRefreshEvent(RabbitmqToggleProperties rabbitmqToggleProperties) {
super("RabbitmqToggleRefreshEvent");
this.rabbitmqToggleProperties = rabbitmqToggleProperties;
}
}
// StaticEventPublisher
public class StaticEventPublisher {
private static ApplicationEventPublisher PUBLISHER = null;
public static void publishEvent(ApplicationEvent applicationEvent) {
if (Objects.nonNull(PUBLISHER)) {
PUBLISHER.publishEvent(applicationEvent);
}
}
public static void attachApplicationEventPublisher(ApplicationEventPublisher publisher) {
PUBLISHER = publisher;
}
}
这儿prefix界说为rabbitmq.toggle,为了和rabbitmq-toggle.properties的特点逐个绑定,该文件中的装备Key有必要以rabbitmq.toggle为前缀。RabbitmqToggleProperties首次回调@PostConstruct办法只打印初始化日志,再次回调@PostConstruct办规律发布RabbitmqToggleRefreshEvent事情,用于后边告诉对应的顾客容器Bean进行启停。
发现一切顾客容器
为了统一办理服务中一切顾客容器Bean,需求界说一个相似于顾客容器注册或许缓存中心类,缓存Key能够考虑运用listenerId,Value就直接运用MessageListenerContainer实例即可:
private final ConcurrentMap<String, MessageListenerContainer> containerCache = Maps.newConcurrentMap();
这儿已然选定了listenerId作为缓存的Key,那么有必要界说好规范,要求不管注解声明式界说的顾客仍是编程式界说的顾客,有必要明确指定详细含义的listenerId,否则到时候存在Key的格局为org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#N会比较紊乱
接下来发现和缓存一切顾客容器:
private ConfigurableListableBeanFactory configurableListableBeanFactory;
private ApplicationEventPublisher applicationEventPublisher;
// ----------------------------------------------------------------------
// 获取声明式顾客容器
RabbitListenerEndpointRegistry endpointRegistry = configurableListableBeanFactory.getBean(
RabbitListenerConfigUtils.RABBIT_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME,
RabbitListenerEndpointRegistry.class);
Set<String> listenerContainerIds = endpointRegistry.getListenerContainerIds();
for (String containerId : listenerContainerIds) {
MessageListenerContainer messageListenerContainer = endpointRegistry.getListenerContainer(containerId);
containerCache.putIfAbsent(containerId, messageListenerContainer);
}
// 获取编程式顾客容器
Map<String, MessageListenerContainer> messageListenerContainerBeans
= configurableListableBeanFactory.getBeansOfType(MessageListenerContainer.class);
if (!CollectionUtils.isEmpty(messageListenerContainerBeans)) {
messageListenerContainerBeans.forEach((beanId, bean) -> {
if (bean instanceof AbstractMessageListenerContainer) {
AbstractMessageListenerContainer abstractMessageListenerContainer = (AbstractMessageListenerContainer) bean;
String listenerId = abstractMessageListenerContainer.getListenerId();
if (StringUtils.hasLength(listenerId)) {
containerCache.putIfAbsent(listenerId, abstractMessageListenerContainer);
} else {
containerCache.putIfAbsent(beanId, bean);
}
} else {
containerCache.putIfAbsent(beanId, bean);
}
});
}
Set<String> listenerIds = containerCache.keySet();
listenerIds.forEach(listenerId -> log.info("Cache message listener container => {}", listenerId));
// 一切顾客容器Bean发现完成后才接纳改写事情
StaticEventPublisher.attachApplicationEventPublisher(this.applicationEventPublisher);
StaticEventPublisher中的ApplicationEventPublisher特点延迟到一切顾客容器缓存完成后赋值,避免过早的特点改变告诉导致部分顾客容器的启停操作被忽略。
办理顾客容器生命周期
接纳到RabbitmqToggleRefreshEvent事情后,然后遍历传递过来的RabbitmqToggleProperties里边的consumers,再根据已经发现的顾客容器进行处理,代码大概如下:
@EventListener(classes = RabbitmqToggleRefreshEvent.class)
public void onRabbitmqToggleRefreshEvent(RabbitmqToggleRefreshEvent event) {
RabbitmqToggleProperties rabbitmqToggleProperties = event.getRabbitmqToggleProperties();
List<RabbitmqToggleProperties.RabbitmqConsumer> consumers = rabbitmqToggleProperties.getConsumers();
if (!CollectionUtils.isEmpty(consumers)) {
consumers.forEach(consumerConf -> {
String listenerId = consumerConf.getListenerId();
if (StringUtils.hasLength(listenerId)) {
MessageListenerContainer messageListenerContainer = containerCache.get(listenerId);
if (Objects.nonNull(messageListenerContainer)) {
// running -> stop
if (messageListenerContainer.isRunning() && Objects.equals(Boolean.FALSE, consumerConf.getEnable())) {
messageListenerContainer.stop();
log.info("Message listener container => {} stop successfully", listenerId);
}
// modify concurrency
if (messageListenerContainer instanceof SimpleMessageListenerContainer) {
SimpleMessageListenerContainer simpleMessageListenerContainer
= (SimpleMessageListenerContainer) messageListenerContainer;
if (Objects.nonNull(consumerConf.getConcurrentConsumers())) {
simpleMessageListenerContainer.setConcurrentConsumers(consumerConf.getConcurrentConsumers());
}
if (Objects.nonNull(consumerConf.getMaxConcurrentConsumers())) {
simpleMessageListenerContainer.setMaxConcurrentConsumers(consumerConf.getMaxConcurrentConsumers());
}
}
// stop -> running
if (!messageListenerContainer.isRunning() && Objects.equals(Boolean.TRUE, consumerConf.getEnable())) {
messageListenerContainer.start();
log.info("Message listener container => {} start successfully", listenerId);
}
}
}
});
}
}
修改Nacos服务里边的rabbitmq-toggle.properties文件,输入内容如下:
rabbitmq.toggle.consumers[0].listenerId=MultiAnnoInstanceDemoConsumer-firstOnInstanceMessage
rabbitmq.toggle.consumers[0].enable=true
rabbitmq.toggle.consumers[1].listenerId=MultiAnnoInstanceDemoConsumer-secondOnInstanceMessage
rabbitmq.toggle.consumers[1].enable=true
rabbitmq.toggle.consumers[2].listenerId=MultiAnnoMethodDemoConsumer
rabbitmq.toggle.consumers[2].enable=true
rabbitmq.toggle.consumers[3].listenerId=SingleAnnoMethodDemoConsumer
rabbitmq.toggle.consumers[3].enable=true
rabbitmq.toggle.consumers[4].listenerId=CustomMethodDemoConsumer
rabbitmq.toggle.consumers[4].enable=true
rabbitmq.toggle.consumers[5].listenerId=MessageListenerDemoConsumer
rabbitmq.toggle.consumers[5].enable=true
发动项目,调查RabbitMQ WebUI对应的行列顾客数量:
然后随机修改rabbitmq-toggle.properties文件某个顾客容器设置为enable = 'fasle',调查服务日志和调查RabbitMQ WebUI的改变:
可见RabbitMQ WebUI中行列顾客数量削减,服务日志也提示listenerId = 'MessageListenerDemoConsumer'的顾客容器被停止了。
一些思考
为了更精确控制有顾客容器的启停,能够考虑在装备文件中界说关闭顾客容器的主动发动开关:
spring.rabbitmq.listener.simple.auto-startup=false
能够考虑在RabbitmqToggleProperties首次回调@PostConstruct办法时候发布RabbitmqToggleInitEvent事情,然后监听此事情发动一切已经发现的顾客容器。这样就能做到运用内部的顾客的启停行为总是以Nacos的开关装备文件为准,而且能够完成在线启停和动态调整最小最大顾客数量。
另外,假如细心的话能够调查到服务日志中,每逢监听到Nacos装备变动会打印Started application in N seconds (JVM running for M)的日志,这个并不是服务重启了,而是发动了一个Spring子容器用于构建一个全新的StandardEnvironment(见文末Demo项目中的EnvironmentCaptureApplicationRunner)用来承载改写后的装备文件内容,然后再复制或许覆盖到当前的Spring容器中的PropertySources,这个进程的代码完成相似这样:
小结
本文讨论了一种经过Nacos装备改写方法办理SpringBoot服务中RabbitMQ顾客生命周期办理的计划,现在只是供给了完好的思路和一些Demo级别代码,后续应该会完善计划和详细的工程级别编码完成。
本文Demo项目库房:
- framework-mesh/rabbitmq-rocketmq-demo
(本文完 c-3-d e-a-20230212)











