pythonqueue函数_Python 源码分析:queue 队列模块 !_weixin_39550940的博客-程序员秘密

技术标签: pythonqueue函数  

起步

queue 模块提供适用于多线程编程的先进先出(FIFO)数据结构。因为它是线程安全的,所以多个线程很轻松地使用同一个实例。

源码分析

先从初始化的函数来看:

class Queue:

def __init__(self, maxsize=0):

# 设置队列的最大容量

self.maxsize = maxsize

self._init(maxsize)

# 线程锁,互斥变量

self.mutex = threading.Lock()

# 由锁衍生出三个条件变量

self.not_empty = threading.Condition(self.mutex)

self.not_full = threading.Condition(self.mutex)

self.all_tasks_done = threading.Condition(self.mutex)

self.unfinished_tasks = 0

def _init(self, maxsize):

# 初始化底层数据结构

self.queue = deque()

从这初始化函数能得到哪些信息呢?首先,队列是可以设置其容量大小的,并且具体的底层存放元素的它使用了 collections.deque() 双端列表的数据结构,这使得能很方便的做先进先出操作。这里还特地抽象为_init 函数是为了方便其子类进行覆盖,允许子类使用其他结构来存放元素(比如优先队列使用了 list)。

然后就是线程锁 self.mutex ,对于底层数据结构 self.queue 的操作都要先获得这把锁;再往下是三个条件变量,这三个 Condition 都以 self.mutex 作为参数,也就是说它们共用一把锁;从这可以知道诸如with self.mutex与 with self.not_empty 等都是互斥的。

基于这些锁而做的一些简单的操作:

class Queue:

...

def qsize(self):

# 返回队列中的元素数

with self.mutex:

return self._qsize()

def empty(self):

# 队列是否为空

with self.mutex:

return not self._qsize()

def full(self):

# 队列是否已满

with self.mutex:

return 0 < self.maxsize <= self._qsize()

def _qsize(self):

return len(self.queue)

这个代码片段挺好理解的,无需分析。

作为队列,主要得完成入队与出队的操作,首先是入队:

class Queue:

...

def put(self, item, block=True, timeout=None):

with self.not_full: # 获取条件变量not_full

if self.maxsize > 0:

if not block:

if self._qsize() >= self.maxsize:

raise Full # 如果 block 是 False,并且队列已满,那么抛出 Full 异常

elif timeout is None:

while self._qsize() >= self.maxsize:

self.not_full.wait() # 阻塞直到由剩余空间

elif timeout < 0: # 不合格的参数值,抛出ValueError

raise ValueError("'timeout' must be a non-negative number")

else:

endtime = time() + timeout # 计算等待的结束时间

while self._qsize() >= self.maxsize:

remaining = endtime - time()

if remaining <= 0.0:

raise Full # 等待期间一直没空间,抛出 Full 异常

self.not_full.wait(remaining)

self._put(item) # 往底层数据结构中加入一个元素

self.unfinished_tasks += 1

self.not_empty.notify()

def _put(self, item):

self.queue.append(item)

尽管只有二十几行的代码,但这里的逻辑还是比较复杂的。它要处理超时与队列剩余空间不足的情况,具体几种情况如下:

1、如果 block 是 False,忽略timeout参数若此时队列已满,则抛出 Full 异常;

若此时队列未满,则立即把元素保存到底层数据结构中;

2、如果 block 是 True若 timeout 是 None 时,那么put操作可能会阻塞,直到队列中有空闲的空间(默认);

若 timeout 是非负数,则会阻塞相应时间直到队列中有剩余空间,在这个期间,如果队列中一直没有空间,抛出 Full 异常;

处理好参数逻辑后,,将元素保存到底层数据结构中,并递增unfinished_tasks,同时通知 not_empty ,唤醒在其中等待数据的线程。

出队操作:

class Queue:

...

def get(self, block=True, timeout=None):

with self.not_empty:

if not block:

if not self._qsize():

raise Empty

elif timeout is None:

while not self._qsize():

self.not_empty.wait()

elif timeout < 0:

raise ValueError("'timeout' must be a non-negative number")

else:

endtime = time() + timeout

while not self._qsize():

remaining = endtime - time()

if remaining <= 0.0:

raise Empty

self.not_empty.wait(remaining)

item = self._get()

self.not_full.notify()

return item

def _get(self):

return self.queue.popleft()

get() 操作是 put() 相反的操作,代码块也及其相似,get() 是从队列中移除最先插入的元素并将其返回。

1、如果 block 是 False,忽略timeout参数若此时队列没有元素,则抛出 Empty 异常;

若此时队列由元素,则立即把元素保存到底层数据结构中;

