//------------------------------------------------------------------------------
// Collects Elasticsearch index statistics and sends the metrics to Zabbix.
//------------------------------------------------------------------------------
// Author: Sergey Kalinin
// e-mail: svk@nuk-svk.ru
// website: https://nuk-svk.ru
// Git repos: https://git.nuk-svk.ru
//------------------------------------------------------------------------------
// Requires:
//   github.com/adubkov/go-zabbix
//   github.com/elastic/go-elasticsearch
//------------------------------------------------------------------------------
// Environment variables (example values):
//   ZABBIX_SERVER="zabbix"
//   ZABBIX_PORT="10051"
//   ZABBIX_HOST="elastic cluster"
//   ELASTICSEARCH_URL="https://user:pass@elastic:9200"
//------------------------------------------------------------------------------
package main
import (
"bytes"
"context"
"crypto/tls"
"encoding/json"
"flag"
"fmt"
"log"
"net"
"net/http"
"os"
"regexp"
"strconv"
"strings"
"time"
"github.com/adubkov/go-zabbix"
"github.com/elastic/go-elasticsearch/v7"
"github.com/elastic/go-elasticsearch/v7/esapi"
)
// json_records pairs an index name (or name prefix) with the number of
// records counted for it.
type json_records struct {
	// INDEX is the index name or prefix that was queried.
	INDEX string
	// RECORD_COUNT is the number of records found for INDEX.
	RECORD_COUNT int
}
// StringContains reports whether the slice a holds an element equal to x.
// A nil or empty slice never contains anything.
func StringContains(a []string, x string) bool {
	found := false
	// Stop scanning as soon as a match has been seen.
	for idx := 0; idx < len(a) && !found; idx++ {
		found = a[idx] == x
	}
	return found
}
2024-12-10 12:43:08 +03:00
// Exclude_index_list is a comma-separated list of index name prefixes that
// must be skipped (filled from the -exclude-index command-line flag).
var Exclude_index_list string

