Python基于 BaseHTTPRequestHandler 创建简单Web服务

2024-03-27 05:44

本文主要是介绍Python基于 BaseHTTPRequestHandler 创建简单Web服务,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

启动一个最基础的 WEB 服务

创建文件 server.py

# Python 3 server example
from http.server import BaseHTTPRequestHandler, HTTPServerhostName = "localhost"
serverPort = 8080class MyServer(BaseHTTPRequestHandler):def do_GET(self):self.send_response(200)self.send_header("Content-type", "text/html")self.end_headers()self.wfile.write(bytes("<html><body><p>Request: %s</p></body></html>" % self.path, "utf-8"))if __name__ == "__main__":webServer = HTTPServer((hostName, serverPort), MyServer)print("Server started http://%s:%s" % (hostName, serverPort))try:webServer.serve_forever()except KeyboardInterrupt:passwebServer.server_close()print("Server stopped.")

启动命令

python3 server.py

区分访问路径

do_GET方法内, 使用 self.path 变量区分

def do_GET(self):if self.path == '/':self.send_response(200)self.send_header("Content-type", "text/html")self.end_headers()self.wfile.write(bytes("<html><body><p>Request: %s</p></body></html>" % self.path, "utf-8"))elif self.path == '/upload':self.send_response(200)self.send_header("Content-type", "text/html")self.end_headers()self.wfile.write(bytes("<html><body><p>Request: %s</p></body></html>" % self.path, "utf-8"))

处理 POST 请求

实现do_POST方法

def do_POST(self):content_length = int(self.headers['Content-Length'])file_content = self.rfile.read(content_length)# Do what you wish with file_content#print file_content# Respond with 200 OKself.send_response(200)self.send_header("Content-type", "text/html")self.end_headers()self.wfile.write(bytes("<html><body><p>Request: %s</p></body></html>" % self.path, "utf-8"))

处理请求参数和 COOKIE 等

添加以下引用

from functools import cached_property
from http.cookies import SimpleCookie
from urllib.parse import parse_qsl, urlparse

在 MyServer 类下添加以下处理方法

    @cached_propertydef url(self):return urlparse(self.path)@cached_propertydef query_data(self):return dict(parse_qsl(self.url.query))@cached_propertydef post_data(self):content_length = int(self.headers.get("Content-Length", 0))return self.rfile.read(content_length)@cached_propertydef form_data(self):return dict(parse_qsl(self.post_data.decode("utf-8")))@cached_propertydef cookies(self):return SimpleCookie(self.headers.get("Cookie"))

处理 Multipart 文件上传

需要引入

from urllib.parse import parse_qs, parse_qsl, urlparse
import cgi

对请求根据 content-type 分别处理

    def parse_POST(self):print(self.headers)ctype, pdict = cgi.parse_header(self.headers['content-type'])if ctype == 'multipart/form-data':print("file request")pdict['boundary'] = bytes(pdict['boundary'], "utf-8")postvars = cgi.parse_multipart(self.rfile, pdict)elif ctype == 'application/x-www-form-urlencoded' or 'application/json':   print("non-file request")length = int(self.headers['content-length'])postvars = parse_qs(self.rfile.read(length).decode('utf8'),keep_blank_values=1)elif ctype == 'application/octet-stream':print("octet stream header")postvars = {}else:print("nothing")postvars = {}a = self.rfileprint(dir(a))print(a.peek())return postvars

do_POST 中调用

    def do_POST(self):postvars = self.parse_POST()print(postvars)

一个接收文件并调用 PaddleOCR 识别的WEB服务例子

server.py

