【笔记】Effective Python《编写高质量 Python 代码的 90 个有效方法》——第 52 ~ 74 条(并发、稳定性)读书笔记

(书基于 Python 3.7+ 语法规范)

知识点概括:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
subprocess模块
threading模块
Queue类
fan-in/fan-out模式
ThreadPoolExecutor类
协程(coroutine)
asyncio库
multiprocessing模块
contextlib和with语句
datetime模块
pickle模块
copyreg模块
Decimal类
profile模块
timeit模块
bisect模块
heapq模块
memoryview类型/bytearray类型

7.并发与并行

并发(concurrency)指计算机似乎能在同一时刻做许多件不同的事情。

并行(parallelism)与并发的区别在于,它强调计算机确实能够在同一时刻做许多件不同的事情。

并行与并发之间的区别,关键在于能不能提速(speedup)。如果程序把总任务量分给两条独立的执行路径去同时处理,而且这样做确实能让总时间下降到原来的一半,那么这就是并行,此时的总速度是原来的两倍。反过来说,假如无法实现加速,那即便程序里有一千条独立的执行路径,也只能叫作并发,因为这些路径虽然看起来是在同时推进,但实际上却没有产生相应的提速效果。

Python让我们很容易就能写出各种风格的并发程序。在并发量较小的场合可以使用线程(thread),如果要运行大量的并发函数,那么可以使用协程(coroutine)。并行任务,可以通过系统调用、子进程与C语言扩展(C extension)来实现,但要写出真正能够并行的Python代码,其实是很困难的。

第52条、用subprocess管理子进程

Python有相当成熟的函数库可以运行并管理子进程,这让我们能够通过Python语言把其他一些工具(例如命令行工具)很好地拼接起来。Shell脚本经常容易越写越复杂,所以我们有时干脆就考虑改用Python来实现,这样更容易理解,也更容易维护。

由Python所启动的子进程可以平行地运行,这让我们能够充分利用计算机的每一个CPU核心,来尽量提升程序的处理效率。

Python里面有许多方式都可以运行子进程(例如os.popen函数以及os.exec*系列的函数),其中最好的办法是通过内置的subprocess模块来管理。

1
2
3
4
5
6
7
8
9
10
import subprocess

result = subprocess.run(
['echo', 'Hello form the child'],
capture_output=True,
encoding='utf-8'
)

result.check_returncode()
print(result.stdout)

通过run函数启动一条进程,然后确认该进程已经正常终止,最后打印它的输出值。

(这一条里面的范例代码需要用到操作系统之中的echo、sleep与openssl命令,所以系统里面应该提前准备好这些命令。)

子进程可以独立于父进程而运行,这里的父进程指Python解释器所在的那条进程。假如刚才那条子进程不是通过run函数启动,而是由Popen类启动的,那么我们就可以在它启动之后,让Python程序去做别的任务,每做一段时间就来查询一次子进程的状态以决定要不要继续执行任务。

1
2
3
4
5
proc = subprocess.Popen(['sleep', '1'])
while proc.poll() is None:
print('Working...')
# ...
print('Exit status', proc.poll())

把子进程从父进程中剥离,可以让程序平行地运行多条子进程。例如,我们可以像下面这样,先把需要运行的这些子进程用Popen启动起来。

1
2
3
4
5
6
7
import time

start = time.time()
sleep_procs = []
for _ in range(10):
proc = subprocess.Popen(['sleep', '1'])
sleep_procs.append(proc)

然后,在主进程里调用每条子进程的communicate方法,等待这条子进程把它的I/O工作处理完毕。

1
2
3
4
5
6
for proc in sleep_procs:
proc.communicate()

end = time.time()
delta = end - start
print(F'Finished in {delta: .3} seconds')

从统计结果可以看出,这10条子进程确实表现出了平行的效果。

我们还可以在Python程序里面把数据通过管道发送给子进程所运行的外部命令,然后将那条命令的输出结果获取到Python程序之中。而且,在执行外部命令的这个环节中,可以平行地运行多条命令。例如,要用oepnssl这样的命令行工具来加密数据。首先以适当的命令行参数构建一批子进程,并配置好相应的I/O管道,这在Python里很容易就能做到。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import os

def run_encrypt(data):
env = os.environ.copy()
env['password'] = 'zfsgjjj'
proc = subprocess.Popen(
['openssl', 'enc', '-des3', '-pass', 'env:password'],
env=env,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE
)
proc.stdin.write(data)
proc.stdin.flush()
return proc

调用:

1
2
3
4
5
procs = []
for _ in range(3):
data = os.urandom(10)
proc = run_encrypt(data)
procs.append(proc)

这些子进程能够平行地运行,并各自处理它所接到的那份数据。现在逐个等待每一条子进程完工,然后获取并打印这条子进程的加密结果。

1
2
3
for proc in procs:
out, _ = proc.communicate()
print(out[-10:])

这些平行运行的子进程还可以分别与另一套平行的子进程对接,形成许多条平行的管道(pipe)。这种管道与UNIX管道类似,能够把一条子进程的输出端同另一条子进程的输入端连接起来。

下面,我们写这样一个函数,让它开启一条子进程来运行openssl命令,这条命令会根据输入端所发来的数据在输出端生成Whirlpool哈希。

1
2
3
4
5
6
def run_hash(input_stdin):
return subprocess.Popen(
['openssl', 'dgst', '-whirlpool', '-binary'],
stdin=input_stdin,
stdout=subprocess.PIPE
)

调用communicate方法时可以指定timeout参数,让我们有机会把陷入死锁或已经卡住的子进程关掉。

第53条、可以用线程执行阻塞式I/O,但不要用它做并行计算

Python语言的标准实现叫作CPython,它分两步来运行Python程序。首先解析源代码文本,并将其编译成字节码(bytecode)。字节码是一种底层代码,可以把程序表示成8位的指令(从Python 3.6开始,这种底层代码实际上已经变成16位了,所以应该叫作wordcode才对,但基本原理依然相同)。然后,CPython采用基于栈的解释器来运行字节码。这种字节码解释器在执行Python程序的过程中,必须确保相关的状态不受干扰,所以CPython会用一种叫作全局解释器锁(global interpreter lock,GIL)的机制来保证这一点。

GIL实际上就是一种互斥锁(mutual-exclusion lock,mutex),用来防止CPython的状态在抢占式的多线程环境(preemptive multithreading)之中受到干扰,因为在这种环境下,一条线程有可能突然打断另一条线程抢占程序的控制权。如果这种抢占行为来得不是时候,那么解释器的状态(例如为垃圾回收工作而设立的引用计数等)就会遭到破坏。所以,CPython要通过GIL阻止这样的动作,以确保它自身以及它的那些C扩展模块能够正确地执行每一条字节码指令。

但是,GIL会产生一个很不好的影响。在C++与Java这样的语言里面,如果程序之中有多个线程能够分头执行任务,那么就可以把CPU的各个核心充分地利用起来。尽管Python也支持多线程,但这些线程受GIL约束,所以每次或许只能有一条线程向前推进,而无法实现多头并进。所以,想通过多线程做并行计算或是给程序提速的开发者,恐怕要失望了。

多线程的程序在标准的CPython解释器之中会受GIL牵制(例如CPython要通过GIL防止这些线程争抢全局锁,而且要花一些时间来协调)。

若要CPython把多个核心充分利用起来,还是有一些办法的,但那些办法都不采用标准的Thread类,而且实现起来也需要大量的精力。既然有这么多限制,那Python还支持多线程干什么?这其实有两个原因。

