|
|
@ -0,0 +1,405 @@ |
|
|
|
//------------------------------------------------------------------------------
|
|
|
|
// Getting and monitoring Elasticsearch indices and send the metrics into zabbix
|
|
|
|
//-------------------------------------------------------------------------------
|
|
|
|
// Author: Sergey Kalinin
|
|
|
|
// e-mail: svk@nuk-svk.ru
|
|
|
|
// website: https://nuk-svk.ru
|
|
|
|
// Git repos: https://git.nuk-svk.ru
|
|
|
|
//-------------------------------------------------------------------------------
|
|
|
|
// Require:
|
|
|
|
// github.com/adubkov/go-zabbix
|
|
|
|
// github.com/elastic/go-elasticsearch
|
|
|
|
//-------------------------------------------------------------------------------
|
|
|
|
// ZABBIX_SERVER="zabbix"
|
|
|
|
// ZABBIX_PORT="10051"
|
|
|
|
// ZABBIX_HOST="elastic cluster"
|
|
|
|
// ELASTICSEARCH_URL="https://user:pass@elastic:9200"
|
|
|
|
//-------------------------------------------------------------------------------
|
|
|
|
|
|
|
|
package main |
|
|
|
|
|
|
|
import ( |
|
|
|
"bytes" |
|
|
|
"context" |
|
|
|
"crypto/tls" |
|
|
|
"encoding/json" |
|
|
|
"flag" |
|
|
|
"fmt" |
|
|
|
"log" |
|
|
|
"net" |
|
|
|
"net/http" |
|
|
|
"os" |
|
|
|
"regexp" |
|
|
|
"strconv" |
|
|
|
"strings" |
|
|
|
"time" |
|
|
|
|
|
|
|
"github.com/adubkov/go-zabbix" |
|
|
|
"github.com/elastic/go-elasticsearch/v7" |
|
|
|
"github.com/elastic/go-elasticsearch/v7/esapi" |
|
|
|
) |
|
|
|
|
|
|
|
type json_records struct { |
|
|
|
INDEX string |
|
|
|
RECORD_COUNT int |
|
|
|
} |
|
|
|
|
|
|
|
// Определяем входит ли строка в slice
|
|
|
|
func StringContains(a []string, x string) bool { |
|
|
|
for _, n := range a { |
|
|
|
if x == n { |
|
|
|
return true |
|
|
|
} |
|
|
|
} |
|
|
|
return false |
|
|
|
} |
|
|
|
|
|
|
|
func EsSearch(index_name string, es_request_time_range int) int { |
|
|
|
var ( |
|
|
|
r map[string]interface{} |
|
|
|
) |
|
|
|
request_time_range := "now-" + strconv.Itoa(es_request_time_range) + "h" |
|
|
|
|
|
|
|
cfg := elasticsearch.Config{ |
|
|
|
// Addresses: []string{
|
|
|
|
// "http://localhost:9200",
|
|
|
|
// "http://localhost:9201",
|
|
|
|
// },
|
|
|
|
// Username: "foo",
|
|
|
|
// Password: "bar",
|
|
|
|
Transport: &http.Transport{ |
|
|
|
MaxIdleConnsPerHost: 10, |
|
|
|
ResponseHeaderTimeout: 60 * time.Second, |
|
|
|
DialContext: (&net.Dialer{Timeout: time.Second}).DialContext, |
|
|
|
TLSClientConfig: &tls.Config{ |
|
|
|
MaxVersion: tls.VersionTLS11, |
|
|
|
InsecureSkipVerify: true, |
|
|
|
}, |
|
|
|
}, |
|
|
|
} |
|
|
|
|
|
|
|
es, err := elasticsearch.NewClient(cfg) |
|
|
|
if err != nil { |
|
|
|
log.Fatalf("Error creating the client: %s", err) |
|
|
|
} |
|
|
|
res, err := es.Info() |
|
|
|
if err != nil { |
|
|
|
log.Fatalf("Error getting response: %s", err) |
|
|
|
} |
|
|
|
defer res.Body.Close() |
|
|
|
// Check response status
|
|
|
|
if res.IsError() { |
|
|
|
log.Fatalf("Error: %s", res.String()) |
|
|
|
} |
|
|
|
var buf bytes.Buffer |
|
|
|
query := map[string]interface{}{ |
|
|
|
"query": map[string]interface{}{ |
|
|
|
"range": map[string]interface{}{ |
|
|
|
"@timestamp": map[string]interface{}{ |
|
|
|
// "gt": "now-1h",
|
|
|
|
"gt": request_time_range, |
|
|
|
}, |
|
|
|
// "match": map[string]interface{}{
|
|
|
|
// "host": "10.250.1.212",
|
|
|
|
}, |
|
|
|
}, |
|
|
|
"size": 1, |
|
|
|
"sort": map[string]interface{}{ |
|
|
|
"@timestamp": map[string]interface{}{ |
|
|
|
"order": "desc", |
|
|
|
}, |
|
|
|
}, |
|
|
|
} |
|
|
|
if err := json.NewEncoder(&buf).Encode(query); err != nil { |
|
|
|
log.Fatalf("Error encoding query: %s", err) |
|
|
|
} |
|
|
|
|
|
|
|
// Perform the search request.
|
|
|
|
res, err = es.Search( |
|
|
|
es.Search.WithContext(context.Background()), |
|
|
|
es.Search.WithIndex(index_name+"*"), |
|
|
|
es.Search.WithBody(&buf), |
|
|
|
es.Search.WithTrackTotalHits(true), |
|
|
|
es.Search.WithPretty(), |
|
|
|
) |
|
|
|
if err != nil { |
|
|
|
log.Fatalf("Error getting response: %s", err) |
|
|
|
} |
|
|
|
defer res.Body.Close() |
|
|
|
|
|
|
|
if res.IsError() { |
|
|
|
var e map[string]interface{} |
|
|
|
if err := json.NewDecoder(res.Body).Decode(&e); err != nil { |
|
|
|
log.Fatalf("Error parsing the response body: %s", err) |
|
|
|
} else { |
|
|
|
// Print the response status and error information.
|
|
|
|
log.Fatalf("[%s] %s: %s", |
|
|
|
res.Status(), |
|
|
|
e["error"].(map[string]interface{})["type"], |
|
|
|
e["error"].(map[string]interface{})["reason"], |
|
|
|
) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
if err := json.NewDecoder(res.Body).Decode(&r); err != nil { |
|
|
|
log.Fatalf("Error parsing the response body: %s", err) |
|
|
|
} |
|
|
|
// Вывод статуса, количество выбранных записей и времени запроса
|
|
|
|
hits := int(r["hits"].(map[string]interface{})["total"].(map[string]interface{})["value"].(float64)) |
|
|
|
return hits |
|
|
|
} |
|
|
|
|
|
|
|
func EsIndexDiscover(es_index_list []string) string { |
|
|
|
result_discovery_json := string("{\"data\": [") |
|
|
|
|
|
|
|
for _, value := range es_index_list { |
|
|
|
// result_discovery_json = result_discovery_json + "{\"{#INDEX}\":\"" + value + "\",\"{#ITEM_TYPE}\":\"Records count\"" + "},"
|
|
|
|
result_discovery_json = result_discovery_json + "{\"{#INDEX}\":\"" + value + "\"" + "}," |
|
|
|
} |
|
|
|
result_discovery_json = strings.TrimRight(result_discovery_json, ",") + "]}" |
|
|
|
// fmt.Println(result_discovery_json)
|
|
|
|
return (result_discovery_json) |
|
|
|
} |
|
|
|
|
|
|
|
func EsIndexRecordsCount(es_index_list []string, es_request_time_range int) { |
|
|
|
// fmt.Println(es_request_time_range)
|
|
|
|
result_index := make(map[int]json_records) |
|
|
|
j := int(0) |
|
|
|
for _, value := range es_index_list { |
|
|
|
result_index[j] = json_records{value, EsSearch(value, es_request_time_range)} |
|
|
|
j++ |
|
|
|
} |
|
|
|
log.Println(result_index) |
|
|
|
} |
|
|
|
|
|
|
|
func EsClusterInfo(es *elasticsearch.Client) { |
|
|
|
var ( |
|
|
|
r map[string]interface{} |
|
|
|
) |
|
|
|
// 1. Get cluster info
|
|
|
|
res, err := es.Info() |
|
|
|
if err != nil { |
|
|
|
log.Fatalf("Error getting response: %s", err) |
|
|
|
} |
|
|
|
defer res.Body.Close() |
|
|
|
// Check response status
|
|
|
|
if res.IsError() { |
|
|
|
log.Fatalf("Error: %s", res.String()) |
|
|
|
} |
|
|
|
// Deserialize the response into a map.
|
|
|
|
if err := json.NewDecoder(res.Body).Decode(&r); err != nil { |
|
|
|
log.Fatalf("Error parsing the response body: %s", err) |
|
|
|
} |
|
|
|
// Print client and server version numbers.
|
|
|
|
fmt.Printf("Client: %s\n", elasticsearch.Version) |
|
|
|
fmt.Printf("Client: %s\n", elasticsearch.Version) |
|
|
|
fmt.Printf("Server: %s\n", r["version"].(map[string]interface{})["number"]) |
|
|
|
} |
|
|
|
|
|
|
|
func EsIndexFieldExists(es *elasticsearch.Client, index_name string, field_name string) bool { |
|
|
|
// Check if the filed exists into index
|
|
|
|
var ( |
|
|
|
fields_request map[string]interface{} |
|
|
|
fields []string |
|
|
|
indices []string |
|
|
|
field_exists bool |
|
|
|
) |
|
|
|
fields = append(fields, field_name) |
|
|
|
indices = append(indices, index_name) |
|
|
|
|
|
|
|
req := esapi.FieldCapsRequest{ |
|
|
|
// Index: []string{},
|
|
|
|
Index: indices, |
|
|
|
Fields: fields, |
|
|
|
} |
|
|
|
|
|
|
|
res, err := req.Do(context.Background(), es) |
|
|
|
if err != nil { |
|
|
|
log.Fatalf("ERROR: %s", err) |
|
|
|
} |
|
|
|
if err := json.NewDecoder(res.Body).Decode(&fields_request); err != nil { |
|
|
|
log.Fatalf("->Error parsing the response body: %s", err) |
|
|
|
} |
|
|
|
|
|
|
|
for i, f := range fields_request { |
|
|
|
if i == "fields" { |
|
|
|
// if f.(map[string]interface{})[`@timestamp`] == nil {
|
|
|
|
if f.(map[string]interface{})[field_name] == nil { |
|
|
|
field_exists = false |
|
|
|
} else { |
|
|
|
field_exists = true |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
return (field_exists) |
|
|
|
} |
|
|
|
|
|
|
|
func EsGetIndices(es *elasticsearch.Client) []string { |
|
|
|
//Get indices list
|
|
|
|
var ( |
|
|
|
index_request []map[string]interface{} |
|
|
|
es_index_list []string |
|
|
|
) |
|
|
|
req := esapi.CatIndicesRequest{ |
|
|
|
// Index: []string{},
|
|
|
|
Format: "JSON", |
|
|
|
// ExpandWildcards,
|
|
|
|
} |
|
|
|
|
|
|
|
res, err := req.Do(context.Background(), es) |
|
|
|
if err != nil { |
|
|
|
log.Fatalf("ERROR: %s", err) |
|
|
|
} |
|
|
|
|
|
|
|
if err := json.NewDecoder(res.Body).Decode(&index_request); err != nil { |
|
|
|
log.Fatalf("Error parsing the response body: %s", err) |
|
|
|
} |
|
|
|
for _, hit := range index_request { |
|
|
|
m := regexp.MustCompile("-[0-9]") |
|
|
|
// позиция вхождения регулярки в имени индекса
|
|
|
|
i := m.FindStringIndex(hit["index"].(string)) |
|
|
|
// наименование индекса
|
|
|
|
index_name := hit["index"].(string) |
|
|
|
|
|
|
|
// Checking the @timestamp field exists
|
|
|
|
if !EsIndexFieldExists(es, index_name, `@timestamp`) { |
|
|
|
continue |
|
|
|
// fmt.Println(EsIndexFieldExists(es, index_name))
|
|
|
|
} |
|
|
|
|
|
|
|
// Итоговое наименование индекса
|
|
|
|
var index_name_short string |
|
|
|
// Исключаем индексы от кибаны (.kibana-*) и обрезаем цифры (даты и версии)
|
|
|
|
if !strings.HasPrefix(index_name, ".") { |
|
|
|
if i != nil { |
|
|
|
index_name_short = strings.TrimSpace(index_name[0:i[0]]) |
|
|
|
} else { |
|
|
|
index_name_short = strings.TrimSpace(index_name) |
|
|
|
} |
|
|
|
if !StringContains(es_index_list, index_name_short) { |
|
|
|
es_index_list = append(es_index_list, index_name_short) |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
return (es_index_list) |
|
|
|
} |
|
|
|
|
|
|
|
func ZabbixSender(zabbix_host string, metric_name string, metric_value string) bool { |
|
|
|
var ( |
|
|
|
metrics []*zabbix.Metric |
|
|
|
zabbix_port int |
|
|
|
err error |
|
|
|
) |
|
|
|
zabbix_server := os.Getenv("ZABBIX_SERVER") |
|
|
|
|
|
|
|
// Read and checkin zabbix server and port
|
|
|
|
if zabbix_server == "" { |
|
|
|
log.Println("Login error: make sure environment variables `ZABBIX_SERVER` are defined") |
|
|
|
os.Exit(1) |
|
|
|
} |
|
|
|
if os.Getenv("ZABBIX_PORT") == "" { |
|
|
|
log.Println("Login error: make sure environment variables `ZABBIX_PORT` are defined") |
|
|
|
os.Exit(1) |
|
|
|
} |
|
|
|
if zabbix_port, err = strconv.Atoi(os.Getenv("ZABBIX_PORT")); err != nil { |
|
|
|
log.Println(zabbix_port, "Zabbix port value error") |
|
|
|
} |
|
|
|
|
|
|
|
metrics = append(metrics, zabbix.NewMetric(zabbix_host, metric_name, metric_value, time.Now().Unix())) |
|
|
|
metrics = append(metrics, zabbix.NewMetric(zabbix_host, "status", "OK")) |
|
|
|
|
|
|
|
// Create instance of Packet class
|
|
|
|
packet := zabbix.NewPacket(metrics) |
|
|
|
// fmt.Println(zabbix_host, metric_name, metric_value, metrics)
|
|
|
|
// Send packet to zabbix
|
|
|
|
z := zabbix.NewSender(zabbix_server, zabbix_port) |
|
|
|
res, err := z.Send(packet) |
|
|
|
log.Println(string(res)) |
|
|
|
return true |
|
|
|
} |
|
|
|
|
|
|
|
//---------------------------------------------------------------------------
|
|
|
|
func main() { |
|
|
|
var ( |
|
|
|
operation string |
|
|
|
es_index_name string |
|
|
|
es_request_time_range int |
|
|
|
zabbix_send bool |
|
|
|
// zabbix_port int
|
|
|
|
) |
|
|
|
|
|
|
|
flag.StringVar(&es_index_name, "indexname", "", "Elasticsearch index name pattern, (like \"filebeat\")") |
|
|
|
flag.IntVar(&es_request_time_range, "timerange", 6, "Elasticsearch time range for records count into hours") |
|
|
|
flag.BoolVar(&zabbix_send, "zabbix-send", false, "Send metrics or discovery data into zabbix") |
|
|
|
|
|
|
|
flag.StringVar(&operation, "operation", "es-cluster-info", "Opertation type, must be:\n\tes-cluster-info - ES Cluster information (version e.t.c)\n\tes-get-indices - geting all index\n\tes-indices-discover - getting es index pattern list\n\tes-records-count - getting the number of records for a time range for all index pattern\n\tes-index-records-count - getting records count for one index (used with -indexname option") |
|
|
|
|
|
|
|
flag.Parse() |
|
|
|
|
|
|
|
zabbix_host := os.Getenv("ZABBIX_HOST") |
|
|
|
if zabbix_host == "" { |
|
|
|
fmt.Println("Send error: make sure environment variables `ZABBIX_HOST`") |
|
|
|
os.Exit(1) |
|
|
|
} |
|
|
|
// if es_request_time_range > 24 {
|
|
|
|
// log.Println("Error: Time range must be a number between 1 and 24")
|
|
|
|
// os.Exit(1)
|
|
|
|
// }
|
|
|
|
|
|
|
|
cfg := elasticsearch.Config{ |
|
|
|
Transport: &http.Transport{ |
|
|
|
MaxIdleConnsPerHost: 10, |
|
|
|
ResponseHeaderTimeout: 30 * time.Second, |
|
|
|
DialContext: (&net.Dialer{Timeout: time.Second}).DialContext, |
|
|
|
TLSClientConfig: &tls.Config{ |
|
|
|
MaxVersion: tls.VersionTLS11, |
|
|
|
InsecureSkipVerify: true, |
|
|
|
}, |
|
|
|
}, |
|
|
|
} |
|
|
|
|
|
|
|
es, err := elasticsearch.NewClient(cfg) |
|
|
|
if err != nil { |
|
|
|
log.Fatalf("Error creating the client: %s", err) |
|
|
|
} |
|
|
|
// fmt.Println(operation)
|
|
|
|
switch operation { |
|
|
|
case "es-cluster-info": |
|
|
|
EsClusterInfo(es) |
|
|
|
case "es-get-indices": |
|
|
|
// fmt.Println(operation)
|
|
|
|
for _, index := range EsGetIndices(es) { |
|
|
|
fmt.Println(index) |
|
|
|
} |
|
|
|
case "es-indices-discover": |
|
|
|
result_discovery_json := EsIndexDiscover(EsGetIndices(es)) |
|
|
|
// fmt.Println(zabbix_send)
|
|
|
|
if zabbix_send == true { |
|
|
|
ZabbixSender(zabbix_host, "indices", result_discovery_json) |
|
|
|
} |
|
|
|
fmt.Println(result_discovery_json) |
|
|
|
case "es-records-count": |
|
|
|
// records count for all indices
|
|
|
|
// EsIndexRecordsCount(EsGetIndices(es), es_request_time_range)
|
|
|
|
result_index := make(map[int]json_records) |
|
|
|
j := int(0) |
|
|
|
for _, es_index_name := range EsGetIndices(es) { |
|
|
|
result := EsSearch(es_index_name, es_request_time_range) |
|
|
|
result_index[j] = json_records{es_index_name, result} |
|
|
|
log.Println("ES index name:", es_index_name, "; Query time period:", es_request_time_range, "; Records count:", result) |
|
|
|
if zabbix_send == true { |
|
|
|
ZabbixSender(zabbix_host, `indices.records_count[`+es_index_name+`]`, strconv.Itoa(result)) |
|
|
|
ZabbixSender(zabbix_host, `indices.records_count_time_range[`+es_index_name+`]`, strconv.Itoa(es_request_time_range)) |
|
|
|
} |
|
|
|
j++ |
|
|
|
} |
|
|
|
case "es-index-records-count": |
|
|
|
// records count for one index
|
|
|
|
result := EsSearch(es_index_name, es_request_time_range) |
|
|
|
if zabbix_send == true { |
|
|
|
ZabbixSender(zabbix_host, `indices.records_count[`+es_index_name+`]`, strconv.Itoa(result)) |
|
|
|
ZabbixSender(zabbix_host, `indices.records_count_time_range[`+es_index_name+`]`, strconv.Itoa(es_request_time_range)) |
|
|
|
} |
|
|
|
log.Println("ES index name:", es_index_name, "; Query time period:", es_request_time_range, "; Records count:", result) |
|
|
|
} |
|
|
|
} |