zimbra-alarm/zimbra-alarm.go

1108 lines
42 KiB
Go
Raw Normal View History

//------------------------------------------------------------------------------
// Getting and monitoring Elasticsearch indices and send the metrics into zabbix
//-------------------------------------------------------------------------------
// Author: Sergey Kalinin
//-------------------------------------------------------------------------------
// ZABBIX_SERVER="zabbix"
// ZABBIX_PORT="10051"
// ZABBIX_HOST="elastic cluster"
// ELASTICSEARCH_URL="https://user:pass@elastic:9200"
//-------------------------------------------------------------------------------
package main
import (
"bytes"
"context"
"crypto/tls"
"encoding/json"
"flag"
"fmt"
"log"
"net"
"net/netip"
"net/http"
"os"
"regexp"
"strconv"
"strings"
"time"
"reflect"
"github.com/adubkov/go-zabbix"
// "github.com/elastic/go-elasticsearch/v7"
// "github.com/elastic/go-elasticsearch/v7/esapi"
"github.com/opensearch-project/opensearch-go/v2"
"github.com/opensearch-project/opensearch-go/v2/opensearchapi"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgtype"
)
type json_records struct {
INDEX string
RECORD_COUNT int
}
type Mail_records struct {
id int32
client_name string
client_account string
//remote_ip pgtype.Inet
remote_ip string
country_code string
check_time time.Time
session_time string
mail_reason string
}
type ESIndex struct {
es_index_name string
es_request_time_range string
es_request_rows_quantity int
}
// type ESQuery struct {
// Query struct {
// Bool struct {
// Filter []struct {
// Range struct {
// Timestamp struct {
// Gt string `json:"gt"`
// } `json:"@timestamp"`
// } `json:"range"`
// Match struct {
// MailCommand string `json:"mail.command"`
// } `json:"match"`
// } `json:"filter"`
// } `json:"bool"`
// } `json:"query"`
// Size int `json:"size"`
// }
type ESQuery struct {
Query struct {
Bool struct {
Filter []map[string]interface{} `json:"filter"`
} `json:"bool"`
} `json:"query"`
Size int `json:"size"`
}
type MailSession struct {
id int32
mail_account_id int32
sessions_count int32
country_code string
remote_ip netip.Prefix
last_time string
check_time time.Time
alarm_type int
}
type Config struct {
sessions_count int32
country_code string
}
var Exclude_index_list string
var conn *pgx.Conn
// Определяем входит ли строка в slice
func StringContains(a []string, x string) bool {
for _, n := range a {
if x == n {
return true
}
}
return false
}
func isNil(i interface{}) bool {
if i == nil {
return true
}
switch reflect.TypeOf(i).Kind() {
case reflect.Ptr, reflect.Map, reflect.Array, reflect.Chan, reflect.Slice:
return reflect.ValueOf(i).IsNil()
}
return false
}
// Проверка вхождения имени индекса (префикса) в список исключений
func IndexPrefixExclude(index_name string) bool {
exclude_index_name := strings.Split(Exclude_index_list, ",")
// log.Println(Exclude_index_list)
for _, n := range exclude_index_name {
if strings.HasPrefix(index_name, n) {
return true
}
}
return false
}
func EsSearch(index_name string, query ESQuery) map[string]interface{} {
var (
r map[string]interface{}
)
cfg := opensearch.Config{
// Addresses: []string{
// "http://localhost:9200",
// "http://localhost:9201",
// },
// Username: "foo",
// Password: "bar",
Transport: &http.Transport{
MaxIdleConnsPerHost: 10,
ResponseHeaderTimeout: 60 * time.Second,
DialContext: (&net.Dialer{Timeout: time.Second}).DialContext,
TLSClientConfig: &tls.Config{
MaxVersion: tls.VersionTLS13,
InsecureSkipVerify: true,
},
},
}
es, err := opensearch.NewClient(cfg)
if err != nil {
log.Fatalf("Error creating the client: %s", err)
}
res, err := es.Info()
if err != nil {
log.Fatalf("Error getting response: %s", err)
}
defer res.Body.Close()
// Check response status
if res.IsError() {
log.Fatalf("Error: %s", res.String())
}
var buf bytes.Buffer
// query := map[string]interface{}{
// "query": map[string]interface{}{
// "range": map[string]interface{}{
// "@timestamp": map[string]interface{}{
// // "gt": "now-1h",
// "gt": request_time_range,
// },
// }
// "match": map[string]interface{}{
// "mail.command": "Auth",
// },
// },
// "size": es_request_rows_quantity,
// "sort": map[string]interface{}{
// "@timestamp": map[string]interface{}{
// "order": "desc",
// },
// },
// }
a, _ := json.Marshal(query)
fmt.Println(string(a))
if err := json.NewEncoder(&buf).Encode(query); err != nil {
log.Fatalf("Error encoding query: %s", err)
}
// Perform the search request.
res, err = es.Search(
es.Search.WithContext(context.Background()),
es.Search.WithIndex(index_name+"*"),
es.Search.WithBody(&buf),
es.Search.WithTrackTotalHits(true),
es.Search.WithPretty(),
)
if err != nil {
log.Fatalf("Error getting response: %s", err)
}
defer res.Body.Close()
if res.IsError() {
var e map[string]interface{}
if err := json.NewDecoder(res.Body).Decode(&e); err != nil {
log.Fatalf("Error parsing the response body: %s", err)
} else {
// Print the response status and error information.
log.Fatalf("[%s] %s: %s",
res.Status(),
e["error"].(map[string]interface{})["type"],
e["error"].(map[string]interface{})["reason"],
)
}
}
if err := json.NewDecoder(res.Body).Decode(&r); err != nil {
log.Printf("Error parsing the response body:", err)
}
// json_data, _ := json.Marshal(r)
// fmt.Println(string(json_data))
return r
}
func EsIndexDiscover(es_index_list []string) string {
result_discovery_json := string("{\"data\": [")
for _, value := range es_index_list {
// result_discovery_json = result_discovery_json + "{\"{#INDEX}\":\"" + value + "\",\"{#ITEM_TYPE}\":\"Records count\"" + "},"
result_discovery_json = result_discovery_json + "{\"{#INDEX}\":\"" + value + "\"" + "},"
}
result_discovery_json = strings.TrimRight(result_discovery_json, ",") + "]}"
// fmt.Println(result_discovery_json)
return (result_discovery_json)
}
// func EsIndexRecordsCount(es_index_list []string, es_request_time_range string, es_request_rows_quantity int) {
// // fmt.Println(es_request_time_range)
// result_index := make(map[int]json_records)
// j := int(0)
// for _, value := range es_index_list {
// r := EsSearch(value, es_request_time_range, es_request_rows_quantity)
// result := int(r["hits"].(map[string]interface{})["total"].(map[string]interface{})["value"].(float64))
//
// result_index[j] = json_records{value, result}
// j++
// }
// log.Println(result_index)
// }
//
func EsClusterInfo(es *opensearch.Client) {
var (
r map[string]interface{}
)
// 1. Get cluster info
res, err := es.Info()
if err != nil {
log.Fatalf("Error getting response: %s", err)
}
defer res.Body.Close()
// Check response status
if res.IsError() {
log.Fatalf("Error: %s", res.String())
}
// Deserialize the response into a map.
if err := json.NewDecoder(res.Body).Decode(&r); err != nil {
log.Fatalf("Error parsing the response body: %s", err)
}
// Print client and server version numbers.
fmt.Printf("Client: %s\n", opensearch.Version)
fmt.Printf("Client: %s\n", opensearch.Version)
fmt.Printf("Server: %s\n", r["version"].(map[string]interface{})["number"])
}
func EsIndexFieldExists(es *opensearch.Client, index_name string, field_name string) bool {
// Check if the filed exists into index
var (
fields_request map[string]interface{}
fields []string
indices []string
field_exists bool
)
fields = append(fields, field_name)
indices = append(indices, index_name)
req := opensearchapi.FieldCapsRequest{
// Index: []string{},
Index: indices,
Fields: fields,
}
res, err := req.Do(context.Background(), es)
if err != nil {
log.Fatalf("ERROR: %s", err)
}
if err := json.NewDecoder(res.Body).Decode(&fields_request); err != nil {
log.Fatalf("->Error parsing the response body: %s", err)
}
for i, f := range fields_request {
if i == "fields" {
// if f.(map[string]interface{})[`@timestamp`] == nil {
if f.(map[string]interface{})[field_name] == nil {
field_exists = false
} else {
field_exists = true
}
}
}
return (field_exists)
}
func EsGetIndices(es *opensearch.Client) []string {
//Get indices list
// log.Println("sjsjsjsj")
var (
index_request []map[string]interface{}
es_index_list []string
)
req := opensearchapi.CatIndicesRequest{
// Index: []string{},
Format: "JSON",
// ExpandWildcards,
}
res, err := req.Do(context.Background(), es)
if err != nil {
log.Fatalf("ERROR: %s", err)
}
// log.Println(res)
if err := json.NewDecoder(res.Body).Decode(&index_request); err != nil {
log.Fatalf("Error parsing the response body: %s", err)
}
for _, hit := range index_request {
m := regexp.MustCompile("-[0-9]")
// позиция вхождения регулярки в имени индекса
i := m.FindStringIndex(hit["index"].(string))
// наименование индекса
index_name := hit["index"].(string)
// log.Println(index_name)
// Checking the @timestamp field exists
// if !EsIndexFieldExists(es, index_name, `@timestamp`) {
// continue
// // fmt.Println(EsIndexFieldExists(es, index_name))
// }
// if strings.HasPrefix(index_name, "apdex") {
// continue
// // fmt.Println(EsIndexFieldExists(es, index_name))
// }
// Итоговое наименование индекса
var index_name_short string
// Исключаем индексы от кибаны (.kibana-*) и обрезаем цифры (даты и версии)
if !IndexPrefixExclude(index_name) {
if i != nil {
index_name_short = strings.TrimSpace(index_name[0:i[0]])
} else {
index_name_short = strings.TrimSpace(index_name)
}
if !StringContains(es_index_list, index_name_short) {
es_index_list = append(es_index_list, index_name_short)
}
} else {
log.Println("Index", index_name, "has name from exclude lists")
}
}
log.Println(es_index_list)
return (es_index_list)
}
func ZabbixSender(zabbix_host string, metric_name string, metric_value string) bool {
var (
metrics []*zabbix.Metric
zabbix_port int
err error
)
zabbix_server := os.Getenv("ZABBIX_SERVER")
// Read and checkin zabbix server and port
if zabbix_server == "" {
log.Println("Login error: make sure environment variables `ZABBIX_SERVER` are defined")
os.Exit(1)
}
if os.Getenv("ZABBIX_PORT") == "" {
log.Println("Login error: make sure environment variables `ZABBIX_PORT` are defined")
os.Exit(1)
}
if zabbix_port, err = strconv.Atoi(os.Getenv("ZABBIX_PORT")); err != nil {
log.Println(zabbix_port, "Zabbix port value error")
}
metrics = append(metrics, zabbix.NewMetric(zabbix_host, metric_name, metric_value, time.Now().Unix()))
metrics = append(metrics, zabbix.NewMetric(zabbix_host, "status", "OK"))
// Create instance of Packet class
packet := zabbix.NewPacket(metrics)
// fmt.Println(zabbix_host, metric_name, metric_value, metrics)
// Send packet to zabbix
z := zabbix.NewSender(zabbix_server, zabbix_port)
res, err := z.Send(packet)
log.Println(string(res))
return true
}
func PgSearch(conn *pgx.Conn, mail_data Mail_records) int32 {
var id int32
var client_name pgtype.Varchar
// var remote_ip pgtype.CIDR
// var country_code pgtype.Varchar
// fmt.Println("Select data from DB:", mail_data.client_name)
err := conn.QueryRow(context.Background(), "select id, client_name from mail_accounts where client_name = $1", mail_data.client_name).Scan(&id, &client_name)
//err = conn.QueryRow(context.Background(), "select id, client_name, remote_ip, country_code from mail_accounts where")
if err != nil {
fmt.Fprintf(os.Stderr, "Query select account failed: %v\n", err)
// id = PgInsertAccount(conn, mail_data)
log.Println("Select", mail_data.client_name, "error:", err)
if err.Error() == "no rows in result set" {
id = 0
} else {
os.Exit(1)
}
}
// fmt.Println(">>>", id, client_name.String)
return id
}
func PgInsertAccount(conn *pgx.Conn, mail_data Mail_records) int32 {
var id int32
// var client_name pgtype.Varchar
// var remote_ip pgtype.CIDR
// var country_code pgtype.Varchar
// fmt.Println("Insert data into DB")
err := conn.QueryRow(context.Background(), "insert into mail_accounts(client_name) values($1) returning id", mail_data.client_name).Scan(&id)
if err != nil {
fmt.Fprintf(os.Stderr, "Query insert account failed: %v\n", err)
}
//fmt.Println(id)
return id
}
func PgSearchMailSession(conn *pgx.Conn, mail_data Mail_records) (int32, int32) {
var id int32
var session_count int32
// var client_name pgtype.Varchar
// var remote_ip pgtype.CIDR
var country_code pgtype.Varchar
// log.Println("Select data from DB:", "select id, sessions_count, country_code from mail_sessions where mail_account_id =", mail_data.id, "and remote_ip =",mail_data.remote_ip, "and last_time =", mail_data.session_time, "and last_check_timestamp =", mail_data.check_time)
err := conn.QueryRow(context.Background(), "select id, sessions_count, country_code from mail_sessions where mail_account_id = $1 and remote_ip = $2 and last_check_timestamp = $3", mail_data.id, mail_data.remote_ip, mail_data.check_time).Scan(&id, &session_count, &country_code)
//err = conn.QueryRow(context.Background(), "select id, client_name, remote_ip, country_code from mail_accounts where")\
// log.Println("Selected session data: ", id, session_count, country_code)
if err != nil {
log.Println("Query session PgSearchMailSession failed:", err)
// id = PgInsertAccount(conn, mail_data)
// log.Println("Select", mail_data.client_name, "error:", err)
if err.Error() == "no rows in result set" {
id = 0
} else {
os.Exit(1)
}
}
// fmt.Println(">>>", id, client_name.String)
return id, session_count
}
// Alarm_type
// 1 "Неразрешенный код страны" "alarm"
// 2 "Количество сессий больше заданного" "alarm"
// 3 "Вход в аккаунт более чем с одного IP" "alarm"
// 4 "Вход более чем в один аккаунт" "alarm"
func PgSearchMailSessionAnomaly(conn *pgx.Conn, last_check_timestamp time.Time, conf Config) []MailSession {
// log.Println("Select data from DB:", "select id, mail_account_id, sessions_count, country_code, remote_ip, last_time from mail_sessions where last_check_timestamp =", last_check_timestamp)
rows, err_select := conn.Query(context.Background(), "select id, mail_account_id, sessions_count, country_code, remote_ip, last_time from mail_sessions where last_check_timestamp = $1", last_check_timestamp)
if err_select != nil {
log.Println("Query sessions failed:", err_select)
os.Exit(1)
}
var res []MailSession
for rows.Next() {
var s MailSession
values, _ := rows.Values()
s.id = values[0].(int32)
s.mail_account_id = values[1].(int32)
s.sessions_count = values[2].(int32)
s.country_code = strings.TrimSpace(values[3].(string))
s.remote_ip = values[4].(netip.Prefix)
s.last_time = values[5].(string)
s.check_time = last_check_timestamp
log.Println(s.id, s.mail_account_id, s.sessions_count, s.country_code, s.remote_ip, s.last_time)
if s.country_code != conf.country_code && s.country_code != "" {
s.alarm_type = 1
log.Println("ALARM!!!", s.mail_account_id, s.sessions_count, s.country_code, s.remote_ip)
res = append(res, s)
}
if s.sessions_count >= conf.sessions_count {
// s.alarm_type = "Количество соединений больше " + string(conf.sessions_count)
s.alarm_type = 2
log.Println("ALARM!!!", s.mail_account_id, s.sessions_count, s.country_code, s.remote_ip)
res = append(res, s)
// alarm_id := PgInsertMailAlarm(conn, s, "Код страны GeoIP не разрешен")
// log.Println("Alarm insert with id:", alarm_id)
}
}
return res
}
func PgSelectAlarm(conn *pgx.Conn, mail_session MailSession) (int32) {
var id int32
// log.Println("Select data from DB:", "select id, sessions_count, country_code from mail_sessions where mail_account_id =", mail_data.id, "and remote_ip =",mail_data.remote_ip, "and last_time =", mail_data.session_time, "and last_check_timestamp =", mail_data.check_time)
err := conn.QueryRow(context.Background(), "select id from alarm where mail_account_id = $1 and remote_ip = $2 and alarm_type = $3", mail_session.mail_account_id, mail_session.remote_ip, mail_session.alarm_type).Scan(&id)
log.Println("Selected alarm data: ", mail_session.mail_account_id, mail_session.remote_ip)
if err != nil {
log.Println("Select alarm with account_id", mail_session.mail_account_id, " and remote ip", mail_session.remote_ip, "error:", err)
if err.Error() == "no rows in result set" {
id = 0
} else {
os.Exit(1)
}
}
return id
}
func PgSelectAlarmAccount(conn *pgx.Conn, mail_session MailSession) int32 {
var s int32
var i int32
// log.Println("Select data from DB:", "select id, sessions_count, country_code from mail_sessions where mail_account_id =", mail_data.id, "and remote_ip =",mail_data.remote_ip, "and last_time =", mail_data.session_time, "and last_check_timestamp =", mail_data.check_time)
query := fmt.Sprintf("select id from alarm where mail_account_id = %v and alarm_type = %v", mail_session.mail_account_id, mail_session.alarm_type)
log.Println("PgSelectAlarmAccount execute query:", query)
rows, err_select := conn.Query(context.Background(), query)
if err_select != nil {
log.Println("Query alarms PgSelectAlarmAccount failed:", err_select)
os.Exit(1)
}
if len(rows.FieldDescriptions()) != 0 {
i = 0
var id int32
for rows.Next() {
values, _ := rows.Values()
id = values[0].(int32)
log.Println("\t found records with id:", id)
i++
}
// если выбрана только одна запись возвращаем ее id,
// если несколько - то возвращаем количество записей
if i == 1 {
s = id
} else {
s = i
}
} else {
s = 0
}
return s
}
// Выборка записей по конкретному IP
// запросы разные в зависимости от типа тревоги
func PgSelectAlarmRemoteIP(conn *pgx.Conn, mail_session MailSession) (int32) {
var s int32
var query string
// В зависимости от типа тревоги функция возвращает либо количество записей либо ID записи
switch mail_session.alarm_type {
case 1:
// По уму тут можно упростить добавив в запрос count() хотя не факт что прокатит
query = fmt.Sprintf("select mail_account_id, remote_ip, alarm_type from alarm where remote_ip = '%v' and alarm_type = %v group by mail_account_id, remote_ip, alarm_type", mail_session.remote_ip, mail_session.alarm_type)
log.Println("PgSelectAlarmRemoteIP execute query:",query)
rows, err_select := conn.Query(context.Background(), query)
if err_select != nil {
log.Println("Query alarms PgSelectAlarmRemoteIP failed:", err_select)
os.Exit(1)
}
if len(rows.FieldDescriptions()) != 0 {
var i, id int32
i = 0
for rows.Next() {
values, _ := rows.Values()
id = values[0].(int32)
log.Println("\t found records with id:", id)
i++
}
s = i
} else {
s = 0
}
case 4:
query = fmt.Sprintf("select id from alarm where remote_ip = '%v' and alarm_type = %v", mail_session.remote_ip, mail_session.alarm_type)
log.Println(query)
var id int32
err := conn.QueryRow(context.Background(), query).Scan(&id)
if err != nil {
log.Println("Query alarms PgSelectAlarmRemoteIP failed:", mail_session.remote_ip, "error:", err)
if err.Error() == "no rows in result set" {
id = 0
} else {
os.Exit(1)
}
} else {
s = id
}
}
return s
}
func PgInsertAlarm(conn *pgx.Conn, mail_data MailSession, conf Config) int32 {
var id int32
log.Println("Insert data", mail_data, "into alarm DB")
err := conn.QueryRow(context.Background(), "insert into alarm(mail_account_id, remote_ip, country_code, last_time, last_check_timestamp, alarm_type, alarm_action) values($1, $2, $3, $4, $5, $6, false) returning id", mail_data.mail_account_id, mail_data.remote_ip, mail_data.country_code, mail_data.last_time, mail_data.check_time, mail_data.alarm_type).Scan(&id)
if err != nil {
log.Printf("Insert alarm data failed:", err)
}
return id
}
func PgInsertAlarmRemoteIP(conn *pgx.Conn, mail_data MailSession, conf Config) int32 {
var id int32
log.Println("Insert data", mail_data, "into alarm DB")
err := conn.QueryRow(context.Background(), "insert into alarm(remote_ip, country_code, last_time, last_check_timestamp, alarm_type, alarm_action) values($1, $2, $3, $4, $5, false) returning id", mail_data.remote_ip, mail_data.country_code, mail_data.last_time, mail_data.check_time, mail_data.alarm_type).Scan(&id)
if err != nil {
log.Printf("Insert alarm data failed:", err)
}
return id
}
func PgInsertAlarmAccount(conn *pgx.Conn, mail_data MailSession, conf Config) int32 {
var id int32
log.Println("Insert data", mail_data, "into alarm DB")
err := conn.QueryRow(context.Background(), "insert into alarm(mail_account_id, last_time, last_check_timestamp, alarm_type, alarm_action) values($1, $2, $3, $4, false) returning id", mail_data.mail_account_id, mail_data.last_time, mail_data.check_time, mail_data.alarm_type).Scan(&id)
if err != nil {
log.Printf("Insert alarm data failed:", err)
}
return id
}
func PgUpdateAlarm(conn *pgx.Conn, mail_session MailSession, conf Config, id int32) int32 {
log.Println("Update alarm data", mail_session)
err := conn.QueryRow(context.Background(), "update alarm set last_time=$1, last_check_timestamp=$2 where id=$3 returning id", mail_session.last_time, mail_session.check_time, id).Scan(&id)
if err != nil {
log.Printf("Update alarm data failed: %v\n", err)
}
return id
}
func PgInsertMailSession(conn *pgx.Conn, mail_data Mail_records) int32 {
var id int32
sessions_count := 1
// log.Println("Insert data", mail_data, "into session DB")
err := conn.QueryRow(context.Background(), "insert into mail_sessions(mail_account_id, remote_ip, sessions_count, last_time, country_code, last_check_timestamp, mail_reason) values($1, $2, $3, $4, $5, $6, $7) returning id", mail_data.id, mail_data.remote_ip, sessions_count, mail_data.session_time, mail_data.country_code, mail_data.check_time, mail_data.mail_reason).Scan(&id)
if err != nil {
log.Printf("Insert session data failed:", err)
}
return id
}
func PgUpdateMailSession(conn *pgx.Conn, mail_data Mail_records, session_id int32, sessions_count int32) int32 {
var id int32
sessions_count++
// log.Println("Update data", mail_data, "into session DB")
err := conn.QueryRow(context.Background(), "update mail_sessions set sessions_count=$1, last_time=$2, last_check_timestamp=$4, mail_reason=$5 where id=$3 returning id", sessions_count, mail_data.session_time, session_id, mail_data.check_time, mail_data.mail_reason).Scan(&id)
if err != nil {
log.Printf("Update session data failed: %v\n", err)
}
return id
}
func doit(mail Mail_records, es_index ESIndex, query ESQuery, c chan int) int {
// Запрос данных из соответствующего индекса и выборка полей mail.{} и geoip.{}
fmt.Println(es_index)
fmt.Println(es_index.es_index_name)
r := EsSearch(es_index.es_index_name, query)
result := r["hits"].(map[string]interface{})["hits"].([]interface{})
conn, err := pgx.Connect(context.Background(), os.Getenv("DATABASE_URL"))
// fmt.Println(*conn)
if err != nil {
fmt.Fprintf(os.Stderr, "Unable to connect to database: %v\n", err)
os.Exit(1)
}
for i := range result {
mail_res := result[i].(map[string]interface{})["_source"].(map[string]interface{})["mail"]
geoip :=result[i].(map[string]interface{})["_source"].(map[string]interface{})["geoip"]
if !isNil(mail_res) {
//fmt.Println(">>>>>>", mail.(map[string]interface{}))
// fmt.Println(">>>>>>", mail_res.(map[string]interface{})["client_account"])
if !isNil(mail_res.(map[string]interface{})["client_account"]) {
mail.client_account = mail_res.(map[string]interface{})["client_account"].(string)
}
// fmt.Println("", mail_res.(map[string]interface{})["client_name"])
if !isNil(mail_res.(map[string]interface{})["client_name"]) {
mail.client_name = mail_res.(map[string]interface{})["client_name"].(string)
}
// fmt.Println("", mail_res.(map[string]interface{})["remote_ip"])
if !isNil( mail_res.(map[string]interface{})["remote_ip"]) {
mail.remote_ip = mail_res.(map[string]interface{})["remote_ip"].(string)
}
if !isNil( mail_res.(map[string]interface{})["log_datetime"]) {
datetime := mail_res.(map[string]interface{})["log_datetime"].(string)
// tmpl := "2022-08-18 16:17:24,446"
mail.session_time = datetime
// log.Println(">>>>>>>>>>>>>>>>", mail.session_time, datetime)
// 2022-08-18 16:17:24,446
}
if !isNil(mail_res.(map[string]interface{})["reason"]) {
mail.mail_reason = mail_res.(map[string]interface{})["reason"].(string)
} }
if !isNil(geoip) {
// fmt.Println("", geoip.(map[string]interface{})["country_iso_code"].(string))
mail.country_code = geoip.(map[string]interface{})["country_iso_code"].(string)
}
if mail.client_name == "" && mail.client_account != "" {
mail.client_name = mail.client_account
}
if (mail.client_name != "") && mail.remote_ip != "" {
log.Println(mail.client_name, mail.remote_ip, mail.country_code, mail.mail_reason)
// fmt.Println(mail.client_name, mail.client_account, mail.remote_ip, mail.country_code)
res := PgSearch(conn, mail)
if res == 0 {
mail.id = PgInsertAccount(conn, mail)
} else {
mail.id = res
}
// log.Println("Insert account", mail.client_name, mail.client_account," sucess with id", mail.id)
id, session_count := PgSearchMailSession(conn, mail)
if id == 0 {
PgInsertMailSession(conn, mail)
} else {
PgUpdateMailSession(conn, mail, id, session_count)
}
}
mail.client_account = ""
mail.client_name = ""
mail.remote_ip = ""
mail.country_code = ""
mail_res = nil
}
defer conn.Close(context.Background())
// Вывод статуса, количество выбранных записей и времени запроса
hits := int(r["hits"].(map[string]interface{})["total"].(map[string]interface{})["value"].(float64))
log.Println("ES index name:", es_index.es_index_name, "; Query time period:", es_index.es_request_time_range, "; Records count:", hits)
c <- hits
return hits
}
//---------------------------------------------------------------------------
func main() {
var (
operation string
es_index_name string
es_request_time_range string
es_request_rows_quantity int
zabbix_send bool
pg_insert bool
// zabbix_port int
)
flag.StringVar(&es_index_name, "indexname", "", "Elasticsearch index name patterns by comma separated, (like \"filebeat\")")
flag.StringVar(&es_request_time_range, "timerange", "1h", "Elasticsearch time range for records count (like 1m, 10h, e.t.c.)")
flag.IntVar(&es_request_rows_quantity, "request-rows", 1, "How many rows will return")
flag.BoolVar(&zabbix_send, "zabbix-send", false, "Send metrics or discovery data into zabbix")
flag.BoolVar(&pg_insert, "pg-insert", false, "Write data into postgres dsatabase")
flag.StringVar(&Exclude_index_list, "exclude-index", ".", "Elasticsearch index name comma-separated list for exluded, (like \"filebeat,mailindex,e.t.c\")")
flag.StringVar(&operation, "operation", "es-cluster-info", "Opertation type, must be:\n\tes-cluster-info - ES Cluster information (version e.t.c)\n\tes-get-indices - geting all index\n\tes-indices-discover - getting es index pattern list\n\tes-records-count - getting the number of records for a time range for all index pattern\n\tes-index-records-count - getting records count for one index (used with -indexname option\n\tes-request-data - select data from index")
flag.Parse()
fmt.Println(zabbix_send)
zabbix_host := os.Getenv("ZABBIX_HOST")
if zabbix_host == "" && zabbix_send == true {
fmt.Println("Send error: make sure environment variables `ZABBIX_HOST` was set")
os.Exit(1)
}
if os.Getenv("ELASTICSEARCH_URL") == "" {
fmt.Println("Send error: make sure environment variables `ELASTICSEARCH_URL` was set")
os.Exit(1)
}
// if es_request_time_range > 24 {
// log.Println("Error: Time range must be a number between 1 and 24")
// os.Exit(1)
// }
if pg_insert {
if os.Getenv("PGHOST") == "" {
fmt.Println("Send error: make sure environment variables `PGHOST` was set")
os.Exit(1)
}
if os.Getenv("PGUSER") == "" {
fmt.Println("Send error: make sure environment variables `PGUSER` was set")
os.Exit(1)
}
if os.Getenv("PGPASSWORD") == "" {
fmt.Println("Send error: make sure environment variables `PGPASSWORD` was set")
os.Exit(1)
}
if os.Getenv("PGDATABASE") == "" {
fmt.Println("Send error: make sure environment variables `PGDATABASE` was set")
os.Exit(1)
}
}
cfg := opensearch.Config{
Transport: &http.Transport{
MaxIdleConnsPerHost: 10,
ResponseHeaderTimeout: 60 * time.Second,
DialContext: (&net.Dialer{Timeout: time.Second}).DialContext,
TLSClientConfig: &tls.Config{
MaxVersion: tls.VersionTLS13,
InsecureSkipVerify: true,
},
},
}
es, err := opensearch.NewClient(cfg)
if err != nil {
log.Fatalf("Error creating the client: %s", err)
}
// fmt.Println(operation)
switch operation {
case "es-cluster-info":
EsClusterInfo(es)
case "es-get-indices":
// fmt.Println(operation)
for _, index := range EsGetIndices(es) {
fmt.Println(index)
}
case "es-indices-discover":
result_discovery_json := EsIndexDiscover(EsGetIndices(es))
// fmt.Println(zabbix_send)
if zabbix_send == true {
ZabbixSender(zabbix_host, "indices", result_discovery_json)
}
fmt.Println(result_discovery_json)
case "es-records-count":
// records count for all indices
// EsIndexRecordsCount(EsGetIndices(es), es_request_time_range)
// result_index := make(map[int]json_records)
// j := int(0)
// for _, es_index_name := range EsGetIndices(es) {
// r := EsSearch(es_index_name, es_request_time_range, es_request_rows_quantity)
// result := int(r["hits"].(map[string]interface{})["total"].(map[string]interface{})["value"].(float64))
// result_index[j] = json_records{es_index_name, result}
// log.Println("ES index name:", es_index_name, "; Query time period:", es_request_time_range, "; Records count:", result)
// if zabbix_send == true {
// ZabbixSender(zabbix_host, `indices.records_count[`+es_index_name+`]`, strconv.Itoa(result))
// ZabbixSender(zabbix_host, `indices.records_count_time_range[`+es_index_name+`]`, es_request_time_range)
// }
// j++
// }
case "es-index-records-count":
// records count for one index
// r := EsSearch(es_index_name, es_request_time_range, es_request_rows_quantity)
// result := int(r["hits"].(map[string]interface{})["total"].(map[string]interface{})["value"].(float64))
// if zabbix_send == true {
// ZabbixSender(zabbix_host, `indices.records_count[`+es_index_name+`]`, strconv.Itoa(result))
// ZabbixSender(zabbix_host, `indices.records_count_time_range[`+es_index_name+`]`, es_request_time_range)
// }
// log.Println("ES index name:", es_index_name, "; Query time period:", es_request_time_range, "; Records count:", result)
case "es-request-data":
var conf Config
conf.sessions_count = 50
conf.country_code = "RU"
var mail Mail_records
mail.check_time = time.Unix(time.Now().Unix(), 0)
last_check_timestamp := mail.check_time
// канал для получания информации от процессов (routine)
c := make(chan int)
var query_auth ESQuery
var query_invalid ESQuery
request_time_range := "now-" + es_request_time_range
s := `{
"query": {
"bool": {
"filter": [
{
"range": {
"@timestamp": {
"gt": "now-5m"
}
}
},
{
"match": {
"mail.command": "Authentication"
}
},
{
"match_phrase": {
"log.file.path": "/opt/zimbra/log/mailbox.log"
}
}
]
}
},
"size": 1000
}`
err = json.Unmarshal([]byte(s), &query_auth)
if err != nil {
fmt.Println("JSON query error:",err)
}
err = json.Unmarshal([]byte(s), &query_invalid)
if err != nil {
fmt.Println("JSON query error:",err)
}
query_auth.Query.Bool.Filter[0] = map[string]interface{}{
"range": map[string]interface{}{
"@timestamp": map[string]string{
"gt": request_time_range,
},
},
}
query_auth.Query.Bool.Filter[1] = map[string]interface{}{
"match": map[string]string{
"mail.command": "authentication",
},
}
query_auth.Query.Bool.Filter[2] = map[string]interface{}{
"match_phrase": map[string]string{
"log.file.path": "/opt/zimbra/log/mailbox.log",
},
}
//
query_auth.Size = es_request_rows_quantity
query_invalid.Query.Bool.Filter[0] = map[string]interface{}{
"range": map[string]interface{}{
"@timestamp": map[string]string{
"gt": request_time_range,
},
},
}
query_invalid.Query.Bool.Filter[2] = map[string]interface{}{
"match_phrase": map[string]string{
"log.file.path": "/opt/zimbra/log/mailbox.log",
},
}
//
query_invalid.Size = es_request_rows_quantity
query_invalid.Query.Bool.Filter[1] = map[string]interface{}{
"match": map[string]string{
"mail.reason" : "*password",
},
}
// Получаем список индексов и делаем список индексов согласно списка шаблонов имени
// заданного с коммандной строки es_index_name_prefix
es_indicies_all := EsGetIndices(es)
var es_indicies_filtered_list []string
for _, finded_index := range es_indicies_all {
// fmt.Println(finded_index)
for _, pattern := range strings.Split(es_index_name, ",") {
if strings.HasPrefix(finded_index, pattern) {
es_indicies_filtered_list = append(es_indicies_filtered_list, finded_index)
fmt.Println(">>",finded_index)
}
}
// maillog_mailbox4_filebeat-2023.10.16
// es_indicies_filtered = strings.TrimPrefix(s, "¡¡¡Hello, ")
}
// запускаем параллельно работы по каждому индексу
for _, index := range es_indicies_filtered_list {
// fmt.Println(index)
var es_index ESIndex
es_index.es_index_name = index
es_index.es_request_time_range = es_request_time_range
es_index.es_request_rows_quantity = es_request_rows_quantity
go doit(mail, es_index, query_auth, c)
go doit(mail, es_index, query_invalid, c)
// doit(mail, es_index, query_auth, c)
// doit(mail, es_index, query_invalid, c)
}
// Читаем записи из канала для каждого индекса и обрабатываем далее
// for _, index := range strings.Split(es_index_name, ",") {
for _, index := range es_indicies_filtered_list {
hits := <-c // Получает значение от канала
log.Println("request for", index, "has finished with", hits, "records")
}
conn, err := pgx.Connect(context.Background(), os.Getenv("DATABASE_URL"))
fmt.Println(*conn)
if err != nil {
fmt.Fprintf(os.Stderr, "Unable to connect to database: %v\n", err)
os.Exit(1)
}
// Выбираем аномалии согласно конфига
result := PgSearchMailSessionAnomaly(conn, last_check_timestamp, conf)
// --- Типы тревог ----
// 1 "Неразрешенный код страны"
// 2 "Количество сессий больше заданного"
// 3 "Вход в аккаунт более чем с одного IP"
// 4 "Вход более чем в один аккаунт"
// Задачи:
// 1. выбрать текущие сессии с geoip не равным RU (mail_session, alarm_type = 1)
// 2. выбрать текущие сессии с количеством больше заданного (mail_sessions, alarm_type = 2)
// 3. выбрать тревоги за все периоды по каждому ip из текущих сессий
// - входы с одного адреса в несколько аккаунтов (alarm, alarm_type = 4)
// 4. Выбрать тревоги за все периоды по каждому аккаунту из текущих сессий
// - входы в один аккаунт с нескольких адресов (alarm, alarm_type = 3)
// Если тревога для данного адреса или аккаунта заданного типа уже есть
// то просто обновить запись с новой меткой времени.
// При выборке тревог выставить alarm_action = false
if len(result) != 0 {
log.Println("Anomaly finded", result)
for i := range result {
// Смотрим есть ли уже запись для данной аномалии
// если есть то обновляем если нет - добавляем
id := PgSelectAlarm(conn, result[i])
if id == 0 {
log.Println("Alarm insert with id", PgInsertAlarm(conn, result[i], conf))
} else {
log.Println("Alarm type",result[i].alarm_type,"with id", PgUpdateAlarm(conn, result[i], conf, id), "was updated")
}
// Выбираем список тревог для определенного аккаунта и типа тревоги = 1.
// Если возвращается больше 1-й записи то генерим тревогу с типом 3 для этого аккаунта
id1 := PgSelectAlarmAccount(conn, result[i])
log.Println("Alarm records for account id:", result[i].mail_account_id, "found:", id1)
if id1 > 1 {
result[i].alarm_type = 3
id = PgSelectAlarmAccount(conn, result[i])
if id == 0 {
log.Println("Alarm insert with id", PgInsertAlarmAccount(conn, result[i], conf))
} else {
log.Println("Alarm type",result[i].alarm_type,"with id", PgUpdateAlarm(conn, result[i], conf, id), "was updated")
}
}
result[i].alarm_type = 1
id2 := PgSelectAlarmRemoteIP(conn, result[i])
log.Println("PgSelectAlarmRemoteIP with ip:",result[i].remote_ip, id2)
if id2 > 1 {
result[i].alarm_type = 4
id = PgSelectAlarmRemoteIP(conn, result[i])
if id == 0 {
log.Println("Alarm insert with id", PgInsertAlarmRemoteIP(conn, result[i], conf))
} else {
log.Println("Alarm type",result[i].alarm_type,"with id", PgUpdateAlarm(conn, result[i], conf, id), "was updated")
}
}
}
}
defer conn.Close(context.Background())
}
}