From 0b5b1c59f1e44de79a2ef31726c48e4a83a2d91b Mon Sep 17 00:00:00 2001 From: svk Date: Tue, 6 Apr 2021 20:25:03 +0300 Subject: [PATCH] Elasticsearch indices monitoring --- es-monitoring/.gitignore | 3 + es-monitoring/Dockerfile | 17 + es-monitoring/README.md | 29 ++ es-monitoring/cronjobs | 2 + es-monitoring/docker-compose.yml | 19 + es-monitoring/es-monitoring.go | 405 ++++++++++++++++++ es-monitoring/go.mod | 12 + es-monitoring/go.sum | 14 + .../Template_ES_Indices_Monitoring.xml | 67 +++ 9 files changed, 568 insertions(+) create mode 100644 es-monitoring/.gitignore create mode 100644 es-monitoring/Dockerfile create mode 100644 es-monitoring/README.md create mode 100644 es-monitoring/cronjobs create mode 100644 es-monitoring/docker-compose.yml create mode 100644 es-monitoring/es-monitoring.go create mode 100644 es-monitoring/go.mod create mode 100644 es-monitoring/go.sum create mode 100644 es-monitoring/zabbix_templates/Template_ES_Indices_Monitoring.xml diff --git a/es-monitoring/.gitignore b/es-monitoring/.gitignore new file mode 100644 index 0000000..43d7aec --- /dev/null +++ b/es-monitoring/.gitignore @@ -0,0 +1,3 @@ +.env +main +es-monitoring \ No newline at end of file diff --git a/es-monitoring/Dockerfile b/es-monitoring/Dockerfile new file mode 100644 index 0000000..92a5e50 --- /dev/null +++ b/es-monitoring/Dockerfile @@ -0,0 +1,17 @@ +FROM golang:alpine AS build +RUN apk --no-cache add gcc g++ make git +WORKDIR /go/src/app +COPY . . +RUN go get ./... +RUN GOOS=linux go build -ldflags="-s -w" -o ./bin/es-monitoring ./es-monitoring.go +FROM alpine:3.9 +#RUN apk --no-cache add ca-certificates +WORKDIR /usr/bin +COPY --from=build /go/src/app/bin /go/bin + +COPY cronjobs /etc/crontabs/root + +# start crond with log level 8 in foreground, output to stderr +CMD ["crond", "-f", "-d", "8"] + +#ENTRYPOINT /go/bin/es-monitor \ No newline at end of file diff --git a/es-monitoring/README.md b/es-monitoring/README.md new file mode 100644 index 0000000..875fa9d --- /dev/null +++ b/es-monitoring/README.md @@ -0,0 +1,29 @@ +# Es Monitoring + +Мониторинг индексов Elasticsearch + +# использование + + export ELASTICSEARCH_URL="https://__USER__:__PASSWORD__@elastic:200" + export ZABBIX_SERVER=https://zabbix2 + export ZABBIX_HOST=elastic + export ZABBIX_USERNAME=zabbix_helpers + export ZABBIX_PASSWORD= + +``` +Usage of ./es-monitor: + -indexname string + Elasticsearch index name pattern, (like "filebeat") + -operation string + Opertation type, must be: + es-cluster-info - ES Cluster information (version e.t.c) + es-get-indices - geting all index + es-indices-discover - getting es index pattern list + es-records-count - getting the number of records for a time range for all index pattern + es-index-records-count - getting records count for one index (used with -indexname option (default "es-cluster-info") + -timerange int + Elasticsearch time range for records count into hours (default 6) + -zabbix-send + Send metrics or discovery data into zabbix + +``` \ No newline at end of file diff --git a/es-monitoring/cronjobs b/es-monitoring/cronjobs new file mode 100644 index 0000000..24eeb17 --- /dev/null +++ b/es-monitoring/cronjobs @@ -0,0 +1,2 @@ +*/10 * * * * /go/bin/es-monitoring -operation es-records-count -timerange 1 -zabbix-send >> /var/log/es-monitoring.log 2>&1 +0 * * * * /go/bin/es-monitoring -operation es-indices-discover -zabbix-send >> /var/log/es-monitoring.log 2>&1 diff --git a/es-monitoring/docker-compose.yml b/es-monitoring/docker-compose.yml new file mode 100644 index 0000000..980ebe8 --- /dev/null +++ b/es-monitoring/docker-compose.yml @@ -0,0 +1,19 @@ +version: '3' + +services: + es_monitoring: + image: $IMAGE_PATH/es_monitoring:latest + environment: + - ZABBIX_USER=${ZABBIX_USER} + - ZABBIX_PASSWORD=${ZABBIX_PASSWORD} + - ZABBIX_HOST=elastic + - ZABBIX_SERVER=zabbix + - ZABBIX_PORT=10051 + - ELASTICSEARCH_URL=${ELASTICSEARCH_URL} + restart: always + build: + context: . + logging: + options: + max-size: "10m" + max-file: "5" diff --git a/es-monitoring/es-monitoring.go b/es-monitoring/es-monitoring.go new file mode 100644 index 0000000..74e5e8d --- /dev/null +++ b/es-monitoring/es-monitoring.go @@ -0,0 +1,405 @@ +//------------------------------------------------------------------------------ +// Getting and monitoring Elasticsearch indices and send the metrics into zabbix +//------------------------------------------------------------------------------- +// Author: Sergey Kalinin +// e-mail: svk@nuk-svk.ru +// website: https://nuk-svk.ru +// Git repos: https://git.nuk-svk.ru +//------------------------------------------------------------------------------- +// Require: +// github.com/adubkov/go-zabbix +// github.com/elastic/go-elasticsearch +//------------------------------------------------------------------------------- +// ZABBIX_SERVER="zabbix" +// ZABBIX_PORT="10051" +// ZABBIX_HOST="elastic cluster" +// ELASTICSEARCH_URL="https://user:pass@elastic:9200" +//------------------------------------------------------------------------------- + +package main + +import ( + "bytes" + "context" + "crypto/tls" + "encoding/json" + "flag" + "fmt" + "log" + "net" + "net/http" + "os" + "regexp" + "strconv" + "strings" + "time" + + "github.com/adubkov/go-zabbix" + "github.com/elastic/go-elasticsearch/v7" + "github.com/elastic/go-elasticsearch/v7/esapi" +) + +type json_records struct { + INDEX string + RECORD_COUNT int +} + +// Определяем входит ли строка в slice +func StringContains(a []string, x string) bool { + for _, n := range a { + if x == n { + return true + } + } + return false +} + +func EsSearch(index_name string, es_request_time_range int) int { + var ( + r map[string]interface{} + ) + request_time_range := "now-" + strconv.Itoa(es_request_time_range) + "h" + + cfg := elasticsearch.Config{ + // Addresses: []string{ + // "http://localhost:9200", + // "http://localhost:9201", + // }, + // Username: "foo", + // Password: "bar", + Transport: &http.Transport{ + MaxIdleConnsPerHost: 10, + ResponseHeaderTimeout: 60 * time.Second, + DialContext: (&net.Dialer{Timeout: time.Second}).DialContext, + TLSClientConfig: &tls.Config{ + MaxVersion: tls.VersionTLS11, + InsecureSkipVerify: true, + }, + }, + } + + es, err := elasticsearch.NewClient(cfg) + if err != nil { + log.Fatalf("Error creating the client: %s", err) + } + res, err := es.Info() + if err != nil { + log.Fatalf("Error getting response: %s", err) + } + defer res.Body.Close() + // Check response status + if res.IsError() { + log.Fatalf("Error: %s", res.String()) + } + var buf bytes.Buffer + query := map[string]interface{}{ + "query": map[string]interface{}{ + "range": map[string]interface{}{ + "@timestamp": map[string]interface{}{ + // "gt": "now-1h", + "gt": request_time_range, + }, + // "match": map[string]interface{}{ + // "host": "10.250.1.212", + }, + }, + "size": 1, + "sort": map[string]interface{}{ + "@timestamp": map[string]interface{}{ + "order": "desc", + }, + }, + } + if err := json.NewEncoder(&buf).Encode(query); err != nil { + log.Fatalf("Error encoding query: %s", err) + } + + // Perform the search request. + res, err = es.Search( + es.Search.WithContext(context.Background()), + es.Search.WithIndex(index_name+"*"), + es.Search.WithBody(&buf), + es.Search.WithTrackTotalHits(true), + es.Search.WithPretty(), + ) + if err != nil { + log.Fatalf("Error getting response: %s", err) + } + defer res.Body.Close() + + if res.IsError() { + var e map[string]interface{} + if err := json.NewDecoder(res.Body).Decode(&e); err != nil { + log.Fatalf("Error parsing the response body: %s", err) + } else { + // Print the response status and error information. + log.Fatalf("[%s] %s: %s", + res.Status(), + e["error"].(map[string]interface{})["type"], + e["error"].(map[string]interface{})["reason"], + ) + } + } + + if err := json.NewDecoder(res.Body).Decode(&r); err != nil { + log.Fatalf("Error parsing the response body: %s", err) + } + // Вывод статуса, количество выбранных записей и времени запроса + hits := int(r["hits"].(map[string]interface{})["total"].(map[string]interface{})["value"].(float64)) + return hits +} + +func EsIndexDiscover(es_index_list []string) string { + result_discovery_json := string("{\"data\": [") + + for _, value := range es_index_list { + // result_discovery_json = result_discovery_json + "{\"{#INDEX}\":\"" + value + "\",\"{#ITEM_TYPE}\":\"Records count\"" + "}," + result_discovery_json = result_discovery_json + "{\"{#INDEX}\":\"" + value + "\"" + "}," + } + result_discovery_json = strings.TrimRight(result_discovery_json, ",") + "]}" + // fmt.Println(result_discovery_json) + return (result_discovery_json) +} + +func EsIndexRecordsCount(es_index_list []string, es_request_time_range int) { + // fmt.Println(es_request_time_range) + result_index := make(map[int]json_records) + j := int(0) + for _, value := range es_index_list { + result_index[j] = json_records{value, EsSearch(value, es_request_time_range)} + j++ + } + log.Println(result_index) +} + +func EsClusterInfo(es *elasticsearch.Client) { + var ( + r map[string]interface{} + ) + // 1. Get cluster info + res, err := es.Info() + if err != nil { + log.Fatalf("Error getting response: %s", err) + } + defer res.Body.Close() + // Check response status + if res.IsError() { + log.Fatalf("Error: %s", res.String()) + } + // Deserialize the response into a map. + if err := json.NewDecoder(res.Body).Decode(&r); err != nil { + log.Fatalf("Error parsing the response body: %s", err) + } + // Print client and server version numbers. + fmt.Printf("Client: %s\n", elasticsearch.Version) + fmt.Printf("Client: %s\n", elasticsearch.Version) + fmt.Printf("Server: %s\n", r["version"].(map[string]interface{})["number"]) +} + +func EsIndexFieldExists(es *elasticsearch.Client, index_name string, field_name string) bool { + // Check if the filed exists into index + var ( + fields_request map[string]interface{} + fields []string + indices []string + field_exists bool + ) + fields = append(fields, field_name) + indices = append(indices, index_name) + + req := esapi.FieldCapsRequest{ + // Index: []string{}, + Index: indices, + Fields: fields, + } + + res, err := req.Do(context.Background(), es) + if err != nil { + log.Fatalf("ERROR: %s", err) + } + if err := json.NewDecoder(res.Body).Decode(&fields_request); err != nil { + log.Fatalf("->Error parsing the response body: %s", err) + } + + for i, f := range fields_request { + if i == "fields" { + // if f.(map[string]interface{})[`@timestamp`] == nil { + if f.(map[string]interface{})[field_name] == nil { + field_exists = false + } else { + field_exists = true + } + } + } + return (field_exists) +} + +func EsGetIndices(es *elasticsearch.Client) []string { + //Get indices list + var ( + index_request []map[string]interface{} + es_index_list []string + ) + req := esapi.CatIndicesRequest{ + // Index: []string{}, + Format: "JSON", + // ExpandWildcards, + } + + res, err := req.Do(context.Background(), es) + if err != nil { + log.Fatalf("ERROR: %s", err) + } + + if err := json.NewDecoder(res.Body).Decode(&index_request); err != nil { + log.Fatalf("Error parsing the response body: %s", err) + } + for _, hit := range index_request { + m := regexp.MustCompile("-[0-9]") + // позиция вхождения регулярки в имени индекса + i := m.FindStringIndex(hit["index"].(string)) + // наименование индекса + index_name := hit["index"].(string) + + // Checking the @timestamp field exists + if !EsIndexFieldExists(es, index_name, `@timestamp`) { + continue + // fmt.Println(EsIndexFieldExists(es, index_name)) + } + + // Итоговое наименование индекса + var index_name_short string + // Исключаем индексы от кибаны (.kibana-*) и обрезаем цифры (даты и версии) + if !strings.HasPrefix(index_name, ".") { + if i != nil { + index_name_short = strings.TrimSpace(index_name[0:i[0]]) + } else { + index_name_short = strings.TrimSpace(index_name) + } + if !StringContains(es_index_list, index_name_short) { + es_index_list = append(es_index_list, index_name_short) + } + } + } + return (es_index_list) +} + +func ZabbixSender(zabbix_host string, metric_name string, metric_value string) bool { + var ( + metrics []*zabbix.Metric + zabbix_port int + err error + ) + zabbix_server := os.Getenv("ZABBIX_SERVER") + + // Read and checkin zabbix server and port + if zabbix_server == "" { + log.Println("Login error: make sure environment variables `ZABBIX_SERVER` are defined") + os.Exit(1) + } + if os.Getenv("ZABBIX_PORT") == "" { + log.Println("Login error: make sure environment variables `ZABBIX_PORT` are defined") + os.Exit(1) + } + if zabbix_port, err = strconv.Atoi(os.Getenv("ZABBIX_PORT")); err != nil { + log.Println(zabbix_port, "Zabbix port value error") + } + + metrics = append(metrics, zabbix.NewMetric(zabbix_host, metric_name, metric_value, time.Now().Unix())) + metrics = append(metrics, zabbix.NewMetric(zabbix_host, "status", "OK")) + + // Create instance of Packet class + packet := zabbix.NewPacket(metrics) + // fmt.Println(zabbix_host, metric_name, metric_value, metrics) + // Send packet to zabbix + z := zabbix.NewSender(zabbix_server, zabbix_port) + res, err := z.Send(packet) + log.Println(string(res)) + return true +} + +//--------------------------------------------------------------------------- +func main() { + var ( + operation string + es_index_name string + es_request_time_range int + zabbix_send bool + // zabbix_port int + ) + + flag.StringVar(&es_index_name, "indexname", "", "Elasticsearch index name pattern, (like \"filebeat\")") + flag.IntVar(&es_request_time_range, "timerange", 6, "Elasticsearch time range for records count into hours") + flag.BoolVar(&zabbix_send, "zabbix-send", false, "Send metrics or discovery data into zabbix") + + flag.StringVar(&operation, "operation", "es-cluster-info", "Opertation type, must be:\n\tes-cluster-info - ES Cluster information (version e.t.c)\n\tes-get-indices - geting all index\n\tes-indices-discover - getting es index pattern list\n\tes-records-count - getting the number of records for a time range for all index pattern\n\tes-index-records-count - getting records count for one index (used with -indexname option") + + flag.Parse() + + zabbix_host := os.Getenv("ZABBIX_HOST") + if zabbix_host == "" { + fmt.Println("Send error: make sure environment variables `ZABBIX_HOST`") + os.Exit(1) + } + // if es_request_time_range > 24 { + // log.Println("Error: Time range must be a number between 1 and 24") + // os.Exit(1) + // } + + cfg := elasticsearch.Config{ + Transport: &http.Transport{ + MaxIdleConnsPerHost: 10, + ResponseHeaderTimeout: 30 * time.Second, + DialContext: (&net.Dialer{Timeout: time.Second}).DialContext, + TLSClientConfig: &tls.Config{ + MaxVersion: tls.VersionTLS11, + InsecureSkipVerify: true, + }, + }, + } + + es, err := elasticsearch.NewClient(cfg) + if err != nil { + log.Fatalf("Error creating the client: %s", err) + } + // fmt.Println(operation) + switch operation { + case "es-cluster-info": + EsClusterInfo(es) + case "es-get-indices": + // fmt.Println(operation) + for _, index := range EsGetIndices(es) { + fmt.Println(index) + } + case "es-indices-discover": + result_discovery_json := EsIndexDiscover(EsGetIndices(es)) + // fmt.Println(zabbix_send) + if zabbix_send == true { + ZabbixSender(zabbix_host, "indices", result_discovery_json) + } + fmt.Println(result_discovery_json) + case "es-records-count": + // records count for all indices + // EsIndexRecordsCount(EsGetIndices(es), es_request_time_range) + result_index := make(map[int]json_records) + j := int(0) + for _, es_index_name := range EsGetIndices(es) { + result := EsSearch(es_index_name, es_request_time_range) + result_index[j] = json_records{es_index_name, result} + log.Println("ES index name:", es_index_name, "; Query time period:", es_request_time_range, "; Records count:", result) + if zabbix_send == true { + ZabbixSender(zabbix_host, `indices.records_count[`+es_index_name+`]`, strconv.Itoa(result)) + ZabbixSender(zabbix_host, `indices.records_count_time_range[`+es_index_name+`]`, strconv.Itoa(es_request_time_range)) + } + j++ + } + case "es-index-records-count": + // records count for one index + result := EsSearch(es_index_name, es_request_time_range) + if zabbix_send == true { + ZabbixSender(zabbix_host, `indices.records_count[`+es_index_name+`]`, strconv.Itoa(result)) + ZabbixSender(zabbix_host, `indices.records_count_time_range[`+es_index_name+`]`, strconv.Itoa(es_request_time_range)) + } + log.Println("ES index name:", es_index_name, "; Query time period:", es_request_time_range, "; Records count:", result) + } +} diff --git a/es-monitoring/go.mod b/es-monitoring/go.mod new file mode 100644 index 0000000..59fa9f2 --- /dev/null +++ b/es-monitoring/go.mod @@ -0,0 +1,12 @@ +module my-elasticsearch-app + +go 1.16 + +require github.com/elastic/go-elasticsearch/v7 v7.5.1-0.20210322101442-a3e161131102 + +require ( + github.com/adubkov/go-zabbix v0.0.0-20170118040903-3c6a95ec4fdc // indirect + github.com/cavaliercoder/go-zabbix v0.0.0-20210304010121-96120c17dd42 // indirect + github.com/nixys/nxs-go-zabbix/v5 v5.0.0 + github.com/rday/zabbix v0.0.0-20170517233925-1cf60ccd42f9 // indirect +) diff --git a/es-monitoring/go.sum b/es-monitoring/go.sum new file mode 100644 index 0000000..67d6a4b --- /dev/null +++ b/es-monitoring/go.sum @@ -0,0 +1,14 @@ +github.com/adubkov/go-zabbix v0.0.0-20170118040903-3c6a95ec4fdc h1:gqqI4ZPa7uwK+gX9Zgk2AweAh+2dX0FpETcXTsA2TrE= +github.com/adubkov/go-zabbix v0.0.0-20170118040903-3c6a95ec4fdc/go.mod h1:ihDXRSVen590YHlXIrv00CcmRrL6pUho/Iwm3ZmM8n8= +github.com/cavaliercoder/go-zabbix v0.0.0-20210304010121-96120c17dd42 h1:S+MAp8YOH/TkyOTHa1/z/reraTs7fJL0xBOx5+3bA78= +github.com/cavaliercoder/go-zabbix v0.0.0-20210304010121-96120c17dd42/go.mod h1:o9iZ0ep18zjkTdG1yoCmBZSMAWo2qUXVMxqmEl+6GLo= +github.com/elastic/go-elasticsearch v0.0.0 h1:Pd5fqOuBxKxv83b0+xOAJDAkziWYwFinWnBO0y+TZaA= +github.com/elastic/go-elasticsearch v0.0.0/go.mod h1:TkBSJBuTyFdBnrNqoPc54FN0vKf5c04IdM4zuStJ7xg= +github.com/elastic/go-elasticsearch/v7 v7.5.1-0.20210322101442-a3e161131102 h1:o4HAbzLv9EOWc6ue8fyB00AcE78Y3fp8FD9RYy0z73M= +github.com/elastic/go-elasticsearch/v7 v7.5.1-0.20210322101442-a3e161131102/go.mod h1:OJ4wdbtDNk5g503kvlHLyErCgQwwzmDtaFC4XyOxXA4= +github.com/mitchellh/mapstructure v1.3.0 h1:iDwIio/3gk2QtLLEsqU5lInaMzos0hDTz8a6lazSFVw= +github.com/mitchellh/mapstructure v1.3.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= +github.com/nixys/nxs-go-zabbix/v5 v5.0.0 h1:AvVNPs/w2xSampQn25KnvNOx49O9mE5KuIlLrSTXpnE= +github.com/nixys/nxs-go-zabbix/v5 v5.0.0/go.mod h1:9R4cq2L8wHvVAz0ZfNIvzyXRUGCsrJOd7HvWiRoz6TI= +github.com/rday/zabbix v0.0.0-20170517233925-1cf60ccd42f9 h1:ARUuqpY5LjSR4//P6TolxJP81zjO+qNuOlEVD4mvIfs= +github.com/rday/zabbix v0.0.0-20170517233925-1cf60ccd42f9/go.mod h1:IIFS/h+mpikvp6WVUQwg9fO8IlGy7q3LE1wt3MeWhvg= diff --git a/es-monitoring/zabbix_templates/Template_ES_Indices_Monitoring.xml b/es-monitoring/zabbix_templates/Template_ES_Indices_Monitoring.xml new file mode 100644 index 0000000..146cd44 --- /dev/null +++ b/es-monitoring/zabbix_templates/Template_ES_Indices_Monitoring.xml @@ -0,0 +1,67 @@ + + + 5.0 + 2021-04-05T12:40:38Z + + + Elastic cluster + + + + + +