https://www.elastic.co/guide/en/logstash/current/plugins-inputs-jdbc.html
Install the JDBC driver and Logstash
curl -s https://raw.githubusercontent.com/oscm/shell/master/database/mysql/5.7/mysql-connector-java.sh | bash
curl -s https://raw.githubusercontent.com/oscm/shell/master/search/logstash/logstash-5.x.sh | bash
The MySQL driver file is located at /usr/share/java/mysql-connector-java.jar.
Create the configuration file /etc/logstash/conf.d/jdbc-mysql.conf.
mysql> desc article;
+-------------+--------------+------+-----+---------+-------+
| Field       | Type         | Null | Key | Default | Extra |
+-------------+--------------+------+-----+---------+-------+
| id          | int(11)      | NO   |     | 0       |       |
| title       | mediumtext   | NO   |     | NULL    |       |
| description | mediumtext   | YES  |     | NULL    |       |
| author      | varchar(100) | YES  |     | NULL    |       |
| source      | varchar(100) | YES  |     | NULL    |       |
| ctime       | datetime     | NO   |     | NULL    |       |
| content     | longtext     | YES  |     | NULL    |       |
+-------------+--------------+------+-----+---------+-------+
7 rows in set (0.00 sec)
input {
  jdbc {
    jdbc_driver_library => "/usr/share/java/mysql-connector-java.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/cms"
    jdbc_user => "cms"
    jdbc_password => "password"
    schedule => "* * * * *"
    statement => "select * from article"
  }
}
output {
  elasticsearch {
    hosts => "localhost:9200"
    index => "information"
    document_type => "article"
    document_id => "%{id}"
  }
}
root@netkiller /var/log/logstash % systemctl restart logstash
root@netkiller /var/log/logstash % systemctl status logstash
● logstash.service - logstash
   Loaded: loaded (/etc/systemd/system/logstash.service; enabled; vendor preset: disabled)
   Active: active (running) since Mon 2017-07-31 09:35:00 CST; 11s ago
 Main PID: 10434 (java)
   CGroup: /system.slice/logstash.service
           └─10434 /usr/bin/java -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=75 -XX:+UseCMSInitiatingOccupancyOnly -XX:+DisableExplicitGC -Djava.awt.headless=true -Dfi...

Jul 31 09:35:00 netkiller systemd[1]: Started logstash.
Jul 31 09:35:00 netkiller systemd[1]: Starting logstash...

root@netkiller /var/log/logstash % cat logstash-plain.log
[2017-07-31T09:35:28,169][INFO ][logstash.outputs.elasticsearch] Elasticsearch pool URLs updated {:changes=>{:removed=>[], :added=>[http://localhost:9200/]}}
[2017-07-31T09:35:28,172][INFO ][logstash.outputs.elasticsearch] Running health check to see if an Elasticsearch connection is working {:healthcheck_url=>http://localhost:9200/, :path=>"/"}
[2017-07-31T09:35:28,298][WARN ][logstash.outputs.elasticsearch] Restored connection to ES instance {:url=>#<Java::JavaNet::URI:0x453a18e9>}
[2017-07-31T09:35:28,299][INFO ][logstash.outputs.elasticsearch] Using mapping template from {:path=>nil}
[2017-07-31T09:35:28,337][INFO ][logstash.outputs.elasticsearch] Attempting to install template {:manage_template=>{"template"=>"logstash-*", "version"=>50001, "settings"=>{"index.refresh_interval"=>"5s"}, "mappings"=>{"_default_"=>{"_all"=>{"enabled"=>true, "norms"=>false}, "dynamic_templates"=>[{"message_field"=>{"path_match"=>"message", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false}}}, {"string_fields"=>{"match"=>"*", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false, "fields"=>{"keyword"=>{"type"=>"keyword", "ignore_above"=>256}}}}}], "properties"=>{"@timestamp"=>{"type"=>"date", "include_in_all"=>false}, "@version"=>{"type"=>"keyword", "include_in_all"=>false}, "geoip"=>{"dynamic"=>true, "properties"=>{"ip"=>{"type"=>"ip"}, "location"=>{"type"=>"geo_point"}, "latitude"=>{"type"=>"half_float"}, "longitude"=>{"type"=>"half_float"}}}}}}}}
[2017-07-31T09:35:28,344][INFO ][logstash.outputs.elasticsearch] Installing elasticsearch template to _template/logstash
[2017-07-31T09:35:28,465][INFO ][logstash.outputs.elasticsearch] New Elasticsearch output {:class=>"LogStash::Outputs::ElasticSearch", :hosts=>[#<Java::JavaNet::URI:0x66df34ae>]}
[2017-07-31T09:35:28,483][INFO ][logstash.pipeline        ] Starting pipeline {"id"=>"main", "pipeline.workers"=>8, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>5, "pipeline.max_inflight"=>1000}
[2017-07-31T09:35:29,562][INFO ][logstash.pipeline        ] Pipeline main started
[2017-07-31T09:35:29,700][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}
[2017-07-31T09:36:01,019][INFO ][logstash.inputs.jdbc     ] (0.006000s) select * from article
This full-table import is suitable for archived data that never changes, or for data that is only ever appended and never modified.
input {
  jdbc {
    jdbc_driver_library => "/usr/share/java/mysql-connector-java.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/cms"
    jdbc_user => "cms"
    jdbc_password => "password"
    schedule => "* * * * *"
    statement => "select * from article"
  }
}
output {
  elasticsearch {
    hosts => "localhost:9200"
    index => "information"
    document_type => "article"
    document_id => "%{id}"
  }
}
Importing multiple tables into Elasticsearch
# multiple inputs on logstash jdbc
input {
  jdbc {
    jdbc_driver_library => "/usr/share/java/mysql-connector-java.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/cms"
    jdbc_user => "cms"
    jdbc_password => "password"
    schedule => "* * * * *"
    statement => "select * from article"
    type => "article"
  }
  jdbc {
    jdbc_driver_library => "/usr/share/java/mysql-connector-java.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/cms"
    jdbc_user => "cms"
    jdbc_password => "password"
    schedule => "* * * * *"
    statement => "select * from comment"
    type => "comment"
  }
}
output {
  elasticsearch {
    hosts => "localhost:9200"
    index => "information"
    document_type => "%{type}"
    document_id => "%{id}"
  }
}
Add a type setting to each jdbc block, then add document_type => "%{type}" to the elasticsearch output block.
input {
  jdbc {
    statement => "SELECT id, mycolumn1, mycolumn2 FROM my_table WHERE id > :sql_last_value"
    use_column_value => true
    tracking_column => "id"
    tracking_column_type => "numeric"
    # ... other configuration bits
  }
}
tracking_column_type => "numeric" declares the data type of the id column; if it is not specified, the tracking column is treated as a date/timestamp by default.
[2017-07-31T11:08:00,193][INFO ][logstash.inputs.jdbc ] (0.020000s) select * from article where id > '2017-07-31 02:47:00'
If the replicated data becomes inconsistent, you can add the clean_run => true option to reset the recorded sql_last_value and re-import the data.
input {
  jdbc {
    statement => "SELECT * FROM my_table WHERE create_date > :sql_last_value"
    use_column_value => true
    tracking_column => "create_date"
    # ... other configuration bits
  }
}
statement_filepath specifies an SQL file. When the SQL is too complex to maintain conveniently inside the statement option, you can write it to a text file and reference that file with the statement_filepath option.
input { jdbc { jdbc_driver_library => "/path/to/driver.jar" jdbc_driver_class => "org.postgresql.Driver" jdbc_url => "jdbc://postgresql" jdbc_user => "neo" jdbc_password => "password" statement_filepath => "query.sql" } }
Query condition parameters used during replication can be written into the parameters option.
input { jdbc { jdbc_driver_library => "mysql-connector-java-5.1.36-bin.jar" jdbc_driver_class => "com.mysql.jdbc.Driver" jdbc_connection_string => "jdbc:mysql://localhost:3306/mydb" jdbc_user => "mysql" parameters => { "favorite_artist" => "Beethoven" } schedule => "* * * * *" statement => "SELECT * from songs where artist = :favorite_artist" } }
jdbc_fetch_size => 1000      # JDBC fetch size: number of rows fetched per round trip
jdbc_page_size => 1000       # page size used by JDBC paging
jdbc_paging_enabled => true  # combined with jdbc_page_size, splits the statement query into
                             # multiple queries, equivalent to: SELECT * FROM table LIMIT 1000 OFFSET 4000
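A minimal sketch of where these options sit inside a jdbc input block, reusing the cms connection details from the earlier examples (this particular combination of values is an assumption, not taken from the original text):

input {
  jdbc {
    jdbc_driver_library => "/usr/share/java/mysql-connector-java.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/cms"
    jdbc_user => "cms"
    jdbc_password => "password"
    schedule => "* * * * *"
    # Page through the result set instead of loading it in a single query.
    jdbc_paging_enabled => true
    jdbc_page_size => 1000
    jdbc_fetch_size => 1000
    statement => "select * from article"
  }
}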
Use if [type] == "news" conditionals to execute different output blocks, so that each type is written to its designated index.
output {
  if [type] == "news" {
    elasticsearch {
      hosts => "node1.netkiller.cn:9200"
      index => "information"
      document_id => "%{id}"
    }
  }
  if [type] == "comment" {
    elasticsearch {
      hosts => "node2.netkiller.cn:9200"
      index => "information"
      document_id => "%{id}"
    }
  }
}
Date formatting: convert the ISO 8601 date format to %Y-%m-%d %H:%M:%S.
input { jdbc { jdbc_driver_library => "/usr/share/java/mysql-connector-java.jar" jdbc_driver_class => "com.mysql.jdbc.Driver" jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/cms" jdbc_user => "cms" jdbc_password => "123456" schedule => "* * * * *" statement => "select * from article limit 5" } } filter { ruby { init => "require 'time'" code => "event.set('ctime', event.get('ctime').time.localtime.strftime('%Y-%m-%d %H:%M:%S'))" } ruby { init => "require 'time'" code => "event.set('mtime', event.get('mtime').time.localtime.strftime('%Y-%m-%d %H:%M:%S'))" } } output { stdout { codec => rubydebug } }
The following example replicates new rows and updates existing ones.
input {
  jdbc {
    jdbc_driver_library => "/usr/share/java/mysql-connector-java.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/cms"
    jdbc_user => "cms"
    jdbc_password => "password"
    schedule => "* * * * *"     # cron expression; here it runs once per minute
    statement => "select id, title, description, author, source, ctime, content from article where id > :sql_last_value"
    use_column_value => true
    tracking_column => "id"
    tracking_column_type => "numeric"
    record_last_run => true
    last_run_metadata_path => "/var/tmp/article.last"
  }
  jdbc {
    jdbc_driver_library => "/usr/share/java/mysql-connector-java.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/cms"
    jdbc_user => "cms"
    jdbc_password => "password"
    schedule => "* * * * *"     # cron expression; here it runs once per minute
    statement => "select * from article where ctime > :sql_last_value"
    use_column_value => true
    tracking_column => "ctime"
    tracking_column_type => "timestamp"
    record_last_run => true
    last_run_metadata_path => "/var/tmp/article-ctime.last"
  }
}
output {
  elasticsearch {
    hosts => "localhost:9200"
    index => "information"
    document_type => "article"
    document_id => "%{id}"
    action => "update"          # action to perform; one of "index", "delete", "create", "update"
    doc_as_upsert => true       # upsert mode: create the document if it does not exist yet
  }
}
The jdbc input plugin can only append from the database, writing to Elasticsearch incrementally, but the source database frequently performs delete or update operations as well. As a result, the database and the search engine drift out of sync.
Of course, if you have a development team, you can write code that updates the search engine whenever a row is deleted or updated. If you do not have that option, try the approach below.
Here we have a table named article whose mtime column is defined with ON UPDATE CURRENT_TIMESTAMP, so mtime changes on every update.
mysql> desc article;
+-------------+---------------+------+-----+-------------------+-----------------------------+
| Field       | Type          | Null | Key | Default           | Extra                       |
+-------------+---------------+------+-----+-------------------+-----------------------------+
| id          | int(11)       | NO   |     | 0                 |                             |
| title       | mediumtext    | NO   |     | NULL              |                             |
| description | mediumtext    | YES  |     | NULL              |                             |
| author      | varchar(100)  | YES  |     | NULL              |                             |
| source      | varchar(100)  | YES  |     | NULL              |                             |
| content     | longtext      | YES  |     | NULL              |                             |
| status      | enum('Y','N') | NO   |     | N                 |                             |
| ctime       | timestamp     | NO   |     | CURRENT_TIMESTAMP |                             |
| mtime       | timestamp     | YES  |     | NULL              | on update CURRENT_TIMESTAMP |
+-------------+---------------+------+-----+-------------------+-----------------------------+
9 rows in set (0.00 sec)
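If your table does not yet have such a column, DDL along these lines would add one (a sketch, not from the original text; MySQL 5.6 or later is assumed, since it allows ON UPDATE CURRENT_TIMESTAMP on additional timestamp columns):

-- Hypothetical: add an mtime column that MySQL refreshes automatically on every UPDATE.
ALTER TABLE article
  ADD COLUMN mtime timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP;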
Add an mtime-based query rule to the Logstash configuration:
jdbc {
  jdbc_driver_library => "/usr/share/java/mysql-connector-java.jar"
  jdbc_driver_class => "com.mysql.jdbc.Driver"
  jdbc_connection_string => "jdbc:mysql://localhost:3306/cms"
  jdbc_user => "cms"
  jdbc_password => "password"
  schedule => "* * * * *"     # cron expression; here it runs once per minute
  statement => "select * from article where mtime > :sql_last_value"
  use_column_value => true
  tracking_column => "mtime"
  tracking_column_type => "timestamp"
  record_last_run => true
  last_run_metadata_path => "/var/tmp/article-mtime.last"
}
Create a recycle-bin table. It is used to handle rows that are deleted from the database, or disabled with status = 'N'.
CREATE TABLE `elasticsearch_trash` (
  `id` int(11) NOT NULL,
  `ctime` timestamp NULL DEFAULT CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
Create triggers on the article table.
CREATE DEFINER=`dba`@`%` TRIGGER `article_BEFORE_UPDATE` BEFORE UPDATE ON `article` FOR EACH ROW
BEGIN
    -- When an article's status changes to 'N', the corresponding document must be removed
    -- from the search engine, so record its ID in the recycle-bin table.
    IF NEW.status = 'N' THEN
        insert into elasticsearch_trash(id) values(OLD.id);
    END IF;
    -- When the status changes back to 'Y', the ID may still be sitting in elasticsearch_trash
    -- and would be deleted by mistake, so remove the recycle-bin record.
    IF NEW.status = 'Y' THEN
        delete from elasticsearch_trash where id = OLD.id;
    END IF;
END

CREATE DEFINER=`dba`@`%` TRIGGER `article_BEFORE_DELETE` BEFORE DELETE ON `article` FOR EACH ROW
BEGIN
    -- When an article is deleted, put its ID into the search-engine recycle bin at the same time.
    insert into elasticsearch_trash(id) values(OLD.id);
END
Next, we need a simple shell script that runs once per minute, reads the rows from the elasticsearch_trash table, and calls the Elasticsearch RESTful API with curl to delete the recycled documents.
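A minimal sketch of such a cleanup script (not part of the original text; the database credentials, index name, and the script path in the crontab line are assumptions based on the examples above):

#!/bin/bash
# Hypothetical cleanup script: removes documents listed in elasticsearch_trash
# from the information/article index, then clears the processed rows.
# Assumes the cms database credentials and the Elasticsearch host used above.

MYSQL="mysql -u cms -ppassword -N -s cms -e"
ES="http://localhost:9200/information/article"

for id in $($MYSQL "SELECT id FROM elasticsearch_trash"); do
    # Delete the document from Elasticsearch by its ID ...
    curl -s -XDELETE "$ES/$id" > /dev/null
    # ... then drop the recycle-bin record so it is not processed twice.
    $MYSQL "DELETE FROM elasticsearch_trash WHERE id = $id"
done

Schedule it from cron once per minute, for example (the script path is hypothetical):

* * * * * /usr/local/bin/elasticsearch-trash.sh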
systemctl stop logstash
rm -rf /var/tmp/article*
Edit the /etc/logstash/conf.d/jdbc.conf configuration file.
input {
  jdbc {
    jdbc_driver_library => "/usr/share/java/mysql-connector-java.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/cms"
    jdbc_user => "cms"
    jdbc_password => "123456"
    schedule => "* * * * *"
    statement => "select * from article where id > :sql_last_value"
    use_column_value => true
    tracking_column => "id"
    tracking_column_type => "numeric"
    record_last_run => true
    last_run_metadata_path => "/var/tmp/article.last"
  }
  jdbc {
    jdbc_driver_library => "/usr/share/java/mysql-connector-java.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/cms"
    jdbc_user => "cms"
    jdbc_password => "123456"
    schedule => "* * * * *"     # cron expression; here it runs once per minute
    statement => "select * from article where ctime > :sql_last_value"
    use_column_value => true
    tracking_column => "ctime"
    tracking_column_type => "timestamp"
    record_last_run => true
    last_run_metadata_path => "/var/tmp/article-ctime.last"
  }
}
filter {
  ruby {
    code => "event.set('ctime', event.get('[ctime]').time.localtime.strftime('%Y-%m-%d %H:%M:%S'))"
  }
  ruby {
    code => "event.set('mtime', event.get('[mtime]').time.localtime.strftime('%Y-%m-%d %H:%M:%S'))"
  }
}
output {
  elasticsearch {
    hosts => "localhost:9200"
    index => "information"
    document_type => "article"
    document_id => "%{id}"
    action => "update"
    doc_as_upsert => true
  }
}
Delete the old index, recreate it, and configure the mapping.
curl -XDELETE http://localhost:9200/information
curl -XPUT http://localhost:9200/information
curl -XPOST http://localhost:9200/information/article/_mapping -d'
{
    "properties": {
        "title": {
            "type": "text",
            "analyzer": "ik_max_word",
            "search_analyzer": "ik_max_word"
        },
        "description": {
            "type": "text",
            "analyzer": "ik_max_word",
            "search_analyzer": "ik_max_word"
        },
        "content": {
            "type": "text",
            "analyzer": "ik_max_word",
            "search_analyzer": "ik_max_word"
        },
        "ctime": {
            "type": "date",
            "format": "yyyy-MM-dd HH:mm:ss"
        },
        "mtime": {
            "type": "date",
            "format": "yyyy-MM-dd HH:mm:ss"
        }
    }
}'

curl "http://localhost:9200/information/article/_mapping?pretty"
Start Logstash and replicate the data from scratch.
rm -rf /var/log/logstash/*
systemctl start logstash
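As an optional check (not part of the original steps), after a scheduled run or two you can confirm that documents are arriving in the new index:

curl "http://localhost:9200/information/article/_count?pretty"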