1、创建工程
勾选web及rabbitMQ
2、配置连接信息
新建配置文件:application.yml
server:
port: 8080
spring:
rabbitmq:
host: 101.16.36.124
username: guest
password: guest
port: 3046
queue:
video_queue: dev_video_consult_apply
tw_queue: dev_tw_consult_apply
ds_queue: dev_ds_consult_apply
jianGuan:
consult_apply: jian_guan_hlwyy_consult_apply
consult_diagose: jian_guan_hlwyy_consult_diagose
immediate_queue: immediate_queue
immediate_exchange: immediate_exchange
immediate_routing_key: immediate_routing_key
pay_task_queue: pay_task_queue
dead_letter_exchange: dead_letter_exchange
delay_routing_key: delay_routing_key
pay_remind_queue: pay_remind_queue
delay_remind_key: delay_remind_key
这里配置端口、rabbitmq连接、消费者队列
3、创建rabbitmq配置类
package com.laoxu.rabbittest.config;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
/**
* Created with IntelliJ IDEA.
* User: HLW-RYH-1370
* Date: 2019/5/8
* Time: 15:02
* To change this template use File | Settings | File Templates.
*/
@Configuration
@Component
public class RabbitConfig {
/**
* ****************************
* 第一部分说明:
* 从配置文件中取出对应的队列配置
* ****************************
*/
private static final Integer DEFAULT_CONCURRENT = 5;
public static String IMMEDIATE_QUEUE;
@Value("${queue.immediate_queue}")
public void setImmediateQueue(String immediateQueue) {
IMMEDIATE_QUEUE = immediateQueue;
}
public static String IMMEDIATE_EXCHANGE;
@Value("${queue.immediate_exchange}")
public void setImmediateExchange(String immediateExchange) {
IMMEDIATE_EXCHANGE = immediateExchange;
}
public static String IMMEDIATE_ROUTING_KEY;
@Value("${queue.immediate_routing_key}")
public void setImmediateRoutingKey(String immediateRoutingKey) {
IMMEDIATE_ROUTING_KEY = immediateRoutingKey;
}
public static String PAY_TASK_QUEUE;
@Value("${queue.pay_task_queue}")
public void setPayTaskQueue(String payTaskQueue) {
PAY_TASK_QUEUE = payTaskQueue;
}
public static String PAY_REMIND_QUEUE ;
@Value("${queue.pay_remind_queue}")
public void setPayRemindQueueQueue(String payRemindQueue) {
PAY_REMIND_QUEUE = payRemindQueue;
}
//死信交换机名称
public static String DEAD_LETTER_EXCHANGE;
@Value("${queue.dead_letter_exchange}")
public void setDeadLetterExchange(String deadLetterExchange) {
DEAD_LETTER_EXCHANGE = deadLetterExchange;
}
public static String DELAY_ROUTING_KEY;
@Value("${queue.delay_routing_key}")
public void setDelayRoutingKey(String delayRoutingKey) {
DELAY_ROUTING_KEY = delayRoutingKey;
}
public static String DELAY_REMIND_KEY;
@Value("${queue.delay_remind_key}")
public void setDelayRemindKeyKey(String delayRemindKey) {
DELAY_REMIND_KEY = delayRemindKey;
}
public static String VIDEO_CONSULT_APPLY ;
@Value("${queue.video_queue}")
public void setVideoConsultApply(String videoConsultApply) {
VIDEO_CONSULT_APPLY = videoConsultApply;
}
public static String TW_CONSULT_APPLY;
@Value("${queue.tw_queue}")
public void setTwConsultApply(String twConsultApply) {
TW_CONSULT_APPLY = twConsultApply;
}
public static String DS_CONSULT_APPLY;
@Value("${queue.ds_queue:ds_consult_apply}")
public void setDsConsultApply(String dsConsultApply) {
DS_CONSULT_APPLY = dsConsultApply;
}
/**
* ****************************
* 第二部分说明:
* 根据第一部分创建对应的队列
* ****************************
*/
@Bean
public Queue videoConsultApply() {
return new Queue(VIDEO_CONSULT_APPLY, true);
}
@Bean
public Queue twConsultApply() {
return new Queue(TW_CONSULT_APPLY, true);
}
@Bean
public Queue dsConsultApply(){
return new Queue(DS_CONSULT_APPLY, true);
}
// 创建一个立即消费队列
@Bean
public Queue immediateQueue() {
// 第一个参数是创建的queue的名字,第二个参数是是否支持持久化
return new Queue(IMMEDIATE_QUEUE, true);
}
// 创建一个延时队列
@Bean
public Queue payTaskQueue() {
Map<String, Object> params = new HashMap<>();
// x-dead-letter-exchange 声明了队列里的死信转发到的DLX名称,
params.put("x-message-ttl",600000);
params.put("x-dead-letter-exchange", IMMEDIATE_EXCHANGE);
// x-dead-letter-routing-key 声明了这些死信在转发时携带的 routing-key 名称。
params.put("x-dead-letter-routing-key", IMMEDIATE_ROUTING_KEY);
return new Queue(PAY_TASK_QUEUE, false, false, false, params);
}
@Bean
public Queue payRemindQueue() {
Map<String, Object> params = new HashMap<>();
// x-dead-letter-exchange 声明了队列里的死信转发到的DLX名称,
params.put("x-message-ttl",300000);
params.put("x-dead-letter-exchange", IMMEDIATE_EXCHANGE);
// x-dead-letter-routing-key 声明了这些死信在转发时携带的 routing-key 名称。
params.put("x-dead-letter-routing-key", IMMEDIATE_ROUTING_KEY);
return new Queue(PAY_REMIND_QUEUE, false, false, false, params);
}
/**
* ****************************
* 第三部分说明:
* 创建还未绑定的交换机
* ****************************
*/
@Bean
public DirectExchange immediateExchange() {
// 一共有三种构造方法,可以只传exchange的名字, 第二种,可以传exchange名字,是否支持持久化,是否可以自动删除,
//第三种在第二种参数上可以增加Map,Map中可以存放自定义exchange中的参数
return new DirectExchange(IMMEDIATE_EXCHANGE, true, false);
}
@Bean
public DirectExchange deadLetterExchange() {
// 一共有三种构造方法,可以只传exchange的名字, 第二种,可以传exchange名字,是否支持持久化,是否可以自动删除,
//第三种在第二种参数上可以增加Map,Map中可以存放自定义exchange中的参数
return new DirectExchange(DEAD_LETTER_EXCHANGE, true, false);
}
/**
* ****************************
* 第四部分说明:
* 交换机和对应的队列绑定到一起
* ****************************
*/
@Bean
//把立即消费的队列和立即消费的exchange绑定在一起
public Binding immediateBinding() {
return BindingBuilder.bind(immediateQueue()).to(immediateExchange()).with(IMMEDIATE_ROUTING_KEY);
}
@Bean
public Binding delayBinding() {
return BindingBuilder.bind(payTaskQueue()).to(deadLetterExchange()).with(DELAY_ROUTING_KEY);
}
@Bean
public Binding remindBinding() {
return BindingBuilder.bind(payRemindQueue()).to(deadLetterExchange()).with(DELAY_REMIND_KEY);
}
/**
* ****************************
* 第五部分说明:
* 管理监听工厂
* ****************************
*/
@Bean("customContainerFactory")
public SimpleRabbitListenerContainerFactory containerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer,
ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
//并发数
factory.setConcurrentConsumers(DEFAULT_CONCURRENT);
//最大并发数
factory.setMaxConcurrentConsumers(DEFAULT_CONCURRENT);
configurer.configure(factory, connectionFactory);
return factory;
}
}
到此,若启动项目,则会自动创建相应的 队列,交换机、绑定
4、创建信息发送类
package com.laoxu.rabbittest.dto;
import lombok.Data;
import java.io.Serializable;
import java.util.Date;
@Data
public class RabbitSendMsgDto implements Serializable {
private String consultId;
// 1支付超时 2图文结束倒计时 3支付提醒
private String type;
private String content;
private Date sendTime;
}
5、创建相应的消息发送和接收类
发送类
package com.laoxu.rabbittest.task;
import com.laoxu.rabbittest.config.RabbitConfig;
import com.laoxu.rabbittest.dto.RabbitSendMsgDto;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.alibaba.fastjson.JSON;
import java.util.*;
/**
* Created with IntelliJ IDEA.
* User: HLW-RYH-1370
* Date: 2019/5/8
* Time: 15:21
* To change this template use File | Settings | File Templates.
*/
@Component
@Slf4j
public class RabbitQueueSender {
@Autowired
private RabbitTemplate rabbitTemplate;
public void send(RabbitSendMsgDto msg) {
msg.setSendTime(new Date());
log.info("发送定时消息:"+msg);
this.rabbitTemplate.convertAndSend(RabbitConfig.DEAD_LETTER_EXCHANGE, RabbitConfig.DELAY_ROUTING_KEY, msg);
}
public void sendRemind(RabbitSendMsgDto msg) {
msg.setSendTime(new Date());
log.info("发送定时消息:"+msg);
this.rabbitTemplate.convertAndSend(RabbitConfig.DEAD_LETTER_EXCHANGE, RabbitConfig.DELAY_REMIND_KEY, msg);
}
/**
* 视频业务进队列
*/
public void sendVideoConsult(RabbitSendMsgDto msg) {
rabbitTemplate.convertAndSend( RabbitConfig.VIDEO_CONSULT_APPLY , msg);
}
/**
* 安卓门诊药店发送消息
*/
public void sendToAndroid(RabbitSendMsgDto msg){
rabbitTemplate.convertAndSend("que_android_mzyd_" + "consultId", JSON.toJSONString(msg));
}
public void sendTwConsult(RabbitSendMsgDto msg) {
rabbitTemplate.convertAndSend( RabbitConfig.TW_CONSULT_APPLY , msg);
}
public void sendPharmacyConsult(RabbitSendMsgDto msg) {
rabbitTemplate.convertAndSend(RabbitConfig.DS_CONSULT_APPLY, msg);
}
}
接收类
package com.laoxu.rabbittest.task;
import com.laoxu.rabbittest.dto.RabbitSendMsgDto;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;
/**
* Created with IntelliJ IDEA.
* User: HLW-RYH-1370
* Date: 2019/5/8
* Time: 16:16
* To change this template use File | Settings | File Templates.
*/
@Component
@EnableRabbit
@Configuration
@Slf4j
public class RabbitQueueReceiver {
@Autowired
private RabbitQueueSender taskQueueSender;
@RabbitListener(queues = "${queue.immediate_queue}")
@RabbitHandler
public void get(RabbitSendMsgDto msg) {
log.info("immediate_queue:" + msg.getContent());
}
@RabbitListener(queues = "${queue.video_queue}")
@RabbitHandler
public void getVideoConsult(RabbitSendMsgDto msg) {
log.info("video_queue:" + msg.getContent());
}
@RabbitListener(queues = "${queue.tw_queue}")
@RabbitHandler
public void getTwConsult(RabbitSendMsgDto msg) {
log.info("tw_queue:" + msg.getContent());
}
@RabbitListener(queues = "${queue.ds_queue:ds_consult_apply}")
@RabbitHandler
public void getPharmacyConsult(RabbitSendMsgDto msg) {
log.info("ds_queue:ds_consult_apply:" + msg.getContent());
}
}
说明,@RabbitListener 注解可以指定监听的队列,当发送类中有消息发送到队列中,接收类中可立即消费掉。
6、创建控制器测试
package com.laoxu.rabbittest.controller;
import com.laoxu.rabbittest.dto.RabbitSendMsgDto;
import com.laoxu.rabbittest.task.RabbitQueueSender;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/test")
public class rabbitController {
@Autowired
private RabbitQueueSender taskQueueSender;
@Bean
private RabbitSendMsgDto msgDto(){
RabbitSendMsgDto msgDto =new RabbitSendMsgDto();
msgDto.setType("1");
msgDto.setContent("支付超时消息");
msgDto.setConsultId("consult1");
return msgDto;
}
@GetMapping("/1")
public String send1(){
taskQueueSender.send(msgDto());
return "success";
}
@GetMapping("/2")
public String send2(){
taskQueueSender.sendRemind(msgDto());
return "success";
}
@GetMapping("/3")
public String send3(){
taskQueueSender.sendVideoConsult(msgDto());
return "success";
}
@GetMapping("/4")
public String send4(){
taskQueueSender.sendToAndroid(msgDto());
return "success";
}
@GetMapping("/5")
public String send5(){
taskQueueSender.sendTwConsult(msgDto());
return "success";
}
@GetMapping("/6")
public String send6(){
taskQueueSender.sendPharmacyConsult(msgDto());
return "success";
}
}
Github地址:https://github.com/1051513344/rabbittest