RocketMQ各角色介绍

Producer:消息发送者

Consumer:消费消费者

Broker:暂存、传递消息

NameServer:管理Broker

Topic:频道

Message Queue:消息

RocketMQ集群分析

服务器只需要关系 NameServer集群、Broker集群。我们实现生产者、消费者集群即可。

NameServer集群,每加入一个Broker,就会上报给所有NameServer

Broker集群以:BrokerName与BrokerID来定义。相同BrokerName即视为同一集群。集群数据同步可选择同步、异步方式复制数据。

Broker以ID区分主节点、从节点。默认id是0是master主节点,id大于0就是slave子节点了

生产者需要与NameServer集群的一个节点(随机节点)建立长链接,当发送消息时,向NameServer获得Broker地址。

消费者也需要与NameServer集群的某一个节点(随机节点)建立长链接,定期从NameServer获取Topic路由信息,并向Topic服务的Master、Slave节点建立长链接,定时向Master、Slave节点发送心跳。消费者获取消息时,可由消费者自由定义获取主节点、还是从节点的数据。

集群选择

单Master:就一个Master,没有Slave。 重启Broker,整个服务不可用,不建议线上环境使用!

多Master:没有Slave。性能最好,缺点:单机器宕机,服务不可用。依旧不建议线上使用!

多Master多Slave 异步模式:性能接近多Master。对比同步模式,不需要Master与Slave完全同步。一旦损坏,有可能会丢部分没及时同步的消息

多Master多Slave 同步模式:通以Broker集群,接收消息时,必须Master与Slave同步时,才告知消息发送成功。对比同步性能降低10%,对磁盘要求较高。

双主双从服务器搭建

集群的架构

A机器 1个NameServer、1个Master Broker A、1个Slave Broker B

B机器 1个NameServer、1个Master Broker B、1个Slave Broker A

特别说明 Broker是一个程序,多个程序就要避免端口被占用的情况

A、B机器分别各创建如下文件夹 (有些是A机器主从公Broker用一个文件夹,B机器也是如此)

mkdir -p /usr/local/rocketmq/store
mkdir -p /usr/local/rocketmq/store/commitlog
mkdir -p /usr/local/rocketmq/store/consumequeue
mkdir -p /usr/local/rocketmq/store/index

# 注意,下面千万不能被创建,不然你的集群将无法启动!
# mkdir -p /usr/local/rocketmq/store/checkpoint
# mkdir -p /usr/local/rocketmq/store/abort

mkdir -p /usr/local/rocketmq/store-s
mkdir -p /usr/local/rocketmq/store-s/commitlog
mkdir -p /usr/local/rocketmq/store-s/consumequeue
mkdir -p /usr/local/rocketmq/store-s/index

机器A 配置如下

# 所属集群名字 特别说明,集群必须保证每个节点加入的集群名一致
brokerClusterName=rocketmq-test-cluster
# broker名字,注意:不同的配置文件填写的不一样
brokerName=broker-a
# 是否是Master节点,0表示Master,大于0表示Slave
brokerId=0
# nameServer地址,多个地址以";"分割,注意:多个每个节点必须保证nameServer的地址、端口完全一致。
namesrvAddr=192.168.253.128:9876;192.168.253.129:9876
# 在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
# 是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
# 是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
# Broker 对外服务的监听端口,注意一台机器一个端口只能被一个节点,所以请保持本节点去其他节点不冲突
listenPort=10911
# 删除文件时间点,默认凌晨 4点
deleteWhen=04
# 文件保留时间,默认 48 小时
fileReservedTime=120
# commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
# ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
# 检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
# 存储路径
storePathRootDir=/usr/local/rocketmq/store
# commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq/store/commitlog
# 消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue
# 消息索引存储路径
storePathIndex=/usr/local/rocketmq/store/index
# checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/store/checkpoint
# abort 文件存储路径
abortFile=/usr/local/rocketmq/store/abort
# 限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000

# Broker 的角色,这里非常重要
# - ASYNC_MASTER 异步的Master用这个
# - SYNC_MASTER 同步双写的Master用这个
# - SLAVE Slave节点只能用这个
brokerRole=SYNC_MASTER
# 刷盘方式
# - ASYNC_FLUSH 异步刷盘
# - SYNC_FLUSH 同步刷盘
flushDiskType=SYNC_FLUSH
#checkTransactionMessageEnable=false
# 发消息线程池数量
#sendMessageThreadPoolNums=128
# 拉消息线程池数量
#pullMessageThreadPoolNums=128

