【笔记】SpringBoot项目整合SpringAMQP

前言

SpringBoot项目整合SpringAMQP学习笔记

引入依赖

pom.xml
1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

添加配置

  • 因为生产者和消费者都需要连接消息队列,所以需要相同的配置

spring.rabbitmq.addresses:RabbitMQ的地址
spring.rabbitmq.port:RabbitMQ的端口,默认为5672
spring.rabbitmq.virtual-host:RabbitMQ的虚拟主机
spring.rabbitmq.username:RabbitMQ的用户名
spring.rabbitmq.password:RabbitMQ的密码

src/main/resource/application.yml
1
2
3
4
5
6
7
spring:
rabbitmq:
addresses: 127.0.0.1
port: 5672
virtual-host: /
username: admin
password: admin

简单队列(Simple Queue)

  • 一个生产者向队列中发送消息
  • 一个消费者从队列中获取消息,每次获取消息后,阅后即焚
graph LR
A[生产者] --> B[消息队列]
    B --> C[消费者]

生产者

  • 通过convertAndSend()方法发送消息

"queuename":消息队列名
"message":消息内容

src/test/java/com/ApplicationTests.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
package com;

import org.junit.jupiter.api.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class ApplicationTests {

@Autowired
private RabbitTemplate rabbitTemplate;

@Test
public void send() {
rabbitTemplate.convertAndSend("queuename", "message");
}
}

消费者

