449 lines
14 KiB
Go
449 lines
14 KiB
Go
//------------------------------------------------------------------------------
|
|
// Getting and monitoring Elasticsearch indices and send the metrics into zabbix
|
|
//-------------------------------------------------------------------------------
|
|
// Author: Sergey Kalinin
|
|
// e-mail: svk@nuk-svk.ru
|
|
// website: https://nuk-svk.ru
|
|
// Git repos: https://git.nuk-svk.ru
|
|
//-------------------------------------------------------------------------------
|
|
// Require:
|
|
// github.com/adubkov/go-zabbix
|
|
// github.com/elastic/go-elasticsearch
|
|
//------------------------------------------------------------------------------
|
|
// ZABBIX_SERVER="zabbix"
|
|
// ZABBIX_PORT="10051"
|
|
// ZABBIX_HOST="elastic cluster"
|
|
// ELASTICSEARCH_URL="https://user:pass@elastic:9200"
|
|
//-------------------------------------------------------------------------------
|
|
|
|
package main
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"crypto/tls"
|
|
"encoding/json"
|
|
"flag"
|
|
"fmt"
|
|
"log"
|
|
"net"
|
|
"net/http"
|
|
"os"
|
|
"regexp"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/adubkov/go-zabbix"
|
|
"github.com/elastic/go-elasticsearch/v7"
|
|
"github.com/elastic/go-elasticsearch/v7/esapi"
|
|
)
|
|
|
|
type json_records struct {
|
|
INDEX string
|
|
RECORD_COUNT int
|
|
}
|
|
|
|
// Определяем входит ли строка в slice
|
|
func StringContains(a []string, x string) bool {
|
|
for _, n := range a {
|
|
if x == n {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
var Exclude_index_list string
|
|
|
|
// Проверка вхождения имени индекса (префикса) в список исключений
|
|
func IndexPrefixExclude(index_name string) bool {
|
|
exclude_index_name := strings.Split(Exclude_index_list, ",")
|
|
// log.Println(Exclude_index_list)
|
|
for _, n := range exclude_index_name {
|
|
if strings.HasPrefix(index_name, n) {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
func EsSearch(index_name string, es_request_time_range int) int {
|
|
var (
|
|
r map[string]interface{}
|
|
timestamp_field string
|
|
)
|
|
request_time_range := "now-" + strconv.Itoa(es_request_time_range) + "h"
|
|
|
|
cfg := elasticsearch.Config{
|
|
// Addresses: []string{
|
|
// "http://localhost:9200",
|
|
// "http://localhost:9201",
|
|
// },
|
|
// Username: "foo",
|
|
// Password: "bar",
|
|
Transport: &http.Transport{
|
|
MaxIdleConnsPerHost: 10,
|
|
ResponseHeaderTimeout: 60 * time.Second,
|
|
DialContext: (&net.Dialer{Timeout: time.Second}).DialContext,
|
|
TLSClientConfig: &tls.Config{
|
|
MinVersion: tls.VersionTLS10,
|
|
MaxVersion: tls.VersionTLS12,
|
|
InsecureSkipVerify: true,
|
|
},
|
|
},
|
|
}
|
|
|
|
es, err := elasticsearch.NewClient(cfg)
|
|
if err != nil {
|
|
log.Fatalf("Error creating the client: %s", err)
|
|
}
|
|
res, err := es.Info()
|
|
if err != nil {
|
|
log.Fatalf("Error getting response: %s", err)
|
|
}
|
|
defer res.Body.Close()
|
|
// Check response status
|
|
if res.IsError() {
|
|
log.Fatalf("Error: %s", res.String())
|
|
}
|
|
|
|
// Checking the @timestamp field exists
|
|
// if !EsIndexFieldExists(es, index_name, `@timestamp`) {
|
|
// // continue
|
|
// log.Println(EsIndexFieldExists(es, index_name, `@timestamp`), "@timestamp")
|
|
// }
|
|
// log.Println(EsIndexFieldExists(es, index_name, `@timestamp`), "@timestamp")
|
|
|
|
if EsIndexFieldExists(es, index_name, `timestamp`) {
|
|
// continue
|
|
timestamp_field = "timestamp"
|
|
// log.Println(EsIndexFieldExists(es, index_name, `timestamp`), "timestamp")
|
|
} else {
|
|
timestamp_field = "@timestamp"
|
|
}
|
|
// log.Println(EsIndexFieldExists(es, index_name, `timestamp`), "timestamp")
|
|
log.Println(index_name, timestamp_field)
|
|
|
|
var buf bytes.Buffer
|
|
query := map[string]interface{}{
|
|
"query": map[string]interface{}{
|
|
"range": map[string]interface{}{
|
|
timestamp_field: map[string]interface{}{
|
|
// "gt": "now-1h",
|
|
"gt": request_time_range,
|
|
},
|
|
// "match": map[string]interface{}{
|
|
// "host": "10.250.1.212",
|
|
},
|
|
},
|
|
"size": 1,
|
|
"sort": map[string]interface{}{
|
|
timestamp_field: map[string]interface{}{
|
|
"order": "desc",
|
|
},
|
|
},
|
|
}
|
|
if err := json.NewEncoder(&buf).Encode(query); err != nil {
|
|
log.Fatalf("Error encoding query: %s", err)
|
|
}
|
|
|
|
// Perform the search request.
|
|
res, err = es.Search(
|
|
es.Search.WithContext(context.Background()),
|
|
es.Search.WithIndex(index_name+"*"),
|
|
es.Search.WithBody(&buf),
|
|
es.Search.WithTrackTotalHits(true),
|
|
es.Search.WithPretty(),
|
|
)
|
|
if err != nil {
|
|
log.Fatalf("Error getting response: %s", err)
|
|
}
|
|
defer res.Body.Close()
|
|
|
|
if res.IsError() {
|
|
var e map[string]interface{}
|
|
if err := json.NewDecoder(res.Body).Decode(&e); err != nil {
|
|
log.Fatalf("Error parsing the response body: %s", err)
|
|
} else {
|
|
// Print the response status and error information.
|
|
log.Fatalf("[%s] %s: %s",
|
|
res.Status(),
|
|
e["error"].(map[string]interface{})["type"],
|
|
e["error"].(map[string]interface{})["reason"],
|
|
)
|
|
}
|
|
}
|
|
|
|
if err := json.NewDecoder(res.Body).Decode(&r); err != nil {
|
|
log.Fatalf("Error parsing the response body: %s", err)
|
|
}
|
|
// Вывод статуса, количество выбранных записей и времени запроса
|
|
hits := int(r["hits"].(map[string]interface{})["total"].(map[string]interface{})["value"].(float64))
|
|
return hits
|
|
}
|
|
|
|
func EsIndexDiscover(es_index_list []string) string {
|
|
result_discovery_json := string("{\"data\": [")
|
|
|
|
for _, value := range es_index_list {
|
|
// result_discovery_json = result_discovery_json + "{\"{#INDEX}\":\"" + value + "\",\"{#ITEM_TYPE}\":\"Records count\"" + "},"
|
|
result_discovery_json = result_discovery_json + "{\"{#INDEX}\":\"" + value + "\"" + "},"
|
|
}
|
|
result_discovery_json = strings.TrimRight(result_discovery_json, ",") + "]}"
|
|
// fmt.Println(result_discovery_json)
|
|
return (result_discovery_json)
|
|
}
|
|
|
|
func EsIndexRecordsCount(es_index_list []string, es_request_time_range int) {
|
|
// fmt.Println(es_request_time_range)
|
|
result_index := make(map[int]json_records)
|
|
j := int(0)
|
|
for _, value := range es_index_list {
|
|
result_index[j] = json_records{value, EsSearch(value, es_request_time_range)}
|
|
j++
|
|
}
|
|
log.Println(result_index)
|
|
}
|
|
|
|
func EsClusterInfo(es *elasticsearch.Client) {
|
|
var (
|
|
r map[string]interface{}
|
|
)
|
|
// 1. Get cluster info
|
|
res, err := es.Info()
|
|
if err != nil {
|
|
log.Fatalf("Error getting response: %s", err)
|
|
}
|
|
defer res.Body.Close()
|
|
// Check response status
|
|
if res.IsError() {
|
|
log.Fatalf("Error: %s", res.String())
|
|
}
|
|
// Deserialize the response into a map.
|
|
if err := json.NewDecoder(res.Body).Decode(&r); err != nil {
|
|
log.Fatalf("Error parsing the response body: %s", err)
|
|
}
|
|
// Print client and server version numbers.
|
|
fmt.Printf("Client: %s\n", elasticsearch.Version)
|
|
fmt.Printf("Client: %s\n", elasticsearch.Version)
|
|
fmt.Printf("Server: %s\n", r["version"].(map[string]interface{})["number"])
|
|
}
|
|
|
|
func EsIndexFieldExists(es *elasticsearch.Client, index_name string, field_name string) bool {
|
|
// Check if the filed exists into index
|
|
var (
|
|
fields_request map[string]interface{}
|
|
fields []string
|
|
indices []string
|
|
field_exists bool
|
|
)
|
|
fields = append(fields, field_name)
|
|
indices = append(indices, index_name)
|
|
|
|
req := esapi.FieldCapsRequest{
|
|
// Index: []string{},
|
|
Index: indices,
|
|
Fields: fields,
|
|
}
|
|
|
|
res, err := req.Do(context.Background(), es)
|
|
if err != nil {
|
|
log.Fatalf("ERROR: %s", err)
|
|
}
|
|
if err := json.NewDecoder(res.Body).Decode(&fields_request); err != nil {
|
|
log.Fatalf("->Error parsing the response body: %s", err)
|
|
}
|
|
|
|
for i, f := range fields_request {
|
|
if i == "fields" {
|
|
// if f.(map[string]interface{})[`@timestamp`] == nil {
|
|
if f.(map[string]interface{})[field_name] == nil {
|
|
field_exists = false
|
|
} else {
|
|
field_exists = true
|
|
}
|
|
}
|
|
}
|
|
return (field_exists)
|
|
}
|
|
|
|
func EsGetIndices(es *elasticsearch.Client) []string {
|
|
//Get indices list
|
|
// log.Println("sjsjsjsj")
|
|
var (
|
|
index_request []map[string]interface{}
|
|
es_index_list []string
|
|
)
|
|
req := esapi.CatIndicesRequest{
|
|
// Index: []string{},
|
|
Format: "JSON",
|
|
// ExpandWildcards,
|
|
}
|
|
|
|
res, err := req.Do(context.Background(), es)
|
|
if err != nil {
|
|
log.Fatalf("ERROR: %s", err)
|
|
}
|
|
// log.Println(res)
|
|
|
|
if err := json.NewDecoder(res.Body).Decode(&index_request); err != nil {
|
|
log.Fatalf("Error parsing the response body: %s", err)
|
|
}
|
|
for _, hit := range index_request {
|
|
m := regexp.MustCompile("-[0-9]")
|
|
// позиция вхождения регулярки в имени индекса
|
|
i := m.FindStringIndex(hit["index"].(string))
|
|
// наименование индекса
|
|
index_name := hit["index"].(string)
|
|
// log.Println(index_name)
|
|
|
|
// Итоговое наименование индекса
|
|
var index_name_short string
|
|
// Исключаем индексы от кибаны (.kibana-*) и обрезаем цифры (даты и версии)
|
|
if !IndexPrefixExclude(index_name) {
|
|
if i != nil {
|
|
index_name_short = strings.TrimSpace(index_name[0:i[0]])
|
|
} else {
|
|
index_name_short = strings.TrimSpace(index_name)
|
|
}
|
|
if !StringContains(es_index_list, index_name_short) {
|
|
es_index_list = append(es_index_list, index_name_short)
|
|
}
|
|
} else {
|
|
log.Println("Index", index_name, "has name from exclude lists")
|
|
}
|
|
}
|
|
log.Println(es_index_list)
|
|
|
|
return (es_index_list)
|
|
}
|
|
|
|
func ZabbixSender(zabbix_host string, metric_name string, metric_value string) bool {
|
|
var (
|
|
metrics []*zabbix.Metric
|
|
zabbix_port int
|
|
err error
|
|
)
|
|
zabbix_server := os.Getenv("ZABBIX_SERVER")
|
|
|
|
// Read and checkin zabbix server and port
|
|
if zabbix_server == "" {
|
|
log.Println("Login error: make sure environment variables `ZABBIX_SERVER` are defined")
|
|
os.Exit(1)
|
|
}
|
|
if os.Getenv("ZABBIX_PORT") == "" {
|
|
log.Println("Login error: make sure environment variables `ZABBIX_PORT` are defined")
|
|
os.Exit(1)
|
|
}
|
|
if zabbix_port, err = strconv.Atoi(os.Getenv("ZABBIX_PORT")); err != nil {
|
|
log.Println(zabbix_port, "Zabbix port value error")
|
|
}
|
|
|
|
metrics = append(metrics, zabbix.NewMetric(zabbix_host, metric_name, metric_value, time.Now().Unix()))
|
|
metrics = append(metrics, zabbix.NewMetric(zabbix_host, "status", "OK"))
|
|
|
|
// Create instance of Packet class
|
|
packet := zabbix.NewPacket(metrics)
|
|
// fmt.Println(zabbix_host, metric_name, metric_value, metrics)
|
|
// Send packet to zabbix
|
|
z := zabbix.NewSender(zabbix_server, zabbix_port)
|
|
res, err := z.Send(packet)
|
|
log.Println(string(res))
|
|
return true
|
|
}
|
|
|
|
//---------------------------------------------------------------------------
|
|
func main() {
|
|
var (
|
|
operation string
|
|
es_index_name string
|
|
es_request_time_range int
|
|
zabbix_send bool
|
|
// zabbix_port int
|
|
)
|
|
|
|
flag.StringVar(&es_index_name, "indexname", "", "Elasticsearch index name pattern, (like \"filebeat\")")
|
|
flag.IntVar(&es_request_time_range, "timerange", 6, "Elasticsearch time range for records count into hours")
|
|
flag.BoolVar(&zabbix_send, "zabbix-send", false, "Send metrics or discovery data into zabbix")
|
|
flag.StringVar(&Exclude_index_list, "exclude-index", ".", "Elasticsearch index name comma-separated list for exluded, (like \"filebeat,mailindex,e.t.c\")")
|
|
|
|
flag.StringVar(&operation, "operation", "es-cluster-info", "Opertation type, must be:\n\tes-cluster-info - ES Cluster information (version e.t.c)\n\tes-get-indices - geting all index\n\tes-indices-discover - getting es index pattern list\n\tes-records-count - getting the number of records for a time range for all index pattern\n\tes-index-records-count - getting records count for one index (used with -indexname option")
|
|
|
|
flag.Parse()
|
|
|
|
zabbix_host := os.Getenv("ZABBIX_HOST")
|
|
if zabbix_host == "" {
|
|
fmt.Println("Send error: make sure environment variables `ZABBIX_HOST`")
|
|
os.Exit(1)
|
|
}
|
|
|
|
if os.Getenv("ELASTICSEARCH_URL") == "" {
|
|
fmt.Println("Send error: make sure environment variables `ELASTICSEARCH_URL`")
|
|
os.Exit(1)
|
|
}
|
|
// if es_request_time_range > 24 {
|
|
// log.Println("Error: Time range must be a number between 1 and 24")
|
|
// os.Exit(1)
|
|
// }
|
|
|
|
cfg := elasticsearch.Config{
|
|
Transport: &http.Transport{
|
|
MaxIdleConnsPerHost: 10,
|
|
ResponseHeaderTimeout: 30 * time.Second,
|
|
DialContext: (&net.Dialer{Timeout: time.Second}).DialContext,
|
|
TLSClientConfig: &tls.Config{
|
|
MinVersion: tls.VersionTLS10,
|
|
MaxVersion: tls.VersionTLS13,
|
|
InsecureSkipVerify: true,
|
|
},
|
|
},
|
|
}
|
|
|
|
es, err := elasticsearch.NewClient(cfg)
|
|
if err != nil {
|
|
log.Fatalf("Error creating the client: %s", err)
|
|
}
|
|
// fmt.Println(operation)
|
|
switch operation {
|
|
case "es-cluster-info":
|
|
EsClusterInfo(es)
|
|
case "es-get-indices":
|
|
// fmt.Println(operation)
|
|
for _, index := range EsGetIndices(es) {
|
|
fmt.Println(index)
|
|
}
|
|
case "es-indices-discover":
|
|
result_discovery_json := EsIndexDiscover(EsGetIndices(es))
|
|
// fmt.Println(zabbix_send)
|
|
if zabbix_send == true {
|
|
ZabbixSender(zabbix_host, "indices", result_discovery_json)
|
|
}
|
|
fmt.Println(result_discovery_json)
|
|
case "es-records-count":
|
|
// records count for all indices
|
|
// EsIndexRecordsCount(EsGetIndices(es), es_request_time_range)
|
|
result_index := make(map[int]json_records)
|
|
j := int(0)
|
|
for _, es_index_name := range EsGetIndices(es) {
|
|
log.Println(es_index_name)
|
|
result := EsSearch(es_index_name, es_request_time_range)
|
|
result_index[j] = json_records{es_index_name, result}
|
|
log.Println("ES index name:", es_index_name, "; Query time period:", es_request_time_range, "; Records count:", result)
|
|
if zabbix_send == true {
|
|
ZabbixSender(zabbix_host, `indices.records_count[`+es_index_name+`]`, strconv.Itoa(result))
|
|
ZabbixSender(zabbix_host, `indices.records_count_time_range[`+es_index_name+`]`, strconv.Itoa(es_request_time_range))
|
|
}
|
|
j++
|
|
}
|
|
case "es-index-records-count":
|
|
// records count for one index
|
|
result := EsSearch(es_index_name, es_request_time_range)
|
|
if zabbix_send == true {
|
|
ZabbixSender(zabbix_host, `indices.records_count[`+es_index_name+`]`, strconv.Itoa(result))
|
|
ZabbixSender(zabbix_host, `indices.records_count_time_range[`+es_index_name+`]`, strconv.Itoa(es_request_time_range))
|
|
}
|
|
log.Println("ES index name:", es_index_name, "; Query time period:", es_request_time_range, "; Records count:", result)
|
|
}
|
|
}
|