⭐⭐⭐ Spring Boot 项目实战 ⭐⭐⭐ Spring Cloud 项目实战
《Dubbo 实现原理与源码解析 —— 精品合集》 《Netty 实现原理与源码解析 —— 精品合集》
《Spring 实现原理与源码解析 —— 精品合集》 《MyBatis 实现原理与源码解析 —— 精品合集》
《Spring MVC 实现原理与源码解析 —— 精品合集》 《数据库实体设计合集》
《Spring Boot 实现原理与源码解析 —— 精品合集》 《Java 面试题 + Java 学习指南》

🙂🙂🙂关注**微信公众号:【芋道源码】**有福利:

  1. RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表
  2. RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
  3. 您对于源码的疑问每条留言将得到认真回复。甚至不知道如何读源码也可以请教噢
  4. 新的源码解析文章实时收到通知。每周更新一篇左右
  5. 认真的源码交流微信群。

摘要: 原创出处 http://www.iocoder.cn/RocketMQ/install/ 「芋道源码」欢迎转载,保留摘要,谢谢!


推荐阅读如下 RocketMQ 文章:

1. 概述

在开始搭建 RocketMQ 服务之前,我们先来对它做下简单的了解。

RocketMQ 是阿里巴巴在 2012 年开源的分布式消息中间件,目前已经捐赠给 Apache 软件基金会,并于 2017 年 9 月 25 日成为 Apache 的顶级项目。作为经历过多次阿里巴巴双十一这种“超级工程”的洗礼并有稳定出色表现的国产中间件,以其高性能、低延时和高可靠等特性近年来已经也被越来越多的国内企业使用。

如下是 RocketMQ 产生的原因:

淘宝内部的交易系统使用了淘宝自主研发的 Notify 消息中间件,使用 MySQL 作为消息存储媒介,可完全水平扩容。

为了进一步降低成本,我们认为存储部分可以进一步优化。2011 年初,Linkin 开源了 Kafka 这个优秀的消息中间件,淘宝中间件团队在对 Kafka 做过充分 Review 之后, Kafka 无限消息堆积,高效的持久化速度吸引了我们。

但是,同时发现这个消息系统主要定位于日志传输,对于使用在淘宝交易、订单、充值等场景下还有诸多特性不满足,为此我们重新用 Java 语言编写了 RocketMQ ,定位于非日志的可靠消息传输(日志场景也 OK)。

目前 RocketMQ 在阿里集团被广泛应用在订单,交易,充值,流计算,消息推送,日志流式处理, binglog 分发等场景。

1.1 基本概念

在开始之前,胖友先认真阅读如下两个文档:

1.2 整体架构

如下图所示:角色组成

  • 生产者(Producer):负责产生消息,生产者向消息服务器发送由业务应用程序系统生成的消息。
  • 消费者(Consumer):负责消费消息,消费者从消息服务器拉取信息并将其输入用户应用程序。
  • 消息服务器(Broker):是消息存储中心,主要作用是接收来自 Producer 的消息并存储, Consumer 从这里取得消息。
  • 名称服务器(NameServer):用来保存 Broker 相关 Topic 等元信息并给 Producer ,提供 Consumer 查找 Broker 信息。

1.3 整体流程

整体流程

  • 1、启动 Namesrv,Namesrv起 来后监听端口,等待 Broker、Producer、Consumer 连上来,相当于一个路由控制中心。

  • 2、Broker 启动,跟所有的 Namesrv 保持长连接,定时发送心跳包。

    心跳包中,包含当前 Broker 信息(IP+端口等)以及存储所有 Topic 信息。 注册成功后,Namesrv 集群中就有 Topic 跟 Broker 的映射关系。

    • 3、收发消息前,先创建 Topic 。创建 Topic 时,需要指定该 Topic 要存储在哪些 Broker上。也可以在发送消息时自动创建Topic。
  • 4、Producer 发送消息。

    启动时,先跟 Namesrv 集群中的其中一台建立长连接,并从Namesrv 中获取当前发送的 Topic 存在哪些 Broker 上,然后跟对应的 Broker 建立长连接,直接向 Broker 发消息。

  • 5、Consumer 消费消息。

    Consumer 跟 Producer 类似。跟其中一台 Namesrv 建立长连接,获取当前订阅 Topic 存在哪些 Broker 上,然后直接跟 Broker 建立连接通道,开始消费消息。