@RabbitListener(queues = "")@RabbitListener(queues = {"", """}):指定队列名,可以指定多个队列名

src/main/java/com/listener/SpringRabbitMQListener.java
1
2
3
4
5
6
7
8
9
10
11
12
13
package com.listener;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class SpringRabbitMQListener {

@RabbitListener(queues = "queuename")
public void receive(String message) {
System.out.println("queuename: " + message);
}
}

工作队列(Work Queue)

  • 一个生产者向队列中发送消息
  • 多个消费者从队列中获取消息,每次获取消息后,阅后即焚
graph LR
A[生产者] --> B[消息队列]
    B --> C[消费者]
    B --> D[消费者]

生产者

"queuename":消息队列名
"message":消息内容

src/test/java/com/ApplicationTests.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
package com;

import org.junit.jupiter.api.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class ApplicationTests {

@Autowired
private RabbitTemplate rabbitTemplate;

@Test
public void send() {
rabbitTemplate.convertAndSend("queuename", "message");
}
}

消费者

添加额外配置

  • RabbitMQ内部有预取机制,默认预取消息的数量是无限
  • 为了防止消费者提前将消息拿走后再处理,而忽略了自身能力,可以修改预取消息数量

spring.rabbitmq.listener.simple.prefetch:配置预取消息数量。如果设置为1,消费者每次只能先拿到1条消息,处理完后才会拿下一条消息

src/main/resource/application.yml
1
2
3
4
5
6
7
8
9
10
spring:
rabbitmq:
addresses: 127.0.0.1
port: 5672
virtual-host: /
username: admin
password: admin
listener:
simple:
prefetch: 1

接收消息

src/main/java/com/listener/SpringRabbitMQListener.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
package com.listener;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class SpringRabbitMQListener {

@RabbitListener(queues = "queuename")
public void receive1(String message) {
System.out.println("queuename: " + message);
}

@RabbitListener(queues = "queuename")
public void receive2(String message) {
System.out.println("queuename: " + message);
}
}

发布订阅模式

  • 一个生产者向交换机(exchange)中发送消息
  • 交换机会将消息拷贝多份,发布给多个订阅的队列
  • 一个或多个消费者从对应的队列中获取消息,每次获取消息后,阅后即焚
  • 交换机只能做消息的转发,如果转发没有成功,则消息会丢失
graph LR
A[生产者] --> B(交换机)
    B --> C[消息队列]
        C --> E[消费者]
        C --> F[消费者]
    B --> D[消息队列]
        D --> G[消费者]

广播方式(Fanout)

  • 交换机会将消息路由到每一个与其绑定的消息队列

消息队列绑定到交换机

  • 本案例通过声明Bean的方式声明交换机、声明消息队列、消息队列绑定交换机
src/main/java/com/config/FanoutConfig.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package com.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.boot.SpringBootConfiguration;
import org.springframework.context.annotation.Bean;

@SpringBootConfiguration
public class FanoutConfig {

// 声明1个交换机
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange("exchangename");
}

// 声明2个消息队列
@Bean
public Queue queue1() {
return new Queue("queuename1");
}
@Bean
public Queue queue2() {
return new Queue("queuename2");
}

// 2个消息队列绑定到交换机
@Bean
public Binding binding1(Queue queue1, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(queue1).to(fanoutExchange);
}
@Bean
public Binding binding2(Queue queue2, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(queue2).to(fanoutExchange);
}
}

生产者

  • 生产者在发送消息时需要指定交换机

"exchangename":交换机名
"message":消息内容

src/test/java/com/ApplicationTests.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
package com;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class ApplicationTests {

@Autowired
private RabbitTemplate rabbitTemplate;

@Test
public void send() {
rabbitTemplate.convertAndSend("exchangename", "", "message");
}
}

消费者

src/main/java/com/listener/SpringRabbitMQListener.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
package com.listener;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class SpringRabbitMQListener {

@RabbitListener(queues = "queuename1")
public void receive1(String message) {
System.out.println("queuename1: " + message);
}

@RabbitListener(queues = "queuename2")
public void receive2(String message) {
System.out.println("queuename2: " + message);
}
}

路由方式(Direct)

  • 交换机会根据规则路由到指定的消息队列,并不一定会转发给所有绑定的消息队列
  • 每一个消息队列指定一个或多个BindingKey作为路由的依据
  • 在交换机上会指定一个RoutingKey作为路由的条件
  • RoutingKeyBindingKey匹配成功时,才进行消息的转发

生产者

convertAndSend():第一个参数是交换机名,第二个参数是BindingKey,第三个参数是消息内容

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package com;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class ApplicationTests {

@Autowired
private RabbitTemplate rabbitTemplate;

@Test
public void send1() {
rabbitTemplate.convertAndSend("exchangename", "BindingKey1", "message");
}

@Test
public void send2() {
rabbitTemplate.convertAndSend("exchangename", "BindingKey2", "message");
}

@Test
public void send3() {
rabbitTemplate.convertAndSend("exchangename", "BindingKey3", "message");
}
}

消费者

  • 本案例通过注解的方式声明交换机、声明消息队列、消息队列绑定交换机
  • 在消费者上,通过注解,将消息队列绑定交换机

value = @Queue():声明消息队列

name = "":指定消息队列名

exchange = @Exchange():声明交换机

name = "exchangename":指定交换机名
type = "direct"type = ExchangeTypes.DIRECT:指定交换机类型,可以指定字符串,也可以指定常量。如果不指定,默认是路由方式Direct

key = {"BindingKey1", "BindingKey3"}:指定BindingKey,可以指定一个或多个

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package com.listener;

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class SpringRabbitMQListener {

@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "queuename1"),
exchange = @Exchange(name = "exchangename", type = "direct"),
key = {"BindingKey1", "BindingKey3"}
))
public void receive1(String message) {
System.out.println("queuename1: " + message);
}

@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "queuename2"),
exchange = @Exchange(name = "exchangename", type = ExchangeTypes.DIRECT),
key = {"BindingKey2", "BindingKey3"}
))
public void receive2(String message) {
System.out.println("queuename2: " + message);
}
}

话题方式(Topic)

  • 话题方式与路由方式基本相同,只是引入了通配符匹配
  • 交换机会根据规则路由到指定的消息队列,并不一定会转发给所有绑定的消息队列
  • 每一个消息队列指定一个或多个BindingKey作为路由的依据,与路由方式不同的是,话题方式要求BindingKey是以多个单词组合构成,多个单词之间用.分隔
  • 在交换机上会指定一个RoutingKey作为路由的条件,可以使用通配符
    • #:表示0个或多个任意单词
    • *:表示1个任意单词
  • RoutingKeyBindingKey匹配成功时,才进行消息的转发

生产者

convertAndSend():第一个参数是交换机名,第二个参数是BindingKey(多个单词之间由.分隔),第三个参数是消息内容

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
package com;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class ApplicationTests {

@Autowired
private RabbitTemplate rabbitTemplate;

@Test
public void send() {
rabbitTemplate.convertAndSend("exchangename", "key.key", "message");
}
}

消费者

  • 与路由方式的消费者基本相同,只是在指定BindingKey时可以使用通配符
  • 本案例通过注解的方式声明交换机、声明消息队列、消息队列绑定交换机
  • 在消费者上,通过注解,将消息队列绑定交换机

value = @Queue():声明消息队列

name = "":指定消息队列名

exchange = @Exchange():声明交换机

name = "exchangename":指定交换机名
type = "topic"type = ExchangeTypes.TOPIC:指定交换机类型,可以指定字符串,也可以指定常量

key = {"key.#", "#.key"}:指定BindingKey,可以指定一个或多个,可以使用通配符

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package com.listener;

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class SpringRabbitMQListener {

@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "queuename1"),
exchange = @Exchange(name = "exchangename", type = "topic"),
key = {"key.#"}
))
public void receive1(String message) {
System.out.println("queuename1: " + message);
}

@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "queuename2"),
exchange = @Exchange(name = "exchangename", type = ExchangeTypes.TOPIC),
key = {"#.key"}
))
public void receive2(String message) {
System.out.println("queuename2: " + message);
}
}

将消息转换方式改为JSON

  • 默认的消息转换方式是JDK的序列化和反序列化方式,但是这种方式带来的问题是消息内容过长,同时JDK序列化可能会被漏洞利用
  • 可以将消息转换方式改为JSON的方式

引入依赖

pom.xml
1
2
3
4
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>

创建配置类

src/main/java/com/ApplicationConfig.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
package com;

import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.boot.SpringBootConfiguration;
import org.springframework.context.annotation.Bean;

@SpringBootConfiguration
public class ApplicationConfig {

@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
}

发送和接收消息

发送消息

src/test/java/com/ApplicationTests.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
package com;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.HashMap;
import java.util.Map;

@RunWith(SpringRunner.class)
@SpringBootTest
public class ApplicationTests {

@Autowired
private RabbitTemplate rabbitTemplate;

@Test
public void send() {
Map<String, Object> map = new HashMap<>();
map.put("key", "value");
rabbitTemplate.convertAndSend("queuename", map);
}
}

接收消息

src/main/java/com/listener/SpringRabbitMQListener.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
package com.listener;

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Map;

@Component
public class SpringRabbitMQListener {

@RabbitListener(queues = "queuename")
public void receive(Map<String, Object> map) {
System.out.println("queuename: " + map);
}
}

完成

参考文献

哔哩哔哩——黑马程序员