Yet another otaku

某ロリコン的自白


  • 首頁
  • 歸檔
  • 分類
  • 標籤
  • 連結
  • 關於
  •    

© 2025 SgDylan

Theme Typography by Makito

Proudly published with Hexo

使用 ESP8266 搭建 IoT 节点

發佈於 2017-01-21 評論 编程  Python MicroPython ESP8266 IoT 

刚好要买 ARM 板子,就顺手一并买了一块传说中的廉价 WiFi 开发板——使用 ESP8266 的 NodeMCU 。
那么就来说说这块据说性能强大、价格低廉,且可以写脚本语言的开发板。

由于固件基于 AI-Thinker 官方的 SDK 使用 C 语言编写,这块开发板可以使用的固件有很多。
除了官方提供的使用AT指令的固件外,大概就是 NodeMCU 和 MicroPython 这两种固件最强大了。

NodeMCU固件由于模块较多,故官方不提供二进制预编译固件,需要自己定制编译。
可以使用在线编译网站在线选择自己需要的模块编译固件使用,网站的地址是:
https://nodemcu-build.com/index.php

就个人使用而言,感觉上 NodeMCU 固件非常不便利,很多文档写的方法照着用也会错误。
(大概是我不太懂 lua 的语法)
所以最后还是选择用 MicroPython 完成全部操作,最后的效果是由 ESP8266 接收传感器数据,然后封装到 JSON,使用HTTP协议发送给API服务端。
由于手头没有传感器,故使用随机数生成器生成随机数代替传感数据。

ESP8266 使用的 MicroPython 的文档在这里可以找到:
https://docs.micropython.org/en/latest/esp8266/esp8266/quickref.html

首先是下载烧写固件,在下边的网址里找一个固件下载就可以,建议下载稳定版
http://micropython.org/download#esp8266

然后是安装配套的工具

pip install pyserial
git clone https://github.com/themadinventor/esptool.git

进入 esptool 目录,运行脚本擦除 flash (防止出现奇怪的问题)
这里要修改使用的串口,我使用的是COM5。
如果指令失败了,那么你应该需要使板子进入烧录模式,操作的方法是:
按住Flash键的同时按一下RST键,看到蓝灯闪一下就进入烧录模式了

python esptool.py --port COM5 --baud 115200 erase_flash

然后直接烧录固件(如果烧录无法进行,试着重新拔插再进入烧录模式操作)
这里要修改串口、FLASH大小及固件文件名称
FLASH大小是根据板子使用的芯片而来的,NodeMCU有v0.9 v1.0两个版本在售,判断很简单:
看靠近USB口的USART2USB芯片型号即可。使用CP2102的是v1.0版,使用CH340的是v0.9版。
v1.0版即为4MB Flash。

python esptool.py --port COM5 --baud 115200 write_flash --flash_size=4MB 0x00000 esp8266-20170108-v1.8.7.bin

烧好固件就可以使用串口或USB连接开发板烧写程序了。
其中,连接串口直接交互和在终端使用Python交互是一样的。如果需要写入程序,还需要安装 ampy

pip install adafruit-ampy

使用 ampy 很简单

# 上传程序直接运行 
ampy --port COM5 run test.py

# 拷贝程序并重命名为 
ampy --port COM5 put test.py main.py

# 删除板上程序 
ampy --port COM5 rm test.py

MicroPython 上电后会直接运行 boot.py 及 main.py,我们需把主程序写在 main.py 里。
其中使用到了第三方的 http 库,由于与固件内其他库冲突,故重命名为 requests.py
源码:https://github.com/balloob/micropython-http-client

以下为 main.py,功能就是生成12个随机串发送到服务器

import requests
import machine
import time
import os
import network
import ubinascii as binascii
import gc

def do_send(phase0, phase1, phase2, balance_info):
    readySendJSON = {
        "phase0": {"Freq": phase0[0],"Amp": phase0[1], "Phase": phase0[2]},
        "phase1": {"Freq": phase1[0],"Amp": phase1[1], "Phase": phase1[2]},
        "phase2": {"Freq": phase2[0],"Amp": phase2[1], "Phase": phase2[2]},
        "balance_info": {"Freq": balance_info[0],"Amp": balance_info[1], "Phase": balance_info[2]}
    }
    r = requests.post('http://servername:8080/api', json=readySendJSON, timeout=3) # 修改服务器地址
    r.close()
    #print(r.json())