1.4 更多文档

目前 RocketMQ 4 的中文文档很少,所以英文不太好的胖友,后续推荐看看如下资料:

2. 单机部署

可以参考 《Apache RocketMQ —— Quick Start》 文章。

本小节,我们会部署一套 RocketMQ 最小化的单机环境,包括一个 RocketMQ Namesrv 和 Broker 服务。部署完成之后,我们会测试消息的发送与消费。下面,让我们逐步开始。

2.1 前置条件

需要安装如下软件:

  • JDK 8+
  • Maven 3.2.X+

因为我们准备直接编译 RocketMQ 源码,构建出 RocketMQ 软件包。

2.2 下载源码

打开 RocketMQ release_notes 页面,我们可以看到 RocketMQ 所有的发布版本。这里,我们选择最新的 RocketMQ 4.6.0 版本。点击进入该版本的发布页面后,我们可以看到两种发布版本:

一般情况下,我们可以直接使用 Binary 版本,它是 RocketMQ 已经编译好,可以直接使用的 RocketMQ 软件包。

这里,我们想带着胖友们编译一次 RocketMQ 源码,所以使用 Source 版本。下面,我们开始下载 RocketMQ 4.6.0 Source 源码。命令行操作如下:

# 下载
$ wget wget http://mirror.bit.edu.cn/apache/rocketmq/4.6.0/rocketmq-all-4.6.0-source-release.zip

# 解压
$ unzip rocketmq-all-4.6.0-source-release.zip

2.2 编译源码

使用 Maven 编译 RocketMQ 源码。命令行操作如下:

# 进入 RocketMQ 源码目录
$ cd rocketmq-all-4.6.0-source-release

# Maven 编译 RocketMQ ,并跳过测试。耐心等待...
$ mvn -Prelease-all -DskipTests clean install -U

编译完成,在我们进入 distribution 目录下,就可以看到 RocketMQ 的发布包了。命令行操作如下:

# 进入 distribution 目录下
$ cd distribution/target/rocketmq-4.6.0/rocketmq-4.6.0

# 打印目录
$ ls
40 -rwxr-xr-x 1 yunai staff 17336 Nov 19 20:59 LICENSE
8 -rwxr-xr-x 1 yunai staff 1338 Nov 19 20:59 NOTICE
16 -rwxr-xr-x 1 yunai staff 4225 Nov 19 20:59 README.md
0 drwxr-xr-x 6 yunai staff 192 Dec 3 12:48 benchmark # 性能基准测试
0 drwxr-xr-x 30 yunai staff 960 Nov 19 20:59 bin # 执行脚本
0 drwxr-xr-x 12 yunai staff 384 Nov 19 20:59 conf # 配置文件
0 drwxr-xr-x 36 yunai staff 1152 Dec 3 12:48 lib # RocketMQ jar 包

2.3 启动 Namesrv

启动一个 RocketMQ Namesrv 服务。命令行操作如下:

nohup sh bin/mqnamesrv &

启动完成后,查看日志。

# 查看 Namesrv 日志。
$ tail -f ~/logs/rocketmqlogs/namesrv.log

2019-12-03 12:58:04 INFO main - The Name Server boot success. serializeType=JSON

  • 默认情况下,Namesrv 日志文件所在地址为 ~/logs/rocketmqlogs/namesrv.log 。如果想要自定义,可以通过 conf/logback_namesrv.xml 配置文件来进行修改。

2.4 启动 Broker

conf 目录下,RocketMQ 提供了多种 Broker 的配置文件:

  • broker.conf :单主,异步刷盘。
  • 2m/ :双主,异步刷盘。
  • 2m-2s-async/ :两主两从,异步复制,异步刷盘。
  • 2m-2s-sync/ :两主两从,同步复制,异步刷盘。
  • dledger/Dledger 集群,至少三节点。

