A Python Performance Tuning Experience

2024-06-16 13:58

This post walks through one Python performance tuning experience; I hope it offers a useful reference for anyone facing a similar problem.

I had been writing Java ever since graduation, but I recently moved to the data team and started learning Python.
A recent project required extracting the "celebrity relations" section from Baidu Baike entry pages and storing it. There was no need to crawl Baidu Baike in real time: all of the entry pages are already in our MongoDB, about 17 million documents in total, although with a lot of duplicates.
Because the data volume is so large, the processing has to run in parallel. I started with the futures module from Python's concurrent package, using its ThreadPoolExecutor thread pool. The code:
import zlib
import simplejson
import pymongo
import codecs
import threading
from concurrent.futures import ThreadPoolExecutor
from bs4 import BeautifulSoup

host = "10.10.0.0"
port = 20000
db = "baike_db"
username = "username"
passwd = "passwd"
crawlCol = "crawlCol"
browserCol = "brawlCol"
dbClient = pymongo.MongoClient(host, port)[db]
dbClient.authenticate(username, passwd)
crawlDb = dbClient[crawlCol]
browserCol = dbClient[browserCol]
subviewUrl = ""
lock = threading.Lock()
writeLock = threading.Lock()
handledUrlSet = set()
exceptionFile = codecs.open("exceptionFile.o", "a", "utf-8")
relationFile = codecs.open("relationFile.o", "a", "utf-8")
handledUrlFile = codecs.open("handledUrlFile.o", "a", "utf-8")

def handleDocument(doc):
    try:
        subviewUrl = doc['subview_url']
        print subviewUrl
        lock.acquire()
        if not(subviewUrl in handledUrlSet):
            handledUrlSet.add(subviewUrl)
            lock.release()
            text = zlib.decompress(doc['text'])
            htmlDoc = BeautifulSoup(text, 'html5lib')
            relations = htmlDoc.select(".star-info-block.relations")
            if relations:
                relation = relations[0]
                reList = relation.select("li")
                if reList:
                    realList = []
                    for real in reList:
                        aList = real.select("a")
                        if aList:
                            people = aList[0]
                            data = simplejson.loads("{}")
                            data['href'] = people['href']
                            data['src'] = people.img['src']
                            em = people.em.extract()
                            data['name'] = em.text.strip()
                            data['relation'] = people.text.strip()
                            realList.append(data)
                    if realList:
                        record = simplejson.loads("{}")
                        record['subviewUrl'] = realList
                        print(record)
                        writeLock.acquire()
                        relationFile.write(str(record) + "\n")
                        writeLock.release()
        else:
            lock.release()
    except BaseException as e:
        exceptionFile.write(subviewUrl + "\t" + str(e) + "\n")

# Create a thread pool of size 15
pool = ThreadPoolExecutor(max_workers=15)

# Handle the newer "browser" data first
docBrowserList = browserCol.find()
index = 0
for doc in docBrowserList:
    print("browser:" + str(index))
    index += 1
    pool.submit(handleDocument, doc)

index = 0
# Then handle the older "crawl" data
docCrawlList = crawlCol.find()
for doc in docCrawlList:
    print("crawl:" + str(index))
    index += 1
    pool.submit(handleDocument, doc)

The idea: create a thread pool of size 15, and let the main thread fetch documents from MongoDB one at a time and submit each one to the pool. The pool keeps an internal queue; when tasks are submitted faster than the workers can finish them, the extras pile up in that queue. With 17 million documents, that either blows up the queue (raising an exception, if the queue is bounded) or exhausts memory. You could certainly build on the thread pool anyway, for example by catching the queue-full error and backing off for a while, so in principle the approach still works. But I am lazy, and I usually go for the simpler option.
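As a side note, one common way to keep the executor's pending-task backlog bounded is to gate submissions with a semaphore that is released when each task completes. The sketch below is my own illustration, not the original code; MAX_PENDING and the reuse of handleDocument and browserCol are assumptions.

import threading
from concurrent.futures import ThreadPoolExecutor

MAX_PENDING = 1000                       # hypothetical cap on in-flight tasks
slots = threading.BoundedSemaphore(MAX_PENDING)
pool = ThreadPoolExecutor(max_workers=15)

def submit_throttled(doc):
    slots.acquire()                      # blocks once MAX_PENDING tasks are queued or running
    future = pool.submit(handleDocument, doc)
    future.add_done_callback(lambda f: slots.release())

for doc in browserCol.find():
    submit_throttled(doc)
pool.shutdown(wait=True)

With this, the producer loop simply blocks instead of flooding the executor, so memory stays flat no matter how many documents the cursor yields.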
So instead I switched to the threading module. The code:
#-*-coding:utf-8 -*-
import zlib
import Queue
import simplejson
import pymongo
import codecs
import threading
from concurrent.futures import ThreadPoolExecutor
from bs4 import BeautifulSoup

host = "10.10.0.0"
port = 20000
db = "baike_db"
username = "username"
passwd = "passwd"
crawlCol = "crawlCol"
browserCol = "brawlCol"
dbClient = pymongo.MongoClient(host, port)[db]
dbClient.authenticate(username, passwd)
crawlDb = dbClient[crawlCol]
browserCol = dbClient[browserCol]
subviewUrl = ""
lock = threading.Lock()
writeLock = threading.Lock()
exceptLock = threading.Lock()
handledUrlSet = set()
docQue = Queue.Queue(maxsize=1000)
exceptionFile = codecs.open("exceptionFile.o", "a", "utf-8")
relationFile = codecs.open("relationFile.o", "a", "utf-8")
handledUrlFile = codecs.open("handledUrlFile.o", "a", "utf-8")

def handleDocument(doc):
    try:
        subviewUrl = doc['subview_url']
        print subviewUrl
        lock.acquire()
        if not(subviewUrl in handledUrlSet):
            handledUrlSet.add(subviewUrl)
            lock.release()
            text = zlib.decompress(doc['text'])
            htmlDoc = BeautifulSoup(text, 'html5lib')
            relations = htmlDoc.select(".star-info-block.relations")
            if relations:
                relation = relations[0]
                reList = relation.select("li")
                if reList:
                    realList = []
                    for real in reList:
                        aList = real.select("a")
                        if aList:
                            people = aList[0]
                            data = simplejson.loads("{}")
                            data['href'] = people['href']
                            data['src'] = people.img['src']
                            em = people.em.extract()
                            data['name'] = em.text.strip()
                            data['relation'] = people.text.strip()
                            realList.append(data)
                    if realList:
                        record = simplejson.loads("{}")
                        record['subviewUrl'] = realList
                        print(record)
                        writeLock.acquire()
                        relationFile.write(str(record) + "\n")
                        writeLock.release()
        else:
            lock.release()
    except BaseException as e:
        exceptLock.acquire()
        exceptionFile.write(subviewUrl + "\t" + str(e) + "\n")
        exceptLock.release()

class handleThread(threading.Thread):
    def __init__(self, threadId):
        threading.Thread.__init__(self)
        self.threadId = threadId

    def run(self):
        global docQue
        while True:
            doc = docQue.get(block=True)
            if doc:
                handleDocument(doc)
            else:
                print("thread%d run over" % self.threadId)
                break

class getDocumentThread(threading.Thread):
    def __init__(self, threadId):
        threading.Thread.__init__(self)
        self.threadId = threadId

    def run(self):
        global docQue
        global browserCol
        global crawlCol
        docBrowserList = browserCol.find()
        index = 0
        for doc in docBrowserList:
            print("browser:" + str(index) + "to queue")
            index += 1
            docQue.put(doc, block=True)
        index = 0
        docCrawlList = crawlCol.find()
        for doc in docCrawlList:
            print("crawl:" + str(index) + "to queue")
            index += 1
            docQue.put(doc, block=True)
        # Put end-of-stream markers on the queue, one per worker thread
        for i in range(15):
            docQue.put(None, block=True)

getThread = getDocumentThread(1)
getThread.start()

for j in range(15):
    thread = handleThread(j)
    thread.start()

So I ran it, and it was painfully slow: only a few dozen documents per second. At that rate, when would I ever get through 17 million? Why was it so slow? Having just started with Python, I asked colleagues who had used it for years, and the answer was roughly this:
Python does support multiple threads, but they are all subject to the GIL (Global Interpreter Lock), which behaves like one global lock: only one thread can execute Python bytecode at a time, so CPU-bound threads effectively queue up behind each other. For a detailed introduction to the GIL, see this article:
http://cenalulu.github.io/python/gil-in-python/
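To make the effect concrete, here is a small sketch of my own (not from the original post): two threads doing pure-Python, CPU-bound work take about as long as running both halves sequentially in one thread, because the GIL lets only one of them execute at a time.

import time
import threading

def burn(n):
    # Pure-Python CPU-bound loop; the GIL serializes this across threads
    total = 0
    for i in range(n):
        total += i
    return total

N = 10 ** 7

start = time.time()
burn(N)
burn(N)
print("sequential: %.2fs" % (time.time() - start))

start = time.time()
threads = [threading.Thread(target=burn, args=(N,)) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print("two threads: %.2fs" % (time.time() - start))

I/O-bound work (network, disk) is a different story, since the GIL is released while waiting; the bottleneck here was CPU-heavy HTML parsing.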
Once I understood that this is how Python threading behaves, I was stunned: it looked like a glaring limitation. Fortunately Python offers multiprocessing to work around it, so I rewrote the code to use multiple processes:
#-*-coding:utf-8 -*-
import zlib
import Queue
import simplejson
import pymongo
import codecs
import time
from multiprocessing import queues
import multiprocessing
import threading
from concurrent.futures import ThreadPoolExecutor
from bs4 import BeautifulSoup
from multiprocessing import Process, Manager

host = "10.10.0.0"
port = 20000
db = "baike_db"
username = "username"
passwd = "passwd"
crawlCol = "crawlCol"
browserCol = "brawlCol"dbClient = pymongo.MongoClient(host, port)[db]
dbClient.authenticate(username, passwd)
crawlDb = dbClient[crawlCol]
browserCol = dbClient[browserCol]

m = Manager()
lock = multiprocessing.Lock()
writeLock = multiprocessing.Lock()
exceptLock = multiprocessing.Lock()
handledUrlDict = m.dict()
docQue = queues.Queue(maxsize=1000)
exceptionFile = codecs.open("exceptionFile.o", "a", "utf-8")
relationFile = codecs.open("relationFile.o", "a", "utf-8")
#handledUrlFile = codecs.open("handledUrlFile.o", "a", "utf-8")
pCount = 15

def handleDocument(doc, threadId, lock, exceptLock, writeLock, handledUrlDict):
    subviewUrl = ""
    # start = int(time.time() * 1000)
    try:
        subviewUrl = doc['subview_url']
        print "thread" + str(threadId) + subviewUrl
        lock.acquire()
        if not(subviewUrl in handledUrlDict):
            handledUrlDict[subviewUrl] = 1
            lock.release()
            # s2 = int(time.time() * 1000)
            # print("threadId %d s2 use: %d" % (threadId, s2 - start))
            text = zlib.decompress(doc['text']).decode('utf-8')
            # s4 = int(time.time() * 1000)
            # print("threadId %d s4 use: %d" % (threadId, s4 - s2))
            htmlDoc = BeautifulSoup(text, 'lxml')
            # s5 = int(time.time() * 1000)
            # print("threadId %d s5 use: %d" % (threadId, s5 - s4))
            relations = htmlDoc.select(".star-info-block.relations")
            # s3 = int(time.time() * 1000)
            # print("threadId %d s3 use: %d" % (threadId, s3 - s5))
            if relations:
                relation = relations[0]
                reList = relation.select("li")
                if reList:
                    realList = []
                    for real in reList:
                        aList = real.select("a")
                        if aList:
                            people = aList[0]
                            data = simplejson.loads("{}")
                            data['href'] = people['href']
                            data['src'] = people.img['src']
                            em = people.em.extract()
                            data['name'] = em.text.strip()
                            data['relation'] = people.text.strip()
                            realList.append(data)
                    if realList:
                        record = simplejson.loads("{}")
                        record['subviewUrl'] = subviewUrl
                        record['realList'] = realList
                        print(record)
                        writeLock.acquire()
                        relationFile.write(str(record) + "\n")
                        writeLock.release()
                        # s4 = int(time.time() * 1000)
                        # print("threadId %d s4 use: %d" % (threadId, s4 - s3))
        else:
            lock.release()
            pass
    except BaseException as e:
        exceptLock.acquire()
        exceptionFile.write(subviewUrl + "\t" + str(e) + "\n")
        exceptLock.release()
    # print("threadId %d total use: %d" % (threadId, int(time.time() * 1000) - start))

class handleProcess(multiprocessing.Process):
    def __init__(self, threadId, docQue, lock, exceptLock, writeLock, handledUrlDict):
        multiprocessing.Process.__init__(self)
        self.threadId = threadId
        self.docQue = docQue
        self.lock = lock
        self.exceptLock = exceptLock
        self.writeLock = writeLock
        self.handledUrlDict = handledUrlDict

    def run(self):
        while True:
            doc = docQue.get(block=True)
            if doc:
                handleDocument(doc, self.threadId, lock, exceptLock, writeLock, handledUrlDict)
            else:
                print("thread%d run over" % self.threadId)
                break

class getDocumentThread(threading.Thread):
    def __init__(self, threadId):
        threading.Thread.__init__(self)
        self.threadId = threadId

    def run(self):
        global docQue
        global browserCol
        global crawlCol
        docBrowserList = browserCol.find()
        index = 0
        for doc in docBrowserList:
            print("browser:" + str(index) + "to queue")
            index += 1
            docQue.put(doc, block=True)
        index = 0
        docCrawlList = crawlCol.find()
        for doc in docCrawlList:
            print("crawl:" + str(index) + "to queue")
            index += 1
            docQue.put(doc, block=True)
        # Put end-of-stream markers on the queue, one per worker process
        global pCount
        for i in range(pCount):
            docQue.put(None, block=True)

getThread = getDocumentThread(1)
getThread.start()

for j in range(pCount):
    process = handleProcess(j, docQue, lock, exceptLock, writeLock, handledUrlDict)
    process.start()
    # process.join()

Multiprocessing comes with its own headache: each process has its own memory space, and processes do not share memory. So how do they communicate, and how do they share variables? Python provides several mechanisms for this, all inside the multiprocessing module, such as queues and Manager-backed dicts. And if several processes need to touch something that is not safe to access concurrently, the module's locks handle that.
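For readers new to this, here is a minimal, self-contained sketch of that pattern (my own illustration, not the post's code): a Manager dict shared by several worker processes, guarded by a multiprocessing Lock. The worker function and the "url-%d" payload are made up for the example.

import multiprocessing
from multiprocessing import Manager, Process, Lock

def worker(wid, seen, lock):
    url = "url-%d" % (wid % 2)          # hypothetical payload; forces some duplicates
    lock.acquire()
    try:
        first_time = url not in seen    # check-and-set must happen under the lock
        if first_time:
            seen[url] = 1
    finally:
        lock.release()
    print("worker %d url=%s first_time=%s" % (wid, url, first_time))

if __name__ == "__main__":
    manager = Manager()
    seen = manager.dict()               # proxy object, usable from every process
    lock = Lock()
    procs = [Process(target=worker, args=(i, seen, lock)) for i in range(4)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()                        # keeps the main process (and the Manager) alive

Joining the workers before the main process exits also avoids the Manager shutdown problem discussed at the end of this post.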
With this version running, it was still very slow: I estimated it would take about 45 hours to get through the 17 million records, which I could not accept. So I went looking for where the time was going, adding plenty of logging to record how long each step took (the commented-out timing lines above). Most of the time turned out to be spent on this line:
htmlDoc = BeautifulSoup(text, 'html5lib')  (this is the line from the earlier version, before the change)
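Instead of sprinkling int(time.time() * 1000) calls everywhere, a small context manager can do the same per-step timing. This is just a sketch of mine, and the labels are illustrative.

import time
from contextlib import contextmanager

@contextmanager
def timed(label):
    # Print how long the wrapped block took, in milliseconds
    start = time.time()
    yield
    print("%s took %.1f ms" % (label, (time.time() - start) * 1000))

# Usage inside handleDocument, for example:
# with timed("decompress"):
#     text = zlib.decompress(doc['text'])
# with timed("parse"):
#     htmlDoc = BeautifulSoup(text, 'lxml')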
So I checked the BeautifulSoup documentation. It supports several parsers, including html.parser, lxml, and html5lib; see the official docs:
https://www.crummy.com/software/BeautifulSoup/bs4/doc/index.zh.html
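Before taking the docs' word for it, you can also measure the difference on your own pages. The rough comparison below is my own sketch, not the post's code; sample_page.html and the round count are assumptions, and the actual numbers depend on the documents (lxml and html5lib must be installed separately).

import time
from bs4 import BeautifulSoup

def bench(html, parser, rounds=20):
    # Average parse-and-select time per document for a given parser
    start = time.time()
    for _ in range(rounds):
        BeautifulSoup(html, parser).select(".star-info-block.relations")
    return (time.time() - start) / rounds

html = open("sample_page.html").read()      # hypothetical saved Baike page
for parser in ("html.parser", "lxml", "html5lib"):
    print("%-12s %.1f ms/doc" % (parser, bench(html, parser) * 1000))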
The documentation compares these parsers and says quite clearly that html5lib is very slow, so I switched to lxml. That alone gave roughly a 4.5x speedup, which brought the total time down to something acceptable. To close, here is a question I still haven't figured out: when exactly do child processes and the main thread exit?
import codecs
import time
import threading
from multiprocessing import Process, Manager
from multiprocessing import queues
import multiprocessing
# Function executed by each child process
# The arguments include a special dict used to share data between processes
f = codecs.open("exceptionFile.o", "a", "utf-8")

def func1(d, q, f, s):
    print(d)
    f.write("11\n")
    d['k3'] = 'k3'
    s.add('33')
    print("11:" + str('11' in s))
    print("k1:" + str("k1" in d))

def func2(d, q, f, s):
    print(d)
    f.write("11\n")
    print("33:" + str('33' in s))
    print("k3:" + str('k3' in d))

q = queues.Queue(1000)
for j in range(1000):
    q.put(j)

class testProc(Process):
    def __init__(self, d):
        multiprocessing.Process.__init__(self)
        self.d = d

    def run(self):
        print("testk3:" + str('k3' in d))

class testThread(threading.Thread):
    def run(self):
        # time.sleep(5)
        pass

# Create the special (Manager-backed) dict in the main process
m = Manager()
d = m.dict()
s = set()
s.add('11')
d['k1'] = 'k1'
d['k2'] = 3
t = testThread()
t.start()
p1 = Process(target=func1, args=(d,q,f,s))
p1.start()

p2 = Process(target=func2, args=(d, q, f, s))
p2.start()

Running the code above produces an error like this:
<DictProxy object, typeid 'dict' at 0x7f7110976dd0; '__str__()' failed>
Process Process-2:self._connect()File "/usr/local/lib/python2.7/multiprocessing/managers.py", line 742, in _connect
Traceback (most recent call last):conn = self._Client(self._token.address, authkey=self._authkey)File "/usr/local/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrapFile "/usr/local/lib/python2.7/multiprocessing/connection.py", line 169, in Clientself.run()File "/usr/local/lib/python2.7/multiprocessing/process.py", line 114, in runself._target(*self._args, **self._kwargs)File "test.py", line 15, in func1c = SocketClient(address)d['k3']= 'k3'File "<string>", line 2, in __setitem__File "/usr/local/lib/python2.7/multiprocessing/connection.py", line 308, in SocketClientFile "/usr/local/lib/python2.7/multiprocessing/managers.py", line 755, in _callmethods.connect(address)File "/usr/local/lib/python2.7/socket.py", line 228, in methself._connect()File "/usr/local/lib/python2.7/multiprocessing/managers.py", line 742, in _connectconn = self._Client(self._token.address, authkey=self._authkey)File "/usr/local/lib/python2.7/multiprocessing/connection.py", line 169, in Clientreturn getattr(self._sock,name)(*args)
error: [Errno 2] No such file or directoryc = SocketClient(address)File "/usr/local/lib/python2.7/multiprocessing/connection.py", line 308, in SocketClients.connect(address)File "/usr/local/lib/python2.7/socket.py", line 228, in methreturn getattr(self._sock,name)(*args)
error: [Errno 2] No such file or directory查了下错误原因:说是因为主线程退出了,导致子线程执行失败,实践证明解决办法有两个
The first: at the end of the code, call:
p1.join()
p2.join()
That is, explicitly tell the main process that it must wait until both child processes have finished before continuing, so the main thread never exits early.
The second: have testThread sleep after it starts (uncomment the time.sleep(5) above); with that, the code also runs normally. But why? And if testThread's sleep runs out while p1 and p2 are still working, will the same exception be thrown anyway?

This example made me look back at my Baidu Baike code: the main process starts a getDocumentThread that keeps pushing documents onto the queue, plus 15 worker processes that keep pulling documents off the queue and processing them. Early on everything works because getDocumentThread is still running. But when it reaches the last batch of records, up to the queue's 1000-slot capacity, and that thread finishes and exits first, the 15 processes still have those documents left to work through; will getDocumentThread's exit cause the same exception at that point?

If any reader who sees this knows the answer, please leave me a reply.




