//------------------------------------------------------------------------------ // Getting and monitoring Elasticsearch indices and send the metrics into zabbix //------------------------------------------------------------------------------- // Author: Sergey Kalinin //------------------------------------------------------------------------------- // ZABBIX_SERVER="zabbix" // ZABBIX_PORT="10051" // ZABBIX_HOST="elastic cluster" // ELASTICSEARCH_URL="https://user:pass@elastic:9200" //------------------------------------------------------------------------------- package main import ( "bytes" "context" "crypto/tls" "encoding/json" "flag" "fmt" "log" "net" "net/netip" "net/http" "os" "regexp" "strconv" "strings" "time" "reflect" "github.com/adubkov/go-zabbix" // "github.com/elastic/go-elasticsearch/v7" // "github.com/elastic/go-elasticsearch/v7/esapi" "github.com/opensearch-project/opensearch-go/v2" "github.com/opensearch-project/opensearch-go/v2/opensearchapi" "github.com/jackc/pgx/v5" "github.com/jackc/pgtype" ) type json_records struct { INDEX string RECORD_COUNT int } type Mail_records struct { id int32 client_name string client_account string //remote_ip pgtype.Inet remote_ip string country_code string check_time time.Time session_time string mail_reason string } type ESIndex struct { es_index_name string es_request_time_range string es_request_rows_quantity int } // type ESQuery struct { // Query struct { // Bool struct { // Filter []struct { // Range struct { // Timestamp struct { // Gt string `json:"gt"` // } `json:"@timestamp"` // } `json:"range"` // Match struct { // MailCommand string `json:"mail.command"` // } `json:"match"` // } `json:"filter"` // } `json:"bool"` // } `json:"query"` // Size int `json:"size"` // } type ESQuery struct { Query struct { Bool struct { Filter []map[string]interface{} `json:"filter"` } `json:"bool"` } `json:"query"` Size int `json:"size"` } type MailSession struct { id int32 mail_account_id int32 sessions_count int32 country_code string remote_ip netip.Prefix last_time string check_time time.Time alarm_type int } type Config struct { sessions_count int32 country_code string } var Exclude_index_list string var conn *pgx.Conn // Определяем входит ли строка в slice func StringContains(a []string, x string) bool { for _, n := range a { if x == n { return true } } return false } func isNil(i interface{}) bool { if i == nil { return true } switch reflect.TypeOf(i).Kind() { case reflect.Ptr, reflect.Map, reflect.Array, reflect.Chan, reflect.Slice: return reflect.ValueOf(i).IsNil() } return false } // Проверка вхождения имени индекса (префикса) в список исключений func IndexPrefixExclude(index_name string) bool { exclude_index_name := strings.Split(Exclude_index_list, ",") // log.Println(Exclude_index_list) for _, n := range exclude_index_name { if strings.HasPrefix(index_name, n) { return true } } return false } func EsSearch(index_name string, query ESQuery) map[string]interface{} { var ( r map[string]interface{} ) cfg := opensearch.Config{ // Addresses: []string{ // "http://localhost:9200", // "http://localhost:9201", // }, // Username: "foo", // Password: "bar", Transport: &http.Transport{ MaxIdleConnsPerHost: 10, ResponseHeaderTimeout: 60 * time.Second, DialContext: (&net.Dialer{Timeout: time.Second}).DialContext, TLSClientConfig: &tls.Config{ MaxVersion: tls.VersionTLS13, InsecureSkipVerify: true, }, }, } es, err := opensearch.NewClient(cfg) if err != nil { log.Fatalf("Error creating the client: %s", err) } res, err := es.Info() if err != nil { log.Fatalf("Error getting response: %s", err) } defer res.Body.Close() // Check response status if res.IsError() { log.Fatalf("Error: %s", res.String()) } var buf bytes.Buffer // query := map[string]interface{}{ // "query": map[string]interface{}{ // "range": map[string]interface{}{ // "@timestamp": map[string]interface{}{ // // "gt": "now-1h", // "gt": request_time_range, // }, // } // "match": map[string]interface{}{ // "mail.command": "Auth", // }, // }, // "size": es_request_rows_quantity, // "sort": map[string]interface{}{ // "@timestamp": map[string]interface{}{ // "order": "desc", // }, // }, // } a, _ := json.Marshal(query) fmt.Println(string(a)) if err := json.NewEncoder(&buf).Encode(query); err != nil { log.Fatalf("Error encoding query: %s", err) } // Perform the search request. res, err = es.Search( es.Search.WithContext(context.Background()), es.Search.WithIndex(index_name+"*"), es.Search.WithBody(&buf), es.Search.WithTrackTotalHits(true), es.Search.WithPretty(), ) if err != nil { log.Fatalf("Error getting response: %s", err) } defer res.Body.Close() if res.IsError() { var e map[string]interface{} if err := json.NewDecoder(res.Body).Decode(&e); err != nil { log.Fatalf("Error parsing the response body: %s", err) } else { // Print the response status and error information. log.Fatalf("[%s] %s: %s", res.Status(), e["error"].(map[string]interface{})["type"], e["error"].(map[string]interface{})["reason"], ) } } if err := json.NewDecoder(res.Body).Decode(&r); err != nil { log.Printf("Error parsing the response body:", err) } // json_data, _ := json.Marshal(r) // fmt.Println(string(json_data)) return r } func EsIndexDiscover(es_index_list []string) string { result_discovery_json := string("{\"data\": [") for _, value := range es_index_list { // result_discovery_json = result_discovery_json + "{\"{#INDEX}\":\"" + value + "\",\"{#ITEM_TYPE}\":\"Records count\"" + "}," result_discovery_json = result_discovery_json + "{\"{#INDEX}\":\"" + value + "\"" + "}," } result_discovery_json = strings.TrimRight(result_discovery_json, ",") + "]}" // fmt.Println(result_discovery_json) return (result_discovery_json) } // func EsIndexRecordsCount(es_index_list []string, es_request_time_range string, es_request_rows_quantity int) { // // fmt.Println(es_request_time_range) // result_index := make(map[int]json_records) // j := int(0) // for _, value := range es_index_list { // r := EsSearch(value, es_request_time_range, es_request_rows_quantity) // result := int(r["hits"].(map[string]interface{})["total"].(map[string]interface{})["value"].(float64)) // // result_index[j] = json_records{value, result} // j++ // } // log.Println(result_index) // } // func EsClusterInfo(es *opensearch.Client) { var ( r map[string]interface{} ) // 1. Get cluster info res, err := es.Info() if err != nil { log.Fatalf("Error getting response: %s", err) } defer res.Body.Close() // Check response status if res.IsError() { log.Fatalf("Error: %s", res.String()) } // Deserialize the response into a map. if err := json.NewDecoder(res.Body).Decode(&r); err != nil { log.Fatalf("Error parsing the response body: %s", err) } // Print client and server version numbers. fmt.Printf("Client: %s\n", opensearch.Version) fmt.Printf("Client: %s\n", opensearch.Version) fmt.Printf("Server: %s\n", r["version"].(map[string]interface{})["number"]) } func EsIndexFieldExists(es *opensearch.Client, index_name string, field_name string) bool { // Check if the filed exists into index var ( fields_request map[string]interface{} fields []string indices []string field_exists bool ) fields = append(fields, field_name) indices = append(indices, index_name) req := opensearchapi.FieldCapsRequest{ // Index: []string{}, Index: indices, Fields: fields, } res, err := req.Do(context.Background(), es) if err != nil { log.Fatalf("ERROR: %s", err) } if err := json.NewDecoder(res.Body).Decode(&fields_request); err != nil { log.Fatalf("->Error parsing the response body: %s", err) } for i, f := range fields_request { if i == "fields" { // if f.(map[string]interface{})[`@timestamp`] == nil { if f.(map[string]interface{})[field_name] == nil { field_exists = false } else { field_exists = true } } } return (field_exists) } func EsGetIndices(es *opensearch.Client) []string { //Get indices list // log.Println("sjsjsjsj") var ( index_request []map[string]interface{} es_index_list []string ) req := opensearchapi.CatIndicesRequest{ // Index: []string{}, Format: "JSON", // ExpandWildcards, } res, err := req.Do(context.Background(), es) if err != nil { log.Fatalf("ERROR: %s", err) } // log.Println(res) if err := json.NewDecoder(res.Body).Decode(&index_request); err != nil { log.Fatalf("Error parsing the response body: %s", err) } for _, hit := range index_request { m := regexp.MustCompile("-[0-9]") // позиция вхождения регулярки в имени индекса i := m.FindStringIndex(hit["index"].(string)) // наименование индекса index_name := hit["index"].(string) // log.Println(index_name) // Checking the @timestamp field exists // if !EsIndexFieldExists(es, index_name, `@timestamp`) { // continue // // fmt.Println(EsIndexFieldExists(es, index_name)) // } // if strings.HasPrefix(index_name, "apdex") { // continue // // fmt.Println(EsIndexFieldExists(es, index_name)) // } // Итоговое наименование индекса var index_name_short string // Исключаем индексы от кибаны (.kibana-*) и обрезаем цифры (даты и версии) if !IndexPrefixExclude(index_name) { if i != nil { index_name_short = strings.TrimSpace(index_name[0:i[0]]) } else { index_name_short = strings.TrimSpace(index_name) } if !StringContains(es_index_list, index_name_short) { es_index_list = append(es_index_list, index_name_short) } } else { log.Println("Index", index_name, "has name from exclude lists") } } log.Println(es_index_list) return (es_index_list) } func ZabbixSender(zabbix_host string, metric_name string, metric_value string) bool { var ( metrics []*zabbix.Metric zabbix_port int err error ) zabbix_server := os.Getenv("ZABBIX_SERVER") // Read and checkin zabbix server and port if zabbix_server == "" { log.Println("Login error: make sure environment variables `ZABBIX_SERVER` are defined") os.Exit(1) } if os.Getenv("ZABBIX_PORT") == "" { log.Println("Login error: make sure environment variables `ZABBIX_PORT` are defined") os.Exit(1) } if zabbix_port, err = strconv.Atoi(os.Getenv("ZABBIX_PORT")); err != nil { log.Println(zabbix_port, "Zabbix port value error") } metrics = append(metrics, zabbix.NewMetric(zabbix_host, metric_name, metric_value, time.Now().Unix())) metrics = append(metrics, zabbix.NewMetric(zabbix_host, "status", "OK")) // Create instance of Packet class packet := zabbix.NewPacket(metrics) // fmt.Println(zabbix_host, metric_name, metric_value, metrics) // Send packet to zabbix z := zabbix.NewSender(zabbix_server, zabbix_port) res, err := z.Send(packet) log.Println(string(res)) return true } func PgSearch(conn *pgx.Conn, mail_data Mail_records) int32 { var id int32 var client_name pgtype.Varchar // var remote_ip pgtype.CIDR // var country_code pgtype.Varchar // fmt.Println("Select data from DB:", mail_data.client_name) err := conn.QueryRow(context.Background(), "select id, client_name from mail_accounts where client_name = $1", mail_data.client_name).Scan(&id, &client_name) //err = conn.QueryRow(context.Background(), "select id, client_name, remote_ip, country_code from mail_accounts where") if err != nil { fmt.Fprintf(os.Stderr, "Query select account failed: %v\n", err) // id = PgInsertAccount(conn, mail_data) log.Println("Select", mail_data.client_name, "error:", err) if err.Error() == "no rows in result set" { id = 0 } else { os.Exit(1) } } // fmt.Println(">>>", id, client_name.String) return id } func PgInsertAccount(conn *pgx.Conn, mail_data Mail_records) int32 { var id int32 // var client_name pgtype.Varchar // var remote_ip pgtype.CIDR // var country_code pgtype.Varchar // fmt.Println("Insert data into DB") err := conn.QueryRow(context.Background(), "insert into mail_accounts(client_name) values($1) returning id", mail_data.client_name).Scan(&id) if err != nil { fmt.Fprintf(os.Stderr, "Query insert account failed: %v\n", err) } //fmt.Println(id) return id } func PgSearchMailSession(conn *pgx.Conn, mail_data Mail_records) (int32, int32) { var id int32 var session_count int32 // var client_name pgtype.Varchar // var remote_ip pgtype.CIDR var country_code pgtype.Varchar // log.Println("Select data from DB:", "select id, sessions_count, country_code from mail_sessions where mail_account_id =", mail_data.id, "and remote_ip =",mail_data.remote_ip, "and last_time =", mail_data.session_time, "and last_check_timestamp =", mail_data.check_time) err := conn.QueryRow(context.Background(), "select id, sessions_count, country_code from mail_sessions where mail_account_id = $1 and remote_ip = $2 and last_check_timestamp = $3", mail_data.id, mail_data.remote_ip, mail_data.check_time).Scan(&id, &session_count, &country_code) //err = conn.QueryRow(context.Background(), "select id, client_name, remote_ip, country_code from mail_accounts where")\ // log.Println("Selected session data: ", id, session_count, country_code) if err != nil { log.Println("Query session PgSearchMailSession failed:", err) // id = PgInsertAccount(conn, mail_data) // log.Println("Select", mail_data.client_name, "error:", err) if err.Error() == "no rows in result set" { id = 0 } else { os.Exit(1) } } // fmt.Println(">>>", id, client_name.String) return id, session_count } // Alarm_type // 1 "Неразрешенный код страны" "alarm" // 2 "Количество сессий больше заданного" "alarm" // 3 "Вход в аккаунт более чем с одного IP" "alarm" // 4 "Вход более чем в один аккаунт" "alarm" func PgSearchMailSessionAnomaly(conn *pgx.Conn, last_check_timestamp time.Time, conf Config) []MailSession { // log.Println("Select data from DB:", "select id, mail_account_id, sessions_count, country_code, remote_ip, last_time from mail_sessions where last_check_timestamp =", last_check_timestamp) rows, err_select := conn.Query(context.Background(), "select id, mail_account_id, sessions_count, country_code, remote_ip, last_time from mail_sessions where last_check_timestamp = $1", last_check_timestamp) if err_select != nil { log.Println("Query sessions failed:", err_select) os.Exit(1) } var res []MailSession for rows.Next() { var s MailSession values, _ := rows.Values() s.id = values[0].(int32) s.mail_account_id = values[1].(int32) s.sessions_count = values[2].(int32) s.country_code = strings.TrimSpace(values[3].(string)) s.remote_ip = values[4].(netip.Prefix) s.last_time = values[5].(string) s.check_time = last_check_timestamp log.Println(s.id, s.mail_account_id, s.sessions_count, s.country_code, s.remote_ip, s.last_time) if s.country_code != conf.country_code && s.country_code != "" { s.alarm_type = 1 log.Println("ALARM!!!", s.mail_account_id, s.sessions_count, s.country_code, s.remote_ip) res = append(res, s) } if s.sessions_count >= conf.sessions_count { // s.alarm_type = "Количество соединений больше " + string(conf.sessions_count) s.alarm_type = 2 log.Println("ALARM!!!", s.mail_account_id, s.sessions_count, s.country_code, s.remote_ip) res = append(res, s) // alarm_id := PgInsertMailAlarm(conn, s, "Код страны GeoIP не разрешен") // log.Println("Alarm insert with id:", alarm_id) } } return res } func PgSelectAlarm(conn *pgx.Conn, mail_session MailSession) (int32) { var id int32 // log.Println("Select data from DB:", "select id, sessions_count, country_code from mail_sessions where mail_account_id =", mail_data.id, "and remote_ip =",mail_data.remote_ip, "and last_time =", mail_data.session_time, "and last_check_timestamp =", mail_data.check_time) err := conn.QueryRow(context.Background(), "select id from alarm where mail_account_id = $1 and remote_ip = $2 and alarm_type = $3", mail_session.mail_account_id, mail_session.remote_ip, mail_session.alarm_type).Scan(&id) log.Println("Selected alarm data: ", mail_session.mail_account_id, mail_session.remote_ip) if err != nil { log.Println("Select alarm with account_id", mail_session.mail_account_id, " and remote ip", mail_session.remote_ip, "error:", err) if err.Error() == "no rows in result set" { id = 0 } else { os.Exit(1) } } return id } func PgSelectAlarmAccount(conn *pgx.Conn, mail_session MailSession) int32 { var s int32 var i int32 // log.Println("Select data from DB:", "select id, sessions_count, country_code from mail_sessions where mail_account_id =", mail_data.id, "and remote_ip =",mail_data.remote_ip, "and last_time =", mail_data.session_time, "and last_check_timestamp =", mail_data.check_time) query := fmt.Sprintf("select id from alarm where mail_account_id = %v and alarm_type = %v", mail_session.mail_account_id, mail_session.alarm_type) log.Println("PgSelectAlarmAccount execute query:", query) rows, err_select := conn.Query(context.Background(), query) if err_select != nil { log.Println("Query alarms PgSelectAlarmAccount failed:", err_select) os.Exit(1) } if len(rows.FieldDescriptions()) != 0 { i = 0 var id int32 for rows.Next() { values, _ := rows.Values() id = values[0].(int32) log.Println("\t found records with id:", id) i++ } // если выбрана только одна запись возвращаем ее id, // если несколько - то возвращаем количество записей if i == 1 { s = id } else { s = i } } else { s = 0 } return s } // Выборка записей по конкретному IP // запросы разные в зависимости от типа тревоги func PgSelectAlarmRemoteIP(conn *pgx.Conn, mail_session MailSession) (int32) { var s int32 var query string // В зависимости от типа тревоги функция возвращает либо количество записей либо ID записи switch mail_session.alarm_type { case 1: // По уму тут можно упростить добавив в запрос count() хотя не факт что прокатит query = fmt.Sprintf("select mail_account_id, remote_ip, alarm_type from alarm where remote_ip = '%v' and alarm_type = %v group by mail_account_id, remote_ip, alarm_type", mail_session.remote_ip, mail_session.alarm_type) log.Println("PgSelectAlarmRemoteIP execute query:",query) rows, err_select := conn.Query(context.Background(), query) if err_select != nil { log.Println("Query alarms PgSelectAlarmRemoteIP failed:", err_select) os.Exit(1) } if len(rows.FieldDescriptions()) != 0 { var i, id int32 i = 0 for rows.Next() { values, _ := rows.Values() id = values[0].(int32) log.Println("\t found records with id:", id) i++ } s = i } else { s = 0 } case 4: query = fmt.Sprintf("select id from alarm where remote_ip = '%v' and alarm_type = %v", mail_session.remote_ip, mail_session.alarm_type) log.Println(query) var id int32 err := conn.QueryRow(context.Background(), query).Scan(&id) if err != nil { log.Println("Query alarms PgSelectAlarmRemoteIP failed:", mail_session.remote_ip, "error:", err) if err.Error() == "no rows in result set" { id = 0 } else { os.Exit(1) } } else { s = id } } return s } func PgInsertAlarm(conn *pgx.Conn, mail_data MailSession, conf Config) int32 { var id int32 log.Println("Insert data", mail_data, "into alarm DB") err := conn.QueryRow(context.Background(), "insert into alarm(mail_account_id, remote_ip, country_code, last_time, last_check_timestamp, alarm_type, alarm_action) values($1, $2, $3, $4, $5, $6, false) returning id", mail_data.mail_account_id, mail_data.remote_ip, mail_data.country_code, mail_data.last_time, mail_data.check_time, mail_data.alarm_type).Scan(&id) if err != nil { log.Printf("Insert alarm data failed:", err) } return id } func PgInsertAlarmRemoteIP(conn *pgx.Conn, mail_data MailSession, conf Config) int32 { var id int32 log.Println("Insert data", mail_data, "into alarm DB") err := conn.QueryRow(context.Background(), "insert into alarm(remote_ip, country_code, last_time, last_check_timestamp, alarm_type, alarm_action) values($1, $2, $3, $4, $5, false) returning id", mail_data.remote_ip, mail_data.country_code, mail_data.last_time, mail_data.check_time, mail_data.alarm_type).Scan(&id) if err != nil { log.Printf("Insert alarm data failed:", err) } return id } func PgInsertAlarmAccount(conn *pgx.Conn, mail_data MailSession, conf Config) int32 { var id int32 log.Println("Insert data", mail_data, "into alarm DB") err := conn.QueryRow(context.Background(), "insert into alarm(mail_account_id, last_time, last_check_timestamp, alarm_type, alarm_action) values($1, $2, $3, $4, false) returning id", mail_data.mail_account_id, mail_data.last_time, mail_data.check_time, mail_data.alarm_type).Scan(&id) if err != nil { log.Printf("Insert alarm data failed:", err) } return id } func PgUpdateAlarm(conn *pgx.Conn, mail_session MailSession, conf Config, id int32) int32 { log.Println("Update alarm data", mail_session) err := conn.QueryRow(context.Background(), "update alarm set last_time=$1, last_check_timestamp=$2 where id=$3 returning id", mail_session.last_time, mail_session.check_time, id).Scan(&id) if err != nil { log.Printf("Update alarm data failed: %v\n", err) } return id } func PgInsertMailSession(conn *pgx.Conn, mail_data Mail_records) int32 { var id int32 sessions_count := 1 // log.Println("Insert data", mail_data, "into session DB") err := conn.QueryRow(context.Background(), "insert into mail_sessions(mail_account_id, remote_ip, sessions_count, last_time, country_code, last_check_timestamp, mail_reason) values($1, $2, $3, $4, $5, $6, $7) returning id", mail_data.id, mail_data.remote_ip, sessions_count, mail_data.session_time, mail_data.country_code, mail_data.check_time, mail_data.mail_reason).Scan(&id) if err != nil { log.Printf("Insert session data failed:", err) } return id } func PgUpdateMailSession(conn *pgx.Conn, mail_data Mail_records, session_id int32, sessions_count int32) int32 { var id int32 sessions_count++ // log.Println("Update data", mail_data, "into session DB") err := conn.QueryRow(context.Background(), "update mail_sessions set sessions_count=$1, last_time=$2, last_check_timestamp=$4, mail_reason=$5 where id=$3 returning id", sessions_count, mail_data.session_time, session_id, mail_data.check_time, mail_data.mail_reason).Scan(&id) if err != nil { log.Printf("Update session data failed: %v\n", err) } return id } func doit(mail Mail_records, es_index ESIndex, query ESQuery, c chan int) int { // Запрос данных из соответствующего индекса и выборка полей mail.{} и geoip.{} fmt.Println(es_index) fmt.Println(es_index.es_index_name) r := EsSearch(es_index.es_index_name, query) result := r["hits"].(map[string]interface{})["hits"].([]interface{}) conn, err := pgx.Connect(context.Background(), os.Getenv("DATABASE_URL")) // fmt.Println(*conn) if err != nil { fmt.Fprintf(os.Stderr, "Unable to connect to database: %v\n", err) os.Exit(1) } for i := range result { mail_res := result[i].(map[string]interface{})["_source"].(map[string]interface{})["mail"] geoip :=result[i].(map[string]interface{})["_source"].(map[string]interface{})["geoip"] if !isNil(mail_res) { //fmt.Println(">>>>>>", mail.(map[string]interface{})) // fmt.Println(">>>>>>", mail_res.(map[string]interface{})["client_account"]) if !isNil(mail_res.(map[string]interface{})["client_account"]) { mail.client_account = mail_res.(map[string]interface{})["client_account"].(string) } // fmt.Println("", mail_res.(map[string]interface{})["client_name"]) if !isNil(mail_res.(map[string]interface{})["client_name"]) { mail.client_name = mail_res.(map[string]interface{})["client_name"].(string) } // fmt.Println("", mail_res.(map[string]interface{})["remote_ip"]) if !isNil( mail_res.(map[string]interface{})["remote_ip"]) { mail.remote_ip = mail_res.(map[string]interface{})["remote_ip"].(string) } if !isNil( mail_res.(map[string]interface{})["log_datetime"]) { datetime := mail_res.(map[string]interface{})["log_datetime"].(string) // tmpl := "2022-08-18 16:17:24,446" mail.session_time = datetime // log.Println(">>>>>>>>>>>>>>>>", mail.session_time, datetime) // 2022-08-18 16:17:24,446 } if !isNil(mail_res.(map[string]interface{})["reason"]) { mail.mail_reason = mail_res.(map[string]interface{})["reason"].(string) } } if !isNil(geoip) { // fmt.Println("", geoip.(map[string]interface{})["country_iso_code"].(string)) mail.country_code = geoip.(map[string]interface{})["country_iso_code"].(string) } if mail.client_name == "" && mail.client_account != "" { mail.client_name = mail.client_account } if (mail.client_name != "") && mail.remote_ip != "" { log.Println(mail.client_name, mail.remote_ip, mail.country_code, mail.mail_reason) // fmt.Println(mail.client_name, mail.client_account, mail.remote_ip, mail.country_code) res := PgSearch(conn, mail) if res == 0 { mail.id = PgInsertAccount(conn, mail) } else { mail.id = res } // log.Println("Insert account", mail.client_name, mail.client_account," sucess with id", mail.id) id, session_count := PgSearchMailSession(conn, mail) if id == 0 { PgInsertMailSession(conn, mail) } else { PgUpdateMailSession(conn, mail, id, session_count) } } mail.client_account = "" mail.client_name = "" mail.remote_ip = "" mail.country_code = "" mail_res = nil } defer conn.Close(context.Background()) // Вывод статуса, количество выбранных записей и времени запроса hits := int(r["hits"].(map[string]interface{})["total"].(map[string]interface{})["value"].(float64)) log.Println("ES index name:", es_index.es_index_name, "; Query time period:", es_index.es_request_time_range, "; Records count:", hits) c <- hits return hits } //--------------------------------------------------------------------------- func main() { var ( operation string es_index_name string es_request_time_range string es_request_rows_quantity int zabbix_send bool pg_insert bool // zabbix_port int ) flag.StringVar(&es_index_name, "indexname", "", "Elasticsearch index name patterns by comma separated, (like \"filebeat\")") flag.StringVar(&es_request_time_range, "timerange", "1h", "Elasticsearch time range for records count (like 1m, 10h, e.t.c.)") flag.IntVar(&es_request_rows_quantity, "request-rows", 1, "How many rows will return") flag.BoolVar(&zabbix_send, "zabbix-send", false, "Send metrics or discovery data into zabbix") flag.BoolVar(&pg_insert, "pg-insert", false, "Write data into postgres dsatabase") flag.StringVar(&Exclude_index_list, "exclude-index", ".", "Elasticsearch index name comma-separated list for exluded, (like \"filebeat,mailindex,e.t.c\")") flag.StringVar(&operation, "operation", "es-cluster-info", "Opertation type, must be:\n\tes-cluster-info - ES Cluster information (version e.t.c)\n\tes-get-indices - geting all index\n\tes-indices-discover - getting es index pattern list\n\tes-records-count - getting the number of records for a time range for all index pattern\n\tes-index-records-count - getting records count for one index (used with -indexname option\n\tes-request-data - select data from index") flag.Parse() fmt.Println(zabbix_send) zabbix_host := os.Getenv("ZABBIX_HOST") if zabbix_host == "" && zabbix_send == true { fmt.Println("Send error: make sure environment variables `ZABBIX_HOST` was set") os.Exit(1) } if os.Getenv("ELASTICSEARCH_URL") == "" { fmt.Println("Send error: make sure environment variables `ELASTICSEARCH_URL` was set") os.Exit(1) } // if es_request_time_range > 24 { // log.Println("Error: Time range must be a number between 1 and 24") // os.Exit(1) // } if pg_insert { if os.Getenv("PGHOST") == "" { fmt.Println("Send error: make sure environment variables `PGHOST` was set") os.Exit(1) } if os.Getenv("PGUSER") == "" { fmt.Println("Send error: make sure environment variables `PGUSER` was set") os.Exit(1) } if os.Getenv("PGPASSWORD") == "" { fmt.Println("Send error: make sure environment variables `PGPASSWORD` was set") os.Exit(1) } if os.Getenv("PGDATABASE") == "" { fmt.Println("Send error: make sure environment variables `PGDATABASE` was set") os.Exit(1) } } cfg := opensearch.Config{ Transport: &http.Transport{ MaxIdleConnsPerHost: 10, ResponseHeaderTimeout: 60 * time.Second, DialContext: (&net.Dialer{Timeout: time.Second}).DialContext, TLSClientConfig: &tls.Config{ MaxVersion: tls.VersionTLS13, InsecureSkipVerify: true, }, }, } es, err := opensearch.NewClient(cfg) if err != nil { log.Fatalf("Error creating the client: %s", err) } // fmt.Println(operation) switch operation { case "es-cluster-info": EsClusterInfo(es) case "es-get-indices": // fmt.Println(operation) for _, index := range EsGetIndices(es) { fmt.Println(index) } case "es-indices-discover": result_discovery_json := EsIndexDiscover(EsGetIndices(es)) // fmt.Println(zabbix_send) if zabbix_send == true { ZabbixSender(zabbix_host, "indices", result_discovery_json) } fmt.Println(result_discovery_json) case "es-records-count": // records count for all indices // EsIndexRecordsCount(EsGetIndices(es), es_request_time_range) // result_index := make(map[int]json_records) // j := int(0) // for _, es_index_name := range EsGetIndices(es) { // r := EsSearch(es_index_name, es_request_time_range, es_request_rows_quantity) // result := int(r["hits"].(map[string]interface{})["total"].(map[string]interface{})["value"].(float64)) // result_index[j] = json_records{es_index_name, result} // log.Println("ES index name:", es_index_name, "; Query time period:", es_request_time_range, "; Records count:", result) // if zabbix_send == true { // ZabbixSender(zabbix_host, `indices.records_count[`+es_index_name+`]`, strconv.Itoa(result)) // ZabbixSender(zabbix_host, `indices.records_count_time_range[`+es_index_name+`]`, es_request_time_range) // } // j++ // } case "es-index-records-count": // records count for one index // r := EsSearch(es_index_name, es_request_time_range, es_request_rows_quantity) // result := int(r["hits"].(map[string]interface{})["total"].(map[string]interface{})["value"].(float64)) // if zabbix_send == true { // ZabbixSender(zabbix_host, `indices.records_count[`+es_index_name+`]`, strconv.Itoa(result)) // ZabbixSender(zabbix_host, `indices.records_count_time_range[`+es_index_name+`]`, es_request_time_range) // } // log.Println("ES index name:", es_index_name, "; Query time period:", es_request_time_range, "; Records count:", result) case "es-request-data": var conf Config conf.sessions_count = 50 conf.country_code = "RU" var mail Mail_records mail.check_time = time.Unix(time.Now().Unix(), 0) last_check_timestamp := mail.check_time // канал для получания информации от процессов (routine) c := make(chan int) var query_auth ESQuery var query_invalid ESQuery request_time_range := "now-" + es_request_time_range s := `{ "query": { "bool": { "filter": [ { "range": { "@timestamp": { "gt": "now-5m" } } }, { "match": { "mail.command": "Authentication" } }, { "match_phrase": { "log.file.path": "/opt/zimbra/log/mailbox.log" } } ] } }, "size": 1000 }` err = json.Unmarshal([]byte(s), &query_auth) if err != nil { fmt.Println("JSON query error:",err) } err = json.Unmarshal([]byte(s), &query_invalid) if err != nil { fmt.Println("JSON query error:",err) } query_auth.Query.Bool.Filter[0] = map[string]interface{}{ "range": map[string]interface{}{ "@timestamp": map[string]string{ "gt": request_time_range, }, }, } query_auth.Query.Bool.Filter[1] = map[string]interface{}{ "match": map[string]string{ "mail.command": "authentication", }, } query_auth.Query.Bool.Filter[2] = map[string]interface{}{ "match_phrase": map[string]string{ "log.file.path": "/opt/zimbra/log/mailbox.log", }, } // query_auth.Size = es_request_rows_quantity query_invalid.Query.Bool.Filter[0] = map[string]interface{}{ "range": map[string]interface{}{ "@timestamp": map[string]string{ "gt": request_time_range, }, }, } query_invalid.Query.Bool.Filter[2] = map[string]interface{}{ "match_phrase": map[string]string{ "log.file.path": "/opt/zimbra/log/mailbox.log", }, } // query_invalid.Size = es_request_rows_quantity query_invalid.Query.Bool.Filter[1] = map[string]interface{}{ "match": map[string]string{ "mail.reason" : "*password", }, } // Получаем список индексов и делаем список индексов согласно списка шаблонов имени // заданного с коммандной строки es_index_name_prefix es_indicies_all := EsGetIndices(es) var es_indicies_filtered_list []string for _, finded_index := range es_indicies_all { // fmt.Println(finded_index) for _, pattern := range strings.Split(es_index_name, ",") { if strings.HasPrefix(finded_index, pattern) { es_indicies_filtered_list = append(es_indicies_filtered_list, finded_index) fmt.Println(">>",finded_index) } } // maillog_mailbox4_filebeat-2023.10.16 // es_indicies_filtered = strings.TrimPrefix(s, "¡¡¡Hello, ") } // запускаем параллельно работы по каждому индексу for _, index := range es_indicies_filtered_list { // fmt.Println(index) var es_index ESIndex es_index.es_index_name = index es_index.es_request_time_range = es_request_time_range es_index.es_request_rows_quantity = es_request_rows_quantity go doit(mail, es_index, query_auth, c) go doit(mail, es_index, query_invalid, c) // doit(mail, es_index, query_auth, c) // doit(mail, es_index, query_invalid, c) } // Читаем записи из канала для каждого индекса и обрабатываем далее // for _, index := range strings.Split(es_index_name, ",") { for _, index := range es_indicies_filtered_list { hits := <-c // Получает значение от канала log.Println("request for", index, "has finished with", hits, "records") } conn, err := pgx.Connect(context.Background(), os.Getenv("DATABASE_URL")) fmt.Println(*conn) if err != nil { fmt.Fprintf(os.Stderr, "Unable to connect to database: %v\n", err) os.Exit(1) } // Выбираем аномалии согласно конфига result := PgSearchMailSessionAnomaly(conn, last_check_timestamp, conf) // --- Типы тревог ---- // 1 "Неразрешенный код страны" // 2 "Количество сессий больше заданного" // 3 "Вход в аккаунт более чем с одного IP" // 4 "Вход более чем в один аккаунт" // Задачи: // 1. выбрать текущие сессии с geoip не равным RU (mail_session, alarm_type = 1) // 2. выбрать текущие сессии с количеством больше заданного (mail_sessions, alarm_type = 2) // 3. выбрать тревоги за все периоды по каждому ip из текущих сессий // - входы с одного адреса в несколько аккаунтов (alarm, alarm_type = 4) // 4. Выбрать тревоги за все периоды по каждому аккаунту из текущих сессий // - входы в один аккаунт с нескольких адресов (alarm, alarm_type = 3) // Если тревога для данного адреса или аккаунта заданного типа уже есть // то просто обновить запись с новой меткой времени. // При выборке тревог выставить alarm_action = false if len(result) != 0 { log.Println("Anomaly finded", result) for i := range result { // Смотрим есть ли уже запись для данной аномалии // если есть то обновляем если нет - добавляем id := PgSelectAlarm(conn, result[i]) if id == 0 { log.Println("Alarm insert with id", PgInsertAlarm(conn, result[i], conf)) } else { log.Println("Alarm type",result[i].alarm_type,"with id", PgUpdateAlarm(conn, result[i], conf, id), "was updated") } // Выбираем список тревог для определенного аккаунта и типа тревоги = 1. // Если возвращается больше 1-й записи то генерим тревогу с типом 3 для этого аккаунта id1 := PgSelectAlarmAccount(conn, result[i]) log.Println("Alarm records for account id:", result[i].mail_account_id, "found:", id1) if id1 > 1 { result[i].alarm_type = 3 id = PgSelectAlarmAccount(conn, result[i]) if id == 0 { log.Println("Alarm insert with id", PgInsertAlarmAccount(conn, result[i], conf)) } else { log.Println("Alarm type",result[i].alarm_type,"with id", PgUpdateAlarm(conn, result[i], conf, id), "was updated") } } result[i].alarm_type = 1 id2 := PgSelectAlarmRemoteIP(conn, result[i]) log.Println("PgSelectAlarmRemoteIP with ip:",result[i].remote_ip, id2) if id2 > 1 { result[i].alarm_type = 4 id = PgSelectAlarmRemoteIP(conn, result[i]) if id == 0 { log.Println("Alarm insert with id", PgInsertAlarmRemoteIP(conn, result[i], conf)) } else { log.Println("Alarm type",result[i].alarm_type,"with id", PgUpdateAlarm(conn, result[i], conf, id), "was updated") } } } } defer conn.Close(context.Background()) } }