一、concurrent.futures模块:主要用来开进程池和线程池,继承Executor类,专门是针对异步调用的
1、开进程池:ProcessPoolExecutor类
(1)方法:
》p=ProcessPoolExecutor() 得到线程池对象,可以指定个数,默认是cpu核数
》p.shutdown(wait=True) 相当于p.close()和p.join() 当wait是False时,就只有close不会等结束直接继续执行下面的命令,而实际上不管wait是什么主进程或者是线程都会等待所有任务执行完的
》future=p.submit(函数名,参数)直接写参数,没有args,得到一个对象, 相当于p.apply_async()
》future.result() 取线程运行结果
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutorimport time,random,osdef work(n): print('%s is running' %os.getpid()) time.sleep(random.randint(1,3)) return n**2if __name__ == '__main__': executor=ProcessPoolExecutor() futrues=[] for i in range(10): future=executor.submit(work,i) #不断的发送任务请求,异步调用 # print(executor.submit(work,i).result()) #模拟同步的效果 futrues.append(future) executor.shutdown(wait=True) #线程池不再接受任务请求,并且等待所有任务结束 print('主') for obj in futrues: print(obj.result()) #得到结果
2、开线程池:ThreadPoolExecutor类
(1)方法:
》t=ThreadPoolExecutor() 可以指定个数,默认个数是cpu核数乘5
》其余和开进程池的方法一样:shutdown,submit,result
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutorimport time,random,osimport threadingdef work(n): print('%s is running' %threading.current_thread().getName()) time.sleep(random.randint(1,3)) return n**2if __name__ == '__main__': executor=ThreadPoolExecutor() futrues=[] for i in range(30): future=executor.submit(work,i) futrues.append(future) executor.shutdown(wait=True) print('主') for obj in futrues: print(obj.result())
3、map方法
(1)作用:只是实现了提交多少次任务,没法取结果
(2)用法:线程池对象.map(函数名,可迭代对象) 循环可迭代对象的每个元素作为函数的参数,循环次数也就是开启了多少个进程或是线程,其实就是for循环和submit的总和,但是区别就是map无法得到每一个线程对象,也就无法得到结果
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutorimport time,random,osdef work(n): print('%s is running' %os.getpid()) time.sleep(random.randint(1,3)) return n**2if __name__ == '__main__': executor=ProcessPoolExecutor() executor.map(work,range(10)) executor.shutdown(wait=True) print('主')
4、回调函数
submit(函数名1,参数). add_done_callback(函数名2)
》函数名2的参数是前面产生的线程对象,不是函数名1的结果
》若函数名2的参数需要的是函数名1产生的结果,那么就用对象的result方法取得就可以了
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutorimport requestsimport osimport time,randomdef get(url): print('%s GET %s' %(os.getpid(),url)) response=requests.get(url) time.sleep(random.randint(1,3)) if response.status_code == 200: print('%s DONE %s' % (os.getpid(), url)) return { 'url':url,'text':response.text}def parse(future): dic=future.result() print('%s PARSE %s' %(os.getpid(),dic['url'])) time.sleep(1) res='%s:%s\n' %(dic['url'],len(dic['text'])) with open('db.txt','a') as f: f.write(res)if __name__ == '__main__': urls=[ 'https://www.baidu.com', 'https://www.python.org', 'https://www.openstack.org', 'https://help.github.com/', 'http://www.sina.com.cn/' ] p=ProcessPoolExecutor() start_time=time.time() objs=[] for url in urls: # obj=p.apply_async(get,args=(url,),callback=parse) #主进程负责干回调函数的活 p.submit(get,url).add_done_callback(parse) print('主',(time.time()-start_time))
补充方法:
》线程对象.cancel() 在submit后面,意思是发起请求后立即取消这个任务》线程对象.exception(4) 表示在指定时间(4秒)内线程还没结束就会抛出超时异常
二、死锁现象和递归锁
1、死锁现象:两个进程互相锁住,互相抢到对方想要的锁但无法释放
from threading import Thread,Lock,RLockimport timemutexA=Lock()mutexB=Lock()class MyThread(Thread): def run(self): self.f1() self.f2() def f1(self): mutexA.acquire() print('%s 抢 A锁' %self.name) mutexB.acquire() #线程2停在这一步无法继续执行,拿到A锁,因为B锁被线程1抢到, print('%s 抢 B锁' %self.name) mutexB.release() mutexA.release() def f2(self): mutexB.acquire() print('%s 抢 B锁' %self.name) time.sleep(0.1) #线程1进行到这一步时停止,拿到B锁, mutexA.acquire() #继续执行,因为线程2已经拿到A锁所以线程1无法继续执行,这就产生了死锁现象 print('%s 抢 A锁' %self.name) mutexA.release() # mutexB.release() #if __name__ == '__main__': for i in range(10): t=MyThread() t.start()
2、解决办法:递归锁 RLock (threading模块)
》将两个进程互相锁住的锁设置为同一把递归锁:mutexA=mutexB=RLock()
》递归锁的实现方法:acquire()一次就加1,release()一次就减1,只要计数不为0,别的线程就无法抢到,当计数为0,别的线程才可以抢
》与Lock的最大区别:RLock可以连续的require,但是Lock只能有一次require
from threading import Thread,Lock,RLockimport timemutexA=mutexB=RLock() #设置为同一把递归锁class MyThread(Thread): def run(self): self.f1() self.f2() def f1(self): mutexA.acquire() #计数加1 print('%s 抢 A锁' %self.name) mutexB.acquire() #计数加1 print('%s 抢 B锁' %self.name) mutexB.release() #计数减1 mutexA.release() #计数减1 此时为0,这样别的线程就可以抢A锁和B锁了 def f2(self): mutexB.acquire() #计数加1,此时计数不为0,别的线程不能抢B锁和A锁 print('%s 抢 B锁' %self.name) time.sleep(0.1) #所有的程序都在睡 mutexA.acquire() #计数加1 print('%s 抢 A锁' %self.name) mutexA.release() #计数减1 mutexB.release() #计数减1 计数为0,别的线程就可以抢A锁和B锁了,一旦抢着计数又不为0此时别的线程又不能抢了,这样就避免了两个线程同时锁住对方想要的锁if __name__ == '__main__': for i in range(10): t=MyThread() t.start()
三、信号量:Semaqhore类(threading模块)
1、本质:就是一把锁,规定同一时间有多少个线程并发运行(用公共厕所来理解,坑数决定人数)
》》》》可以理解为有固定钥匙个数的一把锁,而且互斥锁可以理解为是信号量的一种特殊情况(个数=1)
2:用法:
sm=Semaqhore(5) 规定同一时间的线程并发运行个数
with sm :
3、和进程池、线程池的区别:
》相同点:都是规定和控制线程或进程的运行个数
》不同:进程池或者线程池代表着当前只有5个线程或进程存在且并发运行,而且这5个进程或线程的ID是不会变的,也就是说永远只有这5个进程或是线程来执行不同的任务;而信号量只是代表当前最多有5个线程或进程在并发运行,并不代表只有5个进程或线程存在,而且当其中一个执行完后释放掉锁则会有一个全新的进程或线程来抢这把锁运行,进程的ID和线程的名称是会变的
from threading import Semaphore,Thread,current_threadimport time,randomdef task(): with sm: print('%s 正在上厕所' %current_thread().getName()) time.sleep(random.randint(1,3))if __name__ == '__main__': sm=Semaphore(5) for i in range(11): t=Thread(target=task) t.start() #最后的结果肯定是不同的线程名称,不会有重复的
四、事件:Event类(threading模块)
》当两个线程或进程需要协同工作时,一个需要知道另一个运行到哪一步了
》e=Event() 得到一个对象
》e.set() 改变值
》e.wait() 当别的程序执行了e.set()后,当前程序的e.wait()就会结束继续向下执行,若别的程序没有执行e.set(),则当前程序就会一直处在e.wait()这一步就会一直等
timeout=时间,当设置了时间参数后,则e.wait()等到timeout的时间结束后就不等了
》is_set() 判断是否改变
from threading import Event,current_thread,Threadimport timee=Event()def check(): print('%s 正在检测' %current_thread().getName()) time.sleep(5) e.set()def conn(): count=1 while not e.is_set(): if count > 3: raise TimeoutError('连接超时') print('%s 正在等待%s连接' % (current_thread().getName(),count)) e.wait(timeout=1) #等1秒没有改变就不等了,若没有timeout就会一直等下去 count+=1 print('%s 开始连接' % current_thread().getName())if __name__ == '__main__': t1=Thread(target=check) t2=Thread(target=conn) t3=Thread(target=conn) t4=Thread(target=conn) t1.start() t2.start() t3.start() t4.start()
五、定时器:Timer(threading模块)
》t = Timer(1,函数名,args=(参数,)) 1就代表时间,等待指定时间后函数才执行
from threading import Timerdef hello(n): print("hello, world",n)t = Timer(2, hello,args=(1,))t.start() # 等1秒才执行, "hello, world" 才会打印
六、线程queue:import queue
(1)q=queue.Queue() 就是队列对象,同样有put和get方法
(2)q=queue.PriorityQueue 得到一个优先级队列对象
》q.put((优先级(数字),数据)) 数字越小优先级越高,会被优先取出,注意以元组或者列表形式存进队列
当优先级相同,就比较数据的大小,字符串比较ascii码值的大小(A—z,小—大),序列就一次比较每个元素的大小
(3)q=queue.LifoQueue()得到一个堆栈对象,后进先出,同样有put和get方法