ESP32 Picoweb教程:修改返回的HTTP代码-Arduino中文社区 - Powered by Discuz! Archiver

dfrobot 发表于 2019-5-16 12:03

ESP32 Picoweb教程:修改返回的HTTP代码

本帖最后由 dfrobot 于 2019-5-13 18:19 编辑

引 言本文主要说明如何在MicroPython Picoweb应用程序中返回特定的HTTP代码。我们将使用一个叫做http_error的函数。这个函数其实也可以返回其他类型的HTTP代码,比如在2xx范围内的代码(成功代码)。可用的HTTP响应代码可参见此处(https://www.w3.org/Protocols/rfc2616/rfc2616-sec10.html)。测试使用的是一个集成在ESP32开发板中的DFRobot的sp - wroom -32设备。代码开发是在MicroPython IDE uPyCraft上完成的。你可以在前一篇文章:ESP32 MicroPython教程:uPyCraft IDE入门 中查看如何使用uPyCraft。尽管本文将使用一个特殊的函数来返回状态代码,但是上一篇教程中用来将第一部分HTTP响应发送给客户端的start_response函数也有一个可选的状态参数可用于返回特殊的HTTP代码。默认情况下,其返回值是200,对应于OK 。

start_response函数代码如下:

# Picoweb web pico-framework for MicroPython
# Copyright (c) 2014-2018 Paul Sokolovsky
# SPDX-License-Identifier: MIT
import sys
import gc
import micropython
import utime
import uio
import ure as re
import uerrno
import uasyncio as asyncio
import pkg_resources

from .utils import parse_qs


def get_mime_type(fname):
    # Provide minimal detection of important file
    # types to keep browsers happy
    if fname.endswith(".html"):
      return "text/html"
    if fname.endswith(".css"):
      return "text/css"
    if fname.endswith(".png") or fname.endswith(".jpg"):
      return "image"
    return "text/plain"

def sendstream(writer, f):
    buf = bytearray(64)
    while True:
      l = f.readinto(buf)
      if not l:
            break
      yield from writer.awrite(buf, 0, l)


def jsonify(writer, dict):
    import ujson
    yield from start_response(writer, "application/json")
    yield from writer.awrite(ujson.dumps(dict))

def start_response(writer, content_type="text/html", status="200", headers=None):
    yield from writer.awrite("HTTP/1.0 %s NA\r\n" % status)
    yield from writer.awrite("Content-Type: ")
    yield from writer.awrite(content_type)
    if not headers:
      yield from writer.awrite("\r\n\r\n")
      return
    yield from writer.awrite("\r\n")
    if isinstance(headers, bytes) or isinstance(headers, str):
      yield from writer.awrite(headers)
    else:
      for k, v in headers.items():
            yield from writer.awrite(k)
            yield from writer.awrite(": ")
            yield from writer.awrite(v)
            yield from writer.awrite("\r\n")
    yield from writer.awrite("\r\n")

def http_error(writer, status):
    yield from start_response(writer, status=status)
    yield from writer.awrite(status)


class HTTPRequest:

    def __init__(self):
      pass

    def read_form_data(self):
      size = int(self.headers)
      data = yield from self.reader.read(size)
      form = parse_qs(data.decode())
      self.form = form

    def parse_qs(self):
      form = parse_qs(self.qs)
      self.form = form


class WebApp:

    def __init__(self, pkg, routes=None, serve_static=True):
      if routes:
            self.url_map = routes
      else:
            self.url_map = []
      if pkg and pkg != "__main__":
            self.pkg = pkg.split(".", 1)
      else:
            self.pkg = None
      if serve_static:
            self.url_map.append((re.compile("^/(static/.+)"), self.handle_static))
      self.mounts = []
      self.inited = False
      # Instantiated lazily
      self.template_loader = None
      self.headers_mode = "parse"

    def parse_headers(self, reader):
      headers = {}
      while True:
            l = yield from reader.readline()
            if l == b"\r\n":
                break
            k, v = l.split(b":", 1)
            headers = v.strip()
      return headers

    def _handle(self, reader, writer):
      if self.debug > 1:
            micropython.mem_info()

      close = True
      try:
            request_line = yield from reader.readline()
            if request_line == b"":
                if self.debug >= 0:
                  self.log.error("%s: EOF on request start" % reader)
                yield from writer.aclose()
                return
            req = HTTPRequest()
            # TODO: bytes vs str
            request_line = request_line.decode()
            method, path, proto = request_line.split()
            if self.debug >= 0:
                self.log.info('%.3f %s %s "%s %s"' % (utime.time(), req, writer, method, path))
            path = path.split("?", 1)
            qs = ""
            if len(path) > 1:
                qs = path
            path = path

            #print("================")
            #print(req, writer)
            #print(req, (method, path, qs, proto), req.headers)

            # Find which mounted subapp (if any) should handle this request
            app = self
            while True:
                found = False
                for subapp in app.mounts:
                  root = subapp.url
                  #print(path, "vs", root)
                  if path[:len(root)] == root:
                        app = subapp
                        found = True
                        path = path
                        if not path.startswith("/"):
                            path = "/" + path
                        break
                if not found:
                  break

            # We initialize apps on demand, when they really get requests
            if not app.inited:
                app.init()

            # Find handler to serve this request in app's url_map
            found = False
            for e in app.url_map:
                pattern = e
                handler = e
                extra = {}
                if len(e) > 2:
                  extra = e

                if path == pattern:
                  found = True
                  break
                elif not isinstance(pattern, str):
                  # Anything which is non-string assumed to be a ducktype
                  # pattern matcher, whose .match() method is called. (Note:
                  # Django uses .search() instead, but .match() is more
                  # efficient and we're not exactly compatible with Django
                  # URL matching anyway.)
                  m = pattern.match(path)
                  if m:
                        req.url_match = m
                        found = True
                        break

            if not found:
                headers_mode = "skip"
            else:
                headers_mode = extra.get("headers", self.headers_mode)

            if headers_mode == "skip":
                while True:
                  l = yield from reader.readline()
                  if l == b"\r\n":
                        break
            elif headers_mode == "parse":
                req.headers = yield from self.parse_headers(reader)
            else:
                assert headers_mode == "leave"

            if found:
                req.method = method
                req.path = path
                req.qs = qs
                req.reader = reader
                close = yield from handler(req, writer)
            else:
                yield from start_response(writer, status="404")
                yield from writer.awrite("404\r\n")
            #print(req, "After response write")
      except Exception as e:
            if self.debug >= 0:
                self.log.exc(e, "%.3f %s %s %r" % (utime.time(), req, writer, e))

      if close is not False:
            yield from writer.aclose()
      if __debug__ and self.debug > 1:
            self.log.debug("%.3f %s Finished processing request", utime.time(), req)

    def mount(self, url, app):
      "Mount a sub-app at the url of current app."
      # Inspired by Bottle. It might seem that dispatching to
      # subapps would rather be handled by normal routes, but
      # arguably, that's less efficient. Taking into account
      # that paradigmatically there's difference between handing
      # an action and delegating responisibilities to another
      # app, Bottle's way was followed.
      app.url = url
      self.mounts.append(app)

    def route(self, url, **kwargs):
      def _route(f):
            self.url_map.append((url, f, kwargs))
            return f
      return _route

    def add_url_rule(self, url, func, **kwargs):
      # Note: this method skips Flask's "endpoint" argument,
      # because it's alleged bloat.
      self.url_map.append((url, func, kwargs))

    def _load_template(self, tmpl_name):
      if self.template_loader is None:
            import utemplate.source
            self.template_loader = utemplate.source.Loader(self.pkg, "templates")
      return self.template_loader.load(tmpl_name)

    def render_template(self, writer, tmpl_name, args=()):
      tmpl = self._load_template(tmpl_name)
      for s in tmpl(*args):
            yield from writer.awrite(s)

    def render_str(self, tmpl_name, args=()):
      #TODO: bloat
      tmpl = self._load_template(tmpl_name)
      return ''.join(tmpl(*args))

    def sendfile(self, writer, fname, content_type=None, headers=None):
      if not content_type:
            content_type = get_mime_type(fname)
      try:
            with pkg_resources.resource_stream(self.pkg, fname) as f:
                yield from start_response(writer, content_type, "200", headers)
                yield from sendstream(writer, f)
      except OSError as e:
            if e.args == uerrno.ENOENT:
                yield from http_error(writer, "404")
            else:
                raise

    def handle_static(self, req, resp):
      path = req.url_match.group(1)
      print(path)
      if ".." in path:
            yield from http_error(resp, "403")
            return
      yield from self.sendfile(resp, path)

    def init(self):
      """Initialize a web application. This is for overriding by subclasses.
      This is good place to connect to/initialize a database, for example."""
      self.inited = True

    def run(self, host="127.0.0.1", port=8081, debug=False, lazy_init=False, log=None):
      if log is None and debug >= 0:
            import ulogging
            log = ulogging.getLogger("picoweb")
            if debug > 0:
                log.setLevel(ulogging.DEBUG)
      self.log = log
      gc.collect()
      self.debug = int(debug)
      self.init()
      if not lazy_init:
            for app in self.mounts:
                app.init()
      loop = asyncio.get_event_loop()
      if debug > 0:
            print("* Running on http://%s:%s/" % (host, port))
      loop.create_task(asyncio.start_server(self._handle, host, port))
      loop.run_forever()
      loop.close()



代 码首先,我们要导入HTTP服务器设置所需要的picoweb模块,以及建立ESP32 WiFi网络连接所需要的网络模块。

import picoweb
import network
然后,连接到WiFi网络。这一段代码是通用代码,在上一篇教程:ESP32 Picoweb教程:提供JSON内容 中有详细说明。您只需要把ssid和密码变量换成您的WiFi网络信息,ESP32就能连接到WiFi网络。请注意,我们将在程序末尾把分配给ESP32的IP地址存储到一个变量中,以便稍后将其作为参数传递给app(应用程序)实例的run(运行)方法。
ssid = "yourNetworkName"
password ="yourPassword"
station = network.WLAN(network.STA_IF)
station.active(True)
station.connect(ssid, password)
while station.isconnected() == False:
pass
ip = station.ifconfig()
连接到WiFi网络之后,我们就需要创建app(应用程序)实例,并声明它要监听的路由。在本例中,我们只使用一个路由对服务器返回的HTTP内部服务器错误(https://www.lifewire.com/500-internal-server-error-explained-2622938)进行测试。我们将监听“/internalerror”端点。
app = picoweb.WebApp(__name__)
@app.route("/internalerror")
def internalError(req, resp):
##Handling function code
为简单起见,我们的路由处理函数会立即返回HTTP错误。当然,在实际的应用中,在向某个端点返回一条错误之前,肯定会有一些相关的控制逻辑。只需调用picoweb模块的http_error函数(https://github.com/pfalcon/picoweb/blob/master/picoweb/__init__.py#L42)即可返回错误代码,函数参数是以字符串格式表示的客户端数据流写入器对象和错误代码。内部服务器错误对应的代码是500 。您也可以对其他状态代码进行测试。由于该函数内部使用了数据流写入器的awrite方法,所以我们需要使用yield from关键字。
yield from picoweb.http_error(resp, "500")
最后,只需调用app(应用程序)实例的run(运行)方法启动服务器即可。最终的源代码如下所示,其中包含了函数调用(使用分配给ESP32的IP地址)。
import picoweb
import network
ssid = "yourNetworkName"
password ="yourPassword"
station = network.WLAN(network.STA_IF)
station.active(True)
station.connect(ssid, password)
while station.isconnected() == False:
pass
ip = station.ifconfig()
app = picoweb.WebApp(__name__)
@app.route("/internalerror")
def internalError(req, resp):
    yield from picoweb.http_error(resp, "500")
app.run(debug=True, host =ip)


测试代码要对代码进行测试,只需将脚本上传到ESP32开发板运行即可。脚本执行时,控制台上会显示一条消息,表示服务器正在监听的根路径。您只需将URL复制到一个网页浏览器中并在后面加上internalerror字样(对应于应用程序正在监听的路由)。所得到的输出结果如图1所示。请注意,我打开了Chaome浏览器的开发者工具选项,以查看更详细的请求信息。

http://mc.dfrobot.com.cn/data/attachment/forum/201901/29/102734h75qsaqa4eeq74el.png图1 - 返回的内部服务器错误HTTP代码。

如引言部分所述,尽管该函数名为http_error,但是实际上我们也可以用它来返回非错误代码(比如对应于新建资源的代码201 )。被返回的HTTP代码如图2所示。注意,我仅仅在之前的代码中把传递给http_error的状态代码从500改成了201,所以路由仍将其称为“internalerror”。
http://mc.dfrobot.com.cn/data/attachment/forum/201901/29/102741z7dx6vi746477fh5.png图2 - 修改的HTTP代码。

查看更多ESP32/ESP8266教程和项目,请点击 :ESP32教程汇总贴英文版教程 : ESP32 tutorial

页: [1]
查看完整版本: ESP32 Picoweb教程:修改返回的HTTP代码