1S-logprocessor/log-processor.go

1478 lines
56 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

// ----------------------------------------------------------------------
// Программа для анализа и записи журналов 1С
// Версия: 1.0.0
// Автор: Сергей Калинин svk@nuk-svk.ru https://git.nuk-svk.ru/svk/
// ----------------------------------------------------------------------
package main
import (
"bufio"
"fmt"
"log"
"os"
"flag"
"time"
"io"
// "runtime"
"path/filepath"
"regexp"
"strings"
"strconv"
"encoding/json"
// "github.com/hpcloud/tail"
"net"
"sort"
"crypto/tls"
"net/http"
"context"
"gopkg.in/ini.v1"
opensearch "github.com/opensearch-project/opensearch-go/v2"
opensearchapi "github.com/opensearch-project/opensearch-go/v2/opensearchapi"
"github.com/kardianos/service"
)
var (
DirCfg string
FileCfg string
CreateFileCfg bool
DirIn string
DirOut string
DirTemp string
DirLog string
ObjectTypes string
DictObjects map[int]map[int]string
OpenedFiles map[string]chan string
LogFileExtention string
SendToEs bool
EsUrl string
EsUser string
EsPassword string
EsIndexPrefix string
EsIndexName string
EsSkipVerify bool
EsDiscoverNode bool
Debug bool
OutFormat string
WriteOutFile bool
EsClient *opensearch.Client
EsBulk bool
EsBulkRecordsQuantity int
LocalTZOffset string
LocalTimeZone string
TimeZoneOffset string
Duration string
DictFile string
TailSleep time.Duration
RunAsWinService bool
ExcludeEvent []string
InFile string
WorkLogOut string
)
type Config struct {
DirCfg string `ini:"dirCfg"`
DirIn string `ini:"dirIn"`
DirOut string `ini:"dirOut"`
DirTemp string `ini:"dirTemp"`
DirLog string `ini:"dirLog"`
DictFile string `ini:"dictFile"`
ObjectTypes string `ini:"objectTypes"`
LogFileExtention string `ini:"logFileExtention"`
SendToEs bool `ini:"sendToEs"`
EsUrl string `ini:"esUrl"`
EsUser string `ini:"esUser"`
EsPassword string `ini:"esPassword"`
EsIndexPrefix string `ini:"esIndexPrefix"`
EsSkipVerify bool `ini:"esSkipVerify"`
// EsBulk bool `ini:"esBulk"`
// EsBulkRecordsQuantity int `ini:"esBulkQuantity"`
OutFormat string `ini:"outFormat"`
WriteOutFile bool `ini:"writeOutFile"`
TimeZoneOffset string `ini:"timeZoneOffset"`
Duration string `ini:"Duration"`
ExcludeEvent []string `ini:"excludeEvent"`
}
type Key struct {
ObjectType int
ObjectNumber int
}
type OutRecord struct {
Timestamp string `json:"@timestamp"`
DateTime string `json:"log_datetime"`
TranNumber int `json:"TranNumber"`
TranDuration int `json:"TranDuration"`
TranStatus string `json:"TranStatus"`
Usr string `json:"Usr"`
ComputerName string `json:"ComputerName"`
ApplicationName string `json:"ApplicationName"`
ConnectionID int `json:"ConnectionID"`
Event string `json:"Event"`
Importance string `json:"Importance"`
Comment string `json:"Comment"`
Metadata string `json:"Metadata"`
Data string `json:"Data"`
PresentData string `json:"PresentData"`
Server string `json:"Server"`
Port int `json:"Port"`
AddPort int `json:"AddPort"`
SessionID int `json:"SessionID"`
ID string `json:"id"`
IndexDayPrefix string `json:"index_day_prefix"`
// OtherData1 string `json:"OtherData1"`
// OtherData2 string `json:"OtherData1"`
}
var IndexTemplate = `{
"settings" : { },
"mappings" : {
"properties" : {
"@timestamp" : {
"type" : "date"
},
"Comment" : {
"type" : "text"
},
"AddPort" : {
"type" : "keyword"
},
"ApplicationName" : {
"type" : "keyword"
},
"TranDuration" : {
"type" : "long"
},
"Server" : {
"type" : "keyword"
},
"PresentData" : {
"type" : "text",
"fields" : {
"keyword" : {
"ignore_above" : 256,
"type" : "keyword"
}
}
},
"Port" : {
"type" : "keyword"
},
"Metadata" : {
"type" : "keyword"
},
"Data" : {
"type" : "text"
},
"Importance" : {
"type" : "keyword"
},
"ConnectionID" : {
"type" : "keyword"
},
"Usr" : {
"type" : "keyword"
},
"ComputerName" : {
"type" : "keyword"
},
"TranStatus" : {
"type" : "keyword"
},
"Event" : {
"type" : "keyword"
},
"TranNumber" : {
"type" : "keyword"
},
"SessionID" : {
"type" : "keyword"
}
}
}
}`
// Шаблон файла конфигурации
var ConfigTemplate = `[General]
; Каталог для входящих данных
dirIn=in
; Каталог для исходящих данных
dirOut=out
; Каталог для временных файлов
dirTemp=tmp
; Каталог для журнала работы программы
dirLog=log
; Имя файла словаря
dictFile=1Cv8.lgf
; Список объектов для выборки из словаря
objectTypes=1,2,3,4,5,6,7,8
; Расширение файлов журнало 1С (входящих)
logFileExtention=.lgp
[ElasticSearch]
; Включене отправки данных в ElasticSearch
sendToEs=false
; Получать список узлов кластера ElasticSearch
esDiscoverNode=true
; Адрес сервера ElasticSearch
esUrl=https://elastic:9200
; Пользователь для подключсения к ElasticSearch
esUser=user_name
; Пароль пользователя ElasticSearch
esPassword=user_password
; Отключить проверку SSL сертификатов при подключении
esSkipVerify=false
; Перфикс индекса в ElasticSearch
esIndexPrefix=test_log
[Processing]
; Включение вывода обработанной информации в файл
writeOutFile=false
; Формат выходного файла
outFormat=csv
; Сдвиг времени (временная зона)
timeZoneOffset=+03:00
; Задержка чтения входного файла между строками
; Разрешены обозначения 'ns', 'us' ('µs'), 'ms', 's', 'm','h'
readDuration=5ms
; Список событий (поле "Event"), исключенных из выдачи, разделенный ";"
excludeEvent=
`
// Список файлов в каталоге
func getDirectoryContents(dir string) []string {
var names []string
entries, err := os.ReadDir(dir);
if err != nil {
log.Println("Ошибка получения содержимого каталога: ", dir, err);
} else {
if Debug {
log.Println("Get the directory content:", dir)
}
}
//iterate through the directory entities and print out their name.
for _, entry := range(entries) {
if filepath.Ext(entry.Name()) == LogFileExtention {
// fmt.Println(entry.Name());
if Debug {
log.Println("Finded file:", entry.Name())
}
names = append(names, entry.Name())
}
}
return names
}
func getLastModifyFile(dir string) []string {
files, err := os.ReadDir(dir)
if err != nil {
fmt.Println(err)
os.Exit(1)
}
var modTime time.Time
var names []string
for _, fi := range files {
// // fmt.Println(fi)
// names = append(names, fi.Name())
if Debug {
log.Println(LogFileExtention, filepath.Ext(fi.Name()))
}
if filepath.Ext(fi.Name()) == LogFileExtention {
fileInfo, _ := fi.Info()
if Debug {
log.Println("Finding file info:", fileInfo)
}
if fileInfo.Mode().IsRegular() {
if !fileInfo.ModTime().Before(modTime) {
if fileInfo.ModTime().After(modTime) {
modTime = fileInfo.ModTime()
names = names[:0]
}
// if filepath.Ext(fi.Name()) == ".lgp" {
dt := time.Now()
// Сравниваем имя последнего файла с текущей датой
// и если равны - запускаем процесс разборки
if getDate(fi.Name()) == dt.Format("2006-01-02") {
names = append(names, fi.Name())
log.Println("Текущая дата и дата файла совпадают:", getDate(fi.Name()), dt.Format("2006-01-02"))
}
// }
}
if Debug {
log.Println("Finding file modify time:", fileInfo.ModTime())
log.Println("Last file modify time:", modTime)
log.Println("Current time:", fileInfo.ModTime().Before(modTime), fileInfo.ModTime().After(modTime))
}
} else {
if Debug {
log.Println("Finding file was not regular:", fi)
}
}
}
}
return names
}
func getNewFilesFromDir(filesList []string, dir string) []string {
var names []string
newFilesList := getDirectoryContents(dir)
if len(newFilesList) > 0 {
// log.Println("Получаем список файлов", newFilesList)
for _, newFileName := range newFilesList {
if !implContains(filesList, newFileName) {
fmt.Println(newFileName, implContains(filesList, newFileName))
names = append(names, newFileName)
}
}
}
return names
}
// Readln returns a single line (without the ending \n)
// from the input buffered reader.
// An error is returned iff there is an error with the
// buffered reader.
func Readln(r *bufio.Reader) (string, error) {
var (isPrefix bool = true
err error = nil
line, ln []byte
)
for isPrefix && err == nil {
line, isPrefix, err = r.ReadLine()
ln = append(ln, line...)
}
return string(ln),err
}
// func Readln(r *bufio.Reader) (string, error){
// result, err := r.ReadString('\n')
// // log.Println(result)
// ln := result
// return ln, err
// }
func hex2int(hexStr string) int {
// base 16 for hexadecimal
result, err := strconv.ParseInt(hexStr, 16, 64)
if err != nil {
log.Printf("error converting value: %v, %v\n",hexStr, err)
return 0
}
return int(result)
}
func readDictFile(fileName string) map[int]map[int]string {
var (
// arr map[Key]string
recBegin bool
// recEnd bool
outLine string
l string
res map[int]map[int]string
)
res = make(map[int]map[int]string)
objectTypesList := strings.Split(ObjectTypes, ",")
f, err := os.Open(fileName)
if err != nil {
log.Printf("error opening file: %v\n",err)
os.Exit(1)
} else {
if Debug {
log.Println("Opening file:", fileName)
}
}
r := bufio.NewReader(f)
s, e := Readln(r)
// регулярное выражение для получения данных
regexpSplitData := regexp.MustCompile(`\{(?P<objType>\d+),(.*,|)(?P<objData>.+),(?P<objNumber>\d+)\},$`)
// регулярное выражение определения однострочной записи
regexpString := regexp.MustCompile(`^\{\d+,.+,\d+\},$`)
// регулярное выражение для многострочных записей соответстве нно начало строки и конец
// regexpMultilineBegin := regexp.MustCompile(`^.+(\},)$`)
regexpMultilineEnd := regexp.MustCompile(`^.+},$`)
for e == nil {
// log.Println(s)
s,e = Readln(r)
l = strings.TrimSpace(s)
// strings.TrimRight(outLine, ",")
// regexp.MustCompile("[*,%_]{1}").Split(refString, -1)
// Обработка многострочных записей
matchedString := regexpString.MatchString(l)
// matchedBegin := regexpMultilineBegin.MatchString(l)
matchedEnd := regexpMultilineEnd.MatchString(l)
// fmt.Println(matchedBegin)
if matchedString == true && recBegin == false{
findedData := regexpSplitData.FindStringSubmatch(l)
if findedData != nil {
result := make(map[string]string)
for i, name := range regexpSplitData.SubexpNames() {
// log.Println(i, name, findedData[i])
if i != 0 && name != "" {
result[name] = findedData[i]
}
}
if implContains(objectTypesList, result["objType"]) == false {
continue
}
objTypeInt, _ := strconv.Atoi(result["objType"])
objNumberInt, _ := strconv.Atoi(result["objNumber"])
if res[objTypeInt] == nil {
res[objTypeInt] = make(map[int]string)
}
res[objTypeInt][objNumberInt] = strings.Trim(result["objData"], "{}\"")
// log.Println(">>",objTypeInt, objNumberInt, result["objData"])
result = nil
}
}
if matchedEnd == false {
recBegin = true
// log.Println("multilineline begin")
}
if recBegin {
outLine = outLine + l
}
if recBegin && matchedEnd {
recBegin = false
s = outLine
outLine = ""
findedData := regexpSplitData.FindStringSubmatch(s)
if findedData != nil {
result := make(map[string]string)
for i, name := range regexpSplitData.SubexpNames() {
// log.Println(i, name, findedData[i])
if i != 0 && name != "" {
result[name] = findedData[i]
}
}
if implContains(objectTypesList, result["objType"]) == false {
continue
}
objTypeInt, _ := strconv.Atoi(result["objType"])
objNumberInt, _ := strconv.Atoi(result["objNumber"])
if res[objTypeInt] == nil {
res[objTypeInt] = make(map[int]string)
}
res[objTypeInt][objNumberInt] = strings.Trim(result["objData"], "{}")
// log.Println("<<",objTypeInt, objNumberInt, result["objData"])
}
}
// time.Sleep(1 * time.Second)
}
return res
}
// Создаем выходной файл. Если он есть открываем на запись
// и перемещаем указатель в конец файла
func createOutFile(fileName string) *os.File {
var (
fOut *os.File
err error
)
fileOutPath := filepath.Join(DirOut, fileName)
if fileExists(fileOutPath) {
fOut, err = os.OpenFile(fileOutPath, os.O_RDWR, 0644)
if err != nil {
log.Println("Open out file", fileOutPath, "error:", err)
}
} else {
fOut, err = os.Create(fileOutPath)
if err != nil {
log.Println("Create out file", fileOutPath, "error:", err)
}
}
// defer fOut.Close()
fOutInfo, _ := os.Stat(fileOutPath)
fOutSize := fOutInfo.Size()
_, err = fOut.Seek(fOutSize, 0)
if err != nil {
log.Printf("Seek error on %s: %s", fileOutPath, err)
}
return fOut
}
func writeOutCSVFile(outChannel *os.File, m map[string]string) {
var result string
if len(m) == 0 {
return
}
keys := make([]string, 0, len(m))
for k := range m {
keys = append(keys, k)
}
sort.Strings(keys)
for _, k := range keys {
// strings.Join(m[k], ";")
result = result + m[k]
result = result + ";"
}
// fmt.Println(result)
result = result + "\n"
_, err := outChannel.WriteString(result)
if err != nil {
log.Println("Ошибка записи в выходной файл:", err)
}
}
// func writeOutCSVHeader(outChannel *os.File) {
// var result string
// var m OutRecord
// keys := make([]string, 0, len(m))
// for k := range m {
// keys = append(keys, k)
// }
// sort.Strings(keys)
//
// for _, k := range keys {
// // strings.Join(m[k], ";")
// result = result + k
// result = result + ";"
// }
// // fmt.Println(result)
// result = result + "\n"
// _, err := outChannel.WriteString(result)
// if err != nil {
// log.Println("Ошибка записи в выходной файл:", err)
// }
// }
//
func writeOutJSONFile(outChannel *os.File, line map[string]string) {
// fmt.Println(">>>>>>>>>",line)
// a_json, err := json.Marshal(line)
if len(line) == 0 {
return
}
a_json, err := json.MarshalIndent(line, "", " ")
if err != nil {
log.Printf("Error encoding: %s", line, "with error %s", err)
}
if err != nil{
log.Printf("Error encoding query: %s", err)
}
outChannel.Write(a_json)
}
func getDataFromDict(dictType int, dictIndex string) string {
i, err := strconv.Atoi(dictIndex)
if err != nil {
log.Println("func getDataFromDict, string to int converted error:", dictIndex)
return "not"
}
return DictObjects[dictType][i]
}
// Функция возвращает дату в формате YYYY-MM-DD
func getDate(dateString string) string {
t := fmt.Sprintf("%s-%s-%s", dateString[0:4], dateString[4:6], dateString[6:8])
return t
}
// Получение временной зоны и часового пояса
// например: MSK, +03:00
func getLocalTimeZone() (string, string) {
zone, _ := time.Now().Zone()
zoneBounds, _ := time.Now().ZoneBounds()
l := strings.Fields(zoneBounds.String())
zoneOffsetArr := strings.Split(l[2], "")
zoneOffset := fmt.Sprintf("%s%s%s:%s%s", zoneOffsetArr[0], zoneOffsetArr[1], zoneOffsetArr[2], zoneOffsetArr[3], zoneOffsetArr[4])
// fmt.Println(zone)
// fmt.Println(zoneOffset)
return zone, zoneOffset
}
// Перобразование строки с датой в отпечаток времени (timestamp) для Elasticsearch
// "2023-01-03T10:15:30+03:00"
func getTimestamp(dateString string) string {
// t := fmt.Sprintf("%s-%s-%s %s:%s:%s %s", dateString[0:4], dateString[4:6], dateString[6:8], dateString[8:10], dateString[10:12], dateString[12:14], zone)
// t := fmt.Sprintf("%s-%s-%sT%s:%s:%sZ%s", dateString[0:4], dateString[4:6], dateString[6:8], dateString[8:10], dateString[10:12], dateString[12:14], LocalTZOffset)
t := fmt.Sprintf("%s-%s-%sT%s:%s:%s%s", dateString[0:4], dateString[4:6], dateString[6:8], dateString[8:10], dateString[10:12], dateString[12:14], LocalTZOffset)
// return "2023-01-03T10:15:30+03:00"
return t
}
// Получение времени длительности транзакции
func getDuration(transactionTime int, recordTimeStamp string) string {
if transactionTime == 0 {
return "0"
}
beginTime := time.Unix(int64(transactionTime), 0)
zone, _ := beginTime.Zone()
// fmt.Println("The second time is", beginTime, zone, offset)
t := fmt.Sprintf("%s-%s-%s %s:%s:%s %s", recordTimeStamp[0:4], recordTimeStamp[4:6], recordTimeStamp[6:8], recordTimeStamp[8:10], recordTimeStamp[10:12], recordTimeStamp[12:14], zone)
endTime, err := time.Parse("2006-01-02 15:04:05 MST", t)
if err != nil {
log.Println("Ошибка преобразования времени 'func getDuration()':", endTime, err)
}
duration := endTime.Unix() - int64(transactionTime)
// fmt.Println(transactionTime, recordTimeStamp, t, duration)
return strconv.Itoa(int(duration))
}
func fileExists(filename string) bool {
info, err := os.Stat(filename)
if os.IsNotExist(err) {
return false
}
return !info.IsDir()
}
func directoryExists(dir string) bool {
info, err := os.Stat(dir)
if os.IsNotExist(err) {
return false
}
return info.IsDir()
}
func createTempFile(fileName string) (*os.File, int64) {
var (
tempFile *os.File
offsetPosition int64
)
// Создаем временный файл для сохранения текущей позиции в обрабатываемом файле
tempFileName := fileName + ".tmp"
tempFileFullPath := filepath.Join(DirTemp, tempFileName)
// tempFile, err := os.OpenFile(tempFileFullPath, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0644)
tempFile, err := os.OpenFile(tempFileFullPath, os.O_RDWR|os.O_CREATE, 0644)
if err != nil {
log.Println("Open temp file", tempFileFullPath, "error:", err)
}
// tempFile, err := os.OpenFile(tempFileFullPath, os.O_RDWR, 0644)
tempFileBuf := bufio.NewReader(tempFile)
offsetPositionStr, err := Readln(tempFileBuf)
// fmt.Println("--->", offsetPositionStr, "<---")
// os.Exit(1)
if offsetPositionStr != "" {
offsetPosition, _ = strconv.ParseInt(offsetPositionStr, 10, 64)
if err != nil {
log.Println("Unable convert value:", offsetPositionStr, err)
}
}
// defer tempFile.Close()
return tempFile, offsetPosition
}
// Запись временного файла с указателем на позицию в обрабатыввакемом файле
// чтобы в случае переоткрытия не обрабатывать повторно уже обработанные данные
func writeTempFile(outChannel *os.File, offset int64) {
// line = line + "\n"
// Перемещаем указательь на начало файла и перезаписываем строку
outChannel.Truncate(0)
_, err := outChannel.Seek(0, 0)
str := strconv.FormatInt(offset, 10)
_, err = outChannel.WriteString(str)
if err != nil {
log.Println("Ошибка записи временного файла:", err)
}
}
// Удаление временного файла
func deleteTempFile(tempFile *os.File, fileName string) {
tempFile.Close()
// Удаляем временный файл
tempFileName := fileName + ".tmp"
e := os.Remove(filepath.Join(DirTemp, tempFileName))
if e != nil {
log.Println("Eror with remove file", tempFileName, e)
} else {
log.Println("Удалён временный файл:", tempFileName)
}
}
// (\d{14}),(\w{1}),\{(\w+),(?\w+)\},(\d+),(\d+),(\d+),(\d+),(\d+),(\w{1}),\"(.*)\",(\d+),\{(.+)\},\"(.*)\",(\d+),(\d+),(\d+),(\d+),(\d{1}),\{(\d|,)+\}
func parseString (outLine string, logOffset int64) map[string]string {
var (
resultData string
result map[string]string
)
result = nil
regexpLogRecord := regexp.MustCompile(`(?P<DateTime>\d{14}),(?P<TranStatus>\w{1}),\{(?P<TranDuration>\w+),(?P<TranNumber>\w+)\},(?P<Usr>\d+),(?P<ComputerName>\d+),(?P<ApplicationName>\d+),(?P<ConnectionID>\d+),(?P<Event>\d+),(?P<Importance>\w{1}),\"(?P<Comment>.*)\",(?P<Metadata>\d+),\{(?P<Data>.+)\},\"(?P<PresentData>.*)\",(?P<Server>\d+),(?P<Port>\d+),(?P<AddPort>\d+),(?P<SessionID>\d+),(?P<OtherData1>\d{1}),\{(\d|,)+\}$`)
// Полную строку прогоняем через регексп и распихиваем сразу по полям
matchedOut := regexpLogRecord.MatchString(outLine)
// if Debug {
// log.Println(outLine)
// }
if matchedOut {
findedData := regexpLogRecord.FindStringSubmatch(outLine)
if findedData != nil {
// result := make(map[string]string)
result = make(map[string]string)
// В зависимости от поля (типа) производим те или иные операции
// подстановка из словаря, конвертация, и т.д.
// Типы записей в словаре:
// 1 пользователи; 2 компьютеры; 3 приложения; 4 события;
// 5 метаданные; 6 серверы; 7 основные порты; 8 вспомогательные порты.
for i, name := range regexpLogRecord.SubexpNames() {
switch name {
case "DateTime":
// Подготоваливаем разные данные на основе даты записи в логе
recordTimeStamp := getTimestamp(findedData[i])
recordDate := getDate(findedData[i])
resultData = "@timestamp: " + recordTimeStamp + recordDate
result["@timestamp"] = recordTimeStamp
result["index_day_prefix"] = recordDate
result["id"] = findedData[i] + "-" + strconv.FormatInt(logOffset, 10)
case "Usr":
resultData = getDataFromDict(1, findedData[i])
if resultData != "not" {
findedData[i] = resultData
}
case "ComputerName":
resultData = getDataFromDict(2, findedData[i])
if resultData != "not" {
findedData[i] = resultData
}
case "ApplicationName":
resultData = getDataFromDict(3, findedData[i])
if resultData != "not" {
findedData[i] = resultData
}
case "Event":
resultData = getDataFromDict(4, findedData[i])
if resultData != "not" {
findedData[i] = resultData
// log.Println(">>>>>>>>",findedData[i], ExcludeEvent)
for _, word := range ExcludeEvent {
if resultData == word {
if Debug {
log.Println("The event", resultData, "coincides with the specified filter", word)
}
return nil
}
}
}
case "Metadata":
resultData = getDataFromDict(5, findedData[i])
if resultData != "not" {
findedData[i] = resultData
}
case "Server":
resultData = getDataFromDict(6, findedData[i])
if resultData != "not" {
findedData[i] = resultData
}
case "Port":
resultData = getDataFromDict(7, findedData[i])
if resultData != "not" {
findedData[i] = resultData
}
case "AddPort":
resultData = getDataFromDict(8, findedData[i])
if resultData != "not" {
findedData[i] = resultData
}
case "TranDuration":
// Транзакция в шестнадцатеричном виде {24371d9855260;3bd1316}
// {TranDurationTranNumber}
// - первый элемент число секунд с 01.01.0001 00:00:00 умноженное на 10000
// - второй номер транзакции
if findedData[i] != "0" {
transactionBeginTimeStamp := hex2int(findedData[i]) / 10000 - 62135607600
// fmt.Println(">>>>", findedData[i], hex2int(findedData[i]), transactionBeginTimeStamp)
resultData = getDuration(transactionBeginTimeStamp, findedData[1])
} else {
resultData = "0"
}
findedData[i] = resultData
case "TranNumber":
resultData = strconv.Itoa(hex2int(findedData[i]))
findedData[i] = resultData
default:
resultData = findedData[i]
}
if i != 0 && name != "" {
result[name] = findedData[i]
}
}
}
}
return result
}
// Проверка вхождения строки в срез (массив строк)
func implContains(sl []string, name string) bool {
// iterate over the array and compare given string to each element
for _, value := range sl {
if value == name {
return true
}
}
return false
}
// Запуск чтения файлов
func runOperations () {
var client *opensearch.Client
if InFile != "" {
tail(client, InFile, os.Stdout)
return
}
// получаем список файлов на момент запуска
filesList := getDirectoryContents(DirIn)
// получаем последний модифицированный файл
lastModifyFiles := getLastModifyFile(DirIn)
log.Println("Список новых файлов:", lastModifyFiles, len(lastModifyFiles))
// запускаем процесс обработки
if len(lastModifyFiles) == 0 {
log.Println("Новых файлов не найдено")
return
}
for _, fileName := range lastModifyFiles {
DictObjects = readDictFile(DictFile)
log.Println("Запускаем обработку файла", fileName)
if SendToEs {
EsIndexName = EsIndexPrefix + "-" + getDate(fileName)
client = esConnect()
esCreateIndex(EsIndexName, client)
}
go tail(client, fileName, os.Stdout)
}
// Получаем список новых файлов, и если они появились после запуска программы
// то запускаем процесс обработки
for {
newFilesList := getNewFilesFromDir(filesList, DirIn)
log.Println("Список новых файлов:", newFilesList)
// Если новых файлов нет засыпаем на 10 сек
if len(newFilesList) == 0 {
time.Sleep(10 * time.Second)
continue
}
if newFilesList[0] != "" {
for _, fileName := range newFilesList {
DictObjects = readDictFile(DictFile)
if SendToEs {
EsIndexName = EsIndexPrefix + "-" + getDate(fileName)
// esConnect()
esCreateIndex(EsIndexName, client)
}
// Добавляем имя файла в список полученный при старте (список
// обработанных файлов для корректного определения новых файлов
filesList = append(filesList, fileName)
// запускаем процесс чтения (обработки)
go tail(client, fileName, os.Stdout)
}
} else {
log.Println("Новых файлов не найдено")
time.Sleep(10 * time.Second)
}
time.Sleep(1 * time.Second)
}
log.Println("Завершение 'func runOperations()'")
}
func tail(client *opensearch.Client, fileName string, out io.Writer) {
var shutUp = false
var (
recBegin = false
outLine string
l string
// bulkCount int
// arrRecords []map[string]string
fOut *os.File
// matchedEnd bool
)
// bulkCount = 0
filesList := getDirectoryContents(DirIn)
filePath := filepath.Join(DirIn, fileName)
// Открываем временный файл и определяем позицию указателя в исходном файле
tempFile, offsetPossition := createTempFile(fileName)
// Определяем размер файла и сравниваем с позицией при открытии
// если равны значит файл открылся на конце
fInfo, _ := os.Stat(filePath)
fSize := fInfo.Size()
log.Println(filePath, "size:", fSize)
if offsetPossition == fSize {
log.Println("Достигнут конец файла", fileName, "Размер:", fSize, "Позиция:", offsetPossition)
}
f, err := os.Open(filePath)
if err != nil {
log.Println("Ошибка открытия файла:", filePath, err)
}
defer f.Close()
// Переместим указатель в файле на последнюю позицию при закрытии программы
pos, err := f.Seek(offsetPossition, 0)
if err != nil {
log.Printf("Seek error on %s: %s", fileName, err)
} else {
log.Println("Начинаем с позиции:", pos)
}
r := bufio.NewReader(f)
info, err := f.Stat()
if err != nil {
log.Println("Ошибка открытия файла bufio.NewReader:", filePath, err)
}
// Создаем файл для записи обработанных (выходных) данных
if WriteOutFile {
fOut = createOutFile(fileName)
// writeOutCSVHeader(fOut)
}
oldSize := info.Size()
// Регулярные выражения для определения начала и конца записи
regexpBeginRecord := regexp.MustCompile(`^\{\d{14},\w{1},$`)
regexpEndRecord := regexp.MustCompile(`^\},$`)
// var (й
// isPrefix bool = true
// // err error = nil
// line []byte
// )
for {
line, err := Readln(r)
// for line, _, err := r.ReadLine(); err != io.EOF; line, _, err = r.ReadLine() {
for err == nil {
// fmt.Println(line)
// определяем текущую позицию записи в файле и пишем в файл
logOffset, _ := f.Seek(0, os.SEEK_CUR)
writeTempFile(tempFile, logOffset)
// ------- обработка строк --------
l = strings.TrimSpace(string(line))
// Находим начало записи
matchedBegin := regexpBeginRecord.MatchString(l)
if matchedBegin {
recBegin = true
outLine = ""
}
if recBegin {
outLine = outLine + l
}
// Находим конец записи
matchedEnd := regexpEndRecord.MatchString(l)
if matchedEnd {
recBegin = false
outLine = strings.TrimSuffix(strings.TrimLeft(outLine, "{,"), "},")
if Debug {
log.Println(outLine)
}
// Парсим подготовленную строку
result := parseString(outLine, logOffset)
if Debug {
log.Println(len(result), result)
}
// Пишем выходные файлы
if WriteOutFile {
if OutFormat == "json" {
writeOutJSONFile(fOut, result)
} else {
writeOutCSVFile(fOut, result)
}
}
// Отправляем данные в ElasticSearch / OpenSearch
if SendToEs {
// if result["index_day_prefix"] == "" {
// continue
// }
indexName := EsIndexPrefix + "-" + result["index_day_prefix"]
// пакетная отправка данных в эластик
// addRes := esAddRecord(client, indexName, result)
go esAddRecord(client, indexName, result)
time.Sleep(TailSleep)
// time.Sleep(5 * time.Millisecond)
// Если запись не добавилась делаем повторную попытку через 5 сек.
// if !addRes {
// time.Sleep(5 * time.Second)
// addRes = esAddRecord(client, indexName, result)
// log.Println("Повторная попытка добавления записи:", result, addRes)
// }
}
writeTempFile(tempFile, logOffset)
if Debug {
log.Println(outLine)
log.Println(result)
}
outLine = ""
result = nil
}
// if prefix {
// fmt.Println(">",string(line))
// } else {
// fmt.Println(">>",string(line))
// }
// --------------------------------
line, err = Readln(r)
}
pos, err := f.Seek(0, io.SeekCurrent)
if err != nil {
log.Println("Ошибка определения позиции 'pos, err := f.Seek(0, io.SeekCurrent)' в файле", fileName, "error:", err)
// panic(err)
}
// Если задан файл с коммандной строки то выходим
if InFile != "" {
return
}
// Запускаем чтение каталога и получепние списка с новыми файлами
for {
time.Sleep(time.Second)
newinfo, err := f.Stat()
if err != nil {
log.Println("Ошибка ошибка получения статистики 'newinfo, err := f.Stat()' по файлу", fileName, "error:", err)
// panic(err)
}
newSize := newinfo.Size()
if newSize != oldSize {
if newSize < oldSize {
f.Seek(0, 0)
} else {
f.Seek(pos, io.SeekStart)
}
r = bufio.NewReader(f)
oldSize = newSize
break
}
// fmt.Println("eof")
newFilesList := getNewFilesFromDir(filesList, DirIn)
if len(newFilesList) != 0 {
if !implContains(newFilesList, fileName) {
shutUp = true
log.Println("Обнаружен новый файл", newFilesList, shutUp)
break
}
}
}
if shutUp {
break
}
}
if shutUp {
deleteTempFile(tempFile, fileName)
}
}
func createWorkDir() {
dirs := []string{DirIn, DirOut, DirTemp, DirLog}
for _, dir := range dirs {
// fmt.Println(dir)
if !directoryExists(dir) {
err := os.Mkdir(dir, 0755)
if err != nil {
log.Println(err)
} else {
log.Println("Создан каталог", dir)
}
}
}
}
// CloseIdleConnections()
func esConnect() *opensearch.Client {
// Initialize the Client with SSL/TLS enabled.
EsClient, err := opensearch.NewClient(opensearch.Config{
Transport: &http.Transport{
MaxIdleConns: 10,
MaxIdleConnsPerHost: 10,
ResponseHeaderTimeout: 300 * time.Second,
IdleConnTimeout: 1 * time.Millisecond,
DisableKeepAlives: true,
// ForceAttemptHTTP2: true,
MaxConnsPerHost: 20,
DialContext: (&net.Dialer{
Timeout: 300 * time.Second,
// KeepAlive: 10 * time.Millisecond,
}).DialContext,
TLSClientConfig: &tls.Config{InsecureSkipVerify: EsSkipVerify},
},
Addresses: []string{EsUrl},
Username: EsUser,
Password: EsPassword,
})
if err != nil {
log.Println("cannot initialize", err)
os.Exit(1)
}
// Получение списка узлов кластера для параллельной работы
// вставка данных будет идти через все узлы
if EsDiscoverNode {
EsClient.DiscoverNodes()
}
if err != nil {
log.Println("Ошибка получения списка узлов кластера ES:", err)
os.Exit(1)
}
log.Println("Подключение к ElasticSearch:", EsClient)
return EsClient
}
// Создаем индекс в ES
func esCreateIndex(indexName string, client *opensearch.Client) {
mapping := strings.NewReader(IndexTemplate)
// Create an index with non-default settings.
createIndex := opensearchapi.IndicesCreateRequest{
Index: indexName,
Body: mapping,
}
createIndexResponse, err := createIndex.Do(context.Background(), client)
if err != nil {
log.Println("failed to create index ", err, createIndexResponse)
os.Exit(1)
}
fmt.Println(createIndexResponse.StatusCode)
if strings.Contains(createIndexResponse.String(), "already exists") {
log.Println(createIndexResponse.IsError())
log.Println("Index", indexName, "already exists")
} else if createIndexResponse.StatusCode == 200 {
log.Println("Создан индекс:", indexName)
}
}
func esAddRecordsBulk(client *opensearch.Client, indexName string, arrLine []map[string]string) {
// Perform bulk operations.
for _, obj := range arrLine {
aJson, err := json.Marshal(obj)
fmt.Println(aJson)
if err != nil{
log.Printf("Error encoding query: %s", err)
}
}
// blk, err := client.Bulk(
// strings.NewReader(`
// { "index" : { "_index" : "go-test-index1", "_id" : "2" } }
// { "title" : "Interstellar", "director" : "Christopher Nolan", "year" : "2014"}
// { "create" : { "_index" : "go-test-index1", "_id" : "3" } }
// { "title" : "Star Trek Beyond", "director" : "Justin Lin", "year" : "2015"}
// { "update" : {"_id" : "3", "_index" : "go-test-index1" } }
// { "doc" : {"year" : "2016"} }
// `),
// )
//
// if err != nil {
// fmt.Println("failed to perform bulk operations", err)
// os.Exit(1)
// }
// fmt.Println("Performing bulk operations")
// fmt.Println(blk)
}
// Вставка записи в индекс ES
func esAddRecord(client *opensearch.Client, indexName string, line map[string]string) bool {
// fmt.Println(">>>>>>>>>",indexName)
// a_json, err := json.Marshal(line)
if len(line) == 0 {
return false
}
var operResult bool
aJson, err := json.MarshalIndent(line, "", " ")
if err != nil {
log.Printf("Error encoding: %s", line, "with error %s", err)
}
if Debug {
fmt.Println(line)
}
document := strings.NewReader(string(aJson))
req := opensearchapi.IndexRequest{
Index: indexName,
Body: document,
}
insertResponse, err := req.Do(context.Background(), client)
// res, err := req.Do(context.Background(), client)
if err != nil {
log.Println("failed to insert document ", err, line)
// os.Exit(1)
operResult = false
} else {
operResult = true
}
if Debug {
log.Println(insertResponse)
}
defer insertResponse.Body.Close()
return operResult
}
// Читаем конфиг и определяем переменные
func readConfigFile(fileName string) {
cfg, err := ini.LoadSources(ini.LoadOptions{
IgnoreInlineComment: true,
}, fileName)
if err != nil {
log.Println("Ошибка чтения файла конфигурации ", fileName, ": ", err)
}
config := Config{}
cfg.MapTo(&config)
DirIn = cfg.Section("General").Key("dirIn").String()
DirOut = cfg.Section("General").Key("dirOut").String()
DirTemp = cfg.Section("General").Key("dirTemp").String()
DirLog = cfg.Section("General").Key("dirLog").String()
DictFile = cfg.Section("General").Key("dictFile").String()
ObjectTypes = cfg.Section("General").Key("objectTypes").String()
LogFileExtention = cfg.Section("General").Key("logFileExtention").String()
SendToEs = cfg.Section("ElasticSearch").Key("sendToEs").MustBool()
if SendToEs {
EsDiscoverNode = cfg.Section("ElasticSearch").Key("esDiscoverNode").MustBool()
EsUrl = cfg.Section("ElasticSearch").Key("esUrl").String()
EsUser = cfg.Section("ElasticSearch").Key("esUser").String()
EsPassword = cfg.Section("ElasticSearch").Key("esPassword").String()
EsSkipVerify = cfg.Section("ElasticSearch").Key("esSkipVerify").MustBool()
EsIndexPrefix = cfg.Section("ElasticSearch").Key("esIndexPrefix").String()
}
WriteOutFile = cfg.Section("Processing").Key("writeOutFile").MustBool()
OutFormat = cfg.Section("Processing").Key("outFormat").String()
TimeZoneOffset = cfg.Section("Processing").Key("timeZoneOffset").String()
Duration = cfg.Section("Processing").Key("readDuration").String()
ExcludeEvent = strings.Split(cfg.Section("Processing").Key("excludeEvent").String(), ";")
}
// Создание файла конфигурации
func writeConfigFile(fileName string) {
f, err := os.Create(fileName)
if err != nil {
log.Println("Create out file error:", err)
}
_, err = f.WriteString(ConfigTemplate)
if err != nil {
log.Println("Ошибка записи в выходной файл:", err)
} else {
log.Println("Создан файл конфигурации:", fileName)
}
}
// Создание каталога с конфигами
func createConfDir(dir string) {
if !directoryExists(dir) {
err := os.Mkdir(dir, 0700)
if err != nil {
log.Println(err)
} else {
log.Println("Создан каталог", dir)
}
}
}
const serviceName = "1C log processor"
const serviceDescription = "1C log processor"
type program struct{}
func (p program) Start(s service.Service) error {
log.Println("Служба", s.String(), "запущена")
go p.RunService()
return nil
}
func (p program) Stop(s service.Service) error {
log.Println("Служба", s.String(), "остановлена")
return nil
}
func (p program) RunService() {
for {
log.Println("Запускаем основной процесс")
runOperations()
time.Sleep(1 * time.Second)
}
}
func RunWinService() {
serviceConfig := &service.Config{
Name: serviceName,
DisplayName: serviceName,
Description: serviceDescription,
}
prg := &program{}
s, err := service.New(prg, serviceConfig)
if err != nil {
log.Println("Ошибка создания сервиса:", err.Error())
}
err = s.Run()
if err != nil {
log.Println("Ошибка запуска сервиса:", err.Error())
}
}
func main() {
// Читаем опции коммандной строки
flag.BoolVar(&RunAsWinService, "run-win-service", false, "Запуск как службы Windows")
flag.StringVar(&DirCfg, "dir-config", ".config", "Каталог для файлов настройки (DIR_CFG)")
flag.StringVar(&FileCfg, "config", "config.ini", "Файл настроек")
flag.BoolVar(&CreateFileCfg, "create-config", false, "Создать файл настроек")
flag.BoolVar(&Debug, "debug", false, "Выводить отладочную информацию")
flag.StringVar(&DirIn, "dir-in", "in", "Каталог для исходных файлов (DIR_IN)")
flag.StringVar(&DirOut, "dir-out", "out", "Каталог для обработанных файлов (DIR_OUT)")
flag.StringVar(&DirTemp, "dir-temp", "tmp", "Каталог для временных файлов (TEMP)")
flag.StringVar(&DirLog, "dir-worklog", "log", "Каталог для лога работы (DIR_LOG)")
flag.StringVar(&DictFile, "dict-file", "1Cv8.lgf", "Файл со словарём (DICT_FILE)")
flag.BoolVar(&SendToEs, "es-send", false, "Отправлять данные в ElasticSearch")
flag.StringVar(&EsUrl, "es-url", "", "Адрес узла Elastic Search (ELASTICSEARCH_URL)")
flag.StringVar(&EsUser, "es-user", "", "Имя пользователя Elastic Search (ELASTICSEARCH_USER)")
flag.StringVar(&EsPassword, "es-password", "", "Пароль пользователя Elastic Search (ELASTICSEARCH_PASSWORD)")
flag.StringVar(&EsIndexPrefix, "es-index-prefix", "", "Префикс имени индекса Elastic Search (ELASTICSEARCH_INDEX_PREFIX)")
flag.BoolVar(&EsDiscoverNode, "es-discover-node", false, "Получать список узлов кластера ElasticSearch")
// flag.IntVar(&EsBulkRecordsQuantity, "es-bulk-quantity", 10, "Количество записей в одном запросе для пакетной вставки")
// flag.BoolVar(&EsBulk, "es-bulk", false, "Пакетная вставка записей в Elastic Search")
flag.BoolVar(&EsSkipVerify, "es-skip-verify", false, "Пропустить проверку сертификатов при подключении к Elastic Search")
flag.BoolVar(&WriteOutFile, "write-out-file", false, "Запись обработанных данных в файл")
flag.StringVar(&OutFormat, "out-format", "csv", "Формат данных на выходе (csv, json) (OUT_FORMAT)")
flag.StringVar(&LogFileExtention, "log-file-ext", ".lgp", "Расширение файлов с журналами (обрабатываемых логов) (LOG_FILE_EXT)")
flag.StringVar(&ObjectTypes, "object-types", "1,2,3,4,5,6,7,8", "Список типов объектов словаря для выборки, разделённый запятой (OBJECT_TYPES)")
flag.StringVar(&TimeZoneOffset, "tz-offset", "+03:00", "Сдвиг по времени от UTC")
flag.StringVar(&Duration, "tail-sleep", "5ms", "Задержка чтения входного файла. Разрешены обозначения 'ns', 'us' ('µs'), 'ms', 's', 'm','h')")
flag.StringVar(&InFile, "file", "", "Имя файла для обработки. Если требуется обработать один файл")
flag.StringVar(&WorkLogOut, "worklog-out", "file", "Направление вывода журнала работы программы (console, file)")
flag.Parse()
configFile := filepath.Join(DirCfg, FileCfg)
if fileExists(configFile) {
readConfigFile(configFile)
log.Println("Используются настройки из файла конфгурации:", configFile)
} else {
log.Println("Не найден файл конфгурации:", configFile)
if CreateFileCfg {
createConfDir(DirCfg)
writeConfigFile(configFile)
}
}
// os.Exit(1)
// Установим задержку чтения входного файла
TailSleep, _ = time.ParseDuration(Duration)
// log.Println(DirCfg, DirIn, "send es:", SendToEs, OutFormat, WriteOutFile, EsUrl, "esuser:", EsUser, Duration, TailSleep)
// Проверка и установка переменных для работы
if DirIn == "" && os.Getenv("DIR_IN") == "" {
fmt.Println("Make sure environment variables `DIR_IN`, or used with '-dir-in' argument")
os.Exit(1)
} else if DirIn == "" && os.Getenv("DIR_IN") != "" {
DirIn = os.Getenv("DIR_IN")
}
if WriteOutFile {
if DirOut == "" && os.Getenv("DIR_OUT") == "" {
fmt.Println("Make sure environment variables `DIR_OUT`, or used with '-dir-out' argument")
os.Exit(1)
} else if DirOut == "" && os.Getenv("DIR_OUT") != "" {
DirOut = os.Getenv("DIR_OUT")
}
}
if os.Getenv("LOG_FILE_EXT") != "" {
LogFileExtention = os.Getenv("LOG_FILE_EXT")
}
// if os.Getenv("TEMP") != "" {
// DirTemp = os.Getenv("TEMP")
// }
if os.Getenv("DIR_LOG") != "" {
DirLog = os.Getenv("DIR_LOG")
}
// Проверяем параметры и переменные окружения для работы с ElasticSearch / OpenSearch
if SendToEs {
if EsUser == "" && os.Getenv("ELASTICSEARCH_USER") == "" {
fmt.Println("Make sure environment variables `ELASTICSEARCH_USER=\"es_user_name\"` or used with '-es-user' argument")
os.Exit(1)
} else if EsUser == "" && os.Getenv("ELASTICSEARCH_USER") != "" {
EsUser = os.Getenv("ELASTICSEARCH_USER")
}
if EsPassword == "" && os.Getenv("ELASTICSEARCH_PASSWORD") == "" {
fmt.Println("Make sure environment variables `ELASTICSEARCH_PASSWORD or used with '-es-password' argument")
os.Exit(1)
} else if EsPassword == "" && os.Getenv("ELASTICSEARCH_PASSWORD") != "" {
EsPassword = os.Getenv("ELASTICSEARCH_PASSWORD")
}
if EsUrl == "" && os.Getenv("ELASTICSEARCH_URL") == "" {
fmt.Println("Make sure environment variables `ELASTICSEARCH_URL or used with '-es-url' argument")
os.Exit(1)
} else if EsUrl == "" && os.Getenv("ELASTICSEARCH_URL") != "" {
EsUrl = os.Getenv("ELASTICSEARCH_URL")
}
if EsIndexPrefix == "" && os.Getenv("ELASTICSEARCH_INDEX_PREFIX") == "" {
fmt.Println("Make sure environment variables `ELASTICSEARCH_INDEX_PREFIX` or used with '-es-index-prefix' argument")
os.Exit(1)
} else if EsIndexPrefix == "" && os.Getenv("ELASTICSEARCH_INDEX_PREFIX") != "" {
EsIndexPrefix = os.Getenv("ELASTICSEARCH_INDEX_PREFIX")
}
}
createWorkDir()
// Определим куда выводим лог работы
switch WorkLogOut {
case "file":
// Установка и открытие лога для программы
fLog, err := os.OpenFile(filepath.Join(DirLog, "1c-log-processor.log"), os.O_RDWR | os.O_CREATE | os.O_APPEND, 0666)
if err != nil {
fmt.Printf("error opening log file: %v", err)
}
defer fLog.Close()
log.SetOutput(fLog)
case "console":
log.SetOutput(os.Stdout)
}
// if Debug {
// log.SetOutput(os.Stdout)
// } else {
// log.SetOutput(fLog)
// }
// log.SetOutput(fLog)
// readDictFile(dictFile)
DictObjects = readDictFile(DictFile)
// Получаем временную зону и часовой пояс
// Если зону возвращает UTC то присваеваем вручную из опций командной строки
// Хреновый хак но пока решения нет. Так как на винде возвращает почему-то нули
LocalTimeZone, LocalTZOffset = getLocalTimeZone()
if LocalTZOffset == "+00:00" {
LocalTZOffset = TimeZoneOffset
}
log.Println(DirCfg, FileCfg, DirIn, DirOut, DirTemp, DirLog, DictFile)
// Запуск как сервиса
if RunAsWinService {
RunWinService()
} else {
runOperations()
}
log.Println("Работа программы завершена")
}