相对关系型数据库,我们知道 Elasticsearch 有很多优点:高性能、可扩展、近实时搜索、支持大数据量的数据分析。然后它不是万能的,他并没有对处理索引实体之间的关系给出很好的解决方法,不像关系型数据库那样使用范式来规范你的数据。所以如何更好的在 Elasticsearch 中进行数据建模非常重要。
我们以「电影」和「演员」的的关系举例来说明 Elasticsearch 中如何实现关系管理,一部电影中会存在多个演员,我们可以通过以下几种方式实现这种关系数据的管理:
movie 有两个属性:title,actors actor 有两个属性:first_name, last_name 复制代码
PUT my_movies { "mappings" : { "properties" : { "actors" : { "properties" : { "first_name" : {"type" : "keyword"}, "last_name" : {"type" : "keyword"} } }, "title" : { "type" : "text", "fields" : { "keyword" : {"type" : "keyword","ignore_above" : 256} } } } } } 复制代码
PUT my_movies { "mappings" : { "properties" : { "actors" : { "type": "nested", //指定 actors 是一个 Nested 对象,默认情况下type="object" "properties" : { "first_name" : {"type" : "keyword"}, "last_name" : {"type" : "keyword"} }}, "title" : { "type" : "text", "fields" : {"keyword":{"type":"keyword","ignore_above":256}} } } } } 复制代码
//对于查询「actor.first_name = a」 且 「actor.last_name = b」演员的电影就不会出现问题 POST my_movies/_search { "query": { "nested": { //嵌套对象被隐藏在独立的文档中,查询时必须使用 nested 查询,否则无法查询到 "path": "actors", //必须指定 path,因为一个索引中可能存在多个 nest 字段 "query": { "bool": { "must": [ {"match": {"actors.first_name": "a"}}, {"match": {"actors.last_name": "b"}} ] } } } } } 复制代码
index.mapping.nested_fields.limit:设置每个 nested 对象中可以拥有字段个数上限,默认是 50 index.mapping.nested_objects.limit:设置每个文档可以拥有的 nested 对象最大个数,默认是 10000 复制代码
PUT my_movies { "mappings": { "properties": { "movie_comments_relation": { //属于 my_movies 的一个字段,该字段是 join 类型用于指定父子文档的关系 "type": "join", //指明 join 类型 "relations": { //声明 Parent/Child 的关系 "movie": "actor" //movie 是 Parent 的名称, actor 是 Child 的名称 } }, "content": {"type": "text"}, "title": {"type": "keyword"} } } } 复制代码
- 更新父文档时,不需要重新索引对应的子文档 - 创建、修改、删除子文档,不会影响父文档和其它子文档,适用于对于子文档较多或者更新频率较频繁的场景 - 子文档可以作为搜索结果独立返回 复制代码
#索引 ID=movie1 的父文档 PUT my_movies/_doc/movie1 { "title":"黑客帝国", "blog_comments_relation":{ //对于父文档的创建这里也可以直接缩略写成 "blog_comments_relation": "movie" "name":"movie" //通过 blog_comments_relation.name = movie 来指定这是在创建一个父文档 } } #索引子文档 PUT my_movies/_doc/actor?routing=movie1 //加上 routing 为了让父文档和子文档索引在同一个分片上,确保查询 join 的性能,routing 参数必须传 { "first_name":"Jack", "last_name":"Moble", "blog_comments_relation":{ "name":"actor", //指定当前索引是一个子文档 "parent":"movie1" //他的父文档的 ID 是 movie1 } } 复制代码
# 直接根据子文档的 ID 是拿不到子文档的信息的,必须添加 routing 参数,指定他对应的父文档的 ID # 根据 Parent Id 查询父文档对应的子文档 POST my_movies/_search { "query": { "parent_id": { "type": "movie", "id": "movie1" } } } # Has Child 查询,根据子文档的一些信息,返回所属于的父文档 POST my_movies/_search { "query": { "has_child": { "type": "actor", "query" : {//查询子文档的 first_name 等于 "Jack" 的所有父文档 "match": {"first_name" : "Jack"} } } } } # Has Parent 查询,根据父文档的一些信息,返回相关的子文档信息 POST my_movies/_search { "query": { "has_parent": { "parent_type": "movie", "query" : { "match": {"title" : "Learning Hadoop"} } } } } 复制代码
模型的扩展性和稳定性非常重要,如果没有定义好,后期随着需求的变动,可能会出现频繁的索引重建问题,那么什么情况下需要重建索引呢?
- Mapping 信息发生变更:字段类型、分词器、字典更新等 - Setting 信息发生变更:主分片数变更等 - 集群内、集群间的数据迁移 复制代码
Elasticsearch 提供了 Update By Query 和 Reindex 两种方式进行索引的更新和重建:
//将 Dynamic 属性设置为 false,表示 mapping 信息不会动态更改,即使新增加了一个字段,也不会被索引,仅仅存储在 _source 中 PUT test { "mappings": { "dynamic": false, "properties": { "text": {"type": "text"} } } } // 新增加的字段 flag 不会被索引查询 POST test/_doc?refresh { "text": "words words", "flag": "bar" } // 可以通过 _update_by_query 进行索引重建,使 flag 字段可以被索引到 POST test/_update_by_query?refresh&conflicts=proceed 复制代码
- 使用 Update By Query 进行文档更新时会首先做一个快照并记录下版本号,如果在更新过程中有新的数据插入就会引起版本冲突 - 默认情况下,如果有一个文档在更新时有版本冲突,那么整个更新就会失败,但是已经更新的文档无法回退 - 可以将参数 conficts 设置为 proceed,在更新文档时遇到版本冲突不会中止更新 复制代码
POST twitter,blog/_update_by_query 复制代码
POST twitter/_update_by_query?routing=1 复制代码
POST twitter/_update_by_query?scroll_size=100 复制代码
PUT _ingest/pipeline/set-foo { "description" : "sets foo", "processors" : [ { "set" : { "field": "foo", "value": "bar" } } ] } POST twitter/_update_by_query?pipeline=set-foo 复制代码
# 通过 wait_for_completion = false 设置异步更新,此时会返回一个 taskId POST twitter/_update_by_query?wait_for_completion=false # 通过 taskId 可以直接获取到更新进度 GET /_tasks/r1A2WoRbTwKZ516z6NEs5A:36619 复制代码
ES 不允许在原有 Mapping 上对已有数据的字段类型进行修改,只能重新创建的新索引,然后再设置正确的字段类型,再重新导入数据,这个时候就需要用到 ReIndex API。
# 和 Update By Query 一样,可以通过参数 wait_for_completion=false 异步的方式获取进度 # 和 Update By Query 一样,可以通过参数 conflicts=proceed 来控制遇到版本冲突继续执行 POST _reindex?wait_for_completion=false&conflicts=proceed { "source": { "index": "blogs" }, "dest": { "index": "blogs_fix", "op_type": "create" //如果 dest 中文档存在可能会导致版本冲突,这时可以加 op_type = create,表示只有文档不存在的时候才会写入 } } 复制代码
- 修改索引的主分片数 - 改变字段中的 Mapping 字段类型 - 集群内数据迁移,跨集群数据迁移 复制代码
# 目标源需要添加白名单,表示允许访问的地址:reindex.remote.whitelist: "otherhost:9200" POST _reindex { "source": { "remote": { "host": "http://otherhost:9200", //集群的地址 "username": "user", "password": "pass" }, "index": "source", "query": { // test 字段 是 data 的文档都重建索引 "match": { "test": "data" } } }, "dest": { "index": "dest" } } 复制代码
POST _reindex { "max_docs": 1, "source": { "index": "twitter" }, "dest": { "index": "new_twitter" } } 复制代码
POST _reindex { "source": { "index": ["twitter", "blog"] }, "dest": { "index": "all_together" } } 复制代码
POST _reindex { "source": { "index": "twitter", "_source": ["user", "_doc"] # 只重建每个文档的 user 和 _doc 字段 }, "dest": { "index": "new_twitter" } } 复制代码
POST _reindex { "source": { "index": "twitter" }, "dest": { "index": "new_twitter", "version_type": "external" }, "script": { "source": "if (ctx._source.foo == 'bar') {ctx._version++; ctx._source.remove('foo')}", "lang": "painless" } } 复制代码
在前面的文章 Elasticsearch 分布式原理以及相关读写逻辑 中,我们了解了 Elasticsearch 中节点的分类,其实还有一类节点叫 Ingest Pipeline Node:
- Split Processor:将字符串值分成一个数组 - Remove/Rename Processor: 移除一个重命名字段 - Append:新增加一个字段 - Convert:数据类型转换 - Date/JSON: 日期格式转换 - Date Index Name Processor: 将通过该处理器的文档,分配到指定时间格式的索引中 - Fail Processor: 异常处理 - Foreach Proccesor: 对每个数组里面的字段进行处理 - Grok Processor:日志的日期格式切割 - Gsub / Join / Split: 字符串替换、数组转字符串,字符串转数组 - Lowercase / Upcase: 大小写转换 复制代码
# 通过 _ingest/pipeline/_simulate 接口模拟检查 Processor 工作是否正常 POST _ingest/pipeline/_simulate { "pipeline": { "description": "to split blog tags", "processors": [ #可以定义多个 Processor { "split": { # 使用 split Processor "field": "tags", # 对字段 field 字段进行预处理 "separator": "," # 按照逗号进行切分 } } ] }, "docs": [ # 要处理的文档 { "_index": "index", "_id": "id", "_source": { "title": "Introducing big data......", "tags": "hadoop,elasticsearch,spark", "content": "You konw, for big data" } }, { "_index": "index", "_id": "idxx", "_source": { "title": "Introducing cloud computering", "tags": "openstack,k8s", "content": "You konw, for cloud" } } ] } # 新生成一个名字叫 blog_pipeline 的 Pipeline PUT _ingest/pipeline/blog_pipeline { "description": "a blog pipeline", "processors": [ # 一个 Pipeline 可以有多个 processor, 管道处理 { "split": { "field": "tags", "separator": "," } }, { "set":{ "field": "views", "value": 0 } } ] } # 测试该 pipeline 是否可以正常使用 POST _ingest/pipeline/blog_pipeline/_simulate { "docs": [ { "_source": { "title": "Introducing cloud computering", "tags": "openstack,k8s", "content": "You konw, for cloud" } } ] } # 使用 pipeline 更新数据 PUT tech_blogs/_doc/2?pipeline=blog_pipeline { "title": "Introducing cloud computering", "tags": "openstack,k8s", "content": "You konw, for cloud" } # 使用 blog_pipeline 在 update_by_query 时对数据进行更新操作,只更新哪些没有 field = views 文档 POST tech_blogs/_update_by_query?pipeline=blog_pipeline { "query": { "bool": { "must_not": { "exists": { "field": "views" } } } } } 复制代码
Logstash | Ingest Node | |
---|---|---|
数据输入和输出 | 支持从不同数据源读取写入不同数据源 | 只能从 ES REST API 获取数据并写入 ES |
数据缓存 | 实现了简单的数据队列,支持重写 | 不支持缓存 |
数据处理 | 支持大量的插件,也支持定制化开发 | 支持内置插件和定制化开发 |
配置和使用 | 需要独立部署,增加了一定的架构复杂度 | 无需额外部署 |
- 更新、删除、数据聚合等操作 - 对返回的字段进行计算 - 对文档的算分进行处理 - 在 Ingest Pipeline 中执行脚本 - 在 Reindex API, Update By Query 中对数据进行处理 复制代码
# 保存脚本在 Cluster State POST _scripts/update_views { "script":{ "lang": "painless", "source": "ctx._source.views += params.new_views" } } 复制代码
- Inline Scripts 和 Store Scripts 都会被缓存 - 默认缓存 100 个脚本 - script.cache.max_size 设置最大缓存数 - script.cache.expire 设置缓存超时时间 - script.max_compilations_rate: 默认 5 分钟最多执行 75 次编译 复制代码
另外如果进一步了解 Painness Script 请移步 官方文档
将数据存储在 ES 中主要有「search」 和 「retrieve」两个用途:
其中 「search」 可以通过倒排索引实现全文的检索功能,而 「retrieve」则需要通过 store field 或者 _source 来实现。
- update,update_by_query, reindex 相关的 API - 高亮功能 - 搜索的时候获取不到原始的 JSON 数据 复制代码
PUT logs { "mappings": { "_source": { "includes": [ "*.count", "meta.*" ], "excludes": [ "meta.description", "meta.other.*" ] } } } 复制代码
# 首先会解析整个 _source,然后抽取出部分字段返回 GET /_search { "_source": [ "obj1.*", "obj2.*" ], "query" : { "term" : { "user" : "kimchy" } } } 复制代码
PUT my_index { "mappings": { "properties": { "title": { "type": "text", "store": true }, "date": { "type": "date", "store": true }, "content": { "type": "text" } } } } 复制代码
GET my_index/_search { "stored_fields": [ "title", "date" ] } 复制代码
建模是对真实世界抽取描述的一种工具和方法,上面我们介绍了在 Elasticsearch 中建模用到的一些概念和工具,这里我们总结下在建模过程中需要考虑和注意哪些细节。
PUT /employees/ { "mappings" : { "properties" : { "age" : { "type" : "integer" }, "job" : { "type" : "text", "fields" : { "keyword" : { "type" : "keyword", # text 类型加一个子类型 keyword "ignore_above" : 50 } } }, } } } # 通过子字段 keword 可以按照整个文本进行聚合 POST employees/_search { "size": 0, "aggs": { "jobs": { "terms": { "field":"job.keyword" } } } } # 通过子字段 keyword 可以对文本进行精确匹配搜索 POST /employees/_search { "query": { "term": { "job.keyword": { "value": "XHDK-A-1293-#fJ3" } } } } 复制代码
1)业务不容易维护 2)Mapping 信息保存在 Cluster State 中,会对集群性能有影响 3)删除和修改数据需要 Reindex 复制代码
PUT softwares/ { "mappings": { "_meta": { "software_version_mapping": "1.0" } } } 复制代码