【代码】RabbitMQ学习笔记

前言

RabbitMQ快速入门

简单模式

生产者

  • 向消息队列发送消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class Producer {
public static void main(String[] args) throws Exception {
// 建立连接
ConnectionFactory f = new ConnectionFactory();
f.setHost("192.168.64.140"); // 指定主机
f.setPort(5672); // 指定端口号,如果不指定,默认为5672
f.setUsername("admin"); // 指定RabbitMQ用户名
f.setPassword("admin"); // 指定RabbitMQ密码
Channel c = f.newConnection().createChannel();
// 创建队列
c.queueDeclare(
"helloworld", // 队列名
false, // 是否是持久队列
false, // 是否排他(独占)队列
false, // 是否自动删除
null // 其他属性,没有则为null
);
// 发送消息
c.basicPublish(
"", // 交换机名,空字符串表示默认的交换机
"helloworld", // 队列名
null, // 其他的消息属性配置,如果没有则为null
"Hello World !".getBytes() // 消息内容
);
System.out.println("消息已发送");
}
}

消费者

  • 从消息队列获取消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class Consumer {
public static void main(String[] args) throws Exception {
// 建立连接
ConnectionFactory f = new ConnectionFactory();
f.setHost("192.168.64.140");
f.setPort(5672);
f.setUsername("admin");
f.setPassword("admin");
Channel c = f.newConnection().createChannel();
// 创建队列(队列如果已存在,不会重复创建)
c.queueDeclare("helloworld", false, false, false, null);
// 创建回调对象
DeliverCallback deliverCallback = new DeliverCallback() {
@Override
public void handle(String s, Delivery message) throws IOException {
byte[] a = message.getBody();
String msg = new String(a);
System.out.println("收到:"+msg);
}
};
// 创建取消消息处理的回调对象
CancelCallback cancelCallback = new CancelCallback() {
@Override
public void handle(String s) throws IOException {
}
};
// 消费数据,等待从队列接收数据
c.basicConsume(
"helloworld", // 队列名
true, // 是否自动确认,true表示自动确认,false表时手动确认
deliverCallback, // 处理消息的回调函数
cancelCallback // 取消消息处理的回调对象
);
}
}

工作模式

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class Producer {
public static void main(String[] args) throws Exception {
// 建立连接
ConnectionFactory f = new ConnectionFactory();
f.setHost("192.168.64.140");
f.setPort(5672);
f.setUsername("admin");
f.setPassword("admin");
Channel c = f.newConnection().createChannel();
// 定义队列
c.queueDeclare("helloworld", false, false, false, null);
// 发送消息
while (true) {
System.out.print("输入消息:");
String msg = new Scanner(System.in).nextLine();
c.basicPublish("", "helloworld", null, msg.getBytes());
}
}
}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class Consumer {
public static void main(String[] args) throws Exception {
// 建立连接
ConnectionFactory f = new ConnectionFactory();
f.setHost("192.168.64.140");
f.setPort(5672);
f.setUsername("admin");
f.setPassword("admin");
Channel c = f.newConnection().createChannel();
// 定义队列
c.queueDeclare("helloworld", false, false, false, null);

DeliverCallback deliverCallback = new DeliverCallback() {
@Override
public void handle(String s, Delivery delivery) {
String msg = new String(delivery.getBody());
System.out.println("收到:"+msg);
for (int i = 0; i < msg.length(); i++) {
if ('.' == msg.charAt(i)) {
try {
Thread.sleep(1000);
} catch (Exception e) {
e.printStackTrace();
}
}
}
System.out.println("消息处理结束");
}
};
CancelCallback cancelCallback = new CancelCallback() {
@Override
public void handle(String s) throws IOException {
}
};
// 消费数据
c.basicConsume("helloworld", true, deliverCallback, cancelCallback);
}
}

