安装
MAC 系统
安装命令:brew install rabbitmq
安装的路径是 /usr/local/Cellar/rabbitmq/3.8.3,具体情况要视版本而定,我安装的版本是 3.8.3。
接下来就可以启动了,进入安装目录,执行命令:./sbin/rabbitmq-server
接下来可以在浏览器打开 http://localhost:15672,可以看到 RabbitMQ 的管理页面。
登录账号密码:guest/guest
导入 Maven 依赖
1 2 3 4
| <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency
|
TTL 方式
application.properties 配置信息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| spring.application.name=rabbitMq
server.port=8080 spring.rabbitmq.host=127.0.0.1 spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest
spring.rabbitmq.publisher-confirm-type=correlated
spring.rabbitmq.publisher-returns=true
spring.rabbitmq.listener.direct.acknowledge-mode=manual spring.rabbitmq.listener.simple.acknowledge-mode=manual
spring.rabbitmq.listener.simple.prefetch=1
spring.rabbitmq.listener.simple.retry.enabled=true
spring.rabbitmq.listener.simple.concurrency=1
spring.rabbitmq.listener.simple.max-concurrency=10
|
RabbitmqConfig 配置信息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80
| package com.bean.springcloudproduct.config; import org.springframework.amqp.core.*; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer; import org.springframework.context.annotation.Bean; import java.util.HashMap; import java.util.Map;
public class RabbitmqConfig {
public DirectExchange userOrderDelayExchange() { return new DirectExchange("user.order.delay_exchange"); }
public Queue userOrderDelayQueue() { Map<String, Object> map = new HashMap<>(16); map.put("x-dead-letter-exchange", "user.order.receive_exchange"); map.put("x-dead-letter-routing-key", "user.order.receive_key"); return new Queue("user.order.delay_queue", true, false, false, map); }
public Binding userOrderDelayBinding() { return BindingBuilder.bind(userOrderDelayQueue()).to(userOrderDelayExchange()).with("user.order.delay_key"); }
public DirectExchange userOrderReceiveExchange() { return new DirectExchange("user.order.receive_exchange"); }
public Queue userOrderReceiveQueue() { return new Queue("user.order.receive_queue"); }
public Binding userOrderReceiveBinding() { return BindingBuilder.bind(userOrderReceiveQueue()).to(userOrderReceiveExchange()).with("user.order.receive_key"); } }
|
生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68
|
public class DeadLetterSenderServiceImpl implements DeadLetterSenderService {
private RabbitTemplate rabbitTemplate;
public void sendLetterSenderMsg() { User user = new User(1, "confirm", "confirm123456"); MessagePostProcessor postProcessor = message -> { message.getMessageProperties().setExpiration("5000"); return message; }; rabbitTemplate.setMandatory(true); rabbitTemplate.setConfirmCallback(confirmCallback); rabbitTemplate.setReturnCallback(returnCallback); CorrelationData correlationData = new CorrelationData("confirm-" + System.currentTimeMillis()); this.rabbitTemplate.convertAndSend("user.order.delay_exchange", "user.order.delay_key", user, postProcessor, correlationData);
}
private final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
public void confirm(CorrelationData correlationData, boolean b, String s) { if (b) { System.out.println("confirm 消息确认成功..." + correlationData.getId() + new Date()); } else { System.out.println("confirm 消息确认失败..." + correlationData.getId() + " cause: " + s); } } };
private final RabbitTemplate.ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() {
public void returnedMessage(Message message, int i, String s, String s1, String s2) { System.out.println("消息:" + message); System.out.println(new String(message.getBody())); System.out.println("回应码:" + i); System.out.println("回应信息:" + s); System.out.println("交换机:" + s1); System.out.println("路由键:" + s2); } };
}
|
消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| Slf4j public class DeadLetterSenderListener {
public void getDLMessage(User user, Channel channel, Message message) throws IOException { try { System.out.println("延迟队列参数数据 : " + user + new Date()); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); System.out.println("延迟队列接受到的消息为:" + new String(message.getBody())); } catch (Exception e) { if (message.getMessageProperties().getRedelivered()) { log.error("延迟队列消息已重复处理失败,拒绝再次接收..."); channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); } else { log.error("延迟队列消息即将再次返回队列处理..."); channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); } } } }
|
DLX
首先我们需要下载并安装 RabbitMQ 的延迟插件。
地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
将插件文件复制到 RabbitMQ 安装目录的 plugins 目录下;
进入 RabbitMQ 安装目录的 sbin 目录下,使用如下命令启用延迟插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
代码实现:
配置文件:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
|
public CustomExchange delayExchange() { Map<String, Object> args = new HashMap<>(1); args.put("x-delayed-type", "direct"); return new CustomExchange("exchange.xdelay.delayed", "x-delayed-message", true, false, args); }
public Queue immediateQueue() { return new Queue("queue.xdelay.immediate", true); }
public Binding bindingNotify() { return BindingBuilder.bind(immediateQueue()).to(delayExchange()).with("exchange.xdelay.delayed").noargs(); }
|
生产者
1 2 3 4 5 6 7 8 9 10 11 12 13
| public void sendLetterDLXSenderMsg() { User user = new User(1, "confirm", "confirm123456"); MessagePostProcessor postProcessor = message -> { message.getMessageProperties().setDelay(5000); return message; }; rabbitTemplate.setMandatory(true); rabbitTemplate.setConfirmCallback(confirmCallback);
CorrelationData correlationData = new CorrelationData("confirm-" + System.currentTimeMillis()); this.rabbitTemplate.convertAndSend("exchange.xdelay.delayed", "exchange.xdelay.delayed", user, postProcessor, correlationData); }
|
消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
|
public void getDLXMessage(User user, Channel channel, Message message) throws IOException { try { System.out.println("DLX延迟队列参数数据 : " + user + new Date()); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); System.out.println("DLX延迟队列接受到的消息为:" + new String(message.getBody())); } catch (Exception e) { if (message.getMessageProperties().getRedelivered()) { log.error("DLX延迟队列消息已重复处理失败,拒绝再次接收..."); channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); } else { log.error("DLX延迟队列消息即将再次返回队列处理..."); channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); } } }
|
参考文章:https://blog.csdn.net/qq_37892957/article/details/89296157