京公网安备 11010802034615号
经营许可证编号:京B2-20210330
Python多进程multiprocessing用法实例分析
本文实例讲述了Python多进程multiprocessing用法。分享给大家供大家参考,具体如下:
mutilprocess简介
像线程一样管理进程,这个是mutilprocess的核心,他与threading很是相像,对多核CPU的利用率会比threading好的多。
简单的创建进程:
import multiprocessing
def worker(num):
"""thread worker function"""
print 'Worker:', num
return
if __name__ == '__main__':
jobs = []
for i in range(5):
p = multiprocessing.Process(target=worker, args=(i,))
jobs.append(p)
p.start()
确定当前的进程,即是给进程命名,方便标识区分,跟踪
import multiprocessing
import time
def worker():
name = multiprocessing.current_process().name
print name, 'Starting'
time.sleep(2)
print name, 'Exiting'
def my_service():
name = multiprocessing.current_process().name
print name, 'Starting'
time.sleep(3)
print name, 'Exiting'
if __name__ == '__main__':
service = multiprocessing.Process(name='my_service',
target=my_service)
worker_1 = multiprocessing.Process(name='worker 1',
target=worker)
worker_2 = multiprocessing.Process(target=worker) # default name
worker_1.start()
worker_2.start()
service.start()
守护进程就是不阻挡主程序退出,自己干自己的 mutilprocess.setDaemon(True)就这句等待守护进程退出,要加上join,join可以传入浮点数值,等待n久就不等了
守护进程:
import multiprocessing
import time
import sys
def daemon():
name = multiprocessing.current_process().name
print 'Starting:', name
time.sleep(2)
print 'Exiting :', name
def non_daemon():
name = multiprocessing.current_process().name
print 'Starting:', name
print 'Exiting :', name
if __name__ == '__main__':
d = multiprocessing.Process(name='daemon',
target=daemon)
d.daemon = True
n = multiprocessing.Process(name='non-daemon',
target=non_daemon)
n.daemon = False
d.start()
n.start()
d.join(1)
print 'd.is_alive()', d.is_alive()
n.join()
最好使用 poison pill,强制的使用terminate()注意 terminate之后要join,使其可以更新状态
终止进程:
import multiprocessing
import time
def slow_worker():
print 'Starting worker'
time.sleep(0.1)
print 'Finished worker'
if __name__ == '__main__':
p = multiprocessing.Process(target=slow_worker)
print 'BEFORE:', p, p.is_alive()
p.start()
print 'DURING:', p, p.is_alive()
p.terminate()
print 'TERMINATED:', p, p.is_alive()
p.join()
print 'JOINED:', p, p.is_alive()
①. == 0 未生成任何错误
②. 0 进程有一个错误,并以该错误码退出
③. < 0 进程由一个-1 * exitcode信号结束
进程的退出状态:
import multiprocessing
import sys
import time
def exit_error():
sys.exit(1)
def exit_ok():
return
def return_value():
return 1
def raises():
raise RuntimeError('There was an error!')
def terminated():
time.sleep(3)
if __name__ == '__main__':
jobs = []
for f in [exit_error, exit_ok, return_value, raises, terminated]:
print 'Starting process for', f.func_name
j = multiprocessing.Process(target=f, name=f.func_name)
jobs.append(j)
j.start()
jobs[-1].terminate()
for j in jobs:
j.join()
print '%15s.exitcode = %s' % (j.name, j.exitcode)
方便的调试,可以用logging
日志:
import multiprocessing
import logging
import sys
def worker():
print 'Doing some work'
sys.stdout.flush()
if __name__ == '__main__':
multiprocessing.log_to_stderr()
logger = multiprocessing.get_logger()
logger.setLevel(logging.INFO)
p = multiprocessing.Process(target=worker)
p.start()
p.join()
利用class来创建进程,定制子类
派生进程:
import multiprocessing
class Worker(multiprocessing.Process):
def run(self):
print 'In %s' % self.name
return
if __name__ == '__main__':
jobs = []
for i in range(5):
p = Worker()
jobs.append(p)
p.start()
for j in jobs:
j.join()
python进程间传递消息:
import multiprocessing
class MyFancyClass(object):
def __init__(self, name):
self.name = name
def do_something(self):
proc_name = multiprocessing.current_process().name
print 'Doing something fancy in %s for %s!' % \
(proc_name, self.name)
def worker(q):
obj = q.get()
obj.do_something()
if __name__ == '__main__':
queue = multiprocessing.Queue()
p = multiprocessing.Process(target=worker, args=(queue,))
p.start()
queue.put(MyFancyClass('Fancy Dan'))
# Wait for the worker to finish
queue.close()
queue.join_thread()
p.join()
import multiprocessing
import time
class Consumer(multiprocessing.Process):
def __init__(self, task_queue, result_queue):
multiprocessing.Process.__init__(self)
self.task_queue = task_queue
self.result_queue = result_queue
def run(self):
proc_name = self.name
while True:
next_task = self.task_queue.get()
if next_task is None:
# Poison pill means shutdown
print '%s: Exiting' % proc_name
self.task_queue.task_done()
break
print '%s: %s' % (proc_name, next_task)
answer = next_task()
self.task_queue.task_done()
self.result_queue.put(answer)
return
class Task(object):
def __init__(self, a, b):
self.a = a
self.b = b
def __call__(self):
time.sleep(0.1) # pretend to take some time to do the work
return '%s * %s = %s' % (self.a, self.b, self.a * self.b)
def __str__(self):
return '%s * %s' % (self.a, self.b)
if __name__ == '__main__':
# Establish communication queues
tasks = multiprocessing.JoinableQueue()
results = multiprocessing.Queue()
# Start consumers
num_consumers = multiprocessing.cpu_count() * 2
print 'Creating %d consumers' % num_consumers
consumers = [ Consumer(tasks, results)
for i in xrange(num_consumers) ]
for w in consumers:
w.start()
# Enqueue jobs
num_jobs = 10
for i in xrange(num_jobs):
tasks.put(Task(i, i))
# Add a poison pill for each consumer
for i in xrange(num_consumers):
tasks.put(None)
# Wait for all of the tasks to finish
tasks.join()
# Start printing results
while num_jobs:
result = results.get()
print 'Result:', result
num_jobs -= 1
Event提供一种简单的方法,可以在进程间传递状态信息。事件可以切换设置和未设置状态。通过使用一个可选的超时值,时间对象的用户可以等待其状态从未设置变为设置。
进程间信号传递:
import multiprocessing
import time
def wait_for_event(e):
"""Wait for the event to be set before doing anything"""
print 'wait_for_event: starting'
e.wait()
print 'wait_for_event: e.is_set()->', e.is_set()
def wait_for_event_timeout(e, t):
"""Wait t seconds and then timeout"""
print 'wait_for_event_timeout: starting'
e.wait(t)
print 'wait_for_event_timeout: e.is_set()->', e.is_set()
if __name__ == '__main__':
e = multiprocessing.Event()
w1 = multiprocessing.Process(name='block',
target=wait_for_event,
args=(e,))
w1.start()
w2 = multiprocessing.Process(name='nonblock',
target=wait_for_event_timeout,
args=(e, 2))
w2.start()
print 'main: waiting before calling Event.set()'
time.sleep(3)
e.set()
print 'main: event is set'
Python多进程,一般的情况是Queue来传递。
Queue:
from multiprocessing import Process, Queue
def f(q):
q.put([42, None, 'hello'])
if __name__ == '__main__':
q = Queue()
p = Process(target=f, args=(q,))
p.start()
print q.get() # prints "[42, None, 'hello']"
p.join()
多线程优先队列Queue:
import Queue
import threading
import time
exitFlag = 0
class myThread (threading.Thread):
def __init__(self, threadID, name, q):
threading.Thread.__init__(self)
self.threadID = threadID
self.name = name
self.q = q
def run(self):
print "Starting " + self.name
process_data(self.name, self.q)
print "Exiting " + self.name
def process_data(threadName, q):
while not exitFlag:
queueLock.acquire()
if not workQueue.empty():
data = q.get()
queueLock.release()
print "%s processing %s" % (threadName, data)
else:
queueLock.release()
time.sleep(1)
threadList = ["Thread-1", "Thread-2", "Thread-3"]
nameList = ["One", "Two", "Three", "Four", "Five"]
queueLock = threading.Lock()
workQueue = Queue.Queue(10)
threads = []
threadID = 1
# Create new threads
for tName in threadList:
thread = myThread(threadID, tName, workQueue)
thread.start()
threads.append(thread)
threadID += 1
# Fill the queue
queueLock.acquire()
for word in nameList:
workQueue.put(word)
queueLock.release()
# Wait for queue to empty
while not workQueue.empty():
pass
# Notify threads it's time to exit
exitFlag = 1
# Wait for all threads to complete
for t in threads:
t.join()
print "Exiting Main Thread"
多进程使用Queue通信的例子
import time
from multiprocessing import Process,Queue
MSG_QUEUE = Queue(5)
def startA(msgQueue):
while True:
if msgQueue.empty() > 0:
print ('queue is empty %d' % (msgQueue.qsize()))
else:
msg = msgQueue.get()
print( 'get msg %s' % (msg,))
time.sleep(1)
def startB(msgQueue):
while True:
msgQueue.put('hello world')
print( 'put hello world queue size is %d' % (msgQueue.qsize(),))
time.sleep(3)
if __name__ == '__main__':
processA = Process(target=startA,args=(MSG_QUEUE,))
processB = Process(target=startB,args=(MSG_QUEUE,))
processA.start()
print( 'processA start..')
主进程定义了一个Queue类型的变量,并作为Process的args参数传给子进程processA和processB,两个进程一个向队列中写数据,一个读数据。
数据分析咨询请扫描二维码
若不方便扫码,搜微信号:CDAshujufenxi
CDA持证人已遍布在世界范围各行各业,包括世界500强企业、顶尖科技独角兽、大型金融机构、国企事业单位、国家行政机关等等,“CDA数据分析师”人才队伍遵守着CDA职业道德准则,发挥着专业技能,已成为支撑科技发展的核心力量。 ...
2026-01-22在数字化时代,企业积累的海量数据如同散落的珍珠,而数据模型就是串联这些珍珠的线——它并非简单的数据集合,而是对现实业务场 ...
2026-01-22在数字化运营场景中,用户每一次点击、浏览、交互都构成了行为轨迹,这些轨迹交织成海量的用户行为路径。但并非所有路径都具备业 ...
2026-01-22在数字化时代,企业数据资产的价值持续攀升,数据安全已从“合规底线”升级为“生存红线”。企业数据安全管理方法论以“战略引领 ...
2026-01-22在SQL数据分析与业务查询中,日期数据是高频处理对象——订单创建时间、用户注册日期、数据统计周期等场景,都需对日期进行格式 ...
2026-01-21在实际业务数据分析中,单一数据表往往无法满足需求——用户信息存储在用户表、消费记录在订单表、商品详情在商品表,想要挖掘“ ...
2026-01-21在数字化转型浪潮中,企业数据已从“辅助资源”升级为“核心资产”,而高效的数据管理则是释放数据价值的前提。企业数据管理方法 ...
2026-01-21在数字化商业环境中,数据已成为企业优化运营、抢占市场、规避风险的核心资产。但商业数据分析绝非“堆砌数据、生成报表”的简单 ...
2026-01-20定量报告的核心价值是传递数据洞察,但密密麻麻的表格、复杂的计算公式、晦涩的数值罗列,往往让读者望而却步,导致核心信息被淹 ...
2026-01-20在CDA(Certified Data Analyst)数据分析师的工作场景中,“精准分类与回归预测”是高频核心需求——比如预测用户是否流失、判 ...
2026-01-20在建筑工程造价工作中,清单汇总分类是核心环节之一,尤其是针对楼梯、楼梯间这类包含多个分项工程(如混凝土浇筑、钢筋制作、扶 ...
2026-01-19数据清洗是数据分析的“前置必修课”,其核心目标是剔除无效信息、修正错误数据,让原始数据具备准确性、一致性与可用性。在实际 ...
2026-01-19在CDA(Certified Data Analyst)数据分析师的日常工作中,常面临“无标签高维数据难以归类、群体规律模糊”的痛点——比如海量 ...
2026-01-19在数据仓库与数据分析体系中,维度表与事实表是构建结构化数据模型的核心组件,二者如同“骨架”与“血肉”,协同支撑起各类业务 ...
2026-01-16在游戏行业“存量竞争”的当下,玩家留存率直接决定游戏的生命周期与商业价值。一款游戏即便拥有出色的画面与玩法,若无法精准识 ...
2026-01-16为配合CDA考试中心的 2025 版 CDA Level III 认证新大纲落地,CDA 网校正式推出新大纲更新后的第一套官方模拟题。该模拟题严格遵 ...
2026-01-16在数据驱动决策的时代,数据分析已成为企业运营、产品优化、业务增长的核心工具。但实际工作中,很多数据分析项目看似流程完整, ...
2026-01-15在CDA(Certified Data Analyst)数据分析师的日常工作中,“高维数据处理”是高频痛点——比如用户画像包含“浏览次数、停留时 ...
2026-01-15在教育测量与评价领域,百分制考试成绩的分布规律是评估教学效果、优化命题设计的核心依据,而正态分布则是其中最具代表性的分布 ...
2026-01-15在用户从“接触产品”到“完成核心目标”的全链路中,流失是必然存在的——电商用户可能“浏览商品却未下单”,APP新用户可能“ ...
2026-01-14