首先,这种机制让我们很容易就能实现出一种效果,也就是令人感觉程序似乎能在同一时间做许多件事。这样的效果采用手工方式很难编写而通过线程来实现,则可以让Python自动替我们把这些问题处理好,让多项任务能够并发地执行。由于GIL机制,虽然每次还是只能有一个线程向前执行,但CPython会确保这些Python线程之间能够公平地轮换执行。

其次,我们可以通过Python的多线程机制处理阻塞式的I/O任务,因为线程在执行某些系统调用的过程中会发生阻塞,假如只支持一条线程,那么整个程序就会卡在这里不动。Python程序需要通过系统调用与外部环境交互,其中有一些调用属于阻塞式的I/O操作,例如读取文件、写入文件、联网以及与显示器等设备交互。多线程机制可以让程序中的其他线程继续执行各自的工作,只有发起调用请求的那条线程才需要卡在那里等待操作系统给出结果。

GIL只不过是让Python内部的代码无法平行推进而已,至于系统调用,则不会受到影响,因为Python线程在即将执行系统调用时,会释放GIL,待完成调用之后,才会重新获取它。

除了线程,还有很多办法也能处理阻塞式的I/O(例如采用内置的asyncio模块等)。那些办法都很好,但你可能得花时间去重构代码适应它们所要求的执行模式。与那些办法相比,用多线程处理阻塞式I/O是最简单的,而且只需要稍微调整代码就行。

第54条、利用Lock防止多个线程争用同一份数据

了解到全局解释器锁(GIL)的效果之后,许多Python新手可能觉得没必要继续在代码里使用互斥锁(mutual-exclusion lock,mutex)了。既然GIL让Python线程没办法平行地运行在多个CPU核心上,那是不是就意味着它同时还会自动保护程序里面的数据结构,让我们不需要再加锁了?

其实并非如此。GIL起不到这样的保护作用。虽说同一时刻只能有一条Python线程在运行,但这条线程所操纵的数据结构还是有可能遭到破坏,因为它在执行完当前这条字节码指令之后,可能会被Python系统切换走,等它稍后切换回来继续执行下一条字节码指令时,当前的数据或许已经与实际值脱节了,因为中途切换进来的其他线程可能更新过这个值。所以,多个线程同时访问同一个对象是很危险的。每条线程在操作这份数据时,都有可能遭到其他线程打扰,因此数据之中的固定关系或许已经被别的线程破坏了,这会令程序陷入混乱状态。

例如,要编写一个程序,让它平行地采集数据。如果要采集传感器网络中的每个传感器所给出的亮度,那么就需要用到这种程序。我们首先需要定义下面这样一个新类,用来记录采集到的样本总数。

1
2
3
4
5
6
class Counter:
def __init__(self):
self.count = 0

def increment(self, offset):
self.count += offset

然后,假设获取传感器读数的操作是一种阻塞式的I/O操作,这样的话,我们就需要针对每个传感器都开启一条工作线程专门读取它所负责的这个传感器。每采集到一份样本,线程就会给表示样本总数的那个量加1,直到采集完应采集样本为止。

1
2
3
4
def worker(sensor_index, how_many, counter):
for _ in range(how_many):
# ...
counter.increment(1)

现在,给每个传感器建立各自的工作线程,让这些线程平行地采样,最后等待所有线程完成各自采样工作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
from threading import Thread

how_many = 10 ** 5
counter = Counter()

threads = []
for i in range(5):
thread = Thread(target=worker,args=(i, how_many, counter))
threads.append(thread)
thread.start()

for thread in threads:
thread.join()

expected = how_many * 5
found = counter.count
print(f'Counter should be {expected}, got {found}')

这个程序似乎相当简单,所以运行结果应该是500000才对。但实际上却差得很远,这是为什么?其实,Python解释器需要保证这些线程可以公平地获得执行机会,或者说,保证每条线程所分配到的执行时间大致相等。为了实现这种效果,它会及时暂停某条线程,并且把另一条线程切换过来执行。然而问题是,我们并不清楚它具体会在什么时候暂停线程,万一这条线程正在执行的是一项本来不应该中断的原子操作(atomic operation),那会如何呢?上面的例子遇到的正是这种情况。Counter对象的increment方法看上去很简单,工作线程在调用这个方法时,相当于是在执行下面这样一条语句:

1
counter.count += 1

然而,在对象的属性上面执行+=操作,实际上需要分成三个小的步骤。也就是说,Python系统会这样看待这次操作:

1
2
3
value = getattr(counter, 'count')
result = value + 1
setattr(counter, 'count', result)

这三个步骤本来应该一次执行完才对,但是Python系统有可能在任意两步之间,把当前这条线程切换走,这就导致这条线程在切换回来后,看到的是个已经过时的value值,它把这个过时的值通过setattr赋给Counter对象的count属性,从而使统计出来的样本总数偏小。

线程A在执行了第一步之后,还没来得及执行第二步,就被线程B打断了。等到线程B把它的三个步骤执行完毕后,线程A才重新获得执行机会。这时,它并不知道count已经被线程B更新过了,它仍然以为自己在第一步里读取到的那个value_a是正确的,于是线程A就给value_a加1并将结果(也就是result_a)赋给count属性。这实际上把线程B刚刚执行的那一次递增操作覆盖掉了。上面的传感器采样总数之所以出错,也正是这个原因所致。

除了这个例子,其他形式的数据结构也会遇到类似问题。为了避免数据争用,Python在内置的threading模块里提供了一套健壮的工具。其中最简单也最有用的是一个叫作Lock的类,它相当于互斥锁(mutex)。

通过这样的锁,我们可以确保多条线程有秩序地访问Counter类的count属性,使得该属性不会遭到破坏,因为线程必须先获取到这把锁,然后才能操纵count,而每次最多只能有一条线程获得该锁。

1
2
3
4
5
6
7
8
9
10
from threading import Lock

class LockingCounter:
def __init__(self):
self.lock = Lock()
self.count = 0

def increment(self, offset):
with self.lock:
self.count += offset

现在,就可以确保这些工作线程能够正确地递增count属性了。只不过这次的Counter对象要改用刚才写的LockingCounter类来制作。

1
2
3
4
5
6
7
8
9
10
11
12
13
counter = LockingCounter()

for i in range(5):
thread Thread(target=worker, args=(i, how_many, counter))
threads.append(thread)
thread.start()

for thread in threads:
thread.join()

expected = how_many * 5
found = counter.count
print(f'Counter should be {expected}, got {found}')

这样的结果才正是我们想要看到的。这说明Lock确实能够解决数据争用问题。

第55条、用Queue来协调各线程之间的工作进度

Python程序如果要同时执行多项任务,而这些任务又分别针对同一种产品的不同环节,那么就有可能得在它们之间进行协调。比较有用的一种协调方式是把函数拼接成管道。

这样的管道与生产线比较像。它可以按先后顺序划分成几个阶段,每个阶段都由相应的函数负责。程序会把未经加工的原料放在生产线(也就是管道)的起点,而那些函数,则分别关注着自己所负责的这一段,只要有产品来到当前环节,它就对这件产品做出相应的加工处理。如果所有函数都不会再收到有待加工的产品,那么整条生产线就可以关停。这套方案,很适合应对与阻塞式I/O或子进程有关的需求,因为我们很容易就能在Python程序里,平行地开启多个线程来分别负责生产线中的某个环节。

例如,要构建这样一套系统,让它持续从数码相机里获取照片,然后调整照片尺寸,最后把调整好的照片添加到网络相册之中。

