    root@netkiller ~ % /usr/share/logstash/bin/logstash -e "input {stdin{}} output {stdout{}}"
    Helloworld
    ERROR StatusLogger No log4j2 configuration file found. Using default configuration: logging only errors to the console.
    WARNING: Could not find logstash.yml which is typically located in $LS_HOME/config or /etc/logstash. You can specify the path using --path.settings. Continuing using the defaults
    Could not find log4j2 configuration at path //usr/share/logstash/config/log4j2.properties. Using default config which logs to console
    18:03:38.340 [[main]-pipeline-manager] INFO logstash.pipeline - Starting pipeline {"id"=>"main", "pipeline.workers"=>8, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>5, "pipeline.max_inflight"=>1000}
    18:03:38.356 [[main]-pipeline-manager] INFO logstash.pipeline - Pipeline main started
    The stdin plugin is now waiting for input:
    2017-08-03T10:03:38.375Z localhost Helloworld
    18:03:38.384 [Api Webserver] INFO logstash.agent - Successfully started Logstash API endpoint {:port=>9601}
The rubydebug codec prints each event to the screen as a pretty-printed, JSON-like structure:
    root@netkiller ~ % /usr/share/logstash/bin/logstash -e 'input{stdin{}}output{stdout{codec=>rubydebug}}'
    My name is neo
    ERROR StatusLogger No log4j2 configuration file found. Using default configuration: logging only errors to the console.
    WARNING: Could not find logstash.yml which is typically located in $LS_HOME/config or /etc/logstash. You can specify the path using --path.settings. Continuing using the defaults
    Could not find log4j2 configuration at path //usr/share/logstash/config/log4j2.properties. Using default config which logs to console
    18:05:02.734 [[main]-pipeline-manager] INFO logstash.pipeline - Starting pipeline {"id"=>"main", "pipeline.workers"=>8, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>5, "pipeline.max_inflight"=>1000}
    18:05:02.747 [[main]-pipeline-manager] INFO logstash.pipeline - Pipeline main started
    The stdin plugin is now waiting for input:
    {
        "@timestamp" => 2017-08-03T10:05:02.764Z,
          "@version" => "1",
              "host" => "localhost",
           "message" => "My name is neo"
    }
    18:05:02.782 [Api Webserver] INFO logstash.agent - Successfully started Logstash API endpoint {:port=>9601}
    input {
      file {
        type => "syslog"
        path => [ "/var/log/maillog", "/var/log/messages", "/var/log/secure" ]
        start_position => "beginning"
      }
    }
    output {
      stdout { codec => rubydebug }
      elasticsearch {
        hosts => ["127.0.0.1:9200"]
      }
    }
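start_position => "beginning" makes Logstash read each file from the start. Without this option it only reads data appended after it begins watching the file. The setting applies only to files Logstash has not seen before; files already tracked in the sincedb resume from their recorded position.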
    input {
      file { path => "/var/log/messages" type => "syslog" }
      file { path => "/var/log/apache/access.log" type => "apache" }
    }
    input {
      file {
        type => "syslog"
        path => [ "/var/log/secure", "/var/log/messages", "/var/log/syslog" ]
      }
      tcp { port => "5145" type => "syslog-network" }
      udp { port => "5145" type => "syslog-network" }
    }
    output {
      elasticsearch {
        hosts => ["127.0.0.1:9200"]
      }
    }
    input {
      redis {
        host => "127.0.0.1"
        port => "6379"
        key => "logstash:demo"
        data_type => "list"
        codec => "json"
        type => "logstash-redis-demo"
        tags => ["logstashdemo"]
      }
    }
    output {
      elasticsearch {
        hosts => ["127.0.0.1:9200"]
      }
    }
To read from Redis database 10, set the db option:
    root@netkiller /etc/logstash/conf.d % cat spring-boot-redis.conf
    input {
      redis {
        codec => json
        host => "localhost"
        port => 6379
        db => 10
        key => "logstash:redis"
        data_type => "list"
      }
    }
    output {
      stdout { codec => rubydebug }
      elasticsearch {
        hosts => ["127.0.0.1:9200"]
        index => "logstash-api"
      }
    }
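For the redis input above to receive anything, a producer has to push JSON strings onto the list logstash:redis in database 10. The following is a minimal Ruby sketch of building such a payload together with a matching redis-cli invocation. The helper name redis_push_command and the sample event fields are hypothetical, and the command is only constructed here, not executed.

```ruby
require 'json'

# Hypothetical helper: returns the argv for a redis-cli call that appends one
# JSON-encoded event to the list the redis input reads from.
def redis_push_command(event, key: 'logstash:redis', db: 10)
  ['redis-cli', '-n', db.to_s, 'RPUSH', key, JSON.generate(event)]
end

# Sample event; the field names are illustrative only.
cmd = redis_push_command({ 'message' => 'hello', 'type' => 'api' })
puts cmd.inspect

# To actually send it (requires a running Redis server): system(*cmd)
```

Because the input uses codec => json, each list entry should be one complete JSON object; the keys of that object become fields of the event.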
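    input {
      kafka {
        # zk_connect and topic_id belong to older versions of the kafka input
        # plugin; newer versions use bootstrap_servers and topics instead.
        zk_connect => "kafka:2181"
        group_id => "logstash"
        topic_id => "apache_logs"
        consumer_threads => 16
      }
    }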
    root@netkiller /etc/logstash/conf.d % cat jdbc.conf
    input {
      jdbc {
        jdbc_driver_library => "/usr/share/java/mysql-connector-java.jar"
        jdbc_driver_class => "com.mysql.jdbc.Driver"
        jdbc_connection_string => "jdbc:mysql://localhost:3306/cms"
        jdbc_user => "cms"
        jdbc_password => "123456"
        schedule => "* * * * *"
        statement => "select * from article where id > :sql_last_value"
        use_column_value => true
        tracking_column => "id"
        tracking_column_type => "numeric"
        record_last_run => true
        last_run_metadata_path => "/var/tmp/article.last"
      }
      jdbc {
        jdbc_driver_library => "/usr/share/java/mysql-connector-java.jar"
        jdbc_driver_class => "com.mysql.jdbc.Driver"
        jdbc_connection_string => "jdbc:mysql://localhost:3306/cms"
        jdbc_user => "cms"
        jdbc_password => "123456"
        schedule => "* * * * *"    # cron expression; here it runs once per minute
        statement => "select * from article where ctime > :sql_last_value"
        use_column_value => true
        tracking_column => "ctime"
        tracking_column_type => "timestamp"
        record_last_run => true
        last_run_metadata_path => "/var/tmp/article-ctime.last"
      }
    }
    output {
      elasticsearch {
        hosts => "localhost:9200"
        index => "information"
        document_type => "article"
        document_id => "%{id}"
        action => "update"
        doc_as_upsert => true
      }
    }
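Logstash stores timestamps in ISO8601 format by default. If a field holds dates written as yyyy-MM-dd HH:mm:ss, parse it with the date filter. Without a target option, the parsed value is written to @timestamp: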
    filter {
      date {
        match => [ "ctime", "yyyy-MM-dd HH:mm:ss" ]
        locale => "cn"
      }
      date {
        match => [ "mtime", "yyyy-MM-dd HH:mm:ss" ]
        locale => "cn"
      }
    }
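To make the parsing step concrete, here is a rough Ruby sketch of what matching "yyyy-MM-dd HH:mm:ss" amounts to: the text is read in a known time zone and turned into a UTC ISO8601 timestamp. The function name to_iso8601 and the assumed +0800 offset of the source data are hypothetical; this is plain Ruby for illustration, not the date filter's implementation.

```ruby
require 'time'

# Parse a "yyyy-MM-dd HH:mm:ss" string that was written in the given UTC
# offset (assumed here to be +0800) and return it as ISO8601 in UTC.
def to_iso8601(text, utc_offset = '+0800')
  local = Time.strptime("#{text} #{utc_offset}", '%Y-%m-%d %H:%M:%S %z')
  local.getutc.iso8601(3)
end

puts to_iso8601('2017-08-03 18:03:38')
# prints 2017-08-03T10:03:38.000Z
```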
Create the pattern file under /usr/share/logstash/patterns:
    mkdir /usr/share/logstash/patterns
    vim /usr/share/logstash/patterns

    NGUSERNAME [a-zA-Z\.\@\-\+_%]+
    NGUSER %{NGUSERNAME}
    NGINXACCESS %{IPORHOST:clientip} %{NGUSER:ident} %{NGUSER:auth} \[%{HTTPDATE:timestamp}\] "%{WORD:verb} %{URIPATHPARAM:request} HTTP/%{NUMBER:httpversion}" %{NUMBER:response} (?:%{NUMBER:bytes}|-) (?:"(?:%{URI:referrer}|-)"|%{QS:referrer}) %{QS:agent}
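    filter {
      if [type] == "nginx-access" {
        grok {
          # directory that holds the custom NGINXACCESS pattern
          patterns_dir => ["/usr/share/logstash/patterns"]
          match => { "message" => "%{NGINXACCESS}" }
        }
      }
    }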
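Grok patterns are ultimately regular expressions with named captures. As an illustration, the sketch below uses a simplified, hand-written Ruby regex that captures the same field names as NGINXACCESS. It is not the exact expansion grok produces, and the sample log line is made up.

```ruby
# Simplified stand-in for the NGINXACCESS pattern (an approximation, not the
# real grok expansion). Each named group corresponds to one grok field.
NGINX_ACCESS = %r{
  \A(?<clientip>\S+)\s
  (?<ident>\S+)\s
  (?<auth>\S+)\s
  \[(?<timestamp>[^\]]+)\]\s
  "(?<verb>\w+)\s(?<request>\S+)\sHTTP/(?<httpversion>[\d.]+)"\s
  (?<response>\d+)\s
  (?<bytes>\d+|-)\s
  "(?<referrer>[^"]*)"\s
  "(?<agent>[^"]*)"
}x

# Returns a hash of field name => captured text, or nil if the line does not match.
def parse_access(line)
  match = NGINX_ACCESS.match(line)
  match && match.named_captures
end

# Hypothetical access log line in the combined format.
sample = '127.0.0.1 - - [03/Aug/2017:18:05:02 +0800] ' \
         '"GET /index.html HTTP/1.1" 200 612 "-" "curl/7.54.0"'
p parse_access(sample)
```

Lines that do not match return nil here; grok instead tags such events with _grokparsefailure.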
    input {
      file {
        type => "syslog"
        path => [ "/var/log/*.log", "/var/log/messages", "/var/log/syslog" ]
        sincedb_path => "/opt/logstash/sincedb-access"
      }
      syslog {
        type => "syslog"
        port => "5544"
      }
    }
    filter {
      grok {
        type => "syslog"
        match => [ "message", "%{SYSLOGBASE2}" ]
        add_tag => [ "syslog", "grokked" ]
      }
    }
    output {
      elasticsearch { host => "elk.netkiller.cn" }
    }
    input {
      file {
        type => "SSRCode"
        path => "/SD/2015*/01*/*.csv"
        start_position => "beginning"
      }
    }
    filter {
      csv {
        columns => ["Code","Source"]
        separator => ","
      }
      kv {
        source => "uri"
        field_split => "&?"
        value_split => "="
      }
    }
    # output logs to console and to elasticsearch
    output {
      stdout {}
      elasticsearch {
        hosts => ["172.16.1.1:9200"]
      }
    }
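The kv settings above treat "&" and "?" as field separators and "=" as the key/value separator. A rough Ruby sketch of that splitting logic follows; kv_split is a hypothetical helper and the sample URI is made up, and the real kv filter has many more options and edge cases than this.

```ruby
# Split on any of the field_split characters, then split each piece into
# key and value on the first value_split character. Pieces without a
# value (such as the path before "?") are skipped.
def kv_split(source, field_split: '&?', value_split: '=')
  separators = Regexp.union(field_split.chars)
  source.split(separators).each_with_object({}) do |piece, fields|
    key, value = piece.split(value_split, 2)
    fields[key] = value unless key.nil? || key.empty? || value.nil?
  end
end

p kv_split('/search?q=logstash&page=2')
```

For the sample URI this yields the keys q and page with the values logstash and 2.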
    input {
      stdin {}
    }
    filter {
      ruby {
        init => "
          require 'csv'
          begin
            @@csv_file    = 'output.csv'
            @@csv_headers = ['A','B','C']
            # write the header row once, when the file is missing or empty
            if File.zero?(@@csv_file) || !File.exist?(@@csv_file)
              CSV.open(@@csv_file, 'w') do |csv|
                csv << @@csv_headers
              end
            end
          end
        "
        code => "
          begin
            # Event API of Logstash 5.0 and later; older releases used
            # event['@metadata']['csv_file'] = ...
            event.set('[@metadata][csv_file]', @@csv_file)
            event.set('[@metadata][csv_headers]', @@csv_headers)
          end
        "
      }
      csv {
        columns => ["a", "b", "c"]
      }
    }
    output {
      csv {
        fields => ["a", "b", "c"]
        path => "%{[@metadata][csv_file]}"
      }
      stdout {
        codec => rubydebug {
          metadata => true
        }
      }
    }
Test:

    echo "1,2,3\n4,5,6\n7,8,9" | ./bin/logstash -f csv-headers.conf

Output:

    A,B,C
    1,2,3
    4,5,6
    7,8,9
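The header row appears only once because the ruby filter's init block writes it only when the output file is missing or empty. That check can be tried outside Logstash with a small standalone sketch; ensure_csv_header is a hypothetical helper name, not part of any plugin.

```ruby
require 'csv'

# Write the header row only if the file does not exist yet or is empty.
# Returns true when a header was written, false when the file was left alone.
def ensure_csv_header(path, headers)
  return false if File.exist?(path) && !File.zero?(path)

  CSV.open(path, 'w') { |csv| csv << headers }
  true
end
```

Calling it a second time on the same path leaves the existing file untouched, so restarting the pipeline does not produce a duplicate header.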
Date formatting: convert ISO 8601 dates to the %Y-%m-%d %H:%M:%S format.
Save the following to the configuration file date.conf:
    input {
      stdin{}
    }
    filter {
      ruby {
        init => "require 'time'"
        code => "event.set('ctime', event.get('ctime').time.localtime.strftime('%Y-%m-%d %H:%M:%S'))"
      }
      ruby {
        init => "require 'time'"
        code => "event.set('mtime', event.get('mtime').time.localtime.strftime('%Y-%m-%d %H:%M:%S'))"
      }
    }
    output {
      stdout {
        codec => rubydebug
      }
    }
/usr/share/logstash/bin/logstash -f date.conf
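The conversion done inside the ruby filter can be checked in plain Ruby first. This is a minimal sketch with a hypothetical helper format_local. It takes an explicit UTC offset (assumed +08:00 here) so the result does not depend on the machine's time zone, whereas the filter's localtime call uses the system zone.

```ruby
require 'time'

# Convert an ISO8601 timestamp to "%Y-%m-%d %H:%M:%S" in the given UTC offset.
def format_local(iso8601_text, utc_offset = '+08:00')
  Time.iso8601(iso8601_text).getlocal(utc_offset).strftime('%Y-%m-%d %H:%M:%S')
end

puts format_local('2017-08-03T10:05:02.764Z')
# prints 2017-08-03 18:05:02
```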
    output {
      file {
        path => "/path/to/%{host}/%{+yyyy}/%{+MM}/%{+dd}.log.gz"
        message_format => "%{message}"
        gzip => true
      }
    }
    output {
      stdout { codec => rubydebug }
      elasticsearch {
        hosts => ["127.0.0.1:9200"]
        index => "logging"
      }
    }
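To create one index per day, put a date pattern in the index name:

    index => "logstash-%{+YYYY.MM.dd}"

Documents indexed on 2017-03-22 then carry:

    "_index" : "logstash-2017.03.22"

A custom index name can also include the event type: logstash-%{type}-%{+YYYY.MM.dd}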
    input {
      redis {
        data_type => "list"
        key => "logstash:redis"
        host => "127.0.0.1"
        port => 6379
        threads => 5
        codec => "json"
      }
    }
    filter {
    }
    output {
      elasticsearch {
        hosts => ["127.0.0.1:9200"]
        index => "logstash-%{type}-%{+YYYY.MM.dd}"
        document_type => "%{type}"
        workers => 1
        flush_size => 20
        idle_flush_time => 1
        template_overwrite => true
      }
      stdout{}
    }
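The index name is built per event from its type field and its @timestamp, which Logstash formats in UTC. A small Ruby sketch of the same interpolation follows; index_name is a hypothetical helper and the type value apache is only an example.

```ruby
# Build "logstash-<type>-<YYYY.MM.dd>" from an event type and a timestamp.
# The date part is taken in UTC, so an event shortly after local midnight
# may still land in the previous day's index.
def index_name(type, timestamp)
  format('logstash-%s-%s', type, timestamp.getutc.strftime('%Y.%m.%d'))
end

puts index_name('apache', Time.utc(2017, 3, 22, 10, 5, 2))
# prints logstash-apache-2017.03.22
```

Since every distinct type value creates its own set of daily indices, it helps to keep the number of types small.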