前言
SpringBoot项目整合SpringAMQP学习笔记
引入依赖
pom.xml
1 2 3 4
| <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
|
添加配置
- 因为生产者和消费者都需要连接消息队列,所以需要相同的配置
spring.rabbitmq.addresses
:RabbitMQ的地址
spring.rabbitmq.port
:RabbitMQ的端口,默认为5672
spring.rabbitmq.virtual-host
:RabbitMQ的虚拟主机
spring.rabbitmq.username
:RabbitMQ的用户名
spring.rabbitmq.password
:RabbitMQ的密码
src/main/resource/application.yml
1 2 3 4 5 6 7
| spring: rabbitmq: addresses: 127.0.0.1 port: 5672 virtual-host: / username: admin password: admin
|
简单队列(Simple Queue)
- 一个生产者向队列中发送消息
- 一个消费者从队列中获取消息,每次获取消息后,阅后即焚
graph LR
A[生产者] --> B[消息队列]
B --> C[消费者]
生产者
"queuename"
:消息队列名
"message"
:消息内容
src/test/java/com/ApplicationTests.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| package com;
import org.junit.jupiter.api.Test; import org.junit.runner.RunWith; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class) @SpringBootTest public class ApplicationTests {
@Autowired private RabbitTemplate rabbitTemplate;
@Test public void send() { rabbitTemplate.convertAndSend("queuename", "message"); } }
|
消费者
@RabbitListener(queues = "")
、@RabbitListener(queues = {"", """})
:指定队列名,可以指定多个队列名
src/main/java/com/listener/SpringRabbitMQListener.java
1 2 3 4 5 6 7 8 9 10 11 12 13
| package com.listener;
import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;
@Component public class SpringRabbitMQListener {
@RabbitListener(queues = "queuename") public void receive(String message) { System.out.println("queuename: " + message); } }
|
工作队列(Work Queue)
- 一个生产者向队列中发送消息
- 多个消费者从队列中获取消息,每次获取消息后,阅后即焚
graph LR
A[生产者] --> B[消息队列]
B --> C[消费者]
B --> D[消费者]
生产者
"queuename"
:消息队列名
"message"
:消息内容
src/test/java/com/ApplicationTests.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| package com;
import org.junit.jupiter.api.Test; import org.junit.runner.RunWith; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class) @SpringBootTest public class ApplicationTests {
@Autowired private RabbitTemplate rabbitTemplate;
@Test public void send() { rabbitTemplate.convertAndSend("queuename", "message"); } }
|
消费者
添加额外配置
- RabbitMQ内部有预取机制,默认预取消息的数量是无限
- 为了防止消费者提前将消息拿走后再处理,而忽略了自身能力,可以修改预取消息数量
spring.rabbitmq.listener.simple.prefetch
:配置预取消息数量。如果设置为1,消费者每次只能先拿到1条消息,处理完后才会拿下一条消息
src/main/resource/application.yml
1 2 3 4 5 6 7 8 9 10
| spring: rabbitmq: addresses: 127.0.0.1 port: 5672 virtual-host: / username: admin password: admin listener: simple: prefetch: 1
|
接收消息
src/main/java/com/listener/SpringRabbitMQListener.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| package com.listener;
import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;
@Component public class SpringRabbitMQListener {
@RabbitListener(queues = "queuename") public void receive1(String message) { System.out.println("queuename: " + message); } @RabbitListener(queues = "queuename") public void receive2(String message) { System.out.println("queuename: " + message); } }
|
发布订阅模式
- 一个生产者向交换机(exchange)中发送消息
- 交换机会将消息拷贝多份,发布给多个订阅的队列
- 一个或多个消费者从对应的队列中获取消息,每次获取消息后,阅后即焚
- 交换机只能做消息的转发,如果转发没有成功,则消息会丢失
graph LR
A[生产者] --> B(交换机)
B --> C[消息队列]
C --> E[消费者]
C --> F[消费者]
B --> D[消息队列]
D --> G[消费者]
广播方式(Fanout)
消息队列绑定到交换机
- 本案例通过声明Bean的方式声明交换机、声明消息队列、消息队列绑定交换机
src/main/java/com/config/FanoutConfig.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| package com.config;
import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue; import org.springframework.boot.SpringBootConfiguration; import org.springframework.context.annotation.Bean;
@SpringBootConfiguration public class FanoutConfig {
@Bean public FanoutExchange fanoutExchange() { return new FanoutExchange("exchangename"); }
@Bean public Queue queue1() { return new Queue("queuename1"); } @Bean public Queue queue2() { return new Queue("queuename2"); }
@Bean public Binding binding1(Queue queue1, FanoutExchange fanoutExchange) { return BindingBuilder.bind(queue1).to(fanoutExchange); } @Bean public Binding binding2(Queue queue2, FanoutExchange fanoutExchange) { return BindingBuilder.bind(queue2).to(fanoutExchange); } }
|
生产者
"exchangename"
:交换机名
"message"
:消息内容
src/test/java/com/ApplicationTests.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| package com;
import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class) @SpringBootTest public class ApplicationTests {
@Autowired private RabbitTemplate rabbitTemplate;
@Test public void send() { rabbitTemplate.convertAndSend("exchangename", "", "message"); } }
|
消费者
src/main/java/com/listener/SpringRabbitMQListener.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| package com.listener;
import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;
@Component public class SpringRabbitMQListener {
@RabbitListener(queues = "queuename1") public void receive1(String message) { System.out.println("queuename1: " + message); } @RabbitListener(queues = "queuename2") public void receive2(String message) { System.out.println("queuename2: " + message); } }
|
路由方式(Direct)
- 交换机会根据规则路由到指定的消息队列,并不一定会转发给所有绑定的消息队列
- 每一个消息队列指定一个或多个
BindingKey
作为路由的依据
- 在交换机上会指定一个
RoutingKey
作为路由的条件
- 当
RoutingKey
与BindingKey
匹配成功时,才进行消息的转发
生产者
convertAndSend()
:第一个参数是交换机名,第二个参数是BindingKey,第三个参数是消息内容
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| package com;
import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class) @SpringBootTest public class ApplicationTests {
@Autowired private RabbitTemplate rabbitTemplate;
@Test public void send1() { rabbitTemplate.convertAndSend("exchangename", "BindingKey1", "message"); }
@Test public void send2() { rabbitTemplate.convertAndSend("exchangename", "BindingKey2", "message"); }
@Test public void send3() { rabbitTemplate.convertAndSend("exchangename", "BindingKey3", "message"); } }
|
消费者
- 本案例通过注解的方式声明交换机、声明消息队列、消息队列绑定交换机
- 在消费者上,通过注解,将消息队列绑定交换机
value = @Queue()
:声明消息队列
name = ""
:指定消息队列名
exchange = @Exchange()
:声明交换机
name = "exchangename"
:指定交换机名
type = "direct"
、type = ExchangeTypes.DIRECT
:指定交换机类型,可以指定字符串,也可以指定常量。如果不指定,默认是路由方式Direct
key = {"BindingKey1", "BindingKey3"}
:指定BindingKey,可以指定一个或多个
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| package com.listener;
import org.springframework.amqp.core.ExchangeTypes; import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;
@Component public class SpringRabbitMQListener {
@RabbitListener(bindings = @QueueBinding( value = @Queue(name = "queuename1"), exchange = @Exchange(name = "exchangename", type = "direct"), key = {"BindingKey1", "BindingKey3"} )) public void receive1(String message) { System.out.println("queuename1: " + message); }
@RabbitListener(bindings = @QueueBinding( value = @Queue(name = "queuename2"), exchange = @Exchange(name = "exchangename", type = ExchangeTypes.DIRECT), key = {"BindingKey2", "BindingKey3"} )) public void receive2(String message) { System.out.println("queuename2: " + message); } }
|
话题方式(Topic)
- 话题方式与路由方式基本相同,只是引入了通配符匹配
- 交换机会根据规则路由到指定的消息队列,并不一定会转发给所有绑定的消息队列
- 每一个消息队列指定一个或多个
BindingKey
作为路由的依据,与路由方式不同的是,话题方式要求BindingKey
是以多个单词组合构成,多个单词之间用.
分隔
- 在交换机上会指定一个
RoutingKey
作为路由的条件,可以使用通配符
- 当
RoutingKey
与BindingKey
匹配成功时,才进行消息的转发
生产者
convertAndSend()
:第一个参数是交换机名,第二个参数是BindingKey(多个单词之间由.
分隔),第三个参数是消息内容
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| package com;
import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class) @SpringBootTest public class ApplicationTests {
@Autowired private RabbitTemplate rabbitTemplate;
@Test public void send() { rabbitTemplate.convertAndSend("exchangename", "key.key", "message"); } }
|
消费者
- 与路由方式的消费者基本相同,只是在指定BindingKey时可以使用通配符
- 本案例通过注解的方式声明交换机、声明消息队列、消息队列绑定交换机
- 在消费者上,通过注解,将消息队列绑定交换机
value = @Queue()
:声明消息队列
name = ""
:指定消息队列名
exchange = @Exchange()
:声明交换机
name = "exchangename"
:指定交换机名
type = "topic"
、type = ExchangeTypes.TOPIC
:指定交换机类型,可以指定字符串,也可以指定常量
key = {"key.#", "#.key"}
:指定BindingKey,可以指定一个或多个,可以使用通配符
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| package com.listener;
import org.springframework.amqp.core.ExchangeTypes; import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;
@Component public class SpringRabbitMQListener {
@RabbitListener(bindings = @QueueBinding( value = @Queue(name = "queuename1"), exchange = @Exchange(name = "exchangename", type = "topic"), key = {"key.#"} )) public void receive1(String message) { System.out.println("queuename1: " + message); }
@RabbitListener(bindings = @QueueBinding( value = @Queue(name = "queuename2"), exchange = @Exchange(name = "exchangename", type = ExchangeTypes.TOPIC), key = {"#.key"} )) public void receive2(String message) { System.out.println("queuename2: " + message); } }
|
将消息转换方式改为JSON
- 默认的消息转换方式是JDK的序列化和反序列化方式,但是这种方式带来的问题是消息内容过长,同时JDK序列化可能会被漏洞利用
- 可以将消息转换方式改为JSON的方式
引入依赖
pom.xml
1 2 3 4
| <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> </dependency>
|
创建配置类
src/main/java/com/ApplicationConfig.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| package com;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.boot.SpringBootConfiguration; import org.springframework.context.annotation.Bean;
@SpringBootConfiguration public class ApplicationConfig {
@Bean public MessageConverter messageConverter() { return new Jackson2JsonMessageConverter(); } }
|
发送和接收消息
发送消息
src/test/java/com/ApplicationTests.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| package com;
import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner;
import java.util.HashMap; import java.util.Map;
@RunWith(SpringRunner.class) @SpringBootTest public class ApplicationTests {
@Autowired private RabbitTemplate rabbitTemplate;
@Test public void send() { Map<String, Object> map = new HashMap<>(); map.put("key", "value"); rabbitTemplate.convertAndSend("queuename", map); } }
|
接收消息
src/main/java/com/listener/SpringRabbitMQListener.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| package com.listener;
import org.springframework.amqp.core.ExchangeTypes; import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;
import java.util.Map;
@Component public class SpringRabbitMQListener {
@RabbitListener(queues = "queuename") public void receive(Map<String, Object> map) { System.out.println("queuename: " + map); } }
|
完成
参考文献
哔哩哔哩——黑马程序员