进程与线程
几乎所有的操作系统都支持同时运行多个任务,每个任务通常是一个程序,每一个运行中的程序就是一个进程,即进程是应用程序的执行实例。现代的操作系统几乎都支持多进程并发执行。
并发和并行是两个概念,并行指在同一时刻有多条指令在多个处理器上同时执行;并发是指在同一时刻只能有一条指令执行,但多个进程指令被快速轮换执行,使得在宏观上具有多个进程同时执行的效果。
线程是进程的组成部分,一个进程可以拥有多个线程。在多线程中,会有一个主线程来完成整个进程从开始到结束的全部操作,而其他的线程会在主线程的运行过程中被创建或退出。进程初始化后,要求有一个主线程,但也可以在进程内创建多个顺序执行流,这些顺序执行流就是线程。
当一个进程里只有一个线程时,叫作单线程。超过一个线程就叫作多线程。
每个线程必须有自己的父进程,且它可以拥有自己的堆栈、程序计数器和局部变量,但不拥有系统资源,因为它和父进程的其他线程共享该进程所拥有的全部资源。线程可以完成一定的任务,可以与其他线程共享父进程中的共享变量及部分环境,相互之间协同完成进程所要完成的任务。
多个线程共享父进程里的全部资源,会使得编程更加方便,需要注意的是,要确保线程不会妨碍同一进程中的其他线程。
线程是独立运行的,它并不知道进程中是否还有其他线程存在。线程的运行是抢占式的,也就是说,当前运行的线程在任何时候都可能被挂起,以便另外一个线程可以运行。多线程也是并发执行的,即同一时刻,Python主程序只允许有一个线程执行,这和全局解释器锁有关系,后续会做详细介绍。
一个线程可以创建和撤销另一个线程,同一个进程中的多个线程之间可以并发运行。对多线程实现调度和管理以及资源分配,线程的调度和管理由进程本身负责完成。
创建线程
threading模块提供功能丰富的多线程支持,创建线程方式:
- 使用threading模块中Thread类的构造器创建线程。即直接对类 threading.Thread进行实例化创建线程,并调用实例化对象的start()方法启动线程。
- 继承 threading 模块中的 Thread 类创建线程类。即用 threading.Thread派生出一个新的子类,将新建类实例化创建线程,并调用其start()方法启动线程。
调用Thread类的构造器创建线程
Thread 类提供了如下的 init() 构造器,可以用来创建线程:__init__(self, group=None, target=None, name=None, args=(), kwargs=None, *,daemon=None)
此构造方法中,以上所有参数都是可选参数,即可以使用,也可以忽略。其中各个参数的含义如下:
- group:指定所创建的线程隶属于哪个线程组(此参数尚未实现,无需调用);
- target:指定所创建的线程要调度的目标方法(最常用);
- args:以元组的方式,为target指定的方法传递参数;
- kwargs:以字典的方式,为target指定的方法传递参数;
- daemon:指定所创建的线程是否为后代线程。
# Thread类的构造函数创建一个线程
import threading
# 定义线程要调用的方法,*add可接受多个非关键字方式传入的参数
def action(*add):
for arc in add:
# 调用getname()方法获取线程名
print(threading.current_thread().getName() + " " + arc)
# 定义为线程方法传入的参数
my_tuple = ("the test1",\
"the test2",\
"the test3")
# 创建线程
thread = threading.Thread(target = action, args=my_tuple)
# threading模块提供start()方法启动线程
thread.start()
for i in range(2):
print(threading.current_thread().getName())
'''
Thread-1 the test1
Thread-1 the test2
Thread-1 the test3
MainThread
MainThread
'''
默认情况下,主线程的名字为 MainThread,用户启动的多个线程的名字依次为 Thread-1、Thread-2、Thread-3、…、Thread-n等。如果程序中不显式创建任何线程,则所有程序的执行,都将由主线程 MainThread 完成,程序就只能按照顺序依次执行。
当前程序中有2个线程,分别为主线程 MainThread 和子线程 Thread-1,它们以并发方式执行,即 Thread-1 执行一段时间,然后 MainThread 执行一段时间。通过轮流获得CPU执行一段时间的方式,程序的执行在多个线程之间切换,从而给用户一种错觉,即多个线程似乎同时在执行。
继承Thread类创建线程类
通过继承 Thread 类,我们可以自定义一个线程类,从而实例化该类对象,获得子线程。需要注意的是,在创建 Thread类的子类时,必须重写从父类继承得到的run()方法。因为该方法即为要创建的子线程执行的方法,其功能如同第一种创建方法中的action()自定义函数。
import threading
# 创建子线程
class my_thread(threading.Thread):
def __init__(self, add):
threading.Thread.__init__(self)
self.add = add
# 重写run()方法
def run(self):
for arc in self.add:
# 调用getName()方法获取当前执行该程序的线程名
print(threading.current_thread().getName() + " " + arc)
# 定义run()方法的参数
my_tuple = ("The test1", "The test2", "The test3")
mythread = my_thread(my_tuple)
# 启动
mythread.start()
# 主线程
for i in range(2):
print(threading.current_thread().getName())
'''
Thread-1 The test1
Thread-1 The test2
Thread-1 The test3
MainThread
MainThread
'''
线程的生命周期(新建、就绪、运行、阻塞和死亡)
CPU 在轮换执行线程过程中,线程都经历了什么呢?线程从创建到消亡的整个过程,可能会历经 5 种状态,分别是新建、就绪、运行、阻塞和死亡,如图 1 所示。