2、如果 block 是 True若 timeout 是 None 时,那么get操作可能会阻塞,直到队列中有元素(默认);

若 timeout 是非负数,则会阻塞相应时间直到队列中有元素,在这个期间,如果队列中一直没有元素,则抛出 Empty 异常;

最后,通过 self.queue.popleft() 将最早放入队列的元素移除,并通知 not_full ,唤醒在其中等待数据的线程。

这里有个值得注意的地方,在 put() 操作中递增了 self.unfinished_tasks ,而 get() 中却没有递减,这是为什么?

这其实是为了留给用户一个消费元素的时间,get() 仅仅是获取元素,并不代表消费者线程处理的该元素,用户需要调用 task_done() 来通知队列该任务处理完成了:

class Queue:

...

def task_done(self):

with self.all_tasks_done:

unfinished = self.unfinished_tasks - 1

if unfinished <= 0:

if unfinished < 0: # 也就是成功调用put()的次数小于调用task_done()的次数时,会抛出异常

raise ValueError('task_done() called too many times')

self.all_tasks_done.notify_all() # 当unfinished为0时,会通知all_tasks_done

self.unfinished_tasks = unfinished

def join(self):

with self.all_tasks_done:

while self.unfinished_tasks: # 如果有未完成的任务,将调用wait()方法等待

self.all_tasks_done.wait()

由于 task_done()使用方调用的,当 task_done() 次数大于 put() 次数时会抛出异常。

task_done() 操作的作用是唤醒正在阻塞的 join() 操作。join() 方法会一直阻塞,直到队列中所有的元素都被取出,并被处理了(和线程的join方法类似)。也就是说 join()方法必须配合task_done() 来使用才行。

LIFO 后进先出队列

LifoQueue使用后进先出顺序,与栈结构相似:

class LifoQueue(Queue):

'''Variant of Queue that retrieves most recently added entries first.'''

def _init(self, maxsize):

self.queue = []

def _qsize(self):

return len(self.queue)

def _put(self, item):

self.queue.append(item)

def _get(self):

return self.queue.pop()

这就是 LifoQueue 全部代码了,这正是 Queue 设计很棒的一个原因,它将底层的数据操作抽象成四个操作函数,本身来处理线程安全的问题,使得其子类只需关注底层的操作。

LifoQueue 底层数据结构改用 list 来存放,通过 self.queue.pop() 就能将 list 中最后一个元素移除,无需重置索引。

PriorityQueue 优先队列

from heapq import heappush, heappop

class PriorityQueue(Queue):

'''Variant of Queue that retrieves open entries in priority order (lowest first).

Entries are typically tuples of the form: (priority number, data).

'''

def _init(self, maxsize):

self.queue = []

def _qsize(self):

return len(self.queue)

def _put(self, item):

heappush(self.queue, item)

def _get(self):

return heappop(self.queue)

优先队列使用了 heapq 模块的结构,也就是最小堆的结构。优先队列更为常用,队列中项目的处理顺序需要基于这些项目的特征,一个简单的例子:

import queue

class A:

def __init__(self, priority, value):

self.priority = priority

self.value = value

def __lt__(self, other):

return self.priority < other.priority

q = queue.PriorityQueue()

q.put(A(1, 'a'))

q.put(A(0, 'b'))

q.put(A(1, 'c'))

print(q.get().value) # 'b'

使用优先队列的时候,需要定义 __lt__ 魔术方法,来定义它们之间如何比较大小。若元素的 priority 相同,依然使用先进先出的顺序。

参考

https://pymotw.com/3/queue/index.html

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/weixin_39550940/article/details/110901094

智能推荐

mkdir() 与 mkdirs() 的区别_HD243608836的博客-程序员秘密