# 所属集群名字 特别说明,集群必须保证每个节点加入的集群名一致
brokerClusterName=rocketmq-test-cluster
# broker名字,注意:不同的配置文件填写的不一样
brokerName=broker-b
# 是否是Master节点,0表示Master,大于0表示Slave
brokerId=1
# nameServer地址,多个地址以";"分割,注意:多个每个节点必须保证nameServer的地址、端口完全一致。
namesrvAddr=192.168.253.128:9876;192.168.253.129:9876
# 在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
# 是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
# 是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
# Broker 对外服务的监听端口,注意一台机器一个端口只能被一个节点,所以请保持本节点去其他节点不冲突
listenPort=28000
# 删除文件时间点,默认凌晨 4点
deleteWhen=04
# 文件保留时间,默认 48 小时
fileReservedTime=120
# commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
# ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
# 检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
# 存储路径
storePathRootDir=/usr/local/rocketmq/store-s
# commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq/store-s/commitlog
# 消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/store-s/consumequeue
# 消息索引存储路径
storePathIndex=/usr/local/rocketmq/store-s/index
# checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/store-s/checkpoint
# abort 文件存储路径
abortFile=/usr/local/rocketmq/store-s/abort
# 限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
# Broker 的角色,这里非常重要
# - ASYNC_MASTER 异步的Master用这个
# - SYNC_MASTER 同步双写的Master用这个
# - SLAVE Slave节点只能用这个
brokerRole=SLAVE
# 刷盘方式
# - ASYNC_FLUSH 异步刷盘
# - SYNC_FLUSH 同步刷盘
flushDiskType=SYNC_FLUSH
#checkTransactionMessageEnable=false
# 发消息线程池数量
#sendMessageThreadPoolNums=128
# 拉消息线程池数量
#pullMessageThreadPoolNums=128

机器B 配置如下

# 所属集群名字 特别说明,集群必须保证每个节点加入的集群名一致
brokerClusterName=rocketmq-test-cluster
# broker名字,注意:不同的配置文件填写的不一样
brokerName=broker-b
# 是否是Master节点,0表示Master,大于0表示Slave
brokerId=0
# nameServer地址,多个地址以";"分割,注意:多个每个节点必须保证nameServer的地址、端口完全一致。
namesrvAddr=192.168.253.128:9876;192.168.253.129:9876
# 在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
# 是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
# 是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
# Broker 对外服务的监听端口,注意一台机器一个端口只能被一个节点,所以请保持本节点去其他节点不冲突
listenPort=10911
# 删除文件时间点,默认凌晨 4点
deleteWhen=04
# 文件保留时间,默认 48 小时
fileReservedTime=120
# commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
# ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
# 检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
# 存储路径
storePathRootDir=/usr/local/rocketmq/store
# commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq/store/commitlog
# 消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue
# 消息索引存储路径
storePathIndex=/usr/local/rocketmq/store/index
# checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/store/checkpoint
# abort 文件存储路径
abortFile=/usr/local/rocketmq/store/abort
# 限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000

# Broker 的角色,这里非常重要
# - ASYNC_MASTER 异步的Master用这个
# - SYNC_MASTER 同步双写的Master用这个
# - SLAVE Slave节点只能用这个
brokerRole=SYNC_MASTER
# 刷盘方式
# - ASYNC_FLUSH 异步刷盘
# - SYNC_FLUSH 同步刷盘
flushDiskType=SYNC_FLUSH
#checkTransactionMessageEnable=false
# 发消息线程池数量
#sendMessageThreadPoolNums=128
# 拉消息线程池数量
#pullMessageThreadPoolNums=128
# 所属集群名字 特别说明,集群必须保证每个节点加入的集群名一致
brokerClusterName=rocketmq-test-cluster
# broker名字,注意:不同的配置文件填写的不一样
brokerName=broker-a
# 是否是Master节点,0表示Master,大于0表示Slave
brokerId=1
# nameServer地址,多个地址以";"分割,注意:多个每个节点必须保证nameServer的地址、端口完全一致。
namesrvAddr=192.168.253.128:9876;192.168.253.129:9876
# 在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
# 是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
# 是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
# Broker 对外服务的监听端口,注意一台机器一个端口只能被一个节点,所以请保持本节点去其他节点不冲突
listenPort=28000
# 删除文件时间点,默认凌晨 4点
deleteWhen=04
# 文件保留时间,默认 48 小时
fileReservedTime=120
# commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
# ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
# 检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
# 存储路径
storePathRootDir=/usr/local/rocketmq/store-s
# commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq/store-s/commitlog
# 消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/store-s/consumequeue
# 消息索引存储路径
storePathIndex=/usr/local/rocketmq/store-s/index
# checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/store-s/checkpoint
# abort 文件存储路径
abortFile=/usr/local/rocketmq/store-s/abort
# 限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000