线程的新建和就绪状态
所谓就绪,就是告诉 CPU,该线程已经可以执行了,但是具体什么时候执行,取决于 CPU 什么时候调度它。换句话说,如果一个线程处于就绪状态,只能说明此线程已经做好了准备,随时等待 CPU 调度执行,并不是说执行了 start() 方法此线程就会立即被执行。
一方面 Python解释器会将它们看做是普通的函数调用和类方法调用。另一方面,由于新建的线程属于新建状态而不是就绪状态,因此不会得到 CPU 的调度。
线程的运行和阻塞状态
如果当前有多个线程处于就绪状态(等待 CPU 调度)时,处于运行状态的线程将无法一直霸占 CPU 资源,为了使其它线程也有执行的机会,CPU 会在一定时间内强制当前运行的线程让出 CPU 资源,以供其他线程使用。而对于获得 CPU 调度却没有执行完毕的线程,就会进入阻塞状态。
如果处于运行状态的线程发生如下几种情况,也将会由运行状态转到阻塞状态:
- 线程调用了 sleep() 方法;
- 线程等待接收用户输入的数据;
- 线程试图获取某个对象的同步锁(后续章节会详细讲解)时,如果该锁被其他线程所持有,则当前线程进入阻塞状态;
- 线程调用 wait() 方法,等待特定条件的满足;
该线程才会有阻塞状态转到就绪状态,继续等待 CPU 调度(如图 1 所示)。以上 4 种可能发生线程阻塞的情况,解决措施分别如下:
- sleep() 方法规定的时间已过;
- 线程接收到了用户输入的数据;
- 其他线程释放了该同步锁,并由该线程获得;
- 调用set()方法发出通知;
线程死亡状态
执行结束的线程将处于死亡状态。线程执行结束,除了正常执行结束外,如果程序执行过程发生异常(Exception)或者错误(Error),线程也会进入死亡状态。对于处于死亡状态的线程,有以下2点需要注意:
- 主线程死亡,并不意味着所有线程全部死亡。也就是说,主线程的死亡,不会影响子线程继续执行;反之也是如此。
- 对于死亡的线程,无法再调用start()方法使其重新启动,否则Python解释器将抛出RuntimeError异常。
Thread join()用法
join() 方法的功能是在程序指定位置,优先让该方法的调用者使用 CPU 资源。该方法的语法格式如下:
thread.join( [timeout] )
其中,thread 为 Thread 类或其子类的实例化对象;timeout 参数作为可选参数,其功能是指定 thread 线程最多可以霸占 CPU 资源的时间(以秒为单位),如果省略,则默认直到 thread 执行结束(进入死亡状态)才释放 CPU 资源。
daemon守护线程
当程序中拥有多个线程时,主线程执行结束并不会影响子线程继续执行。换句话说,只有程序中所有线程全部执行完毕后,程序才算真正结束。
除此之外,Python 还支持创建另一种线程,称为守护线程(或后台线程)。此类线程的特点是,当程序中主线程及所有非守护线程执行结束时,未执行完毕的守护线程也会随之消亡(进行死亡状态),程序将结束运行。
Python 解释器的垃圾回收机制就是守护线程的典型代表,当程序中所有主线程及非守护线程执行完毕后,垃圾回收机制也就没有再继续执行的必要了。
通过调用thread线程的daemon属性并赋值为True,则该 thread 线程就变成了守护线程。
thread.daemon = True
sleep()函数:线程睡眠
位于 time 模块中的 sleep(secs) 函数,可以实现令当前执行的线程暂停 secs 秒后再继续执行。所谓暂停,即令当前线程进入阻塞状态,当达到 sleep() 函数规定的时间后,再由阻塞状态转为就绪状态,等待CPU调度。需要先引入time模块,语法规则如下:
time.sleep(secs) # secs参数用于指定暂停秒数
互斥锁(Lock):解决多线程安全问题
多线程的优势在于并发性,即可以同时运行多个任务。但是当线程需要使用共享数据时,也可能会由于数据不同步产生“错误情况”,这是由系统的线程调度具有一定的随机性造成的。银行取钱的基本流程可以分为如下几个步骤:
- 用户输入账户、密码,系统判断用户的账户、密码是否匹配。
- 用户输入取款金额。
- 系统判断账户余额是否大于取款金额。
- 如果余额大于取款金额,则取款成功;如果余额小于取款金额,则取款失败。
# 模拟银行取钱
class Account:
# 定义构造器
def __init__(self, account_no, balance):
self.account_no = account_no
self.balance = balance
# 启动两个线程取钱
import threading
import time
import Account
# 定义一个函数来模拟取钱操作
def draw(account, draw_amount):
# 账户余额大于取钱数目
if account.balance >= draw_amount:
# 吐出钞票
print(threading.current_thread().name\
+ "取钱成功!吐出钞票:" + str(draw_amount))
# time.sleep(0.001)
# 修改余额
account.balance -= draw_amount
print("\t余额为: " + str(account.balance))
else:
print(threading.current_thread().name\
+ "取钱失败!余额不足!")
# 创建一个账户
acct = Account.Account("1234567" , 1000)
# 模拟两个线程对同一个账户取钱
threading.Thread(name='甲', target=draw , args=(acct , 800)).start()
threading.Thread(name='乙', target=draw , args=(acct , 800)).start()
'''
甲取钱成功!吐出钞票:800
乙取钱成功!吐出钞票:800
余额为: 200
余额为: -600
'''
账户余额只有1000元时取出了 1600 元,而且账户余额出现了负值,远不是银行所期望的结果。虽然上面程序是人为地使用 time.sleep(0.001) 来强制线程调度切换,但这种切换也是完全可能发生的(100000 次操作只要有 1 次出现了错误,那就是由编程错误引起的)。
Python互斥锁同步线程
run()方法的方法体不具有线程安全性,程序中有两个并发线程在修改Account对象,而且系统恰好在注释代码处执行线程切换,切换到另一个修改Account对象的线程,所以就出现了问题。为了解决这个问题,Python 的threading模块引入了互斥锁(Lock)。threading 模块提供了Lock和RLoc 两个类,它们都提供了如下两个方法来加互斥锁和释放互斥锁:
- acquire(blocking=True, timeout=-1):请求对 Lock或RLock加锁,其中timeout参数指定加锁多少秒。
- release():释放锁。
Lock和RLock的区别如下:
- threading.Lock:它是一个基本的锁对象,每次只能锁定一次,其余的锁请求,需等待锁释放后才能获取。
- threading.RLock:它代表可重入锁(Reentrant Lock)。对于可重入锁,在同一个线程中可以对它进行多次锁定,也可以多次释放。如果使用 RLock,那么 acquire()和release()方法必须成对出现。如果调用了n次acquire()加锁,则必须调用n次release()才能释放锁。
RLock锁具有可重入性。也就是说,同一个线程可以对已被加锁的RLock锁再次加锁,RLock对象会维持一个计数器来追踪acquire()方法的嵌套调用,线程在每次调用acquire()加锁后,都必须显式调用release()方法来释放锁。所以,一段被锁保护的方法可以调用另一个被相同锁保护的方法。
Lock是控制多个线程对共享资源进行访问的工具。通常,锁提供了对共享资源的独占访问,每次只能有一个线程对Lock对象加锁,线程在开始访问共享资源之前应先请求获得Lock对象。当对共享资源访问完成后,程序释放对Lock对象的锁定。RLock代码格式如下:
class X:
# 保证线程安全
def m():
# 加锁
self.lock.acquire()
try:
# 需要保证线程安全的代码
#...方法体
# finally块保证释放锁
finally:
# 修改完成,释放锁
self.lock.release()
使用RLock对象来控制线程安全,当加锁和释放锁出现在不同的作用范围内时,通常建议使用finally块来确保在必要时释放锁。通过使用 Lock 对象可以非常方便地实现线程安全的类,线程安全的类具有如下特征:
- 该类的对象可以被多个线程安全地访问。
- 每个线程在调用该对象的任意方法之后,都将得到正确的结果。
- 每个线程在调用该对象的任意方法之后,该对象都依然保持合理的状态。
总的来说,不可变类总是线程安全的,因为它的对象状态不可改变;但可变对象需要额外的方法来保证其线程安全。例如,上面的 Account 就是一个可变类,它的self.account_no和self._balance(为了更好地封装,将 balance改名为_balance)两个成员变量都可以被改变,当两个钱程同时修改Account对象的self._balance成员变量的值时,程序就出现了异常。下面将Account类对self.balance的访问设置成线程安全的,那么只需对修改self.balance的方法增加线程安全的控制即可。
# 启动两个线程取钱
import threading
import time
class account:
# 定义构造器
def __init__(self, account_no, balance):
self.account_no = account_no
self._balance = balance
self.lock = threading.RLock()
def getBalance(self):
return self._balance
# 定义一个函数来模拟取钱操作
def draw(self, draw_amount):
# 加锁
self.lock.acquire()
try:
# 账户余额大于取钱数目
if self._balance >= draw_amount:
# 吐出钞票
print(threading.current_thread().name\
+ "取钱成功!吐出钞票:" + str(draw_amount))
time.sleep(0.001)
# 修改余额
self._balance -= draw_amount
print("\t余额为: " + str(self._balance))
else:
print(threading.current_thread().name\
+ "取钱失败!余额不足!")
finally:
# 修改完成,释放锁
self.lock.release()
def draw(account, draw_amount):
# 直接调用account对象的draw()方法来执行取钱操作
account.draw(draw_amount)
# 创建一个账户
acct = account("1234567", 1000)
# 模拟两个线程对同一个账户取钱
threading.Thread(name='甲', target=draw, args=(acct, 800)).start()
threading.Thread(name='乙', target=draw, args=(acct, 800)).start()
'''
甲取钱成功!吐出钞票:800
余额为: 200
乙取钱失败!余额不足!
'''
可变类的线程安全是以降低程序的运行效率作为代价的,为了减少线程安全所带来的负面影响,程序可以采用如下策略:
- 不要对线程安全类的所有方法都进行同步,只对那些会改变竞争资源(竞争资源也就是共享资源)的方法进行同步。例如,上面Account类中的account_no实例变量就无须同步,所以程序只对draw()方法进行了同步控制。
- 如果可变类有两种运行环境,单线程环境和多线程环境,则应该为该可变类提供两种版本,即线程不安全版本和线程安全版本。在单线程环境中使用钱程不安全版本以保证性能,在多线程环境中使用线程安全版本。
死锁,如何避免死锁
当两个线程相互等待对方释放资源时,就会发生死锁。Python 解释器没有监测,也不会主动采取措施来处理死锁情况,所以在进行多线程编程时应该采取措施避免出现死锁。
一旦出现死锁,整个程序既不会发生任何异常,也不会给出任何提示,只是所有线程都处于阻塞状态,无法继续。
在系统中出现多个同步监视器,容易出现死锁:
import threading
import time
class A:
def __init__(self):
self.lock = threading.RLock()
def foo(self, b):
try:
self.lock.acquire()
print("当前线程名: " + threading.current_thread().name\
+ " 进入了A实例的foo()方法" ) # ①
time.sleep(0.2)
print("当前线程名: " + threading.current_thread().name\
+ " 企图调用B实例的last()方法") # ③
b.last()
finally:
self.lock.release()
def last(self):
try:
self.lock.acquire()
print("进入了A类的last()方法内部")
finally:
self.lock.release()
class B:
def __init__(self):
self.lock = threading.RLock()
def bar(self, a):
try:
self.lock.acquire()
print("当前线程名: " + threading.current_thread().name\
+ " 进入了B实例的bar()方法" ) # ②
time.sleep(0.2)
print("当前线程名: " + threading.current_thread().name\
+ " 企图调用A实例的last()方法") # ④
a.last()
finally:
self.lock.release()
def last(self):
try:
self.lock.acquire()
print("进入了B类的last()方法内部")
finally:
self.lock.release()
a = A()
b = B()
def init():
threading.current_thread().name = "主线程"
# 调用a对象的foo()方法
a.foo(b)
print("进入了主线程之后")
def action():
threading.current_thread().name = "副线程"
# 调用b对象的bar()方法
b.bar(a)
print("进入了副线程之后")
# 以action为target启动新线程
threading.Thread(target=action).start()
# 调用init()函数
init()
程序中 A 对象和 B 对象的方法都是线程安全的方法。程序中有两个线程执行,副线程的线程执行体是 action() 函数,主线程的线程执行体是 init() 函数(主程序调用了 init() 函数)。其中在 action() 函数中让 B 对象调用 bar() 方法,而在 init() 函数中让 A 对象调用 foo() 方法。
图 1 显示 action() 函数先执行,调用了 B 对象的 bar() 方法,在进入 bar() 方法之前,该线程对 B 对象的 Lock 加锁(当程序执行到 ② 号代码时,副线程暂停 0.2s);CPU 切换到执行另一个线程,让 A 对象执行 foo() 方法,所以看到主线程开始执行 A 实例的 foo() 方法,在进入 foo() 方法之前,该线程对 A 对象的 Lock 加锁(当程序执行到 ① 号代码时,主线程也暂停 0.2s)。
接下来副线程会先醒过来,继续向下执行,直到执行到 ④ 号代码处希望调用 A 对象的 last() 方法(在执行该方法之前,必须先对 A 对象的 Lock 加锁),但此时主线程正保持着 A 对象的 Lock 的锁定,所以副线程被阻塞。
接下来主线程应该也醒过来了,继续向下执行,直到执行到 ③ 号代码处希望调用 B 对象的 last() 方法(在执行该方法之前,必须先对 B 对象的 Lock 加锁),但此时副线程没有释放对 B 对象的 Lock 的锁定。
至此,就出现了主线程保持着 A 对象的锁,等待对 B 对象加锁,而副线程保持着 B对象的锁,等待对 A 对象加锁,两个线程互相等待对方先释放锁,所以就出现了死锁。
死锁是不应该在程序中出现的,在编写程序时应该尽量避免出现死锁。下面有几种常见的方式用来解决死锁问题:
- 避免多次锁定。尽量避免同一个线程对多个 Lock 进行锁定。例如上面的死锁程序,主线程要对 A、B 两个对象的 Lock 进行锁定,副线程也要对 A、B 两个对象的 Lock 进行锁定,这就埋下了导致死锁的隐患。
- 具有相同的加锁顺序。如果多个线程需要对多个 Lock 进行锁定,则应该保证它们以相同的顺序请求加锁。比如上面的死锁程序,主线程先对 A 对象的 Lock 加锁,再对 B 对象的 Lock 加锁;而副线程则先对 B 对象的 Lock 加锁,再对 A 对象的 Lock 加锁。这种加锁顺序很容易形成嵌套锁定,进而导致死锁。如果让主线程、副线程按照相同的顺序加锁,就可以避免这个问题。
- 使用定时锁。程序在调用 acquire() 方法加锁时可指定 timeout 参数,该参数指定超过 timeout 秒后会自动释放对 Lock 的锁定,这样就可以解开死锁了。
- 死锁检测。死锁检测是一种依靠算法机制来实现的死锁预防机制,它主要是针对那些不可能实现按序加锁,也不能使用定时锁的场景的。
condition实现线程通信
线程的调度具有一定的透明性,通常程序无法准确控制线程的轮换执行,如果有需要,Python可通过线程通信来保证线程协调运行。
假设系统中有两个线程,这两个线程分别代表存款者和取钱者,现在假设系统有一种特殊的要求,即要求存款者和取钱者不断地重复存款、取钱的动作,而且要求每当存款者将钱存入指定账户后,取钱者就立即取出该笔钱。不允许存款者连续两次存钱,也不允许取钱者连续两次取钱。
将Condition对象与Lock对象组合使用,可以为每个对象提供多个等待集(wait-set)。因此,Condition对象总是需要有对应的Lock对象。从Condition的构造器 init(self, lock=None) 可以看出,程序在创建 Condition 时可通过 lock 参数传入要绑定的 Lock 对象;如果不指定 lock 参数,在创建 Condition 时它会自动创建一个与之绑定的 Lock 对象。Condition类提供了如下方法:
- acquire([timeout])/release():调用 Condition 关联的 Lock 的 acquire() 或 release() 方法。
- wait([timeout]):导致当前线程进入 Condition 的等待池等待通知并释放锁,直到其他线程调用该 Condition 的 notify() 或 notify_all() 方法来唤醒该线程。在调用该 wait() 方法时可传入一个 timeout 参数,指定该线程最多等待多少秒。
- notify():唤醒在该 Condition 等待池中的单个线程并通知它,收到通知的线程将自动调用 acquire() 方法尝试加锁。如果所有线程都在该 Condition 等待池中等待,则会选择唤醒其中一个线程,选择是任意性的。
- notify_all():唤醒在该 Condition 等待池中等待的所有线程并通知它们。
Queue 队列实现线程通信
queue 模块下主要提供了三个类,分别代表三种队列,它们的主要区别就在于进队列、出队列的不同。关于这三个队列类的简单介绍如下:
- queue.Queue(maxsize=0):代表 FIFO(先进先出)的常规队列,maxsize 可以限制队列的大小。如果队列的大小达到队列的上限,就会加锁,再次加入元素时就会被阻塞,直到队列中的元素被消费。如果将 maxsize 设置为 0 或负数,则该队列的大小就是无限制的。
- queue.LifoQueue(maxsize=0):代表 LIFO(后进先出)的队列,与 Queue 的区别就是出队列的顺序不同。
- PriorityQueue(maxsize=0):代表优先级队列,优先级最小的元素先出队列。
三个队列类的属性和方法基本相同,提供了如下属性和方法:
- Queue.qsize():返回队列的实际大小,也就是该队列中包含几个元素。
- Queue.empty():判断队列是否为空。
- Queue.full():判断队列是否已满。
- Queue.put(item, block=True, timeout=None):向队列中放入元素。如果队列己满,且 block 参数为 True(阻塞),当前线程被阻塞,timeout 指定阻塞时间,如果将 timeout 设置为 None,则代表一直阻塞,直到该队列的元素被消费;如果队列己满,且 block 参数为 False(不阻塞),则直接引发 queue.FULL 异常。
- Queue.put_nowait(item):向队列中放入元素,不阻塞。相当于在上一个方法中将 block 参数设置为 False。
- Queue.get(item, block=True, timeout=None):从队列中取出元素(消费元素)。如果队列已满,且 block 参数为 True(阻塞),当前线程被阻塞,timeout 指定阻塞时间,如果将 timeout 设置为 None,则代表一直阻塞,直到有元素被放入队列中; 如果队列己空,且 block 参数为 False(不阻塞),则直接引发 queue.EMPTY 异常。
- Queue.get_nowait(item):从队列中取出元素,不阻塞。相当于在上一个方法中将block参数设置为False。
import queue
bq = queue.Queue(2) # 长度为2的阻塞队列
bq.put("Python")
bq.put("Python")
print("1111111111")
bq.put("Python") # ① 阻塞线程
print("2222222222")
# 当程序试图放入第三个元素时,如果使用 put() 方法尝试放入元素将会阻塞线程
在下面程序中就可以利用Queue来实现线程通信了。如下:
import threading
import time
import queue
def product(bq):
str_tuple = ("Python", "Kotlin", "Swift")
for i in range(99999):
print(threading.current_thread().name + "生产者准备生产元组元素!")
time.sleep(0.2);
# 尝试放入元素,如果队列已满,则线程被阻塞
bq.put(str_tuple[i % 3])
print(threading.current_thread().name \
+ "生产者生产元组元素完成!")
def consume(bq):
while True:
print(threading.current_thread().name + "消费者准备消费元组元素!")
time.sleep(0.2)
# 尝试取出元素,如果队列已空,则线程被阻塞
t = bq.get()
print(threading.current_thread().name \
+ "消费者消费[ %s ]元素完成!" % t)
# 创建一个容量为1的Queue
bq = queue.Queue(maxsize=1)
# 启动3个生产者线程
threading.Thread(target=product, args=(bq, )).start()
threading.Thread(target=product, args=(bq, )).start()
threading.Thread(target=product, args=(bq, )).start()
# 启动一个消费者线程
threading.Thread(target=consume, args=(bq, )).start()
'''
Thread-4消费者消费[ Kotlin ]元素完成!
Thread-2生产者生产元组元素完成!
Thread-2生产者准备生产元组元素!
Thread-4消费者准备消费元组元素!
Thread-4消费者消费[ Python ]元素完成!
Thread-1生产者生产元组元素完成!
'''
Event实现线程通信
Event 本身管理一个内部旗标,程序可以通过 Event 的 set() 方法将该旗标设置为 True,也可以调用 clear() 方法将该旗标设置为 False。程序可以调用 wait() 方法来阻塞当前线程,直到 Event 的内部旗标被设置为 True。Event提供了如下方法:
- is_set():该方法返回 Event 的内部旗标是否为True。
- set():该方法将会把 Event 的内部旗标设置为 True,并唤醒所有处于等待状态的线程。
- clear():该方法将 Event 的内部旗标设置为 False,通常接下来会调用 wait() 方法来阻塞当前线程。
- wait(timeout=None):该方法会阻塞当前线程。
结合Event的内部旗标,同样可实现前面的Account的生产者-消费者效果:存钱线程(生产者)存钱之后,必须等取钱线程(消费者)取钱之后才能继续向下执行。Event实际上优点类似于Condition和旗标的结合体,但Event本身并不带Lock对象,因此如果要实现线程同步,还需要额外的Lock对象。
# 取钱存钱的Event实现
import threading
class Account:
# 定义构造器
def __init__(self, account_no, balance):
# 封装账户编号、账户余额的两个成员变量
self.account_no = account_no
self._balance = balance
self.lock = threading.Lock()
self.event = threading.Event()
# 因为账户余额不允许随便修改,所以只为self._balance提供getter方法
def getBalance(self):
return self._balance
# 提供一个线程安全的draw()方法来完成取钱操作
def draw(self, draw_amount):
# 加锁
self.lock.acquire()
# 如果Event内部旗标为True,表明账户中已有人存钱进去
if self.event.is_set():
# 执行取钱操作
print(threading.current_thread().name
+ " 取钱:" + str(draw_amount))
self._balance -= draw_amount
print("账户余额为:" + str(self._balance))
# 将Event内部旗标设为False
self.event.clear()
# 释放加锁
self.lock.release()
# 阻塞当前线程阻塞
self.event.wait()
else:
# 释放加锁
self.lock.release()
# 阻塞当前线程阻塞
self.event.wait()
def deposit(self, deposit_amount):
# 加锁
self.lock.acquire()
# 如果Event内部旗标为False,表明账户中还没有人存钱进去
if not self.event.is_set():
# 执行存款操作
print(threading.current_thread().name\
+ " 存款:" + str(deposit_amount))
self._balance += deposit_amount
print("账户余额为:" + str(self._balance))
# 将Event内部旗标设为True
self.event.set()
# 释放加锁
self.lock.release()
# 阻塞当前线程阻塞
self.event.wait()
else:
# 释放加锁
self.lock.release()
# 阻塞当前线程阻塞
self.event.wait()