• micropython实现mpy的ota(有用,推荐)


    ota到底是个啥功能,其实我也不一定理解的很准确,我认为就是网络升级功能,连上wifi升级程序,就这样。
    传统的嵌入式想升级确实比较费劲,因为没有文件管理系统,所以就得考虑专门给ota的分区,直接整个bin都重新烧录引导,这部分比较复杂,我也只是看个皮毛,说的不一定准,当然也不是今天的主题。
    所以我就在想,咱大MPY需要这种OTA么,好像并不需要,就是更新个py的事儿嘛,哪有这么复杂呢?结合uruquests库弄就完了!也省得搭服务器,直接建立个库多好啊。
    现在比较新的固件,我印象1.17以后都是了,官方固件自带urequests库,不过这个库也就一个文件,光杆司令,你完全可以直接拷贝到开发板上。
    然后github找了个类似的项目, senko。好家伙两年没更新了,也没啥大问题,可能关注度也不高。
    这个项目本来是用来更新github库的,我必须给他本土化一下,改成咱csdn的库,主要是不用看网络爸爸的脸色。
    我建了库,你们可以测试用
    库里啥也没有就一个main.py用来验证是否可行。
    折腾一晚上,开始卡在urequests.py总报错,后来才发现链接地址没搞对。。。

    总体步骤跟我来一遍:
    1 改boot.py这个是板子启动的第一个东西,我们把设置弄进去:

    import gc
    import machine
    import network
    
    def connect_wlan(ssid, password):
        sta_if = network.WLAN(network.STA_IF)
        ap_if = network.WLAN(network.AP_IF)
        sta_if.active(True)
        ap_if.active(False)
    
        if not sta_if.isconnected():
            print("Connecting to WLAN ({})...".format(ssid))
            sta_if.active(True)
            sta_if.connect(ssid, password)
            while not sta_if.isconnected():
                pass
    
        return True
    
    
    def main():
        gc.collect()
    
        # Wi-Fi credentials
        SSID = "jd_work"
        PASSWORD = "800080008000"
    
        connect_wlan(SSID, PASSWORD)
    
        # Install Senko from PyPi
        #upip.install("micropython-senko")
    
        import senko
        OTA = senko.Senko(user="jd3096", repo="micropython-ota-test", working_dir="app", files=["main.py"])
        
        print('ota init')
    
        if OTA.update():
            print("Updated to the latest version!")
            machine.reset()
            
    
    
    if __name__ == "__main__":
        main()
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47

    其实就是运行main.py之前,联网,并尝试用senko库更新
    核心就一句话:

    OTA = senko.Senko(user="jd3096", repo="micropython-ota-test", working_dir="app", files=["main.py"])
    
    • 1

    更加详细的参数可以自己看库,不大也不难

    2 官方用的是upip的方法安装senko库,但是网络是爹,何必求爸爸呢,所以直接放进去一个senko.py也一样,我做了一些适配csdn库的修改。

    import urequests
    import uhashlib
    import gc
    gc.collect()
    
    
    class Senko:
        #https://gitcode.net/jd3096/micropython-ota-test.git
        raw = "https://gitcode.net"
        #github = "https://gitcode.net"
    
        def __init__(self, user, repo, url=None, branch="-/raw/master", working_dir="app", files=["boot.py", "main.py"], headers={}):
            """Senko OTA agent class.
    
            Args:
                user (str): GitHub user.
                repo (str): GitHub repo to fetch.
                branch (str): GitHub repo branch. (master)
                working_dir (str): Directory inside GitHub repo where the micropython app is.
                url (str): URL to root directory.
                files (list): Files included in OTA update.
                headers (list, optional): Headers for urequests.
            """
            self.base_url = "{}/{}/{}".format(self.raw, user, repo) if user else url.replace(self.github, self.raw)
            self.url = url if url is not None else "{}/{}/{}".format(self.base_url, branch, working_dir)
            self.headers = headers
            self.files = files
            print(self.base_url,self.url,self.headers,self.files)
    
        def _check_hash(self, x, y):
            x_hash = uhashlib.sha1(x.encode())
            y_hash = uhashlib.sha1(y.encode())
    
            x = x_hash.digest()
            y = y_hash.digest()
    
            if str(x) == str(y):
                return True
            else:
                return False
    
        def _get_file(self, url):
            gc.collect()
            print(url)
            payload = urequests.get(url)
            code = payload.status_code
    
            if code == 200:
                return payload.text
            else:
                return None
    
        def _check_all(self):
            changes = []
    
            for file in self.files:
                latest_version = self._get_file(self.url + "/" + file)
                if latest_version is None:
                    continue
    
                try:
                    with open(file, "r") as local_file:
                        local_version = local_file.read()
                except:
                    local_version = ""
    
                if not self._check_hash(latest_version, local_version):
                    changes.append(file)
    
            return changes
    
        def fetch(self):
            """Check if newer version is available.
    
            Returns:
                True - if is, False - if not.
            """
            if not self._check_all():
                return False
            else:
                return True
    
        def update(self):
            """Replace all changed files with newer one.
    
            Returns:
                True - if changes were made, False - if not.
            """
            changes = self._check_all()
    
            for file in changes:
                with open(file, "w") as local_file:
                    local_file.write(self._get_file(self.url + "/" + file))
    
            if changes:
                return True
            else:
                return False
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100

    3 板子的工作完事儿了,剩下的就是改变仓库的内容。真心方便,优雅!!!

    做这玩意有啥用呢?
    以前写程序的方式:
    板子连接电脑——打开thonny——修改代码——保存
    现在的方式:
    首次设置后
    电脑端改好代码——push到仓库——板子上按一下重启

    再也不用线了,只要板子有网就行,多优雅。
    还有很重要的一点,如果部署一个板子,插线连接ok,如果要同时更新几十个几百个板子,而且板子分散在各种,这种ota的优势就凸显出来了。
    评论区提到的配网问题,也是一个方向,目前我所了解的配网就是esp32建立服务器热点,其他设备连接,打开指定ip的浏览器,输入配网信息,还不够优雅,有个叫强制门户的技术,类似机场的wifi,连接自动弹出网页填写信息,这个能更好些,当然除了mpy,esp32可以微信小程序配网之类的,关于配网,有时间我专门开一期。

  • 相关阅读:
    【UVM基础】5、sequence和sequencer相关宏
    数据结构之索引查找(分块查找)
    主成分分析(机器学习)
    【斗破年番】紫研新形象,萧炎终成翻海印,救援月媚,三宗决战
    MySQL 索引测试
    【SQL刷题】Day2----SQL语法基础查询
    【Linux】《Linux命令行与shell脚本编程大全 (第4版) 》笔记-Chapter16-脚本控制
    java--拼图游戏
    【无标题】
    RackNerd 圣何塞 VPS 测评
  • 原文地址:https://blog.csdn.net/jd3096/article/details/126594241