rabbitmq
so-fast提供对消息队列rabbitmq的支持。
如果使用该功能,请现在pom.xml中添加依赖:
<dependency>
<groupId>com.sofast.cloud</groupId>
<artifactId>so-fast-rabbitmq-starter</artifactId>
</dependency>
并在配置文件中进行配置:
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
这里只是一个配置示例,更多配置可以参考文末sf-rabbit.yml文件和官方配置https://docs.spring.io/spring-boot/docs/2.3.5.RELEASE/reference/html/appendix-application-properties.html#common-application-properties。
使用
RabbitMQ官网提供了七种度列模型,分别是简单队列、工作队列、发布订阅、路由模式、主题模式、RPC模式、发布者确认模式。前五种是常用模型。在SpringBoot中已经对消息队列提供了完善的封装,因此我们直接使用SpringBoot的amqp进行开发即可。
在sofast中对这五种模型进行了默认封装,可直接使用,也可自定义。
简单模式
声明队列(sofast已经内置)
/**
* 简单队列模式:最简单的工作队列,其中一个消息生产者,一个消息消费者,一个队列。也称为点对点模式
* 该模式只允许一个生产者和一个消费者,因此只能在单节点使用
*/
String SIMPLE_MODE_QUEUE_DEFAULT = "queue.simple.default";
/**
* 简单队列
* @return
*/
@Bean
public Queue simpleQueue() {
return new Queue(RabbitConstant.SIMPLE_MODE_QUEUE_DEFAULT);
}
生产者
@Autowired
private AmqpTemplate rabbitTemplate;
public void sendMessage() {
String message = "简单队列-Message";
System.out.println("发送消息 : " + message);
rabbitTemplate.convertAndSend(RabbitConstant.SIMPLE_MODE_QUEUE_DEFAULT,message);
}
消费者
@Component
@RabbitListener(queues = RabbitConstant.SIMPLE_MODE_QUEUE_DEFAULT)
public class Consumer {
/**
* @RabbitListener 和 @RabbitHandler 搭配使用
* @RabbitListener可以标注在类上面,需配合 @RabbitHandler 注解一起使用
* 标注在类上面表示当有收到消息的时候,就交给 @RabbitHandler 的方法处理,
* 具体使用哪个方法处理,根据接收到的 message 参数类型
**/
@RabbitHandler
public void process(String message) {
System.out.println("消费成功 : " + message);
}
}
工作队列
声明队列(sofast已经内置)
/**
* 工作模式:一个消息生产者,一个交换器,一个消息队列,多个消费者。同样也称为点对点模式
* 该模式允许一个生产者和多个消费者,因此消费者可允许多节点
* 该模式下消息分发可分为轮训分发和公平分发,轮训分发为机械性轮流分发,不管理消费者处理的能力是否不同;
* 公平分发需要设置channel.basicQos(1);每次只发一条,处理完后再发下一条.
* <p>
* 默认情况下,我们程序创建的消息队列以及存放在队列里面的消息,都是非持久化的。当RabbitMQ死掉了或者重启了,上次创建的队列、消息都不会保存。
* 需要进行以下持久化配置:
* <p>
* 参数配置一:生产者创建队列声明时,修改第二个参数为 true
* channel.queueDeclare(QUEUE_NAME, true, false, false, null);
* <p>
* 参数配置二:生产者发送消息时,修改第三个参数为MessageProperties.PERSISTENT_TEXT_PLAIN
* channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());
*/
String WORK_MODE_QUEUE_DEFAULT = "query.work.default";
/**
* 工作队列
* @return
*/
@Bean
public Queue workQueue() {
return new Queue(RabbitConstant.WORK_MODE_QUEUE_DEFAULT);
}
配置
工作队列和简单队列在代码上是完全一样的,不同的是:简单队列只有一个消费者,而工作队列有多个消费者。
当存在多个消费者时,队列需要根据策略对不同消费者进行消息分发,rabbit模式采用轮训方式,即不管消费者能力,机械性轮训分发。我们可以通过配置文件修改分发策略
spring:
rabbitmq:
virtual-host: /
listener:
simple:
# 公平分发(限制每次发送一条数据, 必须大于等于transaction数量)
prefetch: 1
发布订阅
声明队列(sofast已经内置)
/**
* 发布/订阅模式:无选择接收消息,一个消息生产者,一个交换器,多个消息队列,多个消费者。称为发布/订阅模式
* 发布订阅模式引入了交换机(EXCHANGE)概念,生产者不是直接操作队列,而是将数据发送给交换机,由交换机将数据发送给与之绑定的队列.
* <p>
* =====发布端=====
* 绑定的交换机 参数1交互机名称 参数2 exchange类型
* channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
* <p>
* 发送消息
* channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("utf-8"));
* <p>
* =====订阅端=====
* 消费者绑定交换机 参数1 队列 参数2交换机 参数3 routingKey
* channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
*/
String FANOUT_MODE_QUEUE_DEFAULT = "queue.fanout.default";
String FANOUT_MODE_QUEUE_EXTEND = "queue.fanout.extend";
String FANOUT_MODE_EXCHANGE_DEFAULT = "exchange.fanout.default";
String EXCHANGE_TYPE_FANOUT = "fanout";
// 发布/订阅模式:一个交换机对应多个队列 ======= start
/**
* 发布/订阅队列(默认队列)
* @return
*/
@Bean
public Queue fanoutQueueDefault() {
return new Queue(RabbitConstant.FANOUT_MODE_QUEUE_DEFAULT);
}
/**
* 发布/订阅队列(扩展队列)
* @return
*/
@Bean
public Queue fanoutQueueExtend() {
return new Queue(RabbitConstant.FANOUT_MODE_QUEUE_EXTEND);
}
/**
* 交换机
* @return
*/
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange(RabbitConstant.FANOUT_MODE_EXCHANGE_DEFAULT);
}
/**
* 发布/订阅默认绑定队列和交换机
* @param fanoutQueueDefault 绑定默认队列
* @param fanoutExchange 绑定交换机
* @return
*/
@Bean
public Binding fanoutBindingDefault(Queue fanoutQueueDefault, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(fanoutQueueDefault).to(fanoutExchange);
}
/**
* 发布/订阅默认绑定队列和交换机
* @param fanoutQueueExtend 绑定扩展队列
* @param fanoutExchange 绑定交换机
* @return
*/
@Bean
public Binding fanoutBindingExtend(Queue fanoutQueueExtend, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(fanoutQueueExtend).to(fanoutExchange);
}
// 发布/订阅模式:一个交换机对应多个队列 ======= end
生产者
@Autowired
private AmqpTemplate rabbitTemplate;
public void sendMessage() {
String message = "发布订阅模式-message";
System.out.println("发送消息 : " + message);
rabbitTemplate.convertAndSend(RabbitConstant.FANOUT_MODE_EXCHANGE_DEFAULT,message);
}
消费者
@Component
public class Consumer {
@RabbitListener(queues = RabbitConstant.FANOUT_MODE_QUEUE_DEFAULT)
@RabbitHandler
public void processDefault(String message) {
System.out.println("默认队列消费成功 : " + message);
}
@RabbitListener(queues = RabbitConstant.FANOUT_MODE_QUEUE_EXTEND)
@RabbitHandler
public void processExtend(String message) {
System.out.println("扩展队列消费成功 : " + message);
}
}
路由模式
路由模式是在发布订阅模式的基础上,有选择的接收消息,即通过设置routing进行条件匹配接收消息
声明队列(sofast已经内置)
/**
* 路由模式:在发布/订阅模式的基础上,有选择的接收消息,也就是通过 routing 路由进行匹配条件是否满足接收消息。
* 路由模式在交换机的基础上又引入了路由(routing)概念,决定消息向队列推送的主要取决于路由,而不是交换机了.
* <p>
* =====发布端=====
* 绑定的交换机 参数1交互机名称 参数2 exchange类型
* channel.exchangeDeclare(EXCHANGE_NAME, "direct");
* 发送消息
* channel.basicPublish(EXCHANGE_NAME, sendType, null, message.getBytes("utf-8"));
* <p>
* =====订阅端=====
* 消费者绑定交换机 参数1 队列 参数2交换机 参数3 routingKey
* channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "routingKey");
*/
String DIRECT_MODE_EXCHANGE_DEFAULT = "exchange.direct.default";
String EXCHANGE_TYPE_DIRECT = "direct";
/** TODO 路由模式下,queue和routingKey具有业务性,应该在消费端根据业务不同进行定义,这里只是示例 */
String DIRECT_MODE_QUEUE_DEFAULT = "queue.direct.default";
String DIRECT_MODE_QUEUE_EXTEND = "queue.direct.extend";
String DIRECT_MODE_ROUTING_KEY_DEFAULT = "routing.key.default";
String DIRECT_MODE_ROUTING_KEY_EXTEND = "routing.key.extend";
因为路由模式,需要根据业务需求提供routingKey,因此在sofast中并没有对此做配置,在业务服务中进行配置使用较好。使用方式同上述发布订阅模式类型,只是增加了routingKey,交换机类型换成了Direct。
// 路由模式:通过routing key进行消息选择 ======= start
@Bean
public Queue directQueue() {
return new Queue(RabbitConstant.DIRECT_MODE_QUEUE_DEFAULT);
}
@Bean
public Queue directQueueExtend() {
return new Queue(RabbitConstant.DIRECT_MODE_QUEUE_EXTEND);
}
/**
* 路由模式交换机
* 交换机和队列之间通过binding key进行关联,只有当消息的routing key和binding key相同时才会被消费
* @return
*/
@Bean
public DirectExchange directExchange() {
return new DirectExchange(RabbitConstant.DIRECT_MODE_EXCHANGE_DEFAULT);
}
@Bean
public Binding directBindingDefault(Queue directQueue, DirectExchange directExchange) {
return BindingBuilder.bind(directExchange).to(directExchange).with(RabbitConstant.DIRECT_MODE_ROUTING_KEY_DEFAULT);
}
@Bean
public Binding directBindingExtend(Queue directQueueExtend, DirectExchange directExchange) {
return BindingBuilder.bind(directQueueExtend).to(directExchange).with(RabbitConstant.DIRECT_MODE_ROUTING_KEY_EXTEND);
}
// 路由模式:通过routing key进行消息选择 ======= end
生产者
@Autowired
private AmqpTemplate rabbitTemplate;
/**
* 生产者将消息发送给交换机,并绑定default key
**/
public void sendMessageDefault() {
String message = "路由模式-defaultKey-message";
System.out.println("发送消息 : " + message);
rabbitTemplate.convertAndSend(RabbitConstant.DIRECT_MODE_EXCHANGE_DEFAULT,RabbitConstant.DIRECT_MODE_ROUTING_KEY_DEFAULT,message);
}
/**
* 生产者将消息发送给交换机,并绑定extend key
**/
public void sendMessageExtend() {
String message = "路由模式-extendKey-message";
System.out.println("发送消息 : " + message);
rabbitTemplate.convertAndSend(RabbitConstant.DIRECT_MODE_EXCHANGE_EXTEND,RabbitConstant.DIRECT_MODE_ROUTING_KEY_EXTEND,message);
}
消费者
@Component
public class Consumer {
@RabbitListener(queues = RabbitConstant.DIRECT_MODE_QUEUE_DEFAULT)
@RabbitHandler
public void processA(String message) {
System.out.println("消息路由到了队列Default: " + message);
}
@RabbitListener(queues = RabbitConstant.DIRECT_MODE_QUEUE_EXTEND)
@RabbitHandler
public void processB1(String message) {
System.out.println("消息路由到了队列Extend: " + message);
}
}
主题模式
路由模式只能设置固定的routingkey,主题模式是在路由模式的基础上增加了通配符来选择信息。
声明队列(sofast已经内置)
/**
* 主题模式:同样是在发布/订阅模式的基础上,根据主题匹配进行筛选是否接收消息,比路由模式更灵活。
* direct 不支持匹配 routingKey,一但绑定了就是绑定了,而 topic 主题模式支持规则匹配,只要符合 routingKey 就能发送到绑定的队列上。
* <p>
* topics 模式与 routing 模式比较相近,topics 模式不能具有任意的 routingKey,
* 必须由一个英文句点号“.”分隔的字符串(我们将被句点号“.”分隔开的每一段独立的字符串称为一个单词)
* 比如 "lazy.orange.fox"。topics routingKey 中可以存在两种特殊字符“”与“#”,用于做模糊匹配,其中“”用于匹配一个单词,“#”用于匹配多个单词(可以是零个)
* <p>
* "*" 表示任何一个词
* "#" 表示0或1个词
* <p>
* =====发布端=====
* 绑定的交换机 参数1交互机名称 参数2 exchange类型
* channel.exchangeDeclare(EXCHANGE_NAME, "topic");
* 发送消息
* channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes());
* <p>
* =====订阅端=====
* 消费者绑定交换机 参数1 队列 参数2交换机 参数3 routingKey
* channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "log.*");
*/
String TOPIC_MODE_EXCHANGE_DEFAULT = "exchange.topic.default";
String EXCHANGE_TYPE_TOPIC = "topic";
/**
* topic模式下,queue和routingKey具有业务性,应该在消费端根据业务不同进行定义
*/
String TOPIC_MODE_QUEUE_DEFAULT = "queue.topic.default";
String TOPIC_MODE_QUEUE_ERROR = "queue.topic.error";
/**
* 示例
*/
String TOPIC_MODE_ROUTING_KEY_DEFAULT = "sofast.info.#";
String TOPIC_MODE_ROUTING_KEY_ERROR = "sofast.error.#";
// 主题模式: ======= // TODO: 2021/3/11
@Bean
public Queue topicQueue() {
return new Queue(RabbitConstant.TOPIC_MODE_QUEUE_DEFAULT);
}
@Bean
public Queue topicQueueError() {
return new Queue(RabbitConstant.TOPIC_MODE_QUEUE_ERROR);
}
/**
* 主题模式交换机
* <li>路由格式必须以 . 分隔,比如 user.email 或者 user.aaa.email</li>
* <li>通配符 * ,代表一个占位符,或者说一个单词,比如路由为 user.*,那么 user.email 可以匹配,但是 user.aaa.email 就匹配不了</li>
* <li>通配符 # ,代表一个或多个占位符,或者说一个或多个单词,比如路由为 user.#,那么 user.email 可以匹配,user.aaa.email 也可以匹配</li>
*/
@Bean
public TopicExchange topicExchange() {
return new TopicExchange(RabbitConstant.TOPIC_MODE_EXCHANGE_DEFAULT);
}
@Bean
public Binding topicBindingDefault(Queue topicQueue, TopicExchange topicExchange) {
return BindingBuilder.bind(topicQueue).to(topicExchange).with(RabbitConstant.TOPIC_MODE_ROUTING_KEY_DEFAULT);
}
@Bean
public Binding topicBindingError(Queue topicQueueError, TopicExchange topicExchange) {
return BindingBuilder.bind(topicQueueError).to(topicExchange).with(RabbitConstant.TOPIC_MODE_ROUTING_KEY_ERROR);
}
发布者
@Autowired
private AmqpTemplate rabbitTemplate;
public void sendMessage() {
String message1 = "主题模式-message-routingKey-sofast.info.#";
String message2 = "主题模式-message-routingKey-sofast.error.#";
System.out.println("发送message1 : " + message1);
rabbitTemplate.convertAndSend(RabbitConstant.TOPIC_MODE_EXCHANGE_DEFAULT,"sofast.info.system",message1);
System.out.println("发送message2 : " + message2);
rabbitTemplate.convertAndSend(RabbitConstant.TOPIC_MODE_EXCHANGE_DEFAULT,"sofast.eror.user",message2);
}
消费者
@Component
public class Consumer {
@RabbitListener(queues = RabbitConstant.1_MODE_QUEUE_DEFAULT)
@RabbitHandler
public void processA(String message) {
System.out.println("消息路由到了队列A: " + message);
}
@RabbitListener(queues = RabbitConstant.TOPIC_MODE_QUEUE_ERROR)
@RabbitHandler
public void processB1(String message) {
System.out.println("消息路由到了队列B: " + message);
}
}
手动Ack与自动Ack
当消息被消费者接收并处理后,队列中的消息就会被删除。那么rabbitmq如何能准确知道消费者是否接收到消息呢?这就需要通过消息确认机制。当消费者获取消息后,需要向rabbitmq发送回执ack,告知消息已经被接收到。
Ack的种类:
- 自动Ack:消息一旦被接收,消费者自动发送ack到服务器
- 手动Ack:消费者接收消息后,需要显式调用回执方法来发送ack到服务器
场景
- 自动Ack:非事务性消息或消息重要度较低,偶尔丢失1条对业务并不会产生影响,建议使用自动ack
- 手动Ack:针对不容丢失的重要业务消息,必须使用手动Ack方式,考虑处理成功和失败场景,考虑服务宕机或网络异常等场景。保证消息的有且仅有一次的消息处理。
使用方式
自动Ack:自动ack由框架自动完成,因此在自动Ack场景下,不需要任何编码来实现Ack。
但是在自动模式下,消费端可能会因为某些原因导致消费失败(比如引网络波动导致的短暂性网络不通)等情况,那么可以通过retry来进行重试。
需要注意的是,这里的失败场景一定不是因为自身代码原因导致的,因为这种情况重试多少次也是无法成功的,需要修复代码的bug才可以;当因为网络或第三方接口暂时不通等导致的情况,可以通过如下配置示例进行自动重试
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: auto # 自动ack
retry:
enabled: true
max-attempts: 5
max-interval: 10000 # 重试最大间隔时间
initial-interval: 2000 # 重试初始间隔时间
multiplier: 2 # 间隔时间乘子,间隔时间*乘子=下一次的间隔时间,最大不能超过设置的最大间隔时间
重点来了:这里的重试完全是客户端的行为,与rabbitmq是没有任何关系的(并不是rabbitmq会重新推送消息过来),而重试的依据是通过客户端抛出异常被AOP拦截后,间隔指定的时间后再重新执行该方法来进行的。因此这里最好不要进行全局的try{}catch(Exception e),可能会导致异常无法抛出而重试失效。
重试次数满了之后,消息应该如何处理,也有多种方式:在自动Ack模式下,当消息重试结束后默认自动ack本消息并且不会将消息重新发送队列(即:RejectAndDontRequeueRecoverer)。
MessageRecoverer除了RejectAndDontRequeueRecoverer的默认实现外,还有另外两种实现RepublishMessageRecoverer(重新发布消息)和ImmediateRequeueMessageRecoverer(立即重新返回队列)。RepublishMessageRecoverer会以新的routingKey重新发送到rabbitmq,然后通过专门的消费者来进行处理;ImmediateRequeueMessageRecoverer是重新返回原队列,周而复始直到不抛出异常才会停止,因此该种方式一般不推荐使用,可能会产生死循环。
另外还有一种方式是通过死信队列来处理重试失败的消息,这里就不展开。。
- 手动Ack:手动Ack需要调用代码来显示指定ack。
channel.basicAck(deliveryTag, multiple):消费成功的Ack应答;
channel.basicNack(deliveryTag, multiple, requeue):消费失败的Ack应答。
在消费失败的场景下,有两个选择:即requeue为true还是false。当requeue=true时,消息会重新放入消息队列进行重新消费,直到发送消费成功的Ack应答为止;当requeue=false时,消息会直接进入死信队列。
那么这里仍然有一个问题:在requeue=true的情况下,可能会出现无限制重试,这里可通过配合redis计数器进行重试次数指定。
//消费失败重试3次,3次失败后放入死信队列
int retryCount = (int) redisUtil.get(msgId);
if (retryCount >= 3) {
//requeue = false 放入死信队列
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
} else {
//requeue = true 放入消费队列重试消费
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
redisUtil.set(msgId, retryCount + 1);
}
懵逼点来了:在手动Ack模式下,其实也可以使用spring的自动retry配置,但是因为在手动模式下需要显式的发送Ack应答,当重试次数达到后,仍然消费失败,那么就无法发送成功的Ack应答,因此不推荐在手动Ack模式下进行自动retry。
附件:sf-rabbit.yml
# 该配置文件是示例参考文件,需要在自己的微服务中根据需要进行配置,本配置并未启用
# 更多详细配置可参考官方:https://docs.spring.io/spring-boot/docs/2.3.5.RELEASE/reference/html/appendix-application-properties.html#common-application-properties
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
ssl:
enable: false
# 指定持有SSL certificate的key store的路径
key-store:
# 指定访问key store的密码
key-store-password:
# 指定持有SSL certificates的Trust store
trust-store:
# 指定访问trust store的密码
trust-store-password:
# ssl使用的算法,例如,TLSv1.1
algorithm:
virtual-host: /
# 必须配置这个才会确认回调
publisher-confirm-type: correlated
publisher-returns: true
# 手动提交消息
listener:
simple:
#(手动Ack或自动Ack,这里只能选择一个)
acknowledge-mode: manual/auto
# 公平分发(限制每次发送一条数据, 必须大于等于transaction数量)
prefetch: 1
# 指定一个事务处理的消息数量,最好是小于等于prefetch的数量.
transaction-size: 1
retry:
# 是否支持重试
enabled: true
# 最大重试次数
max-attempts: 5
# 重试有无状态
stateless: false
# 时间策略乘数因子
multiplier: 1.0
# 第一次和第二次尝试发布或传递消息之间的间隔
initial-interval: 1000ms
# 最大重试时间间隔
max-interval: 10000ms
direct:
acknowledge-mode: manual