Back
Featured image of post RocketMQ入门

RocketMQ入门

理论知识

RocketMQ 天然采用集群模式,常见的 RocketMQ 集群有三种形式:多 Master 模式多 Master 多 Slave- 异步复制模式多 Master 多 Slave- 同步双写模式,这三种模式各自的优缺点如下。

  • 多 Master 模式是配置最简单的模式,同时也是使用最多的形式。优点是单个 Master 宕机或重启维护对应用无影响,在磁盘配置为 RAID10 时,即使机器宕机不可恢复情况下,由于 RAID10 磁盘非常可靠,同步刷盘消息也不会丢失,性能也是最高的;缺点是单台机器宕机期间,这台机器上未被消费的消息在机器恢复之前不可订阅,消息实时性会受到影响。
  • 多 Master 多 Slave 异步复制模式。每个 Master 配置一个 Slave,有多对 Master-Slave,HA 采用异步复制方式,主备有短暂消息毫秒级延迟,即使磁盘损坏只会丢失少量消息,且消息实时性不会受影响。同时 Master 宕机后,消费者仍然可以从 Slave 消费,而且此过程对应用透明,不需要人工干预,性能同多 Master 模式几乎一样;缺点是 Master 宕机,磁盘损坏情况下会丢失少量消息。
  • 多 Master 多 Slave 同步双写模式,HA 采用同步双写方式,即只有主备都写成功,才向应用返回成功,该模式数据与服务都无单点故障,Master 宕机情况下,消息无延迟,服务可用性与数据可用性都非常高;缺点是性能比异步复制模式低 10% 左右,发送单个消息的执行时间会略高,且目前版本在主节点宕机后,备机不能自动切换为主机。

部署

部署 NameServer 集群

直接wget下载

wget https://mirror-hk.koddos.net/apache/rocketmq/4.8.0/rocketmq-all-4.8.0-bin-release.zip

unzip 解压后,我们进入bin目录修改runserver.sh文件的虚拟机内存配置(因为默认的太大了吃不消),根据自己服务器实际情况更改

JAVA_OPT="${JAVA_OPT} -server -Xms128m -Xmx128m -Xmn128m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"

后台启动一下NameServer服务

nohup sh mqnamesrv &

NameServer 将占用 9876 端口提供服务,不要忘记在防火墙设置放行,另一台服务器也同样方式操作一波。

部署 Broker 集群

同样改小JVM内存,这次改bin下的runbroker.sh文件

JAVA_OPT="${JAVA_OPT} -server -Xms128m -Xmx128m -Xmn128m"

然后我们要配置一下Broker,在 conf 目录下,RocketMQ 已经给我们贴心的准备好三组集群配置模板:

  • 2m-2s-async 代表双主双从异步复制模式;
  • 2m-2s-sync 代表双主双从同步双写模式;
  • 2m-noslave 代表双主模式。

这里配置的是双主模式 ,所以去2m-noslave目录下,发现有broker-a.properties和broker-b.properties两个配置文件。

第一台主机修改broker-a.properties作为broker-a,第二台修改broker-b.properties作为broker-b,都添加上NameServer集群的地址并配上自己的公网IP:

#集群名称,同一个集群下的 broker 要求统一
brokerClusterName=DefaultCluster
#broker 名称
brokerName=broker-a
#broker 公网IP
brokerIP1=49.234.82.226
#brokerId=0 代表主节点,大于零代表从节点
brokerId=0
#删除日志文件时间点,默认凌晨 4 点
deleteWhen=04
#日志文件保留时间,默认 48 小时
fileReservedTime=48
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
brokerRole=ASYNC_MASTER
#刷盘方式
#- ASYNC_FLUSH 异步刷盘,性能好宕机会丢数
#- SYNC_FLUSH 同步刷盘,性能较差不会丢数
flushDiskType=ASYNC_FLUSH
#末尾追加,NameServer 节点列表,使用分号分割
namesrvAddr=192.168.31.200:9876;192.168.31.201:9876

broker配置文件里一定要配个公网IP,否则默认是用内网的!

broker-b就只有brokerNamebrokerIP1不同

之后就可以运行了,到bin目录下

