博客
关于我
RabbitMQ - 基于 SpringAMQP 带你实现五种消息队列模型
阅读量:795 次
发布时间:2023-03-01

本文共 6756 字,大约阅读时间需要 22 分钟。

Spring AMQP 开发指南

目录

1.1 概念

  • AMQP是什么?
    AMQP(Advanced Message Queuing Protocol)是一种用于在应用程序之间传递业务消息的开放标准协议,与语言和平台无关,更符合微服务架构下的独立性要求。
  • Spring AMQP是什么?
    Spring AMQP 是基于 AMQP 协议定义的一套 API 规范,提供了模板用于发送和接收消息,包含两部分:spring-amqp 是基础抽象,spring-rabbit 是底层的默认实现。

1.2 前置知识(实现案例前必看)

1.2.1 创建队列

  • 在实际开发中,创建交换机和队列有两种方式,其中 Bean 方式最为常用和推荐。

  • 队列的创建

    • name:队列名称。
    • durable:是否持久化。
    • exclusive:是否独占。true 表示队列只能由一个消费者使用,false 表示所有消费者均可使用。
    • autoDelete:自动删除。true 表示没有消费者使用后,队列会自动删除。
    • arguments:扩展参数,可自定义队列选项,如队列过期、消息过期、死信队列等。
  • 示例代码

    Configuration 层中创建队列:

    @Configuration
    public class MqConfig {
    @Bean
    public Queue simpleQueue() {
    return new Queue("simple.queue", true);
    }
    }

1.2.2 创建交换机

  • 直接交换机

    直接交换机的创建方法:

    @Configuration
    public class MqConfig {
    @Bean
    public DirectExchange directExchange() {
    return new DirectExchange("direct", true, true);
    }
    }
  • 扇出交换机和主题交换机

    扇出交换机和主题交换机的创建方式与直接交换机类似,主要区别在于不同的路由规则。

  • 交换机的创建属性

    • name:交换机名称。
    • durable:是否持久化。
    • autoDelete:自动删除。
    • arguments:扩展参数,可配置延时消息、过期时间、死信交换机等。

1.2.3 创建绑定

  • 绑定交换机和队列的方式是通过 BindingBuilder 对象实现的。
  • 示例代码
    @Configuration
    public class MqConfig {
    @Bean
    public Binding simpleBinding() {
    return BindingBuilder.bind(simpleQueue())
    .to(directExchange())
    .with("simple");
    }
    }
  • 注意事项
    • 如果使用扇出交换机(FanoutExchange),则不需要通过 with 指定 bindingKey

1.2.4 @RabbitListener 注解

  • 情况一:队列已存在
    • 如果队列已存在,可以直接通过 @RabbitListener 注解监听队列消息,成为消费者。
    • 示例代码:
    @RabbitListener(queues = "simple.queue")
    public void ListenQueue(String msg) {
    System.out.println("消费者接收到 simple.queue 的消息: " + msg);
    }
  • 情况二:队列不存在
    • 如果队列不存在,可以通过 @RabbitListener 注解创建交换机、队列和绑定关系。
    • 示例代码:
    @RabbitListener(bindings = @QueueBinding(
    value = @Queue("queue1"),
    exchange = @Exchange(name = "exchange1", type = ExchangeTypes.TOPIC),
    key = "test"
    ))
    public void testListener(String msg) {
    System.out.println("消费者接收到消息: " + msg);
    }

1.2.5 为什么更推荐使用 @Bean 注解?

  • 优点
    • 解耦合:队列、交换机、绑定关系的创建与业务逻辑分离。
    • 更好地控制 RabbitMQ 监听器的创建和销毁,避免性能问题和内存泄漏。
    • 管理连接和通道,更有助于提高可靠性和性能。

1.3 案例实现

1.3.1 基础消息队列(BasicQueue)

  • 队列创建
    @Configuration
    public class MqConfig {
    @Bean
    public Queue simpleQueue() {
    return new Queue("simple.queue", true);
    }
    }
  • 消息发送
    publisher 服务中添加测试类:
    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class SpringAMQPTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    public void testSendMessageSimpleQueue() {
    String queueName = "simple.queue";
    String message = "hello, spring amqp!";
    rabbitTemplate.convertAndSend(queueName, message);
    }
    }
  • 消息消费
    consumer 服务中添加消费逻辑:
    @Component
    public class SpringRabbitListener {
    @RabbitListener(queues = "simple.queue")
    public void ListenQueue(String msg) {
    System.out.println("消费者接收到 simple.queue 的消息: " + msg);
    }
    }

