【笔记】SpringBoot项目整合RabbitMQ

前言

SpringBoot项目整合RabbitMQ

添加依赖

  • 编辑pom.xml配置文件
1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

添加配置

  • 编辑application.yml配置文件
1
2
3
4
5
spring:
rabbitmq:
host: 192.168.64.140
username: admin
password: admin

添加配置方法

  • 在全局配置类中(或启动类中,因为启动类就是一个特殊的全局配置类)添加配置方法

    • import org.springframework.amqp.core.Queue:封装队列信息的对象
    • RabbitMQ的自动配置类会自动发现这个Queue实例
    • 根据其中封装的队列信息,在RabbitMQ服务器上创建这个队列
1
2
3
4
@Bean
public Queue orderQueue() {
return new Queue("orderQueue"); // true,false,false
}

简单模式

启动类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@SpringBootApplication
public class Main {

@Autowired
private Producer producer;

public static void main(String[] args) {
SpringApplication.run(Main.class, args);
}

@Bean
public Queue helloworld() {
// return new Queue("helloworld"); // true,false,false
return new Queue("helloworld", false);
}

// Sprinjg扫描创建了所有对象,并完成所有注入操作后,会执行@PostConstruct方法
@PostConstruct
public void test() {
producer.send();
}

}

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
@Component
public class Producer {

// 在RabbitAutoConfiguration自动配置类中创建的工具对象
@Autowired
private AmqpTemplate amqpTemplate;

public void send() {
amqpTemplate.convertAndSend("helloworld", "helloworld");
System.out.println("消息已发送");
}

}

消费者

第一种方式

1
2
3
4
5
6
7
8
9
10
@Component
@RabbitListener(queues = "helloworld")
public class Consumer {

@RabbitHandler
public void receive(String msg) {
System.out.println("收到:"+msg);
}

}

第二种方式

1
2
3
4
5
6
7
8
9
@Component
public class Consumer {

@RabbitListener(queues = "helloworld")
public void receive(String msg) {
System.out.println("收到:"+msg);
}

}

工作模式

启动类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@SpringBootApplication
public class Main {

@Autowired
private Producer producer;

public static void main(String[] args) {
SpringApplication.run(Main.class, args);
}

@Bean
public Queue helloworld() {
return new Queue("task_queue", true);
}

@PostConstruct
public void test() {
producer.send();
}

}

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Component
public class Producer {

// 在RabbitAutoConfiguration自动配置类中创建的工具对象
@Autowired
private AmqpTemplate amqpTemplate;

public void send() {
new Thread(new Runnable() {
@Override
public void run() {
while (true) {
System.out.print("输入消息:");
String msg = new Scanner(System.in).nextLine();
amqpTemplate.convertAndSend("task_queue", msg);
}
}
}).start();
}

}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Component
public class Consumer {

@RabbitListener(queues = "task_queue")
public void receive1(String msg) {
System.out.println("消费者1收到:"+msg);
}

@RabbitListener(queues = "task_queue")
public void receive2(String msg) {
System.out.println("消费者2收到:"+msg);
}

}

实现合理分发

手动ack

  • SpringBoot整合后默认就是手动ack模式
  • 消费者方法执行成功后,SpringBoot会帮助发送回执

qos=1

  • 编辑application.yml配置文件

prefetch:默认为250

1
2
3
4
5
spring:
rabbitmq:
listener:
direct:
prefetch: 1

实现持久化

  • SpringBoot中默认就是持久化

发布订阅模式

