Erlo

RocketMQ 4.7.1 环境搭建、集群、MQ整合SpringBoot

2020-10-26 00:30:19 发布   915 浏览  
页面报错/反馈
收藏 点赞

导读

  之前学过ActiveMQ但是并发量不是很大点我直达,所以又学阿里开源的RocketMQ,据说队列可以堆积亿级别。下面是网上找的消息队列对比图,仅供参考

部署

官网

点我直达

前置条件

  1. 推荐使用64位操作系统,建议使用Linux / Unix / Mac;
  2. 64位JDK 1.8+;
  3. Maven 3.2.x;
  4. Git;
  5. 适用于Broker服务器的内存4G +可用磁盘

下载

地址:https://downloads.apache.org/rocketmq/4.7.1/rocketmq-all-4.7.1-source-release.zip 

百度云盘:

链接: https://pan.baidu.com/s/1luq_MwxSn8k_bugrnQSJWg  密码: varj

安装依赖项

  1. jdk:点我直达
  2. maven:点我直达
  3. git安装:yum install -y git

export JAVA_HOME=/opt/soft/jdk1.8.0_202
export PATH=$JAVA_HOME/bin:$PATH
export CLASPATH=.:$JAVA_home/lib/dt.jar:$JAVA_HOME/lib/tools.jar
export JAVA_HOME PATH CLASSPATH
export MAVEN_HOME=/opt/soft/apache-maven-3.6.3
export PATH=$PATH:$MAVEN_HOME/bin

mq上传至linux

解压

 

maven编译

 

启动NameServer

后台启动方式

nohup sh bin/mqnamesrv &

NameServer启动时内存不足(问题解决)

找到runserver.sh 修改JAVA_OPT

vim /bin/runserver.sh配置

启动Broker

nohup sh bin/mqbroker -n localhost:9876 &

语法:nohup sh bin/mqbroker -n NameServer服务ip地址

 

Broker内存不足(问题解决)

找到runbroker.sh 修改JAVA_OPT

vim /bin/runbroker.sh配置

服务都启动成功

 

模拟消费

export NAMESRV_ADDR=localhost:9876

sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer

sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer

开2个控制台,连接通一台linux

注意

  NameServer默认端口号:9876;broker默认端口号:10911

可视化控制台

官网地址

点我直达

百度云盘

链接: https://pan.baidu.com/s/1mdEGkq-JBTy1wtNmFPkmDg  密码: v6bq

解压

安装编译

进入:/opt/soft/rocketmq-externals-master/rocketmq-console
编译: mvn clean package -Dmaven.test.skip=true

修改appliccation.properties的rocketmq.config.namesrvAddr

编译打包

启动

  进入target目录,启动java -jar

守护进程启动: nohup java -jar rocketmq-console-ng-2.0.0.jar &

SpringBoot整合RocketMQ(生产者)

创建SpringBoot项目

点我直达

项目结构

加入依赖

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.1.RELEASE</version>
        <relativePath/> <#gth# lookup parent from repository -->
    </parent>
    <groupId>com.ybchen</groupId>
    <artifactId>ybchen-mq</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>ybchen-mq</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <#gth#注意: 这里的版本,要和部署在服务器上的版本号一致-->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.7.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-common</artifactId>
            <version>4.7.1</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

PayProducer.java

package com.ybchen.ybchenmq.jms;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.springframework.stereotype.Component;

/**
 * 消息生产者
 */
@Component
public class PayProducer {
    /**
     * 生产者所属的组
     */
    private String producerGroup = "pay_group";
    /**
     * MQ的地址,注意需开放端口号或者关闭防火墙
     */
    private String nameServerAddr = "192.168.199.100:9876";
    private DefaultMQProducer producer;

    public PayProducer() {
        producer = new DefaultMQProducer(producerGroup);
        //指定NameServer地址,多个地址以;隔开
        //如 producer.setNamesrvAddr("192.168.199.100:9876;192.168.199.101:9876;192.168.199.102:9876")
        producer.setNamesrvAddr(nameServerAddr);
        start();
    }

