目前了解到的Rust MQTT项目有:
ntex-rs/ntex-mqtt、crates/ntex-mqtt
cargo add paho-mqtt@0.12(即在 Cargo.toml 中写入 paho-mqtt = "0.12")
/// Reports whether the current build targets a musl libc environment.
///
/// Reads the `CARGO_CFG_TARGET_ENV` environment variable and compares it with
/// `"musl"`. A missing or non-Unicode variable is treated as "not musl" rather
/// than causing a panic, so the helper is safe to call outside a Cargo build.
fn is_musl() -> bool {
    std::env::var("CARGO_CFG_TARGET_ENV").map_or(false, |target_env| target_env == "musl")
}
rmqtt/rmqtt:355 star,871 commits
paho.mqtt.testing
项目完成。本次集成作为 Client 使用,选择了最前面的 bytebeamio/rumqtt 。
use std::sync::Arc;
use std::time::Duration;
use once_cell::sync::OnceCell;
use rumqttc::{AsyncClient, MqttOptions};
use serde::{Deserialize, Serialize};
/// Process-wide slot for the shared MQTT client handle.
///
/// The cell starts out empty; `get()` returns `None` until a client has been stored in it.
pub static MQTT_CLIENT: OnceCell<Arc<AsyncClient>> = OnceCell::new();
#[derive(Debug, Default, Validate, Serialize, Deserialize)]
pub struct MqttMessage {
pub addr: Option<String>,
#[validate(range(min = 0, max = 1))]
pub action: u8,
}
pub async fn start() {
let mut mqtt_options = MqttOptions::new("rumqtt-async", "192.168.1.110", 1883);
mqtt_options.set_keep_alive(Duration::from_secs(5));
let (client, mut event_loop) = AsyncClient::new(mqtt_options, 100);
MQTT_CLIENT.get_or_init(|| { Arc::new(client) });
tokio::spawn(async move {
loop { while let Ok(notification) = event_loop.poll().await {}; }
});
log::info!("this point run ...");
}
pub mod mqtt;
/// Runs the application's boot-time initialization; currently this awaits `mqtt::start()`.
pub async fn start() {
// Placeholder: other initialization steps go here.
mqtt::start().await;
}
pub mod switch;
/// Route wiring for the HTTP API.
pub mod handler {
    use actix_web::{dev::HttpServiceFactory, web};

    use crate::module::switch;

    /// Builds the service factory that registers the API handlers under an
    /// empty-prefix scope.
    pub fn api_routes() -> impl HttpServiceFactory {
        let root_scope = web::scope("");
        root_scope.service(switch::api::index)
    }
}
use rumqttc::QoS::AtLeastOnce;
use validator::Validate;
use crate::boot::mqtt::{MQTT_CLIENT, MqttMessage};
#[post("/switch/{id}")]
pub async fn index(Path(id): Path<String>, mut msg: Json<MqttMessage>) -> impl Responder {
let ok = vec!["Hello World!", "Hello World!"];
if let Err(e) = msg.validate() { return HttpResponse::BadRequest().json(e); }
msg.addr = Some(id);
let json = serde_json::to_string(&msg).unwrap();
MQTT_CLIENT.get().unwrap().publish("/hello", AtLeastOnce, false, json).await.unwrap();
HttpResponse::Ok().json(ok)
}
use actix_web::{App, HttpServer, middleware};
use booking::{boot, module};
/// Entry point: runs the boot sequence, then serves the HTTP API on `0.0.0.0:8080`
/// until the server stops.
///
/// Returns an I/O error if binding the address fails or the server exits with one.
#[actix_web::main]
async fn main() -> std::io::Result<()> {
    boot::start().await;

    // Factory handed to the server to build each `App` instance.
    let app_factory = move || {
        App::new()
            // TODO other initialization
            .service(module::handler::api_routes())
        // other routes
    };

    let server = HttpServer::new(app_factory).bind("0.0.0.0:8080")?;
    server.run().await
}
代码贴完,剩下真是没啥可说的,拜了个 bye ~