上篇文章描述了运用RabbitMQ原生API来完成音讯的处理,在实际项目开发中,略显繁琐。咱们就能够运用springboot的RabbitMQ插件进行简略装备就能够完成。

一、springboot集成RabbitMQ

1-1、增加依靠

首要创立一个springboot项目,然后增加下面依靠

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

1-2、增加装备信息

spring.rabbitmq.host=192.168.253.131
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
spring.rabbitmq.virtual-host=/mirror

1-3、创立行列及交换机

创立行列及交换机并进行绑定除了在控制台进行操作外,还能够利用程序进行创立,下面来分别介绍一下

1-3-1、运用@Configuration在项目发动时进行创立

运用@Configuration需求留意的是下面声明queue和exchange以及绑定 的办法需求增加@Bean

这种办法创立的优点便是项目发动之后就能够完成创立,缺陷是假如想动态创立就无法完成了

绑定交换机和行列运用BindingBuilder.bind(行列).to(交换机)

1-3-1-1、创立fanout交换机

@Configuration
public class FanoutConfig {
   //声明行列
   @Bean
   public Queue fanoutQ1() {
      return new Queue("finout.queue1");
   }
   //声明exchange
   @Bean
   public FanoutExchange setFanoutExchange() {
      return new FanoutExchange("fanout.exchange");
   }
   //声明Binding,exchange与queue的绑定联系
   @Bean
   public Binding bindQ1() {
      return BindingBuilder.bind(fanoutQ1()).to(setFanoutExchange());
   }
}

1-3-1-2、创立topic交换机

绑定交换机和行列运用BindingBuilder.bind(行列).to(交换机).with(路由键)

@Configuration
public class TopicConfig {
   //声明行列
   @Bean
   public Queue topicQ1() {
      return new Queue("topic.queue1");
   }
   //声明exchange
   @Bean
   public TopicExchange setTopicExchange() {
      return new TopicExchange("topic.exchange");
   }
   //声明binding,需求声明一个roytingKey
   @Bean
   public Binding bindTopicHebei1() {
      return BindingBuilder.bind(topicQ1()).to(setTopicExchange()).with("hunan.*");
   }
  }

1-3-1-3、创立header交换机

绑定交换机和行列有三种办法如下:

BindingBuilder.bind(行列).to(交换机).where(key).matches(val)

BindingBuilder.bind(行列).to(交换机).whereAny(Map).match()

BindingBuilder.bind(行列).to(交换机).whereAll(Map).match()

完成代码

@Configuration
public class HeaderConfig {
   //声明queue
   @Bean
   public Queue headQueueTxTyp1() {
      return new Queue("txTyp1");
   }
   @Bean
   public Queue headQueueBusTyp1() {
      return new Queue("busTyp1");
   }
   @Bean
   public Queue headQueueTxBusTyp() {
      return new Queue("txbusTyp1");
   }
   //声明exchange
   @Bean
   public HeadersExchange setHeaderExchange() {
      return new HeadersExchange("headerExchange");
   }
   //声明Binding
   //绑定header中txtyp=1的行列。header的行列匹配能够用mathces和exisits
   @Bean
   public Binding bindHeaderTxTyp1() {
      return BindingBuilder.bind(headQueueTxTyp1()).to(setHeaderExchange()).where("txTyp").matches("1");
   }
   //绑定Header中busTyp=1的行列。
   @Bean 
   public Binding bindHeaderBusTyp1() {
      return BindingBuilder.bind(headQueueBusTyp1()).to(setHeaderExchange()).where("busTyp").matches("1");
   }
   //绑定Header中txtyp=1或者busTyp=1的行列。
   @Bean 
   public Binding bindHeaderTxBusTyp1() {
      Map<String,Object> condMap = new HashMap<>();
      condMap.put("txTyp", "1");
      condMap.put("busTyp", "1");
      return BindingBuilder.bind(headQueueTxBusTyp()).to(setHeaderExchange()).whereAny(condMap).match();
   }
}

1-3-2、运用AmqpAdmin

运用AmqpAdmin优点是能够随时进行行列及路由的创立和绑定

需求留意的是,创立交换机需求运用各自类型的交换机进行创立:

new FanoutExchange()

new TopicExchange()

new HeadersExchange()

@Autowired
private AmqpAdmin amqpAdmin;
@GetMapping("initFanoutExchangeQueue")
public Object initQueue(){
   String queueName="fanout.queue1";
   String exchangeName="fanout.exchange";
   //创立行列
   Queue queue = new Queue(queueName, false, false, false, null);
   amqpAdmin.declareQueue(queue);
   //创立交换机
   FanoutExchange fanoutExchange = new FanoutExchange("fanout.exchange", false, false, null);
   amqpAdmin.declareExchange(fanoutExchange);
   //绑定
   amqpAdmin.declareBinding(new Binding(queueName, Binding.DestinationType.QUEUE,exchangeName,"",null));
   return "success";
}

1-4、发送音讯

RabbitMQ运用起来和redis很类似,redis运用RedisTemplate,RabbitMQ则运用RabbitTemplate,下面来看下如何运用

在springboot中运用RabbitTemplate无法发送stream行列音讯,由于发送stream行列音讯,需求设置basicQos属性,而basicQos属性需求经过channel来设置,RabbitTemplate现已进行封装,运用的时候无法获取channel,因此无法发送stream类型行列音讯;