nohup sh mqbroker -c ../conf/2m-noslave/broker-a.properties &

broker-a就用broker-a.properties启动,broker-b就用broker-b.properties启动

Broker 将占用 10911 端口提供服务,记得设置防火墙放行

测试部署结果

  • 修改bin下tools.shDjava.ext.dirs,添加jvm的ext绝对路径:
JAVA_OPT="${JAVA_OPT} -Djava.ext.dirs=${BASE_DIR}/lib:${JAVA_HOME}/jre/lib/ext:${JAVA_HOME}/lib/ext:/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.282.b08-1.el7_9.x86_64/jre/lib/ext"

如上,在最后用冒号隔开加上自己的jre/lib/ext 绝对路径

  • 使用 mqadmin 命令查看集群状态
sh mqadmin clusterList -n [NameServer服务器的IP]:9876

可以看到broker-a和broker-b的地址和端口

  • 使用 tools.sh工具通过生成演示数据来测试 MQ 实际的运行情况

测试生产者发送消息

export NAMESRV_ADDR=[NameServer服务器的IP]:9876
sh tools.sh org.apache.rocketmq.example.quickstart.Producer

集群生效后,可以看到broker-a和broker-交替出现

测试消费者接收消息

# 设置环境变量
export NAMESRV_ADDR=[NameServer服务器的IP]:9876
# 运行测试程序
sh tools.sh org.apache.rocketmq.example.quickstart.Consumer

部署可视化界面 RocketMQ-Console

使用docker部署

# 拉取镜像
docker pull apacherocketmq/rocketmq-console:2.0.0
# 运行容器
docker run -e "JAVA_OPTS=-Drocketmq.namesrv.addr=[NameServer服务器的IP]:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false" -p 8080:8080 -t apacherocketmq/rocketmq-console:2.0.0

打开网址界面如下

代码实践

生产者 Producer 发送消息

引入依赖

<!-- RocketMQ客户端,版本与Broker保持一致 -->
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.8.0</version>
</dependency>

创建一个controller测试发送

@RestController
public class ProviderController {
    Logger logger = LoggerFactory.getLogger(ProviderController.class);
    @GetMapping(value = "/send_s1_tax")
    public String send1() throws MQClientException {
        //创建DefaultMQProducer消息生产者对象
        DefaultMQProducer producer = new DefaultMQProducer("producer-group");
        //设置NameServer节点地址,多个节点间用分号分割
        producer.setNamesrvAddr("192.168.31.200:9876;192.168.31.201:9876");
        //与NameServer建立长连接
        producer.start();
        try {
            //发送一百条数据
            for(int i = 0 ; i< 100 ; i++) {
                //数据正文
                String data = "{\"title\":\"X市2021年度第一季度税务汇总数据\"}";
                /*创建消息
                    Message消息三个参数
                    topic 代表消息主题,自定义为tax-data-topic说明是税务数据
                    tags 代表标志,用于消费者接收数据时进行数据筛选。2021S1代表2021年第一季度数据
                    body 代表消息内容
                */
                Message message = new Message("tax-data-topic", "2021S1", data.getBytes());
                //发送消息,获取发送结果
                SendResult result = producer.send(message);
                //将发送结果对象打印在控制台
                logger.info("消息已发送:MsgId:" + result.getMsgId() + ",发送状态:" + result.getSendStatus());
            }
        } catch (RemotingException e) {
            e.printStackTrace();
        } catch (MQBrokerException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            producer.shutdown();
        }
        return "success";
    }
}

启动应用调用接口后,可以在控制台看到

