在开发 Python 应用时,经常会使用到 Jupyter 来完成 Python 应用的开发及调试。简而言之,Jupyter Notebook 是以网页的形式打开,可以在网页页面中直接编写代码和运行代码,代码的运行结果也会直接在代码块下显示。如在编程过程中需要编写说明文档,可在同一个页面中直接编写,便于作及时的说明和解释。在今天的文章中,我将使用 Jupyter 来进行展示。
今天我就一个简单的例子来进行展示如何使用 Python 语言导入一个 CSV 文件 。这个 CSV 的文件很简单,但是我们通过这个文件来展示如何在 Python 中使用相应的 API 来导入数据。下载的 addresses.csv 文件很简单:
addresses.csv
id,firstname,surname,address,city,state,postcode 1,John,Doe,120 jefferson st.,Riverside,NJ,08075 2,Jack,McGinnis,220 hobo Av.,Phila,PA,09119 3,John Da Man,Repici,120 Jefferson St.,Riverside,NJ,08075 4,Stephen,Tyler,7452 Terrace At the Plaz road,SomeTown,SD,91234 5,Joan the bone,Anne,Jet 9th at Terrace plc,Desert City,CO,00123
我们在自己的电脑上创建一个叫做 py-elasticsearch 的目录,并把 addresses.csv 文件拷贝到这个文件夹中。我们在这个目录中启动 jupyter:
$ pwd /Users/liuxg/python/py-elasticsearch $ ls addresses.csv
我们需要安装如下的部分:
你可以参照文章 “Elastic:菜鸟上手指南” 来根据自己的操作系统来安装自己的 Elasticsearch 及 Kibana。你需要启动 Elasticsearch 及 Kibana。
你需要按照 Jupyter 来创建 notebook。请根据自己的操作系统安装相应的软件。
你可以安装最新的 Python 来进行实践。在我的电脑上,我安装的版本是 3.8.5。
$ jupyter notebook
这样就创建了我们的一个 jupyter notebook。我们创建一个叫做 py-elasticsearch 的 notebook:
我们可以在命令前面添加 !来运行 SHELL 指令。上面显示我们的 python 版本信息。
我们接下来需要在自己的电脑上安装相应的模块:
pip3 install elasticsearch pip3 panda
我们接下来输入如下的代码:
try: import os import sys import elasticsearch from elasticsearch import Elasticsearch import pandas as pd print("All Modules Loaded ! ") except Exception as e: print("Some Modules are Missing {}".format(e))
你可以使用 SHIFT + ENTER 来执行代码。上面显示所有的模块都已经被装载了。如果你没有看到上面的消息,你需要安装相应的模块。
我们接下来创建一个函数来连接 Elasticsearch:
def connect_elasticsearch(): es = None es = Elasticsearch([{'host': 'localhost', 'port': 9200}]) if es.ping(): print('Yupiee Connected ') else: print('Awww it could not connect!') return es es = connect_elasticsearch() es.ping()
上面代码显示我们已经成功地连接到 Elasticsearch。接下来我们来创建一个叫做 liuxg-test 的 index:
es.indices.create(index="liuxg-test", ignore=400)
我们可以到 Kibana 中查看是否有一个叫做 liuxg-test 的 index 已经被创建:
GET _cat/indices/liuxg-test
我们接下来显示所有的索引:
res = es.indices.get_alias("*") for name in res: print(name)
接下来,我们来删除上面我们已经创建的 liuxg-test 索引:
es.indices.delete(index="liuxg-test", ignore=[400,404])
上面显示已经成功。我们可以去 Kibana,并再次查询 liuxg-test 索引:
GET _cat/indices/liuxg-test
显然这次,我们没有看到 liuxg-test 这个索引。它表明我们的索引已经被删除了。
我们接下来导入两个文档到 Elasticsearch 中去:
e1 = { "first_name":"nitin", "last_name":"panwar", "age": 27, "about": "Love to play cricket", "interests": ['sports','music'], } e2 = { "first_name" : "Jane", "last_name" : "Smith", "age" : 32, "about" : "I like to collect rock albums", "interests": [ "music" ] }
es.indices.create(index='people', ignore=400) res1 = es.index(index='people', doc_type='_doc', body=e1, id=1) res2 = es.index(index='people', doc_type='_doc', body=e2, id=2)
在上面,我们创建了一个叫做 people 的索引,并把两个文档 e1 及 e2 导入到 Elasticsearch 中:
我们可以到 Kibana 中查看所以 people 的文档:
GET people/_search
我们可以清楚地看到有两个文档被成功地导入到 Elasticsearch 中。
我们接下来删除一个文档:
res = es.delete(index='people', doc_type='_doc', id=1)
我们到 Kibana 中去查看 id 为1 的文档:
GET people/_doc/1
上面的命令显示该文档不存在。
接下来,我们来搜索所有的文档:
res = es.search(index = 'megacorp', body = {'query': {"match_all": {}} } )
在很多的时候,我们需要定义一个索引的 settings 及 mapping。我们可以按照如下的调用来完成:
settings = { "settings": { "number_of_shards": 1, "number_of_replicas": 0 }, "mappings": { "dynamic": "true", "_source": { "enabled": "true" }, "properties": { "title": { "type": "text" }, "name": { "type": "text" } } } } indexName = 'liuxg-with-mapping' es.indices.create(index=indexName, ignore=[400,404], body=settings)
我们可以在 Kibana 中进行查看:
GET liuxg-with-mapping
接下来,我们来进行我们的正题。我们重新创建一个叫做 csv-elasticsearch 的 Notebook:
我们打入如下的命令来装载所有的模块:
try: import elasticsearch from elasticsearch import Elasticsearch import pandas as pd import json from ast import literal_eval from tqdm import tqdm import datetime import os import sys import numpy as np from elasticsearch import helpers print("Loaded ............") except Exception as E: print("Some Modules are Missing{}".format(e))
如果你看到 Loaded,则表明所有的模块都装载正确。否则,你需要安装相应的模块。接下来,我们确保我们之前的 addresses.csv 位于我们的 Jupyter 启动的目录里:
for name in os.listdir(): print(name)
我们接下来尝试阅读这个 csv 文件。
df = pd.read_csv("addresses.csv") df.head(2)
显然 Pandas 很方便地让我们读入我们的数据。上面显示我们有5个文档,有7列。在下面的代码中,我们将把文档中的 id 当做 Elasticsearch 文档中的 id。这个 id 是唯一的。我们来创建连接到 Elasticsearch 的实例:
ENDPOINT = "http://localhost:9200" es = Elasticsearch(timeout=600, hosts = ENDPOINT) es.ping()
在上面,我们对数据做了简单的清洗。对于确实任何一个项的文档,我们直接去掉。接下来,我们把 df 中的数据转换为 Elasticsearch 可以理解的格式:
df2 = df.to_dict('records')
我们需要创建一个 generator 来把数据写入到 Elasticsearch:
def generator(df2): for c, line in enumerate(df2): try: yield { '_index': "addresses", '_type': '_doc', '_id': line.get("id", None), '_source': { "firstname":line.get("firstname", ""), "surname":line.get("surname", ""), "address":line.get("address", ""), "city":line.get("city", ""), "state":line.get("state", ""), "postcode":line.get("postcode", "") } } except StopIteration: return
接下来,我们使用 helper 来导入数据:
try: res = helper.bulk(es, generator(df2)) print("Working") except Exception as e: pass
我们到 Kibana 中查看 addresses 索引:
GET _cat/indices/addresses?v
health status index uuid pri rep docs.count docs.deleted store.size pri.store.size yellow open addresses eQCjFfWQTIyMR2D4eCyv-g 1 1 5 5 17.4kb 17.4kb
上面显示有四个文档。我们最早的 CSV 文档中有5个文档,这是因为我们在进行 helper.bulk 之前,已经调用过 next(my) 一次。
我们可以使用如下的命令来进行查询:
更多关于如何使用 Python 导入数据到 Elasticsearch 的介绍,可以参考文章 “Elasticsearch:Elasticsearch 开发入门 - Python”。