这里,我们只启动一个 RocketMQ Broker 服务,所以使用 broker.conf 配置文件。命令行操作如下:

nohup sh bin/mqbroker -c conf/broker.conf  -n 127.0.0.1:9876 &

  • 通过 -c 参数,配置读取的主 Broker 配置。

  • 通过 -n 参数,设置 RocketMQ Namesrv 地址。

  • 如果胖友的服务器的存相对小,可以修改下 bin/runbroker.sh 脚本,将 Broker JVM 内存调小。如下:

    JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2g"

启动完成后,查看日志。

tail -f ~/logs/rocketmqlogs/broker.log

2019-12-03 14:27:07 INFO main - The broker[broker-a, 192.168.3.44:10911] boot success. serializeType=JSON and name server is 127.0.0.1:9876

  • 默认情况下,Broker 日志文件所在地址为 ~/logs/rocketmqlogs/broker.log 。如果想要自定义,可以通过 conf/logback_broker.xml 配置文件来进行修改。

😈 至此,我们已经完成了 RocketMQ 单机部署。下面,我们开始进行下消息的发送和消费的测试。

2.5 测试发送消息

通过使用 bin/tools.sh 工具类,实现测试发送消息。命令行操作如下:

# 设置 Namesrv 服务器的地址
export NAMESRV_ADDR=127.0.0.1:9876

# 执行生产者 Producer 发送测试消息
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer

如果发送成功,我们会看到大量成功的发送日志。

SendResult [sendStatus=SEND_OK, msgId=FE800000000000004F2B5386138462F500000D7163610D67E7F100F4, offsetMsgId=C0A8032C00002A9F000000000000D7EE, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=0], queueOffset=61]
SendResult [sendStatus=SEND_OK, msgId=FE800000000000004F2B5386138462F500000D7163610D67E7F200F5, offsetMsgId=C0A8032C00002A9F000000000000D8D1, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=1], queueOffset=61]

  • 通过发送结果为 sendStatus=SEND_OK 状态,说明消息都发送成功了。

2.6 测试消费消息

通过使用 bin/tools.sh 工具类,实现测试消费消息。命令行操作如下:

# 设置 Namesrv 服务器的地址
export NAMESRV_ADDR=127.0.0.1:9876

# 执行消费者 Consumer 消费测试消息
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer

如果消费成功,我们会看到大量成功的消费日志。

ConsumeMessageThread_4 Receive New Messages: [MessageExt [queueId=2, storeSize=227, queueOffset=131, sysFlag=0, bornTimestamp=1575354513732, bornHost=/192.168.3.44:55510, storeTimestamp=1575354513733, storeHost=/192.168.3.44:10911, msgId=C0A8032C00002A9F000000000001D1FC, commitLogOffset=119292, bodyCRC=1549304357, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=145, CONSUME_START_TIME=1575354867104, UNIQ_KEY=FE800000000000004F2B5386138462F500000D7163610D67E944020E, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 53, 50, 54], transactionId='null'}]]
ConsumeMessageThread_3 Receive New Messages: [MessageExt [queueId=2, storeSize=227, queueOffset=130, sysFlag=0, bornTimestamp=1575354513729, bornHost=/192.168.3.44:55510, storeTimestamp=1575354513729, storeHost=/192.168.3.44:10911, msgId=C0A8032C00002A9F000000000001CE70, commitLogOffset=118384, bodyCRC=1530218044, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=145, CONSUME_START_TIME=1575354867103, UNIQ_KEY=FE800000000000004F2B5386138462F500000D7163610D67E941020A, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 53, 50, 50], transactionId='null'}]]

  • 通过 ConsumeMessageThread_4ConsumeMessageThread_3 线程名,我们可以看出,目前是进行并发消费消息。

3. 集群部署