1.3.2 工作队列(Work Queue)

  • 工作队列

    Work Queue 用于提高消息处理速度,避免消息堆积。

  • 消息预取问题

    消费者可能会先进行消息预取(prefetch),以避免同时处理过多消息。

    • 解决方法:设置 prefetch 为 1,以适应消费者处理能力。
  • 实现步骤

  • publisher 服务中循环发送消息:
    @Test
    public void testSendMessageWorkQueue() throws InterruptedException {
    String queueName = "simple.queue";
    String message = "hello, workQueue!";
    for (int i = 0; i < 50; i++) {
    rabbitTemplate.convertAndSend(queueName, message + i);
    Thread.sleep(20);
    }
    }
  • consumer 服务中添加多个消费者:
    @Component
    public class SpringRabbitListener {
    @RabbitListener(queues = "simple.queue")
    public void ListenQueue(String msg) throws InterruptedException {
    System.err.println("消费者接收到 simple.queue 的消息: " + msg + LocalDate.now());
    Thread.sleep(100);
    }
    @RabbitListener(queues = "simple.queue")
    public void ListenQueue2(String msg) throws InterruptedException {
    System.out.println("消费者接收到 simple.queue 的消息: " + msg + LocalDate.now());
    Thread.sleep(25);
    }
    }

1.3.3 广播交换机(Fanout Exchange)

  • 广播消息
    Fanout Exchange 将消息广播到所有绑定的队列。
  • 实现步骤
  • 创建 FanoutExchange 和 绑定队列:
    @Configuration
    public class FanoutConfig {
    @Bean
    public FanoutExchange fanoutExchange() {
    return new FanoutExchange("itcast.fanout");
    }
    @Bean
    public Queue fanoutQueue1() {
    return new Queue("fanout.queue1");
    }
    @Bean
    public Binding fanoutBinding1(Queue fanoutQueue1, FanoutExchange fanoutExchange) {
    return BindingBuilder.bind(fanoutQueue1)
    .to(fanoutExchange);
    }
    @Bean
    public Queue fanoutQueue2() {
    return new Queue("fanout.queue2");
    }
    @Bean
    public Binding fanoutBinding2(Queue fanoutQueue2, FanoutExchange fanoutExchange) {
    return BindingBuilder.bind(fanoutQueue2)
    .to(fanoutExchange);
    }
    }
  • 编写测试方法:
    @Test
    public void testFanoutMessage() {
    String exchangeName = "itcast.fanout";
    String message = "hello! fanout";
    rabbitTemplate.convertAndSend(exchangeName, "", message);
    }

1.3.4 路由交换机(Direct Exchange)

  • 路由规则
    Direct Exchange 根据路由键(RoutingKey)将消息路由到指定队列。
  • 实现步骤
  • consumer 服务中添加多个消费者:
    @RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "direct.queue1"),
    exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),
    key = {"java", "C++"}
    ))
    public void listenDirectQueue1(String msg) {
    System.out.println("消费者 1 收到 Direct 消息:" + msg);
    }
    @RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "direct.queue2"),
    exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),
    key = {"java", "GO"}
    ))
    public void listenDirectQueue2(String msg) {
    System.out.println("消费者 2 收到 Direct 消息:" + msg);
    }
  • 编写测试方法:
    @Test
    public void testDirectMessage() {
    String exchangeName = "itcast.direct";
    String message = "hello! direct";
    rabbitTemplate.convertAndSend(exchangeName, "C++", message);
    }

1.3.5 主题交换机(Topic Exchange)

  • 主题交换机
    Topic Exchange 的路由键使用通配符(#*),以实现更灵活的路由。
  • 实现步骤
  • 创建主题交换机和队列:
    @Configuration
    public class TopicConfig {
    @Bean
    public TopicExchange topicExchange() {
    return new TopicExchange("itcast.topic");
    }
    @Bean
    public Queue topicQueue1() {
    return new Queue("topic.queue1");
    }
    @Bean
    public Binding topicBinding1(Queue topicQueue1, TopicExchange topicExchange) {
    return BindingBuilder.bind(topicQueue1)
    .to(topicExchange)
    .with("china.#");
    }
    @Bean
    public Queue topicQueue2() {
    return new Queue("topic.queue2");
    }
    @Bean
    public Binding topicBinding2(Queue topicQueue2, TopicExchange topicExchange) {
    return BindingBuilder.bind(topicQueue2)
    .to(topicExchange)
    .with("#.good");
    }
    }
  • 编写测试方法:
    @Test
    public void testTopicMessage() {
    String exchangeName = "itcast.topic";
    String message = "hello! topic";
    rabbitTemplate.convertAndSend(exchangeName, "china.good", message);
    }

通过以上步骤,可以实现 Spring AMQP 在不同场景下的消息队列开发,满足各种业务需求。

转载地址:http://kttfk.baihongyu.com/

你可能感兴趣的文章