diff --git a/monitor-agent/node_exporter/VERSION b/monitor-agent/node_exporter/VERSION index 0895ec796..2bca7cbea 100644 --- a/monitor-agent/node_exporter/VERSION +++ b/monitor-agent/node_exporter/VERSION @@ -43,4 +43,7 @@ v3.2.0 Fix log tail chanel closed when handler check active && modify tail reopen v3.2.5 -Fix pcre match panic error \ No newline at end of file +Fix pcre match panic error + +v3.3.0.1 +Support log path with wildcard character diff --git a/monitor-agent/node_exporter/collector/monitor_log_metric_linux.go b/monitor-agent/node_exporter/collector/monitor_log_metric_linux.go index 5516bb18b..e04a809b4 100644 --- a/monitor-agent/node_exporter/collector/monitor_log_metric_linux.go +++ b/monitor-agent/node_exporter/collector/monitor_log_metric_linux.go @@ -15,6 +15,7 @@ import ( "net/http" "reflect" //"regexp" + "os/exec" "strconv" "strings" "sync" @@ -363,6 +364,56 @@ func (c *logMetricMonitorNeObj) start() { } } +func (c *logMetricMonitorNeObj) tailLogFile(logPath string, destroyChan chan int) { + level.Info(monitorLogger).Log("log_metric_start -> tailMultiLog", fmt.Sprintf("path:%s,serviceGroup:%s", logPath, c.ServiceGroup)) + logTailSession, err := tail.TailFile(logPath, tail.Config{Follow: true, ReOpen: true, Location: &tail.SeekInfo{Offset: 0, Whence: 2}}) + if err != nil { + level.Error(monitorLogger).Log("msg", fmt.Sprintf("start multi log metric collector fail, path: %s, error: %v", logPath, err)) + return + } + destroyFlag := false + for { + select { + case <-destroyChan: + destroyFlag = true + case line := <-logTailSession.Lines: + if line == nil { + continue + } + //level.Info(monitorLogger).Log("log_metric -> get_new_line", fmt.Sprintf("path:%s,serviceGroup:%s,text:%s", c.Path, c.ServiceGroup, line.Text)) + c.DataChan <- line.Text + } + if destroyFlag { + break + } + } + logTailSession.Stop() + level.Info(monitorLogger).Log("log_metric_end -> tailMultiLog", fmt.Sprintf("path:%s,serviceGroup:%s", logPath, c.ServiceGroup)) +} + +func (c *logMetricMonitorNeObj) startMultiPath() { + level.Info(monitorLogger).Log("log_metric -> startLogMetricMonitorNeObj__startMultiPath", fmt.Sprintf("path:%s,serviceGroup:%s", c.Path, c.ServiceGroup)) + pathList := listMatchLogPath(c.Path) + if len(pathList) == 0 { + level.Warn(monitorLogger).Log("log_metric -> startMultiPath_cannotMatchAnyFile", fmt.Sprintf("path:%s,serviceGroup:%s", c.Path, c.ServiceGroup)) + return + } + c.DataChan = make(chan string, logMetricChanLength) + go c.startHandleTailData() + var destroyChanList []chan int + for _, targetFilePath := range pathList { + tmpDestroyChan := make(chan int, 1) + go c.tailLogFile(targetFilePath, tmpDestroyChan) + destroyChanList = append(destroyChanList, tmpDestroyChan) + } + <-c.DestroyChan + for _, tmpDestroyChan := range destroyChanList { + tmpDestroyChan <- 1 + } + c.TailDataCancelChan <- 1 + level.Info(monitorLogger).Log("log_metric -> startLogMetricMonitorNeObj__endMultiPath", fmt.Sprintf("path:%s,serviceGroup:%s", c.Path, c.ServiceGroup)) +} + func (c *logMetricMonitorNeObj) startFileHandlerCheck() { t := time.NewTicker(1 * time.Minute).C for { @@ -424,7 +475,11 @@ func (c *logMetricMonitorNeObj) new(input *logMetricMonitorNeObj) { initLogMetricGroupNeObj(metricGroupObj) c.MetricGroupConfig = append(c.MetricGroupConfig, metricGroupObj) } - go c.start() + if strings.Contains(c.Path, "*") { + go c.startMultiPath() + } else { + go c.start() + } } // 把所有正则初始化 @@ -1117,3 +1172,30 @@ func getFileLastUpdatedTime(filePath string) (unixTime int64, err error) { unixTime = f.ModTime().Unix() return } + +func listMatchLogPath(inputPath string) (result []string) { + var dirPath, fileName string + if lastPathIndex := strings.LastIndex(inputPath, "/"); lastPathIndex >= 0 { + dirPath = inputPath[:lastPathIndex+1] + fileName = inputPath[lastPathIndex+1:] + } + if fileName == "" { + level.Error(monitorLogger).Log("msg", fmt.Sprintf("log path illgal : %s ", inputPath)) + return + } + fileName = strings.ReplaceAll(fileName, ".", "\\.") + fileName = strings.ReplaceAll(fileName, "*", ".*") + cmdString := fmt.Sprintf("ls %s |grep \"^%s$\"", dirPath, fileName) + cmd := exec.Command("bash", "-c", cmdString) + b, err := cmd.Output() + if err != nil { + level.Error(monitorLogger).Log("msg", fmt.Sprintf("list log path:%s fail : %v ", cmdString, err)) + return + } + for _, row := range strings.Split(string(b), "\n") { + if row != "" { + result = append(result, dirPath+row) + } + } + return +} diff --git a/monitor-server/api/v2/service/log_metric.go b/monitor-server/api/v2/service/log_metric.go index c384acfd7..b8d4062fc 100644 --- a/monitor-server/api/v2/service/log_metric.go +++ b/monitor-server/api/v2/service/log_metric.go @@ -13,6 +13,7 @@ import ( "io" "io/ioutil" "net/http" + "regexp" "sort" "strconv" "strings" @@ -50,6 +51,21 @@ func GetLogMetricMonitor(c *gin.Context) { } } +func validateLogPath(input string) error { + regPath := regexp.MustCompile(`^\/([\w|\.|\-|\*]+\/?)+\.\w+$`) + err := fmt.Errorf("path:%s illegal", input) + if !regPath.MatchString(input) { + return err + } + pathList := strings.Split(input, "/") + for i, v := range pathList { + if i < len(pathList)-1 && strings.Contains(v, "*") { + return err + } + } + return nil +} + func CreateLogMetricMonitor(c *gin.Context) { var param models.LogMetricMonitorCreateDto var list []*models.LogMetricMonitorTable @@ -62,8 +78,7 @@ func CreateLogMetricMonitor(c *gin.Context) { err = fmt.Errorf("Param log_path is empty ") } for _, v := range param.LogPath { - if !strings.HasPrefix(v, "/") { - err = fmt.Errorf("Path:%s illegal ", v) + if err = validateLogPath(v); err != nil { break } } @@ -107,6 +122,10 @@ func UpdateLogMetricMonitor(c *gin.Context) { for _, v := range param.EndpointRel { hostEndpointList = append(hostEndpointList, v.SourceEndpoint) } + if err = validateLogPath(param.LogPath); err != nil { + middleware.ReturnValidateError(c, err.Error()) + return + } // 校验路径是否重复 if list, err = db.GetLogMetricMonitorByCond([]string{param.LogPath}, param.Guid, param.ServiceGroup); err != nil { middleware.ReturnServerHandleError(c, err)