    /**
     * 获取生产者
     * @return
     */
    public DefaultMQProducer getProducer() {
        return this.producer;
    }

    /**
     * 开启,对象在使用之前必须要调用一次,只能初始化一次
     */
    public void start() {
        try {
            this.producer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }
    }

    /**
     * 关闭,一般在应用上下文,使用上下文监听器,进行关闭
     */
    public void shutdown() {
        this.producer.shutdown();
    }
}

PayController.java

package com.ybchen.ybchenmq.controller;

import com.ybchen.ybchenmq.jms.PayProducer;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @ClassName:PayController
 * @Description:支付
 * @Author:chenyb
 * @Date:2020/10/18 2:47 下午
 * @Versiion:1.0
 */
@RestController
@RequestMapping("/api/v1")
public class PayController {
    @Autowired
    private PayProducer payProducer;

    private static final String TOPIC = "ybchen_pay_topic";

    /**
     * 支付回调
     *
     * @param text
     * @return
     */
    @RequestMapping("pay_cb")
    public Object callback(String text) {
        /**
         * String topic:话题
         * String tags:二级分类
         * byte[] body:body消息字节数组
         */
        Message message = new Message(TOPIC,"tag_a",("hello ybchen ==>"+text).getBytes());
        try {
            SendResult send = payProducer.getProducer().send(message);
            System.out.println("send------>"+send);
        } catch (MQClientException e) {
            e.printStackTrace();
        } catch (RemotingException e) {
            e.printStackTrace();
        } catch (MQBrokerException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "ok";
    }
}

测试

常见错误

错误一

org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException:
sendDefaultImpl call timeout

原因:阿里云存在多网卡,rocketmq会根据当前网卡选择一个IP使用,当你的机器有多块网卡时,很可能会有问题,比如,机器上有两个ip,一个公网ip,一个私网ip,因此需要配置broker.conf指定当前公网的ip,然后重启broker


修改配置:/opt/soft/rocketmq-all-4.7.1-source-release/distribution/target/rocketmq-4.7.1/rocketmq-4.7.1/conf/broker.conf
新增这个配置:brokerIP1=xxx.xxx.xxx.xxx

启动命令:nohup sh bin/mqbroker -n localhost:9876 -c ./conf/broker.conf &

错误2

MQClientException: No route info of this topic, TopicTest1

原因:Broker 紧追自动创建Topic,且用户没有通过手工方式创建此Topic,或者broker和Nameserver网络不通

解决:
    通过sh bin/mqbroker -m 查看配置
    autoCreateTopicEnable=true 则自动创建Topic

Centos 7 关闭防火墙:systemctl stop firewalld

错误3

控制台查看不了数据,提示连接10909错误

原因:Rocket默认开启了VIP通道,VPI通道端口号为10911-2=10909

解决:阿里云安全组添加一个端口:10909

错误4

  无法自动创建topic:客户端版本要和服务端版本保持一致

服务器上装的是4.7.1

引入依赖项时
        <#gth#注意: 这里的版本,要和部署在服务器上的版本号一致-->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.7.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-common</artifactId>
            <version>4.7.1</version>
        </dependency>

检索消息发送

SpringBoot整合RocketMQ(消费者)

创建SpringBoot项目

 

项目结构

加入依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.1.RELEASE</version>
        <relativePath/> <#gth# lookup parent from repository -->
    </parent>
    <groupId>com.ybchen</groupId>
    <artifactId>ybchen-mq</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>ybchen-mq</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <#gth#注意: 这里的版本,要和部署在服务器上的版本号一致-->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.7.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</
登录查看全部

参与评论

评论留言

还没有评论留言,赶紧来抢楼吧~~

手机查看

返回顶部

给这篇文章打个标签吧~

棒极了 糟糕透顶 好文章 PHP JAVA JS 小程序 Python SEO MySql 确认