京公网安备 11010802034615号
经营许可证编号:京B2-20210330
使用Python操作Elasticsearch数据索引的教程
Elasticsearch是一个分布式、Restful的搜索及分析服务器,Apache Solr一样,它也是基于Lucence的索引服务器,但我认为Elasticsearch对比Solr的优点在于:
轻量级:安装启动方便,下载文件之后一条命令就可以启动;
Schema free:可以向服务器提交任意结构的JSON对象,Solr中使用schema.xml指定了索引结构;
多索引文件支持:使用不同的index参数就能创建另一个索引文件,Solr中需要另行配置;
分布式:Solr Cloud的配置比较复杂。
环境搭建
启动Elasticsearch,访问端口在9200,通过浏览器可以查看到返回的JSON数据,Elasticsearch提交和返回的数据格式都是JSON.
>> bin/elasticsearch -f
安装官方提供的Python API,在OS X上安装后出现一些Python运行错误,是因为setuptools版本太旧引起的,删除重装后恢复正常。
>> pip install elasticsearch
索引操作
对于单条索引,可以调用create或index方法。
from datetime import datetime
from elasticsearch import Elasticsearch
es = Elasticsearch() #create a localhost server connection, or Elasticsearch("ip")
es.create(index="test-index", doc_type="test-type", id=1,
body={"any":"data", "timestamp": datetime.now()})
Elasticsearch批量索引的命令是bulk,目前Python API的文档示例较少,花了不少时间阅读源代码才弄清楚批量索引的提交格式。
from datetime import datetime
from elasticsearch import Elasticsearch
from elasticsearch import helpers
es = Elasticsearch("10.18.13.3")
j = 0
count = int(df[0].count())
actions = []
while (j < count):
action = {
"_index": "tickets-index",
"_type": "tickets",
"_id": j + 1,
"_source": {
"crawaldate":df[0][j],
"flight":df[1][j],
"price":float(df[2][j]),
"discount":float(df[3][j]),
"date":df[4][j],
"takeoff":df[5][j],
"land":df[6][j],
"source":df[7][j],
"timestamp": datetime.now()}
}
actions.append(action)
j += 1
if (len(actions) == 500000):
helpers.bulk(es, actions)
del actions[0:len(actions)]
if (len(actions) > 0):
helpers.bulk(es, actions)
del actions[0:len(actions)]
在这里发现Python API序列化JSON时对数据类型支撑比较有限,原始数据使用的NumPy.Int32必须转换为int才能索引。此外,现在的bulk操作默认是每次提交500条数据,我修改为5000甚至50000进行测试,会有索引不成功的情况。
#helpers.py source code
def streaming_bulk(client, actions, chunk_size=500, raise_on_error=False,
expand_action_callback=expand_action, **kwargs):
actions = map(expand_action_callback, actions)
# if raise on error is set, we need to collect errors per chunk before raising them
errors = []
while True:
chunk = islice(actions, chunk_size)
bulk_actions = []
for action, data in chunk:
bulk_actions.append(action)
if data is not None:
bulk_actions.append(data)
if not bulk_actions:
return
def bulk(client, actions, stats_only=False, **kwargs):
success, failed = 0, 0
# list of errors to be collected is not stats_only
errors = []
for ok, item in streaming_bulk(client, actions, **kwargs):
# go through request-reponse pairs and detect failures
if not ok:
if not stats_only:
errors.append(item)
failed += 1
else:
success += 1
return success, failed if stats_only else errors
对于索引的批量删除和更新操作,对应的文档格式如下,更新文档中的doc节点是必须的。
{
'_op_type': 'delete',
'_index': 'index-name',
'_type': 'document',
'_id': 42,
}
{
'_op_type': 'update',
'_index': 'index-name',
'_type': 'document',
'_id': 42,
'doc': {'question': 'The life, universe and everything.'}
}
常见错误
SerializationError:JSON数据序列化出错,通常是因为不支持某个节点值的数据类型
RequestError:提交数据格式不正确
ConflictError:索引ID冲突
TransportError:连接无法建立
性能

