// ---------------------------------------------------------------------- // Программа для анализа и записи журналов 1С // Версия: 1.2.0 // Автор: Сергей Калинин, svk@nuk-svk.ru // https://git.nuk-svk.ru/svk/1c-logprocessor // Лицензия: GPLv3 // ---------------------------------------------------------------------- // Использование log-processor: // см. README.md // ------------------------------------------------------------------------- // ELASTICSEARCH_URL="https://user:pass@elastic:9200" //------------------------- ------------------------------------------------ package main import ( "bufio" "fmt" "log" "os" "flag" "time" "io" // "runtime" "path/filepath" "regexp" "strings" "strconv" "encoding/json" // "github.com/hpcloud/tail" "net" "sort" "crypto/tls" "net/http" "context" "gopkg.in/ini.v1" opensearch "github.com/opensearch-project/opensearch-go/v2" opensearchapi "github.com/opensearch-project/opensearch-go/v2/opensearchapi" "github.com/kardianos/service" ) const ( Version = "1.2.0" ) var ( DirCfg string FileCfg string CreateFileCfg bool DirIn string DirOut string DirTemp string DirLog string ObjectTypes string DictObjects map[int]map[int]string OpenedFiles map[string]chan string LogFileExtention string SendToEs bool EsUrl string EsUser string EsPassword string EsIndexPrefix string EsIndexName string EsSkipVerify bool EsDiscoverNode bool Debug bool OutFormat string WriteOutFile bool EsClient *opensearch.Client EsBulk bool EsBulkRecordsQuantity int LocalTZOffset string LocalTimeZone string TimeZoneOffset string Duration string DictFile string TailSleep time.Duration RunAsWinService bool ExcludeEvent []string InFile string WorkLogOut string ServiceName string // PrintVersion bool ) type Config struct { DirCfg string `ini:"dirCfg"` DirIn string `ini:"dirIn"` DirOut string `ini:"dirOut"` DirTemp string `ini:"dirTemp"` DirLog string `ini:"dirLog"` DictFile string `ini:"dictFile"` ObjectTypes string `ini:"objectTypes"` LogFileExtention string `ini:"logFileExtention"` SendToEs bool `ini:"sendToEs"` EsUrl string `ini:"esUrl"` EsUser string `ini:"esUser"` EsPassword string `ini:"esPassword"` EsIndexPrefix string `ini:"esIndexPrefix"` EsSkipVerify bool `ini:"esSkipVerify"` EsBulk bool `ini:"esBulk"` EsBulkRecordsQuantity int `ini:"esBulkQuantity"` OutFormat string `ini:"outFormat"` WriteOutFile bool `ini:"writeOutFile"` TimeZoneOffset string `ini:"timeZoneOffset"` Duration string `ini:"Duration"` ExcludeEvent []string `ini:"excludeEvent"` ServiceName string `ini:"serviceName"` } type Key struct { ObjectType int ObjectNumber int } type OutRecord struct { Timestamp string `json:"@timestamp"` DateTime string `json:"log_datetime"` TranNumber int `json:"TranNumber"` TranDuration int `json:"TranDuration"` TranStatus string `json:"TranStatus"` Usr string `json:"Usr"` ComputerName string `json:"ComputerName"` ApplicationName string `json:"ApplicationName"` ConnectionID int `json:"ConnectionID"` Event string `json:"Event"` Importance string `json:"Importance"` Comment string `json:"Comment"` Metadata string `json:"Metadata"` Data string `json:"Data"` PresentData string `json:"PresentData"` Server string `json:"Server"` Port int `json:"Port"` AddPort int `json:"AddPort"` SessionID int `json:"SessionID"` ID string `json:"id"` IndexDayPrefix string `json:"index_day_prefix"` // OtherData1 string `json:"OtherData1"` // OtherData2 string `json:"OtherData1"` } var IndexTemplate = `{ "settings" : { }, "mappings" : { "properties" : { "@timestamp" : { "type" : "date" }, "Comment" : { "type" : "text" }, "AddPort" : { "type" : "keyword" }, "ApplicationName" : { "type" : "keyword" }, "TranDuration" : { "type" : "long" }, "Server" : { "type" : "keyword" }, "PresentData" : { "type" : "text", "fields" : { "keyword" : { "ignore_above" : 256, "type" : "keyword" } } }, "Port" : { "type" : "keyword" }, "Metadata" : { "type" : "keyword" }, "Data" : { "type" : "text" }, "Importance" : { "type" : "keyword" }, "ConnectionID" : { "type" : "keyword" }, "Usr" : { "type" : "keyword" }, "ComputerName" : { "type" : "keyword" }, "TranStatus" : { "type" : "keyword" }, "Event" : { "type" : "keyword" }, "TranNumber" : { "type" : "keyword" }, "SessionID" : { "type" : "keyword" } } } }` // Шаблон файла конфигурации var ConfigTemplate = `[General] ; Каталог для входящих данных dirIn=in ; Каталог для исходящих данных dirOut=out ; Каталог для временных файлов dirTemp=tmp ; Каталог для журнала работы программы dirLog=log ; Имя файла словаря dictFile=1Cv8.lgf ; Список объектов для выборки из словаря objectTypes=1,2,3,4,5,6,7,8 ; Расширение файлов журналов 1С (входящих) logFileExtention=.lgp ; Наименование службы windows serviceName=1C Log Processor [ElasticSearch] ; Включить отправку данных в ElasticSearch sendToEs=false ; Получать список узлов кластера ElasticSearch esDiscoverNode=true ; Адрес сервера ElasticSearch esUrl=https://elastic:9200 ; Пользователь для подключения к ElasticSearch esUser=user_name ; Пароль пользователя ElasticSearch esPassword=user_password ; Отключить проверку SSL сертификатов при подключении esSkipVerify=false ; Префикс индекса в ElasticSearch esIndexPrefix=test_log ; Пакетная вставка данных esBulk=true ; Количество записей в одном "пакете" esBulkRecordsQuantity=10 [Processing] ; Включение вывода обработанной информации в файл writeOutFile=false ; Формат выходного файла outFormat=csv ; Сдвиг времени (временная зона) timeZoneOffset=+03:00 ; Задержка чтения входного файла между строками ; Разрешены обозначения 'ns', 'us' ('µs'), 'ms', 's', 'm','h' readDuration=5ms ; Список событий (поле "Event"), исключенных из выдачи, разделенный ";" excludeEvent= ` // Список файлов в каталоге func getDirectoryContents(dir string) []string { var names []string entries, err := os.ReadDir(dir); if err != nil { log.Println("Ошибка получения содержимого каталога: ", dir, err); } else { if Debug { log.Println("Get the directory content:", dir) } } //iterate through the directory entities and print out their name. for _, entry := range(entries) { if filepath.Ext(entry.Name()) == LogFileExtention { // fmt.Println(entry.Name()); if Debug { log.Println("Finded file:", entry.Name()) } names = append(names, entry.Name()) } } return names } func getLastModifyFile(dir string) []string { files, err := os.ReadDir(dir) if err != nil { fmt.Println(err) os.Exit(1) } var modTime time.Time var names []string for _, fi := range files { // // fmt.Println(fi) // names = append(names, fi.Name()) if Debug { log.Println(LogFileExtention, filepath.Ext(fi.Name())) } if filepath.Ext(fi.Name()) == LogFileExtention { fileInfo, _ := fi.Info() if Debug { log.Println("Finding file info:", fileInfo) } // Проверяем что это файл if fileInfo.Mode().IsRegular() { if Debug { log.Println("Найден файл:",fi.Name(), modTime, fileInfo.ModTime(), fileInfo.ModTime().Before(modTime)) } dt := time.Now() if Debug { log.Println("Текущая Дата:",dt, dt.Format("2006-01-02")) log.Println("Дата файла:", fi.Name(), getDate(fi.Name())) } // Сравниваем имя последнего файла с текущей датой // и если равны - запускаем процесс разборки if getDate(fi.Name()) == dt.Format("2006-01-02") { names = append(names, fi.Name()) log.Println("Текущая дата и дата файла совпадают:", getDate(fi.Name()), dt.Format("2006-01-02")) } // Проверяем что дата модификации файла не раньше текущей // if !fileInfo.ModTime().Before(modTime) { // log.Println(">>>",fi.Name(), modTime, fileInfo.ModTime(), fileInfo.ModTime().After(modTime)) // if fileInfo.ModTime().After(modTime) { // modTime = fileInfo.ModTime() // // names = names[:0] // names = append(names, fi.Name()) // log.Println(names) // } // dt := time.Now() // if Debug { // log.Println("Текущая Дата:",dt, dt.Format("2006-01-02")) // log.Println("Дата файла:", fi.Name(), getDate(fi.Name())) // } // // Сравниваем имя последнего файла с текущей датой // // и если равны - запускаем процесс разборки // // if getDate(fi.Name()) == dt.Format("2006-01-02") { // // names = append(names, fi.Name()) // // log.Println("Текущая дата и дата файла совпадают:", getDate(fi.Name()), dt.Format("2006-01-02")) // // } // } if Debug { log.Println("Finding file modify time:", fileInfo.ModTime()) log.Println("Last file modify time:", modTime) log.Println("Current time:", fileInfo.ModTime().Before(modTime), fileInfo.ModTime().After(modTime)) } } else { if Debug { log.Println("Finding file was not regular:", fi) } } } } log.Println(names) return names } func getNewFilesFromDir(filesList []string, dir string) []string { var names []string newFilesList := getDirectoryContents(dir) if len(newFilesList) > 0 { // log.Println("Получаем список файлов", newFilesList) for _, newFileName := range newFilesList { if !implContains(filesList, newFileName) { fmt.Println(newFileName, implContains(filesList, newFileName)) names = append(names, newFileName) } } } return names } // Readln returns a single line (without the ending \n) // from the input buffered reader. // An error is returned iff there is an error with the // buffered reader. func Readln(r *bufio.Reader) (string, error) { var (isPrefix bool = true err error = nil line, ln []byte ) for isPrefix && err == nil { line, isPrefix, err = r.ReadLine() ln = append(ln, line...) } return string(ln),err } // func Readln(r *bufio.Reader) (string, error){ // result, err := r.ReadString('\n') // // log.Println(result) // ln := result // return ln, err // } func hex2int(hexStr string) int { // base 16 for hexadecimal result, err := strconv.ParseInt(hexStr, 16, 64) if err != nil { log.Printf("error converting value: %v, %v\n",hexStr, err) return 0 } return int(result) } func readDictFile(fileName string) map[int]map[int]string { var ( // arr map[Key]string recBegin bool // recEnd bool outLine string l string res map[int]map[int]string ) res = make(map[int]map[int]string) objectTypesList := strings.Split(ObjectTypes, ",") f, err := os.Open(fileName) if err != nil { log.Printf("error opening file: %v\n",err) os.Exit(1) } else { if Debug { log.Println("Opening file:", fileName) } } r := bufio.NewReader(f) s, e := Readln(r) // регулярное выражение для получения данных regexpSplitData := regexp.MustCompile(`\{(?P\d+),(.*,|)(?P.+),(?P\d+)\},$`) // регулярное выражение определения однострочной записи regexpString := regexp.MustCompile(`^\{\d+,.+,\d+\},$`) // регулярное выражение для многострочных записей соответстве нно начало строки и конец // regexpMultilineBegin := regexp.MustCompile(`^.+(\},)$`) regexpMultilineEnd := regexp.MustCompile(`^.+},$`) for e == nil { // log.Println(s) s,e = Readln(r) l = strings.TrimSpace(s) // strings.TrimRight(outLine, ",") // regexp.MustCompile("[*,%_]{1}").Split(refString, -1) // Обработка многострочных записей matchedString := regexpString.MatchString(l) // matchedBegin := regexpMultilineBegin.MatchString(l) matchedEnd := regexpMultilineEnd.MatchString(l) // fmt.Println(matchedBegin) if matchedString == true && recBegin == false{ findedData := regexpSplitData.FindStringSubmatch(l) if findedData != nil { result := make(map[string]string) for i, name := range regexpSplitData.SubexpNames() { // log.Println(i, name, findedData[i]) if i != 0 && name != "" { result[name] = findedData[i] } } if implContains(objectTypesList, result["objType"]) == false { continue } objTypeInt, _ := strconv.Atoi(result["objType"]) objNumberInt, _ := strconv.Atoi(result["objNumber"]) if res[objTypeInt] == nil { res[objTypeInt] = make(map[int]string) } res[objTypeInt][objNumberInt] = strings.Trim(result["objData"], "{}\"") // log.Println(">>",objTypeInt, objNumberInt, result["objData"]) result = nil } } if matchedEnd == false { recBegin = true // log.Println("multilineline begin") } if recBegin { outLine = outLine + l } if recBegin && matchedEnd { recBegin = false s = outLine outLine = "" findedData := regexpSplitData.FindStringSubmatch(s) if findedData != nil { result := make(map[string]string) for i, name := range regexpSplitData.SubexpNames() { // log.Println(i, name, findedData[i]) if i != 0 && name != "" { result[name] = findedData[i] } } if implContains(objectTypesList, result["objType"]) == false { continue } objTypeInt, _ := strconv.Atoi(result["objType"]) objNumberInt, _ := strconv.Atoi(result["objNumber"]) if res[objTypeInt] == nil { res[objTypeInt] = make(map[int]string) } res[objTypeInt][objNumberInt] = strings.Trim(result["objData"], "{}") // log.Println("<<",objTypeInt, objNumberInt, result["objData"]) } } // time.Sleep(1 * time.Second) } return res } // Создаем выходной файл. Если он есть открываем на запись // и перемещаем указатель в конец файла func createOutFile(fileName string) *os.File { var ( fOut *os.File err error ) fileOutPath := filepath.Join(DirOut, fileName) if fileExists(fileOutPath) { fOut, err = os.OpenFile(fileOutPath, os.O_RDWR, 0644) if err != nil { log.Println("Open out file", fileOutPath, "error:", err) } } else { fOut, err = os.Create(fileOutPath) if err != nil { log.Println("Create out file", fileOutPath, "error:", err) } } // defer fOut.Close() fOutInfo, _ := os.Stat(fileOutPath) fOutSize := fOutInfo.Size() _, err = fOut.Seek(fOutSize, 0) if err != nil { log.Printf("Seek error on %s: %s", fileOutPath, err) } return fOut } func writeOutCSVFile(outChannel *os.File, m map[string]string) { var result string if len(m) == 0 { return } keys := make([]string, 0, len(m)) for k := range m { keys = append(keys, k) } sort.Strings(keys) for _, k := range keys { // strings.Join(m[k], ";") result = result + m[k] result = result + ";" } // fmt.Println(result) result = result + "\n" _, err := outChannel.WriteString(result) if err != nil { log.Println("Ошибка записи в выходной файл:", err) } } // func writeOutCSVHeader(outChannel *os.File) { // var result string // var m OutRecord // keys := make([]string, 0, len(m)) // for k := range m { // keys = append(keys, k) // } // sort.Strings(keys) // // for _, k := range keys { // // strings.Join(m[k], ";") // result = result + k // result = result + ";" // } // // fmt.Println(result) // result = result + "\n" // _, err := outChannel.WriteString(result) // if err != nil { // log.Println("Ошибка записи в выходной файл:", err) // } // } // func writeOutJSONFile(outChannel *os.File, line map[string]string) { // fmt.Println(">>>>>>>>>",line) // a_json, err := json.Marshal(line) if len(line) == 0 { return } a_json, err := json.MarshalIndent(line, "", " ") if err != nil { log.Printf("Error encoding: %s", line, "with error %s", err) } if err != nil{ log.Printf("Error encoding query: %s", err) } outChannel.Write(a_json) } func getDataFromDict(dictType int, dictIndex string) string { i, err := strconv.Atoi(dictIndex) if err != nil { log.Println("func getDataFromDict, string to int converted error:", dictIndex) return "not" } return DictObjects[dictType][i] } // Функция возвращает дату в формате YYYY-MM-DD func getDate(dateString string) string { t := fmt.Sprintf("%s-%s-%s", dateString[0:4], dateString[4:6], dateString[6:8]) return t } // Получение временной зоны и часового пояса // например: MSK, +03:00 func getLocalTimeZone() (string, string) { zone, _ := time.Now().Zone() zoneBounds, _ := time.Now().ZoneBounds() l := strings.Fields(zoneBounds.String()) zoneOffsetArr := strings.Split(l[2], "") zoneOffset := fmt.Sprintf("%s%s%s:%s%s", zoneOffsetArr[0], zoneOffsetArr[1], zoneOffsetArr[2], zoneOffsetArr[3], zoneOffsetArr[4]) // fmt.Println(zone) // fmt.Println(zoneOffset) return zone, zoneOffset } // Перобразование строки с датой в отпечаток времени (timestamp) для Elasticsearch // "2023-01-03T10:15:30+03:00" func getTimestamp(dateString string) string { // t := fmt.Sprintf("%s-%s-%s %s:%s:%s %s", dateString[0:4], dateString[4:6], dateString[6:8], dateString[8:10], dateString[10:12], dateString[12:14], zone) // t := fmt.Sprintf("%s-%s-%sT%s:%s:%sZ%s", dateString[0:4], dateString[4:6], dateString[6:8], dateString[8:10], dateString[10:12], dateString[12:14], LocalTZOffset) t := fmt.Sprintf("%s-%s-%sT%s:%s:%s%s", dateString[0:4], dateString[4:6], dateString[6:8], dateString[8:10], dateString[10:12], dateString[12:14], LocalTZOffset) // return "2023-01-03T10:15:30+03:00" return t } // Получение времени длительности транзакции func getDuration(transactionTime int, recordTimeStamp string) string { if transactionTime == 0 { return "0" } beginTime := time.Unix(int64(transactionTime), 0) zone, _ := beginTime.Zone() // fmt.Println("The second time is", beginTime, zone, offset) t := fmt.Sprintf("%s-%s-%s %s:%s:%s %s", recordTimeStamp[0:4], recordTimeStamp[4:6], recordTimeStamp[6:8], recordTimeStamp[8:10], recordTimeStamp[10:12], recordTimeStamp[12:14], zone) endTime, err := time.Parse("2006-01-02 15:04:05 MST", t) if err != nil { log.Println("Ошибка преобразования времени 'func getDuration()':", endTime, err) } duration := endTime.Unix() - int64(transactionTime) // fmt.Println(transactionTime, recordTimeStamp, t, duration) return strconv.Itoa(int(duration)) } func fileExists(filename string) bool { info, err := os.Stat(filename) if os.IsNotExist(err) { return false } return !info.IsDir() } func directoryExists(dir string) bool { info, err := os.Stat(dir) if os.IsNotExist(err) { return false } return info.IsDir() } func createTempFile(fileName string) (*os.File, int64) { var ( tempFile *os.File offsetPosition int64 ) // Создаем временный файл для сохранения текущей позиции в обрабатываемом файле tempFileName := fileName + ".tmp" tempFileFullPath := filepath.Join(DirTemp, tempFileName) // tempFile, err := os.OpenFile(tempFileFullPath, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0644) tempFile, err := os.OpenFile(tempFileFullPath, os.O_RDWR|os.O_CREATE, 0644) if err != nil { log.Println("Open temp file", tempFileFullPath, "error:", err) } // tempFile, err := os.OpenFile(tempFileFullPath, os.O_RDWR, 0644) tempFileBuf := bufio.NewReader(tempFile) offsetPositionStr, err := Readln(tempFileBuf) // fmt.Println("--->", offsetPositionStr, "<---") // os.Exit(1) if offsetPositionStr != "" { offsetPosition, _ = strconv.ParseInt(offsetPositionStr, 10, 64) if err != nil { log.Println("Unable convert value:", offsetPositionStr, err) } } // defer tempFile.Close() return tempFile, offsetPosition } // Запись временного файла с указателем на позицию в обрабатыввакемом файле // чтобы в случае переоткрытия не обрабатывать повторно уже обработанные данные func writeTempFile(outChannel *os.File, offset int64) { // line = line + "\n" // Перемещаем указательь на начало файла и перезаписываем строку outChannel.Truncate(0) _, err := outChannel.Seek(0, 0) str := strconv.FormatInt(offset, 10) _, err = outChannel.WriteString(str) if err != nil { log.Println("Ошибка записи временного файла:", err) } } // Удаление временного файла func deleteTempFile(tempFile *os.File, fileName string) { tempFile.Close() // Удаляем временный файл tempFileName := fileName + ".tmp" e := os.Remove(filepath.Join(DirTemp, tempFileName)) if e != nil { log.Println("Eror with remove file", tempFileName, e) } else { log.Println("Удалён временный файл:", tempFileName) } } // (\d{14}),(\w{1}),\{(\w+),(?\w+)\},(\d+),(\d+),(\d+),(\d+),(\d+),(\w{1}),\"(.*)\",(\d+),\{(.+)\},\"(.*)\",(\d+),(\d+),(\d+),(\d+),(\d{1}),\{(\d|,)+\} func parseString (outLine string, logOffset int64) map[string]string { var ( resultData string result map[string]string ) result = nil regexpLogRecord := regexp.MustCompile(`(?P\d{14}),(?P\w{1}),\{(?P\w+),(?P\w+)\},(?P\d+),(?P\d+),(?P\d+),(?P\d+),(?P\d+),(?P\w{1}),\"(?P.*)\",(?P\d+),\{(?P.+)\},\"(?P.*)\",(?P\d+),(?P\d+),(?P\d+),(?P\d+),(?P\d{1}),\{(\d|,)+\}$`) // Полную строку прогоняем через регексп и распихиваем сразу по полям matchedOut := regexpLogRecord.MatchString(outLine) // if Debug { // log.Println(outLine) // } if matchedOut { findedData := regexpLogRecord.FindStringSubmatch(outLine) if findedData != nil { // result := make(map[string]string) result = make(map[string]string) // В зависимости от поля (типа) производим те или иные операции // подстановка из словаря, конвертация, и т.д. // Типы записей в словаре: // 1 – пользователи; 2 – компьютеры; 3 – приложения; 4 – события; // 5 – метаданные; 6 – серверы; 7 – основные порты; 8 – вспомогательные порты. for i, name := range regexpLogRecord.SubexpNames() { switch name { case "DateTime": // Подготоваливаем разные данные на основе даты записи в логе recordTimeStamp := getTimestamp(findedData[i]) recordDate := getDate(findedData[i]) resultData = "@timestamp: " + recordTimeStamp + recordDate result["@timestamp"] = recordTimeStamp result["index_day_prefix"] = recordDate result["id"] = findedData[i] + "-" + strconv.FormatInt(logOffset, 10) case "Usr": resultData = getDataFromDict(1, findedData[i]) if resultData != "not" { findedData[i] = resultData } case "ComputerName": resultData = getDataFromDict(2, findedData[i]) if resultData != "not" { findedData[i] = resultData } case "ApplicationName": resultData = getDataFromDict(3, findedData[i]) if resultData != "not" { findedData[i] = resultData } case "Event": resultData = getDataFromDict(4, findedData[i]) if resultData != "not" { findedData[i] = resultData // log.Println(">>>>>>>>",findedData[i], ExcludeEvent) for _, word := range ExcludeEvent { if resultData == word { if Debug { log.Println("The event", resultData, "coincides with the specified filter", word) } return nil } } } case "Metadata": resultData = getDataFromDict(5, findedData[i]) if resultData != "not" { findedData[i] = resultData } case "Server": resultData = getDataFromDict(6, findedData[i]) if resultData != "not" { findedData[i] = resultData } case "Port": resultData = getDataFromDict(7, findedData[i]) if resultData != "not" { findedData[i] = resultData } case "AddPort": resultData = getDataFromDict(8, findedData[i]) if resultData != "not" { findedData[i] = resultData } case "TranDuration": // Транзакция в шестнадцатеричном виде {24371d9855260;3bd1316} // {TranDurationTranNumber} // - первый элемент – число секунд с 01.01.0001 00:00:00 умноженное на 10000 // - второй – номер транзакции if findedData[i] != "0" { transactionBeginTimeStamp := hex2int(findedData[i]) / 10000 - 62135607600 // fmt.Println(">>>>", findedData[i], hex2int(findedData[i]), transactionBeginTimeStamp) resultData = getDuration(transactionBeginTimeStamp, findedData[1]) } else { resultData = "0" } findedData[i] = resultData case "TranNumber": resultData = strconv.Itoa(hex2int(findedData[i])) findedData[i] = resultData default: resultData = findedData[i] } if i != 0 && name != "" { result[name] = findedData[i] } } } } return result } // Проверка вхождения строки в срез (массив строк) func implContains(sl []string, name string) bool { // iterate over the array and compare given string to each element for _, value := range sl { if value == name { return true } } return false } // Запуск чтения файлов func runOperations () { var client *opensearch.Client if InFile != "" { if SendToEs { client = esConnect() EsIndexName = EsIndexPrefix + "-" + getDate(InFile) esCreateIndex(EsIndexName, client) } tail(client, InFile, os.Stdout) return } // получаем список файлов на момент запуска filesList := getDirectoryContents(DirIn) // получаем последний модифицированный файл lastModifyFiles := getLastModifyFile(DirIn) log.Println("Список новых файлов:", lastModifyFiles, len(lastModifyFiles)) // запускаем процесс обработки if len(lastModifyFiles) == 0 { log.Println("Новых файлов не найдено") return } for _, fileName := range lastModifyFiles { DictObjects = readDictFile(DictFile) log.Println("Запускаем обработку файла", fileName) if SendToEs { EsIndexName = EsIndexPrefix + "-" + getDate(fileName) client = esConnect() esCreateIndex(EsIndexName, client) } go tail(client, fileName, os.Stdout) } // Получаем список новых файлов, и если они появились после запуска программы // то запускаем процесс обработки for { newFilesList := getNewFilesFromDir(filesList, DirIn) log.Println("Список новых файлов:", newFilesList) // Если новых файлов нет засыпаем на 10 сек if len(newFilesList) == 0 { time.Sleep(10 * time.Second) continue } if newFilesList[0] != "" { for _, fileName := range newFilesList { DictObjects = readDictFile(DictFile) if SendToEs { EsIndexName = EsIndexPrefix + "-" + getDate(fileName) // esConnect() esCreateIndex(EsIndexName, client) } // Добавляем имя файла в список полученный при старте (список // обработанных файлов для корректного определения новых файлов filesList = append(filesList, fileName) // запускаем процесс чтения (обработки) go tail(client, fileName, os.Stdout) } } else { log.Println("Новых файлов не найдено") time.Sleep(10 * time.Second) } time.Sleep(1 * time.Second) } log.Println("Завершение 'func runOperations()'") } func tail(client *opensearch.Client, fileName string, out io.Writer) { var shutUp = false var ( recBegin = false outLine string l string bulkCount int arrRecords []map[string]string fOut *os.File indexName string // matchedEnd bool ) // bulkCount = 0 filesList := getDirectoryContents(DirIn) filePath := filepath.Join(DirIn, fileName) // Открываем временный файл и определяем позицию указателя в исходном файле tempFile, offsetPossition := createTempFile(fileName) // Определяем размер файла и сравниваем с позицией при открытии // если равны значит файл открылся на конце fInfo, _ := os.Stat(filePath) fSize := fInfo.Size() log.Println(filePath, "size:", fSize) if offsetPossition == fSize { log.Println("Достигнут конец файла", fileName, "Размер:", fSize, "Позиция:", offsetPossition) } f, err := os.Open(filePath) if err != nil { log.Println("Ошибка открытия файла:", filePath, err) } defer f.Close() // Переместим указатель в файле на последнюю позицию при закрытии программы pos, err := f.Seek(offsetPossition, 0) if err != nil { log.Printf("Seek error on %s: %s", fileName, err) } else { log.Println("Начинаем с позиции:", pos) } r := bufio.NewReader(f) info, err := f.Stat() if err != nil { log.Println("Ошибка открытия файла bufio.NewReader:", filePath, err) } // Создаем файл для записи обработанных (выходных) данных if WriteOutFile { fOut = createOutFile(fileName) // writeOutCSVHeader(fOut) } oldSize := info.Size() // Регулярные выражения для определения начала и конца записи regexpBeginRecord := regexp.MustCompile(`^\{\d{14},\w{1},$`) regexpEndRecord := regexp.MustCompile(`^\},$`) for { line, err := Readln(r) // for line, _, err := r.ReadLine(); err != io.EOF; line, _, err = r.ReadLine() { for err == nil { // fmt.Println(line) // определяем текущую позицию записи в файле и пишем в файл logOffset, _ := f.Seek(0, os.SEEK_CUR) writeTempFile(tempFile, logOffset) // ------- обработка строк -------- l = strings.TrimSpace(string(line)) // Находим начало записи matchedBegin := regexpBeginRecord.MatchString(l) if matchedBegin { recBegin = true outLine = "" } if recBegin { outLine = outLine + l } // Находим конец записи matchedEnd := regexpEndRecord.MatchString(l) if matchedEnd { recBegin = false outLine = strings.TrimSuffix(strings.TrimLeft(outLine, "{,"), "},") if Debug { log.Println(outLine) } // Парсим подготовленную строку result := parseString(outLine, logOffset) if Debug { log.Println(len(result), result) log.Println("Префикс индекса:", EsIndexPrefix, result["index_day_prefix"]) } // Пишем выходные файлы if WriteOutFile { if OutFormat == "json" { writeOutJSONFile(fOut, result) } else { writeOutCSVFile(fOut, result) } } // Отправляем данные в ElasticSearch / OpenSearch if SendToEs { if result["index_day_prefix"] != "" { indexName = EsIndexPrefix + "-" + result["index_day_prefix"] } // if Debug { // log.Println("Index name:", indexName) // // log.Println("Пакетная обработка:", EsBulk) // } // Пакетная обработка, запускается если выставлена опция командной строки // иначе отправляется по одной строке if EsBulk { if bulkCount < EsBulkRecordsQuantity { if len(result) != 0 { arrRecords = append(arrRecords, result) bulkCount++ } } else { go esAddRecordsBulk(client, indexName, arrRecords) bulkCount = 0 arrRecords = nil } } else { go esAddRecord(client, indexName, result) } time.Sleep(TailSleep) } writeTempFile(tempFile, logOffset) if Debug { log.Println(outLine) log.Println(result) } outLine = "" result = nil } // if prefix { // fmt.Println(">",string(line)) // } else { // fmt.Println(">>",string(line)) // } // -------------------------------- line, err = Readln(r) } pos, err := f.Seek(0, io.SeekCurrent) if err != nil { log.Println("Ошибка определения позиции 'pos, err := f.Seek(0, io.SeekCurrent)' в файле", fileName, "error:", err) // panic(err) } // Если задан файл с коммандной строки то выходим if InFile != "" { return } // Запускаем чтение каталога и получепние списка с новыми файлами for { time.Sleep(time.Second) newinfo, err := f.Stat() if err != nil { log.Println("Ошибка ошибка получения статистики 'newinfo, err := f.Stat()' по файлу", fileName, "error:", err) // panic(err) } newSize := newinfo.Size() if newSize != oldSize { if newSize < oldSize { f.Seek(0, 0) } else { f.Seek(pos, io.SeekStart) } r = bufio.NewReader(f) oldSize = newSize break } // fmt.Println("eof") newFilesList := getNewFilesFromDir(filesList, DirIn) if len(newFilesList) != 0 { if !implContains(newFilesList, fileName) { shutUp = true log.Println("Обнаружен новый файл", newFilesList, shutUp) break } } } // Если получен сигнал о наличии новго файла то прекращаем читать старый // и отправляем пакет данных в эластик (чтобы не потерять последние записи) if shutUp { if SendToEs && EsBulk { go esAddRecordsBulk(client, indexName, arrRecords) bulkCount = 0 arrRecords = nil } break } } if shutUp { deleteTempFile(tempFile, fileName) } } func createWorkDir() { dirs := []string{DirIn, DirOut, DirTemp, DirLog} for _, dir := range dirs { // fmt.Println(dir) if !directoryExists(dir) { err := os.Mkdir(dir, 0755) if err != nil { log.Println(err) } else { log.Println("Создан каталог", dir) } } } } // CloseIdleConnections() func esConnect() *opensearch.Client { // Initialize the Client with SSL/TLS enabled. EsClient, err := opensearch.NewClient(opensearch.Config{ Transport: &http.Transport{ MaxIdleConns: 10, MaxIdleConnsPerHost: 10, ResponseHeaderTimeout: 300 * time.Second, IdleConnTimeout: 1 * time.Millisecond, DisableKeepAlives: true, // ForceAttemptHTTP2: true, MaxConnsPerHost: 20, DialContext: (&net.Dialer{ Timeout: 300 * time.Second, // KeepAlive: 10 * time.Millisecond, }).DialContext, TLSClientConfig: &tls.Config{InsecureSkipVerify: EsSkipVerify}, }, Addresses: []string{EsUrl}, Username: EsUser, Password: EsPassword, }) if err != nil { log.Println("cannot initialize", err) os.Exit(1) } // Получение списка узлов кластера для параллельной работы // вставка данных будет идти через все узлы if EsDiscoverNode { EsClient.DiscoverNodes() } if err != nil { log.Println("Ошибка получения списка узлов кластера ES:", err) os.Exit(1) } log.Println("Подключение к ElasticSearch:", EsClient) return EsClient } // Создаем индекс в ES func esCreateIndex(indexName string, client *opensearch.Client) { mapping := strings.NewReader(IndexTemplate) // Create an index with non-default settings. createIndex := opensearchapi.IndicesCreateRequest{ Index: indexName, Body: mapping, } if Debug { log.Println("Create index:", createIndex) } createIndexResponse, err := createIndex.Do(context.Background(), client) if err != nil { log.Println("failed to create index ", err, createIndexResponse) os.Exit(1) } if Debug { log.Println("Create index:", createIndex) } fmt.Println(createIndexResponse.StatusCode) if strings.Contains(createIndexResponse.String(), "already exists") { log.Println(createIndexResponse.IsError()) log.Println("Index", indexName, "already exists") } else if createIndexResponse.StatusCode == 200 { log.Println("Создан индекс:", indexName) } } // Вставка записи в индекс ES func esAddRecord(client *opensearch.Client, indexName string, line map[string]string) bool { // fmt.Println(">>>>>>>>>",indexName) // a_json, err := json.Marshal(line) if len(line) == 0 { return false } var operResult bool aJson, err := json.MarshalIndent(line, "", " ") if err != nil { log.Printf("Error encoding: %s", line, "with error %s", err) } if Debug { log.Println(line) } document := strings.NewReader(string(aJson)) if Debug { log.Println(document) } req := opensearchapi.IndexRequest{ Index: indexName, Body: document, } if Debug { log.Println(req) } insertResponse, err := req.Do(context.Background(), client) // res, err := req.Do(context.Background(), client) if err != nil { log.Println("failed to insert document ", err, line) // os.Exit(1) operResult = false } else { operResult = true } // Проверям результат добавления записи, если код возврата не равен 200 // то повторяем несколько раз if insertResponse.StatusCode == 200 && operResult { log.Println("Пакет данных добавлен") } else { // Ждем две секунды перед повторной отправкой j := 1 for j <= 5 { time.Sleep(2 * time.Second) insertResponse, err := req.Do(context.Background(), client) if err != nil { log.Println("Ошибка повторного добавления записи в opensearch (процедура esAddRecordsBulk):", err) } if insertResponse.StatusCode == 200 { operResult = true break } j++ } } if Debug { log.Printf("Ответ от %s при добавлении записи: %s", EsUrl, insertResponse) } defer insertResponse.Body.Close() return operResult } // Пакетная Вставка записуй в индекс ES func esAddRecordsBulk(client *opensearch.Client, indexName string, arr []map[string]string) bool { if Debug { log.Println("================= Отправка пакета данных ===================") log.Println("Индекс:", indexName) // log.Println(arr) } if len(arr) == 0 { return false } var ( operResult bool resultJson string ) resultJson = "" for _, line := range arr { // if Debug { // log.Println("----", i, line) // } if len(line) != 0 { resultJson += fmt.Sprintf("{\"create\": { \"_index\": \"%s\"}}\n", indexName) aJson, err := json.Marshal(line) if err != nil { log.Printf("Error encoding: %s", line, "with error %s", err) } else { resultJson += fmt.Sprintf("%s\n", string(aJson)) } } } // _ = strings.NewReader(string(aJson)) if Debug { log.Println(string(resultJson)) } r := strings.NewReader(resultJson) bulkResp, err := client.Bulk(r) if err != nil { log.Println("Ошибка добавления пакета данных в opensearch (процедура esAddRecordsBulk):", err) operResult = false } else { operResult = true } // Проверям результат добавления записи, если код возврата не равен 200 // то повторяем несколько раз if bulkResp.StatusCode == 200 && operResult { log.Println("Пакет данных добавлен") } else { // Ждем две секунды перед повторной отправкой j := 1 for j <= 5 { time.Sleep(2 * time.Second) bulkResp, err = client.Bulk(r) if err != nil { log.Println("Ошибка повторного добавления пакета данных в opensearch (процедура esAddRecordsBulk):", err) } if bulkResp.StatusCode == 200 { operResult = true break } j++ } } if Debug { log.Printf("Ответ от %s при добавлении пакета данных: %s", EsUrl, bulkResp) } defer bulkResp.Body.Close() return operResult } // Читаем конфиг и определяем переменные func readConfigFile(fileName string) { cfg, err := ini.LoadSources(ini.LoadOptions{ IgnoreInlineComment: true, }, fileName) if err != nil { log.Println("Ошибка чтения файла конфигурации ", fileName, ": ", err) } config := Config{} cfg.MapTo(&config) DirIn = cfg.Section("General").Key("dirIn").String() DirOut = cfg.Section("General").Key("dirOut").String() DirTemp = cfg.Section("General").Key("dirTemp").String() DirLog = cfg.Section("General").Key("dirLog").String() DictFile = cfg.Section("General").Key("dictFile").String() ObjectTypes = cfg.Section("General").Key("objectTypes").String() LogFileExtention = cfg.Section("General").Key("logFileExtention").String() ServiceName = cfg.Section("General").Key("serviceName").String() SendToEs = cfg.Section("ElasticSearch").Key("sendToEs").MustBool() if SendToEs { EsDiscoverNode = cfg.Section("ElasticSearch").Key("esDiscoverNode").MustBool() EsUrl = cfg.Section("ElasticSearch").Key("esUrl").String() EsUser = cfg.Section("ElasticSearch").Key("esUser").String() EsPassword = cfg.Section("ElasticSearch").Key("esPassword").String() EsSkipVerify = cfg.Section("ElasticSearch").Key("esSkipVerify").MustBool() EsIndexPrefix = cfg.Section("ElasticSearch").Key("esIndexPrefix").String() EsBulk = cfg.Section("ElasticSearch").Key("esBulk").MustBool() EsBulkRecordsQuantity, _ = strconv.Atoi(cfg.Section("ElasticSearch").Key("esBulkRecordsQuantity").String()) } WriteOutFile = cfg.Section("Processing").Key("writeOutFile").MustBool() OutFormat = cfg.Section("Processing").Key("outFormat").String() TimeZoneOffset = cfg.Section("Processing").Key("timeZoneOffset").String() Duration = cfg.Section("Processing").Key("readDuration").String() ExcludeEvent = strings.Split(cfg.Section("Processing").Key("excludeEvent").String(), ";") } // Создание файла конфигурации func writeConfigFile(fileName string) { f, err := os.Create(fileName) if err != nil { log.Println("Create out file error:", err) } _, err = f.WriteString(ConfigTemplate) if err != nil { log.Println("Ошибка записи в выходной файл:", err) } else { log.Println("Создан файл конфигурации:", fileName) } } // Создание каталога с конфигами func createConfDir(dir string) { if !directoryExists(dir) { err := os.Mkdir(dir, 0700) if err != nil { log.Println(err) } else { log.Println("Создан каталог", dir) } } } // const serviceName = "1C log processor" const serviceDescription = "1C log processor" type program struct{} func (p program) Start(s service.Service) error { log.Println("Служба", s.String(), "запущена") go p.RunService() return nil } func (p program) Stop(s service.Service) error { log.Println("Служба", s.String(), "остановлена") return nil } func (p program) RunService() { for { log.Println("Запускаем основной процесс") runOperations() time.Sleep(1 * time.Second) } } func RunWinService() { serviceConfig := &service.Config{ Name: ServiceName, DisplayName: ServiceName, Description: serviceDescription, } prg := &program{} s, err := service.New(prg, serviceConfig) if err != nil { log.Println("Ошибка создания сервиса:", err.Error()) } err = s.Run() if err != nil { log.Println("Ошибка запуска сервиса:", err.Error()) } } func main() { // Читаем опции коммандной строки flag.BoolVar(&RunAsWinService, "run-win-service", false, "Запуск как службы Windows") flag.StringVar(&DirCfg, "dir-config", ".config", "Каталог для файлов настройки (DIR_CFG)") flag.StringVar(&FileCfg, "config", "config.ini", "Файл настроек") flag.BoolVar(&CreateFileCfg, "create-config", false, "Создать файл настроек") flag.BoolVar(&Debug, "debug", false, "Выводить отладочную информацию") flag.StringVar(&DirIn, "dir-in", "in", "Каталог для исходных файлов (DIR_IN)") flag.StringVar(&DirOut, "dir-out", "out", "Каталог для обработанных файлов (DIR_OUT)") flag.StringVar(&DirTemp, "dir-temp", "tmp", "Каталог для временных файлов (TEMP)") flag.StringVar(&DirLog, "dir-worklog", "log", "Каталог для лога работы (DIR_LOG)") flag.StringVar(&DictFile, "dict-file", "1Cv8.lgf", "Файл со словарём (DICT_FILE)") flag.BoolVar(&SendToEs, "es-send", false, "Отправлять данные в ElasticSearch") flag.StringVar(&EsUrl, "es-url", "", "Адрес узла Elastic Search (ELASTICSEARCH_URL)") flag.StringVar(&EsUser, "es-user", "", "Имя пользователя Elastic Search (ELASTICSEARCH_USER)") flag.StringVar(&EsPassword, "es-password", "", "Пароль пользователя Elastic Search (ELASTICSEARCH_PASSWORD)") flag.StringVar(&EsIndexPrefix, "es-index-prefix", "", "Префикс имени индекса Elastic Search (ELASTICSEARCH_INDEX_PREFIX)") flag.BoolVar(&EsDiscoverNode, "es-discover-node", false, "Получать список узлов кластера ElasticSearch") flag.IntVar(&EsBulkRecordsQuantity, "es-bulk-quantity", 10, "Количество записей в одном запросе для пакетной вставки") flag.BoolVar(&EsBulk, "es-bulk", false, "Пакетная вставка записей в Elastic Search") flag.BoolVar(&EsSkipVerify, "es-skip-verify", false, "Пропустить проверку сертификатов при подключении к Elastic Search") flag.BoolVar(&WriteOutFile, "write-out-file", false, "Запись обработанных данных в файл") flag.StringVar(&OutFormat, "out-format", "csv", "Формат данных на выходе (csv, json) (OUT_FORMAT)") flag.StringVar(&LogFileExtention, "log-file-ext", ".lgp", "Расширение файлов с журналами (обрабатываемых логов) (LOG_FILE_EXT)") flag.StringVar(&ObjectTypes, "object-types", "1,2,3,4,5,6,7,8", "Список типов объектов словаря для выборки, разделённый запятой (OBJECT_TYPES)") flag.StringVar(&TimeZoneOffset, "tz-offset", "+03:00", "Сдвиг по времени от UTC") flag.StringVar(&Duration, "tail-sleep", "5ms", "Задержка чтения входного файла. Разрешены обозначения 'ns', 'us' ('µs'), 'ms', 's', 'm','h')") flag.StringVar(&InFile, "file", "", "Имя файла для обработки. Если требуется обработать один файл") flag.StringVar(&WorkLogOut, "worklog-out", "file", "Направление вывода журнала работы программы (console, file)") // flag.BoolVar(&Version, "version", false, "Версия программы") flag.BoolFunc("version", "Версия программы", func(s string) error { fmt.Println("Версия:", Version) os.Exit(0) return nil }) flag.Parse() configFile := filepath.Join(DirCfg, FileCfg) if fileExists(configFile) { readConfigFile(configFile) log.Println("Используются настройки из файла конфгурации:", configFile) } else { log.Println("Не найден файл конфгурации:", configFile) if CreateFileCfg { createConfDir(DirCfg) writeConfigFile(configFile) } } // os.Exit(1) // Установим задержку чтения входного файла TailSleep, _ = time.ParseDuration(Duration) // log.Println(DirCfg, DirIn, "send es:", SendToEs, OutFormat, WriteOutFile, EsUrl, "esuser:", EsUser, Duration, TailSleep) // Проверка и установка переменных для работы if DirIn == "" && os.Getenv("DIR_IN") == "" { fmt.Println("Make sure environment variables `DIR_IN`, or used with '-dir-in' argument") os.Exit(1) } else if DirIn == "" && os.Getenv("DIR_IN") != "" { DirIn = os.Getenv("DIR_IN") } if WriteOutFile { if DirOut == "" && os.Getenv("DIR_OUT") == "" { fmt.Println("Make sure environment variables `DIR_OUT`, or used with '-dir-out' argument") os.Exit(1) } else if DirOut == "" && os.Getenv("DIR_OUT") != "" { DirOut = os.Getenv("DIR_OUT") } } if os.Getenv("LOG_FILE_EXT") != "" { LogFileExtention = os.Getenv("LOG_FILE_EXT") } if os.Getenv("DIR_LOG") != "" { DirLog = os.Getenv("DIR_LOG") } // Проверяем параметры и переменные окружения для работы с ElasticSearch / OpenSearch if SendToEs { if EsUser == "" && os.Getenv("ELASTICSEARCH_USER") == "" { fmt.Println("Make sure environment variables `ELASTICSEARCH_USER=\"es_user_name\"` or used with '-es-user' argument") os.Exit(1) } else if EsUser == "" && os.Getenv("ELASTICSEARCH_USER") != "" { EsUser = os.Getenv("ELASTICSEARCH_USER") } if EsPassword == "" && os.Getenv("ELASTICSEARCH_PASSWORD") == "" { fmt.Println("Make sure environment variables `ELASTICSEARCH_PASSWORD or used with '-es-password' argument") os.Exit(1) } else if EsPassword == "" && os.Getenv("ELASTICSEARCH_PASSWORD") != "" { EsPassword = os.Getenv("ELASTICSEARCH_PASSWORD") } if EsUrl == "" && os.Getenv("ELASTICSEARCH_URL") == "" { fmt.Println("Make sure environment variables `ELASTICSEARCH_URL or used with '-es-url' argument") os.Exit(1) } else if EsUrl == "" && os.Getenv("ELASTICSEARCH_URL") != "" { EsUrl = os.Getenv("ELASTICSEARCH_URL") } if EsIndexPrefix == "" && os.Getenv("ELASTICSEARCH_INDEX_PREFIX") == "" { fmt.Println("Make sure environment variables `ELASTICSEARCH_INDEX_PREFIX` or used with '-es-index-prefix' argument") os.Exit(1) } else if EsIndexPrefix == "" && os.Getenv("ELASTICSEARCH_INDEX_PREFIX") != "" { EsIndexPrefix = os.Getenv("ELASTICSEARCH_INDEX_PREFIX") } } createWorkDir() // Определим куда выводим лог работы switch WorkLogOut { case "file": // Установка и открытие лога для программы fLog, err := os.OpenFile(filepath.Join(DirLog, "1c-log-processor.log"), os.O_RDWR | os.O_CREATE | os.O_APPEND, 0666) if err != nil { fmt.Printf("error opening log file: %v", err) } defer fLog.Close() log.SetOutput(fLog) case "console": log.SetOutput(os.Stdout) } // if Debug { // log.SetOutput(os.Stdout) // } else { // log.SetOutput(fLog) // } // log.SetOutput(fLog) // readDictFile(dictFile) DictObjects = readDictFile(DictFile) // Получаем временную зону и часовой пояс // Если зону возвращает UTC то присваеваем вручную из опций командной строки // Хреновый хак но пока решения нет. Так как на винде возвращает почему-то нули LocalTimeZone, LocalTZOffset = getLocalTimeZone() if LocalTZOffset == "+00:00" { LocalTZOffset = TimeZoneOffset } log.Println(DirCfg, FileCfg, DirIn, DirOut, DirTemp, DirLog, DictFile) // Запуск как сервиса if RunAsWinService { RunWinService() } else { runOperations() } log.Println("Работа программы завершена") }