# Broker 的角色,这里非常重要
# - ASYNC_MASTER 异步的Master用这个
# - SYNC_MASTER 同步双写的Master用这个
# - SLAVE Slave节点只能用这个
brokerRole=SLAVE
# 刷盘方式
# - ASYNC_FLUSH 异步刷盘
# - SYNC_FLUSH 同步刷盘
flushDiskType=SYNC_FLUSH
#checkTransactionMessageEnable=false
# 发消息线程池数量
#sendMessageThreadPoolNums=128
# 拉消息线程池数量
#pullMessageThreadPoolNums=128

配置好,就可以去启动了,首先先去关闭Broker,然后再去关闭NameServer。

注意,Master分支使用端口是10911,Slave端口特别注意,我当时研究使用:10912,他娘的,搞来搞去,反之大致就是启动不起来,我对配置文件不熟悉,就觉得有问题,思维就被限制了。就盲目替换配置文件、RocketMQ版本,最后我没办法了,就修改修改端口28000,内存没掉下来,心里大舒一口气,可算成功了。

NameServer均是9876,第一台机器RocketMQ Connect Console是9877,注意防火墙放行 https://www.zanglikun.com/1010.html#firewall 然后重启防火墙即可。

NameServer正常启动。Broker需要指定配置文件了。

机器启动集群吧

nohup sh bin/mqnamesrv &

# 机器1 的A Master
nohup sh bin/mqbroker -c ./conf/2m-2s-sync/broker-a.properties &

# 启动机器2 的B Master
nohup sh bin/mqbroker -c ./conf/2m-2s-sync/broker-b.properties &

# 机器1 的B Slave
nohup sh bin/mqbroker -c ./conf/2m-2s-sync/broker-b-s.properties &

# 机器2 的A Slave
nohup sh bin/mqbroker -c ./conf/2m-2s-sync/broker-a-s.properties &

启动了,注意控制台的java项目没有配置相关参数,请前往配置文件修改一下,并重新编译一下:

rocketmq.config.namesrvAddr=192.168.253.128:9876;192.168.253.129:9876

还有种方式,启动时,动态修改参数,反正不加参数默认8080端口

nohup java -jar rocketmq-console-ng-1.0.0.jar --server.port=9877 rocketmq.config.namesrvAddr=192.168.253.128:9876;192.168.253.129:9876 &

RocketMQ支持3种消息发送方式

同步消息(Sync Message):生产者向broker发送消息,执行相关的代码同时等待,直到broker服务器返回发送结果,在后续执行。

异步消息(Async Message):生产者向broker发送消息时指定消息发送成功及发送异常的回调方法,调用的生产者代码执行立即返回,生产者发送消息线程不阻塞,消息发送成功或失败的回调任务在一个新的线程中执行。

单项消息(Oneway Message):生产者向broker发送消息,执行相关代码时直接返回,不等待broker服务器的结果。

消息结构

基础属性

topic:主题:将相同topic的消息发送到相同的的消息队列中。

消息体:消息的内容,可有是String,也可以是对象类型(此对象可有序列化)。单个消息的最大长度是4M

消息Flag:消息的标记,

拓展属性

tag:消息的二级分类,用于消费消息时进行过滤,可为空。

keys:Messgae索引键,可有根据key,快速检索到消息,可为空。

waitStoreMsgOK:消息发送时是否等消息存储后在返回。

快速入门

特殊说明:
上述文章均是作者实际操作后产出。烦请各位,请勿直接盗用!转载记得标注原文链接:www.zanglikun.com
第三方平台不会及时更新本文最新内容。如果发现本文资料不全,可访问本人的Java博客搜索:标题关键字。以获取最新全部资料 ❤