上面是使用MongoDB和Elasticsearch存储相同数据的对比,虽然服务器和操作方式都不完全相同,但可以看出数据库对批量写入还是比索引服务器更具备优势。
Elasticsearch的索引文件是自动分块,达到千万级数据对写入速度也没有影响。但在达到磁盘空间上限时,Elasticsearch出现了文件合并错误,并且大量丢失数据(共丢了100多万条),停止客户端写入后,服务器也无法自动恢复,必须手动停止。在生产环境中这点比较致命,尤其是使用非Java客户端,似乎无法在客户端获取到服务端的Java异常,这使得程序员必须很小心地处理服务端的返回信息。
数据分析咨询请扫描二维码
若不方便扫码,搜微信号:CDAshujufenxi
在数字化商业环境中,数据已成为企业优化运营、抢占市场、规避风险的核心资产。但商业数据分析绝非“堆砌数据、生成报表”的简单 ...
2026-01-20定量报告的核心价值是传递数据洞察,但密密麻麻的表格、复杂的计算公式、晦涩的数值罗列,往往让读者望而却步,导致核心信息被淹 ...
2026-01-20在CDA(Certified Data Analyst)数据分析师的工作场景中,“精准分类与回归预测”是高频核心需求——比如预测用户是否流失、判 ...
2026-01-20在建筑工程造价工作中,清单汇总分类是核心环节之一,尤其是针对楼梯、楼梯间这类包含多个分项工程(如混凝土浇筑、钢筋制作、扶 ...
2026-01-19数据清洗是数据分析的“前置必修课”,其核心目标是剔除无效信息、修正错误数据,让原始数据具备准确性、一致性与可用性。在实际 ...
2026-01-19在CDA(Certified Data Analyst)数据分析师的日常工作中,常面临“无标签高维数据难以归类、群体规律模糊”的痛点——比如海量 ...
2026-01-19在数据仓库与数据分析体系中,维度表与事实表是构建结构化数据模型的核心组件,二者如同“骨架”与“血肉”,协同支撑起各类业务 ...
2026-01-16在游戏行业“存量竞争”的当下,玩家留存率直接决定游戏的生命周期与商业价值。一款游戏即便拥有出色的画面与玩法,若无法精准识 ...
2026-01-16为配合CDA考试中心的 2025 版 CDA Level III 认证新大纲落地,CDA 网校正式推出新大纲更新后的第一套官方模拟题。该模拟题严格遵 ...
2026-01-16在数据驱动决策的时代,数据分析已成为企业运营、产品优化、业务增长的核心工具。但实际工作中,很多数据分析项目看似流程完整, ...
2026-01-15在CDA(Certified Data Analyst)数据分析师的日常工作中,“高维数据处理”是高频痛点——比如用户画像包含“浏览次数、停留时 ...
2026-01-15在教育测量与评价领域,百分制考试成绩的分布规律是评估教学效果、优化命题设计的核心依据,而正态分布则是其中最具代表性的分布 ...
2026-01-15在用户从“接触产品”到“完成核心目标”的全链路中,流失是必然存在的——电商用户可能“浏览商品却未下单”,APP新用户可能“ ...
2026-01-14在产品增长的核心指标体系中,次日留存率是当之无愧的“入门级关键指标”——它直接反映用户对产品的首次体验反馈,是判断产品是 ...
2026-01-14在CDA(Certified Data Analyst)数据分析师的业务实操中,“分类预测”是高频核心需求——比如“预测用户是否会购买商品”“判 ...
2026-01-14在数字化时代,用户的每一次操作——无论是电商平台的“浏览-加购-下单”、APP的“登录-点击-留存”,还是金融产品的“注册-实名 ...
2026-01-13在数据驱动决策的时代,“数据质量决定分析价值”已成为行业共识。数据库、日志系统、第三方平台等渠道采集的原始数据,往往存在 ...
2026-01-13在CDA(Certified Data Analyst)数据分析师的核心能力体系中,“通过数据建立模型、实现预测与归因”是进阶关键——比如“预测 ...
2026-01-13在企业数字化转型过程中,业务模型与数据模型是两大核心支撑体系:业务模型承载“业务应该如何运转”的逻辑,数据模型解决“数据 ...
2026-01-12当前手游市场进入存量竞争时代,“拉新难、留存更难”成为行业普遍痛点。对于手游产品而言,用户留存率不仅直接决定产品的生命周 ...
2026-01-12