启动类

  • 发布订阅模式不使用默认交换机,所以要手动指定交换机
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@SpringBootApplication
public class Main {

@Autowired
private Producer producer;

public static void main(String[] args) {
SpringApplication.run(Main.class, args);
}

// 创建交换机
@Bean
public FanoutExchange logsExchange() {
return new FanoutExchange("logs", false, false); // 非持久,不自动删除
}

@PostConstruct
public void test() {
producer.send();
}

}

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Component
public class Producer {

// 在RabbitAutoConfiguration自动配置类中创建的工具对象
@Autowired
private AmqpTemplate amqpTemplate;

public void send() {
new Thread(new Runnable() {
@Override
public void run() {
while (true) {
System.out.print("输入消息:");
String msg = new Scanner(System.in).nextLine();
amqpTemplate.convertAndSend("logs", "", msg);
}
}
}).start();
}

}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Component
public class Consumer {

@RabbitListener(bindings = @QueueBinding(
value = @Queue, // 队列(如果不指定参数,默认参数为:随机名,非持久,独占,自动删除)
exchange = @Exchange(
name = "logs",
declare = "false" // 只使用交换机,不重新定义新的交换机
) // 交换机
))
public void receive1(String msg) {
System.out.println("消费者1收到:"+msg);
}

@RabbitListener(bindings = @QueueBinding(value = @Queue, exchange = @Exchange(name = "logs", declare = "false")))
public void receive2(String msg) {
System.out.println("消费者2收到:"+msg);
}

}

路由模式

启动类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@SpringBootApplication
public class Main {

@Autowired
private Producer producer;

public static void main(String[] args) {
SpringApplication.run(Main.class, args);
}

// 创建交换机
@Bean
public DirectExchange logsExchange() {
return new DirectExchange("direct_logs", false, false); // 非持久,不自动删除
}

@PostConstruct
public void test() {
producer.send();
}

}

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Component
public class Producer {

// 在RabbitAutoConfiguration自动配置类中创建的工具对象
@Autowired
private AmqpTemplate amqpTemplate;

public void send() {
new Thread(new Runnable() {
@Override
public void run() {
while (true) {
System.out.print("输入消息:");
String msg = new Scanner(System.in).nextLine();
System.out.print("输入路由键:");
String key = new Scanner(System.in).nextLine();
amqpTemplate.convertAndSend("direct_logs", key, msg);
}
}
}).start();
}

}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Component
public class Consumer {

@RabbitListener(bindings = @QueueBinding(
value = @Queue,
exchange = @Exchange(name = "direct_logs", declare = "false"),
key = {"error"}
))
public void receive1(String msg) {
System.out.println("消费者1收到:"+msg);
}

@RabbitListener(bindings = @QueueBinding(
value = @Queue,
exchange = @Exchange(name = "direct_logs", declare = "false"),
key = {"info", "warning", "error"}
))
public void receive2(String msg) {
System.out.println("消费者2收到:"+msg);
}

}

主题模式

启动类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@SpringBootApplication
public class Main {

@Autowired
private Producer producer;

public static void main(String[] args) {
SpringApplication.run(Main.class, args);
}

// 创建交换机
@Bean
public DirectExchange logsExchange() {
return new DirectExchange("direct_logs", false, false); // 非持久,不自动删除
}

@PostConstruct
public void test() {
producer.send();
}

}

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Component
public class Producer {

// 在RabbitAutoConfiguration自动配置类中创建的工具对象
@Autowired
private AmqpTemplate amqpTemplate;

public void send() {
new Thread(new Runnable() {
@Override
public void run() {
while (true) {
System.out.print("输入消息:");
String msg = new Scanner(System.in).nextLine();
System.out.print("输入路由键:");
String key = new Scanner(System.in).nextLine();
amqpTemplate.convertAndSend("topic_logs", key, msg);
}
}
}).start();
}

}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Component
public class Consumer {

@RabbitListener(bindings = @QueueBinding(
value = @Queue,
exchange = @Exchange(name = "topic_logs", declare = "false"),
key = {"*.orange.*"}
))
public void receive1(String msg) {
System.out.println("消费者1收到:"+msg);
}

@RabbitListener(bindings = @QueueBinding(
value = @Queue,
exchange = @Exchange(name = "topic_logs", declare = "false"),
key = {"*.*.rabbit", "lazy.#"}
))
public void receive2(String msg) {
System.out.println("消费者2收到:"+msg);
}

}

RPC异步调用

完成