//------------------------------------------------------------------------------ // Getting and monitoring Elasticsearch indices and send the metrics into zabbix //------------------------------------------------------------------------------- // Author: Sergey Kalinin // e-mail: svk@nuk-svk.ru // website: https://nuk-svk.ru // Git repos: https://git.nuk-svk.ru //------------------------------------------------------------------------------- // Require: // github.com/adubkov/go-zabbix // github.com/elastic/go-elasticsearch //------------------------------------------------------------------------------- // ZABBIX_SERVER="zabbix" // ZABBIX_PORT="10051" // ZABBIX_HOST="elastic cluster" // ELASTICSEARCH_URL="https://user:pass@elastic:9200" //------------------------------------------------------------------------------- package main import ( "bytes" "context" "crypto/tls" "encoding/json" "flag" "fmt" "log" "net" "net/http" "os" "regexp" "strconv" "strings" "time" "github.com/adubkov/go-zabbix" "github.com/elastic/go-elasticsearch/v7" "github.com/elastic/go-elasticsearch/v7/esapi" ) type json_records struct { INDEX string RECORD_COUNT int } // Определяем входит ли строка в slice func StringContains(a []string, x string) bool { for _, n := range a { if x == n { return true } } return false } func EsSearch(index_name string, es_request_time_range int) int { var ( r map[string]interface{} ) request_time_range := "now-" + strconv.Itoa(es_request_time_range) + "h" cfg := elasticsearch.Config{ // Addresses: []string{ // "http://localhost:9200", // "http://localhost:9201", // }, // Username: "foo", // Password: "bar", Transport: &http.Transport{ MaxIdleConnsPerHost: 10, ResponseHeaderTimeout: 60 * time.Second, DialContext: (&net.Dialer{Timeout: time.Second}).DialContext, TLSClientConfig: &tls.Config{ MaxVersion: tls.VersionTLS11, InsecureSkipVerify: true, }, }, } es, err := elasticsearch.NewClient(cfg) if err != nil { log.Fatalf("Error creating the client: %s", err) } res, err := es.Info() if err != nil { log.Fatalf("Error getting response: %s", err) } defer res.Body.Close() // Check response status if res.IsError() { log.Fatalf("Error: %s", res.String()) } var buf bytes.Buffer query := map[string]interface{}{ "query": map[string]interface{}{ "range": map[string]interface{}{ "@timestamp": map[string]interface{}{ // "gt": "now-1h", "gt": request_time_range, }, // "match": map[string]interface{}{ // "host": "10.250.1.212", }, }, "size": 1, "sort": map[string]interface{}{ "@timestamp": map[string]interface{}{ "order": "desc", }, }, } if err := json.NewEncoder(&buf).Encode(query); err != nil { log.Fatalf("Error encoding query: %s", err) } // Perform the search request. res, err = es.Search( es.Search.WithContext(context.Background()), es.Search.WithIndex(index_name+"*"), es.Search.WithBody(&buf), es.Search.WithTrackTotalHits(true), es.Search.WithPretty(), ) if err != nil { log.Fatalf("Error getting response: %s", err) } defer res.Body.Close() if res.IsError() { var e map[string]interface{} if err := json.NewDecoder(res.Body).Decode(&e); err != nil { log.Fatalf("Error parsing the response body: %s", err) } else { // Print the response status and error information. log.Fatalf("[%s] %s: %s", res.Status(), e["error"].(map[string]interface{})["type"], e["error"].(map[string]interface{})["reason"], ) } } if err := json.NewDecoder(res.Body).Decode(&r); err != nil { log.Fatalf("Error parsing the response body: %s", err) } // Вывод статуса, количество выбранных записей и времени запроса hits := int(r["hits"].(map[string]interface{})["total"].(map[string]interface{})["value"].(float64)) return hits } func EsIndexDiscover(es_index_list []string) string { result_discovery_json := string("{\"data\": [") for _, value := range es_index_list { // result_discovery_json = result_discovery_json + "{\"{#INDEX}\":\"" + value + "\",\"{#ITEM_TYPE}\":\"Records count\"" + "}," result_discovery_json = result_discovery_json + "{\"{#INDEX}\":\"" + value + "\"" + "}," } result_discovery_json = strings.TrimRight(result_discovery_json, ",") + "]}" // fmt.Println(result_discovery_json) return (result_discovery_json) } func EsIndexRecordsCount(es_index_list []string, es_request_time_range int) { // fmt.Println(es_request_time_range) result_index := make(map[int]json_records) j := int(0) for _, value := range es_index_list { result_index[j] = json_records{value, EsSearch(value, es_request_time_range)} j++ } log.Println(result_index) } func EsClusterInfo(es *elasticsearch.Client) { var ( r map[string]interface{} ) // 1. Get cluster info res, err := es.Info() if err != nil { log.Fatalf("Error getting response: %s", err) } defer res.Body.Close() // Check response status if res.IsError() { log.Fatalf("Error: %s", res.String()) } // Deserialize the response into a map. if err := json.NewDecoder(res.Body).Decode(&r); err != nil { log.Fatalf("Error parsing the response body: %s", err) } // Print client and server version numbers. fmt.Printf("Client: %s\n", elasticsearch.Version) fmt.Printf("Client: %s\n", elasticsearch.Version) fmt.Printf("Server: %s\n", r["version"].(map[string]interface{})["number"]) } func EsIndexFieldExists(es *elasticsearch.Client, index_name string, field_name string) bool { // Check if the filed exists into index var ( fields_request map[string]interface{} fields []string indices []string field_exists bool ) fields = append(fields, field_name) indices = append(indices, index_name) req := esapi.FieldCapsRequest{ // Index: []string{}, Index: indices, Fields: fields, } res, err := req.Do(context.Background(), es) if err != nil { log.Fatalf("ERROR: %s", err) } if err := json.NewDecoder(res.Body).Decode(&fields_request); err != nil { log.Fatalf("->Error parsing the response body: %s", err) } for i, f := range fields_request { if i == "fields" { // if f.(map[string]interface{})[`@timestamp`] == nil { if f.(map[string]interface{})[field_name] == nil { field_exists = false } else { field_exists = true } } } return (field_exists) } func EsGetIndices(es *elasticsearch.Client) []string { //Get indices list var ( index_request []map[string]interface{} es_index_list []string ) req := esapi.CatIndicesRequest{ // Index: []string{}, Format: "JSON", // ExpandWildcards, } res, err := req.Do(context.Background(), es) if err != nil { log.Fatalf("ERROR: %s", err) } if err := json.NewDecoder(res.Body).Decode(&index_request); err != nil { log.Fatalf("Error parsing the response body: %s", err) } for _, hit := range index_request { m := regexp.MustCompile("-[0-9]") // позиция вхождения регулярки в имени индекса i := m.FindStringIndex(hit["index"].(string)) // наименование индекса index_name := hit["index"].(string) // Checking the @timestamp field exists if !EsIndexFieldExists(es, index_name, `@timestamp`) { continue // fmt.Println(EsIndexFieldExists(es, index_name)) } // Итоговое наименование индекса var index_name_short string // Исключаем индексы от кибаны (.kibana-*) и обрезаем цифры (даты и версии) if !strings.HasPrefix(index_name, ".") { if i != nil { index_name_short = strings.TrimSpace(index_name[0:i[0]]) } else { index_name_short = strings.TrimSpace(index_name) } if !StringContains(es_index_list, index_name_short) { es_index_list = append(es_index_list, index_name_short) } } } return (es_index_list) } func ZabbixSender(zabbix_host string, metric_name string, metric_value string) bool { var ( metrics []*zabbix.Metric zabbix_port int err error ) zabbix_server := os.Getenv("ZABBIX_SERVER") // Read and checkin zabbix server and port if zabbix_server == "" { log.Println("Login error: make sure environment variables `ZABBIX_SERVER` are defined") os.Exit(1) } if os.Getenv("ZABBIX_PORT") == "" { log.Println("Login error: make sure environment variables `ZABBIX_PORT` are defined") os.Exit(1) } if zabbix_port, err = strconv.Atoi(os.Getenv("ZABBIX_PORT")); err != nil { log.Println(zabbix_port, "Zabbix port value error") } metrics = append(metrics, zabbix.NewMetric(zabbix_host, metric_name, metric_value, time.Now().Unix())) metrics = append(metrics, zabbix.NewMetric(zabbix_host, "status", "OK")) // Create instance of Packet class packet := zabbix.NewPacket(metrics) // fmt.Println(zabbix_host, metric_name, metric_value, metrics) // Send packet to zabbix z := zabbix.NewSender(zabbix_server, zabbix_port) res, err := z.Send(packet) log.Println(string(res)) return true } //--------------------------------------------------------------------------- func main() { var ( operation string es_index_name string es_request_time_range int zabbix_send bool // zabbix_port int ) flag.StringVar(&es_index_name, "indexname", "", "Elasticsearch index name pattern, (like \"filebeat\")") flag.IntVar(&es_request_time_range, "timerange", 6, "Elasticsearch time range for records count into hours") flag.BoolVar(&zabbix_send, "zabbix-send", false, "Send metrics or discovery data into zabbix") flag.StringVar(&operation, "operation", "es-cluster-info", "Opertation type, must be:\n\tes-cluster-info - ES Cluster information (version e.t.c)\n\tes-get-indices - geting all index\n\tes-indices-discover - getting es index pattern list\n\tes-records-count - getting the number of records for a time range for all index pattern\n\tes-index-records-count - getting records count for one index (used with -indexname option") flag.Parse() zabbix_host := os.Getenv("ZABBIX_HOST") if zabbix_host == "" { fmt.Println("Send error: make sure environment variables `ZABBIX_HOST`") os.Exit(1) } // if es_request_time_range > 24 { // log.Println("Error: Time range must be a number between 1 and 24") // os.Exit(1) // } cfg := elasticsearch.Config{ Transport: &http.Transport{ MaxIdleConnsPerHost: 10, ResponseHeaderTimeout: 30 * time.Second, DialContext: (&net.Dialer{Timeout: time.Second}).DialContext, TLSClientConfig: &tls.Config{ MaxVersion: tls.VersionTLS11, InsecureSkipVerify: true, }, }, } es, err := elasticsearch.NewClient(cfg) if err != nil { log.Fatalf("Error creating the client: %s", err) } // fmt.Println(operation) switch operation { case "es-cluster-info": EsClusterInfo(es) case "es-get-indices": // fmt.Println(operation) for _, index := range EsGetIndices(es) { fmt.Println(index) } case "es-indices-discover": result_discovery_json := EsIndexDiscover(EsGetIndices(es)) // fmt.Println(zabbix_send) if zabbix_send == true { ZabbixSender(zabbix_host, "indices", result_discovery_json) } fmt.Println(result_discovery_json) case "es-records-count": // records count for all indices // EsIndexRecordsCount(EsGetIndices(es), es_request_time_range) result_index := make(map[int]json_records) j := int(0) for _, es_index_name := range EsGetIndices(es) { result := EsSearch(es_index_name, es_request_time_range) result_index[j] = json_records{es_index_name, result} log.Println("ES index name:", es_index_name, "; Query time period:", es_request_time_range, "; Records count:", result) if zabbix_send == true { ZabbixSender(zabbix_host, `indices.records_count[`+es_index_name+`]`, strconv.Itoa(result)) ZabbixSender(zabbix_host, `indices.records_count_time_range[`+es_index_name+`]`, strconv.Itoa(es_request_time_range)) } j++ } case "es-index-records-count": // records count for one index result := EsSearch(es_index_name, es_request_time_range) if zabbix_send == true { ZabbixSender(zabbix_host, `indices.records_count[`+es_index_name+`]`, strconv.Itoa(result)) ZabbixSender(zabbix_host, `indices.records_count_time_range[`+es_index_name+`]`, strconv.Itoa(es_request_time_range)) } log.Println("ES index name:", es_index_name, "; Query time period:", es_request_time_range, "; Records count:", result) } }