diff --git a/README.md b/README.md index 1385b82..7e816f9 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,8 @@ Программа для обработки журналов 1С. -В качестве входных данных используются файлы журналов "20230126000001.lgp". В качестве словаря для подстановки значений используется файл "1Cv8.lgf". +В качестве входных данных используются файлы журналов "20230126000001.lgp" (Для обработки берется файл имя которого совпадает с текущей датой). +В качестве словаря для подстановки значений используется файл "1Cv8.lgf". Обработанные данные могут быть сохранены в файл как в JSON так и в CSV форматах. Также возможна отправка обработанных данных в ElasticSearch. @@ -12,10 +13,14 @@ С целью ускорения работы запуск процесса отправки записей в эластик сделан параллельно, т.е. каждая запись отправляется в отдельном потоке (goroutine) поэтому для исключения переполнения буфера соединений в процедуру чтения файла (func tail()) введена задержка в 5 миллисекунд (time.Sleep(5 * time.Millisecond)). Это костыль и пока нормального решения я не придумал. +Типы ообъектов в словаре +1 – пользователи; 2 – компьютеры; 3 – приложения; 4 – события; 5 – метаданные; 6 – серверы; 7 – основные порты; 8 – вспомогательные порты. + + ## Опции командной строки ``` -Usage of log-processor: +Usage of /tmp/go-build847101717/b001/exe/log-processor: -config string Файл настроек (default "config.ini") -create-config @@ -87,6 +92,13 @@ log-processor -dir-config ~/tmp/123 -config processor.ini -create-config Программа позволяет отправлять обработанные данные в ElasticSearch/OpenSearch опция "-es-send", сохранять в файлах формата csv и json: "-write-out-file" и "-out-format" соответсвенно, выводить в консоль - "-debug". Для всех операций используются соответствующие ключи запуска или параметры в файле настроек. +Для уменьшения количества записей можно задать список событий, которые будут исключены из итоговой выдачи (как в файл так и в эластик), разделенный точкой с запятой. Для этого используется параметр в файле конфигурации "excludeEvent": + +``` +excludeEvent=_$Transaction$_.Begin;\ +_$Transaction$_.Commit +``` + ## Запуск как служба Windows Перед запуском службу следует зарегистрировать в системе средствами Windows: @@ -98,6 +110,12 @@ r-config C:\1c-logprocessor\.config\ -run-win-service" DisplayName="1C log proce Пути можно указать свои. Управление службой осуществляется через штатную оснастку системы. +## Автоматическая сборка и доставка + +Для автоматизации сборки и доставки программы используется Gitlab CI. + +При изменении файлов в репозитории будет запущен процесс сборки исполняемых файлов под разные платформы. После этого (при условии успешной сборки) файлы будут скопированы на WWW-сервис https://files.corp.samsonopt.ru и доступны по [ссылке](https://files.corp.samsonopt.ru/bin/1c-logprocessor/). + ## Пример конфигурационного файла ``` @@ -141,4 +159,6 @@ timeZoneOffset=+03:00 ; Задержка чтения входного файла между строками ; Разрешены обозначения 'ns', 'us' ('µs'), 'ms', 's', 'm','h' readDuration=5ms +; Список событий (поле "Event"), исключенных из выдачи, разделенный ";" +excludeEvent= ``` diff --git a/log-processor.go b/log-processor.go index 9350617..e2eab69 100644 --- a/log-processor.go +++ b/log-processor.go @@ -5,7 +5,6 @@ // ---------------------------------------------------------------------- // Лицензия: GPL(v3) //------------------------- ------------------------------------------------ - package main import ( @@ -25,10 +24,10 @@ import ( // "github.com/hpcloud/tail" "net" "sort" - "crypto/tls" - "net/http" - "context" - "gopkg.in/ini.v1" + "crypto/tls" + "net/http" + "context" + "gopkg.in/ini.v1" opensearch "github.com/opensearch-project/opensearch-go/v2" opensearchapi "github.com/opensearch-project/opensearch-go/v2/opensearchapi" "github.com/kardianos/service" @@ -66,6 +65,7 @@ var ( DictFile string TailSleep time.Duration RunAsWinService bool + ExcludeEvent []string ) type Config struct { @@ -88,7 +88,8 @@ type Config struct { OutFormat string `ini:"outFormat"` WriteOutFile bool `ini:"writeOutFile"` TimeZoneOffset string `ini:"timeZoneOffset"` - Duration string `ini:"Duration"` + Duration string `ini:"Duration"` + ExcludeEvent []string `ini:"excludeEvent"` } type Key struct { @@ -233,6 +234,8 @@ timeZoneOffset=+03:00 ; Задержка чтения входного файла между строками ; Разрешены обозначения 'ns', 'us' ('µs'), 'ms', 's', 'm','h' readDuration=5ms +; Список событий (поле "Event"), исключенных из выдачи, разделенный ";" +excludeEvent= ` // Список файлов в каталоге @@ -241,11 +244,18 @@ func getDirectoryContents(dir string) []string { entries, err := os.ReadDir(dir); if err != nil { log.Println("Ошибка получения содержимого каталога: ", dir, err); - } + } else { + if Debug { + log.Println("Get the directory content:", dir) + } + } //iterate through the directory entities and print out their name. for _, entry := range(entries) { - if filepath.Ext(entry.Name()) == ".lgp" { + if filepath.Ext(entry.Name()) == LogFileExtention { // fmt.Println(entry.Name()); + if Debug { + log.Println("Finded file:", entry.Name()) + } names = append(names, entry.Name()) } } @@ -261,19 +271,42 @@ func getLastModifyFile(dir string) []string { var modTime time.Time var names []string for _, fi := range files { - // fmt.Println(fi) + // // fmt.Println(fi) // names = append(names, fi.Name()) - fileInfo, _ := fi.Info() - if fileInfo.Mode().IsRegular() { - // fmt.Println(fi, fileInfo.ModTime()) - if !fileInfo.ModTime().Before(modTime) { - if fileInfo.ModTime().After(modTime) { - modTime = fileInfo.ModTime() - names = names[:0] + if Debug { + log.Println(LogFileExtention, filepath.Ext(fi.Name())) + } + if filepath.Ext(fi.Name()) == LogFileExtention { + fileInfo, _ := fi.Info() + if Debug { + log.Println("Finding file info:", fileInfo) + } + + if fileInfo.Mode().IsRegular() { + if !fileInfo.ModTime().Before(modTime) { + if fileInfo.ModTime().After(modTime) { + modTime = fileInfo.ModTime() + names = names[:0] + } + // if filepath.Ext(fi.Name()) == ".lgp" { + dt := time.Now() + // Сравниваем имя последнего файла с текущей датой + // и если равны - запускаем процесс разборки + if getDate(fi.Name()) == dt.Format("2006-01-02") { + names = append(names, fi.Name()) + log.Println("Текущая дата и дата файла совпадают:", getDate(fi.Name()), dt.Format("2006-01-02")) + } + // } } - if filepath.Ext(fi.Name()) == ".lgp" { - names = append(names, fi.Name()) + if Debug { + log.Println("Finding file modify time:", fileInfo.ModTime()) + log.Println("Last file modify time:", modTime) + log.Println("Current time:", fileInfo.ModTime().Before(modTime), fileInfo.ModTime().After(modTime)) } + } else { + if Debug { + log.Println("Finding file was not regular:", fi) + } } } } @@ -339,6 +372,10 @@ func readDictFile(fileName string) map[int]map[int]string { if err != nil { log.Printf("error opening file: %v\n",err) os.Exit(1) + } else { + if Debug { + log.Println("Opening file:", fileName) + } } r := bufio.NewReader(f) s, e := Readln(r) @@ -460,7 +497,9 @@ func createOutFile(fileName string) *os.File { func writeOutCSVFile(outChannel *os.File, m map[string]string) { var result string - + if len(m) == 0 { + return + } keys := make([]string, 0, len(m)) for k := range m { keys = append(keys, k) @@ -506,6 +545,9 @@ func writeOutCSVFile(outChannel *os.File, m map[string]string) { func writeOutJSONFile(outChannel *os.File, line map[string]string) { // fmt.Println(">>>>>>>>>",line) // a_json, err := json.Marshal(line) + if len(line) == 0 { + return + } a_json, err := json.MarshalIndent(line, "", " ") if err != nil { log.Printf("Error encoding: %s", line, "with error %s", err) @@ -646,16 +688,19 @@ func deleteTempFile(tempFile *os.File, fileName string) { log.Println("Удалён временный файл:", tempFileName) } } - -func ParseString (outLine string, logOffset int64) map[string]string { +// (\d{14}),(\w{1}),\{(\w+),(?\w+)\},(\d+),(\d+),(\d+),(\d+),(\d+),(\w{1}),\"(.*)\",(\d+),\{(.+)\},\"(.*)\",(\d+),(\d+),(\d+),(\d+),(\d{1}),\{(\d|,)+\} +func parseString (outLine string, logOffset int64) map[string]string { var ( resultData string result map[string]string ) + result = nil regexpLogRecord := regexp.MustCompile(`(?P\d{14}),(?P\w{1}),\{(?P\w+),(?P\w+)\},(?P\d+),(?P\d+),(?P\d+),(?P\d+),(?P\d+),(?P\w{1}),\"(?P.*)\",(?P\d+),\{(?P.+)\},\"(?P.*)\",(?P\d+),(?P\d+),(?P\d+),(?P\d+),(?P\d{1}),\{(\d|,)+\}$`) // Полную строку прогоняем через регексп и распихиваем сразу по полям matchedOut := regexpLogRecord.MatchString(outLine) - // fmt.Println(matchedOut) + if Debug { + log.Println(outLine) + } if matchedOut { findedData := regexpLogRecord.FindStringSubmatch(outLine) if findedData != nil { @@ -695,6 +740,15 @@ func ParseString (outLine string, logOffset int64) map[string]string { resultData = getDataFromDict(4, findedData[i]) if resultData != "not" { findedData[i] = resultData + // log.Println(">>>>>>>>",findedData[i], ExcludeEvent) + for _, word := range ExcludeEvent { + if resultData == word { + if Debug { + log.Println("The event", resultData, "coincides with the specified filter", word) + } + return nil + } + } } case "Metadata": resultData = getDataFromDict(5, findedData[i]) @@ -769,14 +823,15 @@ func runOperations () { log.Println("Новых файлов не найдено") return } - for _, fileName := range lastModifyFiles { + for _, fileName := range lastModifyFiles { + DictObjects = readDictFile(DictFile) log.Println("Запускаем обработку файла", fileName) if SendToEs { EsIndexName = EsIndexPrefix + "-" + getDate(fileName) client = esConnect() esCreateIndex(EsIndexName, client) } - tail(client, fileName, os.Stdout) + go tail(client, fileName, os.Stdout) } // Получаем список новых файлов, и если они появились после запуска программы @@ -790,7 +845,8 @@ func runOperations () { continue } if newFilesList[0] != "" { - for _, fileName := range newFilesList { + for _, fileName := range newFilesList { + DictObjects = readDictFile(DictFile) if SendToEs { EsIndexName = EsIndexPrefix + "-" + getDate(fileName) // esConnect() @@ -800,7 +856,7 @@ func runOperations () { // обработанных файлов для корректного определения новых файлов filesList = append(filesList, fileName) // запускаем процесс чтения (обработки) - tail(client, fileName, os.Stdout) + go tail(client, fileName, os.Stdout) } } else { log.Println("Новых файлов не найдено") @@ -814,15 +870,15 @@ func runOperations () { func tail(client *opensearch.Client, fileName string, out io.Writer) { var shutUp = false var ( - recBegin bool + recBegin = false outLine string l string - bulkCount int - arrRecords []map[string]string + // bulkCount int + // arrRecords []map[string]string fOut *os.File ) - bulkCount = 0 + // bulkCount = 0 filesList := getDirectoryContents(DirIn) @@ -870,8 +926,17 @@ func tail(client *opensearch.Client, fileName string, out io.Writer) { oldSize := info.Size() regexpBeginRecord := regexp.MustCompile(`^\{\d{14},\w{1},$`) regexpEndRecord := regexp.MustCompile(`^\},$`) + // var ( + // isPrefix bool = true + // // err error = nil + // line []byte + // ) for { - for line, prefix, err := r.ReadLine(); err != io.EOF; line, prefix, err = r.ReadLine() { + line, err := Readln(r) + // for line, _, err := r.ReadLine(); err != io.EOF; line, _, err = r.ReadLine() { + for err == nil { + // fmt.Println(line) + // определяем текущую позицию записи в файле и пишем в файл logOffset, _ := f.Seek(0, os.SEEK_CUR) writeTempFile(tempFile, logOffset) @@ -885,15 +950,20 @@ func tail(client *opensearch.Client, fileName string, out io.Writer) { } if recBegin { outLine = outLine + l + // outLine = outLine + strings.TrimSpace(l) } // Находим конец записи matchedEnd := regexpEndRecord.MatchString(l) - // fmt.Println(l) + // fmt.Println(recBegin, matchedEnd) + // fmt.Println(outLine) if matchedEnd { recBegin = false outLine = strings.TrimSuffix(strings.TrimLeft(outLine, "{,"), "},") // Парсим подготовленную строку - result := ParseString(outLine, logOffset) + result := parseString(outLine, logOffset) + if Debug { + log.Println(len(result), result) + } // Пишем выходные файлы if WriteOutFile { if OutFormat == "json" { @@ -904,46 +974,38 @@ func tail(client *opensearch.Client, fileName string, out io.Writer) { } // Отправляем данные в ElasticSearch / OpenSearch if SendToEs { - if result["index_day_prefix"] == "" { - continue - } + // if result["index_day_prefix"] == "" { + // continue + // } indexName := EsIndexPrefix + "-" + result["index_day_prefix"] // пакетная отправка данных в эластик - if EsBulk { - if bulkCount < EsBulkRecordsQuantity { - bulkCount++ - // arrRecords = append(arrRecords, result) - } else { - bulkCount = 0 - // esAddRecordsBulk(client, indexName, arrRecords) - } - arrRecords = append(arrRecords, result) - } else { - // addRes := esAddRecord(client, indexName, result) - go esAddRecord(client, indexName, result) - time.Sleep(TailSleep) - // time.Sleep(5 * time.Millisecond) + // addRes := esAddRecord(client, indexName, result) + go esAddRecord(client, indexName, result) + time.Sleep(TailSleep) + // time.Sleep(5 * time.Millisecond) - // Если запись не добавилась делаем повторную попытку через 5 сек. - // if !addRes { - // time.Sleep(5 * time.Second) - // addRes = esAddRecord(client, indexName, result) - // log.Println("Повторная попытка добавления записи:", result, addRes) - // } - } + // Если запись не добавилась делаем повторную попытку через 5 сек. + // if !addRes { + // time.Sleep(5 * time.Second) + // addRes = esAddRecord(client, indexName, result) + // log.Println("Повторная попытка добавления записи:", result, addRes) + // } } writeTempFile(tempFile, logOffset) if Debug { - fmt.Println(result) + log.Println(outLine) + log.Println(result) } outLine = "" + result = nil } - if prefix { - // fmt.Fprint(out, string(line)) - } else { - // fmt.Fprintln(out, string(line)) - } + // if prefix { + // fmt.Println(">",string(line)) + // } else { + // fmt.Println(">>",string(line)) + // } // -------------------------------- + line, err = Readln(r) } pos, err := f.Seek(0, io.SeekCurrent) if err != nil { @@ -1098,12 +1160,17 @@ func esAddRecordsBulk(client *opensearch.Client, indexName string, arrLine []map func esAddRecord(client *opensearch.Client, indexName string, line map[string]string) bool { // fmt.Println(">>>>>>>>>",indexName) // a_json, err := json.Marshal(line) + if len(line) == 0 { + return false + } var operResult bool aJson, err := json.MarshalIndent(line, "", " ") if err != nil { log.Printf("Error encoding: %s", line, "with error %s", err) } - + if Debug { + fmt.Println(line) + } document := strings.NewReader(string(aJson)) req := opensearchapi.IndexRequest{ @@ -1160,6 +1227,7 @@ func readConfigFile(fileName string) { TimeZoneOffset = cfg.Section("Processing").Key("timeZoneOffset").String() Duration = cfg.Section("Processing").Key("readDuration").String() + ExcludeEvent = strings.Split(cfg.Section("Processing").Key("excludeEvent").String(), ";") } // Создание файла конфигурации @@ -1228,7 +1296,6 @@ func RunWinService() { } } - func main() { // Читаем опции коммандной строки flag.BoolVar(&RunAsWinService, "run-win-service", false, "Запуск как службы Windows") @@ -1256,6 +1323,7 @@ func main() { flag.StringVar(&ObjectTypes, "object-types", "1,2,3,4,5,6,7,8", "Список типов объектов словаря для выборки, разделённый запятой (OBJECT_TYPES)") flag.StringVar(&TimeZoneOffset, "tz-offset", "+03:00", "Сдвиг по времени от UTC") flag.StringVar(&Duration, "tail-sleep", "5ms", "Задержка чтения входного файла. Разрешены обозначения 'ns', 'us' ('µs'), 'ms', 's', 'm','h')") + // flag.StringVar(&ExcludeEvent, "exclude-event", "", "Список событий исключенных из выдачи разделенных ';'") flag.Parse() @@ -1298,17 +1366,13 @@ func main() { LogFileExtention = os.Getenv("LOG_FILE_EXT") } - if os.Getenv("TEMP") != "" { - LogFileExtention = os.Getenv("LOG_FILE_EXT") - } + // if os.Getenv("TEMP") != "" { + // DirTemp = os.Getenv("TEMP") + // } if os.Getenv("DIR_LOG") != "" { DirLog = os.Getenv("DIR_LOG") } - - if os.Getenv("TEMP") != "" { - LogFileExtention = os.Getenv("LOG_FILE_EXT") - } // Проверяем параметры и переменные окружения для работы с ElasticSearch / OpenSearch if SendToEs { @@ -1365,6 +1429,8 @@ func main() { LocalTZOffset = TimeZoneOffset } + log.Println(DirCfg, FileCfg, DirIn, DirOut, DirTemp, DirLog, DictFile) + // Запуск как сервиса if RunAsWinService { RunWinService() @@ -1372,4 +1438,4 @@ func main() { runOperations() } log.Println("Работа программы завершена") -} +}