Добавлен фильтр по событиям Event. Изменена процедура чтения строк. Изменен порядок чтения файлов
parent
23fe77e2d2
commit
cdc3a59b9c
24
README.md
24
README.md
|
@ -2,7 +2,8 @@
|
||||||
|
|
||||||
Программа для обработки журналов 1С.
|
Программа для обработки журналов 1С.
|
||||||
|
|
||||||
В качестве входных данных используются файлы журналов "20230126000001.lgp". В качестве словаря для подстановки значений используется файл "1Cv8.lgf".
|
В качестве входных данных используются файлы журналов "20230126000001.lgp" (Для обработки берется файл имя которого совпадает с текущей датой).
|
||||||
|
В качестве словаря для подстановки значений используется файл "1Cv8.lgf".
|
||||||
|
|
||||||
Обработанные данные могут быть сохранены в файл как в JSON так и в CSV форматах. Также возможна отправка обработанных данных в ElasticSearch.
|
Обработанные данные могут быть сохранены в файл как в JSON так и в CSV форматах. Также возможна отправка обработанных данных в ElasticSearch.
|
||||||
|
|
||||||
|
@ -12,10 +13,14 @@
|
||||||
|
|
||||||
С целью ускорения работы запуск процесса отправки записей в эластик сделан параллельно, т.е. каждая запись отправляется в отдельном потоке (goroutine) поэтому для исключения переполнения буфера соединений в процедуру чтения файла (func tail()) введена задержка в 5 миллисекунд (time.Sleep(5 * time.Millisecond)). Это костыль и пока нормального решения я не придумал.
|
С целью ускорения работы запуск процесса отправки записей в эластик сделан параллельно, т.е. каждая запись отправляется в отдельном потоке (goroutine) поэтому для исключения переполнения буфера соединений в процедуру чтения файла (func tail()) введена задержка в 5 миллисекунд (time.Sleep(5 * time.Millisecond)). Это костыль и пока нормального решения я не придумал.
|
||||||
|
|
||||||
|
Типы ообъектов в словаре
|
||||||
|
1 – пользователи; 2 – компьютеры; 3 – приложения; 4 – события; 5 – метаданные; 6 – серверы; 7 – основные порты; 8 – вспомогательные порты.
|
||||||
|
|
||||||
|
|
||||||
## Опции командной строки
|
## Опции командной строки
|
||||||
|
|
||||||
```
|
```
|
||||||
Usage of log-processor:
|
Usage of /tmp/go-build847101717/b001/exe/log-processor:
|
||||||
-config string
|
-config string
|
||||||
Файл настроек (default "config.ini")
|
Файл настроек (default "config.ini")
|
||||||
-create-config
|
-create-config
|
||||||
|
@ -87,6 +92,13 @@ log-processor -dir-config ~/tmp/123 -config processor.ini -create-config
|
||||||
|
|
||||||
Программа позволяет отправлять обработанные данные в ElasticSearch/OpenSearch опция "-es-send", сохранять в файлах формата csv и json: "-write-out-file" и "-out-format" соответсвенно, выводить в консоль - "-debug". Для всех операций используются соответствующие ключи запуска или параметры в файле настроек.
|
Программа позволяет отправлять обработанные данные в ElasticSearch/OpenSearch опция "-es-send", сохранять в файлах формата csv и json: "-write-out-file" и "-out-format" соответсвенно, выводить в консоль - "-debug". Для всех операций используются соответствующие ключи запуска или параметры в файле настроек.
|
||||||
|
|
||||||
|
Для уменьшения количества записей можно задать список событий, которые будут исключены из итоговой выдачи (как в файл так и в эластик), разделенный точкой с запятой. Для этого используется параметр в файле конфигурации "excludeEvent":
|
||||||
|
|
||||||
|
```
|
||||||
|
excludeEvent=_$Transaction$_.Begin;\
|
||||||
|
_$Transaction$_.Commit
|
||||||
|
```
|
||||||
|
|
||||||
## Запуск как служба Windows
|
## Запуск как служба Windows
|
||||||
|
|
||||||
Перед запуском службу следует зарегистрировать в системе средствами Windows:
|
Перед запуском службу следует зарегистрировать в системе средствами Windows:
|
||||||
|
@ -98,6 +110,12 @@ r-config C:\1c-logprocessor\.config\ -run-win-service" DisplayName="1C log proce
|
||||||
|
|
||||||
Пути можно указать свои. Управление службой осуществляется через штатную оснастку системы.
|
Пути можно указать свои. Управление службой осуществляется через штатную оснастку системы.
|
||||||
|
|
||||||
|
## Автоматическая сборка и доставка
|
||||||
|
|
||||||
|
Для автоматизации сборки и доставки программы используется Gitlab CI.
|
||||||
|
|
||||||
|
При изменении файлов в репозитории будет запущен процесс сборки исполняемых файлов под разные платформы. После этого (при условии успешной сборки) файлы будут скопированы на WWW-сервис https://files.corp.samsonopt.ru и доступны по [ссылке](https://files.corp.samsonopt.ru/bin/1c-logprocessor/).
|
||||||
|
|
||||||
## Пример конфигурационного файла
|
## Пример конфигурационного файла
|
||||||
|
|
||||||
```
|
```
|
||||||
|
@ -141,4 +159,6 @@ timeZoneOffset=+03:00
|
||||||
; Задержка чтения входного файла между строками
|
; Задержка чтения входного файла между строками
|
||||||
; Разрешены обозначения 'ns', 'us' ('µs'), 'ms', 's', 'm','h'
|
; Разрешены обозначения 'ns', 'us' ('µs'), 'ms', 's', 'm','h'
|
||||||
readDuration=5ms
|
readDuration=5ms
|
||||||
|
; Список событий (поле "Event"), исключенных из выдачи, разделенный ";"
|
||||||
|
excludeEvent=
|
||||||
```
|
```
|
||||||
|
|
212
log-processor.go
212
log-processor.go
|
@ -5,7 +5,6 @@
|
||||||
// ----------------------------------------------------------------------
|
// ----------------------------------------------------------------------
|
||||||
// Лицензия: GPL(v3)
|
// Лицензия: GPL(v3)
|
||||||
//------------------------- ------------------------------------------------
|
//------------------------- ------------------------------------------------
|
||||||
|
|
||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
@ -25,10 +24,10 @@ import (
|
||||||
// "github.com/hpcloud/tail"
|
// "github.com/hpcloud/tail"
|
||||||
"net"
|
"net"
|
||||||
"sort"
|
"sort"
|
||||||
"crypto/tls"
|
"crypto/tls"
|
||||||
"net/http"
|
"net/http"
|
||||||
"context"
|
"context"
|
||||||
"gopkg.in/ini.v1"
|
"gopkg.in/ini.v1"
|
||||||
opensearch "github.com/opensearch-project/opensearch-go/v2"
|
opensearch "github.com/opensearch-project/opensearch-go/v2"
|
||||||
opensearchapi "github.com/opensearch-project/opensearch-go/v2/opensearchapi"
|
opensearchapi "github.com/opensearch-project/opensearch-go/v2/opensearchapi"
|
||||||
"github.com/kardianos/service"
|
"github.com/kardianos/service"
|
||||||
|
@ -66,6 +65,7 @@ var (
|
||||||
DictFile string
|
DictFile string
|
||||||
TailSleep time.Duration
|
TailSleep time.Duration
|
||||||
RunAsWinService bool
|
RunAsWinService bool
|
||||||
|
ExcludeEvent []string
|
||||||
)
|
)
|
||||||
|
|
||||||
type Config struct {
|
type Config struct {
|
||||||
|
@ -88,7 +88,8 @@ type Config struct {
|
||||||
OutFormat string `ini:"outFormat"`
|
OutFormat string `ini:"outFormat"`
|
||||||
WriteOutFile bool `ini:"writeOutFile"`
|
WriteOutFile bool `ini:"writeOutFile"`
|
||||||
TimeZoneOffset string `ini:"timeZoneOffset"`
|
TimeZoneOffset string `ini:"timeZoneOffset"`
|
||||||
Duration string `ini:"Duration"`
|
Duration string `ini:"Duration"`
|
||||||
|
ExcludeEvent []string `ini:"excludeEvent"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type Key struct {
|
type Key struct {
|
||||||
|
@ -233,6 +234,8 @@ timeZoneOffset=+03:00
|
||||||
; Задержка чтения входного файла между строками
|
; Задержка чтения входного файла между строками
|
||||||
; Разрешены обозначения 'ns', 'us' ('µs'), 'ms', 's', 'm','h'
|
; Разрешены обозначения 'ns', 'us' ('µs'), 'ms', 's', 'm','h'
|
||||||
readDuration=5ms
|
readDuration=5ms
|
||||||
|
; Список событий (поле "Event"), исключенных из выдачи, разделенный ";"
|
||||||
|
excludeEvent=
|
||||||
`
|
`
|
||||||
|
|
||||||
// Список файлов в каталоге
|
// Список файлов в каталоге
|
||||||
|
@ -241,11 +244,18 @@ func getDirectoryContents(dir string) []string {
|
||||||
entries, err := os.ReadDir(dir);
|
entries, err := os.ReadDir(dir);
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Println("Ошибка получения содержимого каталога: ", dir, err);
|
log.Println("Ошибка получения содержимого каталога: ", dir, err);
|
||||||
}
|
} else {
|
||||||
|
if Debug {
|
||||||
|
log.Println("Get the directory content:", dir)
|
||||||
|
}
|
||||||
|
}
|
||||||
//iterate through the directory entities and print out their name.
|
//iterate through the directory entities and print out their name.
|
||||||
for _, entry := range(entries) {
|
for _, entry := range(entries) {
|
||||||
if filepath.Ext(entry.Name()) == ".lgp" {
|
if filepath.Ext(entry.Name()) == LogFileExtention {
|
||||||
// fmt.Println(entry.Name());
|
// fmt.Println(entry.Name());
|
||||||
|
if Debug {
|
||||||
|
log.Println("Finded file:", entry.Name())
|
||||||
|
}
|
||||||
names = append(names, entry.Name())
|
names = append(names, entry.Name())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -261,19 +271,42 @@ func getLastModifyFile(dir string) []string {
|
||||||
var modTime time.Time
|
var modTime time.Time
|
||||||
var names []string
|
var names []string
|
||||||
for _, fi := range files {
|
for _, fi := range files {
|
||||||
// fmt.Println(fi)
|
// // fmt.Println(fi)
|
||||||
// names = append(names, fi.Name())
|
// names = append(names, fi.Name())
|
||||||
fileInfo, _ := fi.Info()
|
if Debug {
|
||||||
if fileInfo.Mode().IsRegular() {
|
log.Println(LogFileExtention, filepath.Ext(fi.Name()))
|
||||||
// fmt.Println(fi, fileInfo.ModTime())
|
}
|
||||||
if !fileInfo.ModTime().Before(modTime) {
|
if filepath.Ext(fi.Name()) == LogFileExtention {
|
||||||
if fileInfo.ModTime().After(modTime) {
|
fileInfo, _ := fi.Info()
|
||||||
modTime = fileInfo.ModTime()
|
if Debug {
|
||||||
names = names[:0]
|
log.Println("Finding file info:", fileInfo)
|
||||||
|
}
|
||||||
|
|
||||||
|
if fileInfo.Mode().IsRegular() {
|
||||||
|
if !fileInfo.ModTime().Before(modTime) {
|
||||||
|
if fileInfo.ModTime().After(modTime) {
|
||||||
|
modTime = fileInfo.ModTime()
|
||||||
|
names = names[:0]
|
||||||
|
}
|
||||||
|
// if filepath.Ext(fi.Name()) == ".lgp" {
|
||||||
|
dt := time.Now()
|
||||||
|
// Сравниваем имя последнего файла с текущей датой
|
||||||
|
// и если равны - запускаем процесс разборки
|
||||||
|
if getDate(fi.Name()) == dt.Format("2006-01-02") {
|
||||||
|
names = append(names, fi.Name())
|
||||||
|
log.Println("Текущая дата и дата файла совпадают:", getDate(fi.Name()), dt.Format("2006-01-02"))
|
||||||
|
}
|
||||||
|
// }
|
||||||
}
|
}
|
||||||
if filepath.Ext(fi.Name()) == ".lgp" {
|
if Debug {
|
||||||
names = append(names, fi.Name())
|
log.Println("Finding file modify time:", fileInfo.ModTime())
|
||||||
|
log.Println("Last file modify time:", modTime)
|
||||||
|
log.Println("Current time:", fileInfo.ModTime().Before(modTime), fileInfo.ModTime().After(modTime))
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
if Debug {
|
||||||
|
log.Println("Finding file was not regular:", fi)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -339,6 +372,10 @@ func readDictFile(fileName string) map[int]map[int]string {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("error opening file: %v\n",err)
|
log.Printf("error opening file: %v\n",err)
|
||||||
os.Exit(1)
|
os.Exit(1)
|
||||||
|
} else {
|
||||||
|
if Debug {
|
||||||
|
log.Println("Opening file:", fileName)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
r := bufio.NewReader(f)
|
r := bufio.NewReader(f)
|
||||||
s, e := Readln(r)
|
s, e := Readln(r)
|
||||||
|
@ -460,7 +497,9 @@ func createOutFile(fileName string) *os.File {
|
||||||
|
|
||||||
func writeOutCSVFile(outChannel *os.File, m map[string]string) {
|
func writeOutCSVFile(outChannel *os.File, m map[string]string) {
|
||||||
var result string
|
var result string
|
||||||
|
if len(m) == 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
keys := make([]string, 0, len(m))
|
keys := make([]string, 0, len(m))
|
||||||
for k := range m {
|
for k := range m {
|
||||||
keys = append(keys, k)
|
keys = append(keys, k)
|
||||||
|
@ -506,6 +545,9 @@ func writeOutCSVFile(outChannel *os.File, m map[string]string) {
|
||||||
func writeOutJSONFile(outChannel *os.File, line map[string]string) {
|
func writeOutJSONFile(outChannel *os.File, line map[string]string) {
|
||||||
// fmt.Println(">>>>>>>>>",line)
|
// fmt.Println(">>>>>>>>>",line)
|
||||||
// a_json, err := json.Marshal(line)
|
// a_json, err := json.Marshal(line)
|
||||||
|
if len(line) == 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
a_json, err := json.MarshalIndent(line, "", " ")
|
a_json, err := json.MarshalIndent(line, "", " ")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("Error encoding: %s", line, "with error %s", err)
|
log.Printf("Error encoding: %s", line, "with error %s", err)
|
||||||
|
@ -646,16 +688,19 @@ func deleteTempFile(tempFile *os.File, fileName string) {
|
||||||
log.Println("Удалён временный файл:", tempFileName)
|
log.Println("Удалён временный файл:", tempFileName)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
// (\d{14}),(\w{1}),\{(\w+),(?\w+)\},(\d+),(\d+),(\d+),(\d+),(\d+),(\w{1}),\"(.*)\",(\d+),\{(.+)\},\"(.*)\",(\d+),(\d+),(\d+),(\d+),(\d{1}),\{(\d|,)+\}
|
||||||
func ParseString (outLine string, logOffset int64) map[string]string {
|
func parseString (outLine string, logOffset int64) map[string]string {
|
||||||
var (
|
var (
|
||||||
resultData string
|
resultData string
|
||||||
result map[string]string
|
result map[string]string
|
||||||
)
|
)
|
||||||
|
result = nil
|
||||||
regexpLogRecord := regexp.MustCompile(`(?P<DateTime>\d{14}),(?P<TranStatus>\w{1}),\{(?P<TranDuration>\w+),(?P<TranNumber>\w+)\},(?P<Usr>\d+),(?P<ComputerName>\d+),(?P<ApplicationName>\d+),(?P<ConnectionID>\d+),(?P<Event>\d+),(?P<Importance>\w{1}),\"(?P<Comment>.*)\",(?P<Metadata>\d+),\{(?P<Data>.+)\},\"(?P<PresentData>.*)\",(?P<Server>\d+),(?P<Port>\d+),(?P<AddPort>\d+),(?P<SessionID>\d+),(?P<OtherData1>\d{1}),\{(\d|,)+\}$`)
|
regexpLogRecord := regexp.MustCompile(`(?P<DateTime>\d{14}),(?P<TranStatus>\w{1}),\{(?P<TranDuration>\w+),(?P<TranNumber>\w+)\},(?P<Usr>\d+),(?P<ComputerName>\d+),(?P<ApplicationName>\d+),(?P<ConnectionID>\d+),(?P<Event>\d+),(?P<Importance>\w{1}),\"(?P<Comment>.*)\",(?P<Metadata>\d+),\{(?P<Data>.+)\},\"(?P<PresentData>.*)\",(?P<Server>\d+),(?P<Port>\d+),(?P<AddPort>\d+),(?P<SessionID>\d+),(?P<OtherData1>\d{1}),\{(\d|,)+\}$`)
|
||||||
// Полную строку прогоняем через регексп и распихиваем сразу по полям
|
// Полную строку прогоняем через регексп и распихиваем сразу по полям
|
||||||
matchedOut := regexpLogRecord.MatchString(outLine)
|
matchedOut := regexpLogRecord.MatchString(outLine)
|
||||||
// fmt.Println(matchedOut)
|
if Debug {
|
||||||
|
log.Println(outLine)
|
||||||
|
}
|
||||||
if matchedOut {
|
if matchedOut {
|
||||||
findedData := regexpLogRecord.FindStringSubmatch(outLine)
|
findedData := regexpLogRecord.FindStringSubmatch(outLine)
|
||||||
if findedData != nil {
|
if findedData != nil {
|
||||||
|
@ -695,6 +740,15 @@ func ParseString (outLine string, logOffset int64) map[string]string {
|
||||||
resultData = getDataFromDict(4, findedData[i])
|
resultData = getDataFromDict(4, findedData[i])
|
||||||
if resultData != "not" {
|
if resultData != "not" {
|
||||||
findedData[i] = resultData
|
findedData[i] = resultData
|
||||||
|
// log.Println(">>>>>>>>",findedData[i], ExcludeEvent)
|
||||||
|
for _, word := range ExcludeEvent {
|
||||||
|
if resultData == word {
|
||||||
|
if Debug {
|
||||||
|
log.Println("The event", resultData, "coincides with the specified filter", word)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
case "Metadata":
|
case "Metadata":
|
||||||
resultData = getDataFromDict(5, findedData[i])
|
resultData = getDataFromDict(5, findedData[i])
|
||||||
|
@ -769,14 +823,15 @@ func runOperations () {
|
||||||
log.Println("Новых файлов не найдено")
|
log.Println("Новых файлов не найдено")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
for _, fileName := range lastModifyFiles {
|
for _, fileName := range lastModifyFiles {
|
||||||
|
DictObjects = readDictFile(DictFile)
|
||||||
log.Println("Запускаем обработку файла", fileName)
|
log.Println("Запускаем обработку файла", fileName)
|
||||||
if SendToEs {
|
if SendToEs {
|
||||||
EsIndexName = EsIndexPrefix + "-" + getDate(fileName)
|
EsIndexName = EsIndexPrefix + "-" + getDate(fileName)
|
||||||
client = esConnect()
|
client = esConnect()
|
||||||
esCreateIndex(EsIndexName, client)
|
esCreateIndex(EsIndexName, client)
|
||||||
}
|
}
|
||||||
tail(client, fileName, os.Stdout)
|
go tail(client, fileName, os.Stdout)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Получаем список новых файлов, и если они появились после запуска программы
|
// Получаем список новых файлов, и если они появились после запуска программы
|
||||||
|
@ -790,7 +845,8 @@ func runOperations () {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if newFilesList[0] != "" {
|
if newFilesList[0] != "" {
|
||||||
for _, fileName := range newFilesList {
|
for _, fileName := range newFilesList {
|
||||||
|
DictObjects = readDictFile(DictFile)
|
||||||
if SendToEs {
|
if SendToEs {
|
||||||
EsIndexName = EsIndexPrefix + "-" + getDate(fileName)
|
EsIndexName = EsIndexPrefix + "-" + getDate(fileName)
|
||||||
// esConnect()
|
// esConnect()
|
||||||
|
@ -800,7 +856,7 @@ func runOperations () {
|
||||||
// обработанных файлов для корректного определения новых файлов
|
// обработанных файлов для корректного определения новых файлов
|
||||||
filesList = append(filesList, fileName)
|
filesList = append(filesList, fileName)
|
||||||
// запускаем процесс чтения (обработки)
|
// запускаем процесс чтения (обработки)
|
||||||
tail(client, fileName, os.Stdout)
|
go tail(client, fileName, os.Stdout)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
log.Println("Новых файлов не найдено")
|
log.Println("Новых файлов не найдено")
|
||||||
|
@ -814,15 +870,15 @@ func runOperations () {
|
||||||
func tail(client *opensearch.Client, fileName string, out io.Writer) {
|
func tail(client *opensearch.Client, fileName string, out io.Writer) {
|
||||||
var shutUp = false
|
var shutUp = false
|
||||||
var (
|
var (
|
||||||
recBegin bool
|
recBegin = false
|
||||||
outLine string
|
outLine string
|
||||||
l string
|
l string
|
||||||
bulkCount int
|
// bulkCount int
|
||||||
arrRecords []map[string]string
|
// arrRecords []map[string]string
|
||||||
fOut *os.File
|
fOut *os.File
|
||||||
)
|
)
|
||||||
|
|
||||||
bulkCount = 0
|
// bulkCount = 0
|
||||||
|
|
||||||
filesList := getDirectoryContents(DirIn)
|
filesList := getDirectoryContents(DirIn)
|
||||||
|
|
||||||
|
@ -870,8 +926,17 @@ func tail(client *opensearch.Client, fileName string, out io.Writer) {
|
||||||
oldSize := info.Size()
|
oldSize := info.Size()
|
||||||
regexpBeginRecord := regexp.MustCompile(`^\{\d{14},\w{1},$`)
|
regexpBeginRecord := regexp.MustCompile(`^\{\d{14},\w{1},$`)
|
||||||
regexpEndRecord := regexp.MustCompile(`^\},$`)
|
regexpEndRecord := regexp.MustCompile(`^\},$`)
|
||||||
|
// var (
|
||||||
|
// isPrefix bool = true
|
||||||
|
// // err error = nil
|
||||||
|
// line []byte
|
||||||
|
// )
|
||||||
for {
|
for {
|
||||||
for line, prefix, err := r.ReadLine(); err != io.EOF; line, prefix, err = r.ReadLine() {
|
line, err := Readln(r)
|
||||||
|
// for line, _, err := r.ReadLine(); err != io.EOF; line, _, err = r.ReadLine() {
|
||||||
|
for err == nil {
|
||||||
|
// fmt.Println(line)
|
||||||
|
|
||||||
// определяем текущую позицию записи в файле и пишем в файл
|
// определяем текущую позицию записи в файле и пишем в файл
|
||||||
logOffset, _ := f.Seek(0, os.SEEK_CUR)
|
logOffset, _ := f.Seek(0, os.SEEK_CUR)
|
||||||
writeTempFile(tempFile, logOffset)
|
writeTempFile(tempFile, logOffset)
|
||||||
|
@ -885,15 +950,20 @@ func tail(client *opensearch.Client, fileName string, out io.Writer) {
|
||||||
}
|
}
|
||||||
if recBegin {
|
if recBegin {
|
||||||
outLine = outLine + l
|
outLine = outLine + l
|
||||||
|
// outLine = outLine + strings.TrimSpace(l)
|
||||||
}
|
}
|
||||||
// Находим конец записи
|
// Находим конец записи
|
||||||
matchedEnd := regexpEndRecord.MatchString(l)
|
matchedEnd := regexpEndRecord.MatchString(l)
|
||||||
// fmt.Println(l)
|
// fmt.Println(recBegin, matchedEnd)
|
||||||
|
// fmt.Println(outLine)
|
||||||
if matchedEnd {
|
if matchedEnd {
|
||||||
recBegin = false
|
recBegin = false
|
||||||
outLine = strings.TrimSuffix(strings.TrimLeft(outLine, "{,"), "},")
|
outLine = strings.TrimSuffix(strings.TrimLeft(outLine, "{,"), "},")
|
||||||
// Парсим подготовленную строку
|
// Парсим подготовленную строку
|
||||||
result := ParseString(outLine, logOffset)
|
result := parseString(outLine, logOffset)
|
||||||
|
if Debug {
|
||||||
|
log.Println(len(result), result)
|
||||||
|
}
|
||||||
// Пишем выходные файлы
|
// Пишем выходные файлы
|
||||||
if WriteOutFile {
|
if WriteOutFile {
|
||||||
if OutFormat == "json" {
|
if OutFormat == "json" {
|
||||||
|
@ -904,46 +974,38 @@ func tail(client *opensearch.Client, fileName string, out io.Writer) {
|
||||||
}
|
}
|
||||||
// Отправляем данные в ElasticSearch / OpenSearch
|
// Отправляем данные в ElasticSearch / OpenSearch
|
||||||
if SendToEs {
|
if SendToEs {
|
||||||
if result["index_day_prefix"] == "" {
|
// if result["index_day_prefix"] == "" {
|
||||||
continue
|
// continue
|
||||||
}
|
// }
|
||||||
indexName := EsIndexPrefix + "-" + result["index_day_prefix"]
|
indexName := EsIndexPrefix + "-" + result["index_day_prefix"]
|
||||||
// пакетная отправка данных в эластик
|
// пакетная отправка данных в эластик
|
||||||
if EsBulk {
|
// addRes := esAddRecord(client, indexName, result)
|
||||||
if bulkCount < EsBulkRecordsQuantity {
|
go esAddRecord(client, indexName, result)
|
||||||
bulkCount++
|
time.Sleep(TailSleep)
|
||||||
// arrRecords = append(arrRecords, result)
|
// time.Sleep(5 * time.Millisecond)
|
||||||
} else {
|
|
||||||
bulkCount = 0
|
|
||||||
// esAddRecordsBulk(client, indexName, arrRecords)
|
|
||||||
}
|
|
||||||
arrRecords = append(arrRecords, result)
|
|
||||||
} else {
|
|
||||||
// addRes := esAddRecord(client, indexName, result)
|
|
||||||
go esAddRecord(client, indexName, result)
|
|
||||||
time.Sleep(TailSleep)
|
|
||||||
// time.Sleep(5 * time.Millisecond)
|
|
||||||
|
|
||||||
// Если запись не добавилась делаем повторную попытку через 5 сек.
|
// Если запись не добавилась делаем повторную попытку через 5 сек.
|
||||||
// if !addRes {
|
// if !addRes {
|
||||||
// time.Sleep(5 * time.Second)
|
// time.Sleep(5 * time.Second)
|
||||||
// addRes = esAddRecord(client, indexName, result)
|
// addRes = esAddRecord(client, indexName, result)
|
||||||
// log.Println("Повторная попытка добавления записи:", result, addRes)
|
// log.Println("Повторная попытка добавления записи:", result, addRes)
|
||||||
// }
|
// }
|
||||||
}
|
|
||||||
}
|
}
|
||||||
writeTempFile(tempFile, logOffset)
|
writeTempFile(tempFile, logOffset)
|
||||||
if Debug {
|
if Debug {
|
||||||
fmt.Println(result)
|
log.Println(outLine)
|
||||||
|
log.Println(result)
|
||||||
}
|
}
|
||||||
outLine = ""
|
outLine = ""
|
||||||
|
result = nil
|
||||||
}
|
}
|
||||||
if prefix {
|
// if prefix {
|
||||||
// fmt.Fprint(out, string(line))
|
// fmt.Println(">",string(line))
|
||||||
} else {
|
// } else {
|
||||||
// fmt.Fprintln(out, string(line))
|
// fmt.Println(">>",string(line))
|
||||||
}
|
// }
|
||||||
// --------------------------------
|
// --------------------------------
|
||||||
|
line, err = Readln(r)
|
||||||
}
|
}
|
||||||
pos, err := f.Seek(0, io.SeekCurrent)
|
pos, err := f.Seek(0, io.SeekCurrent)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -1098,12 +1160,17 @@ func esAddRecordsBulk(client *opensearch.Client, indexName string, arrLine []map
|
||||||
func esAddRecord(client *opensearch.Client, indexName string, line map[string]string) bool {
|
func esAddRecord(client *opensearch.Client, indexName string, line map[string]string) bool {
|
||||||
// fmt.Println(">>>>>>>>>",indexName)
|
// fmt.Println(">>>>>>>>>",indexName)
|
||||||
// a_json, err := json.Marshal(line)
|
// a_json, err := json.Marshal(line)
|
||||||
|
if len(line) == 0 {
|
||||||
|
return false
|
||||||
|
}
|
||||||
var operResult bool
|
var operResult bool
|
||||||
aJson, err := json.MarshalIndent(line, "", " ")
|
aJson, err := json.MarshalIndent(line, "", " ")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("Error encoding: %s", line, "with error %s", err)
|
log.Printf("Error encoding: %s", line, "with error %s", err)
|
||||||
}
|
}
|
||||||
|
if Debug {
|
||||||
|
fmt.Println(line)
|
||||||
|
}
|
||||||
document := strings.NewReader(string(aJson))
|
document := strings.NewReader(string(aJson))
|
||||||
|
|
||||||
req := opensearchapi.IndexRequest{
|
req := opensearchapi.IndexRequest{
|
||||||
|
@ -1160,6 +1227,7 @@ func readConfigFile(fileName string) {
|
||||||
|
|
||||||
TimeZoneOffset = cfg.Section("Processing").Key("timeZoneOffset").String()
|
TimeZoneOffset = cfg.Section("Processing").Key("timeZoneOffset").String()
|
||||||
Duration = cfg.Section("Processing").Key("readDuration").String()
|
Duration = cfg.Section("Processing").Key("readDuration").String()
|
||||||
|
ExcludeEvent = strings.Split(cfg.Section("Processing").Key("excludeEvent").String(), ";")
|
||||||
}
|
}
|
||||||
|
|
||||||
// Создание файла конфигурации
|
// Создание файла конфигурации
|
||||||
|
@ -1228,7 +1296,6 @@ func RunWinService() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
// Читаем опции коммандной строки
|
// Читаем опции коммандной строки
|
||||||
flag.BoolVar(&RunAsWinService, "run-win-service", false, "Запуск как службы Windows")
|
flag.BoolVar(&RunAsWinService, "run-win-service", false, "Запуск как службы Windows")
|
||||||
|
@ -1256,6 +1323,7 @@ func main() {
|
||||||
flag.StringVar(&ObjectTypes, "object-types", "1,2,3,4,5,6,7,8", "Список типов объектов словаря для выборки, разделённый запятой (OBJECT_TYPES)")
|
flag.StringVar(&ObjectTypes, "object-types", "1,2,3,4,5,6,7,8", "Список типов объектов словаря для выборки, разделённый запятой (OBJECT_TYPES)")
|
||||||
flag.StringVar(&TimeZoneOffset, "tz-offset", "+03:00", "Сдвиг по времени от UTC")
|
flag.StringVar(&TimeZoneOffset, "tz-offset", "+03:00", "Сдвиг по времени от UTC")
|
||||||
flag.StringVar(&Duration, "tail-sleep", "5ms", "Задержка чтения входного файла. Разрешены обозначения 'ns', 'us' ('µs'), 'ms', 's', 'm','h')")
|
flag.StringVar(&Duration, "tail-sleep", "5ms", "Задержка чтения входного файла. Разрешены обозначения 'ns', 'us' ('µs'), 'ms', 's', 'm','h')")
|
||||||
|
// flag.StringVar(&ExcludeEvent, "exclude-event", "", "Список событий исключенных из выдачи разделенных ';'")
|
||||||
|
|
||||||
flag.Parse()
|
flag.Parse()
|
||||||
|
|
||||||
|
@ -1298,17 +1366,13 @@ func main() {
|
||||||
LogFileExtention = os.Getenv("LOG_FILE_EXT")
|
LogFileExtention = os.Getenv("LOG_FILE_EXT")
|
||||||
}
|
}
|
||||||
|
|
||||||
if os.Getenv("TEMP") != "" {
|
// if os.Getenv("TEMP") != "" {
|
||||||
LogFileExtention = os.Getenv("LOG_FILE_EXT")
|
// DirTemp = os.Getenv("TEMP")
|
||||||
}
|
// }
|
||||||
|
|
||||||
if os.Getenv("DIR_LOG") != "" {
|
if os.Getenv("DIR_LOG") != "" {
|
||||||
DirLog = os.Getenv("DIR_LOG")
|
DirLog = os.Getenv("DIR_LOG")
|
||||||
}
|
}
|
||||||
|
|
||||||
if os.Getenv("TEMP") != "" {
|
|
||||||
LogFileExtention = os.Getenv("LOG_FILE_EXT")
|
|
||||||
}
|
|
||||||
|
|
||||||
// Проверяем параметры и переменные окружения для работы с ElasticSearch / OpenSearch
|
// Проверяем параметры и переменные окружения для работы с ElasticSearch / OpenSearch
|
||||||
if SendToEs {
|
if SendToEs {
|
||||||
|
@ -1365,6 +1429,8 @@ func main() {
|
||||||
LocalTZOffset = TimeZoneOffset
|
LocalTZOffset = TimeZoneOffset
|
||||||
}
|
}
|
||||||
|
|
||||||
|
log.Println(DirCfg, FileCfg, DirIn, DirOut, DirTemp, DirLog, DictFile)
|
||||||
|
|
||||||
// Запуск как сервиса
|
// Запуск как сервиса
|
||||||
if RunAsWinService {
|
if RunAsWinService {
|
||||||
RunWinService()
|
RunWinService()
|
||||||
|
@ -1372,4 +1438,4 @@ func main() {
|
||||||
runOperations()
|
runOperations()
|
||||||
}
|
}
|
||||||
log.Println("Работа программы завершена")
|
log.Println("Работа программы завершена")
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue