From e1271356b5d2fbf7b3111f2d2cba18cc51797b6f Mon Sep 17 00:00:00 2001 From: svkalinin Date: Thu, 9 Mar 2023 16:10:55 +0300 Subject: [PATCH] first commit --- README.md | 144 +++++ go.mod | 18 + go.sum | 77 +++ log-processor.go | 1387 ++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 1626 insertions(+) create mode 100644 README.md create mode 100644 go.mod create mode 100644 go.sum create mode 100644 log-processor.go diff --git a/README.md b/README.md new file mode 100644 index 0000000..c5242a5 --- /dev/null +++ b/README.md @@ -0,0 +1,144 @@ +## О программе + +Программа для обработки журналов 1С. + +В качестве входных данных используются файлы журналов "20230126000001.lgp". В качестве словаря для подстановки значений используется файл "1Cv8.lgf". + +Обработанные данные могут быть сохранены в файл как в JSON так и в CSV форматах. Также возможна отправка обработанных данных в ElasticSearch. + +Программа работает как в Linux так и Windows. + +!!! Внимание !!! + +С целью ускорения работы запуск процесса отправки записей в эластик сделан параллельно, т.е. каждая запись отправляется в отдельном потоке (goroutine) поэтому для исключения переполнения буфера соединений в процедуру чтения файла (func tail()) введена задержка в 5 миллисекунд (time.Sleep(5 * time.Millisecond)). Это костыль и пока нормального решения я не придумал. + +## Опции командной строки + +``` +Usage of /tmp/go-build847101717/b001/exe/log-processor: + -config string + Файл настроек (default "config.ini") + -create-config + Создать файл настроек + -debug + Выводить данные в консоль + -dict-file string + Файл со словарём (DICT_FILE) (default "1Cv8.lgf") + -dir-config string + Каталог для файлов настройки (DIR_CFG) (default ".config") + -dir-in string + Каталог для исходных файлов (DIR_IN) (default "in") + -dir-log string + Каталог для лога работы (DIR_LOG) (default "log") + -dir-out string + Каталог для обработанных файлов (DIR_OUT) (default "out") + -dir-temp string + Каталог для временных файлов (TEMP) (default "tmp") + -es-bulk + Пакетная вставка записей в Elastic Search + -es-bulk-quantity int + Количество записей в одном запросе для пакетной вставки (default 10) + -es-index-prefix string + Префикс имени индекса Elastic Search (ELASTICSEARCH_INDEX_PREFIX) + -es-password string + Пароль пользователя Elastic Search (ELASTICSEARCH_PASSWORD) + -es-send + Отправлять данные в ElasticSearch + -es-skip-verify + Пропустить проверку сертификатов при подключении к Elastic Search + -es-url string + Адрес узла Elastic Search (ELASTICSEARCH_URL) + -es-user string + Имя пользователя Elastic Search (ELASTICSEARCH_USER) + -log-file-ext string + Расширение файлов с журналами (обрабатываемых логов) (LOG_FILE_EXT) (default ".lgp") + -object-types string + Список типов объектов словаря для выборки, разделённый запятой (OBJECT_TYPES) (default "1,2,3,4,5,6,7,8") + -out-format string + Формат данных на выходе (csv, json) (OUT_FORMAT) (default "csv") + -run-win-service + Запуск как службы Windows + -tail-sleep string + Задержка чтения входного файла. Разрешены обозначения 'ns', 'us' ('µs'), 'ms', 's', 'm','h') (default "5ms") + -tz-offset string + Сдвиг по времени от UTC (default "+03:00") + -write-out-file + Запись обработанных данных в файл +``` + +## Настройка + +Передача параметров для работы программы возможна через файл настроек, через командную строку и через переменные окружения (но не все). +Приоритетным является конфигурационный файл. Т.е. при запуске проверяется наличие файла конфигурации, и в случае если его нет проверяются параметры командной строки, если опции командной строки не заданы то проверяются переменные окружения. + +Для создания файла настроек программу следует запустить с указанием опций: + +``` +log-processor -dir-config ~/tmp/123 -config processor.ini -create-config +``` + +Пути к каталогам и файлам можно переопределить через указание соответствующих параметров при запуске. После правки параметров порграмму можно запускать. + +## Использование + +При первом запуске будут созданы недостающие каталоги (в случае отсутствия). После запуска программа получает последний файл (по дате модификации) в каталоге заданном опцией "-dir-in". И запускает потоковый процесс обработки данных. По мере чтения файла, производится проверка входного каталога на наличие новых файлов, и при обнаружении оного процесс чтения открытого файла будет остановлен и запущена обработка нового. + +Позиция указателя в открытом файле сохраняется во временном файле типа "20230126000001.lgp.tmp", который будет удален при нормальном завершении работы с файлом. При ручном прерывании процесса обработки или аварийном завершении программы и последующем запуске процесс чтения файла (и запись выходного файла) будет продолжен с того места на котором произошло завершение. + +Программа позволяет отправлять обработанные данные в ElasticSearch/OpenSearch опция "-es-send", сохранять в файлах формата csv и json: "-write-out-file" и "-out-format" соответсвенно, выводить в консоль - "-debug". Для всех операций используются соответствующие ключи запуска или параметры в файле настроек. + +## Запуск как служба Windows + +Перед запуском службу следует зарегистрировать в системе средствами Windows: + +``` +sc.exe create "1C log processor" binPath="c:\1c-logprocessor\1c-logprocessor.exe -di +r-config C:\1c-logprocessor\.config\ -run-win-service" DisplayName="1C log processor" type=own start=auto +``` + +Пути можно указать свои. Управление службой осуществляется через штатную оснастку системы. + +## Пример конфигурационного файла + +``` +[General] +; Каталог для входящих данных +dirIn=in +; Каталог для исходящих данных +dirOut=out +; Каталог для временных файлов +dirTemp=tmp +; Каталог для журнала работы программы +dirLog=log +; Имя файла словаря +dictFile=1Cv8.lgf +; Список объектов для выборки из словаря +objectTypes=1,2,3,4,5,6,7,8 +; Расширение файлов журнало 1С (входящих) +logFileExtention=.lgp + +[ElasticSearch] +; Включене отправки данных в ElasticSearch +sendToEs=false +; Адрес сервера ElasticSearch +esUrl=https://elastic:9200 +; Пользователь для подключсения к ElasticSearch +esUser=user_name +; Пароль пользователя ElasticSearch +esPassword=user_password +; Отключить проверку SSL сертификатов при подключении +esSkipVerify=false +; Перфикс индекса в ElasticSearch +esIndexPrefix=test_log + +[Processing] +; Включение вывода обработанной информации в файл +writeOutFile=false +; Формат выходного файла +outFormat=csv +; Сдвиг времени (временная зона) +timeZoneOffset=+03:00 +; Задержка чтения входного файла между строками +; Разрешены обозначения 'ns', 'us' ('µs'), 'ms', 's', 'm','h' +readDuration=5ms +``` diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..d7c2ba6 --- /dev/null +++ b/go.mod @@ -0,0 +1,18 @@ +module log-reader + +go 1.19 + +require github.com/hpcloud/tail v1.0.0 + +// replace github.com/hpcloud/tail v1.0.0 => /home/svkalinin/Проекты/git-repo-copy/tail + +require ( + github.com/fsnotify/fsnotify v1.6.0 // indirect + github.com/kardianos/service v1.2.2 // indirect + github.com/opensearch-project/opensearch-go/v2 v2.2.0 // indirect + golang.org/x/sys v0.4.0 // indirect + gopkg.in/fsnotify.v1 v1.4.7 // indirect + gopkg.in/fsnotify/fsnotify.v1 v1.4.7 // indirect + gopkg.in/ini.v1 v1.67.0 // indirect + gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..180605e --- /dev/null +++ b/go.sum @@ -0,0 +1,77 @@ +github.com/aws/aws-sdk-go v1.44.180/go.mod h1:aVsgQcEevwlmQ7qHE9I3h+dtQgpqhFB+i8Phjh7fkwI= +github.com/aws/aws-sdk-go-v2 v1.17.3/go.mod h1:uzbQtefpm44goOPmdKyAlXSNcwlRgF3ePWVW6EtJvvw= +github.com/aws/aws-sdk-go-v2/config v1.18.8/go.mod h1:5XCmmyutmzzgkpk/6NYTjeWb6lgo9N170m1j6pQkIBs= +github.com/aws/aws-sdk-go-v2/credentials v1.13.8/go.mod h1:lVa4OHbvgjVot4gmh1uouF1ubgexSCN92P6CJQpT0t8= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.12.21/go.mod h1:ugwW57Z5Z48bpvUyZuaPy4Kv+vEfJWnIrky7RmkBvJg= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.27/go.mod h1:a1/UpzeyBBerajpnP5nGZa9mGzsBn5cOKxm6NWQsvoI= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.21/go.mod h1:+Gxn8jYn5k9ebfHEqlhrMirFjSW0v0C9fI+KN5vk2kE= +github.com/aws/aws-sdk-go-v2/internal/ini v1.3.28/go.mod h1:yRZVr/iT0AqyHeep00SZ4YfBAKojXz08w3XMBscdi0c= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.21/go.mod h1:lRToEJsn+DRA9lW4O9L9+/3hjTkUzlzyzHqn8MTds5k= +github.com/aws/aws-sdk-go-v2/service/sso v1.12.0/go.mod h1:wo/B7uUm/7zw/dWhBJ4FXuw1sySU5lyIhVg1Bu2yL9A= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.14.0/go.mod h1:TZSH7xLO7+phDtViY/KUp9WGCJMQkLJ/VpgkTFd5gh8= +github.com/aws/aws-sdk-go-v2/service/sts v1.18.0/go.mod h1:+lGbb3+1ugwKrNTWcf2RT05Xmp543B06zDFTwiTLp7I= +github.com/aws/smithy-go v1.13.5/go.mod h1:Tg+OJXh4MB2R/uN61Ko2f6hTZwB/ZYGOtib8J3gBHzA= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY= +github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw= +github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI= +github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= +github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo= +github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U= +github.com/kardianos/service v1.2.2 h1:ZvePhAHfvo0A7Mftk/tEzqEZ7Q4lgnR8sGz4xu1YX60= +github.com/kardianos/service v1.2.2/go.mod h1:CIMRFEJVL+0DS1a3Nx06NaMn4Dz63Ng6O7dl0qH0zVM= +github.com/opensearch-project/opensearch-go/v2 v2.2.0 h1:6RicCBiqboSVtLMjSiKgVQIsND4I3sxELg9uwWe/TKM= +github.com/opensearch-project/opensearch-go/v2 v2.2.0/go.mod h1:R8NTTQMmfSRsmZdfEn2o9ZSuSXn0WTHPYhzgl7LCFLY= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= +golang.org/x/net v0.1.0/go.mod h1:Cx3nUiGt4eDBEyega/BKRp+/AlGL8hYe7U9odMt2Cco= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20201015000850-e3ed0017c211/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.4.0 h1:Zr2JFtRQNX3BCZ8YtxRE9hNJYC8J6I1MVbMg6owUp18= +golang.org/x/sys v0.4.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/term v0.1.0/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4= +gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= +gopkg.in/fsnotify/fsnotify.v1 v1.4.7 h1:XNNYLJHt73EyYiCZi6+xjupS9CpvmiDgjPTAjrBlQbo= +gopkg.in/fsnotify/fsnotify.v1 v1.4.7/go.mod h1:Fyux9zXlo4rWoMSIzpn9fDAYjalPqJ/K1qJ27s+7ltE= +gopkg.in/ini.v1 v1.67.0 h1:Dgnx+6+nfE+IfzjUEISNeydPJh9AXNNsWbGP9KzCsOA= +gopkg.in/ini.v1 v1.67.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= +gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/log-processor.go b/log-processor.go new file mode 100644 index 0000000..32517be --- /dev/null +++ b/log-processor.go @@ -0,0 +1,1387 @@ +// ---------------------------------------------------------------------- +// Программа для анализа и записи журналов 1С +// Версия: 1.0.0 +// Автор: Сергей Калинин svk@nuk-svk.ru https://git.nuk-svk.ru/svk/ +// ---------------------------------------------------------------------- +// Использование log-processor: +// -dict-file string +// Файл со словарём (DICT_FILE) (default "1Cv8.lgf") +// -dir-in string +// Каталог для исходных файлов (DIR_IN) (default "in") +// -dir-log string +// Каталог для лога работы (DIR_LOG) (default "log") +// -dir-out string +// Каталог для обработанных файлов (DIR_OUT) (default "out") +// -dir-temp string +// Каталог для временных файлов (TEMP) (default "/tmp") +// -log-file-ext string +// Расширение файлов с журналами (обрабатываемых логов) (LOG_FILE_EXT) (default ".lgp") +// -object-types string +// Список типов объектов для выборки разделённый запятой (OBJECT_TYPES) (default "1,2,3,4,5,6,7,8") +// -out-format string +// Выводить данные в формате JSON (OUT_FORMAT) (default "json") +// -send-to-es +// Отправлять данные в ElasticSearch (SEND_TO_ES) +// ------------------------------------------------------------------------- +// ELASTICSEARCH_URL="https://user:pass@elastic:9200" + +//------------------------- ------------------------------------------------ + +package main + +import ( + "bufio" + "fmt" + "log" + "os" + "flag" + "time" + "io" + // "runtime" + "path/filepath" + "regexp" + "strings" + "strconv" + "encoding/json" + // "github.com/hpcloud/tail" + "net" + "sort" + "crypto/tls" + "net/http" + "context" + "gopkg.in/ini.v1" + opensearch "github.com/opensearch-project/opensearch-go/v2" + opensearchapi "github.com/opensearch-project/opensearch-go/v2/opensearchapi" + "github.com/kardianos/service" +) +var ( + DirCfg string + FileCfg string + CreateFileCfg bool + DirIn string + DirOut string + DirTemp string + DirLog string + ObjectTypes string + DictObjects map[int]map[int]string + OpenedFiles map[string]chan string + LogFileExtention string + SendToEs bool + EsUrl string + EsUser string + EsPassword string + EsIndexPrefix string + EsIndexName string + EsSkipVerify bool + Debug bool + OutFormat string + WriteOutFile bool + EsClient *opensearch.Client + EsBulk bool + EsBulkRecordsQuantity int + LocalTZOffset string + LocalTimeZone string + TimeZoneOffset string + Duration string + DictFile string + TailSleep time.Duration + RunAsWinService bool +) + +type Config struct { + DirCfg string `ini:"dirCfg"` + DirIn string `ini:"dirIn"` + DirOut string `ini:"dirOut"` + DirTemp string `ini:"dirTemp"` + DirLog string `ini:"dirLog"` + DictFile string `ini:"dictFile"` + ObjectTypes string `ini:"objectTypes"` + LogFileExtention string `ini:"logFileExtention"` + SendToEs bool `ini:"sendToEs"` + EsUrl string `ini:"esUrl"` + EsUser string `ini:"esUser"` + EsPassword string `ini:"esPassword"` + EsIndexPrefix string `ini:"esIndexPrefix"` + EsSkipVerify bool `ini:"esSkipVerify"` + // EsBulk bool `ini:"esBulk"` + // EsBulkRecordsQuantity int `ini:"esBulkQuantity"` + OutFormat string `ini:"outFormat"` + WriteOutFile bool `ini:"writeOutFile"` + TimeZoneOffset string `ini:"timeZoneOffset"` + Duration string `ini:"Duration"` +} + +type Key struct { + ObjectType int + ObjectNumber int +} + +type OutRecord struct { + Timestamp string `json:"@timestamp"` + DateTime string `json:"log_datetime"` + TranNumber int `json:"TranNumber"` + TranDuration int `json:"TranDuration"` + TranStatus string `json:"TranStatus"` + Usr string `json:"Usr"` + ComputerName string `json:"ComputerName"` + ApplicationName string `json:"ApplicationName"` + ConnectionID int `json:"ConnectionID"` + Event string `json:"Event"` + Importance string `json:"Importance"` + Comment string `json:"Comment"` + Metadata string `json:"Metadata"` + Data string `json:"Data"` + PresentData string `json:"PresentData"` + Server string `json:"Server"` + Port int `json:"Port"` + AddPort int `json:"AddPort"` + SessionID int `json:"SessionID"` + ID string `json:"id"` + IndexDayPrefix string `json:"index_day_prefix"` + // OtherData1 string `json:"OtherData1"` + // OtherData2 string `json:"OtherData1"` +} + +var IndexTemplate = `{ + "settings" : { }, + "mappings" : { + "properties" : { + "@timestamp" : { + "type" : "date" + }, + "Comment" : { + "type" : "text" + }, + "AddPort" : { + "type" : "keyword" + }, + "ApplicationName" : { + "type" : "keyword" + }, + "TranDuration" : { + "type" : "long" + }, + "Server" : { + "type" : "keyword" + }, + "PresentData" : { + "type" : "text", + "fields" : { + "keyword" : { + "ignore_above" : 256, + "type" : "keyword" + } + } + }, + "Port" : { + "type" : "keyword" + }, + "Metadata" : { + "type" : "keyword" + }, + "Data" : { + "type" : "text" + }, + "Importance" : { + "type" : "keyword" + }, + "ConnectionID" : { + "type" : "keyword" + }, + "Usr" : { + "type" : "keyword" + }, + "ComputerName" : { + "type" : "keyword" + }, + "TranStatus" : { + "type" : "keyword" + }, + "Event" : { + "type" : "keyword" + }, + "TranNumber" : { + "type" : "keyword" + }, + "SessionID" : { + "type" : "keyword" + } + } + } +}` + +// Шаблон файла конфигурации +var ConfigTemplate = `[General] +; Каталог для входящих данных +dirIn=in +; Каталог для исходящих данных +dirOut=out +; Каталог для временных файлов +dirTemp=tmp +; Каталог для журнала работы программы +dirLog=log +; Имя файла словаря +dictFile=1Cv8.lgf +; Список объектов для выборки из словаря +objectTypes=1,2,3,4,5,6,7,8 +; Расширение файлов журнало 1С (входящих) +logFileExtention=.lgp + +[ElasticSearch] +; Включене отправки данных в ElasticSearch +sendToEs=false +; Адрес сервера ElasticSearch +esUrl=https://elastic:9200 +; Пользователь для подключсения к ElasticSearch +esUser=user_name +; Пароль пользователя ElasticSearch +esPassword=user_password +; Отключить проверку SSL сертификатов при подключении +esSkipVerify=false +; Перфикс индекса в ElasticSearch +esIndexPrefix=test_log + +[Processing] +; Включение вывода обработанной информации в файл +writeOutFile=false +; Формат выходного файла +outFormat=csv +; Сдвиг времени (временная зона) +timeZoneOffset=+03:00 +; Задержка чтения входного файла между строками +; Разрешены обозначения 'ns', 'us' ('µs'), 'ms', 's', 'm','h' +readDuration=5ms +` + +// Список файлов в каталоге +func getDirectoryContents(dir string) []string { + var names []string + entries, err := os.ReadDir(dir); + if err != nil { + log.Println("Ошибка получения содержимого каталога: ", dir, err); + } + //iterate through the directory entities and print out their name. + for _, entry := range(entries) { + if filepath.Ext(entry.Name()) == ".lgp" { + // fmt.Println(entry.Name()); + names = append(names, entry.Name()) + } + } + return names +} + +func getLastModifyFile(dir string) []string { + files, err := os.ReadDir(dir) + if err != nil { + fmt.Println(err) + os.Exit(1) + } + var modTime time.Time + var names []string + for _, fi := range files { + // fmt.Println(fi) + // names = append(names, fi.Name()) + fileInfo, _ := fi.Info() + if fileInfo.Mode().IsRegular() { + // fmt.Println(fi, fileInfo.ModTime()) + if !fileInfo.ModTime().Before(modTime) { + if fileInfo.ModTime().After(modTime) { + modTime = fileInfo.ModTime() + names = names[:0] + } + if filepath.Ext(fi.Name()) == ".lgp" { + names = append(names, fi.Name()) + } + } + } + } + return names +} + +func getNewFilesFromDir(filesList []string, dir string) []string { + var names []string + newFilesList := getDirectoryContents(dir) + if len(newFilesList) > 0 { + // log.Println("Получаем список файлов", newFilesList) + for _, newFileName := range newFilesList { + if !implContains(filesList, newFileName) { + fmt.Println(newFileName, implContains(filesList, newFileName)) + names = append(names, newFileName) + } + } + } + return names +} + +// Readln returns a single line (without the ending \n) +// from the input buffered reader. +// An error is returned iff there is an error with the +// buffered reader. +func Readln(r *bufio.Reader) (string, error) { + var (isPrefix bool = true + err error = nil + line, ln []byte + ) + for isPrefix && err == nil { + line, isPrefix, err = r.ReadLine() + ln = append(ln, line...) + } + return string(ln),err +} + +func hex2int(hexStr string) int { + // base 16 for hexadecimal + result, err := strconv.ParseInt(hexStr, 16, 64) + if err != nil { + log.Printf("error converting value: %v, %v\n",hexStr, err) + return 0 + } + return int(result) +} + +func readDictFile(fileName string) map[int]map[int]string { + var ( + // arr map[Key]string + recBegin bool + // recEnd bool + outLine string + l string + res map[int]map[int]string + ) + + res = make(map[int]map[int]string) + + objectTypesList := strings.Split(ObjectTypes, ",") + + f, err := os.Open(fileName) + if err != nil { + log.Printf("error opening file: %v\n",err) + os.Exit(1) + } + r := bufio.NewReader(f) + s, e := Readln(r) + + // регулярное выражение для получения данных + regexpSplitData := regexp.MustCompile(`\{(?P\d+),(.*,|)(?P.+),(?P\d+)\},$`) + // регулярное выражение определения однострочной записи + regexpString := regexp.MustCompile(`^\{\d+,.+,\d+\},$`) + // регулярное выражение для многострочных записей соответстве нно начало строки и конец + // regexpMultilineBegin := regexp.MustCompile(`^.+(\},)$`) + regexpMultilineEnd := regexp.MustCompile(`^.+},$`) + + for e == nil { + // log.Println(s) + s,e = Readln(r) + + l = strings.TrimSpace(s) + // strings.TrimRight(outLine, ",") + + // regexp.MustCompile("[*,%_]{1}").Split(refString, -1) + + // Обработка многострочных записей + matchedString := regexpString.MatchString(l) + // matchedBegin := regexpMultilineBegin.MatchString(l) + matchedEnd := regexpMultilineEnd.MatchString(l) + // fmt.Println(matchedBegin) + if matchedString == true && recBegin == false{ + findedData := regexpSplitData.FindStringSubmatch(l) + if findedData != nil { + result := make(map[string]string) + for i, name := range regexpSplitData.SubexpNames() { + // log.Println(i, name, findedData[i]) + if i != 0 && name != "" { + result[name] = findedData[i] + } + } + if implContains(objectTypesList, result["objType"]) == false { + continue + } + objTypeInt, _ := strconv.Atoi(result["objType"]) + objNumberInt, _ := strconv.Atoi(result["objNumber"]) + if res[objTypeInt] == nil { + res[objTypeInt] = make(map[int]string) + } + res[objTypeInt][objNumberInt] = strings.Trim(result["objData"], "{}\"") + // log.Println(">>",objTypeInt, objNumberInt, result["objData"]) + result = nil + } + } + if matchedEnd == false { + recBegin = true + // log.Println("multilineline begin") + } + + if recBegin { + outLine = outLine + l + } + + if recBegin && matchedEnd { + recBegin = false + s = outLine + outLine = "" + findedData := regexpSplitData.FindStringSubmatch(s) + if findedData != nil { + result := make(map[string]string) + for i, name := range regexpSplitData.SubexpNames() { + // log.Println(i, name, findedData[i]) + if i != 0 && name != "" { + result[name] = findedData[i] + } + } + if implContains(objectTypesList, result["objType"]) == false { + continue + } + objTypeInt, _ := strconv.Atoi(result["objType"]) + objNumberInt, _ := strconv.Atoi(result["objNumber"]) + if res[objTypeInt] == nil { + res[objTypeInt] = make(map[int]string) + } + res[objTypeInt][objNumberInt] = strings.Trim(result["objData"], "{}") + // log.Println("<<",objTypeInt, objNumberInt, result["objData"]) + } + } + // time.Sleep(1 * time.Second) + } + return res +} + +// Создаем выходной файл. Если он есть открываем на запись +// и перемещаем указатель в конец файла +func createOutFile(fileName string) *os.File { + var ( + fOut *os.File + err error + ) + fileOutPath := filepath.Join(DirOut, fileName) + if fileExists(fileOutPath) { + fOut, err = os.OpenFile(fileOutPath, os.O_RDWR, 0644) + if err != nil { + log.Println("Open out file", fileOutPath, "error:", err) + } + } else { + fOut, err = os.Create(fileOutPath) + if err != nil { + log.Println("Create out file", fileOutPath, "error:", err) + } + } + // defer fOut.Close() + + fOutInfo, _ := os.Stat(fileOutPath) + fOutSize := fOutInfo.Size() + + _, err = fOut.Seek(fOutSize, 0) + if err != nil { + log.Printf("Seek error on %s: %s", fileOutPath, err) + } + return fOut +} + +func writeOutCSVFile(outChannel *os.File, m map[string]string) { + var result string + + keys := make([]string, 0, len(m)) + for k := range m { + keys = append(keys, k) + } + sort.Strings(keys) + + for _, k := range keys { + // strings.Join(m[k], ";") + result = result + m[k] + result = result + ";" + } + // fmt.Println(result) + result = result + "\n" + _, err := outChannel.WriteString(result) + if err != nil { + log.Println("Ошибка записи в выходной файл:", err) + } +} + +// func writeOutCSVHeader(outChannel *os.File) { + // var result string + // var m OutRecord + // keys := make([]string, 0, len(m)) + // for k := range m { + // keys = append(keys, k) + // } + // sort.Strings(keys) + // + // for _, k := range keys { + // // strings.Join(m[k], ";") + // result = result + k + // result = result + ";" + // } + // // fmt.Println(result) + // result = result + "\n" + // _, err := outChannel.WriteString(result) + // if err != nil { + // log.Println("Ошибка записи в выходной файл:", err) + // } +// } +// + +func writeOutJSONFile(outChannel *os.File, line map[string]string) { + // fmt.Println(">>>>>>>>>",line) + // a_json, err := json.Marshal(line) + a_json, err := json.MarshalIndent(line, "", " ") + if err != nil { + log.Printf("Error encoding: %s", line, "with error %s", err) + } + + if err != nil{ + log.Printf("Error encoding query: %s", err) + } + outChannel.Write(a_json) +} + +func getDataFromDict(dictType int, dictIndex string) string { + i, err := strconv.Atoi(dictIndex) + if err != nil { + log.Println("func getDataFromDict, string to int converted error:", dictIndex) + return "not" + } + return DictObjects[dictType][i] +} + +// Функция возвращает дату в формате YYYY-MM-DD +func getDate(dateString string) string { + t := fmt.Sprintf("%s-%s-%s", dateString[0:4], dateString[4:6], dateString[6:8]) + return t +} + +// Получение временной зоны и часового пояса +// например: MSK, +03:00 +func getLocalTimeZone() (string, string) { + zone, _ := time.Now().Zone() + zoneBounds, _ := time.Now().ZoneBounds() + l := strings.Fields(zoneBounds.String()) + zoneOffsetArr := strings.Split(l[2], "") + zoneOffset := fmt.Sprintf("%s%s%s:%s%s", zoneOffsetArr[0], zoneOffsetArr[1], zoneOffsetArr[2], zoneOffsetArr[3], zoneOffsetArr[4]) + // fmt.Println(zone) + // fmt.Println(zoneOffset) + return zone, zoneOffset +} + +// Перобразование строки с датой в отпечаток времени (timestamp) для Elasticsearch +// "2023-01-03T10:15:30+03:00" +func getTimestamp(dateString string) string { + // t := fmt.Sprintf("%s-%s-%s %s:%s:%s %s", dateString[0:4], dateString[4:6], dateString[6:8], dateString[8:10], dateString[10:12], dateString[12:14], zone) + // t := fmt.Sprintf("%s-%s-%sT%s:%s:%sZ%s", dateString[0:4], dateString[4:6], dateString[6:8], dateString[8:10], dateString[10:12], dateString[12:14], LocalTZOffset) + t := fmt.Sprintf("%s-%s-%sT%s:%s:%s%s", dateString[0:4], dateString[4:6], dateString[6:8], dateString[8:10], dateString[10:12], dateString[12:14], LocalTZOffset) + // return "2023-01-03T10:15:30+03:00" + return t +} + +// Получение времени длительности транзакции +func getDuration(transactionTime int, recordTimeStamp string) string { + if transactionTime == 0 { + return "0" + } + beginTime := time.Unix(int64(transactionTime), 0) + zone, _ := beginTime.Zone() + // fmt.Println("The second time is", beginTime, zone, offset) + + t := fmt.Sprintf("%s-%s-%s %s:%s:%s %s", recordTimeStamp[0:4], recordTimeStamp[4:6], recordTimeStamp[6:8], recordTimeStamp[8:10], recordTimeStamp[10:12], recordTimeStamp[12:14], zone) + endTime, err := time.Parse("2006-01-02 15:04:05 MST", t) + + if err != nil { + log.Println("Ошибка преобразования времени 'func getDuration()':", endTime, err) + } + duration := endTime.Unix() - int64(transactionTime) + // fmt.Println(transactionTime, recordTimeStamp, t, duration) + return strconv.Itoa(int(duration)) +} + +func fileExists(filename string) bool { + info, err := os.Stat(filename) + if os.IsNotExist(err) { + return false + } + return !info.IsDir() +} + +func directoryExists(dir string) bool { + info, err := os.Stat(dir) + if os.IsNotExist(err) { + return false + } + return info.IsDir() +} + +func createTempFile(fileName string) (*os.File, int64) { + var ( + tempFile *os.File + offsetPosition int64 + ) + // Создаем временный файл для сохранения текущей позиции в обрабатываемом файле + tempFileName := fileName + ".tmp" + tempFileFullPath := filepath.Join(DirTemp, tempFileName) + + // tempFile, err := os.OpenFile(tempFileFullPath, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0644) + tempFile, err := os.OpenFile(tempFileFullPath, os.O_RDWR|os.O_CREATE, 0644) + if err != nil { + log.Println("Open temp file", tempFileFullPath, "error:", err) + } + // tempFile, err := os.OpenFile(tempFileFullPath, os.O_RDWR, 0644) + tempFileBuf := bufio.NewReader(tempFile) + offsetPositionStr, err := Readln(tempFileBuf) + // fmt.Println("--->", offsetPositionStr, "<---") +// os.Exit(1) + if offsetPositionStr != "" { + offsetPosition, _ = strconv.ParseInt(offsetPositionStr, 10, 64) + if err != nil { + log.Println("Unable convert value:", offsetPositionStr, err) + } + } + // defer tempFile.Close() + return tempFile, offsetPosition +} + +// Запись временного файла с указателем на позицию в обрабатыввакемом файле +// чтобы в случае переоткрытия не обрабатывать повторно уже обработанные данные +func writeTempFile(outChannel *os.File, offset int64) { + // line = line + "\n" + // Перемещаем указательь на начало файла и перезаписываем строку + outChannel.Truncate(0) + _, err := outChannel.Seek(0, 0) + str := strconv.FormatInt(offset, 10) + _, err = outChannel.WriteString(str) + if err != nil { + log.Println("Ошибка записи временного файла:", err) + } +} + +// Удаление временного файла +func deleteTempFile(tempFile *os.File, fileName string) { + tempFile.Close() + // Удаляем временный файл + tempFileName := fileName + ".tmp" + e := os.Remove(filepath.Join(DirTemp, tempFileName)) + if e != nil { + log.Println("Eror with remove file", tempFileName, e) + } else { + log.Println("Удалён временный файл:", tempFileName) + } +} + +func ParseString (outLine string, logOffset int64) map[string]string { + var ( + resultData string + result map[string]string + ) + regexpLogRecord := regexp.MustCompile(`(?P\d{14}),(?P\w{1}),\{(?P\w+),(?P\w+)\},(?P\d+),(?P\d+),(?P\d+),(?P\d+),(?P\d+),(?P\w{1}),\"(?P.*)\",(?P\d+),\{(?P.+)\},\"(?P.*)\",(?P\d+),(?P\d+),(?P\d+),(?P\d+),(?P\d{1}),\{(\d|,)+\}$`) + // Полную строку прогоняем через регексп и распихиваем сразу по полям + matchedOut := regexpLogRecord.MatchString(outLine) + // fmt.Println(matchedOut) + if matchedOut { + findedData := regexpLogRecord.FindStringSubmatch(outLine) + if findedData != nil { + // result := make(map[string]string) + result = make(map[string]string) + // В зависимости от поля (типа) производим те или иные операции + // подстановка из словаря, конвертация, и т.д. + // Типы записей в словаре: + // 1 – пользователи; 2 – компьютеры; 3 – приложения; 4 – события; + // 5 – метаданные; 6 – серверы; 7 – основные порты; 8 – вспомогательные порты. + for i, name := range regexpLogRecord.SubexpNames() { + switch name { + case "DateTime": + // Подготоваливаем разные данные на основе даты записи в логе + recordTimeStamp := getTimestamp(findedData[i]) + recordDate := getDate(findedData[i]) + resultData = "@timestamp: " + recordTimeStamp + recordDate + result["@timestamp"] = recordTimeStamp + result["index_day_prefix"] = recordDate + result["id"] = findedData[i] + "-" + strconv.FormatInt(logOffset, 10) + case "Usr": + resultData = getDataFromDict(1, findedData[i]) + if resultData != "not" { + findedData[i] = resultData + } + case "ComputerName": + resultData = getDataFromDict(2, findedData[i]) + if resultData != "not" { + findedData[i] = resultData + } + case "ApplicationName": + resultData = getDataFromDict(3, findedData[i]) + if resultData != "not" { + findedData[i] = resultData + } + case "Event": + resultData = getDataFromDict(4, findedData[i]) + if resultData != "not" { + findedData[i] = resultData + } + case "Metadata": + resultData = getDataFromDict(5, findedData[i]) + if resultData != "not" { + findedData[i] = resultData + } + case "Server": + resultData = getDataFromDict(6, findedData[i]) + if resultData != "not" { + findedData[i] = resultData + } + case "Port": + resultData = getDataFromDict(7, findedData[i]) + if resultData != "not" { + findedData[i] = resultData + } + case "AddPort": + resultData = getDataFromDict(8, findedData[i]) + if resultData != "not" { + findedData[i] = resultData + } + case "TranDuration": + // Транзакция в шестнадцатеричном виде {24371d9855260;3bd1316} + // {TranDurationTranNumber} + // - первый элемент – число секунд с 01.01.0001 00:00:00 умноженное на 10000 + // - второй – номер транзакции + if findedData[i] != "0" { + transactionBeginTimeStamp := hex2int(findedData[i]) / 10000 - 62135607600 + // fmt.Println(">>>>", findedData[i], hex2int(findedData[i]), transactionBeginTimeStamp) + resultData = getDuration(transactionBeginTimeStamp, findedData[1]) + } else { + resultData = "0" + } + findedData[i] = resultData + case "TranNumber": + resultData = strconv.Itoa(hex2int(findedData[i])) + findedData[i] = resultData + default: + resultData = findedData[i] + } + if i != 0 && name != "" { + result[name] = findedData[i] + } + } + } + } + return result +} + +// Проверка вхождения строки в срез (массив строк) +func implContains(sl []string, name string) bool { + // iterate over the array and compare given string to each element + for _, value := range sl { + if value == name { + return true + } + } + return false +} + +// Запуск чтения файлов +func runOperations () { + // получаем список файлов на момент запуска + filesList := getDirectoryContents(DirIn) + // получаем последний модифицированный файл + lastModifyFiles := getLastModifyFile(DirIn) + + var client *opensearch.Client + log.Println("Список новых файлов:", lastModifyFiles, len(lastModifyFiles)) + // запускаем процесс обработки + if len(lastModifyFiles) == 0 { + log.Println("Новых файлов не найдено") + return + } + for _, fileName := range lastModifyFiles { + log.Println("Запускаем обработку файла", fileName) + if SendToEs { + EsIndexName = EsIndexPrefix + "-" + getDate(fileName) + client = esConnect() + esCreateIndex(EsIndexName, client) + } + tail(client, fileName, os.Stdout) + } + + // Получаем список новых файлов, и если они появились после запуска программы + // то запускаем процесс обработки + for { + newFilesList := getNewFilesFromDir(filesList, DirIn) + log.Println("Список новых файлов:", newFilesList) + // Если новых файлов нет засыпаем на 10 сек + if len(newFilesList) == 0 { + time.Sleep(10 * time.Second) + continue + } + if newFilesList[0] != "" { + for _, fileName := range newFilesList { + if SendToEs { + EsIndexName = EsIndexPrefix + "-" + getDate(fileName) + // esConnect() + esCreateIndex(EsIndexName, client) + } + // Добавляем имя файла в список полученный при старте (список + // обработанных файлов для корректного определения новых файлов + filesList = append(filesList, fileName) + // запускаем процесс чтения (обработки) + tail(client, fileName, os.Stdout) + } + } else { + log.Println("Новых файлов не найдено") + time.Sleep(10 * time.Second) + } + time.Sleep(1 * time.Second) + } + log.Println("Завершение 'func runOperations()'") +} + +func tail(client *opensearch.Client, fileName string, out io.Writer) { + var shutUp = false + var ( + recBegin bool + outLine string + l string + bulkCount int + arrRecords []map[string]string + fOut *os.File + ) + + bulkCount = 0 + + filesList := getDirectoryContents(DirIn) + + filePath := filepath.Join(DirIn, fileName) + + // Открываем временный файл и определяем позицию указателя в исходном файле + tempFile, offsetPossition := createTempFile(fileName) + + // Определяем размер файла и сравниваем с позицией при открытии + // если равны значит файл открылся на конце + fInfo, _ := os.Stat(filePath) + fSize := fInfo.Size() + log.Println(filePath, "size:", fSize) + + if offsetPossition == fSize { + log.Println("Достигнут конец файла", fileName, "Размер:", fSize, "Позиция:", offsetPossition) + } + + f, err := os.Open(filePath) + if err != nil { + log.Println("Ошибка открытия файла:", filePath, err) + } + defer f.Close() + + // Переместим указатель в файле на последнюю позицию при закрытии программы + pos, err := f.Seek(offsetPossition, 0) + if err != nil { + log.Printf("Seek error on %s: %s", fileName, err) + } else { + log.Println("Начинаем с позиции:", pos) + } + + r := bufio.NewReader(f) + info, err := f.Stat() + if err != nil { + log.Println("Ошибка открытия файла bufio.NewReader:", filePath, err) + } + + // Создаем файл для записи обработанных (выходных) данных + if WriteOutFile { + fOut = createOutFile(fileName) + // writeOutCSVHeader(fOut) + } + + oldSize := info.Size() + regexpBeginRecord := regexp.MustCompile(`^\{\d{14},\w{1},$`) + regexpEndRecord := regexp.MustCompile(`^\},$`) + for { + for line, prefix, err := r.ReadLine(); err != io.EOF; line, prefix, err = r.ReadLine() { + // определяем текущую позицию записи в файле и пишем в файл + logOffset, _ := f.Seek(0, os.SEEK_CUR) + writeTempFile(tempFile, logOffset) + + // ------- обработка строк -------- + l = strings.TrimSpace(string(line)) + // Находим начало записи + matchedBegin := regexpBeginRecord.MatchString(l) + if matchedBegin { + recBegin = true + } + if recBegin { + outLine = outLine + l + } + // Находим конец записи + matchedEnd := regexpEndRecord.MatchString(l) + // fmt.Println(l) + if matchedEnd { + recBegin = false + outLine = strings.TrimSuffix(strings.TrimLeft(outLine, "{,"), "},") + // Парсим подготовленную строку + result := ParseString(outLine, logOffset) + // Пишем выходные файлы + if WriteOutFile { + if OutFormat == "json" { + writeOutJSONFile(fOut, result) + } else { + writeOutCSVFile(fOut, result) + } + } + // Отправляем данные в ElasticSearch / OpenSearch + if SendToEs { + if result["index_day_prefix"] == "" { + continue + } + indexName := EsIndexPrefix + "-" + result["index_day_prefix"] + // пакетная отправка данных в эластик + if EsBulk { + if bulkCount < EsBulkRecordsQuantity { + bulkCount++ + // arrRecords = append(arrRecords, result) + } else { + bulkCount = 0 + // esAddRecordsBulk(client, indexName, arrRecords) + } + arrRecords = append(arrRecords, result) + } else { + // addRes := esAddRecord(client, indexName, result) + go esAddRecord(client, indexName, result) + time.Sleep(TailSleep) + // time.Sleep(5 * time.Millisecond) + + // Если запись не добавилась делаем повторную попытку через 5 сек. + // if !addRes { + // time.Sleep(5 * time.Second) + // addRes = esAddRecord(client, indexName, result) + // log.Println("Повторная попытка добавления записи:", result, addRes) + // } + } + } + writeTempFile(tempFile, logOffset) + if Debug { + fmt.Println(result) + } + outLine = "" + } + if prefix { + // fmt.Fprint(out, string(line)) + } else { + // fmt.Fprintln(out, string(line)) + } + // -------------------------------- + } + pos, err := f.Seek(0, io.SeekCurrent) + if err != nil { + log.Println("Ошибка определения позиции 'pos, err := f.Seek(0, io.SeekCurrent)' в файле", fileName, "error:", err) + // panic(err) + } + for { + time.Sleep(time.Second) + newinfo, err := f.Stat() + if err != nil { + log.Println("Ошибка ошибка получения статистики 'newinfo, err := f.Stat()' по файлу", fileName, "error:", err) + // panic(err) + } + newSize := newinfo.Size() + if newSize != oldSize { + if newSize < oldSize { + f.Seek(0, 0) + } else { + f.Seek(pos, io.SeekStart) + } + r = bufio.NewReader(f) + oldSize = newSize + break + } + // fmt.Println("eof") + newFilesList := getNewFilesFromDir(filesList, DirIn) + if len(newFilesList) != 0 { + if !implContains(newFilesList, fileName) { + shutUp = true + log.Println("Обнаружен новый файл", newFilesList, shutUp) + break + } + } + } + if shutUp { + break + } + } + if shutUp { + deleteTempFile(tempFile, fileName) + } +} + +func createWorkDir() { + dirs := []string{DirIn, DirOut, DirTemp, DirLog} + for _, dir := range dirs { + // fmt.Println(dir) + if !directoryExists(dir) { + err := os.Mkdir(dir, 0755) + if err != nil { + log.Println(err) + } else { + log.Println("Создан каталог", dir) + } + } + } +} +// CloseIdleConnections() +func esConnect() *opensearch.Client { + // Initialize the Client with SSL/TLS enabled. + EsClient, err := opensearch.NewClient(opensearch.Config{ + Transport: &http.Transport{ + MaxIdleConns: 10, + MaxIdleConnsPerHost: 10, + ResponseHeaderTimeout: 60 * time.Second, + IdleConnTimeout: 1 * time.Millisecond, + DisableKeepAlives: true, + // ForceAttemptHTTP2: true, + MaxConnsPerHost: 20, + DialContext: (&net.Dialer{ + Timeout: 60 * time.Second, + // KeepAlive: 10 * time.Millisecond, + }).DialContext, + TLSClientConfig: &tls.Config{InsecureSkipVerify: EsSkipVerify}, + }, + Addresses: []string{EsUrl}, + Username: EsUser, + Password: EsPassword, + }) + if err != nil { + log.Println("cannot initialize", err) + os.Exit(1) + } + + EsClient.DiscoverNodes() + if err != nil { + log.Println("Ошибка получения списка узлов кластера ES:", err) + os.Exit(1) + } + log.Println("Подключение к ElasticSearch:", EsClient) + return EsClient + +} + +// Создаем индекс в ES +func esCreateIndex(indexName string, client *opensearch.Client) { + mapping := strings.NewReader(IndexTemplate) + // Create an index with non-default settings. + createIndex := opensearchapi.IndicesCreateRequest{ + Index: indexName, + Body: mapping, + } + createIndexResponse, err := createIndex.Do(context.Background(), client) + if err != nil { + log.Println("failed to create index ", err, createIndexResponse) + os.Exit(1) + } + fmt.Println(createIndexResponse.StatusCode) + if strings.Contains(createIndexResponse.String(), "already exists") { + log.Println(createIndexResponse.IsError()) + log.Println("Index", indexName, "already exists") + } else if createIndexResponse.StatusCode == 200 { + log.Println("Создан индекс:", indexName) + } +} + +func esAddRecordsBulk(client *opensearch.Client, indexName string, arrLine []map[string]string) { + // Perform bulk operations. + for _, obj := range arrLine { + aJson, err := json.Marshal(obj) + fmt.Println(aJson) + if err != nil{ + log.Printf("Error encoding query: %s", err) + } + + } + // blk, err := client.Bulk( + // strings.NewReader(` + // { "index" : { "_index" : "go-test-index1", "_id" : "2" } } + // { "title" : "Interstellar", "director" : "Christopher Nolan", "year" : "2014"} + // { "create" : { "_index" : "go-test-index1", "_id" : "3" } } + // { "title" : "Star Trek Beyond", "director" : "Justin Lin", "year" : "2015"} + // { "update" : {"_id" : "3", "_index" : "go-test-index1" } } + // { "doc" : {"year" : "2016"} } +// `), + // ) +// + // if err != nil { + // fmt.Println("failed to perform bulk operations", err) + // os.Exit(1) + // } + // fmt.Println("Performing bulk operations") + // fmt.Println(blk) + +} + +// Вставка записи в индекс ES +func esAddRecord(client *opensearch.Client, indexName string, line map[string]string) bool { + // fmt.Println(">>>>>>>>>",indexName) + // a_json, err := json.Marshal(line) + var operResult bool + aJson, err := json.MarshalIndent(line, "", " ") + if err != nil { + log.Printf("Error encoding: %s", line, "with error %s", err) + } + + document := strings.NewReader(string(aJson)) + + req := opensearchapi.IndexRequest{ + Index: indexName, + Body: document, + } + insertResponse, err := req.Do(context.Background(), client) + // res, err := req.Do(context.Background(), client) + if err != nil { + log.Println("failed to insert document ", err, line) + // os.Exit(1) + operResult = false + } else { + operResult = true + } + if Debug { + log.Println(insertResponse) + } + defer insertResponse.Body.Close() + return operResult +} + +// Читаем конфиг и определяем переменные +func readConfigFile(fileName string) { + cfg, err := ini.LoadSources(ini.LoadOptions{ + IgnoreInlineComment: true, + }, fileName) + if err != nil { + log.Println("Ошибка чтения файла конфигурации ", fileName, ": ", err) + } + + config := Config{} + cfg.MapTo(&config) + DirIn = cfg.Section("General").Key("dirIn").String() + DirOut = cfg.Section("General").Key("dirOut").String() + DirTemp = cfg.Section("General").Key("dirTemp").String() + DirLog = cfg.Section("General").Key("dirLog").String() + DictFile = cfg.Section("General").Key("dictFile").String() + ObjectTypes = cfg.Section("General").Key("objectTypes").String() + LogFileExtention = cfg.Section("General").Key("logFileExtention").String() + + SendToEs = cfg.Section("ElasticSearch").Key("sendToEs").MustBool() + if SendToEs { + EsUrl = cfg.Section("ElasticSearch").Key("esUrl").String() + EsUser = cfg.Section("ElasticSearch").Key("esUser").String() + EsPassword = cfg.Section("ElasticSearch").Key("esPassword").String() + EsSkipVerify = cfg.Section("ElasticSearch").Key("esSkipVerify").MustBool() + EsIndexPrefix = cfg.Section("ElasticSearch").Key("esIndexPrefix").String() + } + + WriteOutFile = cfg.Section("Processing").Key("writeOutFile").MustBool() + OutFormat = cfg.Section("Processing").Key("outFormat").String() + + TimeZoneOffset = cfg.Section("Processing").Key("timeZoneOffset").String() + Duration = cfg.Section("Processing").Key("readDuration").String() +} + +// Создание файла конфигурации +func writeConfigFile(fileName string) { + f, err := os.Create(fileName) + if err != nil { + log.Println("Create out file error:", err) + } + _, err = f.WriteString(ConfigTemplate) + if err != nil { + log.Println("Ошибка записи в выходной файл:", err) + } else { + log.Println("Создан файл конфигурации:", fileName) + } +} +// Создание каталога с конфигами +func createConfDir(dir string) { + if !directoryExists(dir) { + err := os.Mkdir(dir, 0700) + if err != nil { + log.Println(err) + } else { + log.Println("Создан каталог", dir) + } + } +} + +const serviceName = "1C log processor" +const serviceDescription = "1C log processor" + +type program struct{} + +func (p program) Start(s service.Service) error { + log.Println("Служба", s.String(), "запущена") + go p.RunService() + return nil +} + +func (p program) Stop(s service.Service) error { + log.Println("Служба", s.String(), "остановлена") + return nil +} + +func (p program) RunService() { + for { + log.Println("Запускаем основной процесс") + runOperations() + time.Sleep(1 * time.Second) + } +} + +func RunWinService() { + serviceConfig := &service.Config{ + Name: serviceName, + DisplayName: serviceName, + Description: serviceDescription, + } + prg := &program{} + s, err := service.New(prg, serviceConfig) + if err != nil { + log.Println("Ошибка создания сервиса:", err.Error()) + } + err = s.Run() + if err != nil { + log.Println("Ошибка запуска сервиса:", err.Error()) + } +} + + +func main() { + // Читаем опции коммандной строки + flag.BoolVar(&RunAsWinService, "run-win-service", false, "Запуск как службы Windows") + flag.StringVar(&DirCfg, "dir-config", ".config", "Каталог для файлов настройки (DIR_CFG)") + flag.StringVar(&FileCfg, "config", "config.ini", "Файл настроек") + flag.BoolVar(&CreateFileCfg, "create-config", false, "Создать файл настроек") + flag.BoolVar(&Debug, "debug", false, "Выводить данные в консоль") + flag.StringVar(&DirIn, "dir-in", "in", "Каталог для исходных файлов (DIR_IN)") + flag.StringVar(&DirOut, "dir-out", "out", "Каталог для обработанных файлов (DIR_OUT)") + flag.StringVar(&DirTemp, "dir-temp", "tmp", "Каталог для временных файлов (TEMP)") + flag.StringVar(&DirLog, "dir-log", "log", "Каталог для лога работы (DIR_LOG)") + flag.StringVar(&DictFile, "dict-file", "1Cv8.lgf", "Файл со словарём (DICT_FILE)") + flag.BoolVar(&SendToEs, "es-send", false, "Отправлять данные в ElasticSearch") + flag.StringVar(&EsUrl, "es-url", "", "Адрес узла Elastic Search (ELASTICSEARCH_URL)") + flag.StringVar(&EsUser, "es-user", "", "Имя пользователя Elastic Search (ELASTICSEARCH_USER)") + flag.StringVar(&EsPassword, "es-password", "", "Пароль пользователя Elastic Search (ELASTICSEARCH_PASSWORD)") + flag.StringVar(&EsIndexPrefix, "es-index-prefix", "", "Префикс имени индекса Elastic Search (ELASTICSEARCH_INDEX_PREFIX)") + flag.IntVar(&EsBulkRecordsQuantity, "es-bulk-quantity", 10, "Количество записей в одном запросе для пакетной вставки") + flag.BoolVar(&EsBulk, "es-bulk", false, "Пакетная вставка записей в Elastic Search") + flag.BoolVar(&EsSkipVerify, "es-skip-verify", false, "Пропустить проверку сертификатов при подключении к Elastic Search") + flag.BoolVar(&WriteOutFile, "write-out-file", false, "Запись обработанных данных в файл") + flag.StringVar(&OutFormat, "out-format", "csv", "Формат данных на выходе (csv, json) (OUT_FORMAT)") + flag.StringVar(&LogFileExtention, "log-file-ext", ".lgp", "Расширение файлов с журналами (обрабатываемых логов) (LOG_FILE_EXT)") + flag.StringVar(&ObjectTypes, "object-types", "1,2,3,4,5,6,7,8", "Список типов объектов словаря для выборки, разделённый запятой (OBJECT_TYPES)") + flag.StringVar(&TimeZoneOffset, "tz-offset", "+03:00", "Сдвиг по времени от UTC") + flag.StringVar(&Duration, "tail-sleep", "5ms", "Задержка чтения входного файла. Разрешены обозначения 'ns', 'us' ('µs'), 'ms', 's', 'm','h')") + + flag.Parse() + + configFile := filepath.Join(DirCfg, FileCfg) + if fileExists(configFile) { + readConfigFile(configFile) + log.Println("Используются настройки из файла конфгурации:", configFile) + } else { + log.Println("Не найден файл конфгурации:", configFile) + if CreateFileCfg { + createConfDir(DirCfg) + writeConfigFile(configFile) + } + } + + // os.Exit(1) + // Установим задержку чтения входного файла + TailSleep, _ = time.ParseDuration(Duration) + + // log.Println(DirCfg, DirIn, "send es:", SendToEs, OutFormat, WriteOutFile, EsUrl, "esuser:", EsUser, Duration, TailSleep) + + // Проверка и установка переменных для работы + if DirIn == "" && os.Getenv("DIR_IN") == "" { + fmt.Println("Make sure environment variables `DIR_IN`, or used with '-dir-in' argument") + os.Exit(1) + } else if DirIn == "" && os.Getenv("DIR_IN") != "" { + DirIn = os.Getenv("DIR_IN") + } + + if WriteOutFile { + if DirOut == "" && os.Getenv("DIR_OUT") == "" { + fmt.Println("Make sure environment variables `DIR_OUT`, or used with '-dir-out' argument") + os.Exit(1) + } else if DirOut == "" && os.Getenv("DIR_OUT") != "" { + DirOut = os.Getenv("DIR_OUT") + } + } + + if os.Getenv("LOG_FILE_EXT") != "" { + LogFileExtention = os.Getenv("LOG_FILE_EXT") + } + + if os.Getenv("TEMP") != "" { + LogFileExtention = os.Getenv("LOG_FILE_EXT") + } + + if os.Getenv("DIR_LOG") != "" { + DirLog = os.Getenv("DIR_LOG") + } + + if os.Getenv("TEMP") != "" { + LogFileExtention = os.Getenv("LOG_FILE_EXT") + } + + // Проверяем параметры и переменные окружения для работы с ElasticSearch / OpenSearch + if SendToEs { + if EsUser == "" && os.Getenv("ELASTICSEARCH_USER") == "" { + fmt.Println("Make sure environment variables `ELASTICSEARCH_USER=\"es_user_name\"` or used with '-es-user' argument") + os.Exit(1) + } else if EsUser == "" && os.Getenv("ELASTICSEARCH_USER") != "" { + EsUser = os.Getenv("ELASTICSEARCH_USER") + } + + if EsPassword == "" && os.Getenv("ELASTICSEARCH_PASSWORD") == "" { + fmt.Println("Make sure environment variables `ELASTICSEARCH_PASSWORD or used with '-es-password' argument") + os.Exit(1) + } else if EsPassword == "" && os.Getenv("ELASTICSEARCH_PASSWORD") != "" { + EsPassword = os.Getenv("ELASTICSEARCH_PASSWORD") + } + + if EsUrl == "" && os.Getenv("ELASTICSEARCH_URL") == "" { + fmt.Println("Make sure environment variables `ELASTICSEARCH_URL or used with '-es-url' argument") + os.Exit(1) + } else if EsUrl == "" && os.Getenv("ELASTICSEARCH_URL") != "" { + EsUrl = os.Getenv("ELASTICSEARCH_URL") + } + + if EsIndexPrefix == "" && os.Getenv("ELASTICSEARCH_INDEX_PREFIX") == "" { + fmt.Println("Make sure environment variables `ELASTICSEARCH_INDEX_PREFIX` or used with '-es-index-prefix' argument") + os.Exit(1) + } else if EsIndexPrefix == "" && os.Getenv("ELASTICSEARCH_INDEX_PREFIX") != "" { + EsIndexPrefix = os.Getenv("ELASTICSEARCH_INDEX_PREFIX") + } + } + + createWorkDir() + + // Установка и открытие лога для порграммы + fLog, err := os.OpenFile(filepath.Join(DirLog, "1c-log-processor.log"), os.O_RDWR | os.O_CREATE | os.O_APPEND, 0666) + if err != nil { + fmt.Printf("error opening log file: %v", err) + } + defer fLog.Close() + if Debug { + log.SetOutput(os.Stdout) + } else { + log.SetOutput(fLog) + } + // readDictFile(dictFile) + DictObjects = readDictFile(DictFile) + + // Получаем временную зону и часовой пояс + // Если зону возвращает UTC то присваеваем вручную из опций командной строки + // Хреновый хак но пока решения нет. Так как на винде возвращает почему-то нули + LocalTimeZone, LocalTZOffset = getLocalTimeZone() + if LocalTZOffset == "+00:00" { + LocalTZOffset = TimeZoneOffset + } + + // Запуск как сервиса + if RunAsWinService { + RunWinService() + } else { + runOperations() + } + log.Println("Работа программы завершена") +}