技术标签: python线程池超过最大数量
一,前言
进程:是程序,资源集合,进程控制块组成,是最小的资源单位
特点:就对Python而言,可以实现真正的并行效果
缺点:进程切换很容易消耗cpu资源,进程之间的通信相对线程来说比较麻烦
线程:是进程中最小的执行单位。
特点无法利用多核,无法实现真正意义上是并行效果。
优点:对于IO密集型的操作可以很好利用IO阻塞的时间
二,GIL(全局解释器锁)
python目前有很多解释器,目前使用最广泛的是CPython,还有PYPY和JPython等解释器,但是使用最广泛的还是CPython解释器,而对于全局解释器锁来说,就是在CPython上面才有的,它的原理是在解释器层面加上一把大锁,保证同一时刻只能有一个python线程在解释器中执行。
对于计算密集型的python多线程来说,无法利用到多线程带来的效果, 在2.7时计算密集型的python多线程执行效率比顺序执行的效率还低的多,在python3.5中对这种情况进行了优化,基本能实现这种多线程执行时间和顺序执行时间差不多的效果。
对于I/O密集型的python多线程来说,GIL的影响不是很大,因为I/O密集型的python多线程进程,每个线程在等待I/O的时候,将会释放GIL资源,供别的线程来抢占。所以对于I/O密集型的python多线程进程来说,还是能比顺序执行的效率要高的。
python的GIL这个东西。。。比较恶心,但是由于CPython解释器中的很多东西都是依赖这个东西开发的,如果改的话,将是一件浩大的工程。。。所以到现在还是存在这个问题,这也是python最为别人所诟病的地方。。。
三,多线程
多线程相当于一个并发(concunrrency)系统。并发系统一般同时执行多个任务。如果多个任务可以共享资源,特别是同时
写入某个变量的时候,就需要解决同步的问题,比如多线程火车售票系统:两个指令,一个指令检查票是否卖完,另一个指令,多
个窗口同时卖票,可能出现卖出不存在的票。
3.1 python实现多线程
python实现多线程有两种方式,分别是直接调用和继承调用,如下实例:
直接调用:
importthreadingimporttime#定义线程运行函数
defmv():print('播放=========')
time.sleep(2)print('ending=======')#带参数方式
defplay(name):print("打游戏======"+name)
time.sleep(3)print("ending======")if __name__ == '__main__':
th= threading.Thread(target=mv)
th2= threading.Thread(target=play, args=("LOL",))
th.start()
th2.start()
继承调用:
importthreadingimporttimeclassMyThread(threading.Thread):def __init__(self, num):
super(MyThread, self).__init__()
self.num=numdefrun(self):print("play game %s======" %self.num)
time.sleep(2)print("end play")if __name__ == '__main__':
mt= MyThread(1)
mt2= MyThread(2)
mt.start()
mt2.start()
可以看到直接调用是导入threading模块并定义一个函数,之后实例化threading.Thread类的时候,将刚定义的函数名通过target参数传递进去,然后调用实例的start()方法启动一个线程。
而继承式调用是创建一个类继承自threading.Thread类,并在构造方法中调用父类的构造方法,之后重写run方法,run方法中就是每个线程起来之后执行的内容,就类似于前面通过target参数传递进去的函数。之后以这个继承的类来创建对象,并执行对象的start()方法启动一个线程。
从这里可以看出,其实。。。直接调用通过使用target参数把函数带进类里面之后应该是用这个函数替代了run方法。
3.2 线程阻塞和守护线程(join和setDaemon)
join()方法在该线程对象启动了之后调用线程的join()方法之后,那么主线程将会阻塞在当前位置直到子线程执行完成才继续往下走,如果所有子线程对象都调用了join()方法,那么主线程将会在等待所有子线程都执行完之后再往下执行。
setDaemon(True)方法在子线程对象调用start()方法(启动该线程)之前就调用的话,将会将该子线程设置成守护模式启动,这是什么意思呢?当子线程还在运行的时候,父线程已经执行完了,如果这个子线程设置是以守护模式启动的,那么随着主线程执行完成退出时,子线程立马也退出,如果没有设置守护启动子线程(也就是正常情况下)的话,主线程执行完成之后,进程会等待所有子线程执行完成之后才退出。
join示例:
importthreadingimporttime#定义线程运行函数
defmv():print('播放=========')
time.sleep(2)print('ending=======')#带参数方式
defplay(name):print("打游戏======"+name)
time.sleep(3)print("ending======")
games= ["LOL", "DNF", "PUBG"]if __name__ == '__main__':
th= threading.Thread(target=mv)
th.start()#th.join() # 带上该代码,程序(也就是主线程)不会马上往下执行,回显执行完th线程
for i in [threading.Thread(target=play, args=(i, )) for i ingames]:
i.start()
setDaemon示例:
importthreadingimporttime#定义线程运行函数
defmv():print('播放=========')
time.sleep(5)print('ending=======')if __name__ == '__main__':
th= threading.Thread(target=mv)
th.setDaemon(True)#若有改行代码,程序执行直接退出,不打印出 ending=====,因为主线程执行完毕了直接退出。
th.start()
3.3 线程锁
互斥锁的产生是因为前面提到过多线程之间是共享同一块内存地址的,也就是说多个不同的线程能够访问同一个变量中的数据,那么,当多个线程要修改这个变量,会产生什么情况呢?当多个线程修改同一个数据的时候,如果操作的时间够短的话,能得到我们想要的结果,但是,如果修改数据不是原子性的(这中间的时间太长)的话。。。很有可能造成数据的错误覆盖,从而得到我们不想要的结果。例子如下:
importthreadingimporttime
count=0defadd_num():globalcount
tmp=count
time.sleep(0.001) #若没有改代码,输出100,若有改代码输出是10,11,9等不确定是数
count = tmp + 1
defrun(add_fun):globalcount
thread_list=[]for i in range(100):
t= threading.Thread(target=add_fun)
t.start()
thread_list.append(t)for j inthread_list:
j.join()print(count)if __name__ == '__main__':
run(add_num)
出现上述情况的原因:是因为,尽管count+=1是非原子操作,但是因为CPU执行的太快了,比较难以复现出多进程的非原子操作导致的进程不安全。经过代替之后,尽管只sleep了0.001秒,但是对于CPU的时间来说是非常长的,会导致这个代码块执行到一半,GIL锁就释放了。即tmp已经获取到count的值了,但是还没有将tmp + 1赋值给count。而此时其他线程如果执行完了count = tmp + 1, 当返回到原来的线程执行时,尽管count的值已经更新了,但是count = tmp + 1是个赋值操作,赋值的结果跟count的更新的值是一样的。最终导致了我们累加的值有很多丢失。
互斥锁
针对上面的情况,我们就需要引入互斥锁的这一概念。某个线程要更改共享数据时,先将其锁定,此时资源的状态为“锁定”,其他线程不能更改;直到该线程释放资源,将资源的状态变成“非锁定”,其他的线程才能再次锁定该资源。互斥锁保证了每次只有一个线程进行写入操作,从而保证了多线程情况下数据的正确性。
importthreadingimporttime
count=0defadd_num():globalcountif lock.acquire(): #获得锁,并返回True
tmp =count
time.sleep(0.001)
count= tmp + 1lock.release()#执行完释放锁
defrun(add_fun):globalcount
thread_list=[]for i in range(100):
t= threading.Thread(target=add_fun)
t.start()
thread_list.append(t)for j inthread_list:
j.join()print(count)if __name__ == '__main__':
lock=threading.Lock()
run(add_num)
另一种获得锁的方式(with):
importthreadingimporttime
count=0defadd_num():globalcount
with lock:#获得锁,并返回True
tmp =count
time.sleep(0.001)
count= tmp + 1
defrun(add_fun):globalcount
thread_list=[]for i in range(100):
t= threading.Thread(target=add_fun)
t.start()
thread_list.append(t)for j inthread_list:
j.join()print(count)if __name__ == '__main__':
lock=threading.Lock()
run(add_num)
迭代死锁
importthreadingimporttimeclassMyThread(threading.Thread):defrun(self):globalnum
time.sleep(1)if mutex.acquire(): #第一次获得锁
num = num+1msg= self.name+'set num to'+str(num)print(msg)
mutex.acquire()#在锁未释放的时候第二次获得锁,需要注意的是这里的锁指的是同一个锁对象mutex
mutex.release()
mutex.release()
num=0
mutex=threading.Lock()deftest():for i in range(5):
t=MyThread()
t.start()if __name__ == '__main__':
test() # 无输出,一直阻塞,因为没有释放锁,又想再次获得锁,该锁已经被拿走了,是空的就会一直等待锁释放
上述代码中,无输出,一直阻塞,因为没有释放锁,又想再次获得锁,该锁已经被拿走了,是空的就会一直等待锁释放使用的是同一个锁对象muex,若创建一个新的锁对象,就不会出现这些情况。
相互调用锁
importthreadingimporttimedeffun1():print('=====')
lock1.acquire()
time.sleep(1)print('-------')
lock2.acquire()#lock2在另一线程使用,为释放
time.sleep(2)
lock1.release()
lock2.release()deffun2():print("-------")
lock2.acquire()print("locl2====")
time.sleep(0.1)
lock1.acquire()#lock1在上一线程使用未释放
print("lock3=====")
time.sleep(1)
lock1.release()
lock2.release()
lock1=threading.Lock()
lock2=threading.Lock()defrun():
th=threading.Thread(target=fun1)
th2=threading.Thread(target=fun2)
th.start()
th2.start()if __name__ == '__main__':
run()
像上面这种情况,锁未释放就获取另一把锁,而另一把锁已经在使用,同时另一线程反过来调用第一把锁,从而产生这种相互等待锁的阻塞情况。
死锁解决
为了支持在同一线程中多次请求同一资源,python提供了“可重入锁”:threading.RLock。RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被多次require。直到一个线程所有的acquire都被release,其他的线程才能获得资源。这里以例1为例,如果使用RLock代替Lock,则不会发生死锁:
importthreadingimporttimeclassMyThread(threading.Thread):defrun(self):globalnum
time.sleep(1)if mutex.acquire(1):
num= num+1msg= self.name+'set num to'+str(num)printmsg
mutex.acquire()
mutex.release()
mutex.release()
num=0
mutex=threading.RLock()deftest():for i in range(5):
t=MyThread()
t.start()if __name__ == '__main__':
test()
3.5 信号量
Semaphore管理一个内置的计数器
Semaphore与进程池看起来类似,但是是完全不同的概念。
进程池:Pool(4),最大只能产生四个进程,而且从头到尾都只是这四个进程,不会产生新的。
信号量:信号量是产生的一堆进程/线程,即产生了多个任务都去抢那一把锁
from threading importThread,Semaphore,currentThreadimporttime,random
sm= Semaphore(5) #运行的时候有5个人
deftask():
sm.acquire()print('\033[42m %s上厕所'%currentThread().getName())
time.sleep(random.randint(1,3))print('\033[31m %s上完厕所走了'%currentThread().getName())
sm.release()if __name__ == '__main__':for i in range(20): #开了10个线程 ,这20人都要上厕所
t = Thread(target=task)
t.start()
Semaphore举例
四,线程池
线程池在系统启动时即创建大量空闲的线程,程序只要将一个函数提交给线程池,线程池就会启动一个空闲的线程来执行它。当该函数执行结束后,该线程并不会死亡,而是再次返回到线程池中变成空闲状态,等待执行下一个函数。
此外,使用线程池可以有效地控制系统中并发线程的数量。当系统中包含有大量的并发线程时,会导致系统性能急剧下降,甚至导致 Python 解释器崩溃,而线程池的最大线程数参数可以控制系统中并发线程的数量不超过此数。
4.1 介绍和基本方法
官网:https://docs.python.org/dev/library/concurrent.futures.html
concurrent.futures模块提供了高度封装的异步调用接口
ThreadPoolExecutor:线程池,提供异步调用
Both implement the same interface, whichis defined by the abstract Executor class.
1、submit(fn, *args, **kwargs)
异步提交任务
2、map(func, *iterables, timeout=None, chunksize=1)
取代for循环submit的操作
3、shutdown(wait=True)
相当于进程池的pool.close()+pool.join()操作
wait=True,等待池内所有任务执行完毕回收完资源后才继续
wait=False,立即返回,并不会等待池内的任务执行完毕
但不管wait参数为何值,整个程序都会等到所有任务执行完毕
submit和map必须在shutdown之前
4、result(timeout=None)
取得结果
5、add_done_callback(fn)
回调函数
4.2 线程池的使用
from concurrent.futures importThreadPoolExecutor,ProcessPoolExecutorimportos,time,randomdeftask(n):print('%s is runing' %os.getpid())
time.sleep(random.randint(1,3))return n**2
if __name__ == '__main__':
executor=ProcessPoolExecutor(max_workers=3)
futures=[]for i in range(11):
future=executor.submit(task,i)
futures.append(future)
executor.shutdown(True)print('+++>')for future infutures:print(future.result())
4.3 异步调用和同步调用
1、同步调用:提交完任务后,就在原地等待任务执行完毕,拿到结果,在执行下一行代码,导致程序是串行
2、异步调用:提交完任务后,不用原地等待任务执行完毕
回调函数:可以为进程池或线程池内得每个进程或线程绑定一个函数,该函数在进程或线程的任务执行完毕后自动触发,并接受任务的返回值当作参数,该函数成为回调函数。
#提交任务的两种方式#1、同步调用:提交完任务后,就在原地等待任务执行完毕,拿到结果,再执行下一行代码,导致程序是串行执行#
#from concurrent.futures import ThreadPoolExecutor#import time#import random#
#def la(name):#print('%s is laing' %name)#time.sleep(random.randint(3,5))#res=random.randint(7,13)*'#'#return {'name':name,'res':res}#
#def weigh(shit):#name=shit['name']#size=len(shit['res'])#print('%s 拉了 《%s》kg' %(name,size))#
#
#if __name__ == '__main__':#pool=ThreadPoolExecutor(13)#
#shit1=pool.submit(la,'alex').result()#weigh(shit1)#
#shit2=pool.submit(la,'wupeiqi').result()#weigh(shit2)#
#shit3=pool.submit(la,'yuanhao').result()#weigh(shit3)
#2、异步调用:提交完任务后,不地等待任务执行完毕,
from concurrent.futures importThreadPoolExecutorimporttimeimportrandomdefla(name):print('%s is laing' %name)
time.sleep(random.randint(3,5))
res=random.randint(7,13)*'#'
return {'name':name,'res':res}defweigh(shit):
shit=shit.result()
name=shit['name']
size=len(shit['res'])print('%s 拉了 《%s》kg' %(name,size))if __name__ == '__main__':
pool=ThreadPoolExecutor(13)
pool.submit(la,'alex').add_done_callback(weigh)
pool.submit(la,'wupeiqi').add_done_callback(weigh)
pool.submit(la,'yuanhao').add_done_callback(weigh)
五,总结
对应IO密集型任务可以通过创建线程池来提高效率,而对于计算密集型则没必要,计算密集型可以考虑分布式计算。
安卓天气应用
哈喽大家好,我是菌菌~采访写稿子、会议做笔录、上课划重点……工作学习中,我们经常会有记笔记、做记录的需求,但往往因为说话者语速过快等各种原因弄得手忙脚乱,无法记录到位,这时,录音是个好办法。然而,录音一时爽,听时火葬场。一句一句听录音,再一个字一个字地打出来,效率低、浪费时间不说,遇到想反复核查的重要部分,还要来回拖拽进度条,实在让人头疼。今天菌菌给大家推荐几个免费又好用的语音转文字&文字转语音工具,让你彻底解放双手,大大提高工作效率,码字和交流更轻松!网易见外工作台网易见外工作台j
参考:http://www.cnblogs.com/BYRans/p/4735409.html在逻辑回归模型中我们假设: 在分类问题中我们假设: 他们都是广义线性模型中的一个例子,在理解广义线性模型之前需要先理解指数分布族。指数分布族(The Exponential Family) 定义:如果一个分布可以用如下公式表达,那么这个分布就属于指数分布族: 公式中y是随机变量;h(x)称为基
近些年来,国内的苏丹红、吊白块、毒米、毒油、孔雀石绿、瘦肉精、三聚氰胺等事件,使得我国乃至全球的食品安全问题形势十分严峻。为此,从全面提升各地卫生部门监管与检验检疫工作水平和效率的角度考虑,建立以各地卫生监督部门为中心、各食品生产经营企业为对象的网络视频监控系统势在必行。TSINGSEE青犀视频研发团队设计的国标GB28181协议视频平台EasyGBS,就能够为该场景提供监控可视化的解决方案。EasyGBS通过建立以食品安全管理机关为监控中心,利用车载监控系统和单兵巡查系统,通过网络接入平台,使得整
使用yum代替up2date自动更新升级RedHatAS3(转) 最近装了一台RedHat AS3U2 服务器,AS3U2是从网上免费下载的,没有购买RedHat的服务,所以不能使用rhn up2date升级系统;虽说U2已经...
前两天探究一个直接从Excel中复制数据到html中的实现方法。现在很多html编辑器都实现了表格的复制,但是如果不借助编辑器,如何将Excel中的数据复制到html中对应的table呢?基本的思路就是获取到复制的内容,然后解析并填入相应的位置中。首先想到的是通过js获取剪切板的内容,可惜的是,在浏览器中通过js获取剪切板的内容是被限制的。另外的一种方法,是通过flash去获取,这种方法可以避开浏...
解决制作托盘气泡提示中遇到的NIF_INFO : undeclared identifier VC6下实现托盘气泡提示的关键是要更新SDK,实际上应该就是要更新SHELLAPI.h,SHELL32.lib的文件吧曾经打算在网上下载新版的VC7,可是至今未能如愿载不了!却得到了高手的回贴得知一个相对软新的SDK的微软官网下载地址:http://www.microsoft.co
在ios 系统中, 设置border-radius 可能会不生效(安卓有效),直接给要设置的元素设置 border-radius属性,再添加下面的代码即可实现功能: -webkit-backface-visibility: hidden; -webkit-transform: translate3d(0, 0, 0);下面贴上自己项目中这一部分代码:...
嵌入式C学习(二)Typedef结构体宏定义条件编译ifdefifndefundefTypedefTypedef为C语言的关键字,作用是为一种数据类型定义一个新名字。这里的数据类型包括内部数据类型(int、char等)和自定义的数据类型(struct等)。在编程中使用typedef目的一般有两个,一个是给变量一个易记且意义明确的新名字,另一个是简化一些比较复杂的类型声名。例如:typedef struct{ u16 seq_num; u16 len; u8 dev_id[GPRS_PRO_
首先声明: 本文章使用的都是windows版的.1. Grafana安装1.1 首先先去官网下载grafana(https://grafana.com/grafana/download/8.0.3?edition=oss)选中对应的版本, 系统, 就可以下载了, msi就是安装程序, zip是压缩包, 下哪个都可以.下载完成后, 打开bin目录, 执行grafana-server.exe. 如下图启动成功, 访问localhost:3000, 用户名密码都是 admin. 到..
QT入门第十天QT安装和使用alsa库和jpeg库实现音视频录制第一章 ALSA库的移植和使用1.ALSA简介2.移植ALSA(1)移植步骤3.把移植好的库下载到开发板配置4.使用移植好的ALSA工具录音和播放5.ALSA快速安装教程第二章 libjpeg移植1.安装和使用步骤
市面上关于jQuery的书很多,但在我看来,为了学jQuery买书,就像买一本《傻瓜相机操作指南》。如果有必要,只会证明jQuery作为一个JavaScript库,写得不够好。而jQuery恰恰是设计良好,容易掌握正是它的优点之一。学会jQuery,一篇文章的篇幅正合适。当然前提是已经掌握JavaScript语言。不看注释先来看几段jQuery代码(片段二和三取自jQuery官方网