Elasticsearch 数据同步


Logstash

Logstash 数据采集引擎
注:使用时,Logstash版本和Elasticsearch版本须保持一致!

  • 数据采集

    从数据库或者其它数据源中采集数据至es中

  • 以id或update_time作为同步边界

    基于定时任务调度实现以下两种方式的同步
    根据id更新:适合增量同步,不适合修改同步
    根据update_time:logstash会维护一个更新时间,从而比对数据源的更新时间,实现数据同步

  • logstash-input-jdbc 插件

    此插件用于同步Mysql数据源,已自带

  • 预先创建索引

    数据同步前,ES应先有一个索引库

环境准备

根据具体版本准备如下软件环境
我的当前目录:/home/software/

  • jdk

    jdk-8u191-linux-x64.tar.gz

  • logstash

    logstash-6.4.3.tar.gz,下载地址

  • jdbc驱动

    mysql-connector-java-5-1-41.jar

安装及配置logstash

  • 解压

    tar -zxvf logstash-6.4.3.tar.gz
  • 移动目录

    mv logstash-6.4.3 /usr/local/
    cd /usr/local/logstash-6.4.3/
  • 创建一个配置目录

    mkdir sync
    cd sync
  • 创建一个配置文件

    vim logstash-db-sync.conf
    :wq
  • 创建一个sql脚本文件

    内容为数据同步的查询sql

    vim items.sql
    :wq

    注:sql的where部分,可使用如 where tName.update_time >= :sql_last_value
    :sql_last_value 是logstash维护一个数据最后更新时间的内置变量,每次同步完成后会更新的一个边界值

  • 拷贝mysql驱动至当前目录

    cp /home/software/mysql-connector-java-5-1-41.jar .
  • 配置 logstash-db-sync.conf

    input {
      jdbc {
          # 设置 Mysql/MariaDB 数据库url及数据库名称
          jdbc_connection_string => "jdbc:mysql://host:port/database?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true
          # 用户名和密码
          jdbc_user => "root"
          jdbc_password => "root"
          # 数据库驱动所在位置,可以是绝对路径或者相对路径
          jdbc_driver_library => "/usr/local/logstash-6.4.3/sync/mysql-connector-java-5-1-41.jar"
          # 驱动类名
          jdbc_driver_class => "com.mysql.jdbc.Driver"
          # 开启分页,数据采集过程中
          jdbc_paging_enabled => "true"
          # 分页每页数据,可以自定义
          jdbc_page_size => "10000"
          # 执行的sql文件路径
          statement_filepath => "/usr/local/lostash-6.4.3/sync/items.sql
          # 设置定时任务间隔 含义:分、时、天、月、年,全部为*默认含义为每分钟跑一次
          schedule => "* * * * *"
          # 索引类型
          type => "_doc"
          # 是否开启记录上次追踪的结果,也就是上次更新的时间,这个会记录到 last_run_metadata_path 的文件
          use_column_value => true
          # 记录上一次追踪的结果值,会自动创建trach_time这个文件
          last_run_metadata_path => "/usr/local/logstash-6.4.3/sync/track_time"
          # 如果 use_column_value 为true,配置本参数,追踪的 column 名,可以是自增id或者时间
          tracking_column => "update_time"
          # tracking_column 对应字段的类型
          tracking_column_type => "timestamp"
          # 是否清除 last_tun_metadata_path 的记录,true则每次都从头开始查询
          clean_run => false
          # 数据库字段名称大写转小写
          lowercase_column_names => false
      }
    }
    output {
      elasticsearch {
          # es地址,集群可以配置多个
          hosts => ["192.168.1.187.9200"]
          # 同步的索引名
          index => "shop-items"
          # 设置_docID和数据相同
          document_id => "%{id}"
      }
      # 日志输出
      stdout {
          # 输出至控制台
          codec => json_lines
      }
    }
  • 启动 logstash

    cd /usr/local/logstash-6.4.3/bin/
    ./logstash -f /usr/local/logstash-6.4.4/sync/logstash-db-sync.conf

    -f 表示指定配置文件启动
    观察日志,等待1分钟可见同步日志的输出

总结

以上,我们完成了 Logstash 实现ES数据同步
当数据新增、更新时,会自动两步数据至ES
注:在数据库删除数据,ES不会同步删除。解决:可以通过逻辑删除,在ES查询时带上删除标记达到”删除”效果


文章作者: 小动物不困
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 小动物不困 !
评论
 本篇
Elasticsearch 数据同步 Elasticsearch 数据同步
Logstash Logstash 数据采集引擎注:使用时,Logstash版本和Elasticsearch版本须保持一致! 数据采集 从数据库或者其它数据源中采集数据至es中 以id或update_time作为同步边界 基于定时任
2020-10-11
下一篇 
Elasticsearch 集群 Elasticsearch 集群
0. 前提 三台服务器已正常安装ES,安装ES可参考这里 本教程ES安装目录为:/usr/local/elasticsearch-7.4.2/,三个节点一致 服务器与IP约定 节点一:192.168.1.184节点二:192.168.1.1
2020-10-08
  目录