在生产环境下,必须搭建 RocketMQ 高可用集群,不然简直是找死。艿艿有个项目抠门了下,只搭建了一主一从,在一次主挂掉之后,因为 RocketMQ 不支持主从切换,就发生了线上事故。一般 RocketMQ 的集群部署方案推荐如下:

  • 如果对高性能有比较强的诉求,使用两主两从,异步复制,异步刷盘。
  • 如果对可靠性有比较强的诉求,建议使用 Dledger 集群,至少三节点。

因为在 《性能测试 —— RocketMQ 基准测试》「5. 搭建集群」 小节中,我们已经详细描述了如何搭建一个一主一从的 RocketMQ 单集群。胖友可以参考该文,搭建一个二主两从的 RocketMQ 双集群。😈

下面,艿艿额外放送下 RocketMQ 实现高可用的原理。感兴趣的胖友,可以瞅一瞅。

RocketMQ 集群

🦅 1. Producer

  • 1、Producer 自身在应用中,所以无需考虑高可用。
  • 2、Producer 配置多个 Namesrv 列表,从而保证 Producer 和 Namesrv 的连接高可用。并且,会从 Namesrv 定时拉取最新的 Topic 信息。
  • 3、Producer 会和所有 Consumer 直连,在发送消息时,会选择一个 Broker 进行发送。如果发送失败,则会使用另外一个 Broker 。
  • 4、Producer 会定时向 Broker 心跳,证明其存活。而 Broker 会定时检测,判断是否有 Producer 异常下线。

🦅 2. Consumer

  • 1、Consumer 需要部署多个节点,以保证 Consumer 自身的高可用。当相同消费者分组中有新的 Consumer 上线,或者老的 Consumer 下线,会重新分配 Topic 的 Queue 到目前消费分组的 Consumer 们。
  • 2、Consumer 配置多个 Namesrv 列表,从而保证 Consumer 和 Namesrv 的连接高可用。并且,会从 Consumer 定时拉取最新的 Topic 信息。
  • 3、Consumer 会和所有 Broker 直连,消费相应分配到的 Queue 的消息。如果消费失败,则会发回消息到 Broker 中。
  • 4、Consumer 会定时向 Broker 心跳,证明其存活。而 Broker 会定时检测,判断是否有 Consumer 异常下线。

🦅 3. Namesrv

  • 1、Namesrv 需要部署多个节点,以保证 Namesrv 的高可用。
  • 2、Namesrv 本身是无状态,不产生数据的存储,是通过 Broker 心跳将 Topic 信息同步到 Namesrv 中。
  • 3、多个 Namesrv 之间不会有数据的同步,是通过 Broker 向多个 Namesrv 多写。

🦅 4. Broker

  • 1、多个 Broker 可以形成一个 Broker 分组。每个 Broker 分组存在一个 Master 和多个 Slave 节点。
    • Master 节点,可提供读和写功能。Slave 节点,可提供读功能。
    • Master 节点会不断发送新的 CommitLog 给 Slave节点。Slave 节点不断上报本地的 CommitLog 已经同步到的位置给 Master 节点。
    • Slave 节点会从 Master 节点拉取消费进度、Topic 配置等等。
  • 2、多个 Broker 分组,形成 Broker 集群。
    • Broker 集群和集群之间,不存在通信与数据同步。
  • 3、Broker 可以配置同步刷盘或异步刷盘,根据消息的持久化的可靠性来配置。

4. Web Console 控制台

在 RocketMQ 拓展项目(rocketmq-externals) 中,包含了 RocketMQ Console 项目,是 RocketMQ 的图形化管理控制台,提供 Broker 集群信息查看,Topic 管理,Producer、Consumer 信息展示,消息查询等等常用功能。

虽然说,我们也可以使用 RocketMQ 提供的 CLI Admin Tool 工具,实现上述的查询与管理的功能,但是命令行的方式对操作人员的要求稍高一些。当然,在 RocketMQ Console 无法满足我们更精细化的管理的需求的时候,我们还是会使用 CLI Admin Tool 工具。

下面,让我们来搭建一个 RocketMQ Console 控制台。

4.1 克隆代码

rocketmq-externals 仓库的代码,克隆到本地。操作流程如下:

# 克隆代码
$ git clone https://github.com/apache/rocketmq-externals.git

# 进入 Console 目录
$ cd rocketmq-console

4.2 配置文件

如果胖友需要自定义 RocketMQ Console 的配置,可以进入该项目下的 src/main/resources/ 目录下,进行相应的配置文件修改。例如说,设置 RocketMQ Namesrv 地址,开启 RocketMQ Console 的登录访问。

这里,我们修改 src/main/resources/application.properties 配置文件,通过设置 rocketmq.config.namesrvAddr=127.0.0.1:9876 配置项,设置 RocketMQ Namesrv 的地址。

4.3 编译源码

使用 Maven 编译 RocketMQ Console 源码。命令行操作如下:

# 编译
$ mvn clean package -Dmaven.test.skip=true

4.4 启动控制台

直接以 jar 的方式,启动控制台。注意,控制台使用 8080 端口。命令行操作如下:

nohup java -jar target/rocketmq-console-ng-1.0.1.jar &

启动完成后,查看日志。

$ tail -f nohup.out

[2019-12-03 16:05:19.349] INFO Tomcat started on port(s): 8080 (http)
[2019-12-03 16:05:19.354] INFO Started App in 5.341 seconds (JVM running for 6.104)

  • 当看到如上日志,说明 Console 控制台启动完成。

4.5 简单使用

使用浏览器,访问 http://127.0.0.1:8080/ 地址,我们就可以看到 RocketMQ Console 的界面。如下图:RocketMQ Console

更多的使用指南,胖友可以后续看看 《RocketMQ Console —— 使用文档》

5. 简单示例

本小节,我们来看看如何使用生产者 Producer 发送消息,和消费者 Consumer 消费消息。

Rocketmq 仓库的 example 目录下,提供了 RocketMQ 示例。本小节,我们主要来看看 qucikstart 这个最简示例。

5.1 Producer

Producer 类,提供生产者 Producer 发送消息的最简示例。代码如下:

// Producer.java

public class Producer {

public static void main(String[] args) throws MQClientException, InterruptedException {

/*
* Instantiate with a producer group name.
*/
// <1.1> 创建 DefaultMQProducer 对象
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
// <1.2> 设置 RocketMQ Namesrv 地址
producer.setNamesrvAddr("127.0.0.1:9876");

/*
* Specify name server addresses.
* <p/>
*
* Alternatively, you may specify name server addresses via exporting environmental variable: NAMESRV_ADDR
* <pre>
* {@code
* producer.setNamesrvAddr("name-server1-ip:9876;name-server2-ip:9876");
* }
* </pre>
*/

/*
* Launch the instance.
*/
// <1.3> 启动 producer 生产者
producer.start();

for (int i = 0; i < 1000; i++) {
try {

/*
* Create a message instance, specifying topic, tag and message body.
*/
// <2.1> 创建 Message 消息
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);

/*
* Call send message to deliver message to one of brokers.
*/
// <2.2> 同步发送消息
SendResult sendResult = producer.send(msg);

// <2.3> 打印发送结果
System.out.printf("%s%n", sendResult);
} catch (Exception e) {
e.printStackTrace();
Thread.sleep(1000);
}
}

/*
* Shut down once the producer instance is not longer in use.
*/
// <3> 关闭 producer 生产者
producer.shutdown();
}

}

  • <1> 处,初始化一个 Producer 生产者。
    • <1.1> 处,创建 DefaultMQProducer 对象,这里设置的生产者分组是 "please_rename_unique_group_name"
    • <1.2> 处,设置 设置 producer 的 RocketMQ Namesrv 地址。这里,是艿艿额外添加的代码。
    • <1.3> 处,启动 producer 生产者。
  • <2> 处,使用 Producer 发送 1000 条消息。
    • <2.1> 处,创建 Message 消息。这里设置了其 Topic 为 "TopicTest",Tag 为 TagA、消息体 Body 为 "Hello RocketMQ" 的二进制数组。
    • <2.2> 处,调用生产者的 #send(Message msg) 方法,同步发送消息,等待发送结果。RocketMQ Producer 一共有三种发送消息的方式,除了我们这里看到的同步发送消息之外,还有异步发送消息(可见 AsyncProducer 示例),和 Oneway 发送消息。
    • <2.3> 处,打印发送结果。
  • <3> 处,关闭 producer 生产者。