def do_connect():
    # if is connected, light LED
    LED = machine.Pin(16, machine.Pin.OUT)
    wlan = network.WLAN(network.STA_IF)
    wlan.active(True)
    if not wlan.isconnected():
        LED.high()
        print('connecting to network...')
        wlan.connect('SSID', 'PASSWORD') # 这里修改为你的WiFi名称及密码
        while not wlan.isconnected():
            pass
    LED.low()
    print('network config:', wlan.ifconfig())

def do_receive_data():
    receive_data = []
    for i in range(12):
        receive_data.append(binascii.hexlify(os.urandom(8)))
    return receive_data

def do_process_data():
    data_raw = do_receive_data()
    phase0 = []
    phase1 = []
    phase2 = []
    balance_info = []
    # print(data_raw)
    for i in range(0,3):
        phase0.append(data_raw[i])
        phase1.append(data_raw[i+3])
        phase2.append(data_raw[i+6])
        balance_info.append(data_raw[i+9])
    return phase0, phase1, phase2, balance_info

def main():
    gc.enable()
    do_connect()
    while(True):
        try:
            phase0, phase1, phase2, balance_info = do_process_data()
            do_send(phase0, phase1, phase2, balance_info)
            print("Send data successful.")
        except OSError:
            print("Error occurs.")
            pass
        gc.collect()
        time.sleep(1.5)

if __name__ == "__main__":
    main()

API服务端使用 Flask 编写,以下为主程序

API的接口为 /api
使用GET方法时 参数 method 和 value 同时为 status 及 view 时,返回传回的JSON
使用POST方法时,接收开发板传回的JSON

展示页面地址为 /status,可以直接看到解析后的信息

from flask import Flask,request
import json
import datetime
app = Flask(__name__)

global lastestValue

@app.route('/api', methods=['GET', 'POST'])
def api_process():
    global lastestValue
    if request.method == 'GET':
        method = request.args.get("method")
        value = request.args.get("value")
        if method == "status" and value == "view":
            try:
                return json.dumps(lastestValue,ensure_ascii=False)
            except NameError:
                return "<h3>NULL DATA</h3><br>"
            except Exception as e:
                return "<h3>Internal Server Error</h3><br>" + str(e)
    if request.method == 'POST':
        #print(request.data.decode(encoding="utf-8"))
        try:
            lastestValue = json.loads(request.data.decode(encoding="utf-8"))
            lastestValue['update_time'] = str(datetime.datetime.now())
        except:
            print("Decode error")
        #print(lastestValue,type(lastestValue),lastestValue['phase0'])
        return "<h3>POST OK</h3>"

@app.route('/status')
def app_status():
    try:
        return """<h2>Update date %s</h2>
                  <h3>Phase0: %s %s %s</h3>
                  <h3>Phase1: %s %s %s</h3>
                  <h3>Phase2: %s %s %s</h3>
                  <h3>balance_info: %s %s %s</h3>
            """ % (lastestValue['update_time'], \
                   lastestValue['phase0']['Freq'], lastestValue['phase0']['Freq'], lastestValue['phase0']['Freq'], \
                   lastestValue['phase1']['Freq'], lastestValue['phase1']['Freq'], lastestValue['phase1']['Freq'], \
                   lastestValue['phase2']['Freq'], lastestValue['phase2']['Freq'], lastestValue['phase2']['Freq'], \
                   lastestValue['balance_info']['Freq'], lastestValue['balance_info']['Freq'], lastestValue['balance_info']['Freq'],
            )
    except NameError:
        return "<h3>NULL DATA</h3><br>"
    except Exception as e:
        return "<h3>Internal Server Error</h3><br>" + str(e)

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=8080)

2017-02-25 更新:
上边这套程序有一个问题,即初始化的时候不会关闭默认启动的AP。
可以在程序本体启动前执行以下代码以关闭默认AP:

wlanAP = network.WLAN(network.AP_IF)
self.wlanAP.active(False)

分享到 

 上一篇: 在视频处理中使用SVP4 下一篇: 饥荒 Don't Starve Together 服务器安装记录 

© 2025 SgDylan

Theme Typography by Makito

Proudly published with Hexo