rabbitmq

so-fast提供对消息队列rabbitmq的支持。

如果使用该功能,请现在pom.xml中添加依赖:

<dependency>
    <groupId>com.sofast.cloud</groupId>
    <artifactId>so-fast-rabbitmq-starter</artifactId>
</dependency>

并在配置文件中进行配置:

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest

这里只是一个配置示例,更多配置可以参考文末sf-rabbit.yml文件和官方配置https://docs.spring.io/spring-boot/docs/2.3.5.RELEASE/reference/html/appendix-application-properties.html#common-application-properties。

使用

RabbitMQ官网提供了七种度列模型,分别是简单队列、工作队列、发布订阅、路由模式、主题模式、RPC模式、发布者确认模式。前五种是常用模型。在SpringBoot中已经对消息队列提供了完善的封装,因此我们直接使用SpringBoot的amqp进行开发即可。

在sofast中对这五种模型进行了默认封装,可直接使用,也可自定义。

简单模式

声明队列(sofast已经内置

/**
 * 简单队列模式:最简单的工作队列,其中一个消息生产者,一个消息消费者,一个队列。也称为点对点模式
 * 该模式只允许一个生产者和一个消费者,因此只能在单节点使用
 */
String SIMPLE_MODE_QUEUE_DEFAULT = "queue.simple.default";
/**
 * 简单队列
 * @return
 */
@Bean
public Queue simpleQueue() {
    return new Queue(RabbitConstant.SIMPLE_MODE_QUEUE_DEFAULT);
}

生产者

@Autowired
private AmqpTemplate rabbitTemplate;

public void sendMessage() {
    String message = "简单队列-Message";
    System.out.println("发送消息 : " + message);
    rabbitTemplate.convertAndSend(RabbitConstant.SIMPLE_MODE_QUEUE_DEFAULT,message);
}

消费者

@Component
@RabbitListener(queues = RabbitConstant.SIMPLE_MODE_QUEUE_DEFAULT)
public class Consumer {
    /**
     * @RabbitListener@RabbitHandler 搭配使用
     * @RabbitListener可以标注在类上面,需配合 @RabbitHandler 注解一起使用
     * 标注在类上面表示当有收到消息的时候,就交给 @RabbitHandler 的方法处理,
     * 具体使用哪个方法处理,根据接收到的 message 参数类型
     **/
    @RabbitHandler
    public void process(String message) {
        System.out.println("消费成功  : " + message);
    }
}
工作队列

声明队列(sofast已经内置

/**
 * 工作模式:一个消息生产者,一个交换器,一个消息队列,多个消费者。同样也称为点对点模式
 * 该模式允许一个生产者和多个消费者,因此消费者可允许多节点
 * 该模式下消息分发可分为轮训分发和公平分发,轮训分发为机械性轮流分发,不管理消费者处理的能力是否不同;
 * 公平分发需要设置channel.basicQos(1);每次只发一条,处理完后再发下一条.
 * <p>
 * 默认情况下,我们程序创建的消息队列以及存放在队列里面的消息,都是非持久化的。当RabbitMQ死掉了或者重启了,上次创建的队列、消息都不会保存。
 * 需要进行以下持久化配置:
 * <p>
 * 参数配置一:生产者创建队列声明时,修改第二个参数为 true
 * channel.queueDeclare(QUEUE_NAME, true, false, false, null);
 * <p>
 * 参数配置二:生产者发送消息时,修改第三个参数为MessageProperties.PERSISTENT_TEXT_PLAIN
 * channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());
 */
String WORK_MODE_QUEUE_DEFAULT = "query.work.default";
/**
 * 工作队列
 * @return
 */
@Bean
public Queue workQueue() {
    return new Queue(RabbitConstant.WORK_MODE_QUEUE_DEFAULT);
}

配置

工作队列和简单队列在代码上是完全一样的,不同的是:简单队列只有一个消费者,而工作队列有多个消费者。

当存在多个消费者时,队列需要根据策略对不同消费者进行消息分发,rabbit模式采用轮训方式,即不管消费者能力,机械性轮训分发。我们可以通过配置文件修改分发策略

spring:
  rabbitmq:
    virtual-host: /
    listener:
      simple:
        # 公平分发(限制每次发送一条数据, 必须大于等于transaction数量)
        prefetch: 1
发布订阅

声明队列(sofast已经内置

/**
 * 发布/订阅模式:无选择接收消息,一个消息生产者,一个交换器,多个消息队列,多个消费者。称为发布/订阅模式
 * 发布订阅模式引入了交换机(EXCHANGE)概念,生产者不是直接操作队列,而是将数据发送给交换机,由交换机将数据发送给与之绑定的队列.
 * <p>
 * =====发布端=====
 * 绑定的交换机 参数1交互机名称 参数2 exchange类型
 * channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
 * <p>
 * 发送消息
 * channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("utf-8"));
 * <p>
 * =====订阅端=====
 * 消费者绑定交换机 参数1 队列 参数2交换机 参数3 routingKey
 * channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
 */
String FANOUT_MODE_QUEUE_DEFAULT = "queue.fanout.default";
String FANOUT_MODE_QUEUE_EXTEND = "queue.fanout.extend";
String FANOUT_MODE_EXCHANGE_DEFAULT = "exchange.fanout.default";
String EXCHANGE_TYPE_FANOUT = "fanout";
// 发布/订阅模式:一个交换机对应多个队列  ======= start

/**
 * 发布/订阅队列(默认队列)
 * @return
 */
@Bean
public Queue fanoutQueueDefault() {
    return new Queue(RabbitConstant.FANOUT_MODE_QUEUE_DEFAULT);
}
/**
 * 发布/订阅队列(扩展队列)
 * @return
 */
@Bean
public Queue fanoutQueueExtend() {
    return new Queue(RabbitConstant.FANOUT_MODE_QUEUE_EXTEND);
}
/**
 * 交换机
 * @return
 */
@Bean
public FanoutExchange fanoutExchange() {
    return new FanoutExchange(RabbitConstant.FANOUT_MODE_EXCHANGE_DEFAULT);
}

/**
 * 发布/订阅默认绑定队列和交换机
 * @param fanoutQueueDefault 绑定默认队列
 * @param fanoutExchange 绑定交换机
 * @return
 */
@Bean
public Binding fanoutBindingDefault(Queue fanoutQueueDefault, FanoutExchange fanoutExchange) {
    return BindingBuilder.bind(fanoutQueueDefault).to(fanoutExchange);
}
/**
 * 发布/订阅默认绑定队列和交换机
 * @param fanoutQueueExtend 绑定扩展队列
 * @param fanoutExchange 绑定交换机
 * @return
 */
@Bean
public Binding fanoutBindingExtend(Queue fanoutQueueExtend, FanoutExchange fanoutExchange) {
    return BindingBuilder.bind(fanoutQueueExtend).to(fanoutExchange);
}

// 发布/订阅模式:一个交换机对应多个队列  ======= end

生产者

@Autowired
private AmqpTemplate rabbitTemplate;

public void sendMessage() {
    String message = "发布订阅模式-message";
    System.out.println("发送消息 : " + message);
    rabbitTemplate.convertAndSend(RabbitConstant.FANOUT_MODE_EXCHANGE_DEFAULT,message);
}

消费者

@Component
public class Consumer {
    @RabbitListener(queues = RabbitConstant.FANOUT_MODE_QUEUE_DEFAULT)
    @RabbitHandler
    public void processDefault(String message) {
        System.out.println("默认队列消费成功  : " + message);
    }
    @RabbitListener(queues = RabbitConstant.FANOUT_MODE_QUEUE_EXTEND)
    @RabbitHandler
    public void processExtend(String message) {
        System.out.println("扩展队列消费成功  : " + message);
    }
}
路由模式

路由模式是在发布订阅模式的基础上,有选择的接收消息,即通过设置routing进行条件匹配接收消息

声明队列(sofast已经内置

    /**
     * 路由模式:在发布/订阅模式的基础上,有选择的接收消息,也就是通过 routing 路由进行匹配条件是否满足接收消息。
     * 路由模式在交换机的基础上又引入了路由(routing)概念,决定消息向队列推送的主要取决于路由,而不是交换机了.
     * <p>
     * =====发布端=====
     * 绑定的交换机 参数1交互机名称 参数2 exchange类型
     * channel.exchangeDeclare(EXCHANGE_NAME, "direct");
     * 发送消息
     * channel.basicPublish(EXCHANGE_NAME, sendType, null, message.getBytes("utf-8"));
     * <p>
     * =====订阅端=====
     * 消费者绑定交换机 参数1 队列 参数2交换机 参数3 routingKey
     * channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "routingKey");
     */
    String DIRECT_MODE_EXCHANGE_DEFAULT = "exchange.direct.default";
    String EXCHANGE_TYPE_DIRECT = "direct";
    /** TODO 路由模式下,queue和routingKey具有业务性,应该在消费端根据业务不同进行定义,这里只是示例 */
    String DIRECT_MODE_QUEUE_DEFAULT = "queue.direct.default";
    String DIRECT_MODE_QUEUE_EXTEND = "queue.direct.extend";

    String DIRECT_MODE_ROUTING_KEY_DEFAULT = "routing.key.default";
    String DIRECT_MODE_ROUTING_KEY_EXTEND = "routing.key.extend";

因为路由模式,需要根据业务需求提供routingKey,因此在sofast中并没有对此做配置,在业务服务中进行配置使用较好。使用方式同上述发布订阅模式类型,只是增加了routingKey,交换机类型换成了Direct。

// 路由模式:通过routing key进行消息选择  ======= start
@Bean
public Queue directQueue() {
    return new Queue(RabbitConstant.DIRECT_MODE_QUEUE_DEFAULT);
}
@Bean
public Queue directQueueExtend() {
    return new Queue(RabbitConstant.DIRECT_MODE_QUEUE_EXTEND);
}

/**
 * 路由模式交换机
 * 交换机和队列之间通过binding key进行关联,只有当消息的routing key和binding key相同时才会被消费
 * @return
 */
@Bean
public DirectExchange directExchange() {
    return new DirectExchange(RabbitConstant.DIRECT_MODE_EXCHANGE_DEFAULT);
}
@Bean
public Binding directBindingDefault(Queue directQueue, DirectExchange directExchange) {
    return BindingBuilder.bind(directExchange).to(directExchange).with(RabbitConstant.DIRECT_MODE_ROUTING_KEY_DEFAULT);
}
@Bean
public Binding directBindingExtend(Queue directQueueExtend, DirectExchange directExchange) {
    return BindingBuilder.bind(directQueueExtend).to(directExchange).with(RabbitConstant.DIRECT_MODE_ROUTING_KEY_EXTEND);
}

// 路由模式:通过routing key进行消息选择  ======= end

生产者

@Autowired
private AmqpTemplate rabbitTemplate;

/**
* 生产者将消息发送给交换机,并绑定default key
**/
public void sendMessageDefault() {
    String message = "路由模式-defaultKey-message";
    System.out.println("发送消息 : " + message);
    rabbitTemplate.convertAndSend(RabbitConstant.DIRECT_MODE_EXCHANGE_DEFAULT,RabbitConstant.DIRECT_MODE_ROUTING_KEY_DEFAULT,message);
}

/**
* 生产者将消息发送给交换机,并绑定extend key
**/
public void sendMessageExtend() {
    String message = "路由模式-extendKey-message";
    System.out.println("发送消息 : " + message);
    rabbitTemplate.convertAndSend(RabbitConstant.DIRECT_MODE_EXCHANGE_EXTEND,RabbitConstant.DIRECT_MODE_ROUTING_KEY_EXTEND,message);
}

消费者

@Component
public class Consumer {

    @RabbitListener(queues = RabbitConstant.DIRECT_MODE_QUEUE_DEFAULT)
    @RabbitHandler
    public void processA(String message) {
        System.out.println("消息路由到了队列Default: " + message);
    }

    @RabbitListener(queues = RabbitConstant.DIRECT_MODE_QUEUE_EXTEND)
    @RabbitHandler
    public void processB1(String message) {
        System.out.println("消息路由到了队列Extend: " + message);
    }
}
主题模式

路由模式只能设置固定的routingkey,主题模式是在路由模式的基础上增加了通配符来选择信息。

声明队列(sofast已经内置

/**
 * 主题模式:同样是在发布/订阅模式的基础上,根据主题匹配进行筛选是否接收消息,比路由模式更灵活。
 * direct 不支持匹配 routingKey,一但绑定了就是绑定了,而 topic 主题模式支持规则匹配,只要符合 routingKey 就能发送到绑定的队列上。
 * <p>
 * topics 模式与 routing 模式比较相近,topics 模式不能具有任意的 routingKey,
 * 必须由一个英文句点号“.”分隔的字符串(我们将被句点号“.”分隔开的每一段独立的字符串称为一个单词)
 * 比如 "lazy.orange.fox"。topics routingKey 中可以存在两种特殊字符“”与“#”,用于做模糊匹配,其中“”用于匹配一个单词,“#”用于匹配多个单词(可以是零个)
 * <p>
 * "*" 表示任何一个词
 * "#" 表示0或1个词
 * <p>
 * =====发布端=====
 * 绑定的交换机 参数1交互机名称 参数2 exchange类型
 * channel.exchangeDeclare(EXCHANGE_NAME, "topic");
 * 发送消息
 * channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes());
 * <p>
 * =====订阅端=====
 * 消费者绑定交换机 参数1 队列 参数2交换机 参数3 routingKey
 * channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "log.*");
 */
String TOPIC_MODE_EXCHANGE_DEFAULT = "exchange.topic.default";
String EXCHANGE_TYPE_TOPIC = "topic";
/**
 * topic模式下,queue和routingKey具有业务性,应该在消费端根据业务不同进行定义
 */
String TOPIC_MODE_QUEUE_DEFAULT = "queue.topic.default";
String TOPIC_MODE_QUEUE_ERROR = "queue.topic.error";
/**
 * 示例
 */
String TOPIC_MODE_ROUTING_KEY_DEFAULT = "sofast.info.#";
String TOPIC_MODE_ROUTING_KEY_ERROR = "sofast.error.#";
// 主题模式:  ======= // TODO: 2021/3/11
@Bean
public Queue topicQueue() {
    return new Queue(RabbitConstant.TOPIC_MODE_QUEUE_DEFAULT);
}
@Bean
public Queue topicQueueError() {
    return new Queue(RabbitConstant.TOPIC_MODE_QUEUE_ERROR);
}
/**
 * 主题模式交换机
 * <li>路由格式必须以 . 分隔,比如 user.email 或者 user.aaa.email</li>
 * <li>通配符 * ,代表一个占位符,或者说一个单词,比如路由为 user.*,那么 user.email 可以匹配,但是 user.aaa.email 就匹配不了</li>
 * <li>通配符 # ,代表一个或多个占位符,或者说一个或多个单词,比如路由为 user.#,那么 user.email 可以匹配,user.aaa.email 也可以匹配</li>
 */
@Bean
public TopicExchange topicExchange() {
    return new TopicExchange(RabbitConstant.TOPIC_MODE_EXCHANGE_DEFAULT);
}
@Bean
public Binding topicBindingDefault(Queue topicQueue, TopicExchange topicExchange) {
    return BindingBuilder.bind(topicQueue).to(topicExchange).with(RabbitConstant.TOPIC_MODE_ROUTING_KEY_DEFAULT);
}
@Bean
public Binding topicBindingError(Queue topicQueueError, TopicExchange topicExchange) {
    return BindingBuilder.bind(topicQueueError).to(topicExchange).with(RabbitConstant.TOPIC_MODE_ROUTING_KEY_ERROR);
}

发布者

@Autowired
private AmqpTemplate rabbitTemplate;

public void sendMessage() {
    String message1 = "主题模式-message-routingKey-sofast.info.#";
    String message2 = "主题模式-message-routingKey-sofast.error.#";

    System.out.println("发送message1 : " + message1);
    rabbitTemplate.convertAndSend(RabbitConstant.TOPIC_MODE_EXCHANGE_DEFAULT,"sofast.info.system",message1);

    System.out.println("发送message2 : " + message2);
    rabbitTemplate.convertAndSend(RabbitConstant.TOPIC_MODE_EXCHANGE_DEFAULT,"sofast.eror.user",message2);
}

消费者

@Component
public class Consumer {

    @RabbitListener(queues = RabbitConstant.1_MODE_QUEUE_DEFAULT)
    @RabbitHandler
    public void processA(String message) {
        System.out.println("消息路由到了队列A: " + message);
    }

    @RabbitListener(queues = RabbitConstant.TOPIC_MODE_QUEUE_ERROR)
    @RabbitHandler
    public void processB1(String message) {
        System.out.println("消息路由到了队列B: " + message);
    }
}

手动Ack与自动Ack

当消息被消费者接收并处理后,队列中的消息就会被删除。那么rabbitmq如何能准确知道消费者是否接收到消息呢?这就需要通过消息确认机制。当消费者获取消息后,需要向rabbitmq发送回执ack,告知消息已经被接收到。

Ack的种类:
  • 自动Ack:消息一旦被接收,消费者自动发送ack到服务器
  • 手动Ack:消费者接收消息后,需要显式调用回执方法来发送ack到服务器
场景
  • 自动Ack:非事务性消息或消息重要度较低,偶尔丢失1条对业务并不会产生影响,建议使用自动ack
  • 手动Ack:针对不容丢失的重要业务消息,必须使用手动Ack方式,考虑处理成功和失败场景,考虑服务宕机或网络异常等场景。保证消息的有且仅有一次的消息处理。
使用方式
  • 自动Ack:自动ack由框架自动完成,因此在自动Ack场景下,不需要任何编码来实现Ack。

    但是在自动模式下,消费端可能会因为某些原因导致消费失败(比如引网络波动导致的短暂性网络不通)等情况,那么可以通过retry来进行重试。

    需要注意的是,这里的失败场景一定不是因为自身代码原因导致的,因为这种情况重试多少次也是无法成功的,需要修复代码的bug才可以;当因为网络或第三方接口暂时不通等导致的情况,可以通过如下配置示例进行自动重试

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: auto  # 自动ack
        retry:
          enabled: true
          max-attempts: 5
          max-interval: 10000   # 重试最大间隔时间
          initial-interval: 2000  # 重试初始间隔时间
          multiplier: 2 # 间隔时间乘子,间隔时间*乘子=下一次的间隔时间,最大不能超过设置的最大间隔时间

重点来了:这里的重试完全是客户端的行为,与rabbitmq是没有任何关系的(并不是rabbitmq会重新推送消息过来),而重试的依据是通过客户端抛出异常被AOP拦截后,间隔指定的时间后再重新执行该方法来进行的。因此这里最好不要进行全局的try{}catch(Exception e),可能会导致异常无法抛出而重试失效。

重试次数满了之后,消息应该如何处理,也有多种方式:在自动Ack模式下,当消息重试结束后默认自动ack本消息并且不会将消息重新发送队列(即:RejectAndDontRequeueRecoverer)。

MessageRecoverer除了RejectAndDontRequeueRecoverer的默认实现外,还有另外两种实现RepublishMessageRecoverer(重新发布消息)和ImmediateRequeueMessageRecoverer(立即重新返回队列)。RepublishMessageRecoverer会以新的routingKey重新发送到rabbitmq,然后通过专门的消费者来进行处理;ImmediateRequeueMessageRecoverer是重新返回原队列,周而复始直到不抛出异常才会停止,因此该种方式一般不推荐使用,可能会产生死循环。

另外还有一种方式是通过死信队列来处理重试失败的消息,这里就不展开。。

  • 手动Ack:手动Ack需要调用代码来显示指定ack。

channel.basicAck(deliveryTag, multiple):消费成功的Ack应答;

channel.basicNack(deliveryTag, multiple, requeue):消费失败的Ack应答。

在消费失败的场景下,有两个选择:即requeue为true还是false。当requeue=true时,消息会重新放入消息队列进行重新消费,直到发送消费成功的Ack应答为止;当requeue=false时,消息会直接进入死信队列。

那么这里仍然有一个问题:在requeue=true的情况下,可能会出现无限制重试,这里可通过配合redis计数器进行重试次数指定。

//消费失败重试3次,3次失败后放入死信队列
            int retryCount = (int) redisUtil.get(msgId);

            if (retryCount >= 3) {
                //requeue = false 放入死信队列
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
            } else {
                //requeue = true 放入消费队列重试消费
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
                redisUtil.set(msgId, retryCount + 1);
            }

懵逼点来了:在手动Ack模式下,其实也可以使用spring的自动retry配置,但是因为在手动模式下需要显式的发送Ack应答,当重试次数达到后,仍然消费失败,那么就无法发送成功的Ack应答,因此不推荐在手动Ack模式下进行自动retry。

附件:sf-rabbit.yml

# 该配置文件是示例参考文件,需要在自己的微服务中根据需要进行配置,本配置并未启用
# 更多详细配置可参考官方:https://docs.spring.io/spring-boot/docs/2.3.5.RELEASE/reference/html/appendix-application-properties.html#common-application-properties
spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest

    ssl:
      enable: false
      # 指定持有SSL certificate的key store的路径
      key-store:
      # 指定访问key store的密码
      key-store-password:
      # 指定持有SSL certificates的Trust store
      trust-store:
      # 指定访问trust store的密码
      trust-store-password:
      # ssl使用的算法,例如,TLSv1.1
      algorithm:

    virtual-host: /
    # 必须配置这个才会确认回调
    publisher-confirm-type: correlated
    publisher-returns: true
    # 手动提交消息
    listener:
      simple:
        #(手动Ack或自动Ack,这里只能选择一个)
        acknowledge-mode: manual/auto 
        # 公平分发(限制每次发送一条数据, 必须大于等于transaction数量)
        prefetch: 1
        # 指定一个事务处理的消息数量,最好是小于等于prefetch的数量.
        transaction-size: 1
        retry:
          # 是否支持重试
          enabled: true
          # 最大重试次数
          max-attempts: 5
          # 重试有无状态
          stateless: false
          # 时间策略乘数因子
          multiplier: 1.0
          # 第一次和第二次尝试发布或传递消息之间的间隔
          initial-interval: 1000ms
          # 最大重试时间间隔
          max-interval: 10000ms
      direct:
        acknowledge-mode: manual
Copyright © 2020. 恩梯梯数据(中国)信息技术有限公司. all right reserved,powered by Gitbook该文件修订时间: 2021-04-25 10:50:58

results matching ""

    No results matching ""