官方解释:boolean mkdir() 创建此抽象路径名指定的目录。boolean mkdirs() 创建此抽象路径名指定的目录,包括所有必需但不存在的父目录。我通过Demo通俗的解释一下:String path ="E:\\data\\a\\b\\c";Boolean result = new File(path).mkdir();System.out.println(...

python--__str__ 和__repr__的用法_ZongXin.Zhou的博客-程序员秘密

如:__init__和__str__还有 __repr__凡是以双下划线开头的。在满足某个条件的时候会自动调用,这个满足的条件我们可以成为调用时机具体如下列示例:class Dog(object): def __init__(self, name, age): self.name = name self.age = age # __str__的目标是可读性,或者说,__str__的结果是让人看的 def __str__(self): # 没有

QT GUI开发(一):保姆级VS2015配置QT开发环境_mozun2020的博客-程序员秘密_vs2015配置qt环境

QT GUI开发(一):保姆级VS2015配置QT开发环境前言一. QT环境搭建1.1 QT安装1.2 VS中安装工具库二. QT简单工程示例三. 小结前言做软件开发,特别是用户图形界面交互方向,QT的应用越来越广泛了,因其可移植性,以及强大的配置库,大有替代VS的MFC的趋势,笔者2011年上大学的时候,一般开发GUI界面工程都还是基于MFC的框架,到了2015年开始研究生学习的时候,考虑到移植到嵌入式平台的应用需求,已经开始应用QT作为自己的毕设GUI界面开发平台了。正好现在自己的工作也主要与Q

intellij idea 插件 ideaVim 用法_weixin_33883178的博客-程序员秘密

intellij idea 插件 ideaVim - Genji_ - 博客园http://www.cnblogs.com/nova-/p/3535636.html IdeaVim插件使用技巧 - - ITeye技术网站http://kidneyball.iteye.com/blog/1828427 Ctrl+Alt+V  --打开或关闭Idea Vim 当打开idea vim后,当前编...

【操作系统】CPU寄存器详解_公子无缘的博客-程序员秘密_cpu 寄存器

寄存器是 CPU 内部用来存放数据的一些小型存储区域,用来暂时存放参与运算的数据和运算结果以及一些 CPU 运行需要的信息。本文将归纳下面几中寄存器:目录一 通用寄存器二 标志寄存器三指令寄存器四 段寄存器五 控制寄存器六 调试寄存器七 描述符寄存器八 任务寄存器九 MSR寄存器一 通用寄存器 最常用的,也是最基础的有8个通用寄存器(注意一般看到的EAX、ECX也是指的这类寄存器再32位CPU上的拓展,另...

随便推点

单元测试小记_markix的博客-程序员秘密

工具Junit:测试框架Mockito:模拟框架(模拟数据、模拟方法…)https://site.mockito.orgPowerMock:更强大的模拟框架(支持模拟静态方法、私有方法…)https://github.com/powermock/powermockJaCoCo:代码覆盖率统计工具https://www.jacoco.org/jacoco/index.html使用使用Mockito比如要测试Service类,则需要将service类中的dao给mock掉。@Inje

Ribbon 学习(二):Spring Cloud Ribbon 加载配置原理_布碗的博客-程序员秘密

说明在上一篇博文《Ribbon 学习(一):脱离 Eureka 使用 Ribbon》一文中,我简单介绍了在脱离 Eureka 时 Ribbon 的简单使用方式。在本篇博文中,我将继续通过源码来介绍 Spring Cloud Ribbon 的加载配置原理,了解 Ribbon Client 如何创建,以及 RequestTemplate 如何具有负载均衡的功能特性。在正文开始前,我们先回忆下在上篇博文中是如何使用 Ribbon 的。首先使用 @LoadBalanced 注解标注创建了 ResetTempla

VS2019打开项目,出现“无法找到 .NET Core SDK。请检查确保已安装此项且 global.json 中指定的版本(如有)与所安装的版本相匹配“的错误_搬运工甲的博客-程序员秘密

今天从GitHub上拉下来一个工程,正想要学习的时候,打开工程结果出现了 “无法找到 .NET Core SDK。请检查确保已安装此项且 global.json 中指定的版本(如有)与所安装的版本相匹配” 的报错,在经过一系列百度之后,得到最简单解决方案:使用cmd命令 dotnet --info 查看自己使用的SDK版本,然后直接找到项目中的 global.json 文件,右键打开,直接修改版...

Realm数据库使用教程(五):删除数据_晓果博客的博客-程序员秘密_realm删除

Realm数据库使用教程(四):更新数据删除数据同步删除(一):先查找到数据:deleteFromRealm(int index)删除指定数据final RealmResults<Student> students = mRealm.where(Student.class).findAll(); mRealm.executeTransaction(new Real

C#字典Dictionary在unity中使用案例_苍狼王unity学院的博客-程序员秘密

C#字典在unity中使用案例1、前言:讲起C#Dictionary,许多人闻之色变,不了解,不清楚,即使知道,了解,也不一定会用,鉴于此,本人特地总结了一个使用字典的案例。2、什么是字典。必须包含名空间System.Collection.GenericDictionary里面的每一个元素都是一个键值对(由二个元素组成:键和值)键必须是唯一的,而值不需要唯一的键和值都可以是任何类型(...

vite项目在jenkins自动打包报错:failed to load config from ../vite.config.js You installed esbuild on_优秀前端人的博客-程序员秘密

vite项目在jenkins自动打包报错找不到esbuild-linux-64在window环境开发用的找不到esbuild-windows-64,在linux环境构建需要使用esbuild-linux-64,找不到esbuild-linux-64就会报错实际报错:error during build:11:21:11 Error: 11:21:11 You installed esbuild on another platform than the one you're currently us