• 十七、Rust集成MQTT Client


    1、信息整理

            目前了解到的Rust MQTT项目有:

    • bytebeamio/rumqtt

      • 1.3k star、717 commits、Contributors 78、
      • tokio、futures、tls、
      • rumqttc(client):cargo add rumqttc
        • https://github.com/bytebeamio/rumqtt/tree/main/rumqttc
      • rumqttd(server):docker run -p 1883:1883 -p 1884:1884 -it bytebeamio/rumqttd
    • ntex-rs/ntex-mqttcrates/ntex-mqtt

      • 258 star、504 commits、Contributors 15
      • ntex、MQTT Client/Server、v5 and v3.1.1 protocols
    • eclipse/paho.mqtt.rust

      • 463 star、368 commits、
      • 异步、SSL/TLS、cargo add paho-mqtt = "0.12"
      • 但看自述文档所述,用于 musl linux 环境,有代码侵入。
      fn is_musl() -> bool {
          std::env::var("CARGO_CFG_TARGET_ENV").unwrap() == "musl"
      }
      
      • 1
      • 2
      • 3
    • rmqtt/rmqtt:355 star,871 commits

      • 基于 ( tokio、ntex、ntex-mqtt )
      • 一个纯服务端,遍观项目,貌似不能做 client 使用。自身的测试,使用 paho.mqtt.testing 项目完成。
      • 分布式集群(raft),TLS,WebSocket,WebSocket-TLS,x86/Arm
      • 单个服务节点上处理百万级别的并发客户端。集群提供相同的连接量、吞吐量,应该只是解决高可用问题。

    2、开始集成

           本次集成作为 Client 使用,选择了最前面的 bytebeamio/rumqtt

    • boot/mqtt.rs
    use std::sync::Arc;
    use std::time::Duration;
    
    use once_cell::sync::OnceCell;
    use rumqttc::{AsyncClient, MqttOptions};
    use serde::{Deserialize, Serialize};
    
    pub static MQTT_CLIENT: OnceCell<Arc<AsyncClient>> = OnceCell::new();
    
    #[derive(Debug, Default, Validate, Serialize, Deserialize)]
    pub struct MqttMessage {
        pub addr: Option<String>,
        #[validate(range(min = 0, max = 1))]
        pub action: u8,
    }
    
    pub async fn start() {
        let mut mqtt_options = MqttOptions::new("rumqtt-async", "192.168.1.110", 1883);
        mqtt_options.set_keep_alive(Duration::from_secs(5));
        let (client, mut event_loop) = AsyncClient::new(mqtt_options, 100);
        MQTT_CLIENT.get_or_init(|| { Arc::new(client) });
    
        tokio::spawn(async move {
            loop { while let Ok(notification) = event_loop.poll().await {}; }
        });
    
        log::info!("this point run ...");
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • boot/mod.rs
    pub mod mqtt;
    
    pub async fn start() {
        // xxx others initialization
        mqtt::start().await;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • modules/mod.rs
    pub mod switch;
    
    pub mod handler {
        use actix_web::dev::HttpServiceFactory;
        use actix_web::web;
    
        use crate::module::{switch};
    
        pub fn api_routes() -> impl HttpServiceFactory {
            web::scope("")
                .service(switch::api::index)
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • modules/switch/api.rs
    use rumqttc::QoS::AtLeastOnce;
    use validator::Validate;
    
    use crate::boot::mqtt::{MQTT_CLIENT, MqttMessage};
    
    #[post("/switch/{id}")]
    pub async fn index(Path(id): Path<String>, mut msg: Json<MqttMessage>) -> impl Responder {
        let ok = vec!["Hello World!", "Hello World!"];
    
        if let Err(e) = msg.validate() { return HttpResponse::BadRequest().json(e); }
        msg.addr = Some(id);
        let json = serde_json::to_string(&msg).unwrap();
        MQTT_CLIENT.get().unwrap().publish("/hello", AtLeastOnce, false, json).await.unwrap();
    
        HttpResponse::Ok().json(ok)
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • main.rs
    use actix_web::{App, HttpServer, middleware};
    use booking::{boot, module};
    
    #[actix_web::main]
    async fn main() -> std::io::Result<()> {
        boot::start().await;
        HttpServer::new(move || App::new()
            // TODO other initialization
            .service(module::handler::api_routes())
            // other routes
        ).bind("0.0.0.0:8080")?.run().await
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

           代码贴完,剩下真是没啥可说的,拜了个 bye ~

  • 相关阅读:
    2023沈阳建筑大学计算机考研信息汇总
    react+electron从环境搭建到项目整合全过程
    Linux初始化mysql后外网无限制访问
    android逆向工程反编译指南(详细教程)
    获取dateTimePicker的数值
    微信公众号开发基本流程(记录初级流程)
    易航网址引导系统 v1.9 源码:去除弹窗功能的易航网址引导页管理系统
    PMP每日一练 | 考试不迷路-11.09(包含敏捷+多选)
    ElasticSearch+Kibana搭建与问题
    CarSim仿真快速入门(十九)—CarSim2021中ESC控制demo
  • 原文地址:https://blog.csdn.net/xuguangyuansh/article/details/134529708