本文共 6756 字,大约阅读时间需要 22 分钟。
spring-amqp 是基础抽象,spring-rabbit 是底层的默认实现。在实际开发中,创建交换机和队列有两种方式,其中 Bean 方式最为常用和推荐。
队列的创建:
name:队列名称。durable:是否持久化。exclusive:是否独占。true 表示队列只能由一个消费者使用,false 表示所有消费者均可使用。autoDelete:自动删除。true 表示没有消费者使用后,队列会自动删除。arguments:扩展参数,可自定义队列选项,如队列过期、消息过期、死信队列等。示例代码:
在Configuration 层中创建队列: @Configurationpublic class MqConfig { @Bean public Queue simpleQueue() { return new Queue("simple.queue", true); }} 直接交换机:
直接交换机的创建方法:@Configurationpublic class MqConfig { @Bean public DirectExchange directExchange() { return new DirectExchange("direct", true, true); }} 扇出交换机和主题交换机:
扇出交换机和主题交换机的创建方式与直接交换机类似,主要区别在于不同的路由规则。交换机的创建属性:
name:交换机名称。durable:是否持久化。autoDelete:自动删除。arguments:扩展参数,可配置延时消息、过期时间、死信交换机等。BindingBuilder 对象实现的。@Configurationpublic class MqConfig { @Bean public Binding simpleBinding() { return BindingBuilder.bind(simpleQueue()) .to(directExchange()) .with("simple"); }} FanoutExchange),则不需要通过 with 指定 bindingKey。@RabbitListener 注解监听队列消息,成为消费者。@RabbitListener(queues = "simple.queue")public void ListenQueue(String msg) { System.out.println("消费者接收到 simple.queue 的消息: " + msg);} @RabbitListener 注解创建交换机、队列和绑定关系。@RabbitListener(bindings = @QueueBinding( value = @Queue("queue1"), exchange = @Exchange(name = "exchange1", type = ExchangeTypes.TOPIC), key = "test"))public void testListener(String msg) { System.out.println("消费者接收到消息: " + msg);} @Configurationpublic class MqConfig { @Bean public Queue simpleQueue() { return new Queue("simple.queue", true); }} publisher 服务中添加测试类: @RunWith(SpringRunner.class)@SpringBootTestpublic class SpringAMQPTest { @Autowired private RabbitTemplate rabbitTemplate; @Test public void testSendMessageSimpleQueue() { String queueName = "simple.queue"; String message = "hello, spring amqp!"; rabbitTemplate.convertAndSend(queueName, message); }} consumer 服务中添加消费逻辑: @Componentpublic class SpringRabbitListener { @RabbitListener(queues = "simple.queue") public void ListenQueue(String msg) { System.out.println("消费者接收到 simple.queue 的消息: " + msg); }} 工作队列:
Work Queue 用于提高消息处理速度,避免消息堆积。消息预取问题:
消费者可能会先进行消息预取(prefetch),以避免同时处理过多消息。 prefetch 为 1,以适应消费者处理能力。实现步骤:
publisher 服务中循环发送消息: @Testpublic void testSendMessageWorkQueue() throws InterruptedException { String queueName = "simple.queue"; String message = "hello, workQueue!"; for (int i = 0; i < 50; i++) { rabbitTemplate.convertAndSend(queueName, message + i); Thread.sleep(20); }} consumer 服务中添加多个消费者: @Componentpublic class SpringRabbitListener { @RabbitListener(queues = "simple.queue") public void ListenQueue(String msg) throws InterruptedException { System.err.println("消费者接收到 simple.queue 的消息: " + msg + LocalDate.now()); Thread.sleep(100); } @RabbitListener(queues = "simple.queue") public void ListenQueue2(String msg) throws InterruptedException { System.out.println("消费者接收到 simple.queue 的消息: " + msg + LocalDate.now()); Thread.sleep(25); }} @Configurationpublic class FanoutConfig { @Bean public FanoutExchange fanoutExchange() { return new FanoutExchange("itcast.fanout"); } @Bean public Queue fanoutQueue1() { return new Queue("fanout.queue1"); } @Bean public Binding fanoutBinding1(Queue fanoutQueue1, FanoutExchange fanoutExchange) { return BindingBuilder.bind(fanoutQueue1) .to(fanoutExchange); } @Bean public Queue fanoutQueue2() { return new Queue("fanout.queue2"); } @Bean public Binding fanoutBinding2(Queue fanoutQueue2, FanoutExchange fanoutExchange) { return BindingBuilder.bind(fanoutQueue2) .to(fanoutExchange); }} @Testpublic void testFanoutMessage() { String exchangeName = "itcast.fanout"; String message = "hello! fanout"; rabbitTemplate.convertAndSend(exchangeName, "", message);} RoutingKey)将消息路由到指定队列。consumer 服务中添加多个消费者: @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "direct.queue1"), exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT), key = {"java", "C++"}))public void listenDirectQueue1(String msg) { System.out.println("消费者 1 收到 Direct 消息:" + msg);}@RabbitListener(bindings = @QueueBinding( value = @Queue(name = "direct.queue2"), exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT), key = {"java", "GO"}))public void listenDirectQueue2(String msg) { System.out.println("消费者 2 收到 Direct 消息:" + msg);} @Testpublic void testDirectMessage() { String exchangeName = "itcast.direct"; String message = "hello! direct"; rabbitTemplate.convertAndSend(exchangeName, "C++", message);} # 和 *),以实现更灵活的路由。@Configurationpublic class TopicConfig { @Bean public TopicExchange topicExchange() { return new TopicExchange("itcast.topic"); } @Bean public Queue topicQueue1() { return new Queue("topic.queue1"); } @Bean public Binding topicBinding1(Queue topicQueue1, TopicExchange topicExchange) { return BindingBuilder.bind(topicQueue1) .to(topicExchange) .with("china.#"); } @Bean public Queue topicQueue2() { return new Queue("topic.queue2"); } @Bean public Binding topicBinding2(Queue topicQueue2, TopicExchange topicExchange) { return BindingBuilder.bind(topicQueue2) .to(topicExchange) .with("#.good"); }} @Testpublic void testTopicMessage() { String exchangeName = "itcast.topic"; String message = "hello! topic"; rabbitTemplate.convertAndSend(exchangeName, "china.good", message);} 通过以上步骤,可以实现 Spring AMQP 在不同场景下的消息队列开发,满足各种业务需求。
转载地址:http://kttfk.baihongyu.com/