Compare commits

...

2 Commits

2 changed files with 161 additions and 75 deletions

View File

@ -2,7 +2,8 @@
Программа для обработки журналов 1С.
В качестве входных данных используются файлы журналов "20230126000001.lgp". В качестве словаря для подстановки значений используется файл "1Cv8.lgf".
В качестве входных данных используются файлы журналов "20230126000001.lgp" (Для обработки берется файл имя которого совпадает с текущей датой).
В качестве словаря для подстановки значений используется файл "1Cv8.lgf".
Обработанные данные могут быть сохранены в файл как в JSON так и в CSV форматах. Также возможна отправка обработанных данных в ElasticSearch.
@ -12,10 +13,14 @@
С целью ускорения работы запуск процесса отправки записей в эластик сделан параллельно, т.е. каждая запись отправляется в отдельном потоке (goroutine) поэтому для исключения переполнения буфера соединений в процедуру чтения файла (func tail()) введена задержка в 5 миллисекунд (time.Sleep(5 * time.Millisecond)). Это костыль и пока нормального решения я не придумал.
Типы ообъектов в словаре
1 пользователи; 2 компьютеры; 3 приложения; 4 события; 5 метаданные; 6 серверы; 7 основные порты; 8 вспомогательные порты.
## Опции командной строки
```
Usage of log-processor:
Usage of /tmp/go-build847101717/b001/exe/log-processor:
-config string
Файл настроек (default "config.ini")
-create-config
@ -85,6 +90,13 @@ log-processor -dir-config ~/tmp/123 -config processor.ini -create-config
Программа позволяет отправлять обработанные данные в ElasticSearch/OpenSearch опция "-es-send", сохранять в файлах формата csv и json: "-write-out-file" и "-out-format" соответсвенно, выводить в консоль - "-debug". Для всех операций используются соответствующие ключи запуска или параметры в файле настроек.
Для уменьшения количества записей можно задать список событий, которые будут исключены из итоговой выдачи (как в файл так и в эластик), разделенный точкой с запятой. Для этого используется параметр в файле конфигурации "excludeEvent":
```
excludeEvent=_$Transaction$_.Begin;\
_$Transaction$_.Commit
```
## Запуск как служба Windows
Перед запуском службу следует зарегистрировать в системе средствами Windows:
@ -96,6 +108,12 @@ r-config C:\1c-logprocessor\.config\ -run-win-service" DisplayName="1C log proce
Пути можно указать свои. Управление службой осуществляется через штатную оснастку системы.
## Автоматическая сборка и доставка
Для автоматизации сборки и доставки программы используется Gitlab CI.
При изменении файлов в репозитории будет запущен процесс сборки исполняемых файлов под разные платформы. После этого (при условии успешной сборки) файлы будут скопированы на WWW-сервис https://files.corp.samsonopt.ru и доступны по [ссылке](https://files.corp.samsonopt.ru/bin/1c-logprocessor/).
## Пример конфигурационного файла
```
@ -141,4 +159,6 @@ timeZoneOffset=+03:00
; Задержка чтения входного файла между строками
; Разрешены обозначения 'ns', 'us' ('µs'), 'ms', 's', 'm','h'
readDuration=5ms
; Список событий (поле "Event"), исключенных из выдачи, разделенный ";"
excludeEvent=
```

View File

@ -5,7 +5,6 @@
// ----------------------------------------------------------------------
// Лицензия: GPL(v3)
//------------------------- ------------------------------------------------
package main
import (
@ -25,10 +24,10 @@ import (
// "github.com/hpcloud/tail"
"net"
"sort"
"crypto/tls"
"net/http"
"context"
"gopkg.in/ini.v1"
"crypto/tls"
"net/http"
"context"
"gopkg.in/ini.v1"
opensearch "github.com/opensearch-project/opensearch-go/v2"
opensearchapi "github.com/opensearch-project/opensearch-go/v2/opensearchapi"
"github.com/kardianos/service"
@ -66,6 +65,7 @@ var (
DictFile string
TailSleep time.Duration
RunAsWinService bool
ExcludeEvent []string
)
type Config struct {
@ -88,7 +88,8 @@ type Config struct {
OutFormat string `ini:"outFormat"`
WriteOutFile bool `ini:"writeOutFile"`
TimeZoneOffset string `ini:"timeZoneOffset"`
Duration string `ini:"Duration"`
Duration string `ini:"Duration"`
ExcludeEvent []string `ini:"excludeEvent"`
}
type Key struct {
@ -233,6 +234,8 @@ timeZoneOffset=+03:00
; Задержка чтения входного файла между строками
; Разрешены обозначения 'ns', 'us' ('µs'), 'ms', 's', 'm','h'
readDuration=5ms
; Список событий (поле "Event"), исключенных из выдачи, разделенный ";"
excludeEvent=
`
// Список файлов в каталоге
@ -241,11 +244,18 @@ func getDirectoryContents(dir string) []string {
entries, err := os.ReadDir(dir);
if err != nil {
log.Println("Ошибка получения содержимого каталога: ", dir, err);
}
} else {
if Debug {
log.Println("Get the directory content:", dir)
}
}
//iterate through the directory entities and print out their name.
for _, entry := range(entries) {
if filepath.Ext(entry.Name()) == ".lgp" {
if filepath.Ext(entry.Name()) == LogFileExtention {
// fmt.Println(entry.Name());
if Debug {
log.Println("Finded file:", entry.Name())
}
names = append(names, entry.Name())
}
}
@ -261,19 +271,42 @@ func getLastModifyFile(dir string) []string {
var modTime time.Time
var names []string
for _, fi := range files {
// fmt.Println(fi)
// // fmt.Println(fi)
// names = append(names, fi.Name())
fileInfo, _ := fi.Info()
if fileInfo.Mode().IsRegular() {
// fmt.Println(fi, fileInfo.ModTime())
if !fileInfo.ModTime().Before(modTime) {
if fileInfo.ModTime().After(modTime) {
modTime = fileInfo.ModTime()
names = names[:0]
if Debug {
log.Println(LogFileExtention, filepath.Ext(fi.Name()))
}
if filepath.Ext(fi.Name()) == LogFileExtention {
fileInfo, _ := fi.Info()
if Debug {
log.Println("Finding file info:", fileInfo)
}
if fileInfo.Mode().IsRegular() {
if !fileInfo.ModTime().Before(modTime) {
if fileInfo.ModTime().After(modTime) {
modTime = fileInfo.ModTime()
names = names[:0]
}
// if filepath.Ext(fi.Name()) == ".lgp" {
dt := time.Now()
// Сравниваем имя последнего файла с текущей датой
// и если равны - запускаем процесс разборки
if getDate(fi.Name()) == dt.Format("2006-01-02") {
names = append(names, fi.Name())
log.Println("Текущая дата и дата файла совпадают:", getDate(fi.Name()), dt.Format("2006-01-02"))
}
// }
}
if filepath.Ext(fi.Name()) == ".lgp" {
names = append(names, fi.Name())
if Debug {
log.Println("Finding file modify time:", fileInfo.ModTime())
log.Println("Last file modify time:", modTime)
log.Println("Current time:", fileInfo.ModTime().Before(modTime), fileInfo.ModTime().After(modTime))
}
} else {
if Debug {
log.Println("Finding file was not regular:", fi)
}
}
}
}
@ -339,6 +372,10 @@ func readDictFile(fileName string) map[int]map[int]string {
if err != nil {
log.Printf("error opening file: %v\n",err)
os.Exit(1)
} else {
if Debug {
log.Println("Opening file:", fileName)
}
}
r := bufio.NewReader(f)
s, e := Readln(r)
@ -460,7 +497,9 @@ func createOutFile(fileName string) *os.File {
func writeOutCSVFile(outChannel *os.File, m map[string]string) {
var result string
if len(m) == 0 {
return
}
keys := make([]string, 0, len(m))
for k := range m {
keys = append(keys, k)
@ -506,6 +545,9 @@ func writeOutCSVFile(outChannel *os.File, m map[string]string) {
func writeOutJSONFile(outChannel *os.File, line map[string]string) {
// fmt.Println(">>>>>>>>>",line)
// a_json, err := json.Marshal(line)
if len(line) == 0 {
return
}
a_json, err := json.MarshalIndent(line, "", " ")
if err != nil {
log.Printf("Error encoding: %s", line, "with error %s", err)
@ -646,16 +688,19 @@ func deleteTempFile(tempFile *os.File, fileName string) {
log.Println("Удалён временный файл:", tempFileName)
}
}
func ParseString (outLine string, logOffset int64) map[string]string {
// (\d{14}),(\w{1}),\{(\w+),(?\w+)\},(\d+),(\d+),(\d+),(\d+),(\d+),(\w{1}),\"(.*)\",(\d+),\{(.+)\},\"(.*)\",(\d+),(\d+),(\d+),(\d+),(\d{1}),\{(\d|,)+\}
func parseString (outLine string, logOffset int64) map[string]string {
var (
resultData string
result map[string]string
)
result = nil
regexpLogRecord := regexp.MustCompile(`(?P<DateTime>\d{14}),(?P<TranStatus>\w{1}),\{(?P<TranDuration>\w+),(?P<TranNumber>\w+)\},(?P<Usr>\d+),(?P<ComputerName>\d+),(?P<ApplicationName>\d+),(?P<ConnectionID>\d+),(?P<Event>\d+),(?P<Importance>\w{1}),\"(?P<Comment>.*)\",(?P<Metadata>\d+),\{(?P<Data>.+)\},\"(?P<PresentData>.*)\",(?P<Server>\d+),(?P<Port>\d+),(?P<AddPort>\d+),(?P<SessionID>\d+),(?P<OtherData1>\d{1}),\{(\d|,)+\}$`)
// Полную строку прогоняем через регексп и распихиваем сразу по полям
matchedOut := regexpLogRecord.MatchString(outLine)
// fmt.Println(matchedOut)
if Debug {
log.Println(outLine)
}
if matchedOut {
findedData := regexpLogRecord.FindStringSubmatch(outLine)
if findedData != nil {
@ -695,6 +740,15 @@ func ParseString (outLine string, logOffset int64) map[string]string {
resultData = getDataFromDict(4, findedData[i])
if resultData != "not" {
findedData[i] = resultData
// log.Println(">>>>>>>>",findedData[i], ExcludeEvent)
for _, word := range ExcludeEvent {
if resultData == word {
if Debug {
log.Println("The event", resultData, "coincides with the specified filter", word)
}
return nil
}
}
}
case "Metadata":
resultData = getDataFromDict(5, findedData[i])
@ -769,14 +823,15 @@ func runOperations () {
log.Println("Новых файлов не найдено")
return
}
for _, fileName := range lastModifyFiles {
for _, fileName := range lastModifyFiles {
DictObjects = readDictFile(DictFile)
log.Println("Запускаем обработку файла", fileName)
if SendToEs {
EsIndexName = EsIndexPrefix + "-" + getDate(fileName)
client = esConnect()
esCreateIndex(EsIndexName, client)
}
tail(client, fileName, os.Stdout)
go tail(client, fileName, os.Stdout)
}
// Получаем список новых файлов, и если они появились после запуска программы
@ -790,7 +845,8 @@ func runOperations () {
continue
}
if newFilesList[0] != "" {
for _, fileName := range newFilesList {
for _, fileName := range newFilesList {
DictObjects = readDictFile(DictFile)
if SendToEs {
EsIndexName = EsIndexPrefix + "-" + getDate(fileName)
// esConnect()
@ -800,7 +856,7 @@ func runOperations () {
// обработанных файлов для корректного определения новых файлов
filesList = append(filesList, fileName)
// запускаем процесс чтения (обработки)
tail(client, fileName, os.Stdout)
go tail(client, fileName, os.Stdout)
}
} else {
log.Println("Новых файлов не найдено")
@ -814,15 +870,15 @@ func runOperations () {
func tail(client *opensearch.Client, fileName string, out io.Writer) {
var shutUp = false
var (
recBegin bool
recBegin = false
outLine string
l string
bulkCount int
arrRecords []map[string]string
// bulkCount int
// arrRecords []map[string]string
fOut *os.File
)
bulkCount = 0
// bulkCount = 0
filesList := getDirectoryContents(DirIn)
@ -870,8 +926,17 @@ func tail(client *opensearch.Client, fileName string, out io.Writer) {
oldSize := info.Size()
regexpBeginRecord := regexp.MustCompile(`^\{\d{14},\w{1},$`)
regexpEndRecord := regexp.MustCompile(`^\},$`)
// var (
// isPrefix bool = true
// // err error = nil
// line []byte
// )
for {
for line, prefix, err := r.ReadLine(); err != io.EOF; line, prefix, err = r.ReadLine() {
line, err := Readln(r)
// for line, _, err := r.ReadLine(); err != io.EOF; line, _, err = r.ReadLine() {
for err == nil {
// fmt.Println(line)
// определяем текущую позицию записи в файле и пишем в файл
logOffset, _ := f.Seek(0, os.SEEK_CUR)
writeTempFile(tempFile, logOffset)
@ -885,15 +950,20 @@ func tail(client *opensearch.Client, fileName string, out io.Writer) {
}
if recBegin {
outLine = outLine + l
// outLine = outLine + strings.TrimSpace(l)
}
// Находим конец записи
matchedEnd := regexpEndRecord.MatchString(l)
// fmt.Println(l)
// fmt.Println(recBegin, matchedEnd)
// fmt.Println(outLine)
if matchedEnd {
recBegin = false
outLine = strings.TrimSuffix(strings.TrimLeft(outLine, "{,"), "},")
// Парсим подготовленную строку
result := ParseString(outLine, logOffset)
result := parseString(outLine, logOffset)
if Debug {
log.Println(len(result), result)
}
// Пишем выходные файлы
if WriteOutFile {
if OutFormat == "json" {
@ -904,46 +974,38 @@ func tail(client *opensearch.Client, fileName string, out io.Writer) {
}
// Отправляем данные в ElasticSearch / OpenSearch
if SendToEs {
if result["index_day_prefix"] == "" {
continue
}
// if result["index_day_prefix"] == "" {
// continue
// }
indexName := EsIndexPrefix + "-" + result["index_day_prefix"]
// пакетная отправка данных в эластик
if EsBulk {
if bulkCount < EsBulkRecordsQuantity {
bulkCount++
// arrRecords = append(arrRecords, result)
} else {
bulkCount = 0
// esAddRecordsBulk(client, indexName, arrRecords)
}
arrRecords = append(arrRecords, result)
} else {
// addRes := esAddRecord(client, indexName, result)
go esAddRecord(client, indexName, result)
time.Sleep(TailSleep)
// time.Sleep(5 * time.Millisecond)
// addRes := esAddRecord(client, indexName, result)
go esAddRecord(client, indexName, result)
time.Sleep(TailSleep)
// time.Sleep(5 * time.Millisecond)
// Если запись не добавилась делаем повторную попытку через 5 сек.
// if !addRes {
// time.Sleep(5 * time.Second)
// addRes = esAddRecord(client, indexName, result)
// log.Println("Повторная попытка добавления записи:", result, addRes)
// }
}
// Если запись не добавилась делаем повторную попытку через 5 сек.
// if !addRes {
// time.Sleep(5 * time.Second)
// addRes = esAddRecord(client, indexName, result)
// log.Println("Повторная попытка добавления записи:", result, addRes)
// }
}
writeTempFile(tempFile, logOffset)
if Debug {
fmt.Println(result)
log.Println(outLine)
log.Println(result)
}
outLine = ""
result = nil
}
if prefix {
// fmt.Fprint(out, string(line))
} else {
// fmt.Fprintln(out, string(line))
}
// if prefix {
// fmt.Println(">",string(line))
// } else {
// fmt.Println(">>",string(line))
// }
// --------------------------------
line, err = Readln(r)
}
pos, err := f.Seek(0, io.SeekCurrent)
if err != nil {
@ -1098,12 +1160,17 @@ func esAddRecordsBulk(client *opensearch.Client, indexName string, arrLine []map
func esAddRecord(client *opensearch.Client, indexName string, line map[string]string) bool {
// fmt.Println(">>>>>>>>>",indexName)
// a_json, err := json.Marshal(line)
if len(line) == 0 {
return false
}
var operResult bool
aJson, err := json.MarshalIndent(line, "", " ")
if err != nil {
log.Printf("Error encoding: %s", line, "with error %s", err)
}
if Debug {
fmt.Println(line)
}
document := strings.NewReader(string(aJson))
req := opensearchapi.IndexRequest{
@ -1160,6 +1227,7 @@ func readConfigFile(fileName string) {
TimeZoneOffset = cfg.Section("Processing").Key("timeZoneOffset").String()
Duration = cfg.Section("Processing").Key("readDuration").String()
ExcludeEvent = strings.Split(cfg.Section("Processing").Key("excludeEvent").String(), ";")
}
// Создание файла конфигурации
@ -1228,7 +1296,6 @@ func RunWinService() {
}
}
func main() {
// Читаем опции коммандной строки
flag.BoolVar(&RunAsWinService, "run-win-service", false, "Запуск как службы Windows")
@ -1256,6 +1323,7 @@ func main() {
flag.StringVar(&ObjectTypes, "object-types", "1,2,3,4,5,6,7,8", "Список типов объектов словаря для выборки, разделённый запятой (OBJECT_TYPES)")
flag.StringVar(&TimeZoneOffset, "tz-offset", "+03:00", "Сдвиг по времени от UTC")
flag.StringVar(&Duration, "tail-sleep", "5ms", "Задержка чтения входного файла. Разрешены обозначения 'ns', 'us' ('µs'), 'ms', 's', 'm','h')")
// flag.StringVar(&ExcludeEvent, "exclude-event", "", "Список событий исключенных из выдачи разделенных ';'")
flag.Parse()
@ -1298,17 +1366,13 @@ func main() {
LogFileExtention = os.Getenv("LOG_FILE_EXT")
}
if os.Getenv("TEMP") != "" {
LogFileExtention = os.Getenv("LOG_FILE_EXT")
}
// if os.Getenv("TEMP") != "" {
// DirTemp = os.Getenv("TEMP")
// }
if os.Getenv("DIR_LOG") != "" {
DirLog = os.Getenv("DIR_LOG")
}
if os.Getenv("TEMP") != "" {
LogFileExtention = os.Getenv("LOG_FILE_EXT")
}
// Проверяем параметры и переменные окружения для работы с ElasticSearch / OpenSearch
if SendToEs {
@ -1365,6 +1429,8 @@ func main() {
LocalTZOffset = TimeZoneOffset
}
log.Println(DirCfg, FileCfg, DirIn, DirOut, DirTemp, DirLog, DictFile)
// Запуск как сервиса
if RunAsWinService {
RunWinService()
@ -1372,4 +1438,4 @@ func main() {
runOperations()
}
log.Println("Работа программы завершена")
}
}