Реализована пакетная отправка данных в эластики соответствующие настройки. Произведена чистка кода

This commit is contained in:
Калинин Сергей Валерьевич 2024-10-28 16:28:37 +03:00
parent 44c6b59a40
commit 595095cfc0

View File

@ -1,27 +1,12 @@
// ---------------------------------------------------------------------- // ----------------------------------------------------------------------
// Программа для анализа и записи журналов 1С // Программа для анализа и записи журналов 1С
// Версия: 1.0.0 // Версия: 1.2.0
// Автор: Сергей Калинин // Автор: Сергей Калинин, svk@nuk-svk.ru
// https://git.nuk-svk.ru/svk/1c-logprocessor
// Лицензия: GPLv3
// ---------------------------------------------------------------------- // ----------------------------------------------------------------------
// Использование log-processor: // Использование log-processor:
// -dict-file string // см. README.md
// Файл со словарём (DICT_FILE) (default "1Cv8.lgf")
// -dir-in string
// Каталог для исходных файлов (DIR_IN) (default "in")
// -dir-log string
// Каталог для лога работы (DIR_LOG) (default "log")
// -dir-out string
// Каталог для обработанных файлов (DIR_OUT) (default "out")
// -dir-temp string
// Каталог для временных файлов (TEMP) (default "/tmp")
// -log-file-ext string
// Расширение файлов с журналами (обрабатываемых логов) (LOG_FILE_EXT) (default ".lgp")
// -object-types string
// Список типов объектов для выборки разделённый запятой (OBJECT_TYPES) (default "1,2,3,4,5,6,7,8")
// -out-format string
// Выводить данные в формате JSON (OUT_FORMAT) (default "json")
// -send-to-es
// Отправлять данные в ElasticSearch (SEND_TO_ES)
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
// ELASTICSEARCH_URL="https://user:pass@elastic:9200" // ELASTICSEARCH_URL="https://user:pass@elastic:9200"
@ -38,22 +23,27 @@ import (
"time" "time"
"io" "io"
// "runtime" // "runtime"
"path/filepath" "path/filepath"
"regexp" "regexp"
"strings" "strings"
"strconv" "strconv"
"encoding/json" "encoding/json"
// "github.com/hpcloud/tail" // "github.com/hpcloud/tail"
"net" "net"
"sort" "sort"
"crypto/tls" "crypto/tls"
"net/http" "net/http"
"context" "context"
"gopkg.in/ini.v1" "gopkg.in/ini.v1"
opensearch "github.com/opensearch-project/opensearch-go/v2" opensearch "github.com/opensearch-project/opensearch-go/v2"
opensearchapi "github.com/opensearch-project/opensearch-go/v2/opensearchapi" opensearchapi "github.com/opensearch-project/opensearch-go/v2/opensearchapi"
"github.com/kardianos/service" "github.com/kardianos/service"
) )
const (
Version = "1.2.0"
)
var ( var (
DirCfg string DirCfg string
FileCfg string FileCfg string
@ -91,6 +81,7 @@ var (
InFile string InFile string
WorkLogOut string WorkLogOut string
ServiceName string ServiceName string
// PrintVersion bool
) )
type Config struct { type Config struct {
@ -108,8 +99,8 @@ type Config struct {
EsPassword string `ini:"esPassword"` EsPassword string `ini:"esPassword"`
EsIndexPrefix string `ini:"esIndexPrefix"` EsIndexPrefix string `ini:"esIndexPrefix"`
EsSkipVerify bool `ini:"esSkipVerify"` EsSkipVerify bool `ini:"esSkipVerify"`
// EsBulk bool `ini:"esBulk"` EsBulk bool `ini:"esBulk"`
// EsBulkRecordsQuantity int `ini:"esBulkQuantity"` EsBulkRecordsQuantity int `ini:"esBulkQuantity"`
OutFormat string `ini:"outFormat"` OutFormat string `ini:"outFormat"`
WriteOutFile bool `ini:"writeOutFile"` WriteOutFile bool `ini:"writeOutFile"`
TimeZoneOffset string `ini:"timeZoneOffset"` TimeZoneOffset string `ini:"timeZoneOffset"`
@ -251,6 +242,10 @@ esPassword=user_password
esSkipVerify=false esSkipVerify=false
; Перфикс индекса в ElasticSearch ; Перфикс индекса в ElasticSearch
esIndexPrefix=test_log esIndexPrefix=test_log
; Пакетная вставка данных
esBulk=true
; Количество записей в одном "пакете"
esBulkRecordsQuantity=10
[Processing] [Processing]
; Включение вывода обработанной информации в файл ; Включение вывода обработанной информации в файл
@ -269,25 +264,25 @@ excludeEvent=
// Список файлов в каталоге // Список файлов в каталоге
func getDirectoryContents(dir string) []string { func getDirectoryContents(dir string) []string {
var names []string var names []string
entries, err := os.ReadDir(dir); entries, err := os.ReadDir(dir);
if err != nil { if err != nil {
log.Println("Ошибка получения содержимого каталога: ", dir, err); log.Println("Ошибка получения содержимого каталога: ", dir, err);
} else { } else {
if Debug { if Debug {
log.Println("Get the directory content:", dir) log.Println("Get the directory content:", dir)
} }
} }
//iterate through the directory entities and print out their name. //iterate through the directory entities and print out their name.
for _, entry := range(entries) { for _, entry := range(entries) {
if filepath.Ext(entry.Name()) == LogFileExtention { if filepath.Ext(entry.Name()) == LogFileExtention {
// fmt.Println(entry.Name()); // fmt.Println(entry.Name());
if Debug { if Debug {
log.Println("Finded file:", entry.Name()) log.Println("Finded file:", entry.Name())
} }
names = append(names, entry.Name()) names = append(names, entry.Name())
} }
} }
return names return names
} }
func getLastModifyFile(dir string) []string { func getLastModifyFile(dir string) []string {
@ -354,7 +349,7 @@ func getLastModifyFile(dir string) []string {
} else { } else {
if Debug { if Debug {
log.Println("Finding file was not regular:", fi) log.Println("Finding file was not regular:", fi)
} }
} }
} }
} }
@ -420,11 +415,11 @@ func readDictFile(fileName string) map[int]map[int]string {
l string l string
res map[int]map[int]string res map[int]map[int]string
) )
res = make(map[int]map[int]string) res = make(map[int]map[int]string)
objectTypesList := strings.Split(ObjectTypes, ",") objectTypesList := strings.Split(ObjectTypes, ",")
f, err := os.Open(fileName) f, err := os.Open(fileName)
if err != nil { if err != nil {
log.Printf("error opening file: %v\n",err) log.Printf("error opening file: %v\n",err)
@ -436,7 +431,7 @@ func readDictFile(fileName string) map[int]map[int]string {
} }
r := bufio.NewReader(f) r := bufio.NewReader(f)
s, e := Readln(r) s, e := Readln(r)
// регулярное выражение для получения данных // регулярное выражение для получения данных
regexpSplitData := regexp.MustCompile(`\{(?P<objType>\d+),(.*,|)(?P<objData>.+),(?P<objNumber>\d+)\},$`) regexpSplitData := regexp.MustCompile(`\{(?P<objType>\d+),(.*,|)(?P<objData>.+),(?P<objNumber>\d+)\},$`)
// регулярное выражение определения однострочной записи // регулярное выражение определения однострочной записи
@ -444,16 +439,16 @@ func readDictFile(fileName string) map[int]map[int]string {
// регулярное выражение для многострочных записей соответстве нно начало строки и конец // регулярное выражение для многострочных записей соответстве нно начало строки и конец
// regexpMultilineBegin := regexp.MustCompile(`^.+(\},)$`) // regexpMultilineBegin := regexp.MustCompile(`^.+(\},)$`)
regexpMultilineEnd := regexp.MustCompile(`^.+},$`) regexpMultilineEnd := regexp.MustCompile(`^.+},$`)
for e == nil { for e == nil {
// log.Println(s) // log.Println(s)
s,e = Readln(r) s,e = Readln(r)
l = strings.TrimSpace(s) l = strings.TrimSpace(s)
// strings.TrimRight(outLine, ",") // strings.TrimRight(outLine, ",")
// regexp.MustCompile("[*,%_]{1}").Split(refString, -1) // regexp.MustCompile("[*,%_]{1}").Split(refString, -1)
// Обработка многострочных записей // Обработка многострочных записей
matchedString := regexpString.MatchString(l) matchedString := regexpString.MatchString(l)
// matchedBegin := regexpMultilineBegin.MatchString(l) // matchedBegin := regexpMultilineBegin.MatchString(l)
@ -471,10 +466,10 @@ func readDictFile(fileName string) map[int]map[int]string {
} }
if implContains(objectTypesList, result["objType"]) == false { if implContains(objectTypesList, result["objType"]) == false {
continue continue
} }
objTypeInt, _ := strconv.Atoi(result["objType"]) objTypeInt, _ := strconv.Atoi(result["objType"])
objNumberInt, _ := strconv.Atoi(result["objNumber"]) objNumberInt, _ := strconv.Atoi(result["objNumber"])
if res[objTypeInt] == nil { if res[objTypeInt] == nil {
res[objTypeInt] = make(map[int]string) res[objTypeInt] = make(map[int]string)
} }
res[objTypeInt][objNumberInt] = strings.Trim(result["objData"], "{}\"") res[objTypeInt][objNumberInt] = strings.Trim(result["objData"], "{}\"")
@ -486,11 +481,11 @@ func readDictFile(fileName string) map[int]map[int]string {
recBegin = true recBegin = true
// log.Println("multilineline begin") // log.Println("multilineline begin")
} }
if recBegin { if recBegin {
outLine = outLine + l outLine = outLine + l
} }
if recBegin && matchedEnd { if recBegin && matchedEnd {
recBegin = false recBegin = false
s = outLine s = outLine
@ -506,10 +501,10 @@ func readDictFile(fileName string) map[int]map[int]string {
} }
if implContains(objectTypesList, result["objType"]) == false { if implContains(objectTypesList, result["objType"]) == false {
continue continue
} }
objTypeInt, _ := strconv.Atoi(result["objType"]) objTypeInt, _ := strconv.Atoi(result["objType"])
objNumberInt, _ := strconv.Atoi(result["objNumber"]) objNumberInt, _ := strconv.Atoi(result["objNumber"])
if res[objTypeInt] == nil { if res[objTypeInt] == nil {
res[objTypeInt] = make(map[int]string) res[objTypeInt] = make(map[int]string)
} }
res[objTypeInt][objNumberInt] = strings.Trim(result["objData"], "{}") res[objTypeInt][objNumberInt] = strings.Trim(result["objData"], "{}")
@ -541,10 +536,10 @@ func createOutFile(fileName string) *os.File {
} }
} }
// defer fOut.Close() // defer fOut.Close()
fOutInfo, _ := os.Stat(fileOutPath) fOutInfo, _ := os.Stat(fileOutPath)
fOutSize := fOutInfo.Size() fOutSize := fOutInfo.Size()
_, err = fOut.Seek(fOutSize, 0) _, err = fOut.Seek(fOutSize, 0)
if err != nil { if err != nil {
log.Printf("Seek error on %s: %s", fileOutPath, err) log.Printf("Seek error on %s: %s", fileOutPath, err)
@ -562,7 +557,7 @@ func writeOutCSVFile(outChannel *os.File, m map[string]string) {
keys = append(keys, k) keys = append(keys, k)
} }
sort.Strings(keys) sort.Strings(keys)
for _, k := range keys { for _, k := range keys {
// strings.Join(m[k], ";") // strings.Join(m[k], ";")
result = result + m[k] result = result + m[k]
@ -606,12 +601,12 @@ func writeOutJSONFile(outChannel *os.File, line map[string]string) {
return return
} }
a_json, err := json.MarshalIndent(line, "", " ") a_json, err := json.MarshalIndent(line, "", " ")
if err != nil { if err != nil {
log.Printf("Error encoding: %s", line, "with error %s", err) log.Printf("Error encoding: %s", line, "with error %s", err)
} }
if err != nil{ if err != nil{
log.Printf("Error encoding query: %s", err) log.Printf("Error encoding query: %s", err)
} }
outChannel.Write(a_json) outChannel.Write(a_json)
} }
@ -661,14 +656,14 @@ func getDuration(transactionTime int, recordTimeStamp string) string {
} }
beginTime := time.Unix(int64(transactionTime), 0) beginTime := time.Unix(int64(transactionTime), 0)
zone, _ := beginTime.Zone() zone, _ := beginTime.Zone()
// fmt.Println("The second time is", beginTime, zone, offset) // fmt.Println("The second time is", beginTime, zone, offset)
t := fmt.Sprintf("%s-%s-%s %s:%s:%s %s", recordTimeStamp[0:4], recordTimeStamp[4:6], recordTimeStamp[6:8], recordTimeStamp[8:10], recordTimeStamp[10:12], recordTimeStamp[12:14], zone) t := fmt.Sprintf("%s-%s-%s %s:%s:%s %s", recordTimeStamp[0:4], recordTimeStamp[4:6], recordTimeStamp[6:8], recordTimeStamp[8:10], recordTimeStamp[10:12], recordTimeStamp[12:14], zone)
endTime, err := time.Parse("2006-01-02 15:04:05 MST", t) endTime, err := time.Parse("2006-01-02 15:04:05 MST", t)
if err != nil { if err != nil {
log.Println("Ошибка преобразования времени 'func getDuration()':", endTime, err) log.Println("Ошибка преобразования времени 'func getDuration()':", endTime, err)
} }
duration := endTime.Unix() - int64(transactionTime) duration := endTime.Unix() - int64(transactionTime)
// fmt.Println(transactionTime, recordTimeStamp, t, duration) // fmt.Println(transactionTime, recordTimeStamp, t, duration)
return strconv.Itoa(int(duration)) return strconv.Itoa(int(duration))
@ -805,7 +800,7 @@ func parseString (outLine string, logOffset int64) map[string]string {
} }
return nil return nil
} }
} }
} }
case "Metadata": case "Metadata":
resultData = getDataFromDict(5, findedData[i]) resultData = getDataFromDict(5, findedData[i])
@ -857,13 +852,13 @@ func parseString (outLine string, logOffset int64) map[string]string {
// Проверка вхождения строки в срез (массив строк) // Проверка вхождения строки в срез (массив строк)
func implContains(sl []string, name string) bool { func implContains(sl []string, name string) bool {
// iterate over the array and compare given string to each element // iterate over the array and compare given string to each element
for _, value := range sl { for _, value := range sl {
if value == name { if value == name {
return true return true
} }
} }
return false return false
} }
// Запуск чтения файлов // Запуск чтения файлов
@ -871,6 +866,11 @@ func runOperations () {
var client *opensearch.Client var client *opensearch.Client
if InFile != "" { if InFile != "" {
if SendToEs {
client = esConnect()
EsIndexName = EsIndexPrefix + "-" + getDate(InFile)
esCreateIndex(EsIndexName, client)
}
tail(client, InFile, os.Stdout) tail(client, InFile, os.Stdout)
return return
} }
@ -936,45 +936,46 @@ func tail(client *opensearch.Client, fileName string, out io.Writer) {
recBegin = false recBegin = false
outLine string outLine string
l string l string
// bulkCount int bulkCount int
// arrRecords []map[string]string arrRecords []map[string]string
fOut *os.File fOut *os.File
indexName string
// matchedEnd bool // matchedEnd bool
) )
// bulkCount = 0 // bulkCount = 0
filesList := getDirectoryContents(DirIn) filesList := getDirectoryContents(DirIn)
filePath := filepath.Join(DirIn, fileName) filePath := filepath.Join(DirIn, fileName)
// Открываем временный файл и определяем позицию указателя в исходном файле // Открываем временный файл и определяем позицию указателя в исходном файле
tempFile, offsetPossition := createTempFile(fileName) tempFile, offsetPossition := createTempFile(fileName)
// Определяем размер файла и сравниваем с позицией при открытии // Определяем размер файла и сравниваем с позицией при открытии
// если равны значит файл открылся на конце // если равны значит файл открылся на конце
fInfo, _ := os.Stat(filePath) fInfo, _ := os.Stat(filePath)
fSize := fInfo.Size() fSize := fInfo.Size()
log.Println(filePath, "size:", fSize) log.Println(filePath, "size:", fSize)
if offsetPossition == fSize { if offsetPossition == fSize {
log.Println("Достигнут конец файла", fileName, "Размер:", fSize, "Позиция:", offsetPossition) log.Println("Достигнут конец файла", fileName, "Размер:", fSize, "Позиция:", offsetPossition)
} }
f, err := os.Open(filePath) f, err := os.Open(filePath)
if err != nil { if err != nil {
log.Println("Ошибка открытия файла:", filePath, err) log.Println("Ошибка открытия файла:", filePath, err)
} }
defer f.Close() defer f.Close()
// Переместим указатель в файле на последнюю позицию при закрытии программы // Переместим указатель в файле на последнюю позицию при закрытии программы
pos, err := f.Seek(offsetPossition, 0) pos, err := f.Seek(offsetPossition, 0)
if err != nil { if err != nil {
log.Printf("Seek error on %s: %s", fileName, err) log.Printf("Seek error on %s: %s", fileName, err)
} else { } else {
log.Println("Начинаем с позиции:", pos) log.Println("Начинаем с позиции:", pos)
} }
r := bufio.NewReader(f) r := bufio.NewReader(f)
info, err := f.Stat() info, err := f.Stat()
if err != nil { if err != nil {
@ -986,23 +987,18 @@ func tail(client *opensearch.Client, fileName string, out io.Writer) {
fOut = createOutFile(fileName) fOut = createOutFile(fileName)
// writeOutCSVHeader(fOut) // writeOutCSVHeader(fOut)
} }
oldSize := info.Size() oldSize := info.Size()
// Регулярные выражения для определения начала и конца записи // Регулярные выражения для определения начала и конца записи
regexpBeginRecord := regexp.MustCompile(`^\{\d{14},\w{1},$`) regexpBeginRecord := regexp.MustCompile(`^\{\d{14},\w{1},$`)
regexpEndRecord := regexp.MustCompile(`^\},$`) regexpEndRecord := regexp.MustCompile(`^\},$`)
// var (й
// isPrefix bool = true
// // err error = nil
// line []byte
// )
for { for {
line, err := Readln(r) line, err := Readln(r)
// for line, _, err := r.ReadLine(); err != io.EOF; line, _, err = r.ReadLine() { // for line, _, err := r.ReadLine(); err != io.EOF; line, _, err = r.ReadLine() {
for err == nil { for err == nil {
// fmt.Println(line) // fmt.Println(line)
// определяем текущую позицию записи в файле и пишем в файл // определяем текущую позицию записи в файле и пишем в файл
logOffset, _ := f.Seek(0, os.SEEK_CUR) logOffset, _ := f.Seek(0, os.SEEK_CUR)
writeTempFile(tempFile, logOffset) writeTempFile(tempFile, logOffset)
@ -1031,6 +1027,7 @@ func tail(client *opensearch.Client, fileName string, out io.Writer) {
result := parseString(outLine, logOffset) result := parseString(outLine, logOffset)
if Debug { if Debug {
log.Println(len(result), result) log.Println(len(result), result)
log.Println("Префикс индекса:", EsIndexPrefix, result["index_day_prefix"])
} }
// Пишем выходные файлы // Пишем выходные файлы
if WriteOutFile { if WriteOutFile {
@ -1042,25 +1039,30 @@ func tail(client *opensearch.Client, fileName string, out io.Writer) {
} }
// Отправляем данные в ElasticSearch / OpenSearch // Отправляем данные в ElasticSearch / OpenSearch
if SendToEs { if SendToEs {
// if result["index_day_prefix"] == "" { if result["index_day_prefix"] != "" {
// continue indexName = EsIndexPrefix + "-" + result["index_day_prefix"]
}
// if Debug {
// log.Println("Index name:", indexName)
// // log.Println("Пакетная обработка:", EsBulk)
// } // }
indexName := EsIndexPrefix + "-" + result["index_day_prefix"] // Пакетная обработка, запускается если выставлена опция командной строки
// пакетная отправка данных в эластик // иначе отправляется по одной строке
// addRes := esAddRecord(client, indexName, result) if EsBulk {
if Debug { if bulkCount < EsBulkRecordsQuantity {
log.Println("Index name:", indexName) if len(result) != 0 {
} arrRecords = append(arrRecords, result)
go esAddRecord(client, indexName, result) bulkCount++
}
} else {
go esAddRecordsBulk(client, indexName, arrRecords)
bulkCount = 0
arrRecords = nil
}
} else {
go esAddRecord(client, indexName, result)
}
time.Sleep(TailSleep) time.Sleep(TailSleep)
// time.Sleep(5 * time.Millisecond)
// Если запись не добавилась делаем повторную попытку через 5 сек.
// if !addRes {
// time.Sleep(5 * time.Second)
// addRes = esAddRecord(client, indexName, result)
// log.Println("Повторная попытка добавления записи:", result, addRes)
// }
} }
writeTempFile(tempFile, logOffset) writeTempFile(tempFile, logOffset)
if Debug { if Debug {
@ -1087,7 +1089,7 @@ func tail(client *opensearch.Client, fileName string, out io.Writer) {
if InFile != "" { if InFile != "" {
return return
} }
// Запускаем чтение каталога и получепние списка с новыми файлами // Запускаем чтение каталога и получепние списка с новыми файлами
for { for {
time.Sleep(time.Second) time.Sleep(time.Second)
@ -1117,7 +1119,14 @@ func tail(client *opensearch.Client, fileName string, out io.Writer) {
} }
} }
} }
// Если получен сигнал о наличии новго файла то прекращаем читать старый
// и отправляем пакет данных в эластик (чтобы не потерять последние записи)
if shutUp { if shutUp {
if SendToEs && EsBulk {
go esAddRecordsBulk(client, indexName, arrRecords)
bulkCount = 0
arrRecords = nil
}
break break
} }
} }
@ -1131,14 +1140,14 @@ func createWorkDir() {
for _, dir := range dirs { for _, dir := range dirs {
// fmt.Println(dir) // fmt.Println(dir)
if !directoryExists(dir) { if !directoryExists(dir) {
err := os.Mkdir(dir, 0755) err := os.Mkdir(dir, 0755)
if err != nil { if err != nil {
log.Println(err) log.Println(err)
} else { } else {
log.Println("Создан каталог", dir) log.Println("Создан каталог", dir)
} }
} }
} }
} }
// CloseIdleConnections() // CloseIdleConnections()
func esConnect() *opensearch.Client { func esConnect() *opensearch.Client {
@ -1184,21 +1193,24 @@ func esConnect() *opensearch.Client {
// Создаем индекс в ES // Создаем индекс в ES
func esCreateIndex(indexName string, client *opensearch.Client) { func esCreateIndex(indexName string, client *opensearch.Client) {
mapping := strings.NewReader(IndexTemplate) mapping := strings.NewReader(IndexTemplate)
// Create an index with non-default settings. // Create an index with non-default settings.
createIndex := opensearchapi.IndicesCreateRequest{ createIndex := opensearchapi.IndicesCreateRequest{
Index: indexName, Index: indexName,
Body: mapping, Body: mapping,
} }
if Debug { if Debug {
log.Println("Create index:", createIndex) log.Println("Create index:", createIndex)
} }
createIndexResponse, err := createIndex.Do(context.Background(), client) createIndexResponse, err := createIndex.Do(context.Background(), client)
if err != nil { if err != nil {
log.Println("failed to create index ", err, createIndexResponse) log.Println("failed to create index ", err, createIndexResponse)
os.Exit(1) os.Exit(1)
} }
fmt.Println(createIndexResponse.StatusCode) if Debug {
log.Println("Create index:", createIndex)
}
fmt.Println(createIndexResponse.StatusCode)
if strings.Contains(createIndexResponse.String(), "already exists") { if strings.Contains(createIndexResponse.String(), "already exists") {
log.Println(createIndexResponse.IsError()) log.Println(createIndexResponse.IsError())
log.Println("Index", indexName, "already exists") log.Println("Index", indexName, "already exists")
@ -1207,35 +1219,6 @@ func esCreateIndex(indexName string, client *opensearch.Client) {
} }
} }
func esAddRecordsBulk(client *opensearch.Client, indexName string, arrLine []map[string]string) {
// Perform bulk operations.
for _, obj := range arrLine {
aJson, err := json.Marshal(obj)
fmt.Println(aJson)
if err != nil{
log.Printf("Error encoding query: %s", err)
}
}
// blk, err := client.Bulk(
// strings.NewReader(`
// { "index" : { "_index" : "go-test-index1", "_id" : "2" } }
// { "title" : "Interstellar", "director" : "Christopher Nolan", "year" : "2014"}
// { "create" : { "_index" : "go-test-index1", "_id" : "3" } }
// { "title" : "Star Trek Beyond", "director" : "Justin Lin", "year" : "2015"}
// { "update" : {"_id" : "3", "_index" : "go-test-index1" } }
// { "doc" : {"year" : "2016"} }
// `),
// )
//
// if err != nil {
// fmt.Println("failed to perform bulk operations", err)
// os.Exit(1)
// }
// fmt.Println("Performing bulk operations")
// fmt.Println(blk)
}
// Вставка записи в индекс ES // Вставка записи в индекс ES
func esAddRecord(client *opensearch.Client, indexName string, line map[string]string) bool { func esAddRecord(client *opensearch.Client, indexName string, line map[string]string) bool {
@ -1246,21 +1229,21 @@ func esAddRecord(client *opensearch.Client, indexName string, line map[string]st
} }
var operResult bool var operResult bool
aJson, err := json.MarshalIndent(line, "", " ") aJson, err := json.MarshalIndent(line, "", " ")
if err != nil { if err != nil {
log.Printf("Error encoding: %s", line, "with error %s", err) log.Printf("Error encoding: %s", line, "with error %s", err)
} }
if Debug { if Debug {
log.Println(line) log.Println(line)
} }
document := strings.NewReader(string(aJson)) document := strings.NewReader(string(aJson))
if Debug { if Debug {
log.Println(document) log.Println(document)
} }
req := opensearchapi.IndexRequest{ req := opensearchapi.IndexRequest{
Index: indexName, Index: indexName,
Body: document, Body: document,
} }
if Debug { if Debug {
log.Println(req) log.Println(req)
} }
@ -1273,13 +1256,108 @@ func esAddRecord(client *opensearch.Client, indexName string, line map[string]st
} else { } else {
operResult = true operResult = true
} }
if Debug {
log.Println(insertResponse) // Проверям результат добавления записи, если код возврата не равен 200
// то повторяем несколько раз
if insertResponse.StatusCode == 200 && operResult {
log.Println("Пакет данных добавлен")
} else {
// Ждем две секунды перед повторной отправкой
j := 1
for j <= 5 {
time.Sleep(2 * time.Second)
insertResponse, err := req.Do(context.Background(), client)
if err != nil {
log.Println("Ошибка повторного добавления записи в opensearch (процедура esAddRecordsBulk):", err)
}
if insertResponse.StatusCode == 200 {
operResult = true
break
}
j++
}
} }
if Debug {
log.Printf("Ответ от %s при добавлении записи: %s", EsUrl, insertResponse)
}
defer insertResponse.Body.Close() defer insertResponse.Body.Close()
return operResult return operResult
} }
// Пакетная Вставка записуй в индекс ES
func esAddRecordsBulk(client *opensearch.Client, indexName string, arr []map[string]string) bool {
if Debug {
log.Println("================= Отправка пакета данных ===================")
log.Println("Индекс:", indexName)
// log.Println(arr)
}
if len(arr) == 0 {
return false
}
var (
operResult bool
resultJson string
)
resultJson = ""
for _, line := range arr {
// if Debug {
// log.Println("----", i, line)
// }
if len(line) != 0 {
resultJson += fmt.Sprintf("{\"create\": { \"_index\": \"%s\"}}\n", indexName)
aJson, err := json.Marshal(line)
if err != nil {
log.Printf("Error encoding: %s", line, "with error %s", err)
} else {
resultJson += fmt.Sprintf("%s\n", string(aJson))
}
}
}
// _ = strings.NewReader(string(aJson))
if Debug {
log.Println(string(resultJson))
}
r := strings.NewReader(resultJson)
bulkResp, err := client.Bulk(r)
if err != nil {
log.Println("Ошибка добавления пакета данных в opensearch (процедура esAddRecordsBulk):", err)
operResult = false
} else {
operResult = true
}
// Проверям результат добавления записи, если код возврата не равен 200
// то повторяем несколько раз
if bulkResp.StatusCode == 200 && operResult {
log.Println("Пакет данных добавлен")
} else {
// Ждем две секунды перед повторной отправкой
j := 1
for j <= 5 {
time.Sleep(2 * time.Second)
bulkResp, err = client.Bulk(r)
if err != nil {
log.Println("Ошибка повторного добавления пакета данных в opensearch (процедура esAddRecordsBulk):", err)
}
if bulkResp.StatusCode == 200 {
operResult = true
break
}
j++
}
}
if Debug {
log.Printf("Ответ от %s при добавлении пакета данных: %s", EsUrl, bulkResp)
}
defer bulkResp.Body.Close()
return operResult
}
// Читаем конфиг и определяем переменные // Читаем конфиг и определяем переменные
func readConfigFile(fileName string) { func readConfigFile(fileName string) {
cfg, err := ini.LoadSources(ini.LoadOptions{ cfg, err := ini.LoadSources(ini.LoadOptions{
@ -1288,7 +1366,7 @@ func readConfigFile(fileName string) {
if err != nil { if err != nil {
log.Println("Ошибка чтения файла конфигурации ", fileName, ": ", err) log.Println("Ошибка чтения файла конфигурации ", fileName, ": ", err)
} }
config := Config{} config := Config{}
cfg.MapTo(&config) cfg.MapTo(&config)
DirIn = cfg.Section("General").Key("dirIn").String() DirIn = cfg.Section("General").Key("dirIn").String()
@ -1308,6 +1386,8 @@ func readConfigFile(fileName string) {
EsPassword = cfg.Section("ElasticSearch").Key("esPassword").String() EsPassword = cfg.Section("ElasticSearch").Key("esPassword").String()
EsSkipVerify = cfg.Section("ElasticSearch").Key("esSkipVerify").MustBool() EsSkipVerify = cfg.Section("ElasticSearch").Key("esSkipVerify").MustBool()
EsIndexPrefix = cfg.Section("ElasticSearch").Key("esIndexPrefix").String() EsIndexPrefix = cfg.Section("ElasticSearch").Key("esIndexPrefix").String()
EsBulk = cfg.Section("ElasticSearch").Key("esBulk").MustBool()
EsBulkRecordsQuantity, _ = strconv.Atoi(cfg.Section("ElasticSearch").Key("esBulkRecordsQuantity").String())
} }
WriteOutFile = cfg.Section("Processing").Key("writeOutFile").MustBool() WriteOutFile = cfg.Section("Processing").Key("writeOutFile").MustBool()
@ -1329,7 +1409,7 @@ func writeConfigFile(fileName string) {
log.Println("Ошибка записи в выходной файл:", err) log.Println("Ошибка записи в выходной файл:", err)
} else { } else {
log.Println("Создан файл конфигурации:", fileName) log.Println("Создан файл конфигурации:", fileName)
} }
} }
// Создание каталога с конфигами // Создание каталога с конфигами
func createConfDir(dir string) { func createConfDir(dir string) {
@ -1387,14 +1467,14 @@ func RunWinService() {
func main() { func main() {
// Читаем опции коммандной строки // Читаем опции коммандной строки
flag.BoolVar(&RunAsWinService, "run-win-service", false, "Запуск как службы Windows") flag.BoolVar(&RunAsWinService, "run-win-service", false, "Запуск как службы Windows")
flag.StringVar(&DirCfg, "dir-config", ".config", "Каталог для файлов настройки (DIR_CFG)") flag.StringVar(&DirCfg, "dir-config", ".config", "Каталог для файлов настройки (DIR_CFG)")
flag.StringVar(&FileCfg, "config", "config.ini", "Файл настроек") flag.StringVar(&FileCfg, "config", "config.ini", "Файл настроек")
flag.BoolVar(&CreateFileCfg, "create-config", false, "Создать файл настроек") flag.BoolVar(&CreateFileCfg, "create-config", false, "Создать файл настроек")
flag.BoolVar(&Debug, "debug", false, "Выводить отладочную информацию") flag.BoolVar(&Debug, "debug", false, "Выводить отладочную информацию")
flag.StringVar(&DirIn, "dir-in", "in", "Каталог для исходных файлов (DIR_IN)") flag.StringVar(&DirIn, "dir-in", "in", "Каталог для исходных файлов (DIR_IN)")
flag.StringVar(&DirOut, "dir-out", "out", "Каталог для обработанных файлов (DIR_OUT)") flag.StringVar(&DirOut, "dir-out", "out", "Каталог для обработанных файлов (DIR_OUT)")
flag.StringVar(&DirTemp, "dir-temp", "tmp", "Каталог для временных файлов (TEMP)") flag.StringVar(&DirTemp, "dir-temp", "tmp", "Каталог для временных файлов (TEMP)")
flag.StringVar(&DirLog, "dir-worklog", "log", "Каталог для лога работы (DIR_LOG)") flag.StringVar(&DirLog, "dir-worklog", "log", "Каталог для лога работы (DIR_LOG)")
flag.StringVar(&DictFile, "dict-file", "1Cv8.lgf", "Файл со словарём (DICT_FILE)") flag.StringVar(&DictFile, "dict-file", "1Cv8.lgf", "Файл со словарём (DICT_FILE)")
flag.BoolVar(&SendToEs, "es-send", false, "Отправлять данные в ElasticSearch") flag.BoolVar(&SendToEs, "es-send", false, "Отправлять данные в ElasticSearch")
flag.StringVar(&EsUrl, "es-url", "", "Адрес узла Elastic Search (ELASTICSEARCH_URL)") flag.StringVar(&EsUrl, "es-url", "", "Адрес узла Elastic Search (ELASTICSEARCH_URL)")
@ -1402,20 +1482,26 @@ func main() {
flag.StringVar(&EsPassword, "es-password", "", "Пароль пользователя Elastic Search (ELASTICSEARCH_PASSWORD)") flag.StringVar(&EsPassword, "es-password", "", "Пароль пользователя Elastic Search (ELASTICSEARCH_PASSWORD)")
flag.StringVar(&EsIndexPrefix, "es-index-prefix", "", "Префикс имени индекса Elastic Search (ELASTICSEARCH_INDEX_PREFIX)") flag.StringVar(&EsIndexPrefix, "es-index-prefix", "", "Префикс имени индекса Elastic Search (ELASTICSEARCH_INDEX_PREFIX)")
flag.BoolVar(&EsDiscoverNode, "es-discover-node", false, "Получать список узлов кластера ElasticSearch") flag.BoolVar(&EsDiscoverNode, "es-discover-node", false, "Получать список узлов кластера ElasticSearch")
// flag.IntVar(&EsBulkRecordsQuantity, "es-bulk-quantity", 10, "Количество записей в одном запросе для пакетной вставки") flag.IntVar(&EsBulkRecordsQuantity, "es-bulk-quantity", 10, "Количество записей в одном запросе для пакетной вставки")
// flag.BoolVar(&EsBulk, "es-bulk", false, "Пакетная вставка записей в Elastic Search") flag.BoolVar(&EsBulk, "es-bulk", false, "Пакетная вставка записей в Elastic Search")
flag.BoolVar(&EsSkipVerify, "es-skip-verify", false, "Пропустить проверку сертификатов при подключении к Elastic Search") flag.BoolVar(&EsSkipVerify, "es-skip-verify", false, "Пропустить проверку сертификатов при подключении к Elastic Search")
flag.BoolVar(&WriteOutFile, "write-out-file", false, "Запись обработанных данных в файл") flag.BoolVar(&WriteOutFile, "write-out-file", false, "Запись обработанных данных в файл")
flag.StringVar(&OutFormat, "out-format", "csv", "Формат данных на выходе (csv, json) (OUT_FORMAT)") flag.StringVar(&OutFormat, "out-format", "csv", "Формат данных на выходе (csv, json) (OUT_FORMAT)")
flag.StringVar(&LogFileExtention, "log-file-ext", ".lgp", "Расширение файлов с журналами (обрабатываемых логов) (LOG_FILE_EXT)") flag.StringVar(&LogFileExtention, "log-file-ext", ".lgp", "Расширение файлов с журналами (обрабатываемых логов) (LOG_FILE_EXT)")
flag.StringVar(&ObjectTypes, "object-types", "1,2,3,4,5,6,7,8", "Список типов объектов словаря для выборки, разделённый запятой (OBJECT_TYPES)") flag.StringVar(&ObjectTypes, "object-types", "1,2,3,4,5,6,7,8", "Список типов объектов словаря для выборки, разделённый запятой (OBJECT_TYPES)")
flag.StringVar(&TimeZoneOffset, "tz-offset", "+03:00", "Сдвиг по времени от UTC") flag.StringVar(&TimeZoneOffset, "tz-offset", "+03:00", "Сдвиг по времени от UTC")
flag.StringVar(&Duration, "tail-sleep", "5ms", "Задержка чтения входного файла. Разрешены обозначения 'ns', 'us' ('µs'), 'ms', 's', 'm','h')") flag.StringVar(&Duration, "tail-sleep", "5ms", "Задержка чтения входного файла. Разрешены обозначения 'ns', 'us' ('µs'), 'ms', 's', 'm','h')")
flag.StringVar(&InFile, "file", "", "Имя файла для обработки. Если требуется обработать один файл") flag.StringVar(&InFile, "file", "", "Имя файла для обработки. Если требуется обработать один файл")
flag.StringVar(&WorkLogOut, "worklog-out", "file", "Направление вывода журнала работы программы (console, file)") flag.StringVar(&WorkLogOut, "worklog-out", "file", "Направление вывода журнала работы программы (console, file)")
// flag.BoolVar(&Version, "version", false, "Версия программы")
flag.BoolFunc("version", "Версия программы", func(s string) error {
fmt.Println("Версия:", Version)
os.Exit(0)
return nil
})
flag.Parse()
flag.Parse()
configFile := filepath.Join(DirCfg, FileCfg) configFile := filepath.Join(DirCfg, FileCfg)
if fileExists(configFile) { if fileExists(configFile) {
readConfigFile(configFile) readConfigFile(configFile)
@ -1431,38 +1517,34 @@ func main() {
// os.Exit(1) // os.Exit(1)
// Установим задержку чтения входного файла // Установим задержку чтения входного файла
TailSleep, _ = time.ParseDuration(Duration) TailSleep, _ = time.ParseDuration(Duration)
// log.Println(DirCfg, DirIn, "send es:", SendToEs, OutFormat, WriteOutFile, EsUrl, "esuser:", EsUser, Duration, TailSleep) // log.Println(DirCfg, DirIn, "send es:", SendToEs, OutFormat, WriteOutFile, EsUrl, "esuser:", EsUser, Duration, TailSleep)
// Проверка и установка переменных для работы // Проверка и установка переменных для работы
if DirIn == "" && os.Getenv("DIR_IN") == "" { if DirIn == "" && os.Getenv("DIR_IN") == "" {
fmt.Println("Make sure environment variables `DIR_IN`, or used with '-dir-in' argument") fmt.Println("Make sure environment variables `DIR_IN`, or used with '-dir-in' argument")
os.Exit(1) os.Exit(1)
} else if DirIn == "" && os.Getenv("DIR_IN") != "" { } else if DirIn == "" && os.Getenv("DIR_IN") != "" {
DirIn = os.Getenv("DIR_IN") DirIn = os.Getenv("DIR_IN")
} }
if WriteOutFile { if WriteOutFile {
if DirOut == "" && os.Getenv("DIR_OUT") == "" { if DirOut == "" && os.Getenv("DIR_OUT") == "" {
fmt.Println("Make sure environment variables `DIR_OUT`, or used with '-dir-out' argument") fmt.Println("Make sure environment variables `DIR_OUT`, or used with '-dir-out' argument")
os.Exit(1) os.Exit(1)
} else if DirOut == "" && os.Getenv("DIR_OUT") != "" { } else if DirOut == "" && os.Getenv("DIR_OUT") != "" {
DirOut = os.Getenv("DIR_OUT") DirOut = os.Getenv("DIR_OUT")
} }
}
if os.Getenv("LOG_FILE_EXT") != "" {
LogFileExtention = os.Getenv("LOG_FILE_EXT")
}
if os.Getenv("DIR_LOG") != "" {
DirLog = os.Getenv("DIR_LOG")
} }
if os.Getenv("LOG_FILE_EXT") != "" {
LogFileExtention = os.Getenv("LOG_FILE_EXT")
}
// if os.Getenv("TEMP") != "" {
// DirTemp = os.Getenv("TEMP")
// }
if os.Getenv("DIR_LOG") != "" {
DirLog = os.Getenv("DIR_LOG")
}
// Проверяем параметры и переменные окружения для работы с ElasticSearch / OpenSearch // Проверяем параметры и переменные окружения для работы с ElasticSearch / OpenSearch
if SendToEs { if SendToEs {
if EsUser == "" && os.Getenv("ELASTICSEARCH_USER") == "" { if EsUser == "" && os.Getenv("ELASTICSEARCH_USER") == "" {
@ -1471,21 +1553,21 @@ func main() {
} else if EsUser == "" && os.Getenv("ELASTICSEARCH_USER") != "" { } else if EsUser == "" && os.Getenv("ELASTICSEARCH_USER") != "" {
EsUser = os.Getenv("ELASTICSEARCH_USER") EsUser = os.Getenv("ELASTICSEARCH_USER")
} }
if EsPassword == "" && os.Getenv("ELASTICSEARCH_PASSWORD") == "" { if EsPassword == "" && os.Getenv("ELASTICSEARCH_PASSWORD") == "" {
fmt.Println("Make sure environment variables `ELASTICSEARCH_PASSWORD or used with '-es-password' argument") fmt.Println("Make sure environment variables `ELASTICSEARCH_PASSWORD or used with '-es-password' argument")
os.Exit(1) os.Exit(1)
} else if EsPassword == "" && os.Getenv("ELASTICSEARCH_PASSWORD") != "" { } else if EsPassword == "" && os.Getenv("ELASTICSEARCH_PASSWORD") != "" {
EsPassword = os.Getenv("ELASTICSEARCH_PASSWORD") EsPassword = os.Getenv("ELASTICSEARCH_PASSWORD")
} }
if EsUrl == "" && os.Getenv("ELASTICSEARCH_URL") == "" { if EsUrl == "" && os.Getenv("ELASTICSEARCH_URL") == "" {
fmt.Println("Make sure environment variables `ELASTICSEARCH_URL or used with '-es-url' argument") fmt.Println("Make sure environment variables `ELASTICSEARCH_URL or used with '-es-url' argument")
os.Exit(1) os.Exit(1)
} else if EsUrl == "" && os.Getenv("ELASTICSEARCH_URL") != "" { } else if EsUrl == "" && os.Getenv("ELASTICSEARCH_URL") != "" {
EsUrl = os.Getenv("ELASTICSEARCH_URL") EsUrl = os.Getenv("ELASTICSEARCH_URL")
} }
if EsIndexPrefix == "" && os.Getenv("ELASTICSEARCH_INDEX_PREFIX") == "" { if EsIndexPrefix == "" && os.Getenv("ELASTICSEARCH_INDEX_PREFIX") == "" {
fmt.Println("Make sure environment variables `ELASTICSEARCH_INDEX_PREFIX` or used with '-es-index-prefix' argument") fmt.Println("Make sure environment variables `ELASTICSEARCH_INDEX_PREFIX` or used with '-es-index-prefix' argument")
os.Exit(1) os.Exit(1)
@ -1493,9 +1575,9 @@ func main() {
EsIndexPrefix = os.Getenv("ELASTICSEARCH_INDEX_PREFIX") EsIndexPrefix = os.Getenv("ELASTICSEARCH_INDEX_PREFIX")
} }
} }
createWorkDir() createWorkDir()
// Определим куда выводим лог работы // Определим куда выводим лог работы
switch WorkLogOut { switch WorkLogOut {
case "file": case "file":
@ -1518,7 +1600,7 @@ func main() {
// readDictFile(dictFile) // readDictFile(dictFile)
DictObjects = readDictFile(DictFile) DictObjects = readDictFile(DictFile)
// Получаем временную зону и часовой пояс // Получаем временную зону и часовой пояс
// Если зону возвращает UTC то присваеваем вручную из опций командной строки // Если зону возвращает UTC то присваеваем вручную из опций командной строки
// Хреновый хак но пока решения нет. Так как на винде возвращает почему-то нули // Хреновый хак но пока решения нет. Так как на винде возвращает почему-то нули
@ -1526,14 +1608,14 @@ func main() {
if LocalTZOffset == "+00:00" { if LocalTZOffset == "+00:00" {
LocalTZOffset = TimeZoneOffset LocalTZOffset = TimeZoneOffset
} }
log.Println(DirCfg, FileCfg, DirIn, DirOut, DirTemp, DirLog, DictFile) log.Println(DirCfg, FileCfg, DirIn, DirOut, DirTemp, DirLog, DictFile)
// Запуск как сервиса // Запуск как сервиса
if RunAsWinService { if RunAsWinService {
RunWinService() RunWinService()
} else { } else {
runOperations() runOperations()
} }
log.Println("Работа программы завершена") log.Println("Работа программы завершена")
} }