zimbra-alarm/zimbra-alarm.go

1108 lines
42 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

//------------------------------------------------------------------------------
// Getting and monitoring Elasticsearch indices and send the metrics into zabbix
//-------------------------------------------------------------------------------
// Author: Sergey Kalinin
//-------------------------------------------------------------------------------
// ZABBIX_SERVER="zabbix"
// ZABBIX_PORT="10051"
// ZABBIX_HOST="elastic cluster"
// ELASTICSEARCH_URL="https://user:pass@elastic:9200"
//-------------------------------------------------------------------------------
package main
import (
"bytes"
"context"
"crypto/tls"
"encoding/json"
"flag"
"fmt"
"log"
"net"
"net/netip"
"net/http"
"os"
"regexp"
"strconv"
"strings"
"time"
"reflect"
"github.com/adubkov/go-zabbix"
// "github.com/elastic/go-elasticsearch/v7"
// "github.com/elastic/go-elasticsearch/v7/esapi"
"github.com/opensearch-project/opensearch-go/v2"
"github.com/opensearch-project/opensearch-go/v2/opensearchapi"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgtype"
)
type json_records struct {
INDEX string
RECORD_COUNT int
}
type Mail_records struct {
id int32
client_name string
client_account string
//remote_ip pgtype.Inet
remote_ip string
country_code string
check_time time.Time
session_time string
mail_reason string
}
type ESIndex struct {
es_index_name string
es_request_time_range string
es_request_rows_quantity int
}
// type ESQuery struct {
// Query struct {
// Bool struct {
// Filter []struct {
// Range struct {
// Timestamp struct {
// Gt string `json:"gt"`
// } `json:"@timestamp"`
// } `json:"range"`
// Match struct {
// MailCommand string `json:"mail.command"`
// } `json:"match"`
// } `json:"filter"`
// } `json:"bool"`
// } `json:"query"`
// Size int `json:"size"`
// }
type ESQuery struct {
Query struct {
Bool struct {
Filter []map[string]interface{} `json:"filter"`
} `json:"bool"`
} `json:"query"`
Size int `json:"size"`
}
type MailSession struct {
id int32
mail_account_id int32
sessions_count int32
country_code string
remote_ip netip.Prefix
last_time string
check_time time.Time
alarm_type int
}
type Config struct {
sessions_count int32
country_code string
}
var Exclude_index_list string
var conn *pgx.Conn
// Определяем входит ли строка в slice
func StringContains(a []string, x string) bool {
for _, n := range a {
if x == n {
return true
}
}
return false
}
func isNil(i interface{}) bool {
if i == nil {
return true
}
switch reflect.TypeOf(i).Kind() {
case reflect.Ptr, reflect.Map, reflect.Array, reflect.Chan, reflect.Slice:
return reflect.ValueOf(i).IsNil()
}
return false
}
// Проверка вхождения имени индекса (префикса) в список исключений
func IndexPrefixExclude(index_name string) bool {
exclude_index_name := strings.Split(Exclude_index_list, ",")
// log.Println(Exclude_index_list)
for _, n := range exclude_index_name {
if strings.HasPrefix(index_name, n) {
return true
}
}
return false
}
func EsSearch(index_name string, query ESQuery) map[string]interface{} {
var (
r map[string]interface{}
)
cfg := opensearch.Config{
// Addresses: []string{
// "http://localhost:9200",
// "http://localhost:9201",
// },
// Username: "foo",
// Password: "bar",
Transport: &http.Transport{
MaxIdleConnsPerHost: 10,
ResponseHeaderTimeout: 60 * time.Second,
DialContext: (&net.Dialer{Timeout: time.Second}).DialContext,
TLSClientConfig: &tls.Config{
MaxVersion: tls.VersionTLS13,
InsecureSkipVerify: true,
},
},
}
es, err := opensearch.NewClient(cfg)
if err != nil {
log.Fatalf("Error creating the client: %s", err)
}
res, err := es.Info()
if err != nil {
log.Fatalf("Error getting response: %s", err)
}
defer res.Body.Close()
// Check response status
if res.IsError() {
log.Fatalf("Error: %s", res.String())
}
var buf bytes.Buffer
// query := map[string]interface{}{
// "query": map[string]interface{}{
// "range": map[string]interface{}{
// "@timestamp": map[string]interface{}{
// // "gt": "now-1h",
// "gt": request_time_range,
// },
// }
// "match": map[string]interface{}{
// "mail.command": "Auth",
// },
// },
// "size": es_request_rows_quantity,
// "sort": map[string]interface{}{
// "@timestamp": map[string]interface{}{
// "order": "desc",
// },
// },
// }
a, _ := json.Marshal(query)
fmt.Println(string(a))
if err := json.NewEncoder(&buf).Encode(query); err != nil {
log.Fatalf("Error encoding query: %s", err)
}
// Perform the search request.
res, err = es.Search(
es.Search.WithContext(context.Background()),
es.Search.WithIndex(index_name+"*"),
es.Search.WithBody(&buf),
es.Search.WithTrackTotalHits(true),
es.Search.WithPretty(),
)
if err != nil {
log.Fatalf("Error getting response: %s", err)
}
defer res.Body.Close()
if res.IsError() {
var e map[string]interface{}
if err := json.NewDecoder(res.Body).Decode(&e); err != nil {
log.Fatalf("Error parsing the response body: %s", err)
} else {
// Print the response status and error information.
log.Fatalf("[%s] %s: %s",
res.Status(),
e["error"].(map[string]interface{})["type"],
e["error"].(map[string]interface{})["reason"],
)
}
}
if err := json.NewDecoder(res.Body).Decode(&r); err != nil {
log.Printf("Error parsing the response body:", err)
}
// json_data, _ := json.Marshal(r)
// fmt.Println(string(json_data))
return r
}
func EsIndexDiscover(es_index_list []string) string {
result_discovery_json := string("{\"data\": [")
for _, value := range es_index_list {
// result_discovery_json = result_discovery_json + "{\"{#INDEX}\":\"" + value + "\",\"{#ITEM_TYPE}\":\"Records count\"" + "},"
result_discovery_json = result_discovery_json + "{\"{#INDEX}\":\"" + value + "\"" + "},"
}
result_discovery_json = strings.TrimRight(result_discovery_json, ",") + "]}"
// fmt.Println(result_discovery_json)
return (result_discovery_json)
}
// func EsIndexRecordsCount(es_index_list []string, es_request_time_range string, es_request_rows_quantity int) {
// // fmt.Println(es_request_time_range)
// result_index := make(map[int]json_records)
// j := int(0)
// for _, value := range es_index_list {
// r := EsSearch(value, es_request_time_range, es_request_rows_quantity)
// result := int(r["hits"].(map[string]interface{})["total"].(map[string]interface{})["value"].(float64))
//
// result_index[j] = json_records{value, result}
// j++
// }
// log.Println(result_index)
// }
//
func EsClusterInfo(es *opensearch.Client) {
var (
r map[string]interface{}
)
// 1. Get cluster info
res, err := es.Info()
if err != nil {
log.Fatalf("Error getting response: %s", err)
}
defer res.Body.Close()
// Check response status
if res.IsError() {
log.Fatalf("Error: %s", res.String())
}
// Deserialize the response into a map.
if err := json.NewDecoder(res.Body).Decode(&r); err != nil {
log.Fatalf("Error parsing the response body: %s", err)
}
// Print client and server version numbers.
fmt.Printf("Client: %s\n", opensearch.Version)
fmt.Printf("Client: %s\n", opensearch.Version)
fmt.Printf("Server: %s\n", r["version"].(map[string]interface{})["number"])
}
func EsIndexFieldExists(es *opensearch.Client, index_name string, field_name string) bool {
// Check if the filed exists into index
var (
fields_request map[string]interface{}
fields []string
indices []string
field_exists bool
)
fields = append(fields, field_name)
indices = append(indices, index_name)
req := opensearchapi.FieldCapsRequest{
// Index: []string{},
Index: indices,
Fields: fields,
}
res, err := req.Do(context.Background(), es)
if err != nil {
log.Fatalf("ERROR: %s", err)
}
if err := json.NewDecoder(res.Body).Decode(&fields_request); err != nil {
log.Fatalf("->Error parsing the response body: %s", err)
}
for i, f := range fields_request {
if i == "fields" {
// if f.(map[string]interface{})[`@timestamp`] == nil {
if f.(map[string]interface{})[field_name] == nil {
field_exists = false
} else {
field_exists = true
}
}
}
return (field_exists)
}
func EsGetIndices(es *opensearch.Client) []string {
//Get indices list
// log.Println("sjsjsjsj")
var (
index_request []map[string]interface{}
es_index_list []string
)
req := opensearchapi.CatIndicesRequest{
// Index: []string{},
Format: "JSON",
// ExpandWildcards,
}
res, err := req.Do(context.Background(), es)
if err != nil {
log.Fatalf("ERROR: %s", err)
}
// log.Println(res)
if err := json.NewDecoder(res.Body).Decode(&index_request); err != nil {
log.Fatalf("Error parsing the response body: %s", err)
}
for _, hit := range index_request {
m := regexp.MustCompile("-[0-9]")
// позиция вхождения регулярки в имени индекса
i := m.FindStringIndex(hit["index"].(string))
// наименование индекса
index_name := hit["index"].(string)
// log.Println(index_name)
// Checking the @timestamp field exists
// if !EsIndexFieldExists(es, index_name, `@timestamp`) {
// continue
// // fmt.Println(EsIndexFieldExists(es, index_name))
// }
// if strings.HasPrefix(index_name, "apdex") {
// continue
// // fmt.Println(EsIndexFieldExists(es, index_name))
// }
// Итоговое наименование индекса
var index_name_short string
// Исключаем индексы от кибаны (.kibana-*) и обрезаем цифры (даты и версии)
if !IndexPrefixExclude(index_name) {
if i != nil {
index_name_short = strings.TrimSpace(index_name[0:i[0]])
} else {
index_name_short = strings.TrimSpace(index_name)
}
if !StringContains(es_index_list, index_name_short) {
es_index_list = append(es_index_list, index_name_short)
}
} else {
log.Println("Index", index_name, "has name from exclude lists")
}
}
log.Println(es_index_list)
return (es_index_list)
}
func ZabbixSender(zabbix_host string, metric_name string, metric_value string) bool {
var (
metrics []*zabbix.Metric
zabbix_port int
err error
)
zabbix_server := os.Getenv("ZABBIX_SERVER")
// Read and checkin zabbix server and port
if zabbix_server == "" {
log.Println("Login error: make sure environment variables `ZABBIX_SERVER` are defined")
os.Exit(1)
}
if os.Getenv("ZABBIX_PORT") == "" {
log.Println("Login error: make sure environment variables `ZABBIX_PORT` are defined")
os.Exit(1)
}
if zabbix_port, err = strconv.Atoi(os.Getenv("ZABBIX_PORT")); err != nil {
log.Println(zabbix_port, "Zabbix port value error")
}
metrics = append(metrics, zabbix.NewMetric(zabbix_host, metric_name, metric_value, time.Now().Unix()))
metrics = append(metrics, zabbix.NewMetric(zabbix_host, "status", "OK"))
// Create instance of Packet class
packet := zabbix.NewPacket(metrics)
// fmt.Println(zabbix_host, metric_name, metric_value, metrics)
// Send packet to zabbix
z := zabbix.NewSender(zabbix_server, zabbix_port)
res, err := z.Send(packet)
log.Println(string(res))
return true
}
func PgSearch(conn *pgx.Conn, mail_data Mail_records) int32 {
var id int32
var client_name pgtype.Varchar
// var remote_ip pgtype.CIDR
// var country_code pgtype.Varchar
// fmt.Println("Select data from DB:", mail_data.client_name)
err := conn.QueryRow(context.Background(), "select id, client_name from mail_accounts where client_name = $1", mail_data.client_name).Scan(&id, &client_name)
//err = conn.QueryRow(context.Background(), "select id, client_name, remote_ip, country_code from mail_accounts where")
if err != nil {
fmt.Fprintf(os.Stderr, "Query select account failed: %v\n", err)
// id = PgInsertAccount(conn, mail_data)
log.Println("Select", mail_data.client_name, "error:", err)
if err.Error() == "no rows in result set" {
id = 0
} else {
os.Exit(1)
}
}
// fmt.Println(">>>", id, client_name.String)
return id
}
func PgInsertAccount(conn *pgx.Conn, mail_data Mail_records) int32 {
var id int32
// var client_name pgtype.Varchar
// var remote_ip pgtype.CIDR
// var country_code pgtype.Varchar
// fmt.Println("Insert data into DB")
err := conn.QueryRow(context.Background(), "insert into mail_accounts(client_name) values($1) returning id", mail_data.client_name).Scan(&id)
if err != nil {
fmt.Fprintf(os.Stderr, "Query insert account failed: %v\n", err)
}
//fmt.Println(id)
return id
}
func PgSearchMailSession(conn *pgx.Conn, mail_data Mail_records) (int32, int32) {
var id int32
var session_count int32
// var client_name pgtype.Varchar
// var remote_ip pgtype.CIDR
var country_code pgtype.Varchar
// log.Println("Select data from DB:", "select id, sessions_count, country_code from mail_sessions where mail_account_id =", mail_data.id, "and remote_ip =",mail_data.remote_ip, "and last_time =", mail_data.session_time, "and last_check_timestamp =", mail_data.check_time)
err := conn.QueryRow(context.Background(), "select id, sessions_count, country_code from mail_sessions where mail_account_id = $1 and remote_ip = $2 and last_check_timestamp = $3", mail_data.id, mail_data.remote_ip, mail_data.check_time).Scan(&id, &session_count, &country_code)
//err = conn.QueryRow(context.Background(), "select id, client_name, remote_ip, country_code from mail_accounts where")\
// log.Println("Selected session data: ", id, session_count, country_code)
if err != nil {
log.Println("Query session PgSearchMailSession failed:", err)
// id = PgInsertAccount(conn, mail_data)
// log.Println("Select", mail_data.client_name, "error:", err)
if err.Error() == "no rows in result set" {
id = 0
} else {
os.Exit(1)
}
}
// fmt.Println(">>>", id, client_name.String)
return id, session_count
}
// Alarm_type
// 1 "Неразрешенный код страны" "alarm"
// 2 "Количество сессий больше заданного" "alarm"
// 3 "Вход в аккаунт более чем с одного IP" "alarm"
// 4 "Вход более чем в один аккаунт" "alarm"
func PgSearchMailSessionAnomaly(conn *pgx.Conn, last_check_timestamp time.Time, conf Config) []MailSession {
// log.Println("Select data from DB:", "select id, mail_account_id, sessions_count, country_code, remote_ip, last_time from mail_sessions where last_check_timestamp =", last_check_timestamp)
rows, err_select := conn.Query(context.Background(), "select id, mail_account_id, sessions_count, country_code, remote_ip, last_time from mail_sessions where last_check_timestamp = $1", last_check_timestamp)
if err_select != nil {
log.Println("Query sessions failed:", err_select)
os.Exit(1)
}
var res []MailSession
for rows.Next() {
var s MailSession
values, _ := rows.Values()
s.id = values[0].(int32)
s.mail_account_id = values[1].(int32)
s.sessions_count = values[2].(int32)
s.country_code = strings.TrimSpace(values[3].(string))
s.remote_ip = values[4].(netip.Prefix)
s.last_time = values[5].(string)
s.check_time = last_check_timestamp
log.Println(s.id, s.mail_account_id, s.sessions_count, s.country_code, s.remote_ip, s.last_time)
if s.country_code != conf.country_code && s.country_code != "" {
s.alarm_type = 1
log.Println("ALARM!!!", s.mail_account_id, s.sessions_count, s.country_code, s.remote_ip)
res = append(res, s)
}
if s.sessions_count >= conf.sessions_count {
// s.alarm_type = "Количество соединений больше " + string(conf.sessions_count)
s.alarm_type = 2
log.Println("ALARM!!!", s.mail_account_id, s.sessions_count, s.country_code, s.remote_ip)
res = append(res, s)
// alarm_id := PgInsertMailAlarm(conn, s, "Код страны GeoIP не разрешен")
// log.Println("Alarm insert with id:", alarm_id)
}
}
return res
}
func PgSelectAlarm(conn *pgx.Conn, mail_session MailSession) (int32) {
var id int32
// log.Println("Select data from DB:", "select id, sessions_count, country_code from mail_sessions where mail_account_id =", mail_data.id, "and remote_ip =",mail_data.remote_ip, "and last_time =", mail_data.session_time, "and last_check_timestamp =", mail_data.check_time)
err := conn.QueryRow(context.Background(), "select id from alarm where mail_account_id = $1 and remote_ip = $2 and alarm_type = $3", mail_session.mail_account_id, mail_session.remote_ip, mail_session.alarm_type).Scan(&id)
log.Println("Selected alarm data: ", mail_session.mail_account_id, mail_session.remote_ip)
if err != nil {
log.Println("Select alarm with account_id", mail_session.mail_account_id, " and remote ip", mail_session.remote_ip, "error:", err)
if err.Error() == "no rows in result set" {
id = 0
} else {
os.Exit(1)
}
}
return id
}
func PgSelectAlarmAccount(conn *pgx.Conn, mail_session MailSession) int32 {
var s int32
var i int32
// log.Println("Select data from DB:", "select id, sessions_count, country_code from mail_sessions where mail_account_id =", mail_data.id, "and remote_ip =",mail_data.remote_ip, "and last_time =", mail_data.session_time, "and last_check_timestamp =", mail_data.check_time)
query := fmt.Sprintf("select id from alarm where mail_account_id = %v and alarm_type = %v", mail_session.mail_account_id, mail_session.alarm_type)
log.Println("PgSelectAlarmAccount execute query:", query)
rows, err_select := conn.Query(context.Background(), query)
if err_select != nil {
log.Println("Query alarms PgSelectAlarmAccount failed:", err_select)
os.Exit(1)
}
if len(rows.FieldDescriptions()) != 0 {
i = 0
var id int32
for rows.Next() {
values, _ := rows.Values()
id = values[0].(int32)
log.Println("\t found records with id:", id)
i++
}
// если выбрана только одна запись возвращаем ее id,
// если несколько - то возвращаем количество записей
if i == 1 {
s = id
} else {
s = i
}
} else {
s = 0
}
return s
}
// Выборка записей по конкретному IP
// запросы разные в зависимости от типа тревоги
func PgSelectAlarmRemoteIP(conn *pgx.Conn, mail_session MailSession) (int32) {
var s int32
var query string
// В зависимости от типа тревоги функция возвращает либо количество записей либо ID записи
switch mail_session.alarm_type {
case 1:
// По уму тут можно упростить добавив в запрос count() хотя не факт что прокатит
query = fmt.Sprintf("select mail_account_id, remote_ip, alarm_type from alarm where remote_ip = '%v' and alarm_type = %v group by mail_account_id, remote_ip, alarm_type", mail_session.remote_ip, mail_session.alarm_type)
log.Println("PgSelectAlarmRemoteIP execute query:",query)
rows, err_select := conn.Query(context.Background(), query)
if err_select != nil {
log.Println("Query alarms PgSelectAlarmRemoteIP failed:", err_select)
os.Exit(1)
}
if len(rows.FieldDescriptions()) != 0 {
var i, id int32
i = 0
for rows.Next() {
values, _ := rows.Values()
id = values[0].(int32)
log.Println("\t found records with id:", id)
i++
}
s = i
} else {
s = 0
}
case 4:
query = fmt.Sprintf("select id from alarm where remote_ip = '%v' and alarm_type = %v", mail_session.remote_ip, mail_session.alarm_type)
log.Println(query)
var id int32
err := conn.QueryRow(context.Background(), query).Scan(&id)
if err != nil {
log.Println("Query alarms PgSelectAlarmRemoteIP failed:", mail_session.remote_ip, "error:", err)
if err.Error() == "no rows in result set" {
id = 0
} else {
os.Exit(1)
}
} else {
s = id
}
}
return s
}
func PgInsertAlarm(conn *pgx.Conn, mail_data MailSession, conf Config) int32 {
var id int32
log.Println("Insert data", mail_data, "into alarm DB")
err := conn.QueryRow(context.Background(), "insert into alarm(mail_account_id, remote_ip, country_code, last_time, last_check_timestamp, alarm_type, alarm_action) values($1, $2, $3, $4, $5, $6, false) returning id", mail_data.mail_account_id, mail_data.remote_ip, mail_data.country_code, mail_data.last_time, mail_data.check_time, mail_data.alarm_type).Scan(&id)
if err != nil {
log.Printf("Insert alarm data failed:", err)
}
return id
}
func PgInsertAlarmRemoteIP(conn *pgx.Conn, mail_data MailSession, conf Config) int32 {
var id int32
log.Println("Insert data", mail_data, "into alarm DB")
err := conn.QueryRow(context.Background(), "insert into alarm(remote_ip, country_code, last_time, last_check_timestamp, alarm_type, alarm_action) values($1, $2, $3, $4, $5, false) returning id", mail_data.remote_ip, mail_data.country_code, mail_data.last_time, mail_data.check_time, mail_data.alarm_type).Scan(&id)
if err != nil {
log.Printf("Insert alarm data failed:", err)
}
return id
}
func PgInsertAlarmAccount(conn *pgx.Conn, mail_data MailSession, conf Config) int32 {
var id int32
log.Println("Insert data", mail_data, "into alarm DB")
err := conn.QueryRow(context.Background(), "insert into alarm(mail_account_id, last_time, last_check_timestamp, alarm_type, alarm_action) values($1, $2, $3, $4, false) returning id", mail_data.mail_account_id, mail_data.last_time, mail_data.check_time, mail_data.alarm_type).Scan(&id)
if err != nil {
log.Printf("Insert alarm data failed:", err)
}
return id
}
func PgUpdateAlarm(conn *pgx.Conn, mail_session MailSession, conf Config, id int32) int32 {
log.Println("Update alarm data", mail_session)
err := conn.QueryRow(context.Background(), "update alarm set last_time=$1, last_check_timestamp=$2 where id=$3 returning id", mail_session.last_time, mail_session.check_time, id).Scan(&id)
if err != nil {
log.Printf("Update alarm data failed: %v\n", err)
}
return id
}
func PgInsertMailSession(conn *pgx.Conn, mail_data Mail_records) int32 {
var id int32
sessions_count := 1
// log.Println("Insert data", mail_data, "into session DB")
err := conn.QueryRow(context.Background(), "insert into mail_sessions(mail_account_id, remote_ip, sessions_count, last_time, country_code, last_check_timestamp, mail_reason) values($1, $2, $3, $4, $5, $6, $7) returning id", mail_data.id, mail_data.remote_ip, sessions_count, mail_data.session_time, mail_data.country_code, mail_data.check_time, mail_data.mail_reason).Scan(&id)
if err != nil {
log.Printf("Insert session data failed:", err)
}
return id
}
func PgUpdateMailSession(conn *pgx.Conn, mail_data Mail_records, session_id int32, sessions_count int32) int32 {
var id int32
sessions_count++
// log.Println("Update data", mail_data, "into session DB")
err := conn.QueryRow(context.Background(), "update mail_sessions set sessions_count=$1, last_time=$2, last_check_timestamp=$4, mail_reason=$5 where id=$3 returning id", sessions_count, mail_data.session_time, session_id, mail_data.check_time, mail_data.mail_reason).Scan(&id)
if err != nil {
log.Printf("Update session data failed: %v\n", err)
}
return id
}
func doit(mail Mail_records, es_index ESIndex, query ESQuery, c chan int) int {
// Запрос данных из соответствующего индекса и выборка полей mail.{} и geoip.{}
fmt.Println(es_index)
fmt.Println(es_index.es_index_name)
r := EsSearch(es_index.es_index_name, query)
result := r["hits"].(map[string]interface{})["hits"].([]interface{})
conn, err := pgx.Connect(context.Background(), os.Getenv("DATABASE_URL"))
// fmt.Println(*conn)
if err != nil {
fmt.Fprintf(os.Stderr, "Unable to connect to database: %v\n", err)
os.Exit(1)
}
for i := range result {
mail_res := result[i].(map[string]interface{})["_source"].(map[string]interface{})["mail"]
geoip :=result[i].(map[string]interface{})["_source"].(map[string]interface{})["geoip"]
if !isNil(mail_res) {
//fmt.Println(">>>>>>", mail.(map[string]interface{}))
// fmt.Println(">>>>>>", mail_res.(map[string]interface{})["client_account"])
if !isNil(mail_res.(map[string]interface{})["client_account"]) {
mail.client_account = mail_res.(map[string]interface{})["client_account"].(string)
}
// fmt.Println("", mail_res.(map[string]interface{})["client_name"])
if !isNil(mail_res.(map[string]interface{})["client_name"]) {
mail.client_name = mail_res.(map[string]interface{})["client_name"].(string)
}
// fmt.Println("", mail_res.(map[string]interface{})["remote_ip"])
if !isNil( mail_res.(map[string]interface{})["remote_ip"]) {
mail.remote_ip = mail_res.(map[string]interface{})["remote_ip"].(string)
}
if !isNil( mail_res.(map[string]interface{})["log_datetime"]) {
datetime := mail_res.(map[string]interface{})["log_datetime"].(string)
// tmpl := "2022-08-18 16:17:24,446"
mail.session_time = datetime
// log.Println(">>>>>>>>>>>>>>>>", mail.session_time, datetime)
// 2022-08-18 16:17:24,446
}
if !isNil(mail_res.(map[string]interface{})["reason"]) {
mail.mail_reason = mail_res.(map[string]interface{})["reason"].(string)
} }
if !isNil(geoip) {
// fmt.Println("", geoip.(map[string]interface{})["country_iso_code"].(string))
mail.country_code = geoip.(map[string]interface{})["country_iso_code"].(string)
}
if mail.client_name == "" && mail.client_account != "" {
mail.client_name = mail.client_account
}
if (mail.client_name != "") && mail.remote_ip != "" {
log.Println(mail.client_name, mail.remote_ip, mail.country_code, mail.mail_reason)
// fmt.Println(mail.client_name, mail.client_account, mail.remote_ip, mail.country_code)
res := PgSearch(conn, mail)
if res == 0 {
mail.id = PgInsertAccount(conn, mail)
} else {
mail.id = res
}
// log.Println("Insert account", mail.client_name, mail.client_account," sucess with id", mail.id)
id, session_count := PgSearchMailSession(conn, mail)
if id == 0 {
PgInsertMailSession(conn, mail)
} else {
PgUpdateMailSession(conn, mail, id, session_count)
}
}
mail.client_account = ""
mail.client_name = ""
mail.remote_ip = ""
mail.country_code = ""
mail_res = nil
}
defer conn.Close(context.Background())
// Вывод статуса, количество выбранных записей и времени запроса
hits := int(r["hits"].(map[string]interface{})["total"].(map[string]interface{})["value"].(float64))
log.Println("ES index name:", es_index.es_index_name, "; Query time period:", es_index.es_request_time_range, "; Records count:", hits)
c <- hits
return hits
}
//---------------------------------------------------------------------------
func main() {
var (
operation string
es_index_name string
es_request_time_range string
es_request_rows_quantity int
zabbix_send bool
pg_insert bool
// zabbix_port int
)
flag.StringVar(&es_index_name, "indexname", "", "Elasticsearch index name patterns by comma separated, (like \"filebeat\")")
flag.StringVar(&es_request_time_range, "timerange", "1h", "Elasticsearch time range for records count (like 1m, 10h, e.t.c.)")
flag.IntVar(&es_request_rows_quantity, "request-rows", 1, "How many rows will return")
flag.BoolVar(&zabbix_send, "zabbix-send", false, "Send metrics or discovery data into zabbix")
flag.BoolVar(&pg_insert, "pg-insert", false, "Write data into postgres dsatabase")
flag.StringVar(&Exclude_index_list, "exclude-index", ".", "Elasticsearch index name comma-separated list for exluded, (like \"filebeat,mailindex,e.t.c\")")
flag.StringVar(&operation, "operation", "es-cluster-info", "Opertation type, must be:\n\tes-cluster-info - ES Cluster information (version e.t.c)\n\tes-get-indices - geting all index\n\tes-indices-discover - getting es index pattern list\n\tes-records-count - getting the number of records for a time range for all index pattern\n\tes-index-records-count - getting records count for one index (used with -indexname option\n\tes-request-data - select data from index")
flag.Parse()
fmt.Println(zabbix_send)
zabbix_host := os.Getenv("ZABBIX_HOST")
if zabbix_host == "" && zabbix_send == true {
fmt.Println("Send error: make sure environment variables `ZABBIX_HOST` was set")
os.Exit(1)
}
if os.Getenv("ELASTICSEARCH_URL") == "" {
fmt.Println("Send error: make sure environment variables `ELASTICSEARCH_URL` was set")
os.Exit(1)
}
// if es_request_time_range > 24 {
// log.Println("Error: Time range must be a number between 1 and 24")
// os.Exit(1)
// }
if pg_insert {
if os.Getenv("PGHOST") == "" {
fmt.Println("Send error: make sure environment variables `PGHOST` was set")
os.Exit(1)
}
if os.Getenv("PGUSER") == "" {
fmt.Println("Send error: make sure environment variables `PGUSER` was set")
os.Exit(1)
}
if os.Getenv("PGPASSWORD") == "" {
fmt.Println("Send error: make sure environment variables `PGPASSWORD` was set")
os.Exit(1)
}
if os.Getenv("PGDATABASE") == "" {
fmt.Println("Send error: make sure environment variables `PGDATABASE` was set")
os.Exit(1)
}
}
cfg := opensearch.Config{
Transport: &http.Transport{
MaxIdleConnsPerHost: 10,
ResponseHeaderTimeout: 60 * time.Second,
DialContext: (&net.Dialer{Timeout: time.Second}).DialContext,
TLSClientConfig: &tls.Config{
MaxVersion: tls.VersionTLS13,
InsecureSkipVerify: true,
},
},
}
es, err := opensearch.NewClient(cfg)
if err != nil {
log.Fatalf("Error creating the client: %s", err)
}
// fmt.Println(operation)
switch operation {
case "es-cluster-info":
EsClusterInfo(es)
case "es-get-indices":
// fmt.Println(operation)
for _, index := range EsGetIndices(es) {
fmt.Println(index)
}
case "es-indices-discover":
result_discovery_json := EsIndexDiscover(EsGetIndices(es))
// fmt.Println(zabbix_send)
if zabbix_send == true {
ZabbixSender(zabbix_host, "indices", result_discovery_json)
}
fmt.Println(result_discovery_json)
case "es-records-count":
// records count for all indices
// EsIndexRecordsCount(EsGetIndices(es), es_request_time_range)
// result_index := make(map[int]json_records)
// j := int(0)
// for _, es_index_name := range EsGetIndices(es) {
// r := EsSearch(es_index_name, es_request_time_range, es_request_rows_quantity)
// result := int(r["hits"].(map[string]interface{})["total"].(map[string]interface{})["value"].(float64))
// result_index[j] = json_records{es_index_name, result}
// log.Println("ES index name:", es_index_name, "; Query time period:", es_request_time_range, "; Records count:", result)
// if zabbix_send == true {
// ZabbixSender(zabbix_host, `indices.records_count[`+es_index_name+`]`, strconv.Itoa(result))
// ZabbixSender(zabbix_host, `indices.records_count_time_range[`+es_index_name+`]`, es_request_time_range)
// }
// j++
// }
case "es-index-records-count":
// records count for one index
// r := EsSearch(es_index_name, es_request_time_range, es_request_rows_quantity)
// result := int(r["hits"].(map[string]interface{})["total"].(map[string]interface{})["value"].(float64))
// if zabbix_send == true {
// ZabbixSender(zabbix_host, `indices.records_count[`+es_index_name+`]`, strconv.Itoa(result))
// ZabbixSender(zabbix_host, `indices.records_count_time_range[`+es_index_name+`]`, es_request_time_range)
// }
// log.Println("ES index name:", es_index_name, "; Query time period:", es_request_time_range, "; Records count:", result)
case "es-request-data":
var conf Config
conf.sessions_count = 50
conf.country_code = "RU"
var mail Mail_records
mail.check_time = time.Unix(time.Now().Unix(), 0)
last_check_timestamp := mail.check_time
// канал для получания информации от процессов (routine)
c := make(chan int)
var query_auth ESQuery
var query_invalid ESQuery
request_time_range := "now-" + es_request_time_range
s := `{
"query": {
"bool": {
"filter": [
{
"range": {
"@timestamp": {
"gt": "now-5m"
}
}
},
{
"match": {
"mail.command": "Authentication"
}
},
{
"match_phrase": {
"log.file.path": "/opt/zimbra/log/mailbox.log"
}
}
]
}
},
"size": 1000
}`
err = json.Unmarshal([]byte(s), &query_auth)
if err != nil {
fmt.Println("JSON query error:",err)
}
err = json.Unmarshal([]byte(s), &query_invalid)
if err != nil {
fmt.Println("JSON query error:",err)
}
query_auth.Query.Bool.Filter[0] = map[string]interface{}{
"range": map[string]interface{}{
"@timestamp": map[string]string{
"gt": request_time_range,
},
},
}
query_auth.Query.Bool.Filter[1] = map[string]interface{}{
"match": map[string]string{
"mail.command": "authentication",
},
}
query_auth.Query.Bool.Filter[2] = map[string]interface{}{
"match_phrase": map[string]string{
"log.file.path": "/opt/zimbra/log/mailbox.log",
},
}
//
query_auth.Size = es_request_rows_quantity
query_invalid.Query.Bool.Filter[0] = map[string]interface{}{
"range": map[string]interface{}{
"@timestamp": map[string]string{
"gt": request_time_range,
},
},
}
query_invalid.Query.Bool.Filter[2] = map[string]interface{}{
"match_phrase": map[string]string{
"log.file.path": "/opt/zimbra/log/mailbox.log",
},
}
//
query_invalid.Size = es_request_rows_quantity
query_invalid.Query.Bool.Filter[1] = map[string]interface{}{
"match": map[string]string{
"mail.reason" : "*password",
},
}
// Получаем список индексов и делаем список индексов согласно списка шаблонов имени
// заданного с коммандной строки es_index_name_prefix
es_indicies_all := EsGetIndices(es)
var es_indicies_filtered_list []string
for _, finded_index := range es_indicies_all {
// fmt.Println(finded_index)
for _, pattern := range strings.Split(es_index_name, ",") {
if strings.HasPrefix(finded_index, pattern) {
es_indicies_filtered_list = append(es_indicies_filtered_list, finded_index)
fmt.Println(">>",finded_index)
}
}
// maillog_mailbox4_filebeat-2023.10.16
// es_indicies_filtered = strings.TrimPrefix(s, "¡¡¡Hello, ")
}
// запускаем параллельно работы по каждому индексу
for _, index := range es_indicies_filtered_list {
// fmt.Println(index)
var es_index ESIndex
es_index.es_index_name = index
es_index.es_request_time_range = es_request_time_range
es_index.es_request_rows_quantity = es_request_rows_quantity
go doit(mail, es_index, query_auth, c)
go doit(mail, es_index, query_invalid, c)
// doit(mail, es_index, query_auth, c)
// doit(mail, es_index, query_invalid, c)
}
// Читаем записи из канала для каждого индекса и обрабатываем далее
// for _, index := range strings.Split(es_index_name, ",") {
for _, index := range es_indicies_filtered_list {
hits := <-c // Получает значение от канала
log.Println("request for", index, "has finished with", hits, "records")
}
conn, err := pgx.Connect(context.Background(), os.Getenv("DATABASE_URL"))
fmt.Println(*conn)
if err != nil {
fmt.Fprintf(os.Stderr, "Unable to connect to database: %v\n", err)
os.Exit(1)
}
// Выбираем аномалии согласно конфига
result := PgSearchMailSessionAnomaly(conn, last_check_timestamp, conf)
// --- Типы тревог ----
// 1 "Неразрешенный код страны"
// 2 "Количество сессий больше заданного"
// 3 "Вход в аккаунт более чем с одного IP"
// 4 "Вход более чем в один аккаунт"
// Задачи:
// 1. выбрать текущие сессии с geoip не равным RU (mail_session, alarm_type = 1)
// 2. выбрать текущие сессии с количеством больше заданного (mail_sessions, alarm_type = 2)
// 3. выбрать тревоги за все периоды по каждому ip из текущих сессий
// - входы с одного адреса в несколько аккаунтов (alarm, alarm_type = 4)
// 4. Выбрать тревоги за все периоды по каждому аккаунту из текущих сессий
// - входы в один аккаунт с нескольких адресов (alarm, alarm_type = 3)
// Если тревога для данного адреса или аккаунта заданного типа уже есть
// то просто обновить запись с новой меткой времени.
// При выборке тревог выставить alarm_action = false
if len(result) != 0 {
log.Println("Anomaly finded", result)
for i := range result {
// Смотрим есть ли уже запись для данной аномалии
// если есть то обновляем если нет - добавляем
id := PgSelectAlarm(conn, result[i])
if id == 0 {
log.Println("Alarm insert with id", PgInsertAlarm(conn, result[i], conf))
} else {
log.Println("Alarm type",result[i].alarm_type,"with id", PgUpdateAlarm(conn, result[i], conf, id), "was updated")
}
// Выбираем список тревог для определенного аккаунта и типа тревоги = 1.
// Если возвращается больше 1-й записи то генерим тревогу с типом 3 для этого аккаунта
id1 := PgSelectAlarmAccount(conn, result[i])
log.Println("Alarm records for account id:", result[i].mail_account_id, "found:", id1)
if id1 > 1 {
result[i].alarm_type = 3
id = PgSelectAlarmAccount(conn, result[i])
if id == 0 {
log.Println("Alarm insert with id", PgInsertAlarmAccount(conn, result[i], conf))
} else {
log.Println("Alarm type",result[i].alarm_type,"with id", PgUpdateAlarm(conn, result[i], conf, id), "was updated")
}
}
result[i].alarm_type = 1
id2 := PgSelectAlarmRemoteIP(conn, result[i])
log.Println("PgSelectAlarmRemoteIP with ip:",result[i].remote_ip, id2)
if id2 > 1 {
result[i].alarm_type = 4
id = PgSelectAlarmRemoteIP(conn, result[i])
if id == 0 {
log.Println("Alarm insert with id", PgInsertAlarmRemoteIP(conn, result[i], conf))
} else {
log.Println("Alarm type",result[i].alarm_type,"with id", PgUpdateAlarm(conn, result[i], conf, id), "was updated")
}
}
}
}
defer conn.Close(context.Background())
}
}