假如这三个环节所对应的download、resize与upload函数,现在都已经写好了,那么应该如何拼接成一条管道呢?

1
2
3
4
5
6
7
8
def download(item):
#...

def resize(item):
#...

def upload(item):
#...

首先,必须想办法表示每个环节所要加工的产品,并让加工好的产品能够为下一个环节所获取。这可以用线程安全的生产-消费队列(producer-consumer queue,也叫生产者-消费队列)来实现。

1
2
3
4
5
6
7
from collections import deque
from threading import Lock

class MyQueue:
def __init__(self):
self.items = deque()
self.lock = Lock()

首先定义put方法,让生产者(也就是数码相机)可以通过这个方法把新图像添加到deque的尾部:

1
2
3
def put(self, item):
with self.lock:
self.items.append(item)

然后定义下面这个方法。第一阶段的消费者,也就是需要下载照片的那个函数,可以通过这个方法从deque的前端(即左侧)获取元素。

1
2
3
def get(self):
with self.lock:
return self.items.popleft()

我们把管道的每个阶段都表示成一条Python线程,它会从刚才那样的队列中取出有待处理的产品,并交给对应的函数去处理,接着再把处理结果放到下一个队列之中。另外,我们再添加两个字段,分别记录这条线程向上游队列查询产品的次数以及完成加工的次数。

1
2
3
4
5
6
7
8
9
10
11
from threading import Thread
import time

class Worker(Thread):
def __init__(self, func, in_queue, out_queue):
super().__init__()
self.func = func
self.in_queue = in_queue
self.out_queue = out_queue
self.polled_count = 0
self.work_done = 0

下面我们试着通过捕捉IndexError来处理这种上游发生延迟的情况。

1
2
3
4
5
6
7
8
9
10
11
def run(self):
while True:
self.polled_count += 1
try:
item = self.in_queue.get()
except IndexError:
time.sleep(0.01)
else:
result = self.func(item)
self.out_queue.put(result)
self.work_done += 1

现在,创建四个队列,并在它们之间安排三条工作线程,让每条线程都从上游队列里面获取元素,并把加工过的元素放到下游队列之中。

1
2
3
4
5
6
7
8
9
download_queue = MyQueue()
resize_queue = MyQueue()
upload_queue = MyQueue()
done_queue = MyQueue()
threads = [
Worker(download, download_queue, resize_queue),
Worker(resize, resize_queue, upload_queue),
Worker(upload, upload_queue, done_queue)
]

这种实现方式的问题远不止这一个,还有另外三个问题也必须重视。第一,为了判断全部产品是否加工完毕,必须像Worker线程里的run方法那样,反复查询最后那个队列,以确认里面的元素个数是否已经变得与刚开始的原料总数相同。第二,目前这种方案会使run方法陷入无限循环,我们没办法明确通知线程何时应该退出。第三,如果下游环节的处理速度过慢,那么程序随时都有可能崩溃,这是最严重的问题。例如,如果第一个环节处理得很快,而第二个环节处理得比较慢,那么连接这两个环节的那个队列就会迅速膨胀,因为它里面堆积了大量的产品等着第二个环节来加工,可是第二个环节又跟不上节奏。时间久了,数据会越积越多,导致程序因为耗尽内存而崩溃。

总之,这种需求不适合用管道来实现,因为很难构建出良好的生产-消费队列。

改用Queue来实现:内置的queue模块里有个Queue类

改用Queue之后,就不用再频繁查询是否有新产品要加工了,因为它的get方法会一直阻塞在那里,直至有新数据返回为止。

1
2
3
4
5
6
7
8
9
10
11
from queue import Queue

my_queue = Queue()

def consumer():
print('Consumer waiting')
my_queue.get()
print('Consumer done')

thread = Thread(target=consumer)
thread.start()

即便这个线程先启动,也没有关系,因为只有当生产线程通过Queue实例的put方法给队列里面填入新数据之后,刚才那个get方法才有数据可以返回。

1
2
3
4
print('Producer putting')
my_queue.put(object())
print('Producer done')
thread.join()

为了解决因下游环节速度过慢而造成的管道拥堵问题,我们可以限定Queue最多只能堆积多少个元素。如果通过put方法给已经填满的队列添加新元素,那么这个方法就会阻塞,直到队列里有空位为止。下面我们创建最多只能保存一个元素的队列,并且定义这样一条消费线程,让它先等待一段时间,然后再从队列中获取元素,这样就促使生产线程没办法立刻给队列中添加新元素。

1
2
3
4
5
6
7
8
9
10
11
12
my_queue = Queue(1)

def consumer():
time.sleep(0.1)
my_queue.get()
print('Consumer got 1')
my_queue.get()
print('Consumer got 2')
print('Consumer done')

thread = Thread(target=consumer)
thread.start()

编写time.sleep(0.1)这条语句,是想故意让消费线程慢下来,以免生产线程在第二次调用put方法时,它第一次放进去的那个元素还没来得及被消费线程提取走。Queue的容量为1,所以生产线程会阻塞在第二个put方法这里,它必须等消费线程通过get方法把队列中的那个元素取走,才能继续往里面添加新元素。

管道非常适合用来安排多阶段的任务,让我们能够把每一阶段都交给各自的线程去执行,这尤其适合用在I/O密集型的程序里面。

构造这种并发的管道时,有很多问题需要注意,例如怎样防止线程频繁地查询队列状态,怎样通知线程尽快结束操作,以及怎样防止管道出现拥堵等。

我们可以利用Queue类所具有的功能来构造健壮的管道系统,因为这个类提供了阻塞式的入队(put)与出队(get)操作,而且可以限定缓冲区的大小,还能够通过task_done与join来确保所有元素都已处理完毕。

第56条、学会判断什么场合必须做并发

程序范围变大、需求变复杂之后,经常要用多条路径平行地处理任务。

fan-out与fan-in是最常见的两种并发协调(concurrency coordination)模式,前者用来生成一批新的并发单元,后者用来等待现有的并发单元全部完工。

Python提供了很多种实现fan-out与fan-in的方案。

第57条、不要在每次fan-out时都新建一批Thread实例

想在Python里平行地做I/O,首先要考虑的工具当然是线程。但如果真用线程来表示fan-out模式中的执行路径,你就会发现,这样其实有很多问题。

每次都手工创建一批线程,是有很多缺点的,例如:创建并运行大量线程时的开销比较大,每条线程的内存占用量比较多,而且还必须采用Lock等机制来协调这些线程。

线程本身并不会把执行过程中遇到的异常抛给启动线程或者等待该线程完工的那个人,所以这种异常很难调试。

第58条、学会正确地重构代码,以便用Queue做并发

每次都手工创建一批线程并平行地执行I/O任务是有很多缺点的。另一种方案,也就是用内置的queue模块里的Queue类实现多线程管道

这种方案的总思路是:在推进生命游戏时,不像原来那样,每推进一代,就新建一批线程来推进相应的单元格,而是可以提前创建数量固定的一组工作线程,令这组线程平行地处理当前这批I/O任务,并在处理完之后,继续等待下一批任务,这样就不会消耗那么多资源了,程序也不会再因为频繁新建线程而耽误那么多时间。

把队列(Queue)与一定数量的工作线程搭配起来,可以高效地实现fan-out(分派)与fan-in(归集)。

为了改用队列方案来处理I/O,我们重构了许多代码,如果管道要分成好几个环节,那么要修改的地方会更多。

利用队列并行地处理I/O任务,其处理I/O任务量有限,我们可以考虑用Python内置的某些功能与模块打造更好的方案。

