RocketMQ详解——RocketMQ的消息模式

RocketMQ的消息模式

在RocketMQ中,可以理解为没有ActiveMQ的createQueue()createTopic()的用法,也就是并没有P2P和Pub/Sub类似的概念。RocketMQ不遵循JMS规范,而是使用了一套自定义的机制。可以理解为RocketMQ都是基于Pub/Sub发布订阅模式的,在此基础上提供了集群消息和广播消息两种消息模式,可通过消费端方法consumer.setMessageModel()进行设置。

  • 集群消息——MessageModel.CLUSTERING

这方方式可以实现类似ActiveMQ负载均衡客户端的功能,同一个ConsumerGroup下的所有Consumer以负载均衡的方式消费消息。比较特殊的是,这种方式可以支持生产端先发送消息到Broker,消费端再订阅主题进行消费,比较灵活。RocketMQ默认为该模式

  • 广播消息——MessageModel.BROADCASTING

在这种模式下,生产端发送到Topic下的消息,会被订阅了该Topic的所有Consumer消费,即使它们处于同一个ConsumerGroup

在RocketMQ中,有一个很重要的概念——GroupName。无论是Producer端还是Consumer端,都必须指定一个GroupName,这个组名称需要由应用来保证唯一性。同一个ProducerGroup下的所有Producer发送用一类消息,且发送逻辑一致。Consumer同理。

Topic代表消息发送和订阅的主题,是一个逻辑上的概念,Topic并不实际存储消息。每个Topic都会维护一些MessageQueue(默认4个),这个MessageQueue则是物理上的概念,直接存储消息。

Producer端程序

使用DefaultMQProducer,发送8条消息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import william.rmq.common.constant.RocketMQConstant;
import william.rmq.producer.common.CommonSendCallback;
import javax.annotation.PostConstruct;

/**
* @Description:RocketMQ消息生产者
*/
@Service
@Slf4j
public class MessageProducer {
@Value("${spring.rocketmq.namesrvAddr}")
private String namesrvAddr;

private static final DefaultMQProducer producer = new DefaultMQProducer("DefaultProducer");

@PostConstruct
public void start(){
try {
producer.setNamesrvAddr(namesrvAddr);
producer.setRetryTimesWhenSendFailed(RocketMQConstant.MAX_RETRY_TIMES);
producer.start();

log.info("Message Producer Start...");
System.err.println("Message Producer Start...");
}catch (Exception e){
log.error("Message Producer Start Error!!", e);
}

String message = "Message-";
String topic = RocketMQConstant.TEST_TOPIC_NAME;
String tags = "Tags";
String keys = "Keys-";

for (int i = 1;i <= 8;i++) {
sendMessage(message + i, topic,tags, keys + i);
}
}

public void sendMessage(String data, String topic, String tags, String keys) {
try {
byte[] messageBody = data.getBytes(RemotingHelper.DEFAULT_CHARSET);

Message mqMsg = new Message(topic, tags, keys, messageBody);

producer.send(mqMsg, new CommonSendCallback());
} catch (Exception e) {
log.error("Message Producer: Send Message Error ", e);
}

}
}

Consumer端程序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import william.rmq.common.constant.RocketMQConstant;

import javax.annotation.PostConstruct;
import java.util.List;

/**
* @Description:RocketMQ消息消费者
*/
@Slf4j
@Service
public class MessageConsumer implements MessageListenerConcurrently {
@Value("${spring.rocketmq.namesrvAddr}")
private String namesrvAddr;

private final DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("DefaultConsumer");


@PostConstruct
public void start() {
try {
consumer.setNamesrvAddr(namesrvAddr);

//从消息队列头部开始消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

//设置集群消费模式
consumer.setMessageModel(MessageModel.CLUSTERING);

//设置消费超时时间(分钟)
consumer.setConsumeTimeout(RocketMQConstant.CONSUMER_TIMEOUT_MINUTES);

//订阅主题
consumer.subscribe(RocketMQConstant.TEST_TOPIC_NAME, "*");

//注册消息监听器
consumer.registerMessageListener(this);

//设置批量消费最大消息数,这里设置为逐条消费
consumer.setConsumeMessageBatchMaxSize(1);

//启动消费端
consumer.start();

log.info("Message Consumer Start...");
System.err.println("Message Consumer Start...");
} catch (MQClientException e) {
log.error("Message Consumer Start Error!!", e);
}

}

@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
if (CollectionUtils.isEmpty(msgs)) {
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}

MessageExt message = msgs.get(0);
try {
String messageBody = new String(message.getBody(), RemotingHelper.DEFAULT_CHARSET);
System.err.println("Message Consumer: Handle New Message: messageId: " + message.getMsgId() + ",topic: " +
message.getTopic() + ",tags: " + message.getTags() + ",messageBody: " + messageBody);

return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch (Exception e) {
log.error("Consume Message Error!!", e);
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}

}

先启动2个Consumer程序,再启动单个Producer程序,两个Consumer端控制台输出如下:

Consumer1:

1
2
3
4
Handle New Message: messageId: 0A0E096CA41418B4AAC259EE89E30006,topic: DefaultCluster,tags: Tags,messageBody: Message-7
Message Consumer: Handle New Message: messageId: 0A0E096CA41418B4AAC259EE89E30007,topic: DefaultCluster,tags: Tags,messageBody: Message-8
Message Consumer: Handle New Message: messageId: 0A0E096CA41418B4AAC259EE89D70002,topic: DefaultCluster,tags: Tags,messageBody: Message-1
Message Consumer: Handle New Message: messageId: 0A0E096CA41418B4AAC259EE89E30005,topic: DefaultCluster,tags: Tags,messageBody: Message-6

Consumer2:

1
2
3
4
Message Consumer: Handle New Message: messageId: 0A0E096CA41418B4AAC259EE89D80003,topic: DefaultCluster,tags: Tags,messageBody: Message-4
Message Consumer: Handle New Message: messageId: 0A0E096CA41418B4AAC259EE89D70000,topic: DefaultCluster,tags: Tags,messageBody: Message-2
Message Consumer: Handle New Message: messageId: 0A0E096CA41418B4AAC259EE89D70001,topic: DefaultCluster,tags: Tags,messageBody: Message-3
Message Consumer: Handle New Message: messageId: 0A0E096CA41418B4AAC259EE89E30004,topic: DefaultCluster,tags: Tags,messageBody: Message-5

可以看到,使用集群消息模式后,两个Consumer负载均衡消费了8条消息

Powered by AppBlog.CN     浙ICP备14037229号

Copyright © 2012 - 2021 APP开发技术博客 All Rights Reserved.

访客数 : | 访问量 :