0%
前言
SpringBoot项目整合RabbitMQ
添加依赖
1 2 3 4
<!-- Spring Boot starter for AMQP messaging (used here with RabbitMQ). -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
|
添加配置
1 2 3 4 5
# Connection settings for the RabbitMQ broker.
spring:
  rabbitmq:
    host: 192.168.64.140
    username: admin
    password: admin
|
添加配置方法
1 2 3 4
| @Bean public Queue orderQueue() { return new Queue("orderQueue"); }
|
简单模式
启动类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| @SpringBootApplication public class Main {
@Autowired private Producer producer;
public static void main(String[] args) { SpringApplication.run(Main.class, args); }
@Bean public Queue helloworld() {
return new Queue("helloworld", false); }
@PostConstruct public void test() { producer.send(); }
}
|
生产者
1 2 3 4 5 6 7 8 9 10 11 12 13
| @Component public class Producer {
@Autowired private AmqpTemplate amqpTemplate;
public void send() { amqpTemplate.convertAndSend("helloworld", "helloworld"); System.out.println("消息已发送"); }
}
|
消费者
第一种方式
1 2 3 4 5 6 7 8 9 10
| @Component @RabbitListener(queues = "helloworld") public class Consumer {
@RabbitHandler public void receive(String msg) { System.out.println("收到:"+msg); }
}
|
第二种方式
1 2 3 4 5 6 7 8 9
| @Component public class Consumer {
@RabbitListener(queues = "helloworld") public void receive(String msg) { System.out.println("收到:"+msg); }
}
|
工作模式
启动类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| @SpringBootApplication public class Main {
@Autowired private Producer producer;
public static void main(String[] args) { SpringApplication.run(Main.class, args); }
@Bean public Queue helloworld() { return new Queue("task_queue", true); }
@PostConstruct public void test() { producer.send(); }
}
|
生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| @Component public class Producer {
@Autowired private AmqpTemplate amqpTemplate;
public void send() { new Thread(new Runnable() { @Override public void run() { while (true) { System.out.print("输入消息:"); String msg = new Scanner(System.in).nextLine(); amqpTemplate.convertAndSend("task_queue", msg); } } }).start(); }
}
|
消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| @Component public class Consumer {
@RabbitListener(queues = "task_queue") public void receive1(String msg) { System.out.println("消费者1收到:"+msg); }
@RabbitListener(queues = "task_queue") public void receive2(String msg) { System.out.println("消费者2收到:"+msg); }
}
|
实现合理分发
手动ack
- SpringBoot整合后默认的确认模式是 AUTO:RabbitMQ 层面的自动确认已关闭(autoAck=false),回执由监听容器代为发送,无需在代码中手动调用 basicAck
- 消费者方法执行成功后,SpringBoot会帮助发送回执
qos=1
prefetch:默认为250
1 2 3 4 5
# Limit each consumer to one unacknowledged message at a time.
# Spring Boot's default listener container type is "simple", so prefetch is
# configured under listener.simple; listener.direct.* only takes effect when
# spring.rabbitmq.listener.type is set to direct.
spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1
|
实现持久化
发布订阅模式
启动类
- 发布订阅模式不使用默认交换机,所以要手动指定交换机
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| @SpringBootApplication public class Main {
@Autowired private Producer producer;
public static void main(String[] args) { SpringApplication.run(Main.class, args); }
@Bean public FanoutExchange logsExchange() { return new FanoutExchange("logs", false, false); }
@PostConstruct public void test() { producer.send(); }
}
|
生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| @Component public class Producer {
@Autowired private AmqpTemplate amqpTemplate;
public void send() { new Thread(new Runnable() { @Override public void run() { while (true) { System.out.print("输入消息:"); String msg = new Scanner(System.in).nextLine(); amqpTemplate.convertAndSend("logs", "", msg); } } }).start(); }
}
|
消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| @Component public class Consumer {
@RabbitListener(bindings = @QueueBinding( value = @Queue, // 队列(如果不指定参数,默认参数为:随机名,非持久,独占,自动删除) exchange = @Exchange( name = "logs", declare = "false" // 只使用交换机,不重新定义新的交换机 ) // 交换机 )) public void receive1(String msg) { System.out.println("消费者1收到:"+msg); }
@RabbitListener(bindings = @QueueBinding(value = @Queue, exchange = @Exchange(name = "logs", declare = "false"))) public void receive2(String msg) { System.out.println("消费者2收到:"+msg); }
}
|
路由模式
启动类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| @SpringBootApplication public class Main {
@Autowired private Producer producer;
public static void main(String[] args) { SpringApplication.run(Main.class, args); }
@Bean public DirectExchange logsExchange() { return new DirectExchange("direct_logs", false, false); }
@PostConstruct public void test() { producer.send(); }
}
|
生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| @Component public class Producer {
@Autowired private AmqpTemplate amqpTemplate;
public void send() { new Thread(new Runnable() { @Override public void run() { while (true) { System.out.print("输入消息:"); String msg = new Scanner(System.in).nextLine(); System.out.print("输入路由键:"); String key = new Scanner(System.in).nextLine(); amqpTemplate.convertAndSend("direct_logs", key, msg); } } }).start(); }
}
|
消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| @Component public class Consumer {
@RabbitListener(bindings = @QueueBinding( value = @Queue, exchange = @Exchange(name = "direct_logs", declare = "false"), key = {"error"} )) public void receive1(String msg) { System.out.println("消费者1收到:"+msg); }
@RabbitListener(bindings = @QueueBinding( value = @Queue, exchange = @Exchange(name = "direct_logs", declare = "false"), key = {"info", "warning", "error"} )) public void receive2(String msg) { System.out.println("消费者2收到:"+msg); }
}
|
主题模式
启动类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| @SpringBootApplication public class Main {
@Autowired private Producer producer;
public static void main(String[] args) { SpringApplication.run(Main.class, args); }
@Bean public DirectExchange logsExchange() { return new DirectExchange("direct_logs", false, false); }
@PostConstruct public void test() { producer.send(); }
}
|
生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| @Component public class Producer {
@Autowired private AmqpTemplate amqpTemplate;
public void send() { new Thread(new Runnable() { @Override public void run() { while (true) { System.out.print("输入消息:"); String msg = new Scanner(System.in).nextLine(); System.out.print("输入路由键:"); String key = new Scanner(System.in).nextLine(); amqpTemplate.convertAndSend("topic_logs", key, msg); } } }).start(); }
}
|
消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| @Component public class Consumer {
@RabbitListener(bindings = @QueueBinding( value = @Queue, exchange = @Exchange(name = "topic_logs", declare = "false"), key = {"*.orange.*"} )) public void receive1(String msg) { System.out.println("消费者1收到:"+msg); }
@RabbitListener(bindings = @QueueBinding( value = @Queue, exchange = @Exchange(name = "topic_logs", declare = "false"), key = {"*.*.rabbit", "lazy.#"} )) public void receive2(String msg) { System.out.println("消费者2收到:"+msg); }
}
|
RPC异步调用
完成