京公网安备 11010802034615号
经营许可证编号:京B2-20210330
Python利用多进程将大量数据放入有限内存的教程
这是一篇有关如何将大量的数据放入有限的内存中的简略教程。
与客户工作时,有时会发现他们的数据库实际上只是一个csv或Excel文件仓库,你只能将就着用,经常需要在不更新他们的数据仓库的情况下完成工作。大部分情况下,如果将这些文件存储在一个简单的数据库框架中或许更好,但时间可能不允许。这种方法对时间、机器硬件和所处环境都有要求。
下面介绍一个很好的例子:假设有一堆表格(没有使用Neo4j、MongoDB或其他类型的数据库,仅仅使用csvs、tsvs等格式存储的表格),如果将所有表格组合在一起,得到的数据帧太大,无法放入内存。所以第一个想法是:将其拆分成不同的部分,逐个存储。这个方案看起来不错,但处理起来很慢。除非我们使用多核处理器。
目标
这里的目标是从所有职位中(大约1万个),找出相关的的职位。将这些职位与政府给的职位代码组合起来。接着将组合的结果与对应的州(行政单位)信息组合起来。然后用通过word2vec生成的属性信息在我们的客户的管道中增强已有的属性。
这个任务要求在短时间内完成,谁也不愿意等待。想象一下,这就像在不使用标准的关系型数据库的情况下进行多个表的连接。
数据
示例脚本
下面的是一个示例脚本,展示了如何使用multiprocessing来在有限的内存空间中加速操作过程。脚本的第一部分是和特定任务相关的,可以自由跳过。请着重关注第二部分,这里侧重的是multiprocessing引擎。
#import the necessary packages
import pandas as pd
import us
import numpy as np
from multiprocessing import Pool,cpu_count,Queue,Manager
# the data in one particular column was number in the form that horrible excel version
# of a number where '12000' is '12,000' with that beautiful useless comma in there.
# did I mention I excel bothers me?
# instead of converting the number right away, we only convert them when we need to
def median_maker(column):
return np.median([int(x.replace(',','')) for x in column])
# dictionary_of_dataframes contains a dataframe with information for each title; e.g title is 'Data Scientist'
# related_title_score_df is the dataframe of information for the title; columns = ['title','score']
### where title is a similar_title and score is how closely the two are related, e.g. 'Data Analyst', 0.871
# code_title_df contains columns ['code','title']
# oes_data_df is a HUGE dataframe with all of the Bureau of Labor Statistics(BLS) data for a given time period (YAY FREE DATA, BOO BAD CENSUS DATA!)
def job_title_location_matcher(title,location):
try:
related_title_score_df = dictionary_of_dataframes[title]
# we limit dataframe1 to only those related_titles that are above
# a previously established threshold
related_title_score_df = related_title_score_df[title_score_df['score']>80]
#we merge the related titles with another table and its codes
codes_relTitles_scores = pd.merge(code_title_df,related_title_score_df)
codes_relTitles_scores = codes_relTitles_scores.drop_duplicates()
# merge the two dataframes by the codes
merged_df = pd.merge(codes_relTitles_scores, oes_data_df)
#limit the BLS data to the state we want
all_merged = merged_df[merged_df['area_title']==str(us.states.lookup(location).name)]
#calculate some summary statistics for the time we want
group_med_emp,group_mean,group_pct10,group_pct25,group_median,group_pct75,group_pct90 = all_merged[['tot_emp','a_mean','a_pct10','a_pct25','a_median','a_pct75','a_pct90']].apply(median_maker)
row = [title,location,group_med_emp,group_mean,group_pct10,group_pct25, group_median, group_pct75, group_pct90]
#convert it all to strings so we can combine them all when writing to file
row_string = [str(x) for x in row]
return row_string
except:
# if it doesnt work for a particular title/state just throw it out, there are enough to make this insignificant
'do nothing'
这里发生了神奇的事情:
#runs the function and puts the answers in the queue
def worker(row, q):
ans = job_title_location_matcher(row[0],row[1])
q.put(ans)
# this writes to the file while there are still things that could be in the queue
# this allows for multiple processes to write to the same file without blocking eachother
def listener(q):
f = open(filename,'wb')
while 1:
m = q.get()
if m =='kill':
break
f.write(','.join(m) + 'n')
f.flush()
f.close()
def main():
#load all your data, then throw out all unnecessary tables/columns
filename = 'skill_TEST_POOL.txt'
#sets up the necessary multiprocessing tasks
manager = Manager()
q = manager.Queue()
pool = Pool(cpu_count() + 2)
watcher = pool.map_async(listener,(q,))
jobs = []
#titles_states is a dataframe of millions of job titles and states they were found in
for i in titles_states.iloc:
job = pool.map_async(worker, (i, q))
jobs.append(job)
for job in jobs:
job.get()
q.put('kill')
pool.close()
pool.join()
if __name__ == "__main__":
main()
由于每个数据帧的大小都不同(总共约有100Gb),所以将所有数据都放入内存是不可能的。通过将最终的数据帧逐行写入内存,但从来不在内存中存储完整的数据帧。我们可以完成所有的计算和组合任务。这里的“标准方法”是,我们可以仅仅在“job_title_location_matcher”的末尾编写一个“write_line”方法,但这样每次只会处理一个实例。根据我们需要处理的职位/州的数量,这大概需要2天的时间。而通过multiprocessing,只需2个小时。
虽然读者可能接触不到本教程处理的任务环境,但通过multiprocessing,可以突破许多计算机硬件的限制。本例的工作环境是c3.8xl ubuntu ec2,硬件为32核60Gb内存(虽然这个内存很大,但还是无法一次性放入所有数据)。这里的关键之处是我们在60Gb的内存的机器上有效的处理了约100Gb的数据,同时速度提升了约25倍。通过multiprocessing在多核机器上自动处理大规模的进程,可以有效提高机器的利用率。也许有些读者已经知道了这个方法,但对于其他人,可以通过multiprocessing能带来非常大的收益。
数据分析咨询请扫描二维码
若不方便扫码,搜微信号:CDAshujufenxi
在数据可视化实践中,数据系列与数据标签的混淆是导致图表失效的高频问题——将数据标签的样式调整等同于数据系列的维度优化,或 ...
2025-11-21在数据可视化领域,“静态报表无法展现数据的时间变化与维度关联”是长期痛点——当业务人员需要分析“不同年份的区域销售趋势” ...
2025-11-21在企业战略决策的场景中,“PESTEL分析”“波特五力模型”等经典方法常被提及,但很多时候却陷入“定性描述多、数据支撑少”的困 ...
2025-11-21在企业数字化转型过程中,“业务模型”与“数据模型”常被同时提及,却也频繁被混淆——业务团队口中的“用户增长模型”聚焦“如 ...
2025-11-20在游戏行业“高获客成本、低留存率”的痛点下,“提前预测用户流失并精准召回”成为运营核心命题。而用户流失并非突发行为——从 ...
2025-11-20在商业数据分析领域,“懂理论、会工具”只是入门门槛,真正的核心竞争力在于“实践落地能力”——很多分析师能写出规范的SQL、 ...
2025-11-20在数据可视化领域,树状图(Tree Diagram)是呈现层级结构数据的核心工具——无论是电商商品分类、企业组织架构,还是数据挖掘中 ...
2025-11-17核心结论:“分析前一天浏览与第二天下单的概率提升”属于数据挖掘中的关联规则挖掘(含序列模式挖掘) 技术——它聚焦“时间序 ...
2025-11-17在数据驱动成为企业核心竞争力的今天,很多企业陷入“数据多但用不好”的困境:营销部门要做用户转化分析却拿不到精准数据,运营 ...
2025-11-17在使用Excel透视表进行数据汇总分析时,我们常遇到“需通过两个字段相乘得到关键指标”的场景——比如“单价×数量=金额”“销量 ...
2025-11-14在测试环境搭建、数据验证等场景中,经常需要将UAT(用户验收测试)环境的表数据同步到SIT(系统集成测试)环境,且两者表结构完 ...
2025-11-14在数据驱动的企业中,常有这样的困境:分析师提交的“万字数据报告”被束之高阁,而一张简洁的“复购率趋势图+核心策略标注”却 ...
2025-11-14在实证研究中,层次回归分析是探究“不同变量组对因变量的增量解释力”的核心方法——通过分步骤引入自变量(如先引入人口统计学 ...
2025-11-13在实时数据分析、实时业务监控等场景中,“数据新鲜度”直接决定业务价值——当电商平台需要实时统计秒杀订单量、金融系统需要实 ...
2025-11-13在数据量爆炸式增长的今天,企业对数据分析的需求已从“有没有”升级为“好不好”——不少团队陷入“数据堆砌却无洞察”“分析结 ...
2025-11-13在主成分分析(PCA)、因子分析等降维方法中,“成分得分系数矩阵” 与 “载荷矩阵” 是两个高频出现但极易混淆的核心矩阵 —— ...
2025-11-12大数据早已不是单纯的技术概念,而是渗透各行业的核心生产力。但同样是拥抱大数据,零售企业的推荐系统、制造企业的设备维护、金 ...
2025-11-12在数据驱动的时代,“数据分析” 已成为企业决策的核心支撑,但很多人对其认知仍停留在 “用 Excel 做报表”“写 SQL 查数据” ...
2025-11-12金融统计不是单纯的 “数据计算”,而是贯穿金融业务全流程的 “风险量化工具”—— 从信贷审批中的客户风险评估,到投资组合的 ...
2025-11-11这个问题很有实战价值,mtcars 数据集是多元线性回归的经典案例,通过它能清晰展现 “多变量影响分析” 的核心逻辑。核心结论是 ...
2025-11-11