from http.server import BaseHTTPRequestHandler, HTTPServer
from urllib.parse import parse_qs, parse_qsl, urlparse
import cgi
import json
import paddleocr_helperhostName = "localhost"
serverPort = 8080class MyServer(BaseHTTPRequestHandler):def parse_POST(self):print(self.headers)ctype, pdict = cgi.parse_header(self.headers['content-type'])if ctype == 'multipart/form-data':print("file request")pdict['boundary'] = bytes(pdict['boundary'], "utf-8")postvars = cgi.parse_multipart(self.rfile, pdict)elif ctype == 'application/x-www-form-urlencoded' or 'application/json':   print("non-file request")length = int(self.headers['content-length'])postvars = parse_qs(self.rfile.read(length).decode('utf8'),keep_blank_values=1)elif ctype == 'application/octet-stream':print("octet stream header")postvars = {}else:print("nothing")postvars = {}a = self.rfileprint(dir(a))print(a.peek())return postvarsdef do_GET(self):if self.path == '/':self.send_response(200)self.send_header("Content-type", "text/html")self.end_headers()self.wfile.write(bytes("<html><body><p>Request: %s</p></body></html>\r\n" % self.path, "utf-8"))def do_POST(self):postvars = self.parse_POST()#print(postvars)#print(type(postvars['file']))result = {}try:result = paddleocr_helper.parse_and_lookup(postvars['file'][0])except:print("Error occurred")result = {"code": 1,"message": "error","data": None}pass# Respond with 200 OKself.send_response(200)self.send_header("Content-type", "application/json")self.end_headers()#self.wfile.write(bytes("<html><body><p>Request: %s</p></body></html>\r\n" % self.path, "utf-8"))self.wfile.write(bytes(json.dumps(result), "utf-8"))if __name__ == "__main__":webServer = HTTPServer((hostName, serverPort), MyServer)print("Server started http://%s:%s" % (hostName, serverPort))try:webServer.serve_forever()except KeyboardInterrupt:passwebServer.server_close()print("Server stopped.")

paddleocr_helper.py

import json
import re
import math
from paddleocr import PaddleOCRdef parse_image(imagedata):ocr = PaddleOCR(show_log=False, use_angle_cls=True, lang="ch") # need to run only once to download and load model into memoryresult = ocr.ocr(imagedata, cls=True)outputs = []for idx in range(len(result)):res = result[idx]output = []for line in res:ocr_result = {'boxes' : line[0],'text'  : line[1][0],'score' : line[1][1]}output.append(ocr_result)outputs.append(output)return outputsdef lookup_invoice_number(ocr_blocks):block_no = Noneblock_numbers = []for ocr_block in ocr_blocks:#print(ocr_block['text'])regex_result = re.compile('(票据号码|柔据号码|桑据号码|柔线号码|柔楼号码|系热号码):(\d+)').search(ocr_block['text'])if not regex_result is None:#print('----> ' + regex_result.group(2))return regex_result.group(2)regex_result = re.compile('N(?:.*?)(\d{8,})').search(ocr_block['text'])if not regex_result is None:#print('----> ' + regex_result.group(1))return regex_result.group(1)if re.match('No(\.|:|:)', ocr_block['text']):#print('- No. block: {}'.format(ocr_block['text']))block_no = ocr_blockregex_result = re.compile('(\d{8,})').search(ocr_block['text'])if not regex_result is None:#print('- Num block: {}'.format(regex_result.group(1)))ocr_block['text'] = regex_result.group(1)block_numbers.append(ocr_block)if not block_no is None and not len(block_numbers) == 0:#print('- block_no:{}'.format(block_no))distance_min = Nonecandidate = Nonefor block_number in block_numbers:# calculate distance between number and Nodistance = calcu_distance(block_no['boxes'], block_number['boxes'])#print('- dist:{}, block:{}'.format(distance, block_number))print('- dist:{}, block:{}'.format(distance, block_number['text']))if (distance_min is None) or (distance < distance_min):distance_min = distancecandidate = block_number['text']return candidatereturn Nonedef parse_and_lookup(imagedata):invoice_numbers = []ocr_result = parse_image(imagedata)if len(ocr_result) > 0:for ocr_blocks in ocr_result:invoice_number = lookup_invoice_number(ocr_blocks)if (not invoice_number is None):invoice_numbers.append(invoice_number)resp = {"code": 0,"message": "succ","data": {"invoice_numbers": invoice_numbers}}return respdef calcu_distance(boxes1, boxes2):distance_min = Nonebox1 = Nonefor box in boxes1:distance = point_distance(box, boxes2[0])if (distance_min is None) or (distance < distance_min):distance_min = distancebox1 = boxfor box in boxes2:distance = point_distance(box, box1)if (distance_min is None) or (distance < distance_min):distance_min = distancereturn distance_mindef point_distance(point1, point2):x = point1[0] - point2[0]y = point1[1] - point2[1]qrt = math.sqrt(x**2 + y**2)return qrt

