Python多线程同步机制:条件变量、事件和屏障

笔记哥 / 03-28 / 15点赞 / 0评论 / 790阅读
# 一文速通 Python 并行计算:04 Python 多线程编程-多线程同步(下)—基于条件变量、事件和屏障 ![image](https://cdn.res.knowhub.vip/c/2503/28/44e7e6dd.png?G1cAAMT0bJxoe1XBNvqh%2f4lHQjNgkUZQKWG93nv3aUTf72Ck%2bMw%2blp8Pv%2bljOcGqgJXAMFaEgAwV46rFguSEUlNmiXs6) # **摘要:** 本文介绍了 Python 多线程同步的三种机制:条件变量(Condition)、事件(Event)和屏障(Barrier),条件变量指的是线程等待特定条件满足后执行,适用于生产者-消费者模型;Event 指的是线程通过事件标志进行同步,适用于线程间简单通信;Barrier 指的是多个线程需同步到同一阶段时使用,适用于并行任务的分阶段执行。 该文档是一份关于 **并行计算** 和 **Python 并发编程** 的学习指南,内容涵盖了并行计算的基本概念、Python 多线程编程、多进程编程以及协程编程的核心知识点: ![image](https://cdn.res.knowhub.vip/c/2503/28/467e6e83.png?G1YAAMTydJz4%2b%2bNTuo06fJsoEpoBiSyCSgnr9Z6z9i3y%2fU4wx2e0Pn1%2f%2bEvr04VWlUhC0JAQPBUKpFLMAu2yUgnGNRw%3d) # **正文** # 1.基于条件变量的线程同步 Python 提供的 `Condition` 对象提供了对复杂线程同步问题的支持。`Condition` 被称为条件变量,与互斥锁不同,条件变量是用来等待而不是用来上锁的。**条件变量用来自动阻塞一个线程,直到某特殊情况发生为止。**条件变量是利用线程间共享的全局变量进行同步的一种机制,主要包括两个动作: - 一个线程等待"条件变量的条件成立"而挂起; - 另一个线程使 “条件成立”(给出条件成立信号)。 使用 `Condition` 的主要方式为:**线程首先acquire一个条件变量,然后判断一些条件。如果条件不满足则wait;如果条件满足,进行一些处理改变条件后,通过notify方法通知其他线程,其他处于wait状态的线程接到通知后会重新判断条件,**不断的重复这一过程,从而解决复杂的同步问题。 解释条件机制最好的例子还是生产者-消费者问题,在本例中,只要缓存不满,生产者一直向缓存生产;只要缓存不空,消费者一直从缓存取出(之后销毁);当缓冲队列不为空的时候,生产者将通知消费者;当缓冲队列不满的时候,消费者将通知生产者。 ```csharp **i**mport threading import time condition = threading.Condition() products = 0 class Producer(threading.Thread): def __init__(self): threading.Thread.__init__(self) def run(self): global condition, products while True: _# 消费者通过拿到锁来修改共享的资源_ if condition.acquire(): _# 线程首先acquire一个条件变量,然后判断生产是否饱和。_ if products < 10: _# 如果产品数量小于10,继续生成,并通过notify方法通知消费者_ _# 只要缓存不满,生产者一直向缓存生产;_ products += 1; print("Producer(%s):deliver one, now products:%s" % (self.name, products)) _# 当缓冲队列不为空的时候,生产者将通知消费者_ condition.notify() _# 如果已经满了,那么生产者进入等待状态,直到被唤醒_ else: print("Producer(%s):already 10, stop deliver, now products:%s" % (self.name, products)) condition.wait() _# 如果队列没有满,就生产1个item,通知状态并释放资源_ condition.release() time.sleep(2) class Consumer(threading.Thread): def __init__(self): threading.Thread.__init__(self) def run(self): global condition, products while True: if condition.acquire(): if products > 1: _# 只要缓存不空,消费者一直从缓存取出(之后销毁)。_ products -= 1 print("Consumer(%s):consume one, now products:%s" % (self.name, products)) _# 当缓冲队列不满的时候,消费者将通知生产者。_ condition.notify() else: print("Consumer(%s):only 1, stop consume, products:%s" % (self.name, products)) _# 缓存空,消费者线程等待_ condition.wait() condition.release() time.sleep(2) if __name__ == "__main__": _# 首先是2个生成者生产products_ for p in range(0, 2): p = Producer() p.start() _# 接下来的10个消费者将会消耗products_ for c in range(0, 10): c = Consumer() c.start() ``` 运行的结果如下: ![image](https://cdn.res.knowhub.vip/c/2503/28/2c9f70f6.png?G1YAAMR0rnGCnldetxGHaoJAAs2ARBZBpYT1es9Z%2bxb5fiMY%2fDNan7Y%2f%2fKX1acJYL0KFYITCeRZAK7IiuFJLYsop%2bDUM) 乍一看这段代码好像会死锁,因为 `condition.acquire()` 之后就在 `wait()` 了,好像会一直持有锁。其实 `wait()` 会将锁释放,然后等待其他线程 `notify()` 之后会重新尝试获得锁。\*\*但是要注意 **notify()** 并不会自动释放锁,所以代码中有两行,先 **notify()** 然后再 \*\*\*\*release() \*\***。** ![image](https://cdn.res.knowhub.vip/c/2503/28/a44e52bd.png?G1cAAETn9LwUKJjhvtMdbIlTE20GLNIIKiWs13vO2jfR9wcYlp%2fR%2boz94TetzyCUKmAlMAorUoAzaxUV5ySKC2buntcI) 实际上,**条件的检测是在互斥锁的保护下进行的。线程在改变条件状态之前必须首先锁住互斥量。**如果一个条件为假,一个线程自动阻塞,并释放等待状态改变的互斥锁。**如果另一个线程改变了条件,它发信号给关联的条件变量,唤醒一个或多个等待它的线程,重新获得互斥锁,重新评价条件。**如果两进程共享可读写的内存,条件变量 可以被用来实现这两进程间的线程同步。 另外:**Condition对象的构造函数可以接受一个Lock/RLock对象作为参数,如果没有指定,则Condition对象会在内部自行创建一个RLock\*\*\*\*;**除了 `notify` 方法外,`Condition` 对象还提供了 `notifyAll` 方法,可以通知 `waiting` 池中的所有线程尝试 `acquire` 内部锁。由于上述机制,处于 `waiting` 状态的线程只能通过 `notify` 方法唤醒,所以 `notifyAll` 的作用在于防止有线程永远处于沉默状态。 **如果不使用条件变量,也可以不断循环检测缓存是否大于 0,但该方法会造成 CPU 资源的浪费。**采用条件变量这一问题就可以迎刃而解!条件变量允许一个线程休眠(阻塞等待)直至获取到另一个线程的通知(收到信号)再去执行自己的操作。 ![image](https://cdn.res.knowhub.vip/c/2503/28/741542aa.png?G1cAAMT0bJxoe9rANvqh%2f4lHQjNgkUZQKWG93nv3aUTf72BofGYfy8%2bH3%2fSxnGBVwInAME4IAYU5VaAUBNGsJlmrxT0d) ![image](https://cdn.res.knowhub.vip/c/2503/28/be1b018f.png?G1YAAMTsdJxI8kil26hD2jvFHc2ARBZBpYT1es9Z%2byb6fgcjx2e0Pn1%2f%2bEvr0wnFBJwIjMIJwaMyJ4OJWtCSVapeiGs4) 以上消费者-生产者模型过程如下: ![image](https://cdn.res.knowhub.vip/c/2503/28/235d8b91.png?G1cAAMTydJz4cw%2fVbdTh20SR0AxYpBFUSliv95y1b5HvN4LRP6P1afvDb1qfJkw1ECoEExQu8AK0KkuFCywlMsUMv4YB) 以下是另一个有趣的关于条件变量的例子: ![image](https://cdn.res.knowhub.vip/c/2503/28/655f0b6c.png?G1YAAMTW3Dgp8GRB22gDdWfqnTYDElkElRLW6917rpvo%2bwOMIz%2b9thHrw19qG0EoLmAlMAorkocxq%2bvpciU3mImz5NkD) 原理很简单,就是线程拿到锁先检查是不是自己渴望的状态。比如打印“B”的线程,渴望的状态 `current = 'B'` 然后打印出 B,将状态改成 C ,这样就成了打印“C”的线程渴望的状态。但是这里不能唤醒指定的线程,只好唤醒所有的线程,让他们自己再检查一遍状态了。 # 2.基于事件的线程同步 想象这样一个场景,你启动了多个线程,这些线程都要去访问一个资源,但是,这里有一个小小的问题,即将被访问的资源还没有准备好接受访问,那么此时,多个线程去访问,必然得到不响应,你还得处理这种得不到响应的情况。这样的场景下,能否先在主线程里去做试探,确定资源可以访问以后,再让已经启动了的多线程去访问呢?让我们考虑一下如何用 `Event` 来处理这样的问题。 1. 创建一个 `Event` 对象,现在,事件内部标识是 `False`; 2. 启动多线程,线程里调用 `wait` 方法,这时,会阻塞; 3. 主线程去试探,确定资源可访问以后,调用 `set` 方法,将内置标志设置为 `True`; 4. 该 `Event` 会通知所有等待状态的线程恢复运行:已经调用 `wait` 的线程接手到事件信息,访问资源。 以下为示例代码: ```csharp import threading from threading import Event def worker(event_obj, i): print('{i}号线程等待事件信号'.format(i=i)) event_obj.wait() print('{i}号线程收到事件信号'.format(i=i)) event = Event() for i in range(5): t = threading.Thread(target=worker, args=(event, i)) t.start() print('确认资源可用') event.set() ``` 以下为代码输出,可以看到在 `event.set()` 后所有线程恢复运行: ![image](https://cdn.res.knowhub.vip/c/2503/28/e9b8330b.png?G1YAAMTydJz4c%2b8buo06fJsoEpoBiSyCSgnr9Z6z9i3y%2fU4wx2e0Pn1%2f%2bEvr04VWE6FC0KAIngXIAHmVULWAZmBcwwE%3d) 让我们再一次回到生产者-消费者问题上,若要确保如果缓冲区满,生产者不会生成新数据,另外如果缓存区为空,消费者不会查找数据的要求,如何用 `event` 机制实现?代码如下: ```csharp import time from threading import Thread, Event import random items = [] event = Event() class consumer(Thread): def __init__(self, items, event): _# producer 类初始化时定义了item的list和 Event ,_ _# 与条件对象时候的例子不同,这里的list并不是全局的,而是通过参数传入的_ Thread.__init__(self) self.items = items self.event = event def run(self): while True: time.sleep(2) _# 等待元素到达_ _# 当元素到达时,consumer放弃这个锁_ _# 这就允许其他生产者或消费者进入并获得这个锁_ self.event.wait() _# 若consumer放弃这个锁被唤醒,它会重新获得锁_ _# 元素到达时,元素从items列表弹出_ item = self.items.pop() print('Consumer notify : %d popped from list by %s' % (item, self.name)) class producer(Thread): def __init__(self, items, event): Thread.__init__(self) self.items = items self.event = event def run(self): global item for i in range(100): time.sleep(2) item = random.randint(0, 256) self.items.append(item) print('Producer notify : item N° %d appended to list by %s' % (item, self.name)) print('Producer notify : event set by %s' % self.name) _# 添加元素后通知事件_ _# 将内部标识设置为 true 。所有正在等待这个事件的线程将被唤醒。_ _# 当标识为 true 时,调用wait()方法的线程不会被阻塞。_ self.event.set() print('Produce notify : event cleared by %s '% self.name) _# 将内部标识设置为 false 。之后调用wait()方法的线程将会被阻塞,_ _# 直到调用set()方法将内部标识再次设置为 true 。_ self.event.clear() if __name__ == '__main__': t1 = producer(items, event) t2 = consumer(items, event) t1.start() t2.start() t1.join() t2.join() ``` ![image](https://cdn.res.knowhub.vip/c/2503/28/efba7041.png?G1cAAMTydJz4c%2f%2biuo06fJsoEpoBizSCSgnr9Z6z9i3y%2fU7Q4jNan74%2f%2fKb16cJcE6FCMEMRAi%2fAgAplIGqyYiwlruE%3d) 上图是运行程序时候的运行结果,线程 `t1` 在 `list` 最后添加值,然后设置 `event` 来通知消费者。消费者通过 `wait()` 阻塞,直到收到信号的时候从 `list` 中取出元素消费。 ![image](https://cdn.res.knowhub.vip/c/2503/28/921a7390.png?G1cAAER17rxgXRmgfice0wSBBJsBizSCSgnr9fz%2f2pfI%2bzlBi%2fdoffr%2b8JvWpwvzqUQSghkJIfAADEorNagaYVUL4xoO) 补充一下 `wait()` 方法: ![image](https://cdn.res.knowhub.vip/c/2503/28/b49afe3e.png?G1YAAMTydJz4%2b0NgG3X4NlEkNAMSWQSVEtbrvXefJvL9RjD6Z%2fax7Hz4Sx%2fLhKkqEYRgQoDzLECEpqzqci0RoaD6PQ0%3d) # 3.基于屏障的线程同步 屏障用于应对固定数量的线程需要彼此相互等待的情况,与之前介绍 `互斥锁Lock/事件Event/定时器Timer` 等不同,**多线程Barrier会设置一个线程障碍数量parties,如果等待的线程数量没有达到障碍数量 parties,所有线程会处于阻塞状态,当等待的线程到达了这个数量就会唤醒所有的等待线程。** 以播放器为例子: ![image](https://cdn.res.knowhub.vip/c/2503/28/49af1ab8.png?G1cAAETn9LwUqGDGvtMdbIlTE20GLNIIKiWs13vO2jfR9wcYmp%2fR%2boz94TetzyCYC7gQGMYFKeBiVoaY1ARRgyuq5zUC) 首先一个线程做播放器初始化工作(加载本地文件或者获取播放地址),然后一个线程获取视频画面,一个线程获取视频声音,只有当初始化工作完毕,视频画面获取完毕,视频声音获取完毕,播放器才会开始播放,其中任意一个线程没有完成,播放器会处于阻塞状态直到三个任务都完成! ```csharp class threading.Barrier(parties, action=None, timeout=None) ``` 创建一个需要 `parties` 个线程的栅栏对象。如果提供了可调用的 `action` 参数,它会在所有线程被释放时在其中一个线程中自动调用。 `timeout` 是默认的超时时间,如果没有在 `wait()` 方法中指定超时时间的话。 使用方法包括: ![image](https://cdn.res.knowhub.vip/c/2503/28/240436f3.png?G1cAAMTydJz4cx7abdTh20SR0AxYpBFUSliv95y1b5HvN4Lqn9H6tP3hN61PE6YrEFEIJkS4wAoomItWF0LNYNICv4YB) 以下为示例代码,创建三个线程:初始化准备、音频准备、视频准备,当且仅当三个初始化完成,才能启动音乐播放。 ```csharp _# 导入线程模块_ import threading def plyer_display(): print('初始化通过完成,音视频同步完成,可以开始播放....') _# 设置3个障碍对象_ barrier = threading.Barrier(3, action=plyer_display, timeout=None) def player_init(statu): print(statu) try: _# 设置超时时间,如果2秒内,没有达到障碍线程数量,_ _# 会进入断开状态,引发BrokenBarrierError错误_ barrier.wait(2) except Exception as e: _# 断开状态,引发BrokenBarrierError错误_ print("等待超时了... ") else: print("xxxooooxxxxxooooxxxoooo") if __name__ == '__main__': statu_list = ["init ready", "video ready", "audio ready"] thread_list = list() _# 创建三个线程:初始化准备、音频准备、视频准备_ _# 当且仅当三个初始化完成,才能启动音乐播放_ for i in range(0, 3): t = threading.Thread(target=player_init, args=(statu_list[i],)) t.start() thread_list.append(t) for t in thread_list: t.join() ``` 以下为代码运行结果,注意:如果 `barrier.wait(timeout=None)` 等待超时,会进入断开状态,引发 `BrokenBarrierError` 错误,为了程序的健壮性,最好加上异常处理。 ![image](https://cdn.res.knowhub.vip/c/2503/28/727b32c3.png?G1YAAER17rxgXWEofice0wSBBJsBiSyCSgnr9fz%2f2pfI%2bznBHO%2fR%2bvT94S%2btTxdaVSIJQUNC8DyBjGRFj6BaSzEq4hoO)