本文主要是介绍python多线程并发测试过程,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
《python多线程并发测试过程》:本文主要介绍python多线程并发测试过程,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教...
一、并发与并行?
1、多任务概念:操作系统可以同时运行多个任务。
2、并发:任务数量多于cpu核数,通过操作系统的任务调度算法,多个任务可以一起执行,实际总有一些任务不在一起执行,因为切换任务的速度相当快。宏观上是一起执行,微观上不是一起执行的。
3、并行:指的是任务数量小于cpu核数,即任务真的是一起执行。
⚠️多线程都是并发的状态,缺点是比并行慢
二、同步与异步的概念?
1、同步:协同步调
def work(): print("1") def work2(): print("2") if __name__ == '__main__': work() work2()
2、异步:步调各异
三、线程与进程的区别?
1、线程被包含在进程内,一个程序启动时,先启动进程,进程调用线程执行任务。
2、线程依赖进程。
3、线程可以被抢占(中断)
需求1:多线程执行不同任务
1、函数
函数式:调用thread模块中的start_new_thread()函数来产生新线程:
threaChina编程d.start_new_thread ( function, args[, kwargs] )
参数说明:
function
- 线程函数。args
- 传递给线程函数的参数,他必须是个tuple类型。kwargs
- 可选参数
示例1:主线程不等待子线程
import time import threading as td #为线程定义一个函数 def asr_test1(url): print("start") print(time.ctime(time.time())) def asr_test2(url): print("end") print(time.ctime(time.time())) if __name__ == '__main__': #target为函数名,args为元组,函数一定要输入形参 t1=td.Thread(target=asr_test1,args=("",)) t2 = td.Thread(target=asr_test1, args=("",)) t1.start() t2.start() 结果: start Fri Aug 5 16:03:05 2022 start Fri Aug 5 16:03:05 2022 示例2、主线程等待子线程执行完毕 import threading as th def work(): print("1") def work2(): print("2") #原理:启动时会调用python进程,进程调用主线程。 if __name__ == '__main__': '''子线程启动''' start_time = time.time() t1=th.Thread(target=work) t2 = th.Thread(target=work2) t1.start() t2.start() #主线程不会等待 # t1.join(5)#设置主线程等待子线程的执行时间 end = time.time() - start_time print(end)
thread提供了低级别的、原始的线程以及一个简单的锁。
Thread类提供以下方法:
run()
: 用以表示线程活动的方法。start()
:启动线程活动。join([time])
: 等待至线程中止。这阻塞调用线程直至线程的join() 方法被调用中止-正常退出或者抛出未处理的异常-或者是可选的超时发生。isAlive()
: 返回线程是否活动的。getName()
: 返回线程名。setName()
: 设置线程名。
2、类包装线程对象
需求2:多线程执行相同任务
1.threading并发性
线程并发运行并共享内存。
代码如下(重写run方法,继承Thread类):
import time from threading import Thread ''' Thread其他初始化参数: target:指定任务函数 name:指定线程分组 ''' class MyThread(Thread): def run(self): for i in range(100): time.sleep(0.1) print("请求第{}次".format(self.name,i)) if __name__ == '__main__': for i in range(5): t=MyThread(name=f"线程{i+1}") t.start()
传递参数一:
def work(name,age): for i in range(5): # time.sleep(1) print("{}work1----{}---{}".format(name,i,age)) if __name__ == '__main__': #给任务函数传递参数,方式一:args参数,里面是元组 # t=Thread(target=work,args=("",)) # t.start() #方式2:kargs t2 = Thread(target=work, kwargs={"name":"wupig","age":18}) t2.start()
传递参数:继承类
2.多线程并发—资源共享,资源竞争问题:
1、无法并行,原因是python解释器有一把锁:GIL,在同一时间只能执行一个线程。多线程是并发执行的,经过python解释器后进行线程调度,线程切换机制:
2、线程执行遇到IO耗时操作(sleep,网络io,文件io),python解释器释放锁,进行线程切换,再次获取全局锁。
3、线程执行时间达到一定的阈值(面试)
解决方法一:加锁
解决方法二:队列
死锁案例:
需求:
1、并发读取本地文件。
2、并发获取url地址。
xxx.txt文件中有很多URL地址,需求多并发获编程取。
'''需求:并发读取txt中的URL地址''' list_test=[] filename="../China编程../url.txt" file=open(filename,"r") #列表推导式 list_test=[i.strip("\n") for i in file] def audio_test(): while list_test: url=list_test.pop() pwIUoCYQSnrint(url) def atest_thread(THREAD_MAX, Test_Script): #创建空列表 thread_list = [] for i in range(int(THREAD_MAX)): # print("**************",i) thread_name = 'thread-' + str(i) print("thread_name",thread_name) new_thread = threading.Thread(target=Tesphpt_Script, args="", name=thread_name) thread_list.append(new_thread) new_thread.start() for thread_obj in thread_list: #thread_obj.join:等待至线程中止 thread_obj.join() if __name__ == '__main__': starttime=time.time() count = 10 # THREAD_MAX=10 atest_thread(count, audio_test) endtime=time.time()-starttime print(endtime)
占坑
这样只有一个线程执行。需要创建列表,将线程放入列表中,然后循环变量列表进行json。
总结
这篇关于python多线程并发测试过程的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!