第59条、如果必须用线程做并发,那就考虑通过ThreadPoolExecutor实现

Python有个内置模块叫作concurrent.futures,它提供了ThreadPoolExecutor类。这个类结合了线程(Thread)方案与队列(Queue)方案的优势,可以用来平行地处理生命游戏里的那种I/O操作。

利用ThreadPoolExecutor,我们只需要稍微调整一下代码,就能够并行地执行简单的I/O操作,这种方案省去了每次fan-out(分派)任务时启动线程的那些开销。

1
2
3
4
5
6
7
def game_logic(state, neighbors):
# ...
raise OSError('Problem with I/O')
# ...
with ThreadPoolExecutor(max_workers=10) as pool:
task = pool.submit(game_logic, ALIVE, 3)
task.result()

ThreadPoolExecutor方案仍然有个很大的缺点,就是I/O并行能力不高,即便把max_workers设成100,也无法高效地应对那种有一万多个单元格,且每个单元格都要同时做I/O的情况。如果你面对的需求,没办法用异步方案解决,而是必须执行完才能往后走(例如文件I/O),那么ThreadPoolExecutor是个不错的选择。然而在许多情况下,其实还有并行能力更强的办法可以考虑。

第60条、用协程实现高并发的I/O

如果同时需要执行的I/O任务有成千上万个,那么之前这些方案的效率就不太理想了。

像这种在并发方面要求比较高的I/O需求,可以用Python的协程(coroutine)来解决。协程能够制造出一种效果,让我们觉得Python程序好像真的可以同时执行大量任务。这种效果需要使用async与await关键字来实现,它的基本原理与生成器(generator)类似,也就是不立刻给出所有的结果,而是等需要用到的时候再一项一项地获取。

启动协程是有代价的,就是必须做一次函数调用。协程激活之后,只占用不到1KB内存,所以只要内存足够,协程稍微多一些也没关系。与线程类似,协程所要执行的任务也是用一个函数来表示的,在执行这个函数的过程中,协程可以从执行环境里面获取输入值,并把输出结果放到这个执行环境之中。协程与线程的区别在于,它不会把这个函数从头到尾执行完,而是每遇到一个await表达式,就暂停一次,下次继续执行的时候,它会先等待await所针对的那项awaitable操作有了结果(那项操作是用async函数表示的),然后再推进到下一个await表达式那里(这跟生成器函数的运作方式有点像,那种函数也是一遇到yield就暂停)。

Python系统可以让数量极多的async函数各自向前推进,看起来像很多条Python线程那样,能够并发地运行。然而,这些协程并不会像线程那样占用大量内存,启动和切换的开销也比较小,而且不需要用复杂的代码来实现加锁或同步。这种强大的机制是通过事件循环(event loop)打造的,只要把相关的函数写对,这种循环就可以穿插着执行许多个这样的函数,并且执行得相当快,从而高效地完成并发式的I/O任务。

1
2
3
4
5
6
7
8
9
10
11
12
ALIVE = '*'
EMPTY = '-'

class Grid:
#...

def count_neighbors(y, x, get):
#...

async def game_logic(state, neighbors):
# ...
data = await my_socket.read(500)

加上async,表示该函数是一个协程,这样我们就可以在函数里面用await做I/O了。

协程是采用async关键字所定义的函数。如果你想执行这个协程,但并不要求立刻就获得执行结果,而是稍后再来获取,那么可以通过await关键字表达这个意思。协程能够制造出这样一种效果,让人以为程序里有成千上万个函数都在同一时刻高效地运行着。协程可以用fan-out(分派)与fan-in(归集)模式实现并行的I/O操作,而且能够克服用线程做I/O时的缺陷。

协程的优点是,能够把那些与外部环境交互的代码(例如I/O调用)与那些实现自身需求的代码(例如事件循环)解耦。这让我们可以把重点放在实现需求所用的逻辑上面,而不用专门花时间去写一些代码来确保这些需求能够并发地执行。

第61条、学会用asyncio改写那些通过线程实现的I/O

Python已经将异步执行功能很好地集成到语言里面了,所以我们很容易就能把采用线程实现的阻塞式I/O操作转化为采用协程实现的异步I/O操作。

asyncio库的文档(https://docs.python.org/3/library/asyncio.html)

Python提供了异步版本的for循环、with语句、生成器与推导机制,而且还有很多辅助的库函数,让我们能够顺利地迁移到协程方案。我们很容易就能利用内置的asyncio模块来改写代码,让程序不要再通过线程执行阻塞式的I/O,而是改用协程来执行异步I/O。

第62条、结合线程与协程,将代码顺利迁移到asyncio

如果项目比较大,那通常需要一点一点地迁移,也就是要边改边测,确保迁移过去的这一部分代码的效果跟原来相同。

为了能够分步骤地迁移,必须让采用线程做阻塞式I/O的那些代码能够与采用协程做异步I/O的代码相互兼容。具体来说,这要求我们既能够在线程里面执行协程,又能够在协程里面启动线程并等待运行结果。好在asyncio模块已经内置了相关的机制,让线程与协程可以顺利地操作对方。

asyncio模块的事件循环提供了一个返回awaitable对象的run_in_executor方法,它能够使协程把同步函数放在线程池执行器(ThreadPoolExecutor)里面执行,让我们可以顺利地将采用线程方案所实现的项目,从上至下地迁移到asyncio方案。

asyncio模块的事件循环还提供了一个可以在同步代码里面调用的run_until_complete方法,用来运行协程并等待其结束。它的功能跟asyncio.run_coroutine_threadsafe类似,只是后者面对的是跨线程的场合,而前者是为同一个线程设计的。这些都有助于将采用线程方案所实现的项目从下至上地迁移到asyncio方案。

只要输入文件的句柄处于开启状态,相应的工作线程就不会退出。反过来说,这条线程要是退出了,那就意味着有人把那份文件的句柄关了。于是,只需要等待所有的线程都完工,就可以确定这些文件的句柄已经全部关闭。

我们在判断是否需要移植时,应该考虑到,这样做会不会让代码变得难懂,会不会降低程序的效率。有的时候,所有代码都应该迁移到asyncio,但另一些场合则没必要这么做。

上面讲的是从上往下迁移,我们现在反过来,看看如何从下往上迁移。这也可以分成四步,但方向相反。原来是从顶层函数入手,沿着调用栈向下走,现在是从末端函数(也就是叶节点)入手,沿着调用栈向上走,一直走到整个调用体系的顶层,也就是入口点所在的那一层。具体步骤为:

  • 1)为要移植的每个末端函数(leaf function)都创建一个对应的异步协程版本。
  • 2)修改现有的同步版本函数,把它原来执行的那些实际操作全都拿掉,让它只通过事件循环去调用刚写的那个异步版本函数。
  • 3)沿着调用栈向上移动一层,针对这一层里的相关函数制作对应的异步版本,并让那些异步版本函数去调用第1步里创建的相应协程。
  • 4)把第2步里纯粹为了封装协程而设的同步版本删掉,因为上一层现在调用的是下一层里的异步版本函数,这些同步版本现在已经用不到了。

第63条、让asyncio的事件循环保持畅通,以便进一步提升程序的响应能力

把系统调用(包括阻塞式的I/O以及启动线程等操作)放在协程里面执行,会降低程序的响应能力,增加延迟感。

调用asyncio.run函数时,把debug参数设为True,可以帮助我们发现这种问题。

如:

1
2
3
4
5
6
import time

async def slow_coroutine():
time.sleep(0.5)

asyncio.run(slow_coroutine(), debug=True)

