RocketMQ 生产消费实例

前言

前面两章主要是介绍了RocketMQ的应用场景和组件之间的关系,相信对MQ已经有了一定的认识,那么这篇主要来记录一下RocketMQ的安装,以及简单的消息生产和消费的demo编写。

RocketMQ的安装

环境:Windows + jdk1.8 + Maven + git。具体的步骤可以参看这篇博客。搭建过程没什么难度,所以这里不再复述。

需要注意的是在上面的步骤中,必须先要启动nameServer,cmd命令框执行进入至MQ文件夹\bin下:

1
start mqnamesrv.cmd

再启动一个Broker,在同一个目录下使用:

1
start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true

最终可以在RocketMQ插件控制台看到下面的效果:

你想输入的替代文字

Produce message

先给出依赖pom.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
<rocketmq.version>4.1.0-incubating</rocketmq.version>
</properties>

<dependencies>
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>1.3.2</version>
</dependency>

<!-- 整合RocketMq -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>${rocketmq.version}</version>
</dependency>

<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-common</artifactId>
<version>${rocketmq.version}</version>
</dependency>

<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>

</dependencies>

Producer.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class Producer {
@Test
public static void main(String[] args) throws MQClientException, InterruptedException {

//需要一个producer group名字作为构造方法的参数,这里为producer1
DefaultMQProducer producer = new DefaultMQProducer("producer1");

//设置NameServer地址,此处应改为实际NameServer地址,多个地址之间用;分隔
//NameServer的地址必须有,但是也可以通过环境变量的方式设置,不一定非得写死在代码里
producer.setNamesrvAddr("127.0.0.1:9876");
producer.setVipChannelEnabled(false);

//为避免程序启动的时候报错,添加此代码,可以让rocketMq自动创建topickey
producer.setCreateTopicKey("AUTO_CREATE_TOPIC_KEY");
producer.start();

for(int i=0;i<10;i++){
try {
//指定了topic为DefaultCluster,tags为Tag。发送的消息为测试
Message message = new Message("DefaultCluster", "Tag", ("测试" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(message);
System.out.println("发送的消息ID:" + sendResult.getMsgId() +"--- 发送消息的状态:" + sendResult.getSendStatus());
} catch (Exception e) {
e.printStackTrace();
Thread.sleep(1000);
}
}
producer.shutdown();
}
}

输出:

你想输入的替代文字

Consum message

consumer.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class Consumer {
private static final String ADDR = "127.0.0.1:9876";//填写你的nameser地址

public static void main(String[] args) throws MQClientException {
//设置消费者组
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_LRW_DEV_SUBS");

consumer.setVipChannelEnabled(false);
consumer.setNamesrvAddr(ADDR);
//设置消费者端消息拉取策略,表示从哪里开始消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

//设置消费者拉取消息的策略,*表示消费该topic下的所有消息,也可以指定tag进行消息过滤
consumer.subscribe("DefaultCluster", "*");

//消费者端启动消息监听,一旦生产者发送消息被监听到,就打印消息,和rabbitmq中的handlerDelivery类似
consumer.registerMessageListener(new MessageListenerConcurrently() {

public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt messageExt : msgs) {
String topic = messageExt.getTopic();
String tag = messageExt.getTags();
String msg = new String(messageExt.getBody());
System.out.println("*********************************");
System.out.println("消费响应:msgId : " + messageExt.getMsgId() + ", msgBody : " + msg + ", tag:" + tag + ", topic:" + topic);
System.out.println("*********************************");
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});

//调用start()方法启动consumer
consumer.start();
System.out.println("Consumer Started....");
}

}

启动后,consumer会一直监听topic为DefaultCluster是否有消息产生:

你想输入的替代文字

结束语

流程其实很简单,走通之后就可以慢慢的了解MQ的具体实现,下面会慢慢开始看RocketMQ的源码,后面的会主要围绕着源码进行记录。