Erlo

mosquitto【MQTT消息代理软件】

2025-09-02 10:29:02 发布   24 浏览  
页面报错/反馈
收藏 点赞

mosquitto简述

概述

mosquitto是一款开源的MQTT消息代理(服务器)软件,实现了MQTT协议版本3.1和3.1.1,提供轻量级的,支持可发布/可订阅的的消息推送模式。

官网:Eclipse Mosquitto

API:mosquitto.h

安装

sudo apt install mosquitto  # 安装服务端
sudo apt install mosquitto-clients  # 安装客户端
sudo apt-get install libmosquitto-dev # C风格开发依赖包
sudo apt-get install libmosquittopp-dev  # C++风格封装的libmosquitto开发包

服务端指令

  • 查看服务状态

    sudo service mosquitto status
    
  • 启动服务器

    sudo service mosquitto start
    
  • 关闭服务器

    sudo service mosquitto stop
    
  • 启动服务器并实时显示所有日志

    mosquitto -v
    
  • 根据指定的配置文件启动服务器

    mosquitto -c /etc/mosquitto/mosquitto.conf -d
    
    • -c : 指定配置文件
    • -d : 后台运行
  • 指定端口启动服务器,默认端口是1883,最多指定10次

    mosquitto -p 1884
    

配置文件

/etc/mosquitto/mosquitto.conf

# 消息持久存储
persistence true
persistence_location /var/lib/mosquitto/

日志文件

log_dest file /var/log/mosquitto/mosquitto.log

其他配置

include_dir /etc/mosquitto/conf.d

禁止匿名访问

allow_anonymous false

认证配置,即登录账号信息的文件

password_file /etc/mosquitto/pwfile

权限配置

acl_file /etc/mosquitto/aclfile

监听的端口

listener 1883

客户端指令

订阅主题

mosquitto_sub -t topic

发布主题

mosquitto_pub -t topic -m '消息'

Mosquitto库编程(C风格)

发布信息客户端

mqtt_pub.c

#include 
#include 
#include 
#include 

define HOST "127.0.0.1"

define PORT 1883

define KEEP_ALIVE_TIME 60

define MSG_MAX_SIZE 512

bool session = true;

int main(void)
{
int err = 0;
printf("mqtt publish init ...n");
struct mosquitto* mosq = NULL;
char buff[MSG_MAX_SIZE];

// 初始化
err = mosquitto_lib_init();
if(err<0)
{
    printf("mosquitto lib int fail...");
    return -1;
}

// 创建客户端
mosq = mosquitto_new(NULL, session, NULL);
if(mosq==NULL)
{
    printf("create client failed...n");
    err = -1;
    mosquitto_lib_cleanup();
    return -1;
}

// 客户端连接broker
err = mosquitto_connect(mosq, HOST, PORT, KEEP_ALIVE_TIME);
if(err<0)
{
    printf("connect fail");
    mosquitto_destroy(mosq);
    return -1;
}

// 启动事件循环(启动独立线程处理mosquitto事件)
err = mosquitto_loop_start(mosq);
if(err!=MOSQ_ERR_SUCCESS)
{
    printf("mosquitto loop errorn");
    mosquitto_disconnect(mosq);
    return -1;
}

// 发布信息到test主题
strncpy(buff, "hello world!", 13);
mosquitto_publish(mosq, NULL, "test", strlen(buff)+1, buff, 0, 0);

mosquitto_disconnect(mosq);
mosquitto_loop_stop(mosq, true);
mosquitto_destroy(mosq);
mosquitto_lib_cleanup();

return 0;

}

订阅信息客户端

mqtt_sub.c

#include 
#include 
#include 
#include 

define HOST "127.0.0.1"

define PORT 1883

define KEEP_ALIVE 60

bool session = true;

// 订阅主题成功时回调
void mqtt_subscribe_callback(struct mosquitto *mosq,
void *userdata, int mid, int qos_count, const int *granted_qos)
{
int i;
printf("subscribed (mid: %d): %d", mid, granted_qos[0]);
for(i=1; i printf(", %d", granted_qos[i]);
}
printf("n");
}