第64条、考虑用concurrent.futures实现真正的并行计算

有些Python程序写到一定阶段,性能就再也上不去了。即便优化了代码,程序的执行速度可能还是达不到要求。考虑到现在的计算机所装配的CPU核心数量越来越多,所以我们很自然地就想到用并行方式来解决这个问题。那么接下来就必须思考,如何将代码所要执行的计算任务划分成多个独立的部分并在各自的核心上面平行地运行。

Python的全局解释器锁(global interpreter lock,GIL)导致我们没办法用线程来实现真正的并行,所以先把这种方案排除掉。另一种常见的方案,是把那些对性能要求比较高的(performance-critical)代码用C语言重写成扩展模块。C语言比Python更接近底层硬件,因此运行速度要比Python快,这样的话,有些任务可能根本就不需要做并行,而是单单用C语言重写一遍就好。另外,C扩展还可以启动原生线程(native thread),这种线程不受Python解释器制约,也不必考虑GIL的问题,它们能够平行地运行,从而发挥出多核CPU的优势。Python里面针对C扩展而设计的那些API,有详细的文档可以参考,所以这是个很好的备选方案。大家在开发扩展模块的时候,还可以借助SWIG(https://github.com/swig/swig)与CLIF(https://github.com/google/clif)等工具。

然而,用C语言重写Python代码,代价是比较高的。因为有些代码在Python之中很简洁,但是改写成C代码之后,就变得特别难懂、特别复杂了。在移植过程中,我们还必须做大量的测试,以确保移植过去的那些代码跟原来的Python代码效果相同,并且不会引入bug。有的时候,这些工作确实很有意义,所以Python行业里面出现了大量的C扩展模块,用来迅速执行各种任务,例如文本解析、图像合成、矩阵运算等。另外还有Cython(https://cython.org/)与Numba(https://numba.pydata.org/)这样的开源工具帮我们顺利地向C语言移植。

问题是,在大多数情况下,我们不能只把整个程序里的一小段代码移植到C语言,因为程序的性能之所以缓慢,通常是由多个因素造成的,而不是说只要消除了其中某一个主要因素,整个程序的性能就会大幅提升。要想把C语言在底层硬件与线程方面的优势发挥出来,必须把程序里的许多代码都迁移过去,这会让测试量激增,而且容易引入新的bug。所以,还是得想想有没有什么好办法,能够在Python语言自身的范围内,解决这种复杂的并行计算问题。

Python内置的multiprocessing模块提供了多进程机制,这种机制很容易通过内置的concurrent.futures模块来使用,这可能就是我们要找的理想方案(相关范例参见第59条)。这种方案可以启动许多条子进程(child process),这些进程是独立于主解释器的,它们有各自的解释器与相应的全局解释器锁,因此这些子进程可以平行地运行在CPU的各个核心上面。每条子进程都能够充分利用它所在的这个核心来执行运算。这些子进程都有指向主进程的链接,用来接收所要执行的计算任务并返回结果。

如计算最大公约数:

1
2
3
4
5
6
7
def gcd(pair):
a, b = pair
low = min(a, b)
for i in range(low, 0, -1):
if a % 1 == 0 and b % i == 0:
return i
assert False, 'Not reachable'

如果把有待求解最大公约数的那些元组按照先后顺序交给这个函数去执行,那么程序花费的总时间就会随着元组的数量呈正比例上升,因为我们根本就没有做平行计算。

直接把这种代码分给多条Python线程去执行,是不会让程序提速的,因为它们全都受制于同一个Python全局解释器锁(GIL),无法真正平行地运行在各自的CPU核心上面。

使用concurrent.futures模块里面的ThreadPoolExecutor类,并允许它最多可以启用两条工作线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import my_module
from concurrent.futures import ThreadPoolExecutor
import time

NUMBERS = [
# ...
]

def main():
start = time.time()
pool = ThreadPoolExecutor(max_workers = 2)
results = list(pool.map(my_module.gcd, NUMBERs))
end = time.time()
delta = end - start
print(f'Took {delta:.3f} seconds')
if __name__ == '__main__':
main()

由于要启动线程池并和它通信,这种写法比单线程版本还慢。

但是请注意,只需要变动一行代码就能让程序出现奇效,也就是把ThreadPoolExecutor改成concurrent.futures模块里的ProcessPoolExecutor。这样一改,程序立刻就快了起来。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import my_module
from concurrent.futures import ProcessPoolExecutor
import time

NUMBERS = [
# ...
]

def main():
start = time.time()
pool = ThreadPoolExecutor(max_workers = 2)
results = list(pool.map(my_module.gcd, NUMBERs))
end = time.time()
delta = end - start
print(f'Took {delta:.3f} seconds')
if __name__ == '__main__':
main()

这是为什么呢?因为ProcessPool-Executor类会执行下面这一系列的步骤(当然,这实际上是由multiprocessing模块里的底层机制所推动的)。

  • 1)从包含输入数据的NUMBERS列表里把每个元素取出来,以便交给map。
  • 2)用pickle模块对每个元素做序列化处理,把它转成二进制形式(参见第68条)。
  • 3)将序列化之后的数据,从主解释器所在的进程经由本地socket复制到子解释器所在的进程。
  • 4)在子进程里面,用pickle模块对数据做反序列化处理,把它还原成Python对象。
  • 5)引入包含gcd函数的那个Python模块。
  • 6)把刚才还原出来的那个对象交给gcd函数去处理,此时,其他子进程也可以把它们各自的那份数据交给它们各自的gcd函数执行。
  • 7)对执行结果做序列化处理,把它转化成二进制形式。
  • 8)将二进制数据通过socket复制到上级进程。
  • 9)在上级进程里面对二进制数据做反序列化处理,把它还原成Python对象。
  • 10)把每条子进程所给出的结果都还原好,最后合并到一个list里面返回。

从开发者这边来看,这个过程似乎很简单,但实际上,multiprocessing模块与Proce-ssPoolExecutor类要做大量的工作才能实现出这样的并行效果。同样的效果,假如改用其他语言来做,那基本上只需要用一把锁或一项原子操作就能很好地协调多个线程,从而实现并行。但这在Python里面不行,所以我们才考虑通过ProcessPoolExecutor来实现。然而这样做的开销很大,因为它必须在上级进程与子进程之间做全套的序列化与反序列化处理。

这个方案对那种孤立的而且数据利用度较高的任务来说,比较合适。所谓孤立(isolated),这里指每一部分任务都不需要跟程序里的其他部分共用状态信息。所谓数据利用度较高(high-leverage),这里指任务所使用的原始材料以及最终所给出的结果数据量都很小,因此上级进程与子进程之间只需要互传很少的信息就行,然而在把原始材料加工成最终产品的过程中,却需要做大量运算。刚才那个求最大公约数的任务就属于这样的例子,当然还有很多涉及其他数学算法的任务,也是如此。

如果你面对的计算任务不具备刚才那两项特征,那么使用ProcessPoolExecutor所引发的开销可能就会盖过因为并行而带来的好处。在这种情况下,我们可以考虑直接使用multiprocessing所提供的一些其他高级功能,例如共享内存(shared memory)、跨进程的锁(cross-process lock)、队列(queue)以及代理(proxy)等。但是,这些功能都相当复杂,即便两个Python线程之间所要共享的进程只有一条,也是要花很大工夫才能在内存空间里面将这些工具安排到位。假如需要共享的进程有很多条,而且还涉及socket,那么这种代码理解起来会更加困难。

