京公网安备 11010802034615号
经营许可证编号:京B2-20210330
浅析Python中的多进程与多线程的使用
在批评Python的讨论中,常常说起Python多线程是多么的难用。还有人对 global interpreter lock(也被亲切的称为“GIL”)指指点点,说它阻碍了Python的多线程程序同时运行。因此,如果你是从其他语言(比如C++或Java)转过来的话,Python线程模块并不会像你想象的那样去运行。必须要说明的是,我们还是可以用Python写出能并发或并行的代码,并且能带来性能的显著提升,只要你能顾及到一些事情。
在本文中,我们将会写一个小的Python脚本,用于下载Imgur上最热门的图片。我们将会从一个按顺序下载图片的版本开始做起,即一个一个地下载。在那之前,你得注册一个Imgur上的应用。如果你还没有Imgur账户,请先注册一个。
本文中的脚本在Python3.4.2中测试通过。稍微改一下,应该也能在Python2中运行——urllib是两个版本中区别最大的部分。
开始动手
让我们从创建一个叫“download.py”的Python模块开始。这个文件包含了获取图片列表以及下载这些图片所需的所有函数。我们将这些功能分成三个单独的函数:
第三个函数,“setup_download_dir”,用于创建下载的目标目录(如果不存在的话)。
Imgur的API要求HTTP请求能支持带有client ID的“Authorization”头部。你可以从你注册的Imgur应用的面板上找到这个client ID,而响应会以JSON进行编码。我们可以使用Python的标准JSON库去解码。下载图片更简单,你只需要根据它们的URL获取图片,然后写入到一个文件即可。
代码如下:
import json
import logging
import os
from pathlib import Path
from urllib.request import urlopen, Request
logger = logging.getLogger(__name__)
def get_links(client_id):
headers = {'Authorization': 'Client-ID {}'.format(client_id)}
req = Request('https://api.imgur.com/3/gallery/', headers=headers, method='GET')
with urlopen(req) as resp:
data = json.loads(resp.readall().decode('utf-8'))
return map(lambda item: item['link'], data['data'])
def download_link(directory, link):
logger.info('Downloading %s', link)
download_path = directory / os.path.basename(link)
with urlopen(link) as image, download_path.open('wb') as f:
f.write(image.readall())
def setup_download_dir():
download_dir = Path('images')
if not download_dir.exists():
download_dir.mkdir()
return download_dir
接下来,你需要写一个模块,利用这些函数去逐个下载图片。我们给它命名为“single.py”。它包含了我们最原始版本的Imgur图片下载器的主要函数。这个模块将会通过环境变量“IMGUR_CLIENT_ID”去获取Imgur的client ID。它将会调用“setup_download_dir”去创建下载目录。最后,使用get_links函数去获取图片的列表,过滤掉所有的GIF和专辑URL,然后用“download_link”去将图片下载并保存在磁盘中。下面是“single.py”的代码:
import logging
import os
from time import time
from download import setup_download_dir, get_links, download_link
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logging.getLogger('requests').setLevel(logging.CRITICAL)
logger = logging.getLogger(__name__)
def main():
ts = time()
client_id = os.getenv('IMGUR_CLIENT_ID')
if not client_id:
raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!")
download_dir = setup_download_dir()
links = [l for l in get_links(client_id) if l.endswith('.jpg')]
for link in links:
download_link(download_dir, link)
print('Took {}s'.format(time() - ts))
if __name__ == '__main__':
main()
在我的笔记本上,这个脚本花了19.4秒去下载91张图片。请注意这些数字在不同的网络上也会有所不同。19.4秒并不是非常的长,但是如果我们要下载更多的图片怎么办呢?或许是900张而不是90张。平均下载一张图片要0.2秒,900张的话大概需要3分钟。那么9000张图片将会花掉30分钟。好消息是使用了并发或者并行后,我们可以将这个速度显著地提高。
接下来的代码示例将只会显示导入特有模块和新模块的import语句。所有相关的Python脚本都可以在这方便地找到this GitHub repository。
使用线程
线程是最出名的实现并发和并行的方式之一。操作系统一般提供了线程的特性。线程比进程要小,而且共享同一块内存空间。
在这里,我们将写一个替代“single.py”的新模块。它将创建一个有八个线程的池,加上主线程的话总共就是九个线程。之所以是八个线程,是因为我的电脑有8个CPU内核,而一个工作线程对应一个内核看起来还不错。在实践中,线程的数量是仔细考究的,需要考虑到其他的因素,比如在同一台机器上跑的的其他应用和服务。
下面的脚本几乎跟之前的一样,除了我们现在有个新的类,DownloadWorker,一个Thread类的子类。运行无限循环的run方法已经被重写。在每次迭代时,它调用“self.queue.get()”试图从一个线程安全的队列里获取一个URL。它将会一直堵塞,直到队列中出现一个要处理元素。一旦工作线程从队列中得到一个元素,它将会调用之前脚本中用来下载图片到目录中所用到的“download_link”方法。下载完成之后,工作线程向队列发送任务完成的信号。这非常重要,因为队列一直在跟踪队列中的任务数。如果工作线程没有发出任务完成的信号,“queue.join()”的调用将会令整个主线程都在阻塞状态。
from queue import Queue
from threading import Thread
class DownloadWorker(Thread):
def __init__(self, queue):
Thread.__init__(self)
self.queue = queue
def run(self):
while True:
# Get the work from the queue and expand the tuple
# 从队列中获取任务并扩展tuple
directory, link = self.queue.get()
download_link(directory, link)
self.queue.task_done()
def main():
ts = time()
client_id = os.getenv('IMGUR_CLIENT_ID')
if not client_id:
raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!")
download_dir = setup_download_dir()
links = [l for l in get_links(client_id) if l.endswith('.jpg')]
# Create a queue to communicate with the worker threads
queue = Queue()
# Create 8 worker threads
# 创建八个工作线程
for x in range(8):
worker = DownloadWorker(queue)
# Setting daemon to True will let the main thread exit even though the workers are blocking
# 将daemon设置为True将会使主线程退出,即使worker都阻塞了
worker.daemon = True
worker.start()
# Put the tasks into the queue as a tuple
# 将任务以tuple的形式放入队列中
for link in links:
logger.info('Queueing {}'.format(link))
queue.put((download_dir, link))
# Causes the main thread to wait for the queue to finish processing all the tasks
# 让主线程等待队列完成所有的任务
queue.join()
print('Took {}'.format(time() - ts))
在同一个机器上运行这个脚本,下载时间变成了4.1秒!即比之前的例子快4.7倍。虽然这快了很多,但还是要提一下,由于GIL的缘故,在这个进程中同一时间只有一个线程在运行。因此,这段代码是并发的但不是并行的。而它仍然变快的原因是这是一个IO密集型的任务。进程下载图片时根本毫不费力,而主要的时间都花在了等待网络上。这就是为什么线程可以提供很大的速度提升。每当线程中的一个准备工作时,进程可以不断转换线程。使用Python或其他有GIL的解释型语言中的线程模块实际上会降低性能。如果你的代码执行的是CPU密集型的任务,例如解压gzip文件,使用线程模块将会导致执行时间变长。对于CPU密集型任务和真正的并行执行,我们可以使用多进程(multiprocessing)模块。
官方的Python实现——CPython——带有GIL,但不是所有的Python实现都是这样的。比如,IronPython,使用.NET框架实现的Python就没有GIL,基于Java实现的Jython也同样没有。你可以点这查看现有的Python实现。
生成多进程
多进程模块比线程模块更易使用,因为我们不需要像线程示例那样新增一个类。我们唯一需要做的改变在主函数中。
为了使用多进程,我们得建立一个多进程池。通过它提供的map方法,我们把URL列表传给池,然后8个新进程就会生成,它们将并行地去下载图片。这就是真正的并行,不过这是有代价的。整个脚本的内存将会被拷贝到各个子进程中。在我们的例子中这不算什么,但是在大型程序中它很容易导致严重的问题。
from functools import partial
from multiprocessing.pool import Pool
def main():
ts = time()
client_id = os.getenv('IMGUR_CLIENT_ID')
if not client_id:
raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!")
download_dir = setup_download_dir()
links = [l for l in get_links(client_id) if l.endswith('.jpg')]
download = partial(download_link, download_dir)
with Pool(8) as p:
p.map(download, links)
print('Took {}s'.format(time() - ts))
分布式任务
你已经知道了线程和多进程模块可以给你自己的电脑跑脚本时提供很大的帮助,那么在你想要在不同的机器上执行任务,或者在你需要扩大规模而超过一台机器的的能力范围时,你该怎么办呢?一个很好的使用案例是网络应用的长时间后台任务。如果你有一些很耗时的任务,你不会希望在同一台机器上占用一些其他的应用代码所需要的子进程或线程。这将会使你的应用的性能下降,影响到你的用户们。如果能在另外一台甚至很多台其他的机器上跑这些任务就好了。
Python库RQ非常适用于这类任务。它是一个简单却很强大的库。首先将一个函数和它的参数放入队列中。它将函数调用的表示序列化(pickle) ,然后将这些表示添加到一个Redis列表中。任务进入队列只是第一步,什么都还没有做。我们至少还需要一个能去监听任务队列的worker(工作线程)。
第一步是在你的电脑上安装和使用Redis服务器,或是拥有一台能正常的使用的Redis服务器的使用权。接着,对于现有的代码只需要一些小小的改动。先创建一个RQ队列的实例并通过redis-py 库传给一台Redis服务器。然后,我们执行“q.enqueue(download_link, download_dir, link)”,而不只是调用“download_link” 。enqueue方法的第一个参数是一个函数,当任务真正执行时,其他的参数或关键字参数将会传给该函数。
最后一步是启动一些worker。RQ提供了方便的脚本,可以在默认队列上运行起worker。只要在终端窗口中执行“rqworker”,就可以开始监听默认队列了。请确认你当前的工作目录与脚本所在的是同一个。如果你想监听别的队列,你可以执行“rqworker queue_name”,然后将会开始执行名为queue_name的队列。RQ的一个很好的点就是,只要你可以连接到Redis,你就可以在任意数量上的机器上跑起任意数量的worker;因此,它可以让你的应用扩展性得到提升。下面是RQ版本的代码:
from redis import Redis
from rq import Queue
def main():
client_id = os.getenv('IMGUR_CLIENT_ID')
if not client_id:
raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!")
download_dir = setup_download_dir()
links = [l for l in get_links(client_id) if l.endswith('.jpg')]
q = Queue(connection=Redis(host='localhost', port=6379))
for link in links:
q.enqueue(download_link, download_dir, link)
然而RQ并不是Python任务队列的唯一解决方案。RQ确实易用并且能在简单的案例中起到很大的作用,但是如果有更高级的需求,我们可以使用其他的解决方案(例如 Celery)。
总结
如果你的代码是IO密集型的,线程和多进程可以帮到你。多进程比线程更易用,但是消耗更多的内存。如果你的代码是CPU密集型的,多进程就明显是更好的选择——特别是所使用的机器是多核或多CPU的。对于网络应用,在你需要扩展到多台机器上执行任务,RQ是更好的选择。
数据分析咨询请扫描二维码
若不方便扫码,搜微信号:CDAshujufenxi
在神经网络模型搭建中,“最后一层是否添加激活函数”是新手常困惑的关键问题——有人照搬中间层的ReLU激活,导致回归任务输出异 ...
2025-12-05在机器学习落地过程中,“模型准确率高但不可解释”“面对数据噪声就失效”是两大核心痛点——金融风控模型若无法解释决策依据, ...
2025-12-05在CDA(Certified Data Analyst)数据分析师的能力模型中,“指标计算”是基础技能,而“指标体系搭建”则是区分新手与资深分析 ...
2025-12-05在回归分析的结果解读中,R方(决定系数)是衡量模型拟合效果的核心指标——它代表因变量的变异中能被自变量解释的比例,取值通 ...
2025-12-04在城市规划、物流配送、文旅分析等场景中,经纬度热力图是解读空间数据的核心工具——它能将零散的GPS坐标(如外卖订单地址、景 ...
2025-12-04在CDA(Certified Data Analyst)数据分析师的指标体系中,“通用指标”与“场景指标”并非相互割裂的两个部分,而是支撑业务分 ...
2025-12-04每到“双十一”,电商平台的销售额会迎来爆发式增长;每逢冬季,北方的天然气消耗量会显著上升;每月的10号左右,工资发放会带动 ...
2025-12-03随着数字化转型的深入,企业面临的数据量呈指数级增长——电商的用户行为日志、物联网的传感器数据、社交平台的图文视频等,这些 ...
2025-12-03在CDA(Certified Data Analyst)数据分析师的工作体系中,“指标”是贯穿始终的核心载体——从“销售额环比增长15%”的业务结论 ...
2025-12-03在神经网络训练中,损失函数的数值变化常被视为模型训练效果的“核心仪表盘”——初学者盯着屏幕上不断下降的损失值满心欢喜,却 ...
2025-12-02在CDA(Certified Data Analyst)数据分析师的日常工作中,“用部分数据推断整体情况”是高频需求——从10万条订单样本中判断全 ...
2025-12-02在数据预处理的纲量统一环节,标准化是消除量纲影响的核心手段——它将不同量级的特征(如“用户年龄”“消费金额”)转化为同一 ...
2025-12-02在数据驱动决策成为企业核心竞争力的今天,A/B测试已从“可选优化工具”升级为“必选验证体系”。它通过控制变量法构建“平行实 ...
2025-12-01在时间序列预测任务中,LSTM(长短期记忆网络)凭借对时序依赖关系的捕捉能力成为主流模型。但很多开发者在实操中会遇到困惑:用 ...
2025-12-01引言:数据时代的“透视镜”与“掘金者” 在数字经济浪潮下,数据已成为企业决策的核心资产,而CDA数据分析师正是挖掘数据价值的 ...
2025-12-01数据分析师的日常,常始于一堆“毫无章法”的数据点:电商后台导出的零散订单记录、APP埋点收集的无序用户行为日志、传感器实时 ...
2025-11-28在MySQL数据库运维中,“query end”是查询执行生命周期的收尾阶段,理论上耗时极短——主要完成结果集封装、资源释放、事务状态 ...
2025-11-28在CDA(Certified Data Analyst)数据分析师的工具包中,透视分析方法是处理表结构数据的“瑞士军刀”——无需复杂代码,仅通过 ...
2025-11-28在统计分析中,数据的分布形态是决定“用什么方法分析、信什么结果”的底层逻辑——它如同数据的“性格”,直接影响着描述统计的 ...
2025-11-27在电商订单查询、用户信息导出等业务场景中,技术人员常面临一个选择:是一次性查询500条数据,还是分5次每次查询100条?这个问 ...
2025-11-27