//消息回调函数,收到订阅的消息后调用
void mqtt_message_callback(struct mosquitto *mosq, void *userdata, const struct mosquitto_message *message)
{
if (message->payloadlen){
printf("%s: %s n", message->topic, (char *)message->payload);
}else{
printf("%s (null)n",message->topic);
}
}

//mqtt连接回调
void mqtt_connect_callback(struct mosquitto *mosq, void *userdata, int result)
{
int ret;
if (!result){
ret = mosquitto_subscribe(mosq, NULL, "test", 2);
if(ret printf("Subscription failedn");
}else{
printf("Subscription succeededn");
}
}else{
printf("connect failedn");
}
}

//日志回调函数
void mqtt_log_callback(struct mosquitto *mosq, void *userdata, int level, const char *str)
{
printf("log__ %sn", str);
}

int main(void)
{

int err = 0;
printf("mqtt client init...n");

struct mosquitto *mosq = NULL;

//libmosquitto 库初始化
err = mosquitto_lib_init();
if (err < 0){
    printf("mosquitto lib int fail...");
    return err;
}

//创建mosquitto客户端
mosq = mosquitto_new(NULL,session,NULL);
if (mosq == NULL){
    printf("create client failed...n");
    err = -1;
    mosquitto_lib_cleanup();
    return err;
}

//设置回调函数
mosquitto_log_callback_set(mosq, mqtt_log_callback);
mosquitto_connect_callback_set(mosq, mqtt_connect_callback);
mosquitto_message_callback_set(mosq, mqtt_message_callback);
mosquitto_subscribe_callback_set(mosq, mqtt_subscribe_callback);

//客户端连接服务器
err = mosquitto_connect(mosq, HOST, PORT, KEEP_ALIVE);
if (err < 0){
    printf("connect fail");    
    mosquitto_destroy(mosq);
    return err;
}

//启动事件循环,永久阻塞
err = mosquitto_loop_forever(mosq, -1, 1);
if (err < 0){
    printf("mosquitto loop fail");
    mosquitto_disconnect(mosq);
    return err;
}

mosquitto_disconnect(mosq);
mosquitto_loop_stop(mosq, false);
mosquitto_destroy(mosq);
mosquitto_lib_cleanup();

return 0;

}

编译

CMakeLists.txt

cmake_minimum_required(VERSION 3.16)

project(mosquitto_demo)

find_package(PkgConfig REQUIRED)

pkg_check_modules(MOSQ libmosquitto)

set(PUB_SOURCES mqtt_pub.c)
set(SUB_SOURCES mqtt_sub.c)

add_executable(subclient ${SUB_SOURCES})
add_executable(pubclient ${PUB_SOURCES})

target_include_directories(subclient PRIVATE ${MOSQ_INCLUDE_DIRS})
target_link_libraries(subclient PRIVATE ${MOSQ_LIBRARIES})

target_include_directories(pubclient PRIVATE ${MOSQ_INCLUDE_DIRS})
target_link_libraries(pubclient PRIVATE ${MOSQ_LIBRARIES})

Mosquittopp库编程(C++风格)

订阅信息客户端

subclient.cc

#include
#include
#include

const char* topic = "topic1";
const char* host = "127.0.0.1";
const int port = 1883;
const int alivetime = 60;

class MqttSubClient:public mosqpp::mosquittopp
{
public:
MqttSubClient(const char* id):mosquittopp(id){}
void on_connect(int rc) override;
void on_disconnect(int rc) override;
void on_subscribe(int mid, int qos_count, const int* granted_qos) override;
void on_message(const struct mosquitto_message* message) override;
};

// 连接回调函数
void MqttSubClient::on_connect(int rc)
{
if(rc == MOSQ_ERR_SUCCESS)
{
std::cout // 订阅主题
subscribe(nullptr, topic, 1);
}
else
{
std::cerr }
}

// 断开连接回调函数
void MqttSubClient::on_disconnect(int rc)
{
std::cout }

// 订阅成功回调函数
void MqttSubClient::on_subscribe(int mid, int qos_count, const int* granted_qos)
{
std::cout }