只有在其他方案全都无效的情况下,才可以考虑直接使用multiprocessing里面的高级功能(那些功能用起来相当复杂)。不要刚一上来,就立刻使用跟multiprocessing这个内置模块有关的机制,而是可以先试着用ThreadPoolExecutor来运行这种孤立且数据利用度较高的任务。把这套方案实现出来之后,再考虑向ProcessPoolExecutor方案迁移。如果ProcessPoolExecutor方案也无法满足要求,而且其他办法也全都试遍了,那么最后可以考虑直接使用multiprocessing模块里的高级功能来编写代码。


8.稳定与性能

写了一个有用的Python程序之后,接下来就该考虑怎样让代码变得健壮(robust)起来,只有这样,才能将这个程序变为正式的产品(productionize)。把程序的功能写对,当然是很重要的,然而我们还得考虑怎样让程序在面对意外情况时,依然能够可靠地运作。Python有很多内置的特性与模块,可以帮我们加固程序代码,让它应付各种各样的状况。

说到健壮,其中一项指标在于能不能高效地应对大规模的数据。我们经常发现自己写的Python程序在处理少量数据时没有问题,但数据量一多,速度就下降,这可能是因为自己的算法过于复杂,或者计算的时候还有其他一些开销。不过没关系,Python提供了很多算法和数据结构,可以让我们轻松地写出高性能的程序。

第65条、合理利用try/except/else/finally结构中的每个代码块

在Python代码中处理异常,需要考虑四个情况,这正好对应try/except/else/finally这个结构中的四个代码块。这种复合语句的每块代码都有各自的用途,你可以全写,也可以只写其中几个

try/finally形式

如果我们想确保,无论某段代码有没有出现异常,与它配套的清理代码都必须得到执行,同时还想在出现异常的时候,把这个异常向上传播,那么可以将这两段代码分别放在try/finally结构的两个代码块里面。最常见的例子是确保文件句柄能够关闭

1
2
3
4
5
6
7
8
9
def try_finally_example(filename):
print('* Opening file')
handle = open(filename, encoding='utf-8')
try:
print('* Reading data')
return handle.read()
finally:
print('* Calling close()')
handle.close()

如果read方法抛出异常,那么这个异常肯定会向上传播给调用try_finally_example函数的那段代码,然而在传播之前,系统会记得运行finally块中的代码,使文件句柄(handle)能够关闭(close)。

try/except/else形式

如果你想在某段代码发生特定类型的异常时,把这种异常向上传播,同时又要在代码没有发生异常的情况下,执行另一段代码,那么可以使用try/except/else结构表达这个意思。如果try块代码没有发生异常,那么else块就会运行。try里面应该尽量少写一些代码,这样阅读起来比较清晰,而且即便出现异常,我们也能很快找到它是由哪一行代码引发的。

1
2
3
4
5
6
7
8
9
10
11
12
import json

def load_json_key(data, key):
try:
print('* Loading JSON data')
result_dict = json.loads(data)
except ValueError as e:
print('* Handling ValueError')
raise KeyError(key) from e
else:
print('* Looking up key')
return result_dict[key]

完整的try/except/else/finally形式

如果这四个代码块的功能全都要用到,那么可以编写完整的try/except/else/finally结构。

第66条、考虑用contextlib和with语句来改写可复用的try/finally代码

Python里的with语句可以用来强调某段代码需要在特殊情境之中执行。例如,如果必须先持有互斥锁,然后才能运行某段代码,那么就可以用with语句来表达这个意思(此时,所谓在特殊情境之中执行,指的就是在持有互斥锁的情况下执行)。

1
2
3
4
5
from threading import Lock

lock = Lock()
with lock:
#...

这样写,其实跟相应的try/finally结构是一个意思,这是因为Lock类做了专门的设计,它结合with结构使用,表达的也是这个意思。

跟try/finally结构相比,with语句的好处在于,写起来比较方便,我们不用在每次执行这段代码前,都通过lock.acquire()加锁,而且也不用总是提醒自己要在finally块里通过lock.release()解锁。

如果想让其他的对象跟函数,也能像Lock这样用在with语句里面,那么可以通过内置的contextlib模块来实现。这个模块提供了contextmanager修饰器,它可以使没有经过特别处理的普通函数也能受到with语句支持。这要比标准做法简单得多,因为那种做法必须定义新类并实现名为enterexit的特殊方法。

例如,有些情况下,我们想在执行某个函数的时候,看到更为详细的调试信息。下面这个函数会打印出两种级别的日志信息,一种是DEBUG级别,一种是ERROR级别。

1
2
3
4
5
6
import logging

def my_function():
logging.debug('Some debug data')
logging.error('Error log here')
logging.debug('More debug data')

这个程序默认的日志级别(log level)是WARNING,所以,它只会把级别大于或等于WARNING的消息打印到屏幕上。DEBUG的级别低于WARNING,因此这种级别的消息是看不到的。

要想临时改变日志级别,可以定义情境管理器(context manager)。用@contextmanager来修饰下面这个辅助函数,就能定义出这样一种管理器。这种管理器可以用在with语句里面,让日志记录器(Logger)在进入这个范围之后,临时改变自己的日志级别,这样的话,原来那些低级别的消息,现在就可以显示出来了,待with语句块执行完毕后,再恢复原有日志级别。

1
2
3
4
5
6
7
8
9
10
11
from contextlib import contextmanager

@contextmanager
def debug_logging(level):
logger = logging.getLogger()
old_level = logger.getEffectiveLevel()
logger.setLevel(level)
try:
yield
finally:
logger.setLevel(old_level)

系统开始执行with语句时,会先把@contextmanager所修饰的debug_logging辅助函数推进到yield表达式所在的地方,然后开始执行with结构的主体部分。如果执行with语句块(也就是主体部分)的过程中发生异常,那么这个异常会重新抛出到yield表达式所在的那一行里,从而为辅助函数中的try结构所捕获。

Python内置的contextlib模块提供了contextmanager修饰器,让我们可以很方便地修饰某个函数,从而制作出相对应的情境管理器,使得这个函数能够运用在with语句里面。情境管理器通过yield语句所产生的值,可以由with语句之中位于as右侧的那个变量所接收,这样的话,我们就可以通过该变量与当前情境相交互了。

带目标的with语句

with语句还有一种写法,叫作with...as...,它可以把情境管理器所返回的对象赋给as右侧的局部变量,这样的话,with结构的主体部分代码就可以通过这个局部变量与情境管理器所针的那套情境交互了。

1
2
with open('my_output.txt', 'w') as handle:
handle.write('This is some data')

与手动打开并关闭文件句柄的写法相比,这种写法更符合Python的风格。这样写,可以保证程序在离开with结构的时候总是会把文件关掉,而且这种结构可以提醒我们把主体部分代码写得简短一些,也就是在打开文件句柄之后,尽快把自己要执行的操作给做完,这是个值得提倡的做法。

第67条、用datetime模块处理本地时间,不要用time模块

协调世界时(Coordinated Universal Time,UTC)是标准的时间表示方法,它不依赖特定时区。有些计算机采用与UNIX时间原点(UNIX epoch)之间的秒数来表达时间,在这种场合,UTC用起来是很方便的,然而对于人类来说,UTC却不太直观,因为我们平常所说的时间,总是默认针对自己所在的地方而言的。例如我们习惯说“早上8点”,而不习惯说“比UTC的15点整早7个小时”。所以,在涉及时间的程序里,我们很有可能要在UTC与当地时区之间互相转换,这样才能给出用户容易理解的格式。

