理论知识
RocketMQ 天然采用集群模式,常见的 RocketMQ 集群有三种形式:多 Master 模式、多 Master 多 Slave- 异步复制模式、多 Master 多 Slave- 同步双写模式,这三种模式各自的优缺点如下。
- 多 Master 模式是配置最简单的模式,同时也是使用最多的形式。优点是单个 Master 宕机或重启维护对应用无影响,在磁盘配置为 RAID10 时,即使机器宕机不可恢复情况下,由于 RAID10 磁盘非常可靠,同步刷盘消息也不会丢失,性能也是最高的;缺点是单台机器宕机期间,这台机器上未被消费的消息在机器恢复之前不可订阅,消息实时性会受到影响。
- 多 Master 多 Slave 异步复制模式。每个 Master 配置一个 Slave,有多对 Master-Slave,HA 采用异步复制方式,主备有短暂消息毫秒级延迟,即使磁盘损坏只会丢失少量消息,且消息实时性不会受影响。同时 Master 宕机后,消费者仍然可以从 Slave 消费,而且此过程对应用透明,不需要人工干预,性能同多 Master 模式几乎一样;缺点是 Master 宕机,磁盘损坏情况下会丢失少量消息。
- 多 Master 多 Slave 同步双写模式,HA 采用同步双写方式,即只有主备都写成功,才向应用返回成功,该模式数据与服务都无单点故障,Master 宕机情况下,消息无延迟,服务可用性与数据可用性都非常高;缺点是性能比异步复制模式低 10% 左右,发送单个消息的执行时间会略高,且目前版本在主节点宕机后,备机不能自动切换为主机。
部署
部署 NameServer 集群
直接wget下载
wget https://mirror-hk.koddos.net/apache/rocketmq/4.8.0/rocketmq-all-4.8.0-bin-release.zip
unzip
解压后,我们进入bin
目录修改runserver.sh
文件的虚拟机内存配置(因为默认的太大了吃不消),根据自己服务器实际情况更改
JAVA_OPT="${JAVA_OPT} -server -Xms128m -Xmx128m -Xmn128m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
后台启动一下NameServer服务
nohup sh mqnamesrv &
NameServer 将占用 9876
端口提供服务,不要忘记在防火墙设置放行,另一台服务器也同样方式操作一波。
部署 Broker 集群
同样改小JVM内存,这次改bin
下的runbroker.sh
文件
JAVA_OPT="${JAVA_OPT} -server -Xms128m -Xmx128m -Xmn128m"
然后我们要配置一下Broker,在 conf
目录下,RocketMQ 已经给我们贴心的准备好三组集群配置模板:
- 2m-2s-async 代表双主双从异步复制模式;
- 2m-2s-sync 代表双主双从同步双写模式;
- 2m-noslave 代表双主模式。
这里配置的是双主模式
,所以去2m-noslave目录下,发现有broker-a.properties和broker-b.properties两个配置文件。
第一台主机修改broker-a.properties作为broker-a,第二台修改broker-b.properties作为broker-b,都添加上NameServer集群的地址并配上自己的公网IP:
#集群名称,同一个集群下的 broker 要求统一
brokerClusterName=DefaultCluster
#broker 名称
brokerName=broker-a
#broker 公网IP
brokerIP1=49.234.82.226
#brokerId=0 代表主节点,大于零代表从节点
brokerId=0
#删除日志文件时间点,默认凌晨 4 点
deleteWhen=04
#日志文件保留时间,默认 48 小时
fileReservedTime=48
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
brokerRole=ASYNC_MASTER
#刷盘方式
#- ASYNC_FLUSH 异步刷盘,性能好宕机会丢数
#- SYNC_FLUSH 同步刷盘,性能较差不会丢数
flushDiskType=ASYNC_FLUSH
#末尾追加,NameServer 节点列表,使用分号分割
namesrvAddr=192.168.31.200:9876;192.168.31.201:9876
broker配置文件里一定要配个公网IP,否则默认是用内网的!
broker-b就只有brokerName
和brokerIP1
不同
之后就可以运行了,到bin目录下
nohup sh mqbroker -c ../conf/2m-noslave/broker-a.properties &
broker-a就用broker-a.properties启动,broker-b就用broker-b.properties启动
Broker 将占用 10911
端口提供服务,记得设置防火墙放行
测试部署结果
- 修改bin下
tools.sh
的Djava.ext.dirs
,添加jvm的ext绝对路径:
JAVA_OPT="${JAVA_OPT} -Djava.ext.dirs=${BASE_DIR}/lib:${JAVA_HOME}/jre/lib/ext:${JAVA_HOME}/lib/ext:/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.282.b08-1.el7_9.x86_64/jre/lib/ext"
如上,在最后用冒号隔开加上自己的jre/lib/ext
绝对路径
- 使用
mqadmin
命令查看集群状态
sh mqadmin clusterList -n [NameServer服务器的IP]:9876
可以看到broker-a和broker-b的地址和端口
- 使用
tools.sh
工具通过生成演示数据来测试 MQ 实际的运行情况
测试生产者发送消息
export NAMESRV_ADDR=[NameServer服务器的IP]:9876
sh tools.sh org.apache.rocketmq.example.quickstart.Producer
集群生效后,可以看到broker-a和broker-交替出现
测试消费者接收消息
# 设置环境变量
export NAMESRV_ADDR=[NameServer服务器的IP]:9876
# 运行测试程序
sh tools.sh org.apache.rocketmq.example.quickstart.Consumer
部署可视化界面 RocketMQ-Console
使用docker部署
# 拉取镜像
docker pull apacherocketmq/rocketmq-console:2.0.0
# 运行容器
docker run -e "JAVA_OPTS=-Drocketmq.namesrv.addr=[NameServer服务器的IP]:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false" -p 8080:8080 -t apacherocketmq/rocketmq-console:2.0.0
打开网址界面如下
代码实践
生产者 Producer 发送消息
引入依赖
<!-- RocketMQ客户端,版本与Broker保持一致 -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.8.0</version>
</dependency>
创建一个controller测试发送
@RestController
public class ProviderController {
Logger logger = LoggerFactory.getLogger(ProviderController.class);
@GetMapping(value = "/send_s1_tax")
public String send1() throws MQClientException {
//创建DefaultMQProducer消息生产者对象
DefaultMQProducer producer = new DefaultMQProducer("producer-group");
//设置NameServer节点地址,多个节点间用分号分割
producer.setNamesrvAddr("192.168.31.200:9876;192.168.31.201:9876");
//与NameServer建立长连接
producer.start();
try {
//发送一百条数据
for(int i = 0 ; i< 100 ; i++) {
//数据正文
String data = "{\"title\":\"X市2021年度第一季度税务汇总数据\"}";
/*创建消息
Message消息三个参数
topic 代表消息主题,自定义为tax-data-topic说明是税务数据
tags 代表标志,用于消费者接收数据时进行数据筛选。2021S1代表2021年第一季度数据
body 代表消息内容
*/
Message message = new Message("tax-data-topic", "2021S1", data.getBytes());
//发送消息,获取发送结果
SendResult result = producer.send(message);
//将发送结果对象打印在控制台
logger.info("消息已发送:MsgId:" + result.getMsgId() + ",发送状态:" + result.getSendStatus());
}
} catch (RemotingException e) {
e.printStackTrace();
} catch (MQBrokerException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
producer.shutdown();
}
return "success";
}
}
启动应用调用接口后,可以在控制台看到
2021-04-19 19:06:16.630 INFO 51083 --- [nio-8000-exec-3] c.c.r.controller.ProviderController : 消息已发送:MsgId:20010DA820039217106D5F092230313AC78B18B4AAC260CBEA71024E,发送状态:SEND_OK
2021-04-19 19:06:16.697 INFO 51083 --- [nio-8000-exec-3] c.c.r.controller.ProviderController : 消息已发送:MsgId:20010DA820039217106D5F092230313AC78B18B4AAC260CBEAB6024F,发送状态:SEND_OK
2021-04-19 19:06:16.777 INFO 51083 --- [nio-8000-exec-3] c.c.r.controller.ProviderController : 消息已发送:MsgId:20010DA820039217106D5F092230313AC78B18B4AAC260CBEAFA0250,发送状态:SEND_OK
2021-04-19 19:06:16.847 INFO 51083 --- [nio-8000-exec-3] c.c.r.controller.ProviderController : 消息已发送:MsgId:20010DA820039217106D5F092230313AC78B18B4AAC260CBEB490251,发送状态:SEND_OK
2021-04-19 19:06:16.953 INFO 51083 --- [nio-8000-exec-3] c.c.r.controller.ProviderController : 消息已发送:MsgId:20010DA820039217106D5F092230313AC78B18B4AAC260CBEB8F0252,发送状态:SEND_OK
2021-04-19 19:06:17.050 INFO 51083 --- [nio-8000-exec-3] c.c.r.controller.ProviderController : 消息已发送:MsgId:20010DA820039217106D5F092230313AC78B18B4AAC260CBEBF90253,发送状态:SEND_OK
消费者 Consumer 接收消息
引入同样依赖
<!-- RocketMQ客户端,版本与Broker保持一致 -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.8.0</version>
</dependency>
创建一个controller测试接收
@SpringBootApplication
public class RocketmqConsumerApplication {
private static Logger logger = LoggerFactory.getLogger(RocketmqConsumerApplication.class);
public static void main(String[] args) throws MQClientException {
SpringApplication.run(RocketmqConsumerApplication.class, args);
//创建消费者对象
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group");
//设置NameServer节点
consumer.setNamesrvAddr("192.168.31.200:9876;192.168.31.201:9876");
/*订阅主题,
consumer.subscribe包含两个参数:
topic: 说明消费者从Broker订阅哪一个主题,这一项要与Provider保持一致。
subExpression: 子表达式用于筛选tags。
同一个主题下可以包含很多不同的tags,subExpression用于筛选符合条件的tags进行接收。
例如:设置为*,则代表接收所有tags数据。
例如:设置为2020S1,则Broker中只有tags=2020S1的消息会被接收,而2020S2就会被排除在外。
*/
consumer.subscribe("tax-data-topic", "*");
//创建监听,当有新的消息监听程序会及时捕捉并加以处理。
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
//批量数据处理
for (MessageExt msg : msgs) {
logger.info("消费者消费数据:"+new String(msg.getBody()));
}
//返回数据已接收标识
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//启动消费者,与Broker建立长连接,开始监听。
consumer.start();
}
}
当应用启动后,Provider 产生新消息的同时,Consumer 端就会立即消费掉,控制台产生输出。
2021-04-19 13:01:57.636 INFO 51389 --- [MessageThread_2] c.c.c.RocketmqConsumerApplication : 消费者消费数据:{"title":"X市2021年度第一季度税务汇总数据"}
2021-04-19 13:01:57.692 INFO 51389 --- [MessageThread_1] c.c.c.RocketmqConsumerApplication : 消费者消费数据:{"title":"X市2021年度第一季度税务汇总数据"}
2021-04-19 13:01:57.719 INFO 51389 --- [essageThread_18] c.c.c.RocketmqConsumerApplication : 消费者消费数据:{"title":"X市2021年度第一季度税务汇总数据"}
2021-04-19 13:01:57.728 INFO 51389 --- [essageThread_10] c.c.c.RocketmqConsumerApplication : 消费者消费数据:{"title":"X市2021年度第一季度税务汇总数据"}
2021-04-19 13:01:57.738 INFO 51389 --- [MessageThread_6] c.c.c.RocketmqConsumerApplication : 消费者消费数据:{"title":"X市2021年度第一季度税务汇总数据"}
在可视化界面也可以看到变化。
Spring Cloud 生态中还提供了 Spring Cloud Stream
模块,允许程序员采用“声明式”的开发方式实现与 MQ 更轻松的接入。