合理分发

  • 通过回执,通知服务器消息处理完成
  • 每次只从服务器获取一条消息,没处理完成不进行下一条
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
public class Consumer {
public static void main(String[] args) throws Exception {
// 建立连接
ConnectionFactory f = new ConnectionFactory();
f.setHost("192.168.64.140");
f.setPort(5672);
f.setUsername("admin");
f.setPassword("admin");
Channel c = f.newConnection().createChannel();
// 定义队列
c.queueDeclare("helloworld", false, false, false, null);

DeliverCallback deliverCallback = new DeliverCallback() {
@Override
public void handle(String s, Delivery delivery) throws IOException {
String msg = new String(delivery.getBody());
System.out.println("收到:"+msg);
for (int i = 0; i < msg.length(); i++) {
if ('.' == msg.charAt(i)) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
System.out.println("消息处理结束");

/**
* 向服务器发送回执
* c.basicAck(回执, 是否一次回执全部提交);
*/
c.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
};
CancelCallback cancelCallback = new CancelCallback() {
@Override
public void handle(String s) throws IOException {

}
};
/**
* 每次只处理一条消息,处理完成之前不接收下一条
* 必须在手动ACK模式下才有效
*/
c.basicQos(1);
// 消费数据
// c.basicConsume("helloworld", true, deliverCallback, cancelCallback);
c.basicConsume("helloworld", false, deliverCallback, cancelCallback);
}
}

持久化

  • 已经创建的队列不能修改为持久队列

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class Producer {
public static void main(String[] args) throws Exception {
// 连接
ConnectionFactory f = new ConnectionFactory();
f.setHost("192.168.64.140");
f.setPort(5672);
f.setUsername("admin");
f.setPassword("admin");
Connection conn = f.newConnection();
Channel c = conn.createChannel();
// 定义队列
c.queueDeclare("task_queue", true, false, false, null);

// 发送消息
/**
* 发送消息
* 消费者收到消息后,遍历字符串,每个字符串,都暂停一秒,来模拟耗时消息
*/
while (true) {
System.out.print("输入消息:");
String msg = new Scanner(System.in).nextLine();
c.basicPublish("", "task_queue", MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());
}
}
}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public class Consumer {
public static void main(String[] args) throws Exception {
// 建立连接
ConnectionFactory f = new ConnectionFactory();
f.setHost("192.168.64.140");
f.setPort(5672);
f.setUsername("admin");
f.setPassword("admin");
Channel c = f.newConnection().createChannel();
// 定义队列
c.queueDeclare("task_queue", true, false, false, null);

DeliverCallback deliverCallback = new DeliverCallback() {
@Override
public void handle(String s, Delivery delivery) throws IOException {
String msg = new String(delivery.getBody());
System.out.println("收到:"+msg);
for (int i = 0; i < msg.length(); i++) {
if ('.' == msg.charAt(i)) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
System.out.println("消息处理结束");
c.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
};

CancelCallback cancelCallback = new CancelCallback() {
@Override
public void handle(String s) throws IOException {

}
};

c.basicQos(1);
// 消费数据
c.basicConsume("task_queue", false, deliverCallback, cancelCallback);
}
}

发布和订阅模式

  • fanout

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class Producer {
public static void main(String[] args) throws Exception {
// 建立连接
ConnectionFactory f = new ConnectionFactory();
f.setHost("192.168.64.140");
f.setPort(5672);
f.setUsername("admin");
f.setPassword("admin");
Channel c = f.newConnection().createChannel();
/**
* 定义交换机
* 服务器如果已经存在这个交换机,不会重复创建
* c.exchangeDeclare(交换机名, 交换机类型);
*/
// c.exchangeDeclare("logs", "fanout");
c.exchangeDeclare("logs", BuiltinExchangeType.FANOUT);
// 向交换机发送消息
while (true) {
System.out.println("输入消息:");
String msg = new Scanner(System.in).nextLine();
c.basicPublish("logs", // 交换机名
"", // 选择队列,对fanout无效
null,
msg.getBytes()
);
}
}
}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public class Consumer {
public static void main(String[] args) throws Exception {
// 建立连接
ConnectionFactory f = new ConnectionFactory();
f.setHost("192.168.64.140");
f.setPort(5672);
f.setUsername("admin");
f.setPassword("admin");
Channel c = f.newConnection().createChannel();
/**
* 定义随机队列
* 定义交换机(非持久,独占,自动删除)
* 绑定交换机和队列
* 对于fanout交换机,第三个参数无效
*/
String queue = UUID.randomUUID().toString();
c.queueDeclare(queue, false, true, true, null);
c.exchangeDeclare("logs", BuiltinExchangeType.FANOUT);
c.queueBind(queue, "logs", "");
// 接收消息
DeliverCallback deliverCallback = new DeliverCallback() {
@Override
public void handle(String s, Delivery delivery) throws IOException {
String msg = new String(delivery.getBody());
System.out.print("收到:"+msg);
}
};

CancelCallback cancelCallback = new CancelCallback() {
@Override
public void handle(String s) throws IOException {
}
};
c.basicConsume(queue, true, deliverCallback, cancelCallback);
}
}

路由模式

  • direct

  • 通过路由键匹配绑定键发送消息

  • 交换机不存储消息,如果没有匹配的队列,消息直接丢弃

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class Producer {
public static void main(String[] args) throws Exception {
// 建立连接
ConnectionFactory f = new ConnectionFactory();
f.setHost("192.168.64.140");
f.setPort(5672);
f.setUsername("admin");
f.setPassword("admin");
Channel c = f.newConnection().createChannel();
// 定义交换机
c.exchangeDeclare("direct_logs", BuiltinExchangeType.DIRECT);

// 发送消息,携带路由键
while (true) {
System.out.println("输入消息:");
String msg = new Scanner(System.in).nextLine();
System.out.println("输入路由键:");
String key = new Scanner(System.in).nextLine();
c.basicPublish("direct_logs", key, null, msg.getBytes());
}
}
}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public class Consumer {
public static void main(String[] args) throws Exception {
// 建立连接
ConnectionFactory f = new ConnectionFactory();
f.setHost("192.168.64.140");
f.setPort(5672);
f.setUsername("admin");
f.setPassword("admin");
Channel c = f.newConnection().createChannel();
/**
* 定义随机序列
* 定义交换机
* 绑定指定绑定的绑定键
*/
// 由RabbitMQ自动命名,通过getQueue()方法返回队列名,默认参数为false,true,true(非持久,独占,自动删除)
String queue = c.queueDeclare().getQueue();
c.exchangeDeclare("direct_logs", BuiltinExchangeType.DIRECT);
System.out.println("输入绑定键(用空格隔开多个键):");
String s = new Scanner(System.in).nextLine();
String[] a = s.split("\\s+");
for (String key : a) {
c.queueBind(queue, "direct_logs", key);
}
// 消费数据
DeliverCallback deliverCallback = new DeliverCallback() {
@Override
public void handle(String s, Delivery delivery) throws IOException {
// 取出消息数据和消息上携带的路由键
String msg = new String(delivery.getBody());
String key = delivery.getEnvelope().getRoutingKey();
System.out.println("收到:"+key+" - "+msg);
}
};
CancelCallback cancelCallback = new CancelCallback() {
@Override
public void handle(String s) throws IOException {
}
};
c.basicConsume(queue, true, deliverCallback, cancelCallback);
}
}

主题模式

  • topic

  • 提供一种关键词规则的路由模式

  • 代码与路由模式大同小异

  • 每个消费者匹配到多个相同数据,只接收一条

  • 交换机不存储消息,如果没有匹配的队列,消息直接丢弃

绑定键的格式

  • .连接的多个关键词
  • *关键字表示任意一个关键词
  • #关键字表示任意一个关键词
1
2
3
aa.bb.cc
aa.*.cc
aa.#

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class Producer {
public static void main(String[] args) throws Exception {
// 建立连接
ConnectionFactory f = new ConnectionFactory();
f.setHost("192.168.64.140");
f.setPort(5672);
f.setUsername("admin");
f.setPassword("admin");
Channel c = f.newConnection().createChannel();
// 定义交换机
c.exchangeDeclare("topic_logs", BuiltinExchangeType.TOPIC);
// 发送消息,携带路由键
while (true) {
System.out.println("输入消息:");
String msg = new Scanner(System.in).nextLine();
System.out.println("输入路由键:");
String key = new Scanner(System.in).nextLine();
c.basicPublish("topic_logs", key, null, msg.getBytes());
}
}
}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public class Consumer {
public static void main(String[] args) throws Exception {
// 建立连接
ConnectionFactory f = new ConnectionFactory();
f.setHost("192.168.64.140");
f.setPort(5672);
f.setUsername("admin");
f.setPassword("admin");
Channel c = f.newConnection().createChannel();
/**
* 定义随机序列
* 定义交换机
* 绑定指定绑定的绑定键
*/
// 由RabbitMQ自动命名,通过getQueue()方法返回队列名,默认参数为false,true,true(非持久,独占,自动删除)
String queue = c.queueDeclare().getQueue();
c.exchangeDeclare("topic_logs", BuiltinExchangeType.TOPIC);
System.out.println("输入绑定键(用空格隔开多个键):");
String s = new Scanner(System.in).nextLine();
String[] a = s.split("\\s+");
for (String key : a) {
c.queueBind(queue, "direct_logs", key);
}
// 消费数据
DeliverCallback deliverCallback = new DeliverCallback() {
@Override
public void handle(String s, Delivery delivery) throws IOException {
// 取出消息数据和消息上携带的路由键
String msg = new String(delivery.getBody());
String key = delivery.getEnvelope().getRoutingKey();
System.out.println("收到:"+key+" - "+msg);
}
};
CancelCallback cancelCallback = new CancelCallback() {
@Override
public void handle(String s) throws IOException {
}
};
c.basicConsume(queue, true, deliverCallback, cancelCallback);
}
}

RPC模式

完成