// 消息处理回调函数
void MqttSubClient::on_message(const struct mosquitto_message* message)
{
bool match = false;
mosqpp::topic_matches_sub(topic, message->topic, &match);
if(match)
{
std::string recv(static_cast(message->payload), message->payloadlen);
std::couttopicmid }
}

int main()
{
// 初始化
mosqpp::lib_init();
MqttSubClient subclient("subclient1");
int rc;
rc = subclient.connect(host, port, alivetime);
if(rc == MOSQ_ERR_ERRNO)
{
std::cout }
else if(MOSQ_ERR_SUCCESS == rc)
{
// 启动事件循环,并阻塞
rc = subclient.loop_forever();
}

subclient.disconnect();
mosqpp::lib_cleanup();


return 0;

}

发布信息客户端

pubclient.cc

#include
#include
#include
#include

const char* topic = "topic1";
const char* host = "127.0.0.1";
const int port = 1883;
const int alivetime = 60;

class MqttPubClient:public mosqpp::mosquittopp
{
public:
MqttPubClient(const char* id):mosquittopp(id){}
/// @brief 连接broker时回调
/// @param rc 返回码
void on_connect(int rc) override;
/// @brief 断开连接时回调
/// @param rc 返回码
void on_disconnect(int rc) override;
/// @brief 消息发布成功时回调
/// @param mid
void on_publish(int mid) override;
/// @brief 发布消息
/// @param message 要发布的消息
/// @param qos 定义消息传输可靠性 0 1 2
void publish_message(std::string message, int qos);
};

void MqttPubClient::on_connect(int rc)
{
if(rc == MOSQ_ERR_SUCCESS)
{
std::cout }
else
{
std::cout }
}

void MqttPubClient::on_disconnect(int rc)
{
std::cout }

void MqttPubClient::on_publish(int mid)
{
std::cout }

void MqttPubClient::publish_message(std::string message, int qos)
{
int ret = publish(nullptr, topic, message.size(), message.c_str(), qos, false);
if(ret != MOSQ_ERR_SUCCESS)
{
std::cerr }
}

int main()
{
// 初始化
mosqpp::lib_init();
MqttPubClient publisher("cpp_publisher");

int rc = publisher.connect(host, port, alivetime);
if(rc != MOSQ_ERR_SUCCESS)
{
    std::cerr<<"connect error!"<<mosqpp::strerror(rc)<<std::endl;
    return -1;
}

// 启动异步事件循环
publisher.loop_start();

// 消息发布
std::string msg;
std::this_thread::sleep_for(std::chrono::milliseconds(5));  // 两个sleep是为了确保主线程的输出在后台线程的回调函数输出之后
std::cout<<"请输入要发布的消息:";
while(std::cin>>msg)
{
    publisher.publish_message(msg, 1);
    std::this_thread::sleep_for(std::chrono::milliseconds(5));
    std::cout<<"请输入要发布的消息:";
}

// 清理资源
publisher.loop_stop(true);
publisher.disconnect();
mosqpp::lib_cleanup();

return 0;

}

编译

CMakeLists.txt

cmake_minimum_required(VERSION 3.16)

project(mqtt_cpp_style)

find_package(PkgConfig REQUIRED)

pkg_check_modules(MOSQ libmosquittopp)

set(SUBSRC subclient.cc)
set(PUBSRC pubclient.cc)

add_executable(subclient ${SUBSRC})
add_executable(pubclient ${PUBSRC})

target_include_directories(subclient PRIVATE ${MOSQ_INCLUDE_DIRS})
target_link_libraries(subclient PRIVATE ${MOSQ_LIBRARIES})

target_include_directories(pubclient PRIVATE ${MOSQ_INCLUDE_DIRS})
target_link_libraries(pubclient PRIVATE ${MOSQ_LIBRARIES})

登录查看全部

参与评论

评论留言

还没有评论留言,赶紧来抢楼吧~~

手机查看

返回顶部

给这篇文章打个标签吧~

棒极了 糟糕透顶 好文章 PHP JAVA JS 小程序 Python SEO MySql 确认