| 知乎專欄 | 多維度架構 | 微信號 netkiller-ebook | QQ群:128659835 請註明“讀者” |
root@netkiller ~ % /usr/share/logstash/bin/logstash -e "input {stdin{}} output {stdout{}}"
Helloworld
ERROR StatusLogger No log4j2 configuration file found. Using default configuration: logging only errors to the console.
WARNING: Could not find logstash.yml which is typically located in $LS_HOME/config or /etc/logstash. You can specify the path using --path.settings. Continuing using the defaults
Could not find log4j2 configuration at path //usr/share/logstash/config/log4j2.properties. Using default config which logs to console
18:03:38.340 [[main]-pipeline-manager] INFO logstash.pipeline - Starting pipeline {"id"=>"main", "pipeline.workers"=>8, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>5, "pipeline.max_inflight"=>1000}
18:03:38.356 [[main]-pipeline-manager] INFO logstash.pipeline - Pipeline main started
The stdin plugin is now waiting for input:
2017-08-03T10:03:38.375Z localhost Helloworld
18:03:38.384 [Api Webserver] INFO logstash.agent - Successfully started Logstash API endpoint {:port=>9601}
rubydebug提供以json格式輸出到屏幕
root@netkiller ~ % /usr/share/logstash/bin/logstash -e 'input{stdin{}}output{stdout{codec=>rubydebug}}'
My name is neo
ERROR StatusLogger No log4j2 configuration file found. Using default configuration: logging only errors to the console.
WARNING: Could not find logstash.yml which is typically located in $LS_HOME/config or /etc/logstash. You can specify the path using --path.settings. Continuing using the defaults
Could not find log4j2 configuration at path //usr/share/logstash/config/log4j2.properties. Using default config which logs to console
18:05:02.734 [[main]-pipeline-manager] INFO logstash.pipeline - Starting pipeline {"id"=>"main", "pipeline.workers"=>8, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>5, "pipeline.max_inflight"=>1000}
18:05:02.747 [[main]-pipeline-manager] INFO logstash.pipeline - Pipeline main started
The stdin plugin is now waiting for input:
{
"@timestamp" => 2017-08-03T10:05:02.764Z,
"@version" => "1",
"host" => "localhost",
"message" => "My name is neo"
}
18:05:02.782 [Api Webserver] INFO logstash.agent - Successfully started Logstash API endpoint {:port=>9601}
input {
file {
type => "syslog"
path => [ "/var/log/maillog", "/var/log/messages", "/var/log/secure" ]
start_position => "beginning"
}
}
output {
stdout { codec => rubydebug }
elasticsearch {
hosts => ["127.0.0.1:9200"]
}
}
start_position => "beginning" 從頭開始讀,如果沒有這個選項,只會讀取最後更新的數據。
input {
file { path =>"/var/log/messages" type =>"syslog"}
file { path =>"/var/log/apache/access.log" type =>"apache"}
}
input {
file {
type => "syslog"
path => [ "/var/log/secure", "/var/log/messages", "/var/log/syslog" ]
}
tcp {
port => "5145"
type => "syslog-network"
}
udp {
port => "5145"
type => "syslog-network"
}
}
output {
elasticsearch {
hosts => ["127.0.0.1:9200"]
}
}
input {
redis {
host => "127.0.0.1"
port => "6379"
key => "logstash:demo"
data_type => "list"
codec => "json"
type => "logstash-redis-demo"
tags => ["logstashdemo"]
}
}
output {
elasticsearch {
hosts => ["127.0.0.1:9200"]
}
}
指定 Database 10
root@netkiller /etc/logstash/conf.d % cat spring-boot-redis.conf
input {
redis {
codec => json
host => "localhost"
port => 6379
db => 10
key => "logstash:redis"
data_type => "list"
}
}
output {
stdout { codec => rubydebug }
elasticsearch {
hosts => ["127.0.0.1:9200"]
index => "logstash-api"
}
}
![]() |
input {
kafka {
zk_connect => "kafka:2181"
group_id => "logstash"
topic_id => "apache_logs"
consumer_threads => 16
}
}
root@netkiller /etc/logstash/conf.d % cat jdbc.conf
input {
jdbc {
jdbc_driver_library => "/usr/share/java/mysql-connector-java.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://localhost:3306/cms"
jdbc_user => "cms"
jdbc_password => "123456"
schedule => "* * * * *"
statement => "select * from article where id > :sql_last_value"
use_column_value => true
tracking_column => "id"
tracking_column_type => "numeric"
record_last_run => true
last_run_metadata_path => "/var/tmp/article.last"
}
jdbc {
jdbc_driver_library => "/usr/share/java/mysql-connector-java.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://localhost:3306/cms"
jdbc_user => "cms"
jdbc_password => "123456"
schedule => "* * * * *" #定時cron的表達式,這裡是每分鐘執行一次
statement => "select * from article where ctime > :sql_last_value"
use_column_value => true
tracking_column => "ctime"
tracking_column_type => "timestamp"
record_last_run => true
last_run_metadata_path => "/var/tmp/article-ctime.last"
}
}
output {
elasticsearch {
hosts => "localhost:9200"
index => "information"
document_type => "article"
document_id => "%{id}"
action => "update"
doc_as_upsert => true
}
}
系統預設是 ISO8601 如果需要轉換為 yyyy-MM-dd-HH:mm:ss 參考:
filter {
date {
match => [ "ctime", "yyyy-MM-dd HH:mm:ss" ]
locale => "cn"
}
date {
match => [ "mtime", "yyyy-MM-dd HH:mm:ss" ]
locale => "cn"
}
}
創建匹配檔案 /usr/share/logstash/patterns
mkdir /usr/share/logstash/patterns
vim /usr/share/logstash/patterns
NGUSERNAME [a-zA-Z\.\@\-\+_%]+
NGUSER %{NGUSERNAME}
NGINXACCESS %{IPORHOST:clientip} %{NGUSER:ident} %{NGUSER:auth} \[%{HTTPDATE:timestamp}\] "%{WORD:verb} %{URIPATHPARAM:request} HTTP/%{NUMBER:httpversion}" %{NUMBER:response} (?:%{NUMBER:bytes}|-) (?:"(?:%{URI:referrer}|-)"|%{QS:referrer}) %{QS:agent}
filter {
if [type] == "nginx-access" {
grok {
match => { "message" => "%{NGINXACCESS}" }
}
}
}
input {
file {
type => "syslog"
path => [ "/var/log/*.log", "/var/log/messages", "/var/log/syslog" ]
sincedb_path => "/opt/logstash/sincedb-access"
}
syslog {
type => "syslog"
port => "5544"
}
}
filter {
grok {
type => "syslog"
match => [ "message", "%{SYSLOGBASE2}" ]
add_tag => [ "syslog", "grokked" ]
}
}
output {
elasticsearch { host => "elk.netkiller.cn" }
}
input {
file {
type => "SSRCode"
path => "/SD/2015*/01*/*.csv"
start_position => "beginning"
}
}
filter {
csv {
columns => ["Code","Source"]
separator => ","
}
kv {
source => "uri"
field_split => "&?"
value_split => "="
}
}
# output logs to console and to elasticsearch
output {
stdout {}
elasticsearch {
hosts => ["172.16.1.1:9200"]
}
}
input {
stdin {}
}
filter {
ruby {
init => "
begin
@@csv_file = 'output.csv'
@@csv_headers = ['A','B','C']
if File.zero?(@@csv_file) || !File.exist?(@@csv_file)
CSV.open(@@csv_file, 'w') do |csv|
csv << @@csv_headers
end
end
end
"
code => "
begin
event['@metadata']['csv_file'] = @@csv_file
event['@metadata']['csv_headers'] = @@csv_headers
end
"
}
csv {
columns => ["a", "b", "c"]
}
}
output {
csv {
fields => ["a", "b", "c"]
path => "%{[@metadata][csv_file]}"
}
stdout {
codec => rubydebug {
metadata => true
}
}
}
測試
echo "1,2,3\n4,5,6\n7,8,9" | ./bin/logstash -f csv-headers.conf
輸出結果
A,B,C 1,2,3 4,5,6 7,8,9
日期格式化, 將ISO 8601日期格式轉換為 %Y-%m-%d %H:%M:%S
保存下面內容到配置檔案data.conf
input {
stdin{}
}
filter {
ruby {
init => "require 'time'"
code => "event.set('ctime', event.get('ctime').time.localtime.strftime('%Y-%m-%d %H:%M:%S'))"
}
ruby {
init => "require 'time'"
code => "event.set('mtime', event.get('mtime').time.localtime.strftime('%Y-%m-%d %H:%M:%S'))"
}
}
output {
stdout {
codec => rubydebug
}
}
/usr/share/logstash/bin/logstash -f date.conf
output {
file {
path => "/path/to/%{host}/%{+yyyy}/%{+MM}/%{+dd}.log.gz"
message_format => "%{message}"
gzip => true
}
}
output {
stdout { codec => rubydebug }
elasticsearch {
hosts => ["127.0.0.1:9200"]
index => "logging"
}
}
配置實現每日切割一個 index
index => "logstash-%{+YYYY.MM.dd}"
"_index" : "logstash-2017.03.22"
index 自定義 logstash-%{type}-%{+YYYY.MM.dd}
input {
redis {
data_type => "list"
key => "logstash:redis"
host => "127.0.0.1"
port => 6379
threads => 5
codec => "json"
}
}
filter {
}
output {
elasticsearch {
hosts => ["127.0.0.1:9200"]
index => "logstash-%{type}-%{+YYYY.MM.dd}"
document_type => "%{type}"
workers => 1
flush_size => 20
idle_flush_time => 1
template_overwrite => true
}
stdout{}
}