mosquitto是一款开源的MQTT消息代理(服务器)软件,实现了MQTT协议版本3.1和3.1.1,提供轻量级的、支持发布/订阅的消息推送模式。
API:mosquitto.h
sudo apt install mosquitto # 安装服务端
sudo apt install mosquitto-clients # 安装客户端
sudo apt-get install libmosquitto-dev # C风格开发依赖包
sudo apt-get install libmosquittopp-dev # C++风格封装的libmosquitto开发包
查看服务状态
sudo service mosquitto status
启动服务器
sudo service mosquitto start
关闭服务器
sudo service mosquitto stop
启动服务器并实时显示所有日志
mosquitto -v
根据指定的配置文件启动服务器
mosquitto -c /etc/mosquitto/mosquitto.conf -d
-c: 指定配置文件
-d: 以守护进程方式在后台运行
指定端口启动服务器(默认端口是1883;-p 选项最多可指定10次,以同时监听多个端口)
mosquitto -p 1884
/etc/mosquitto/mosquitto.conf
消息持久存储
persistence true
persistence_location /var/lib/mosquitto/
日志文件
log_dest file /var/log/mosquitto/mosquitto.log
其他配置
include_dir /etc/mosquitto/conf.d
禁止匿名访问
allow_anonymous false
认证配置,即登录账号信息的文件
password_file /etc/mosquitto/pwfile
权限配置
acl_file /etc/mosquitto/aclfile
监听的端口
listener 1883
mosquitto_sub -t topic
mosquitto_pub -t topic -m '消息'
mqtt_pub.c
#include
#include #include #include define HOST "127.0.0.1"
define PORT 1883
define KEEP_ALIVE_TIME 60
define MSG_MAX_SIZE 512
bool session = true;
int main(void)
{
int err = 0;
printf("mqtt publish init ...n");
struct mosquitto* mosq = NULL;
char buff[MSG_MAX_SIZE];// 初始化 err = mosquitto_lib_init(); if(err<0) { printf("mosquitto lib int fail..."); return -1; } // 创建客户端 mosq = mosquitto_new(NULL, session, NULL); if(mosq==NULL) { printf("create client failed...n"); err = -1; mosquitto_lib_cleanup(); return -1; } // 客户端连接broker err = mosquitto_connect(mosq, HOST, PORT, KEEP_ALIVE_TIME); if(err<0) { printf("connect fail"); mosquitto_destroy(mosq); return -1; } // 启动事件循环(启动独立线程处理mosquitto事件) err = mosquitto_loop_start(mosq); if(err!=MOSQ_ERR_SUCCESS) { printf("mosquitto loop errorn"); mosquitto_disconnect(mosq); return -1; } // 发布信息到test主题 strncpy(buff, "hello world!", 13); mosquitto_publish(mosq, NULL, "test", strlen(buff)+1, buff, 0, 0); mosquitto_disconnect(mosq); mosquitto_loop_stop(mosq, true); mosquitto_destroy(mosq); mosquitto_lib_cleanup(); return 0;
}
mqtt_sub.c
#include
#include #include #include define HOST "127.0.0.1"
define PORT 1883
define KEEP_ALIVE 60
bool session = true;
// Subscribe callback: prints the message id followed by every entry of
// granted_qos, separated by ", ", on a single line.
//
// mosq and userdata are not used. qos_count is the number of entries in
// granted_qos; when it is 0 only the message id is printed.
void mqtt_subscribe_callback(struct mosquitto *mosq,
        void *userdata, int mid, int qos_count, const int *granted_qos)
{
    int i;

    (void)mosq;
    (void)userdata;

    printf("subscribed (mid: %d): ", mid);
    for (i = 0; i < qos_count; i++) {
        // No separator in front of the first value.
        printf("%s%d", i == 0 ? "" : ", ", granted_qos[i]);
    }
    printf("\n");
}
// Message callback: invoked when a message on a subscribed topic is received.
// Message callback: prints "<topic>: <payload>" for a message that carries
// a payload and "<topic> (null)" for one that does not.
//
// mosq and userdata are not used.
void mqtt_message_callback(struct mosquitto *mosq, void *userdata, const struct mosquitto_message *message)
{
    (void)mosq;
    (void)userdata;

    if (message->payloadlen) {
        // The payload is described by a pointer plus a length, so the print
        // is bounded by payloadlen instead of relying on a terminating NUL.
        printf("%s: %.*s \n", message->topic, message->payloadlen,
               (const char *)message->payload);
    } else {
        printf("%s (null)\n", message->topic);
    }
}
// MQTT connect callback.
void mqtt_connect_callback(struct mosquitto *mosq, void *userdata, int result)
{
int ret;
if (!result){
ret = mosquitto_subscribe(mosq, NULL, "test", 2);
if(ret printf("Subscription failedn");
}else{
printf("Subscription succeededn");
}
}else{
printf("connect failedn");
}
}//日志回调函数
void mqtt_log_callback(struct mosquitto *mosq, void *userdata, int level, const char *str)
{
printf("log__ %sn", str);
}int main(void)
{int err = 0; printf("mqtt client init...n"); struct mosquitto *mosq = NULL; //libmosquitto 库初始化 err = mosquitto_lib_init(); if (err < 0){ printf("mosquitto lib int fail..."); return err; } //创建mosquitto客户端 mosq = mosquitto_new(NULL,session,NULL); if (mosq == NULL){ printf("create client failed...n"); err = -1; mosquitto_lib_cleanup(); return err; } //设置回调函数 mosquitto_log_callback_set(mosq, mqtt_log_callback); mosquitto_connect_callback_set(mosq, mqtt_connect_callback); mosquitto_message_callback_set(mosq, mqtt_message_callback); mosquitto_subscribe_callback_set(mosq, mqtt_subscribe_callback); //客户端连接服务器 err = mosquitto_connect(mosq, HOST, PORT, KEEP_ALIVE); if (err < 0){ printf("connect fail"); mosquitto_destroy(mosq); return err; } //启动事件循环,永久阻塞 err = mosquitto_loop_forever(mosq, -1, 1); if (err < 0){ printf("mosquitto loop fail"); mosquitto_disconnect(mosq); return err; } mosquitto_disconnect(mosq); mosquitto_loop_stop(mosq, false); mosquitto_destroy(mosq); mosquitto_lib_cleanup(); return 0;
}
CMakeLists.txt
cmake_minimum_required(VERSION 3.16)
project(mosquitto_demo)

