Skip to content

Commit

Permalink
#851 Update node exporter add multi log path
Browse files Browse the repository at this point in the history
  • Loading branch information
zgyzgyhero committed Dec 5, 2024
1 parent 6768581 commit b51ec4d
Show file tree
Hide file tree
Showing 3 changed files with 108 additions and 4 deletions.
5 changes: 4 additions & 1 deletion monitor-agent/node_exporter/VERSION
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,7 @@ v3.2.0
Fix log tail chanel closed when handler check active && modify tail reopen

v3.2.5
Fix pcre match panic error
Fix pcre match panic error

v3.3.0.1
Support log path with wildcard character
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"net/http"
"reflect"
//"regexp"
"os/exec"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -363,6 +364,56 @@ func (c *logMetricMonitorNeObj) start() {
}
}

func (c *logMetricMonitorNeObj) tailLogFile(logPath string, destroyChan chan int) {
level.Info(monitorLogger).Log("log_metric_start -> tailMultiLog", fmt.Sprintf("path:%s,serviceGroup:%s", logPath, c.ServiceGroup))
logTailSession, err := tail.TailFile(logPath, tail.Config{Follow: true, ReOpen: true, Location: &tail.SeekInfo{Offset: 0, Whence: 2}})
if err != nil {
level.Error(monitorLogger).Log("msg", fmt.Sprintf("start multi log metric collector fail, path: %s, error: %v", logPath, err))
return
}
destroyFlag := false
for {
select {
case <-destroyChan:
destroyFlag = true
case line := <-logTailSession.Lines:
if line == nil {
continue
}
//level.Info(monitorLogger).Log("log_metric -> get_new_line", fmt.Sprintf("path:%s,serviceGroup:%s,text:%s", c.Path, c.ServiceGroup, line.Text))
c.DataChan <- line.Text
}
if destroyFlag {
break
}
}
logTailSession.Stop()
level.Info(monitorLogger).Log("log_metric_end -> tailMultiLog", fmt.Sprintf("path:%s,serviceGroup:%s", logPath, c.ServiceGroup))
}

func (c *logMetricMonitorNeObj) startMultiPath() {
level.Info(monitorLogger).Log("log_metric -> startLogMetricMonitorNeObj__startMultiPath", fmt.Sprintf("path:%s,serviceGroup:%s", c.Path, c.ServiceGroup))
pathList := listMatchLogPath(c.Path)
if len(pathList) == 0 {
level.Warn(monitorLogger).Log("log_metric -> startMultiPath_cannotMatchAnyFile", fmt.Sprintf("path:%s,serviceGroup:%s", c.Path, c.ServiceGroup))
return
}
c.DataChan = make(chan string, logMetricChanLength)
go c.startHandleTailData()
var destroyChanList []chan int
for _, targetFilePath := range pathList {
tmpDestroyChan := make(chan int, 1)
go c.tailLogFile(targetFilePath, tmpDestroyChan)
destroyChanList = append(destroyChanList, tmpDestroyChan)
}
<-c.DestroyChan
for _, tmpDestroyChan := range destroyChanList {
tmpDestroyChan <- 1
}
c.TailDataCancelChan <- 1
level.Info(monitorLogger).Log("log_metric -> startLogMetricMonitorNeObj__endMultiPath", fmt.Sprintf("path:%s,serviceGroup:%s", c.Path, c.ServiceGroup))
}

func (c *logMetricMonitorNeObj) startFileHandlerCheck() {
t := time.NewTicker(1 * time.Minute).C
for {
Expand Down Expand Up @@ -424,7 +475,11 @@ func (c *logMetricMonitorNeObj) new(input *logMetricMonitorNeObj) {
initLogMetricGroupNeObj(metricGroupObj)
c.MetricGroupConfig = append(c.MetricGroupConfig, metricGroupObj)
}
go c.start()
if strings.Contains(c.Path, "*") {
go c.startMultiPath()
} else {
go c.start()
}
}

// 把所有正则初始化
Expand Down Expand Up @@ -1117,3 +1172,30 @@ func getFileLastUpdatedTime(filePath string) (unixTime int64, err error) {
unixTime = f.ModTime().Unix()
return
}

func listMatchLogPath(inputPath string) (result []string) {
var dirPath, fileName string
if lastPathIndex := strings.LastIndex(inputPath, "/"); lastPathIndex >= 0 {
dirPath = inputPath[:lastPathIndex+1]
fileName = inputPath[lastPathIndex+1:]
}
if fileName == "" {
level.Error(monitorLogger).Log("msg", fmt.Sprintf("log path illgal : %s ", inputPath))
return
}
fileName = strings.ReplaceAll(fileName, ".", "\\.")
fileName = strings.ReplaceAll(fileName, "*", ".*")
cmdString := fmt.Sprintf("ls %s |grep \"^%s$\"", dirPath, fileName)
cmd := exec.Command("bash", "-c", cmdString)
b, err := cmd.Output()
if err != nil {
level.Error(monitorLogger).Log("msg", fmt.Sprintf("list log path:%s fail : %v ", cmdString, err))
return
}
for _, row := range strings.Split(string(b), "\n") {
if row != "" {
result = append(result, dirPath+row)
}
}
return
}
23 changes: 21 additions & 2 deletions monitor-server/api/v2/service/log_metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"io"
"io/ioutil"
"net/http"
"regexp"
"sort"
"strconv"
"strings"
Expand Down Expand Up @@ -50,6 +51,21 @@ func GetLogMetricMonitor(c *gin.Context) {
}
}

func validateLogPath(input string) error {
regPath := regexp.MustCompile(`^\/([\w|\.|\-|\*]+\/?)+\.\w+$`)
err := fmt.Errorf("path:%s illegal", input)
if !regPath.MatchString(input) {
return err
}
pathList := strings.Split(input, "/")
for i, v := range pathList {
if i < len(pathList)-1 && strings.Contains(v, "*") {
return err
}
}
return nil
}

func CreateLogMetricMonitor(c *gin.Context) {
var param models.LogMetricMonitorCreateDto
var list []*models.LogMetricMonitorTable
Expand All @@ -62,8 +78,7 @@ func CreateLogMetricMonitor(c *gin.Context) {
err = fmt.Errorf("Param log_path is empty ")
}
for _, v := range param.LogPath {
if !strings.HasPrefix(v, "/") {
err = fmt.Errorf("Path:%s illegal ", v)
if err = validateLogPath(v); err != nil {
break
}
}
Expand Down Expand Up @@ -107,6 +122,10 @@ func UpdateLogMetricMonitor(c *gin.Context) {
for _, v := range param.EndpointRel {
hostEndpointList = append(hostEndpointList, v.SourceEndpoint)
}
if err = validateLogPath(param.LogPath); err != nil {
middleware.ReturnValidateError(c, err.Error())
return
}
// 校验路径是否重复
if list, err = db.GetLogMetricMonitorByCond([]string{param.LogPath}, param.Guid, param.ServiceGroup); err != nil {
middleware.ReturnServerHandleError(c, err)
Expand Down

0 comments on commit b51ec4d

Please sign in to comment.