1-4-2、给指定行列发送音讯

首要设置音讯的相关参数,最终调用rabbitTemplate.send(行列称号,音讯内容,恳求参数)进行发送

      String message="hello word";
//设置部分恳求参数
      MessageProperties messageProperties = new MessageProperties();
      messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
      //发音讯
      rabbitTemplate.send("directqueue",new Message(message.getBytes("UTF-8"),messageProperties));

1-4-3、运用exchange的fanout发送音讯

向交换机发送音讯和向行列发送类似

第一步:设置音讯相关参数

MessageProperties();
messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);

第二步:发送音讯

rabbitTemplate.send(行列称号,路由称号,音讯,音讯相关参数)

String message="fanoutmessage";
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
//fanout模式只往exchange里发送音讯。分发到exchange下的所有queue
rabbitTemplate.send(MyConstants.EXCHANGE_FANOUT, "", new Message(message.getBytes("UTF-8"),messageProperties));

1-4-4、运用exchange的topic发送音讯

String routingKey="routingkey";
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
//发送音讯
rabbitTemplate.send("topicExchange", routingKey, new Message(message.getBytes("UTF-8"),messageProperties));

1-4-5、运用exchange的header发送音讯

MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
//设置header信息
messageProperties.setHeader("name", "admin");
messageProperties.setHeader("pass", "123");
//发送音讯
rabbitTemplate.send("headerExchange", "uselessRoutingKey", new Message(message.getBytes("UTF-8"),messageProperties));

1-4-6、运用quorum行列发送音讯

//设置部分恳求参数
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
//发音讯
rabbitTemplate.send("QUEUE_QUORUM",new Message(message.getBytes("UTF-8"),messageProperties));

1-5、接纳音讯

顾客都是经过@RabbitListener注解来声明。注解中包含了声明顾客行列时所需求的重点参数。对照原生API,这些参数就不难了解了。

可是当要消费Stream行列时,仍是要重点留意他的三个必要的进程:

  • channel必须设置basicQos属性。 channel目标能够在@RabbitListener声明的顾客办法中直接引证,Spring结构会进行注入。
  • 正确声明Stream行列。 经过往Spring容器中注入Queue目标的办法声明行列。在Queue目标中传入声明Stream行列所需求的参数。
  • 消费时需求指定offset。 能够经过注入Channel目标,运用原生API传入offset属性。

​ 运用SpringBoot结构集成RabbitMQ后,开发进程能够得到很大的简化,所以运用进程并不难,对照一下示例就能很快上手。可是,需求了解一下的是,SpringBoot集成后的RabbitMQ中的很多概念,虽然都能跟原生API对应上,可是这些模型中心都是做了转换的,比方Message,就不是原生RabbitMQ中的音讯了。运用SpringBoot结构,尤其需求加深对RabbitMQ原生API的了解,这样才干以不变应万变,深入了解各种看起来简略,可是其实坑很多的各种目标声明办法。

首要经过给办法加上如下注解,并设置行列即可消费对应的音讯

@RabbitListener(queues=MyConstants.QUEUE_Name)

1-5-1、接纳音讯

接纳音讯的办法,不论是直接接纳发送到行列的,仍是发送到exchange交换机,办法都是一样,首要差异便是绑定交换机的行列,依据绑定的行列不同而接纳对应的音讯

@RabbitListener(queues=MyConstants.QUEUE_Name)
public void directReceive2(String message) {
   System.out.println("consumer2 received message : " +message);
}

1-5-2、普通行列音讯

发送音讯,第一个参数为行列称号,假如有多个消费此行列的消费端,只有一个消费端能够进行消费

rabbitTemplate.send("directqueue",new Message(message.getBytes("UTF-8"),messageProperties));

接纳音讯注解

@RabbitListener(queues="directqueue")

1-5-3、交换机fanout类型接纳音讯

发送音讯,第一个参数为交换机称号,和交换机绑定的行列都能够消费音讯

rabbitTemplate.send("EXCHANGE_FANOUT", routingkey, new Message(message.getBytes("UTF-8"),messageProperties));

接纳音讯,传入和上面EXCHANGE_FANOUT交换机绑定的行列,即可消费音讯

@RabbitListener(queues="fanout_queue1")

1-5-4、交换机topic类型接纳音讯

发送音讯,指定一个topic类型的交换机,而且设置routingkey

rabbitTemplate.send("topicExchange", routingKey, new Message(message.getBytes("UTF-8"),messageProperties));

接纳音讯:

留意这个模式会有优先匹配原则。例如发送routingKey=beijing.haidian,那匹配到beijing.* (beijing.haidian,beijing.chaoyang),之后就不会再去匹配*.haidian(XXX.haidian)

@RabbitListener(queues="beijing.haidian")

1-5-5、交换机header类型接纳音讯

如下图三个行列绑定了headerExchange交换机

busType1:当header中有busTyp=1则接纳音讯,即使设置其他key-val也能够

txType1:当header中有txTyp=1则接纳音讯,即使设置其他key-val也能够

txbusType1:当header中有busTyp=1或者txTyp:1则能够接纳音讯(由于设置的x-match:any,假如是all则两个都必须满意)

SpringBoot集成RabbitMQ实现消息收发