// IndexPrefixExclude reports whether index_name starts with one of the
// prefixes listed in Exclude_index_list.
//
// Entries are trimmed of surrounding whitespace and empty entries are
// ignored: every string has the empty prefix, so an empty list or a stray
// comma ("a,", "a,,b") would otherwise exclude every index.
func IndexPrefixExclude(index_name string) bool {
	for _, prefix := range strings.Split(Exclude_index_list, ",") {
		prefix = strings.TrimSpace(prefix)
		if prefix == "" {
			continue
		}
		if strings.HasPrefix(index_name, prefix) {
			return true
		}
	}
	return false
}
2024-11-06 10:16:00 +03:00
func EsSearch ( index_name string , es_request_time_range int ) int {
var (
r map [ string ] interface { }
2024-12-10 12:43:08 +03:00
timestamp_field string
2024-11-06 10:16:00 +03:00
)
request_time_range := "now-" + strconv . Itoa ( es_request_time_range ) + "h"
cfg := elasticsearch . Config {
// Addresses: []string{
// "http://localhost:9200",
// "http://localhost:9201",
// },
// Username: "foo",
// Password: "bar",
Transport : & http . Transport {
MaxIdleConnsPerHost : 10 ,
ResponseHeaderTimeout : 60 * time . Second ,
DialContext : ( & net . Dialer { Timeout : time . Second } ) . DialContext ,
TLSClientConfig : & tls . Config {
2024-12-10 12:43:08 +03:00
MinVersion : tls . VersionTLS10 ,
MaxVersion : tls . VersionTLS12 ,
2024-11-06 10:16:00 +03:00
InsecureSkipVerify : true ,
} ,
} ,
}
es , err := elasticsearch . NewClient ( cfg )
if err != nil {
log . Fatalf ( "Error creating the client: %s" , err )
}
res , err := es . Info ( )
if err != nil {
log . Fatalf ( "Error getting response: %s" , err )
}
defer res . Body . Close ( )
// Check response status
if res . IsError ( ) {
log . Fatalf ( "Error: %s" , res . String ( ) )
}
2024-12-10 12:43:08 +03:00
// Checking the @timestamp field exists
// if !EsIndexFieldExists(es, index_name, `@timestamp`) {
// // continue
// log.Println(EsIndexFieldExists(es, index_name, `@timestamp`), "@timestamp")
// }
// log.Println(EsIndexFieldExists(es, index_name, `@timestamp`), "@timestamp")
if EsIndexFieldExists ( es , index_name , ` timestamp ` ) {
// continue
timestamp_field = "timestamp"
// log.Println(EsIndexFieldExists(es, index_name, `timestamp`), "timestamp")
} else {
timestamp_field = "@timestamp"
}
// log.Println(EsIndexFieldExists(es, index_name, `timestamp`), "timestamp")
log . Println ( index_name , timestamp_field )
2024-11-06 10:16:00 +03:00
var buf bytes . Buffer
query := map [ string ] interface { } {
"query" : map [ string ] interface { } {
"range" : map [ string ] interface { } {
2024-12-10 12:43:08 +03:00
timestamp_field : map [ string ] interface { } {
2024-11-06 10:16:00 +03:00
// "gt": "now-1h",
"gt" : request_time_range ,
} ,
// "match": map[string]interface{}{
// "host": "10.250.1.212",
} ,
} ,
"size" : 1 ,
"sort" : map [ string ] interface { } {
2024-12-10 12:43:08 +03:00
timestamp_field : map [ string ] interface { } {
2024-11-06 10:16:00 +03:00
"order" : "desc" ,
} ,
} ,
}
if err := json . NewEncoder ( & buf ) . Encode ( query ) ; err != nil {
log . Fatalf ( "Error encoding query: %s" , err )
}
// Perform the search request.
res , err = es . Search (
es . Search . WithContext ( context . Background ( ) ) ,
es . Search . WithIndex ( index_name + "*" ) ,
es . Search . WithBody ( & buf ) ,
es . Search . WithTrackTotalHits ( true ) ,
es . Search . WithPretty ( ) ,
)
if err != nil {
log . Fatalf ( "Error getting response: %s" , err )
}
defer res . Body . Close ( )
if res . IsError ( ) {
var e map [ string ] interface { }
if err := json . NewDecoder ( res . Body ) . Decode ( & e ) ; err != nil {
log . Fatalf ( "Error parsing the response body: %s" , err )
} else {
// Print the response status and error information.
log . Fatalf ( "[%s] %s: %s" ,
res . Status ( ) ,
e [ "error" ] . ( map [ string ] interface { } ) [ "type" ] ,
e [ "error" ] . ( map [ string ] interface { } ) [ "reason" ] ,
)
}
}
if err := json . NewDecoder ( res . Body ) . Decode ( & r ) ; err != nil {
log . Fatalf ( "Error parsing the response body: %s" , err )
}
// Вывод статуса, количество выбранных записей и времени запроса
hits := int ( r [ "hits" ] . ( map [ string ] interface { } ) [ "total" ] . ( map [ string ] interface { } ) [ "value" ] . ( float64 ) )
return hits
}
// EsIndexDiscover renders es_index_list as a discovery JSON document of the
// form {"data": [{"{#INDEX}":"name"},...]}. An empty list yields
// {"data": []}. Names are inserted verbatim, without JSON escaping.
func EsIndexDiscover(es_index_list []string) string {
	entries := make([]string, 0, len(es_index_list))
	for _, name := range es_index_list {
		entries = append(entries, "{\"{#INDEX}\":\""+name+"\"}")
	}
	return "{\"data\": [" + strings.Join(entries, ",") + "]}"
}
// EsIndexRecordsCount runs EsSearch for every entry of es_index_list with the
// given es_request_time_range and logs the collected results, keyed by the
// entry's position in the list. Nothing is returned.
func EsIndexRecordsCount(es_index_list []string, es_request_time_range int) {
	counts := make(map[int]json_records)
	for pos, index := range es_index_list {
		counts[pos] = json_records{
			INDEX:        index,
			RECORD_COUNT: EsSearch(index, es_request_time_range),
		}
	}
	log.Println(counts)
}
// EsClusterInfo prints the Elasticsearch client library version and the
// version number reported by the server behind es to standard output.
// A failed request, an error status or an undecodable response terminates
// the program via log.Fatalf.
func EsClusterInfo(es *elasticsearch.Client) {
	var r map[string]interface{}

	// Get cluster info.
	res, err := es.Info()
	if err != nil {
		log.Fatalf("Error getting response: %s", err)
	}
	defer res.Body.Close()

	// Check response status.
	if res.IsError() {
		log.Fatalf("Error: %s", res.String())
	}

	// Deserialize the response into a map.
	if err := json.NewDecoder(res.Body).Decode(&r); err != nil {
		log.Fatalf("Error parsing the response body: %s", err)
	}

	// Guard the assertion so a response without a "version" object is
	// reported instead of causing a panic.
	version, ok := r["version"].(map[string]interface{})
	if !ok {
		log.Fatalf("Error parsing the response body: %s", "no \"version\" object")
	}

	// Print client and server version numbers (each exactly once).
	fmt.Printf("Client: %s\n", elasticsearch.Version)
	fmt.Printf("Server: %s\n", version["number"])
}
// EsIndexFieldExists reports whether field_name is listed in the
// field-capabilities response for index_name.
//
// It returns false when the response has no "fields" object (for example an
// error payload) or when that object has no entry for field_name. Request
// and decoding failures terminate the program via log.Fatalf.
func EsIndexFieldExists(es *elasticsearch.Client, index_name string, field_name string) bool {
	var fields_request map[string]interface{}

	req := esapi.FieldCapsRequest{
		Index:  []string{index_name},
		Fields: []string{field_name},
	}
	res, err := req.Do(context.Background(), es)
	if err != nil {
		log.Fatalf("ERROR: %s", err)
	}
	// Close the body so the underlying connection is released.
	defer res.Body.Close()

	if err := json.NewDecoder(res.Body).Decode(&fields_request); err != nil {
		log.Fatalf("->Error parsing the response body: %s", err)
	}

	// Checked assertion: a missing or differently shaped "fields" value means
	// "not found" rather than a panic.
	fields, ok := fields_request["fields"].(map[string]interface{})
	if !ok {
		return false
	}
	return fields[field_name] != nil
}
func EsGetIndices ( es * elasticsearch . Client ) [ ] string {
//Get indices list
2024-12-10 12:43:08 +03:00
// log.Println("sjsjsjsj")
2024-11-06 10:16:00 +03:00
var (
index_request [ ] map [ string ] interface { }
es_index_list [ ] string
)
req := esapi . CatIndicesRequest {
// Index: []string{},
Format : "JSON" ,
// ExpandWildcards,
}
res , err := req . Do ( context . Background ( ) , es )
if err != nil {
log . Fatalf ( "ERROR: %s" , err )
}
2024-12-10 12:43:08 +03:00
// log.Println(res)
2024-11-06 10:16:00 +03:00
if err := json . NewDecoder ( res . Body ) . Decode ( & index_request ) ; err != nil {
log . Fatalf ( "Error parsing the response body: %s" , err )
}
for _ , hit := range index_request {
m := regexp . MustCompile ( "-[0-9]" )
// позиция вхождения регулярки в имени индекса
i := m . FindStringIndex ( hit [ "index" ] . ( string ) )
// наименование индекса
index_name := hit [ "index" ] . ( string )
2024-12-10 12:43:08 +03:00
// log.Println(index_name)
2024-11-06 10:16:00 +03:00
// Итоговое наименование индекса
var index_name_short string
// Исключаем индексы от кибаны (.kibana-*) и обрезаем цифры (даты и версии)
2024-12-10 12:43:08 +03:00
if ! IndexPrefixExclude ( index_name ) {
2024-11-06 10:16:00 +03:00
if i != nil {
index_name_short = strings . TrimSpace ( index_name [ 0 : i [ 0 ] ] )
} else {
index_name_short = strings . TrimSpace ( index_name )
}
if ! StringContains ( es_index_list , index_name_short ) {
es_index_list = append ( es_index_list , index_name_short )
}
2024-12-10 12:43:08 +03:00
} else {
log . Println ( "Index" , index_name , "has name from exclude lists" )
2024-11-06 10:16:00 +03:00
}
}
2024-12-10 12:43:08 +03:00
log . Println ( es_index_list )
2024-11-06 10:16:00 +03:00
return ( es_index_list )
}
// ZabbixSender sends the metric metric_name with value metric_value for the
// host zabbix_host to the Zabbix server given by the ZABBIX_SERVER and
// ZABBIX_PORT environment variables. A "status" metric with value "OK" is
// sent in the same packet.
//
// It returns true when the packet was sent and false when ZABBIX_PORT is not
// a number or sending failed. A missing ZABBIX_SERVER or ZABBIX_PORT
// terminates the program with exit code 1.
func ZabbixSender(zabbix_host string, metric_name string, metric_value string) bool {
	// Read and check the Zabbix server and port.
	zabbix_server := os.Getenv("ZABBIX_SERVER")
	if zabbix_server == "" {
		log.Println("Login error: make sure environment variables `ZABBIX_SERVER` are defined")
		os.Exit(1)
	}
	port_value := os.Getenv("ZABBIX_PORT")
	if port_value == "" {
		log.Println("Login error: make sure environment variables `ZABBIX_PORT` are defined")
		os.Exit(1)
	}
	zabbix_port, err := strconv.Atoi(port_value)
	if err != nil {
		// Sending to port 0 cannot succeed, so stop here.
		log.Println(port_value, "Zabbix port value error")
		return false
	}

	metrics := []*zabbix.Metric{
		zabbix.NewMetric(zabbix_host, metric_name, metric_value, time.Now().Unix()),
		zabbix.NewMetric(zabbix_host, "status", "OK"),
	}

	// Build the packet and send it to Zabbix.
	packet := zabbix.NewPacket(metrics)
	z := zabbix.NewSender(zabbix_server, zabbix_port)
	res, err := z.Send(packet)
	if err != nil {
		log.Println("Zabbix send error:", err)
		return false
	}
	log.Println(string(res))
	return true
}
//---------------------------------------------------------------------------
func main ( ) {
var (
operation string
es_index_name string
es_request_time_range int
zabbix_send bool
// zabbix_port int
)
flag . StringVar ( & es_index_name , "indexname" , "" , "Elasticsearch index name pattern, (like \"filebeat\")" )
flag . IntVar ( & es_request_time_range , "timerange" , 6 , "Elasticsearch time range for records count into hours" )
flag . BoolVar ( & zabbix_send , "zabbix-send" , false , "Send metrics or discovery data into zabbix" )
2024-12-10 12:43:08 +03:00
flag . StringVar ( & Exclude_index_list , "exclude-index" , "." , "Elasticsearch index name comma-separated list for exluded, (like \"filebeat,mailindex,e.t.c\")" )
2024-11-06 10:16:00 +03:00
flag . StringVar ( & operation , "operation" , "es-cluster-info" , "Opertation type, must be:\n\tes-cluster-info - ES Cluster information (version e.t.c)\n\tes-get-indices - geting all index\n\tes-indices-discover - getting es index pattern list\n\tes-records-count - getting the number of records for a time range for all index pattern\n\tes-index-records-count - getting records count for one index (used with -indexname option" )
flag . Parse ( )
zabbix_host := os . Getenv ( "ZABBIX_HOST" )
if zabbix_host == "" {
fmt . Println ( "Send error: make sure environment variables `ZABBIX_HOST`" )
os . Exit ( 1 )
}
2024-12-10 12:43:08 +03:00
if os . Getenv ( "ELASTICSEARCH_URL" ) == "" {
fmt . Println ( "Send error: make sure environment variables `ELASTICSEARCH_URL`" )
os . Exit ( 1 )
}
2024-11-06 10:16:00 +03:00
// if es_request_time_range > 24 {
// log.Println("Error: Time range must be a number between 1 and 24")
// os.Exit(1)
// }
cfg := elasticsearch . Config {
Transport : & http . Transport {
MaxIdleConnsPerHost : 10 ,
ResponseHeaderTimeout : 30 * time . Second ,
DialContext : ( & net . Dialer { Timeout : time . Second } ) . DialContext ,
TLSClientConfig : & tls . Config {
2024-12-10 12:43:08 +03:00
MinVersion : tls . VersionTLS10 ,
MaxVersion : tls . VersionTLS13 ,
2024-11-06 10:16:00 +03:00
InsecureSkipVerify : true ,
} ,
} ,
}
es , err := elasticsearch . NewClient ( cfg )
if err != nil {
log . Fatalf ( "Error creating the client: %s" , err )
}
// fmt.Println(operation)
switch operation {
case "es-cluster-info" :
EsClusterInfo ( es )
case "es-get-indices" :
// fmt.Println(operation)
for _ , index := range EsGetIndices ( es ) {
fmt . Println ( index )
}
case "es-indices-discover" :
result_discovery_json := EsIndexDiscover ( EsGetIndices ( es ) )
// fmt.Println(zabbix_send)
if zabbix_send == true {
ZabbixSender ( zabbix_host , "indices" , result_discovery_json )
}
fmt . Println ( result_discovery_json )
case "es-records-count" :
// records count for all indices
// EsIndexRecordsCount(EsGetIndices(es), es_request_time_range)
result_index := make ( map [ int ] json_records )
j := int ( 0 )
for _ , es_index_name := range EsGetIndices ( es ) {
2024-12-10 12:43:08 +03:00
log . Println ( es_index_name )
2024-11-06 10:16:00 +03:00
result := EsSearch ( es_index_name , es_request_time_range )
result_index [ j ] = json_records { es_index_name , result }
log . Println ( "ES index name:" , es_index_name , "; Query time period:" , es_request_time_range , "; Records count:" , result )
if zabbix_send == true {
ZabbixSender ( zabbix_host , ` indices.records_count[ ` + es_index_name + ` ] ` , strconv . Itoa ( result ) )
ZabbixSender ( zabbix_host , ` indices.records_count_time_range[ ` + es_index_name + ` ] ` , strconv . Itoa ( es_request_time_range ) )
}
j ++
}
case "es-index-records-count" :
// records count for one index
result := EsSearch ( es_index_name , es_request_time_range )
if zabbix_send == true {
ZabbixSender ( zabbix_host , ` indices.records_count[ ` + es_index_name + ` ] ` , strconv . Itoa ( result ) )
ZabbixSender ( zabbix_host , ` indices.records_count_time_range[ ` + es_index_name + ` ] ` , strconv . Itoa ( es_request_time_range ) )
}
log . Println ( "ES index name:" , es_index_name , "; Query time period:" , es_request_time_range , "; Records count:" , result )
}
}