参考

  • https://realpython.com/python-http-server/

这篇关于Python基于 BaseHTTPRequestHandler 创建简单Web服务的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/851111

相关文章

Git打标签从本地创建到远端推送的详细流程

《Git打标签从本地创建到远端推送的详细流程》在软件开发中,Git标签(Tag)是为发布版本、标记里程碑量身定制的“快照锚点”,它能永久记录项目历史中的关键节点,然而,仅创建本地标签往往不够,如何将其... 目录一、标签的两种“形态”二、本地创建与查看1. 打附注标http://www.chinasem.cn

Python的Darts库实现时间序列预测

《Python的Darts库实现时间序列预测》Darts一个集统计、机器学习与深度学习模型于一体的Python时间序列预测库,本文主要介绍了Python的Darts库实现时间序列预测,感兴趣的可以了解... 目录目录一、什么是 Darts?二、安装与基本配置安装 Darts导入基础模块三、时间序列数据结构与

Python正则表达式匹配和替换的操作指南

《Python正则表达式匹配和替换的操作指南》正则表达式是处理文本的强大工具,Python通过re模块提供了完整的正则表达式功能,本文将通过代码示例详细介绍Python中的正则匹配和替换操作,需要的朋... 目录基础语法导入re模块基本元字符常用匹配方法1. re.match() - 从字符串开头匹配2.

Python使用FastAPI实现大文件分片上传与断点续传功能

《Python使用FastAPI实现大文件分片上传与断点续传功能》大文件直传常遇到超时、网络抖动失败、失败后只能重传的问题,分片上传+断点续传可以把大文件拆成若干小块逐个上传,并在中断后从已完成分片继... 目录一、接口设计二、服务端实现(FastAPI)2.1 运行环境2.2 目录结构建议2.3 serv

通过Docker容器部署Python环境的全流程

《通过Docker容器部署Python环境的全流程》在现代化开发流程中,Docker因其轻量化、环境隔离和跨平台一致性的特性,已成为部署Python应用的标准工具,本文将详细演示如何通过Docker容... 目录引言一、docker与python的协同优势二、核心步骤详解三、进阶配置技巧四、生产环境最佳实践

Python一次性将指定版本所有包上传PyPI镜像解决方案

《Python一次性将指定版本所有包上传PyPI镜像解决方案》本文主要介绍了一个安全、完整、可离线部署的解决方案,用于一次性准备指定Python版本的所有包,然后导出到内网环境,感兴趣的小伙伴可以跟随... 目录为什么需要这个方案完整解决方案1. 项目目录结构2. 创建智能下载脚本3. 创建包清单生成脚本4

Python实现Excel批量样式修改器(附完整代码)

《Python实现Excel批量样式修改器(附完整代码)》这篇文章主要为大家详细介绍了如何使用Python实现一个Excel批量样式修改器,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一... 目录前言功能特性核心功能界面特性系统要求安装说明使用指南基本操作流程高级功能技术实现核心技术栈关键函

python获取指定名字的程序的文件路径的两种方法

《python获取指定名字的程序的文件路径的两种方法》本文主要介绍了python获取指定名字的程序的文件路径的两种方法,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要... 最近在做项目,需要用到给定一个程序名字就可以自动获取到这个程序在Windows系统下的绝对路径,以下

使用Python批量将.ncm格式的音频文件转换为.mp3格式的实战详解

《使用Python批量将.ncm格式的音频文件转换为.mp3格式的实战详解》本文详细介绍了如何使用Python通过ncmdump工具批量将.ncm音频转换为.mp3的步骤,包括安装、配置ffmpeg环... 目录1. 前言2. 安装 ncmdump3. 实现 .ncm 转 .mp34. 执行过程5. 执行结

Python实现批量CSV转Excel的高性能处理方案

《Python实现批量CSV转Excel的高性能处理方案》在日常办公中,我们经常需要将CSV格式的数据转换为Excel文件,本文将介绍一个基于Python的高性能解决方案,感兴趣的小伙伴可以跟随小编一... 目录一、场景需求二、技术方案三、核心代码四、批量处理方案五、性能优化六、使用示例完整代码七、小结一、