• 2023-10-28 关于MICROPYTHON的OTA实现


    OTA并不是必要功能但是却是一个在设备后期维护上及其有价值的一个功能,MICROPYTHON的OTA实现方式不一而足,今天记录一种云升级主程序的方式,这几乎是最简单的一种方式,可以实现云升级程序的功能,但是同时也是一种功能比较单一的OTA方式,即不能在线升级固件,也不能更新其他模块的一种OTA方式。

    这种OTA依赖码云作为空中平台,当然如果有其他的自己的平台也可用使用毕竟仅仅是使用request库来获取代码以及写入主文件的过程。

    存在问题:安全性不好,也可以说没啥安全性。但是作为最简单的方式,也没有什么问题。

    首先在boot.py里写下联网以及连接更新库的内容,然后再让程序去执行主程序。下面程序由于我考虑了如果没有网络的环境程序会故障所以做了一些联网检测的处理所以程序比较乱一些

    #import esp
    #esp.osdebug(None)
    #import webrepl
    #webrepl.start()
    # 功能:联手机热点上网并获取gitee上最新文件替换esp32上的main.py
    import network,time
    import urequests
    wlan = network.WLAN(network.STA_IF)
    print(0)
    wlan.active(False)
    time.sleep(1)
    wlan.active(True)
    print(1)
    wlan_if=0
    def do_connect():
        add=0
        if not wlan.isconnected():
            print('connecting to network...')
            wlan.connect('100king', '13704677369')
            while not wlan.isconnected():
                time.sleep(0.5)
                add=add+1
                if add>40:
                    global wlan_if
                    wlan_if=1
                    break
                    
    if wlan_if==0:            
        print('network config:', wlan.ifconfig())
        try:
            do_connect()
            #url="https://gitee.com/tianyudi/esp32/raw/master/webpush/main.py
            url="https://gitee.com/gauck/my/raw/master/%E8%8A%A6%E4%B8%81%E9%B8%A1%E5%8D%87%E7%BA%A7/main.py"
            data=urequests.get(url,).content
            print(data)
            if chr(data[9])=="1":
                # 如果main.py第十个字符为1,说明需要更新
                with open(r"main.py", "wb") as f:
                    print("开始更新")
                    f.write(data)
            else:
                print("不更新")
        except:
            pass
    else:
        pass
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46

    然后就是在码云平台上托管一个代码,注意以下要点:

    1、仓库要公开,随便访问那种
    2、发布代码后点击原始数据按钮查看URL地址该地址内容要写入BOOT.PY
    3、主程序代码要预留更新标志

    下面是,这次为了孵芦丁鸡蛋做的温控器代码,功能都在定时器和中断里,主程序没啥功能。

    # update=1
    # 上面1表示更新,0表示不更新
    ver=1.81
    print(ver)
    #####
    print('oled:  scl=14,sda=12,vcc=27,gnd=26')
    print('jdq:  p1=19,p2=21,p3=22,p4=23')
    print('ds18b02:   data=13')
    ##非易失存储
    import esp32
    nvs_data=esp32.NVS('abc')
    now_tmp=0
    try:
        set_tmp=nvs_data.get_i32('set_tmp')
    except:
        nvs_data.set_i32('set_tmp',384)
    ##
    add=0
    import machine
    def callback_k1(pin):#定义回调函数
        time.sleep(0.2)
        if k_1.value()==0:
            oled.fill(0)
            oled.text('jdq ce shi',0,0,1)
            p_out.value(not p_out.value())
            oled.show()
    def callback_k2(pin):#定义回调函数
        time.sleep(0.2)
        if k_2.value()==0:
            oled.fill(0)
            oled.text('jdq ce shi',0,0,1)
            p_out.value(not p_out.value())
            oled.show()
          
    def callback_k3(pin):#定义回调函数
        time.sleep(0.2)
        if k_3.value()==0:
            oled.fill(0)
            global set_tmp
            set_tmp=set_tmp+1
            
            oled.text('set_tmp=%s'%str(set_tmp),0,0,1)
            nvs_data.set_i32('set_tmp',set_tmp)
            oled.show()
           
    def callback_k4(pin):#定义回调函数
        time.sleep(0.2)
        if k_4.value()==0:
            oled.fill(0)
            global set_tmp
            set_tmp=set_tmp-1
            oled.text('set_tmp=%s'%str(set_tmp),0,0,1)
            nvs_data.set_i32('set_tmp',set_tmp)
            oled.show()
          
    def callback_t0(t):
    
        try:
            global now_tmp,set_tmp
            ds.convert_temp()
            time.sleep_ms(750)
            for rom in roms:
                now_tmp=ds.read_temp(rom)
                print(now_tmp)
            if  now_tmp>=(set_tmp/10 ):
                p_out.value(1)
                print(11111)
            if  now_tmp<(set_tmp/10 ):
                p_out.value(0)
                print(22222)
            oled.fill_rect(0,16,128,32,0)
            oled.text('now_tmp=%s'%str(now_tmp),0,16,1)
            oled.text('jdq_now=%s'%str(not p_out.value()),0,32,1)
            oled.show()
            global add
            add=add+1
        except:
            print('读取失败')
            oled.text('errrr',0,48,1)
            oled.show()
                
    wl_list=[]    
    t1 = machine.Timer(0)#实例化一个定时器通道,可选0-3一共四个定时器
    t1.init( period=10000, mode=machine.Timer.PERIODIC, callback=callback_t0)#1秒循环定时 参数为:间隔时间(毫秒)定时模式:循环还是一次  回调函数:指定\
    
    
    import time,json,network
    
    from machine import Pin
    import onewire,time, ds18x20
    ow = onewire.OneWire(Pin(13)) # create a OneWire bus on GPIO13
    ds = ds18x20.DS18X20(ow)
    roms = ds.scan()
    ds.convert_temp()
    time.sleep_ms(750)
    for rom in roms:
        tmp=ds.read_temp(rom)
        print(tmp)
        
    p_out=Pin(22,Pin.OUT,value=1)
    
    k_gnd=Pin(4,Pin.OUT,value=0)
    k_4=Pin(17,Pin.IN,Pin.PULL_UP)
    k_3=Pin(16,Pin.IN,Pin.PULL_UP)
    
    k_2=Pin(5, Pin.IN,Pin.PULL_UP)
    k_1=Pin(18,Pin.IN,Pin.PULL_UP)
    k_4.irq(trigger=machine.Pin.IRQ_FALLING, handler=callback_k4)
    k_1.irq(trigger=machine.Pin.IRQ_FALLING, handler=callback_k1)
    k_2.irq(trigger=machine.Pin.IRQ_FALLING, handler=callback_k2)
    k_3.irq(trigger=machine.Pin.IRQ_FALLING, handler=callback_k3)
    oled_vcc=Pin(27,Pin.OUT,value=1)
    oled_gnd=Pin(26,Pin.OUT,value=0)
    import machine,ssd1306
    i2c=machine.SoftI2C(scl=machine.Pin(14),sda=machine.Pin(12)) #这里是引脚改成自己的,不能偷懒
    oled=ssd1306.SSD1306_I2C(128,64,i2c)
    oled.fill(0)
    oled.text('s_tmp=%s'%str(set_tmp),0,0,1)
    oled.text('%s'%str(ver),84,0,1)
    oled.show()
    
    while 1:
        time.sleep(1)
        add=0
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125

    继续~~~ 能更新一个文件就应该能更新一群文件那么我再实现一个更新多个文件的方案,首先是写入板子的BOOT.PY文件的内容,这次我们更新的文件叫UPDATA.PY并保存好,然后本地化执行并引用这个文件

    # This file is executed on every boot (including wake-boot from deepsleep)
    #import esp
    #esp.osdebug(None)
    #import webrepl
    #webrepl.start()
    # This file is executed on every boot (including wake-boot from deepsleep)
    # 功能:联手机热点上网并获取gitee上最新文件替换esp32上的main.py
    import network,time
    import urequests
    wlan = network.WLAN(network.STA_IF)
    print(0)
    wlan.active(False)
    time.sleep(1)
    wlan.active(True)
    print(1)
    wlan_if=0
    def do_connect():
        add=0
        if not wlan.isconnected():
            print('connecting to network...')
            wlan.connect('100king', '13704677369')
            while not wlan.isconnected():
                time.sleep(0.5)
                add=add+1
                if add>40:
                    global wlan_if
                    wlan_if=1
                    break
                    
    if wlan_if==0:            
        print('network config:', wlan.ifconfig())
        try:
            do_connect()
            url="https://gitee.com/gauck/my/raw/master/%E6%B5%8B%E8%AF%952/updata.py"
            data=urequests.get(url,).content
            if chr(data[9])=="1":
                # 如果main.py第十个字符为1,说明需要更新
                with open(r"updata.py", "wb") as f:
                    print("开始更新")
                    f.write(data)
                import updata
            else:
                print("不更新")
        except:
            print('errr')
        
    else:
        pass
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49

    下面是UPDATA.PY文件里的内容,主要是把待更新的仓库位置以字典键值对的形式输入FOR循环,因为这个UPDATA.PY文件是云端更新下来的所以这里的内容就不用预先在板子上写入了,也就是文件系统彻底脱离板子了。基本实现了灵活更新文件系统的目的。

    # update=1
    # 上面1表示更新,0表示不更新
    ver=1.1
    import urequests
    print(ver)
    list_dir={'main.py':'https://gitee.com/gauck/my/raw/master/%E6%B5%8B%E8%AF%952/main.py','a.py':'https://gitee.com/gauck/my/raw/master/%E6%B5%8B%E8%AF%952/a.py','b.py':'https://gitee.com/gauck/my/raw/master/%E6%B5%8B%E8%AF%952/b.py'}
    try:
        for k,v in list_dir.items():
            print(k,v)
            data=urequests.get(v,).content
            with open(r'%s'%k, "wb") as f:
                print("开始更新")
                f.write(data)
    except:
        ver=ver-0.1
        print('ver=',ver)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    到这里基本也就完成了OTA的主要功能,只不过已经遗留在板子里的文件在这种模式下智能刷成空的确不能删除文件,就这么点缺点。

  • 相关阅读:
    uni-app的uni-list在真机调试中无法显示
    AI推介-大语言模型LLMs论文速览(arXiv方向):2024.03.05-2024.03.10—(1)
    使用过邮箱服务,你对`SMTP`、`POP3`、`IMAP`三大协议有过了解吗?
    如何解决找不到xinput1_3.dll无法继续执行此代码?5个解决方法分享
    Elastic Stack从入门到实践(一)--Elastic Stack入门(2)--Beats、Filebea入门
    【AI】Genetic Algorithm 遗传算法
    Oracle-表空间基于时间点恢复(TSPITR)
    mysql学习笔记1:mysql字符集和字符集编码
    nginx proxy_set_header设置、自定义header
    第十九章 Kali Linux桌面概览
  • 原文地址:https://blog.csdn.net/gaoke11240/article/details/134093775