Python里面有两种办法可以转换时区,一种是老办法,也就是通过内置的time模块来做,这种办法很容易出错。还有一种是新办法,也就是通过内置的datetime模块来做,这种办法可以跟第三方的Python开发者所构造的pytz软件包搭配起来,形成很好的转换效果。

time模块

内置time模块的localtime函数可以把UNIX时间戳(UNIX timestamp)转换为与宿主计算机的时区相符的本地时间(UNIX时间戳是个UTC时间,表示某时刻与UNIX时间原点之间的秒数;笔者这台计算机使用的时区为太平洋夏令时(Pacific Daylight Time,PDT),它比UTC慢7个小时)。转换之后的本地时间,可以用strftime函数调整成用户习惯的格式。

许多操作系统会通过相关的配置文件自动反映时区方面的变化,在这样的操作系统上,time模块是可以支持某些时区的。但另外一些操作系统(例如Windows)则不行,所以在那些操作系统上,不要想着用time去做转换。

time模块本质上仍然要依赖具体的平台而运作。它的行为取决于底层的C函数与宿主操作系统之间的协作方式,这导致该模块的功能在Python里面显得不太可靠。time模块没办法稳定地处理多个时区,所以不要用这个模块来编写这方面的代码。假如一定要用,那最多也就是在UTC时间和宿主计算机的当地时区之间用它来转换,涉及其他时区的转换操作,最好还是通过datetime模块来做。

datetime模块

跟time模块一样,它也能把UTC时间转换成本地时间。

1
2
3
4
5
6
from datetime import datetime, timezone

now = datetime(2024, 5, 25, 12, 11, 33)
now_utc = now.replace(tzinfo=timezone.utc)
now_loca = now_utc.astimezone()
print(now_local)

反过来也很容易。我们同样可以用datetime模块把本地时间转换成UTC格式的UNIX时间戳。

1
2
3
4
5
time_str = '2024-05-25 12:11:33'
now = datetime.strptime(time_str, time_format)
time_tuple = now.timetuple()
utc_now = time.mktime(time_tuple)
print(utc_now)

跟time模块不同,datetime模块里面有相应的机制,可以把一个时区的本地时间可靠地转化成另一个时区的本地时间。但问题是,datetime的这套时区操纵机制必须通过tzinfo类与相关的方法来运作,而系统在安装Python的时候,并不会默认安装UTC之外的时区定义信息。

好在其他Python开发者提供了pytz模块,能够把这些缺失的时区定义信息给补上,这个模块可以从Python Package Index下载。pytz包含一整套数据库,你可能会用到的每个时区它应该都有。

为了顺利使用pytz模块,应该先把本地时间转换成UTC时间,然后在这样的时间值上通过datetime所提供的各种方法执行自己想要的操作(例如通过下面提到的astimezone方法来调整时区属性),最后把操作好的时间转回当地时间。

1
2
3
4
5
6
7
import pytz

arrival_nyc = '2024-05-25 12:11:33'
nyc_dt_naive = datetime.strptime(arrival_nyc, time_format)
eastern = pytz.timezone('US/Eastern')
nyc_dt = eastern.localize(nyc_dt_naive)
utc_dt = pytz.utc.normalize(nyc_dt.astimezone(pytz.utc))

把datetime与pytz搭配起来使用,可以确保程序在各种环境下,都能够给出一致的转换结果,而不依赖于具体的操作系统与宿主计算机。

第68条、用copyreg实现可靠的pickle操作

Python内置的pickle模块可以将对象序列化成字节流,也可以把字节流反序列化(还原)成对象。经过pickle处理的字节流,只应该在彼此信任的双方之间传输,而不应该随意传给别人,或者随意接受别人发来的这种数据,因为pickle的本意只是提供一种数据传输手段,让你在自己可以控制的程序之间传递二进制形式的Python对象。

pickle模块所使用的这种序列化格式本身就没有考虑过安全问题。这种格式会把原有的Python对象记录下来,让系统可以在稍后予以重建。这意味着,假如记录的这个对象本身含有恶意行为,那么通过反序列化还原出来之后,就有可能破坏整个程序。

跟pickle不同,json模块考虑到了安全问题。序列化之后的JSON数据表示的只不过是一套对象体系而已,把这样的数据反序列化不会给程序带来风险。如果要在彼此不信任的两个人或两个程序之间传递数据,那么应该使用JSON这样的格式。

pickle模块主要用途仅仅是让我们能够把对象轻松地序列化成二进制数据。如果想直接使用这个模块来实现比这更为复杂的需求,那么可能就会看到奇怪的结果。解决这样的问题,也非常简单,即可以用内置的copyreg模块解决。这个模块允许我们向系统注册相关的函数,把Python对象的序列化与反序列化操作交给那些函数去处理,这样的话,pickle模块就运作得更加稳定了。

第69条、在需要准确计算的场合,用decimal表示相应的数值

Python语言很擅长操纵各种数值。它的整数类型实际上可以表示任意尺寸的整型数据,它的双精度浮点数类型遵循IEEE 754规范。另外,Python还提供了标准的复数类型,用来表示虚数。尽管有这么多类型,但还是没办法把每种情况都覆盖到。

例如,我们要给国际长途电话计费。通话时间用分和秒来表示,这项数据是已知的(例如3分42秒)。通话费率也是固定的,例如从美国打给南极洲的电话,每分钟1.45美元。现在要计算这次通话的具体费用。有人可能觉得应该用浮点数来计算。

1
2
3
rate = 1.45
seconds = 3*60 + 42
cost = rate * seconds/ 60 # 5.364999...

这个答案比正确答案(5.365)少了0.000000000000001,这是因为浮点数必须表示成IEEE 754格式,所以采用浮点数算出的结果可能跟实际结果稍有偏差。

这样的计算应该用Python内置的decimal模块所提供的Decimal类来做。这个类默认支持28位小数,如果有必要,还可以调得更高。改用这个类之后,就不会出现由于IEEE 754浮点数而造成的偏差了。另外,这种数值所支持的舍入方式,也比浮点数丰富可控。

1
2
3
4
5
from decimal import Decimal

rate = Decimal('1.45')
seconds = Decimal(3*60 + 42)
cost = rate * seconds / Decimal(60)

Decimal的初始值可以用两种办法来指定。第一种,是把含有数值的str字符串传给Decimal的构造函数,这样做不会让字符串里面的数值由于Python本身的浮点数机制而出现偏差。第二种,是直接把float或int实例传给构造函数。通过下面这段代码,我们可以看到,这两种办法在某些小数上会产生不同的效果。

1
2
Decimal('1.45')  # 1.45
Decimal(1.45) # 1.449999999...

所以,如果你想要的是准确答案,那么应该使用str字符串来构造Decimal,这种decimal的精确度可能比你需要的更高,但无论如何,都比刚一开始就出现偏差要好。

Decimal类提供了quantize函数,可以根据指定的舍入方式把数值调整到某一位。

第70条、先分析性能,然后再优化

Python的动态机制,让我们很难预判程序在运行时的性能。有些操作,看上去似乎比较慢,但实际执行起来却很快(例如操纵字符串,使用生成器等);还有一些操作,看上去似乎比较快,但实际执行起来却很慢(例如访问属性,调用函数等)。让Python程序速度变慢的原因,有时很难观察出来。

所以,最好不要凭感觉去判断,而是应该先获得具体的测评数据,然后再决定怎么优化。Python内置了profiler模块,可以找到程序里面占总执行时间比例最高的一部分,这样的话,我们就可以专心优化这部分代码,而不用执着于对程序性能影响不大的那些地方(因为你把同样的精力投入到那些地方,产生的提速效果不会太好)。

