zabbix-helpers/es-monitoring/es-monitoring.go

406 lines
12 KiB
Go
Raw Normal View History

2024-11-06 10:16:00 +03:00
//------------------------------------------------------------------------------
// Collects Elasticsearch index metrics and sends them to Zabbix
//-------------------------------------------------------------------------------
// Author: Sergey Kalinin
// e-mail: svk@nuk-svk.ru
// website: https://nuk-svk.ru
// Git repos: https://git.nuk-svk.ru
//-------------------------------------------------------------------------------
// Require:
// github.com/adubkov/go-zabbix
// github.com/elastic/go-elasticsearch
//-------------------------------------------------------------------------------
// ZABBIX_SERVER="zabbix"
// ZABBIX_PORT="10051"
// ZABBIX_HOST="elastic cluster"
// ELASTICSEARCH_URL="https://user:pass@elastic:9200"
//-------------------------------------------------------------------------------
package main
import (
"bytes"
"context"
"crypto/tls"
"encoding/json"
"flag"
"fmt"
"log"
"net"
"net/http"
"os"
"regexp"
"strconv"
"strings"
"time"
"github.com/adubkov/go-zabbix"
"github.com/elastic/go-elasticsearch/v7"
"github.com/elastic/go-elasticsearch/v7/esapi"
)
// json_records pairs an index name (pattern) with the number of records
// counted for it. It is filled with positional literals, so the field order
// matters.
type json_records struct {
// INDEX is the index name (pattern) the count belongs to.
INDEX string
// RECORD_COUNT is the number of records found for INDEX.
RECORD_COUNT int
}
// StringContains reports whether the string x is an element of the slice a.
// A nil or empty slice contains nothing, so the result is false.
func StringContains(a []string, x string) bool {
	for idx := 0; idx < len(a); idx++ {
		if a[idx] == x {
			return true
		}
	}
	return false
}
// EsSearch returns the number of documents whose @timestamp is newer than
// es_request_time_range hours ago in the indices matching index_name + "*".
//
// A fresh Elasticsearch client is created on every call from a configuration
// that sets no address, so the endpoint is whatever the client library resolves
// by default (the file header lists ELASTICSEARCH_URL for this purpose).
//
// Every failure - client creation, transport error, error status returned by
// Elasticsearch, or a response body of unexpected shape - terminates the
// process via log.Fatalf.
func EsSearch(index_name string, es_request_time_range int) int {
	// Lower bound of the range query in date-math form, e.g. "now-6h".
	request_time_range := "now-" + strconv.Itoa(es_request_time_range) + "h"

	transport := &http.Transport{
		MaxIdleConnsPerHost:   10,
		ResponseHeaderTimeout: 60 * time.Second,
		DialContext:           (&net.Dialer{Timeout: time.Second}).DialContext,
		TLSClientConfig: &tls.Config{
			// MaxVersion is deliberately left unset so the best protocol
			// version both sides support is negotiated. Capping it at
			// TLS 1.1 forces a deprecated protocol and, combined with the
			// TLS 1.2 client minimum that current Go releases default to,
			// leaves no usable version at all.
			// NOTE(review): certificate verification is disabled here.
			InsecureSkipVerify: true,
		},
	}
	// The transport only lives for this call. Release its pooled connections
	// on return so repeated calls do not accumulate open sockets.
	defer transport.CloseIdleConnections()

	es, err := elasticsearch.NewClient(elasticsearch.Config{Transport: transport})
	if err != nil {
		log.Fatalf("Error creating the client: %s", err)
	}

	// Preflight: fail early with a clear message if the cluster is
	// unreachable or rejects the request.
	info, err := es.Info()
	if err != nil {
		log.Fatalf("Error getting response: %s", err)
	}
	if info.IsError() {
		log.Fatalf("Error: %s", info.String())
	}
	// Nothing else is needed from this response; free its connection now.
	info.Body.Close()

	// Only the total hit count is used; a single newest document is requested.
	query := map[string]interface{}{
		"query": map[string]interface{}{
			"range": map[string]interface{}{
				"@timestamp": map[string]interface{}{
					"gt": request_time_range,
				},
			},
		},
		"size": 1,
		"sort": map[string]interface{}{
			"@timestamp": map[string]interface{}{
				"order": "desc",
			},
		},
	}
	var buf bytes.Buffer
	if err := json.NewEncoder(&buf).Encode(query); err != nil {
		log.Fatalf("Error encoding query: %s", err)
	}

	// Perform the search request.
	res, err := es.Search(
		es.Search.WithContext(context.Background()),
		es.Search.WithIndex(index_name+"*"),
		es.Search.WithBody(&buf),
		es.Search.WithTrackTotalHits(true),
		es.Search.WithPretty(),
	)
	if err != nil {
		log.Fatalf("Error getting response: %s", err)
	}
	defer res.Body.Close()

	if res.IsError() {
		var e map[string]interface{}
		if err := json.NewDecoder(res.Body).Decode(&e); err != nil {
			log.Fatalf("Error parsing the response body: %s", err)
		}
		// The "error" member is normally an object with type/reason, but it
		// is not guaranteed to be; report whatever is there instead of
		// panicking on a failed type assertion.
		if details, ok := e["error"].(map[string]interface{}); ok {
			log.Fatalf("[%s] %s: %s", res.Status(), details["type"], details["reason"])
		}
		log.Fatalf("[%s] %v", res.Status(), e["error"])
	}

	var r map[string]interface{}
	if err := json.NewDecoder(res.Body).Decode(&r); err != nil {
		log.Fatalf("Error parsing the response body: %s", err)
	}

	hits, ok := r["hits"].(map[string]interface{})
	if !ok {
		log.Fatalf("Error parsing the response body: missing hits object")
	}
	// hits.total is an object carrying "value"; a bare number is accepted too
	// so a differently shaped total does not cause a panic.
	switch total := hits["total"].(type) {
	case map[string]interface{}:
		if value, ok := total["value"].(float64); ok {
			return int(value)
		}
	case float64:
		return int(total)
	}
	log.Fatalf("Error parsing the response body: unexpected hits.total %v", hits["total"])
	return 0
}
// EsIndexDiscover renders es_index_list as a discovery JSON document of the
// form
//
//	{"data": [{"{#INDEX}":"name1"},{"{#INDEX}":"name2"}]}
//
// An empty or nil list yields {"data": []}. Each name is encoded as a JSON
// string, so names containing quotes, backslashes or control characters still
// produce valid JSON.
func EsIndexDiscover(es_index_list []string) string {
	var b strings.Builder
	b.WriteString(`{"data": [`)
	for i, value := range es_index_list {
		if i > 0 {
			b.WriteByte(',')
		}
		// Marshalling a plain string cannot fail (invalid UTF-8 is replaced),
		// so the error is intentionally ignored.
		quoted, _ := json.Marshal(value)
		b.WriteString(`{"{#INDEX}":`)
		b.Write(quoted)
		b.WriteByte('}')
	}
	b.WriteString("]}")
	return b.String()
}
// EsIndexRecordsCount runs EsSearch for every entry of es_index_list, passing
// es_request_time_range through, and writes the collected name/count pairs
// (keyed by their position in the list) to the log.
func EsIndexRecordsCount(es_index_list []string, es_request_time_range int) {
	counts := map[int]json_records{}
	for pos := range es_index_list {
		name := es_index_list[pos]
		counts[pos] = json_records{
			INDEX:        name,
			RECORD_COUNT: EsSearch(name, es_request_time_range),
		}
	}
	log.Println(counts)
}
// EsClusterInfo requests the cluster info document through es and prints the
// client library version and the server version number to standard output.
//
// A transport error, an error status, or a response body that cannot be
// decoded or lacks a "version" object terminates the process via log.Fatalf.
func EsClusterInfo(es *elasticsearch.Client) {
	var r map[string]interface{}

	res, err := es.Info()
	if err != nil {
		log.Fatalf("Error getting response: %s", err)
	}
	defer res.Body.Close()

	// Check response status.
	if res.IsError() {
		log.Fatalf("Error: %s", res.String())
	}
	// Deserialize the response into a map.
	if err := json.NewDecoder(res.Body).Decode(&r); err != nil {
		log.Fatalf("Error parsing the response body: %s", err)
	}

	// Guard the assertion so an unexpected body is reported, not a panic.
	version, ok := r["version"].(map[string]interface{})
	if !ok {
		log.Fatalf("Error parsing the response body: missing version object")
	}

	// Print client and server version numbers (each exactly once).
	fmt.Printf("Client: %s\n", elasticsearch.Version)
	fmt.Printf("Server: %s\n", version["number"])
}
// EsIndexFieldExists reports whether the field field_name is present in the
// field-capabilities response for the index index_name.
//
// It returns false when the response has no "fields" object or when that
// object has no (non-null) entry for field_name. A transport error or an
// undecodable body terminates the process via log.Fatalf.
func EsIndexFieldExists(es *elasticsearch.Client, index_name string, field_name string) bool {
	var fields_request map[string]interface{}

	req := esapi.FieldCapsRequest{
		Index:  []string{index_name},
		Fields: []string{field_name},
	}
	res, err := req.Do(context.Background(), es)
	if err != nil {
		log.Fatalf("ERROR: %s", err)
	}
	// Release the connection once the body has been consumed.
	defer res.Body.Close()

	if err := json.NewDecoder(res.Body).Decode(&fields_request); err != nil {
		log.Fatalf("->Error parsing the response body: %s", err)
	}

	// Look the key up directly; a missing or differently typed "fields"
	// member simply means the field was not reported.
	fields, ok := fields_request["fields"].(map[string]interface{})
	if !ok {
		return false
	}
	return fields[field_name] != nil
}
// EsGetIndices returns the distinct index name patterns of the cluster, in the
// order they are first encountered in the cat-indices response.
//
// For every index:
//   - names starting with "." (e.g. .kibana-*) are skipped;
//   - the name is cut at the first "-<digit>" occurrence, dropping date and
//     version suffixes, and surrounding whitespace is trimmed;
//   - the resulting pattern is added once, and only if the index has an
//     @timestamp field (checked with EsIndexFieldExists).
//
// A transport error or an undecodable body terminates the process via
// log.Fatalf.
func EsGetIndices(es *elasticsearch.Client) []string {
	var (
		index_request []map[string]interface{}
		es_index_list []string
	)

	req := esapi.CatIndicesRequest{
		Format: "JSON",
	}
	res, err := req.Do(context.Background(), es)
	if err != nil {
		log.Fatalf("ERROR: %s", err)
	}
	// Release the connection once the body has been consumed.
	defer res.Body.Close()

	if err := json.NewDecoder(res.Body).Decode(&index_request); err != nil {
		log.Fatalf("Error parsing the response body: %s", err)
	}

	// Start of the numeric (date/version) suffix; compiled once, not per index.
	suffix := regexp.MustCompile("-[0-9]")

	for _, hit := range index_request {
		index_name, ok := hit["index"].(string)
		if !ok {
			// Entry without a usable name: nothing to derive a pattern from.
			continue
		}
		// Exclude system indices such as .kibana-*.
		if strings.HasPrefix(index_name, ".") {
			continue
		}

		index_name_short := index_name
		if loc := suffix.FindStringIndex(index_name); loc != nil {
			index_name_short = index_name[:loc[0]]
		}
		index_name_short = strings.TrimSpace(index_name_short)

		// The field check costs one HTTP request, so it is done last and only
		// for patterns not collected yet; the resulting list is the same as
		// when every index is checked up front.
		if StringContains(es_index_list, index_name_short) {
			continue
		}
		if !EsIndexFieldExists(es, index_name, `@timestamp`) {
			continue
		}
		es_index_list = append(es_index_list, index_name_short)
	}
	return es_index_list
}
// ZabbixSender sends the item metric_name with value metric_value for the
// host zabbix_host to the Zabbix server, together with a "status" item set to
// "OK".
//
// The server address and port are read from the ZABBIX_SERVER and ZABBIX_PORT
// environment variables; if either is unset the process exits with status 1.
// It returns true when the packet was sent, and false when ZABBIX_PORT is not
// a number or the send itself failed (the cause is logged).
func ZabbixSender(zabbix_host string, metric_name string, metric_value string) bool {
	// Read and check the zabbix server and port.
	zabbix_server := os.Getenv("ZABBIX_SERVER")
	if zabbix_server == "" {
		log.Println("Login error: make sure environment variables `ZABBIX_SERVER` are defined")
		os.Exit(1)
	}
	port_value := os.Getenv("ZABBIX_PORT")
	if port_value == "" {
		log.Println("Login error: make sure environment variables `ZABBIX_PORT` are defined")
		os.Exit(1)
	}
	zabbix_port, err := strconv.Atoi(port_value)
	if err != nil {
		// Sending to port 0 cannot succeed; report the bad value and stop.
		log.Println(port_value, "Zabbix port value error:", err)
		return false
	}

	metrics := []*zabbix.Metric{
		zabbix.NewMetric(zabbix_host, metric_name, metric_value, time.Now().Unix()),
		zabbix.NewMetric(zabbix_host, "status", "OK"),
	}

	// Create the packet and send it to zabbix.
	packet := zabbix.NewPacket(metrics)
	z := zabbix.NewSender(zabbix_server, zabbix_port)
	res, err := z.Send(packet)
	if err != nil {
		log.Println("Zabbix send error:", err)
		return false
	}
	log.Println(string(res))
	return true
}
//---------------------------------------------------------------------------
// main parses the command line, builds the Elasticsearch client and runs the
// operation selected with -operation:
//
//	es-cluster-info        print client and server versions
//	es-get-indices         print the index name patterns, one per line
//	es-indices-discover    print (and optionally send) the discovery JSON
//	es-records-count       count records for every index pattern
//	es-index-records-count count records for the pattern given by -indexname
//
// With -zabbix-send the results are also pushed to Zabbix, which requires the
// ZABBIX_HOST environment variable. An unknown operation prints the usage and
// exits with status 1.
func main() {
	var (
		operation             string
		es_index_name         string
		es_request_time_range int
		zabbix_send           bool
	)
	flag.StringVar(&es_index_name, "indexname", "", "Elasticsearch index name pattern, (like \"filebeat\")")
	flag.IntVar(&es_request_time_range, "timerange", 6, "Elasticsearch time range for records count into hours")
	flag.BoolVar(&zabbix_send, "zabbix-send", false, "Send metrics or discovery data into zabbix")
	flag.StringVar(&operation, "operation", "es-cluster-info", "Opertation type, must be:\n\tes-cluster-info - ES Cluster information (version e.t.c)\n\tes-get-indices - geting all index\n\tes-indices-discover - getting es index pattern list\n\tes-records-count - getting the number of records for a time range for all index pattern\n\tes-index-records-count - getting records count for one index (used with -indexname option")
	flag.Parse()

	// The Zabbix host name is only needed when something is actually sent.
	zabbix_host := os.Getenv("ZABBIX_HOST")
	if zabbix_send && zabbix_host == "" {
		fmt.Println("Send error: make sure environment variables `ZABBIX_HOST`")
		os.Exit(1)
	}

	cfg := elasticsearch.Config{
		Transport: &http.Transport{
			MaxIdleConnsPerHost:   10,
			ResponseHeaderTimeout: 30 * time.Second,
			DialContext:           (&net.Dialer{Timeout: time.Second}).DialContext,
			TLSClientConfig: &tls.Config{
				// MaxVersion is deliberately left unset so the best protocol
				// version both sides support is negotiated. Capping it at
				// TLS 1.1 forces a deprecated protocol and, combined with
				// the TLS 1.2 client minimum that current Go releases
				// default to, leaves no usable version at all.
				// NOTE(review): certificate verification is disabled here.
				InsecureSkipVerify: true,
			},
		},
	}
	es, err := elasticsearch.NewClient(cfg)
	if err != nil {
		log.Fatalf("Error creating the client: %s", err)
	}

	switch operation {
	case "es-cluster-info":
		EsClusterInfo(es)
	case "es-get-indices":
		for _, index := range EsGetIndices(es) {
			fmt.Println(index)
		}
	case "es-indices-discover":
		result_discovery_json := EsIndexDiscover(EsGetIndices(es))
		if zabbix_send {
			ZabbixSender(zabbix_host, "indices", result_discovery_json)
		}
		fmt.Println(result_discovery_json)
	case "es-records-count":
		// Records count for all index patterns.
		for _, index := range EsGetIndices(es) {
			result := EsSearch(index, es_request_time_range)
			log.Println("ES index name:", index, "; Query time period:", es_request_time_range, "; Records count:", result)
			if zabbix_send {
				sendRecordsCount(zabbix_host, index, es_request_time_range, result)
			}
		}
	case "es-index-records-count":
		// Records count for one index pattern.
		result := EsSearch(es_index_name, es_request_time_range)
		if zabbix_send {
			sendRecordsCount(zabbix_host, es_index_name, es_request_time_range, result)
		}
		log.Println("ES index name:", es_index_name, "; Query time period:", es_request_time_range, "; Records count:", result)
	default:
		// A mistyped operation must not look like a successful run.
		log.Printf("Unknown operation: %q", operation)
		flag.Usage()
		os.Exit(1)
	}
}

// sendRecordsCount pushes the record count of one index pattern, and the time
// range in hours it was measured over, to Zabbix as two separate items.
func sendRecordsCount(zabbix_host string, es_index_name string, es_request_time_range int, result int) {
	ZabbixSender(zabbix_host, `indices.records_count[`+es_index_name+`]`, strconv.Itoa(result))
	ZabbixSender(zabbix_host, `indices.records_count_time_range[`+es_index_name+`]`, strconv.Itoa(es_request_time_range))
}