【笔记】RocketMQ学习笔记

前言

RocketMQ学习笔记

同步模式

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class Producer {
public static void main(String[] args) throws Exception {
// 创建生产者对象,并指定生产者组
DefaultMQProducer p = new DefaultMQProducer("producerGroup1");
// 指定注册中心
p.setNamesrvAddr("127.0.0.1:9876");

// 启动生产者
p.start();

// 向消息服务发送消息
Message msg = new Message(
"Topic1", // 指定发送的主题,相当于一级分类
"TagA", // 任意指定消息的标签,相当于二级分类(可选)
"Hello World !".getBytes() // 消息内容
);
SendResult r = p.send(msg);
System.out.println(r);

p.shutdown();
}
}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class Consumer {
public static void main(String[] args) throws Exception {
// 创建消费者对象,并指定消费者组
/**
* DefaultMQPushConsumer:服务器主动向消费者推送消息
* DefaultMQPullConsumer:消费者主动向服务器请求消息
*/
DefaultMQPushConsumer c = new DefaultMQPushConsumer("consumerGroup1");
// 指定消费者中心
c.setNamesrvAddr("127.0.0.1:9876");
// 从指定的Topic订阅消息
c.subscribe(
"Topic1", // 指定订阅的主题
"TagA || TagB" // 指定标签,多个表钱用`||`分隔
);
// 注册消息监听器
c.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
for (MessageExt ext : list) {
String msg = new String(ext.getBody());
System.out.println("收到:"+msg);
}
// 处理成功的返回值
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
// 处理失败的返回值
// return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
});
// 启动消费者
c.start();
System.out.println("消费者开始消费数据");
}
}

延迟消息

延迟级别

延迟级别 实际延迟时间
1 1s
2 5s
3 10s
4 30s
5 1m
6 2m
7 3m
8 4m
9 5m
10 6m
11 7m
12 8m
13 9m
14 10m
15 20m
16 30m
17 1h
18 2h

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class Producer {
public static void main(String[] args) throws Exception {
// 创建生产者对象,并指定生产者组
DefaultMQProducer p = new DefaultMQProducer("producerGroup1");
// 指定注册中心
p.setNamesrvAddr("127.0.0.1:9876");

// 启动生产者
p.start();

// 向消息服务发送消息
Message msg = new Message("Topic1", "TagA", "Hello World !".getBytes());
// 指定延迟级别
msg.setDelayTimeLevel(3);

SendResult r = p.send(msg);
System.out.println(r);

p.shutdown();
}
}

消费者

  • 与同步模式相同

顺序模式

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public class Producer {
static String[] msgs = {
"15103111039,创建",
"15103111065,创建",
"15103111039,付款",
"15103117235,创建",
"15103111065,付款",
"15103117235,付款",
"15103111065,完成",
"15103111039,推送",
"15103117235,完成",
"15103111039,完成"
};
public static void main(String[] args) throws Exception {
// 创建生产者对象,并指定生产者组
DefaultMQProducer p = new DefaultMQProducer("producerGroup1");
// 指定注册中心
p.setNamesrvAddr("127.0.0.1:9876");
// 启动生产者
p.start();

// 顺序发送所有消息,并指定队列选择器
for (String s : msgs) {
String[] arr = s.split(",");
long orderId = Long.parseLong(arr[0]);
Message msg = new Message("Topic2", "TagA", s.getBytes());

SendResult r = p.send(
msg, // 消息内容
new MessageQueueSelector() {
// 通过`id对消费者数量取余作为消费者的下标`来判定发送到哪个消费者
@Override
public MessageQueue select(List<MessageQueue> list, Message message, Object o /*选择依据*/) {
long orderId = (long) o;
int index = (int) orderId % list.size();
return list.get(index);
}
}, // 选择器
orderId // 选择依据
);

System.out.println(r);

}

p.shutdown();
}
}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class Consumer {
public static void main(String[] args) throws MQClientException {
// 创建消费者连接注册中心
DefaultMQPushConsumer c = new DefaultMQPushConsumer("consumerGroup2");
c.setNamesrvAddr("127.0.0.1:9876");
// 订阅消息
c.subscribe(
"Topic2",
"*" // 所有标签
);

// 注册顺序消息监听器
c.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
for (MessageExt ext : list) {
String msg = new String(ext.getBody());
System.out.println("收到:"+msg);
}
return ConsumeOrderlyStatus.SUCCESS;
}
});

c.start();
System.out.println("消费者开始接收消息");

}
}

事务消息

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
public class Producer {
public static void main(String[] args) throws MQClientException {
// 创建事务消息生产者对象
TransactionMQProducer p = new TransactionMQProducer("producerGroup3");
// 连接注册中心并启动
p.setNamesrvAddr("127.0.0.1:9876");
p.start();
// 注册事务消息监听器,监听器实现两个功能
p.setTransactionListener(new TransactionListener() {

Map<String, LocalTransactionState> map = new ConcurrentHashMap<>();

// 1、执行本地事务
@Override
public LocalTransactionState executeLocalTransaction(Message message, Object o) {

if (Math.random() < 0.5/*模拟回查*/) {
System.out.println("本地事务执行状态未知");
map.put(message.getTransactionId(), LocalTransactionState.UNKNOW);
return LocalTransactionState.UNKNOW;
}

System.out.println("执行本地事务");
if (Math.random() < 0.5/*模拟事务执行状态*/) {
System.out.println("本地事务执行成功");
map.put(message.getTransactionId(), LocalTransactionState.COMMIT_MESSAGE);
// 提交消息
return LocalTransactionState.COMMIT_MESSAGE;
} else {
System.out.println("本地事务执行失败");
map.put(message.getTransactionId(), LocalTransactionState.ROLLBACK_MESSAGE);
// 回滚消息
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}

// 2、处理事务回查
@Override
public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
System.out.println("服务器正在回查事务状态");
return map.get(messageExt.getTransactionId());
}
});

// 循环发送事务消息,发送事务消息会出发监听器执行
while (true) {
System.out.print("输入消息:");
String s = new Scanner(System.in).nextLine();
Message msg = new Message("topic3", s.getBytes());

System.out.println("发送事务消息");

p.sendMessageInTransaction(
msg,
null // 业务数据,会被传递到监听器执行本地事务的方法中
);

System.out.println("事务消息处理结束");
}

}
}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class Consumer {
public static void main(String[] args) throws Exception {
// 创建消费者对象,并指定消费者组
/**
* DefaultMQPushConsumer:服务器主动向消费者推送消息
* DefaultMQPullConsumer:消费者主动向服务器请求消息
*/
DefaultMQPushConsumer c = new DefaultMQPushConsumer("consumerGroup3");
// 指定消费者中心
c.setNamesrvAddr("127.0.0.1:9876");
// 从指定的Topic订阅消息
c.subscribe("Topic1", "*");
// 注册消息监听器
c.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
for (MessageExt ext : list) {
String msg = new String(ext.getBody());
System.out.println("收到:"+msg);
}
// 处理成功的返回值
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
// 处理失败的返回值
// return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
});
// 启动消费者
c.start();
System.out.println("消费者开始消费数据");
}
}

完成