Python内置了两种profiler,一种是由profile模块提供的纯Python版本,还有一种是由cProfile模块提供的C扩展版本。这个版本比纯Python版要好,因为它在执行测评的过程中,对受测程序的影响比较小,测评结果更加准确。相反,纯Python版本的开销比较大,会令测评结果有所偏差。

分析Python程序的性能之前,一定要提醒自己注意,别把访问外部系统的那些代码,与核心代码混在一起测。例如,访问网络或磁盘资源的那些函数就有可能需要调用某些底层机制,而那些机制的运作速度比较慢,从而对程序的执行时间造成很大影响。另外,如果程序把这些访问速度较慢的资源缓存了起来,那么在开始分析性能之前,一定要先将缓存预热(也就是要把里面的内容提前配置好)。

1
2
3
4
from cProfile import Profile

profiler = Profile()
profiler.runcall(test)

运行完test函数后,用内置的pstats模块与里面的Stats类来统计结果。Stats对象提供了各种方法,通过这些方法我们可以只把自己关注的那部分测评信息打印出来。

1
2
3
4
5
6
from pstats import Stats

stats = Stats(profiler)
stats.strip_dirs()
stats.sort_stats('cumulative')
stats.print_stats()

print_stats方法输出了一张表格,其中每一行都表示一个函数的执行情况。这些样本,都是在profiler激活之后才开始采集的,或者说是在执行profiler.runcall(test)的过程中采集的。

1
2
3
4
5
ncalls:函数在测评期间的调用次数。
tottime:程序执行这个函数本身所花时间(不包括该函数调用其他函数所花时间)。
tottime percall:程序每次执行这个函数所花的平均时间(不统计该函数调用其他函数所花时间)。这相当于tottime除以ncalls。
cumtime:程序执行这个函数以及它所调用的其他函数所花时间。
cumtime percall:程序每次执行这个函数以及它所调用的其他函数平均花费时间。这相当于cumtime除以ncalls。

profiler的print_callers方法来打印统计结果,它可以显示出程序里面有哪几个函数调用了我们关心的这个函数。

可以通过Stats对象筛选出我们关心的那些分析结果,从而更为专注地思考如何优化程序性能。

第71条、优先考虑用deque实现生产者-消费者队列

写程序的时候,经常要用到先进先出的(first-in, first-out,FIFO)队列,这种队列也叫作生产者-消费者队列(producer–consumer queue)或生产-消费队列。FIFO队列可以把某个函数给出的值收集起来,并交给另一个函数按序处理。一般来说,开发者会用Python内置的list类型来实现FIFO队列。

用list来实现这种生产-消费队列,在一定程度上是没问题的,但是当基数(cardinality,也就是列表中的元素数量)变多之后,list的性能就会下降,并且不是等比例地下降,而是更为严重。为了对采用列表所实现的FIFO队列做性能分析,我们通过内置的timeit模块执行一些micro-benchmark。这种micro-benchmark放在相应的benchmark函数里面执行,比如这里定义的list_append_benchmark函数测评的就是用list的append方法给队列中添加新元素时的性能(这正是刚才的生产函数所用的做法)。

1
2
3
4
import timeit

# ...
timeit.repeat(...)

list类型可以用来实现FIFO队列,生产者可以通过append方法向队列添加元素。但这种方案有个问题,就是消费者在用pop(0)从队列中获取元素时,所花的时间会随着队列长度,呈平方式增长。

跟list不同,内置collections模块之中的deque类,无论是通过append添加元素,还是通过popleft获取元素,所花的时间都只跟队列长度呈线性关系,而非平方关系,这使得它非常适合于FIFT队列。

第72条、考虑用bisect搜索已排序的序列

我们经常要将大量数据载入内存,并把它们放到一份有序的列表之中以便搜索。例如,可能要把一本英语辞典加载进来以实现拼写检查,或者把一套财务交易数据加载进来以便审计。

不管程序要处理的是什么数据,在列表(list)里面通过index方法搜索某个值,所花的时间都跟列表长度成正比(或者说,随着列表长度呈线性增长)。

如果你不能确定这个值是否在列表里面,那么你要查的就应该是:列表中恰好等于目标值,或比目标值大但最接近目标值的那个元素所在的位置。要想找到这个位置,最简单的办法是对列表做线性扫描,把其中的元素逐个与目标值比较。

Python内置的bisect模块可以更好地搜索有序列表。其中的bisect_left函数,能够迅速地对任何一个有序的序列执行二分搜索。如果序列中有这个值,那么它返回的就是跟这个值相等的头一个元素所在的位置;如果没有,那么它返回的是插入位置,也就是说,把待查的值插到这个位置可以让序列继续保持有序。

1
2
3
4
5
6
from bisect import bisect_left

index = bisect_left(data, 91234)
assert index == 91234
index = bisect_left(data, 91234.56)
assert index == 91235

bisect模块的二分搜索算法,在复杂度上面是对数级别的。这意味着,线性搜索算法(list.index方法)在包含20个元素的列表中查询目标值所花的时间,已经够这个算法搜索长度为一百万个元素的列表了(math.log2(10**6)大约是19.93)。它要比线性搜索快得多!

bisect最好的地方,是它不局限于list类型,而是可以用在任何一种行为类似序列的对象上面。bisect模块还提供了其他一些功能,可以实现更为高级的用法(在Python解释器界面输入help(bisect),查看详细文档)。

第73条、学会使用heapq制作优先级队列

有的时候,我们想根据元素的重要程度来排序。在这种情况下,应该使用优先级队列(priority queue)。

模块名称里面的heap指的是堆,这是一种数据结构,可以维护列表中的元素,并且只需要对数级别的时间就可以添加新元素或移除其中最小的元素(这种算法的复杂程度,要低于线性算法,所以效率比线性算法高)。在这个图书馆程序里面,所谓最小的元素是指逾期时间最长的(或者说,应还日期距离现在最远的)那次出借记录。这个模块最好的地方,就是我们不用了解相关的算法如何实现,只需要调用它就行。

优先级队列让我们能够按照重要程度来处理元素,而不是必须按照先进先出的顺序处理。

如果直接用相关的列表操作来模拟优先级队列,那么程序的性能会随着队列长度的增大而大幅下降,因为这样做的复杂程度是平方级别,而不是线性级别。

通过Python内置的heapq模块所提供的函数,我们完全可以实现基于堆的优先级队列,从而高效地处理大量数据。

要使用heapq模块,我们必须让元素所在的类型支持自然排序,这可以通过对类套用`@functools.total_ordering`修饰器并定义lt方法来实现。

第74条、考虑用memoryview与bytearray来实现无须拷贝的bytes操作

针对CPU密集型的计算任务,要想用Python程序平行地处理,可能得多花一点功夫,但针对I/O密集型的任务,却很容易就能用各种方式写出吞吐量较大(也就是处理能力较强)的平行代码(参见第53条与第60条)。然而,由于这种代码写起来很简单,特别容易遭到误用,让人以为Python好像慢得连I/O密集型任务都处理不好。

Python内置的memoryview类型提供了一套无须执行拷贝的(也就是零拷贝的)操作接口,让我们可以对支持缓冲协议的Python对象制作切片,并通过这种切片高速地完成读取与写入。

Python内置的bytearray类型是一种与bytes相似但内容能够改变的类型,我们可以通过socket.recv_from这样的函数,以无需拷贝的方式(也就是零拷贝的方式)读取数据。我们可以用memoryview来封装bytearray,从而用收到的数据覆盖底层缓冲里面的任意区段,同时又无需执行拷贝操作。