39.1Elastic Stack笔记

        搜索组件:Solr, ElasticSearch    
        Lucene Core:
            Apache LuceneTM is a high-performance, full-featured text search engine library written entirely in Java. It is a technology suitable for nearly any application that requires full-text search, especially cross-platform.
            SolrTM is a high performance search server built using Lucene Core, with XML/HTTP and JSON/Python/Ruby APIs, hit highlighting, faceted search, caching, replication, and a web admin interface.
            Elasticsearch is a distributed, RESTful search and analytics engine capable of solving a growing number of use cases. As the heart of the Elastic Stack, it centrally stores your data so you can discover the expected and uncover the unexpected.
    Elastic Stack:
            Logstash is an open source, server-side data processing pipeline that ingests data from a multitude of sources simultaneously, transforms it, and then sends it to your favorite “stash.” (Ours is Elasticsearch, naturally.)
            Filebeat:Log Files
            Packetbeat:Network Data
            Winlogbeat:Windows Event Logs
            Heartbeat:Uptime Monitoring
            Kibana lets you visualize your Elasticsearch data and navigate the Elastic Stack, so you can do anything from learning why you're getting paged at 2:00 a.m. to understanding the impact rain might have on your quarterly numbers.

                状态:green, yellow, red
    ElasticSearch 5的程序环境:
        Unit File:elasticsearch.service

            cluster.name: myels
            node.name: node1
            path.data: /data/els/data
            path.logs: /data/els/logs
            http.port: 9200
            discovery.zen.ping.unicast.hosts: ["node1", "node2", "node3"]
            discovery.zen.minimum_master_nodes: 2        
    RESTful API: CRUD(Create, Read, Update, Delete)
        curl  -X<VERB> '<PROTOCOL>://<HOST>:<PORT>/<PATH>?<QUERY_STRING>' -d '<BODY>'

        特殊PATH:/_cat, /_search, /_cluster
         curl -XGET ''
         curl -XGET ''
        curl -XGET ''
        curl -XGET ''
            curl  -XPUT  
            {"key1": "value1", "key2": value, ...}

            primary shard:r/w
            replica shard: r
            _cluster, _cat, _search
        curl -X GET '<SCHEME://<HOST>:<PORT>/[INDEX/TYPE/]_search?q=KEYWORD&sort=DOMAIN:[asc|desc]&from=#&size=#&_source=DOMAIN_LIST'
        bike, bake, age: [20,200]
        查询类型:Query DSL,简单字符串;
                (1) q=KEYWORD, 相当于q=_all:KEYWORD
                (2) q=DOMAIN:KEYWORD
                            "name" : "Docker in Action",
                            "publisher" : "wrox",
                            "datatime" : "2015-12-01",
                            "author" : "Blair"
                        _all: "Docker in Action Wrox 2015-12-01 Blair"
                default_operator, 默认值为OR
                size=, 默认为10;
            Full text queries
            (1) 设置elasticsearch.yml配置文件:
                http.cors.enabled: true
                http.cors.allow-origin: "*"
            (2) 安装head:
                $ git clone https://github.com/mobz/elasticsearch-head.git
                $ cd elasticsearch-head
                $ npm install
                $ npm run start
                $ npm run proxy
        E: elasticsearch
        L: logstash,日志收集工具;
            ELK Beats Platform:
                Filebeat:是logstash forwarder的替换者,因此是一个日志收集工具;
        while true; do curl -H "X-Forwarded-For:$[$RANDOM%223+1].$[$RANDOM%255].1.1"$[$RANDOM%25+1].html; sleep 1; done
        input {
        output {
            input {
                stdin {}

            output {
                stdout {
                    codec => rubydebug
            input {
                file {
                    path => ["/var/log/httpd/access_log"]
                    start_position => "beginning"

            filter {
                grok {
                    match => {
                        "message" => "%{COMBINEDAPACHELOG}"
                    remove_field: "message"

            output {
                stdout {
                    codec => rubydebug
        示例3:date filter插件示例:
                filter {
                        grok {
                                match => {
                                        "message" => "%{HTTPD_COMBINEDLOG}"
                                remove_field => "message"
                        date {
                                match => ["timestamp","dd/MMM/YYYY:H:m:s Z"]
                                remove_field => "timestamp"
            The mutate filter allows you to perform general mutations on fields. You can rename, remove, replace, and modify fields in your events.
        示例4:mutate filter插件
            filter {
                    grok {
                            match => {
                                    "message" => "%{HTTPD_COMBINEDLOG}"
                    date {
                            match => ["timestamp","dd/MMM/YYYY:H:m:s Z"]
                    mutate {
                            rename => {
                                    "agent" => "user_agent"
            filter {
                    grok {
                            match => {
                                    "message" => "%{HTTPD_COMBINEDLOG}"
                    date {
                            match => ["timestamp","dd/MMM/YYYY:H:m:s Z"]
                    mutate {
                            rename => {
                                    "agent" => "user_agent"
                    geoip {
                            source => "clientip"
                            target => "geoip"
                            database => "/etc/logstash/maxmind/GeoLite2-City.mmdb"
            (1) 从redis加载数据
                input {
                    redis {
                        batch_count => 1
                        data_type => "list"
                        key => "logstash-list"
                        host => ""
                        port => 6379
                        threads => 5
            (2) 将数据存入redis
                output {
                    redis {
                        data_type => "channel"
                        key => "logstash-%{+yyyy.MM.dd}"
        示例4:将数据写入els cluster
            output {
                elasticsearch {
                    hosts => ["http://node1:9200/","http://node2:9200/","http://node3:9200/"]
                    user => "ec18487808b6908009d3"
                    password => "efcec6a1e0"
                    index => "logstash-%{+YYYY.MM.dd}"
                    document_type => "apache_logs"

            input {
                beats {
                    port => 5044

            filter {
                grok {
                    match => { 
                    "message" => "%{COMBINEDAPACHELOG}"
                    remove_field => "message"
                geoip {
                    source => "clientip"
                    target => "geoip"
                    database => "/etc/logstash/GeoLite2-City.mmdb"

            output {
                elasticsearch {
                    hosts => ["","",""]
                    index => "logstash-%{+YYYY.MM.dd}"
                    action => "index"
                    document_type => "apache_logs"
       GET /logo.jpg  203 0.12
                %{IP:clientip} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}
                { clientip:, method: GET, request: /logo.jpg, bytes: 203, duration: 0.12}
                %{IPORHOST:client_ip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:timestamp}\] "(?:%{WORD:verb} %{NOTSPACE:request}(?: HTTP/%{NUMBER:http_version})?|-)" %{HOST:domain} %{NUMBER:response} (?:%{NUMBER:bytes}|-) %{QS:referrer} %{QS:agent} "(%{WORD:x_forword}|-)" (%{URIHOST:upstream_host}|-) %{NUMBER:upstream_response} (%{WORD:upstream_cache_status}|-) %{QS:upstream_content_type} (%{BASE16FLOAT:upstream_response_time}) > (%{BASE16FLOAT:request_time})
                 "message" => "%{IPORHOST:clientip} \[%{HTTPDATE:time}\] \"%{WORD:verb} %{URIPATHPARAM:request} HTTP/%{NUMBER:httpversion}\" %{NUMBER:http_status_code} %{NUMBER:bytes} \"(?<http_referer>\S+)\" \"(?<http_user_agent>\S+)\" \"(?<http_x_forwarded_for>\S+)\""
                 filter {
                    grok {
                        match => {
                            "message" => "%{IPORHOST:clientip} \[%{HTTPDATE:time}\] \"%{WORD:verb} %{URIPATHPARAM:request} HTTP/%{NUMBER:httpversion}\" %{NUMBER:http_status_code} %{NUMBER:bytes} \"(?<http_referer>\S+)\" \"(?<http_user_agent>\S+)\" \"(?<http_x_forwarded_for>\S+)\""
                        remote_field: message
                filter {
                    grok {
                        match => { "message" => ["%{IPORHOST:[nginx][access][remote_ip]} - %{DATA:[nginx][access][user_name]} \[%{HTTPDATE:[nginx
                        ][access][time]}\] \"%{WORD:[nginx][access][method]} %{DATA:[nginx][access][url]} HTTP/%{NUMBER:[nginx][access][http_version]}\
                        " %{NUMBER:[nginx][access][response_code]} %{NUMBER:[nginx][access][body_sent][bytes]} \"%{DATA:[nginx][access][referrer]}\" \"
                        %{DATA:[nginx][access][agent]}\""] }
                        remove_field => "message"
                    date {
                        match => [ "[nginx][access][time]", "dd/MMM/YYYY:H:m:s Z" ]
                        remove_field => "[nginx][access][time]"
                    useragent {
                        source => "[nginx][access][agent]"
                        target => "[nginx][access][user_agent]"
                        remove_field => "[nginx][access][agent]"
                    geoip {
                        source => "[nginx][access][remote_ip]"
                        target => "geoip"
                        database => "/etc/logstash/GeoLite2-City.mmdb"
                output {                                                                                                     
                    elasticsearch {                                                                                      
                        hosts => ["node1:9200","node2:9200","node3:9200"]                                            
                        index => "logstash-ngxaccesslog-%{+YYYY.MM.dd}"                                              
                    2、target => "geoip"
        除了使用grok filter plugin实现日志输出json化之外,还可以直接配置服务输出为json格式;
            filter {
                    grok {
                            match => {
                                    "message" => "%{HTTPD_COMBINEDLOG} \"%{DATA:realclient}\""
                            remove_field => "message"
                    date {
                            match => ["timestamp","dd/MMM/YYYY:H:m:s Z"]
                            remove_field => "timestamp"
            filter {
                    grok {
                            match => {
                                    "message" => "%{HTTPD_COMMONLOG}"
                            remove_field => "message"
                    date {
                            match => ["timestamp","dd/MMM/YYYY:H:m:s Z"]
                            remove_field => "timestamp"
            log_format   json  '{"@timestamp":"$time_iso8601",'

            access_log  logs/access.log  json;                  
    Sometimes you only want to filter or output an event under certain conditions. For that, you can use a conditional.

    Conditionals in Logstash look and act the same way they do in programming languages. Conditionals support if, else if and else statements and can be nested.
    The conditional syntax is:

        if EXPRESSION {
        } else if EXPRESSION {
        } else {
        What’s an expression? Comparison tests, boolean logic, and so on!

        You can use the following comparison operators:

        equality: ==, !=, <, >, <=, >=
        regexp: =~, !~ (checks a pattern on the right against a string value on the left) inclusion: in, not in
        The supported boolean operators are:

            and, or, nand, xor
        The supported unary operators are:

        Expressions can be long and complex. Expressions can contain other expressions, you can negate expressions with !, and you can group them with parentheses (...).
        filter {
            if [type] == 'tomcat-accesslog' {
                grok {}
            if [type] == 'httpd-accesslog' {
                grok {}

- input_type: log
    - /var/log/httpd/access_log
    logtype: access

- paths:
    - /var/log/httpd/error_log
    logtype: errors    

input {
        redis {
                host => ""
                port => 6379
                password => "dhy.com"
                db => 0
                key => "filebeat"
                data_type => "list"

filter {
    if [fields][logtype] == "access" {
        grok {
                match => { "message" => "%{HTTPD_COMBINEDLOG}" }
                remove_field => ["message","beat"]
        date {
                match => ["timestamp","dd/MMM/YYYY:H:m:s Z"]
                remove_field => "timestamp"
        geoip {
                source => "clientip"
                target => "geoip"
                database => "/etc/logstash/maxmind/GeoLite2-City.mmdb"

output {
    if [fields][logtype] == "access" {
        elasticsearch {
                hosts => ["http://node01.dhy.com:9200/","http://node02.dhy.com:9200/"]
                index => "logstash-%{+YYYY.MM.dd}"
                document_type => "httpd_access_logs"
    } else {
        elasticsearch {
                hosts => ["http://node01.dhy.com:9200/","http://node02.dhy.com:9200/"]
                index => "logstash-%{+YYYY.MM.dd}"
                document_type => "httpd_error_logs"