执行 #main(args) 方法,开始发送消息。在控制台上,可以看到如下内容:

# 发送日志,省略另外 999 条日志
SendResult [sendStatus=SEND_OK, msgId=240E00E0F0931BB3FCEC071C1CDE61A8000018B4AAC20E79E06A03E7, offsetMsgId=C0A82BF000002A9F000000000008EE72, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=0], queueOffset=645]

# 关闭 Producer 日志
19:27:48.339 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[127.0.0.1:9876] result: true
19:27:48.340 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[192.168.43.240:10911] result: true

5.2 Consumer

Consumer 类,提供消费者 Consumer 消费消息的最简示例。代码如下:

// Consumer.java

public class Consumer {

public static void main(String[] args) throws InterruptedException, MQClientException {

/*
* Instantiate with specified consumer group name.
*/
// <1> 创建 DefaultMQPushConsumer 对象
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
// <2> 设置 RocketMQ Namesrv 地址
consumer.setNamesrvAddr("127.0.0.1:9876");

/*
* Specify name server addresses.
* <p/>
*
* Alternatively, you may specify name server addresses via exporting environmental variable: NAMESRV_ADDR
* <pre>
* {@code
* consumer.setNamesrvAddr("name-server1-ip:9876;name-server2-ip:9876");
* }
* </pre>
*/

/*
* Specify where to start in case the specified consumer group is a brand new one.
*/
// <3> 设置消费进度,从 Topic 最初位置开始
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

/*
* Subscribe one more more topics to consume.
*/
// <4> 订阅 TopicTest 主题
consumer.subscribe("TopicTest", "*");

/*
* Register callback to execute on arrival of messages fetched from brokers.
*/
// <5> 添加消息监听器
consumer.registerMessageListener(new MessageListenerConcurrently() {

@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
// 返回成功
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}

});

/*
* Launch the consumer instance.
*/
// <6> 启动 producer 消费者
consumer.start();

// 打印 Consumer 启动完成
System.out.printf("Consumer Started.%n");
}

}

  • <1> 处,创建 DefaultMQPushConsumer 对象,这里设置的消费者分组是 "please_rename_unique_group_name" 。注意,消费者分组的概念:

    FROM 概念(Concept)

    同一类 Consumer 的集合,这类 Consumer 通常消费同一类消息且消费逻辑一致。消费者组使得在消息消费方面,实现负载均衡和容错的目标变得非常容易。

    要注意的是,消费者组的消费者实例必须订阅完全相同的 Topic 。

    RocketMQ 支持两种消息模式:集群消费(Clustering)和广播消费(Broadcasting)。 * 在集群消费下,同一条消息会被相同消费者分组的一个消费者所消费。 * 在广播消费下,同一条消息会被相同消费者分组的所有消费者所消费。 * 在当前示例里,我们采用的是 DefaultMQPushConsumer 的默认消费方式,集群消费。

  • <2> 处,设置 consumer 的 RocketMQ Namesrv 地址。这里,是艿艿额外添加的代码。

  • <3> 处,设置一个新的消费集群,初始的消费进度。目前有三个选项:

    • CONSUME_FROM_FIRST_OFFSET :每个 Topic 队列的第一条消息。
    • CONSUME_FROM_LAST_OFFSET :每个 Topic 队列的最后一条消息。
    • CONSUME_FROM_TIMESTAMP :每个 Topic 队列的指定时间开始的消息。
    • 😈 注意,只针对新的消费集群。如果一个集群每个 Topic 已经有消费进度,则继续使用该消费进度。仔细理解一下哈~
  • <4> 处,设置订阅 "TopicTest" 主题的消息。有一定一定要注意!!!消费者组的消费者实例必须订阅完全相同的 Topic + Tag

  • <5> 处,添加消息监听器。这里我们采用的是 MessageListenerConcurrently 并发消费消息的监听器。如果胖友需要实现顺序消费消息,需要使用 MessageListenerOrderly 顺序消费的监听器。

  • <6> 处,启动 consumer 消费者。此时,Consumer 就开始正式的消费消息啦。。