2021-04-19 19:06:16.630  INFO 51083 --- [nio-8000-exec-3] c.c.r.controller.ProviderController      : 消息已发送MsgId:20010DA820039217106D5F092230313AC78B18B4AAC260CBEA71024E发送状态:SEND_OK
2021-04-19 19:06:16.697  INFO 51083 --- [nio-8000-exec-3] c.c.r.controller.ProviderController      : 消息已发送MsgId:20010DA820039217106D5F092230313AC78B18B4AAC260CBEAB6024F发送状态:SEND_OK
2021-04-19 19:06:16.777  INFO 51083 --- [nio-8000-exec-3] c.c.r.controller.ProviderController      : 消息已发送MsgId:20010DA820039217106D5F092230313AC78B18B4AAC260CBEAFA0250发送状态:SEND_OK
2021-04-19 19:06:16.847  INFO 51083 --- [nio-8000-exec-3] c.c.r.controller.ProviderController      : 消息已发送MsgId:20010DA820039217106D5F092230313AC78B18B4AAC260CBEB490251发送状态:SEND_OK
2021-04-19 19:06:16.953  INFO 51083 --- [nio-8000-exec-3] c.c.r.controller.ProviderController      : 消息已发送MsgId:20010DA820039217106D5F092230313AC78B18B4AAC260CBEB8F0252发送状态:SEND_OK
2021-04-19 19:06:17.050  INFO 51083 --- [nio-8000-exec-3] c.c.r.controller.ProviderController      : 消息已发送MsgId:20010DA820039217106D5F092230313AC78B18B4AAC260CBEBF90253发送状态:SEND_OK

消费者 Consumer 接收消息

引入同样依赖

<!-- RocketMQ客户端,版本与Broker保持一致 -->
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.8.0</version>
</dependency>

创建一个controller测试接收

@SpringBootApplication
public class RocketmqConsumerApplication {
    private static Logger logger = LoggerFactory.getLogger(RocketmqConsumerApplication.class);
    public static void main(String[] args) throws MQClientException {
        SpringApplication.run(RocketmqConsumerApplication.class, args);
        //创建消费者对象
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group");
        //设置NameServer节点
        consumer.setNamesrvAddr("192.168.31.200:9876;192.168.31.201:9876");
        /*订阅主题,
        consumer.subscribe包含两个参数:
        topic: 说明消费者从Broker订阅哪一个主题,这一项要与Provider保持一致。
        subExpression: 子表达式用于筛选tags。
            同一个主题下可以包含很多不同的tags,subExpression用于筛选符合条件的tags进行接收。
            例如:设置为*,则代表接收所有tags数据。
            例如:设置为2020S1,则Broker中只有tags=2020S1的消息会被接收,而2020S2就会被排除在外。
        */
        consumer.subscribe("tax-data-topic", "*");
        //创建监听,当有新的消息监听程序会及时捕捉并加以处理。
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            public ConsumeConcurrentlyStatus consumeMessage(
                    List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                //批量数据处理
                for (MessageExt msg : msgs) {
                    logger.info("消费者消费数据:"+new String(msg.getBody()));
                }
                //返回数据已接收标识
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //启动消费者,与Broker建立长连接,开始监听。
        consumer.start();
    }
}

当应用启动后,Provider 产生新消息的同时,Consumer 端就会立即消费掉,控制台产生输出。

2021-04-19 13:01:57.636  INFO 51389 --- [MessageThread_2] c.c.c.RocketmqConsumerApplication        : 消费者消费数据:{"title":"X市2021年度第一季度税务汇总数据"}
2021-04-19 13:01:57.692  INFO 51389 --- [MessageThread_1] c.c.c.RocketmqConsumerApplication        : 消费者消费数据:{"title":"X市2021年度第一季度税务汇总数据"}
2021-04-19 13:01:57.719  INFO 51389 --- [essageThread_18] c.c.c.RocketmqConsumerApplication        : 消费者消费数据:{"title":"X市2021年度第一季度税务汇总数据"}
2021-04-19 13:01:57.728  INFO 51389 --- [essageThread_10] c.c.c.RocketmqConsumerApplication        : 消费者消费数据:{"title":"X市2021年度第一季度税务汇总数据"}
2021-04-19 13:01:57.738  INFO 51389 --- [MessageThread_6] c.c.c.RocketmqConsumerApplication        : 消费者消费数据:{"title":"X市2021年度第一季度税务汇总数据"}

在可视化界面也可以看到变化。

Spring Cloud 生态中还提供了 Spring Cloud Stream 模块,允许程序员采用“声明式”的开发方式实现与 MQ 更轻松的接入。

comments powered by Disqus
一辈子热爱技术
Built with Hugo
Theme Stack designed by Jimmy
gopher