# Locate libmosquitto through pkg-config. REQUIRED stops configuration with a
# clear error when the development package is not installed.
find_package(PkgConfig REQUIRED)
pkg_check_modules(MOSQ REQUIRED IMPORTED_TARGET libmosquitto)

set(PUB_SOURCES mqtt_pub.c)
set(SUB_SOURCES mqtt_sub.c)

add_executable(subclient ${SUB_SOURCES})
add_executable(pubclient ${PUB_SOURCES})

# The imported target carries the include directories, library search paths
# and link flags reported by pkg-config.
target_link_libraries(subclient PRIVATE PkgConfig::MOSQ)
target_link_libraries(pubclient PRIVATE PkgConfig::MOSQ)
subclient.cc
#include
#include #include const char* topic = "topic1";
const char* host = "127.0.0.1";
const int port = 1883;
const int alivetime = 60;class MqttSubClient:public mosqpp::mosquittopp
{
public:
MqttSubClient(const char* id):mosquittopp(id){}
void on_connect(int rc) override;
void on_disconnect(int rc) override;
void on_subscribe(int mid, int qos_count, const int* granted_qos) override;
void on_message(const struct mosquitto_message* message) override;
};// 连接回调函数
void MqttSubClient::on_connect(int rc)
{
if(rc == MOSQ_ERR_SUCCESS)
{
std::cout // 订阅主题
subscribe(nullptr, topic, 1);
}
else
{
std::cerr }
}// 断开连接回调函数
/// @brief Disconnect callback: prints the result code.
/// @param rc result code passed in by the library
void MqttSubClient::on_disconnect(int rc)
{
    std::cout << "disconnected, rc = " << rc << std::endl;
}
// Subscribe-acknowledged callback.
/// @brief Subscribe callback: prints the message id and each entry of
///        granted_qos on one line.
/// @param mid message id of the subscribe request
/// @param qos_count number of entries in granted_qos
/// @param granted_qos array of QoS values
void MqttSubClient::on_subscribe(int mid, int qos_count, const int* granted_qos)
{
    std::cout << "subscribed (mid: " << mid << "), granted qos:";
    for (int i = 0; i < qos_count; ++i)
    {
        std::cout << ' ' << granted_qos[i];
    }
    std::cout << std::endl;
}
// Message callback.
void MqttSubClient::on_message(const struct mosquitto_message* message)
{
bool match = false;
mosqpp::topic_matches_sub(topic, message->topic, &match);
if(match)
{
std::string recv(static_cast(message->payload), message->payloadlen);
std::couttopicmid }
}int main()
{
// 初始化
mosqpp::lib_init();
MqttSubClient subclient("subclient1");
int rc;
rc = subclient.connect(host, port, alivetime);
if(rc == MOSQ_ERR_ERRNO)
{
std::cout }
else if(MOSQ_ERR_SUCCESS == rc)
{
// 启动事件循环,并阻塞
rc = subclient.loop_forever();
}subclient.disconnect(); mosqpp::lib_cleanup(); return 0;
}
pubclient.cc
#include
#include #include #include const char* topic = "topic1";
const char* host = "127.0.0.1";
const int port = 1883;
const int alivetime = 60;class MqttPubClient:public mosqpp::mosquittopp
{
public:
MqttPubClient(const char* id):mosquittopp(id){}
/// @brief 连接broker时回调
/// @param rc 返回码
void on_connect(int rc) override;
/// @brief 断开连接时回调
/// @param rc 返回码
void on_disconnect(int rc) override;
/// @brief 消息发布成功时回调
/// @param mid
void on_publish(int mid) override;
/// @brief 发布消息
/// @param message 要发布的消息
/// @param qos 定义消息传输可靠性 0 1 2
void publish_message(std::string message, int qos);
};void MqttPubClient::on_connect(int rc)
{
if(rc == MOSQ_ERR_SUCCESS)
{
std::cout }
else
{
std::cout }
}void MqttPubClient::on_disconnect(int rc)
{
std::cout }void MqttPubClient::on_publish(int mid)
{
std::cout }void MqttPubClient::publish_message(std::string message, int qos)
{
int ret = publish(nullptr, topic, message.size(), message.c_str(), qos, false);
if(ret != MOSQ_ERR_SUCCESS)
{
std::cerr }
}int main()
{
// 初始化
mosqpp::lib_init();
MqttPubClient publisher("cpp_publisher");int rc = publisher.connect(host, port, alivetime); if(rc != MOSQ_ERR_SUCCESS) { std::cerr<<"connect error!"<<mosqpp::strerror(rc)<<std::endl; return -1; } // 启动异步事件循环 publisher.loop_start(); // 消息发布 std::string msg; std::this_thread::sleep_for(std::chrono::milliseconds(5)); // 两个sleep是为了确保主线程的输出在后台线程的回调函数输出之后 std::cout<<"请输入要发布的消息:"; while(std::cin>>msg) { publisher.publish_message(msg, 1); std::this_thread::sleep_for(std::chrono::milliseconds(5)); std::cout<<"请输入要发布的消息:"; } // 清理资源 publisher.loop_stop(true); publisher.disconnect(); mosqpp::lib_cleanup(); return 0;
}
CMakeLists.txt
cmake_minimum_required(VERSION 3.16)
project(mqtt_cpp_style)

# Locate libmosquittopp through pkg-config. REQUIRED stops configuration with
# a clear error when the development package is not installed.
find_package(PkgConfig REQUIRED)
pkg_check_modules(MOSQ REQUIRED IMPORTED_TARGET libmosquittopp)

set(SUBSRC subclient.cc)
set(PUBSRC pubclient.cc)

add_executable(subclient ${SUBSRC})
add_executable(pubclient ${PUBSRC})

# The imported target carries the include directories, library search paths
# and link flags reported by pkg-config.
target_link_libraries(subclient PRIVATE PkgConfig::MOSQ)
target_link_libraries(pubclient PRIVATE PkgConfig::MOSQ)
参与评论
手机查看
返回顶部