MySql教程

Logstash同步MySQL关联表到Elasticsearch的嵌套文档中

本文主要是介绍Logstash同步MySQL关联表到Elasticsearch的嵌套文档中,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

前言:

         上一篇实践了通过Logstash同步MySQL的几张关联表到Elasticsearch中。为了实现同一种业务需求,嵌套文档在资源开销和查询速度上要优于父子文档(针对少量数据的情况)。所以以下就实践一下嵌套文档的基本使用和,以及Logstash如何同步一对多关系表到ElasticSearch的嵌套文档中。

 

RESTful模拟:

       以下以博客内容和博客评论为例,从映射创建,到增,删,改,查,聚合演示嵌套文档的使用方法,索引名 “blog_new”。

1. 创建映射

PUT blog_new
{ "mappings": {    "properties": {        "title": {          "type": "text"
        },        "body": {          "type": "text"
        },        "tags": {          "type": "keyword"
        },        "published_on": {          "type": "keyword"
        },        "comments": {          "type": "nested",          "properties": {            "name": {              "type": "text"
            },            "comment": {              "type": "text"
            },            "age": {              "type": "short"
            },            "rating": {              "type": "short"
            },            "commented_on": {              "type": "text"
            }
          }
        }
      }
  }
}

2. 添加

POST blog_new/blog/2{  "title": "Hero",  "body": "Hero test body...",  "tags": ["Heros", "happy"],  "published_on": "6 Oct 2018",  "comments": [
    {      "name": "steve",      "age": 24,      "rating": 18,      "comment": "Nice article..",      "commented_on": "3 Nov 2018"
    }
  ]
}

3.  删除

POST  blog_new/blog/1/_update
{ "script": {    "lang": "painless",    "source": "ctx._source.comments.removeIf(it -> it.name == 'John');"
 }
}

4. 修改

POST blog_new/blog/2/_update
{  "script": {    "source": "for(e in ctx._source.comments){if (e.name == 'steve') {e.age = 25; e.comment= 'very very good article...';}}" 
  }
}

5. 查询

GET /blog_new/_search?pretty
{  "query": {    "bool": {      "must": [
        {          "nested": {            "path": "comments",            "query": {              "bool": {                "must": [
                  {                    "match": {                      "comments.name": "William"
                    }
                  },
                  {                    "match": {                      "comments.age": 34
                    }
                  }
                ]
              }
            }
          }
        }
      ]
    }
  }
}

6. 聚合

GET blog_new/_search
{  "size": 0,  "aggs": {    "comm_aggs": {      "nested": {        "path": "comments"
      },      "aggs": {        "min_age": {          "min": {            "field": "comments.age"
          }
        }
      }
    }
  }
}

 

Logstash同步:

        同步到ES的嵌套文档和前面的父子文档就有点不一样了,这里只需要一个jdbc。合并主要是通过关联查询出结果,然后聚合导入到ElasticSearch中。以下还是以博客和评论为例,创建索引映射和其他MySQL表之类的就省略,直接看运行命令。

1. 创建嵌套文档索引和映射

可以用上面RESTful方式的映射创建进行修改,主要的是嵌套的类型是nested,执行配置前运行SQL查询效果如下。

https://img2.sycdn.imooc.com/645f033f0001207308440700.jpg

2. 配置同步代码

input {

    stdin {}
	
    jdbc {
        jdbc_driver_library => "E:/2setsoft/1dev/logstash-7.8.0/mysqletc/mysql-connector-java-5.1.7-bin.jar"
		
        jdbc_driver_class => "com.mysql.jdbc.Driver"
		
        jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/community?characterEncoding=UTF-8&useSSL=false"
		
        jdbc_user => root
		
        jdbc_password => "root"
		
        schedule => "*/1 * * * *"
       
        statement => "SELECT community.id AS community_id, community.content, community.location, community.images, comment.content AS comment_content , comment.id AS comment_id FROM yiqi_comment comment LEFT JOIN yiqi_community community ON community.id = comment.community_id"
   }
 
}
 
filter {
	
	aggregate {
        task_id => "%{community_id}"
        code => "
            map['id'] = event.get('community_id')
            map['content'] = event.get('content')
            map['location'] = event.get('location')
            map['images'] = event.get('images')
            map['comment_list'] ||=[]
            map['comment'] ||=[]
            if (event.get('comment_id') != nil)
                if !(map['comment_list'].include? event.get('comment_id'))  
                    map['comment_list'] << event.get('comment_id')        
                    map['comment'] << {
                        'comment_id' => event.get('comment_id'),
                        'content' => event.get('comment_content')
                    }
                end
            end
            event.cancel()
        "
        
        push_previous_map_as_event => true
        timeout => 5
    }
	
    json {
        source => "message"
        remove_field => ["message"]        #remove_field => ["message", "type", "@timestamp", "@version"]
    }
	
    mutate  {        #将不需要的JSON字段过滤,且不会被存入 ES 中
        remove_field => ["tags", "@timestamp", "@version"]
    }
	
}
 
output {
    stdout {        #codec => json_lines
    }
    elasticsearch {
		
        hosts => ["127.0.0.1:9200"]
        index => "test_nested_community_content"
        document_id => "%{id}"
   }
}

3. 运行命令开始同步

bin\logstash -f mysql\mysql.conf

https://img4.sycdn.imooc.com/645f033f0001892519031017.jpg

4. 查询

https://img2.sycdn.imooc.com/645f03400001289d11590809.jpg



这篇关于Logstash同步MySQL关联表到Elasticsearch的嵌套文档中的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!