执行 #main(args) 方法,开始消费消息。在控制台上,可以看到如下内容:

# 消费者启动成功
Consumer Started.

# 消费内容
ConsumeMessageThread_1 Receive New Messages: [MessageExt [queueId=2, storeSize=225, queueOffset=645, sysFlag=0, bornTimestamp=1575373846053, bornHost=/192.168.43.240:52717, storeTimestamp=1575373846058, storeHost=/192.168.43.240:10911, msgId=C0A82BF000002A9F000000000008EF55, commitLogOffset=585557, bodyCRC=613185359, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=646, CONSUME_START_TIME=1575373846067, UNIQ_KEY=240E00E0F0931BB3FCEC071C1CDE61A8000018B4AAC20E8EE6250000, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 48], transactionId='null'}]]
ConsumeMessageThread_2 Receive New Messages: [MessageExt [queueId=3, storeSize=225, queueOffset=645, sysFlag=0, bornTimestamp=1575373846060, bornHost=/192.168.43.240:52717, storeTimestamp=1575373846061, storeHost=/192.168.43.240:10911, msgId=C0A82BF000002A9F000000000008F036, commitLogOffset=585782, bodyCRC=1401636825, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=646, CONSUME_START_TIME=1575373846067, UNIQ_KEY=240E00E0F0931BB3FCEC071C1CDE61A8000018B4AAC20E8EE62C0001, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 49], transactionId='null'}]]

  • 通过 ConsumeMessageThread_1ConsumeMessageThread_2 线程名,我们可以看出,目前是进行并发消费消息。

6. Spring Boot 使用示例

《芋道 Spring Boot 分布式消息队列 RocketMQ 入门》 中,我们来详细学习如何在 Spring Boot 中,整合并使用 RocketMQ 。😈 会方便很多。

7. Spring Cloud 使用示例

在如下的文章中,我们来详细学习如何在 Spring Cloud 中,整合并使用 RocketMQ 。😈 更加方便。

666. 彩蛋

至此,我们已经完成了 RocketMQ 的入门。个人建议的话,对于初学 RocketMQ 的胖友,一定要认真仔细去读读 「1.4 更多文档」 推荐的内容,艿艿一路踩着“坑”过来,希望胖友能够走的平稳一些。很多时候,我们踩坑的原因,是因为我们没有认真仔细阅读相关的文档,在没有完全入门的情况下,匆匆忙忙将一个中间件就上线了。

这里,在额外推荐一些内容:

文章目录
  1. 1. 1. 概述
    1. 1.1. 1.1 基本概念
    2. 1.2. 1.2 整体架构
    3. 1.3. 1.3 整体流程
    4. 1.4. 1.4 更多文档
  2. 2. 2. 单机部署
    1. 2.1. 2.1 前置条件
    2. 2.2. 2.2 下载源码
    3. 2.3. 2.2 编译源码
    4. 2.4. 2.3 启动 Namesrv
    5. 2.5. 2.4 启动 Broker
    6. 2.6. 2.5 测试发送消息
    7. 2.7. 2.6 测试消费消息
  3. 3. 3. 集群部署
  4. 4. 4. Web Console 控制台
    1. 4.1. 4.1 克隆代码
    2. 4.2. 4.2 配置文件
    3. 4.3. 4.3 编译源码
    4. 4.4. 4.4 启动控制台
    5. 4.5. 4.5 简单使用
  5. 5. 5. 简单示例
    1. 5.1. 5.1 Producer
    2. 5.2. 5.2 Consumer
  6. 6. 6. Spring Boot 使用示例
  7. 7. 7. Spring Cloud 使用示例
  8. 8. 666. 彩蛋