一文速通Python并行计算:06 Python多线程编程-基于队列进行通信

笔记哥 / 04-07 / 28点赞 / 0评论 / 711阅读
# 一文速通 Python 并行计算:06 Python 多线程编程-基于队列进行通信 ![image](https://cdn.res.knowhub.vip/c/2504/07/8c2677bf.png?G1cAAMT0bJxoe1XBNvqh%2f4lHQjNgkUZQKWG93nv3aUTf72Ck%2bMw%2blp8Pv%2bljOcGqgJXAMFaEgAwV46rFguSEUlNmiXs6) # **摘要:** 队列是一种线性数据结构,支持先进先出(FIFO)操作,常用于解耦生产者和消费者。慢速生产-快速消费场景中,队列作为缓冲区平衡速度差异。LifoQueue 是后进先出(LIFO)的栈式队列,适用于撤销操作等场景。PriorityQueue 则按优先级排序,适合任务调度等需要优先处理的场景。这三种队列分别通过 Queue、LifoQueue 和 PriorityQueue 类实现,提供 put()、get()等方法,是并发编程中线程安全的重要工具。 # **正文** # 1.队列的基本概念和应用 队列(queue),是先进先出(FIFO, First-In-First-Out)的线性表,在具体应用中通常用链表或者数组来实现,队列只允许在后端(称为 rear)进行插入操作,在前端(称为 front)进行删除操作,队列的操作方式和堆栈类似,唯一的区别在于队列只允许新数据在后端进行添加。在 Python 中队列可以通过内置模块 `queue` 导入,具体导入方法:`from queue import Queue`,`queue` 模块提供了适合多线程编程的先进先出的数据结构,可以用来在生产者和消费者线程之间安全的传递消息或者数据。 ![image](https://cdn.res.knowhub.vip/c/2504/07/6c9d3e29.png?G1cAAMTW3DgpD0JG22gDdWfqnTUDFmkElRLW6%2fn%2ftS%2bi9wuBaL5H6zP2h9%2b0PoPEnAWFBGJQpIAKWC3F1RI7lHHy4XmNAA%3d%3d) `Queue` 常用的方法有以下四个: - **put()\*\*\*\*:** 往 `queue` 中放一个 `item`。 - \*\*get()\*\*\*\*: \*\*从 `queue` 删除一个 `item`,并返回删除的这个 `item`。 - **task\_done()\*\*\*\*:** 每次 `item` 被处理的时候需要调用这个方法。 - \*\*join()\*\*\*\*: \*\*所有 `item` 都被处理之前一直阻塞。 这里以生产者-消费者模型为例,生产者向队列中发送产品,消费者从队列中接收产品。代码如下: ```csharp from threading import Thread, Event from queue import Queue import time import random class producer(Thread): _# 首先,我们创建一个生产者类。_ _# 由于我们使用队列存放数字,所以不需要用来存放数字的list了。_ def __init__(self, queue): Thread.__init__(self) self.queue = queue def run(self) : _# producer 类生产整数,然后通过一个 for 循环将整数放到队列中_ for i in range(10): item = random.randint(0, 256) _# 生产者使用 Queue.put(item [,block[, timeout]]) 来往queue中插入数据。_ _# Queue是同步的,在插入数据之前内部有一个内置的锁机制。_ self.queue.put(item) _# 可能发生两种情况:_ _# (1)如果 block 为 True , timeout 为 None :_ _# 那么可能会阻塞掉,直到出现可用的位置。如果 timeout 是正整数,_ _# 那么阻塞直到这个时间,就会抛出一个异常。_ _# (2)如果 block 为 False:_ _# 如果队列有闲置那么会立即插入,否则就立即抛出异常( timeout 将会被忽略)。_ _# 本例中, put() 检查队列是否已满,然后调用 wait() 开始等待。_ print('Producer notify: item N° %d appended to queue by %s' % (item, self.name)) time.sleep(1) class consumer(Thread): def __init__(self, queue): Thread.__init__(self) self.queue = queue def run(self): while True: _# 消费者使用 Queue.get([block[, timeout]]) 从队列中取回数据,_ _# queue内部也会经过锁的处理。如果队列为空,消费者阻塞。_ item = self.queue.get() print('Consumer notify : %d popped from queue by %s' % (item, self.name)) _# 消费者从队列中取出整数然后用 task_done() 方法将其标为任务已处理。_ self.queue.task_done() if __name__ == '__main__': queue = Queue() t1 = producer(queue) t2 = consumer(queue) t3 = consumer(queue) t4 = consumer(queue) t1.start() t2.start() t3.start() t4.start() t1.join() t2.join() t3.join() t4.join() ``` 代码运行如下: ![image](https://cdn.res.knowhub.vip/c/2504/07/2fd07d92.png?G1YAAMTsdJzIJxKq26hD2jvFHc2ARBZBpYT1es9Z%2byb6fheIxme0Pn1%2f%2bEvr00mssiCRQAyK4FEAK6qJr8BiNefEJa7h) 生产者和消费者之间的操作可以用下图来描述: ![image](https://cdn.res.knowhub.vip/c/2504/07/54e976d6.png?G1YAAER17rxgpQ4Hfice0wSBBJsBiSyCSgnr9fz%2f2pfI%2bzlBjfdoffr%2b8JfWpwtzTcQhBDMUwcOAbGpWGM5UlJVkXMMB) Queue 的其他方法包括: ![image](https://cdn.res.knowhub.vip/c/2504/07/da0a5919.png?G1cAAMTydJz4c%2f6LbqMO2kSR0AxYpBFUSliv95y1b5HvbwTVP6P22faH39Q%2bm9BKIKIQNChcQAIsWbxYXKAGaM45%2bTUa) > > > **join()方法与 task\_done()方法** > > `Queue.task_done()` 在完成一项工作之后,`Queue.task_done()` 函数向任务已经完成的队列发送一个信号。 > > `Queue.join()` 实际上意味着等到队列为空,再执行别的操作。 > > 如果线程里每从队列里取一次,但没有执行 `task_done()`,则 `join` 无法判断队列到底有没有结束,在最后执行个 `join()` 是等不到结果的,会一直挂起。可以理解为,每 `task_done` 一次 就从队列里删掉一个元素,这样在最后 `join` 的时候根据队列长度是否为零来判断队列是否结束,从而执行主线程。 > # 2.慢速生产-快速消费 在快速生产-慢速消费的场景中,我们可以直接用 `task_done()` 与 `join()` 配合,来让 `empty()` 判断出队列是否已经结束。 当然,`queue` 我们可以正确判断是否已经清空,但是对于线程里的 `get` 方法,如果没有东西告诉它队列空了,`get` 会继续阻塞,那么我们就需要在 `get` 程序中加一个判断,如果 `empty()` 成立,`break` 退出循环,否则 get()还是会一直阻塞。 但是**如果生产者速度与消费者速度相当,或者生产速度小于消费速度**,则靠 `task_done()` 来实现队列减一则不靠谱,队列会时常处于供不应求的状态,常为 `empty`,所以用 `empty` 来判断则不靠谱。 那么这种情况会导致 `join` 可以判断出队列结束了,但是线程里不能依靠 `empty()` 来判断线程是否可以结束。 我**们可以在消费队列的每个线程最后塞入一个特定的“标记”,在消费的时候判断,如果 get 到了这么一个“标记”,则可以判定队列结束了,因为生产队列都结束了,也不会再新增了。** # 3.先进后出队列 LifoQueue 与上文的 Queue 相反,最后存入的数据最先取出,最先存入的数据最后取出,先进后出队列类似于栈。 示例代码如下: ```csharp import queue import threading _# 可以设置队列的长度,当队列满的时候自动进入阻塞状态_ q = queue.LifoQueue(5) def put(): for i in range(3): q.put(i) print("数据 %d 被存入到队列中" % i) q.join() def get(): for i in range(3): value = q.get() print("数据 %d 从队列中取出" % value) q.task_done() if __name__ == '__main__': t1 = threading.Thread(target=put, args=()) t1.start() t2 = threading.Thread(target=get, args=()) t2.start() ``` 代码输出如下: ![image](https://cdn.res.knowhub.vip/c/2504/07/9e702f61.png?G1cAAMTydJz4c%2fV5uo06fJsoEpoBizSCSgnr9Z6z9i3y%2fU4wx2e0Pn1%2f%2bE3r04VaE3EJQUVGCDBAC2iFIZmqWVVLcQ0H) 从运行结果,我们发现,我们写入数据的顺序是正序的,但是我们取出数据的顺序却是逆序的,这就说明,最新存进去的数据是最后出来的,也就是先进后出队列。 # 4.优先级队列 PriorityQueue `PriorityQueue`(优先级队列),即存入数据时候加入一个优先级,**取数据的时候优先级最高的取出**,在将数据存入到优先队列 `PriorityQueue` 时,**设置的值越小,优先级越高**(注意:使用优先级存数据取数据,队列中的数据必须是同一类型,举个栗子:班级成绩排名/身高排名……)。 示例代码: ```csharp import queue q = queue.PriorityQueue() q.put([1, 'ace']) q.put([40, 333]) q.put([3, 'afd']) q.put([5, '4asdg']) _# 1是级别最高的,_ while not q.empty(): _# 不为空时候执行_ print(q.get()) q = queue.PriorityQueue() q.put('我') q.put('你') q.put('他') q.put('她') q.put('ta') while not q.empty(): print(q.get()) ``` 输出结果: ![image](https://cdn.res.knowhub.vip/c/2504/07/2151b920.png?G1YAAETn9LyUyiDCvtMdbIlTE20GJLIIKiWs13vO2jfR9weDJT%2bj9Rn7w19an0GsXhgXMVghSB4VUINJsVQF7OZqeY0A) 按优先级:不管是数字、字母、列表、元组等(字典、集合没测),使用优先级存数据取数据,队列中的数据必须是同一类型,都是**按照实际数据的 ascii 码表的顺序进行优先级匹配,汉字是按照 unicode 表。** ![image](https://cdn.res.knowhub.vip/c/2504/07/9874d0c9.png?G1YAAMTsdJzIJ0G026hD2jvFHc2ARBZBpYT1es9Z%2byb6fhdIis9offr%2b8JfWp5PkiwVKAslICB4G5MpcqgZTcCkGi2s4)