forked from fluxio/logging
-
Notifications
You must be signed in to change notification settings - Fork 0
/
readers.go
144 lines (120 loc) · 2.96 KB
/
readers.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
package logging
import (
"bufio"
"bytes"
"fmt"
"io"
"regexp"
"strings"
)
// FilteringLogReader wraps a buffered reader and holds the state its Read
// method uses to discard the lines of a log entry while streaming.
//
// NOTE(review): no constructor and no use of contextRegexp is visible in this
// part of the file; presumably the regexp selects entries by their context
// field — confirm against the rest of the package.
type FilteringLogReader struct {
// reader is the buffered source that Read pulls lines from.
reader *bufio.Reader
// contextRegexp is not referenced by the code visible here.
contextRegexp *regexp.Regexp
// buf holds bytes already read from reader that Read has not yet copied out
// to its caller.
buf []byte
// skipEntry is true while Read is discarding the lines of the current entry.
skipEntry bool
}
// Read hands out bytes previously stored in f.buf, if there are any, and
// returns without touching the underlying reader. Otherwise, while
// f.skipEntry is set, it reads and discards input until it reaches a new line
// that does not start with the continuation marker.
//
// NOTE(review): this method looks unfinished. When f.buf is empty on entry it
// never copies anything into buf and ends with (0, io.EOF) unless the skip
// loop fails first. The error from the last ReadLine call is discarded by
// that return, and contextRegexp is never consulted. Data loaded into f.buf
// is only handed out by a later call.
// NOTE(review): bufio documents that the slice returned by ReadLine is only
// valid until the next read and does not include the end-of-line bytes; f.buf
// keeps that slice across calls — confirm this is intended.
func (f *FilteringLogReader) Read(buf []byte) (n int, err error) {
// Drain leftover bytes first; a short copy leaves the remainder in f.buf.
if len(f.buf) > 0 {
n = copy(buf, f.buf)
f.buf = f.buf[n:]
return n, nil
}
// prefix is ReadLine's "line was too long for the buffer" flag;
// haveNewLine records whether the previous read finished a line, i.e.
// whether the next read starts a new one.
var prefix bool
var haveNewLine bool
for f.skipEntry {
f.buf, prefix, err = f.reader.ReadLine()
if err != nil {
return 0, err
}
if haveNewLine {
// This read starts a new line: keep skipping only if it is a
// continuation of the entry being skipped.
f.skipEntry = bytes.HasPrefix(f.buf, []byte(continuation))
}
haveNewLine = !prefix
}
// If the loop above did not leave a complete line in f.buf, read one now.
// NOTE(review): err from this call is not checked, and when the loop ended
// on a partial line this overwrites that partial data.
if !haveNewLine {
f.buf, prefix, err = f.reader.ReadLine()
}
return 0, io.EOF
}
// LogReader reads log entries one at a time from a buffered input; entries
// are retrieved with Next.
type LogReader struct {
// reader is the buffered input that Next consumes line by line.
reader *bufio.Reader
// linebuffer is not used by the code visible in this part of the file.
linebuffer bytes.Buffer
}
// NewLogReader returns a LogReader that consumes log data from r through a
// bufio.Reader.
func NewLogReader(r io.Reader) *LogReader {
	lr := new(LogReader)
	lr.reader = bufio.NewReader(r)
	return lr
}
// logEntry is one parsed line (or, once Next has merged continuation lines,
// one parsed entry) of a log file. The fields are filled in by
// determineLineType.
type logEntry struct {
// full is the complete line the entry was parsed from.
full string
// header is the text matched by entryStartRegexp for a start line, or the
// continuation marker for a continuation line.
header string
// typ is the single type letter at the start of the header (I, D or E).
typ byte
// ts is the second capture group of entryStartRegexp, the date/time field
// that follows the type letter.
ts string
// file is the third capture group: the file name before the colon.
file string
// line is the fourth capture group: the digits after the colon.
line string
// context is the fifth capture group: the text between the parentheses.
context string
// msg is everything on the line after the header.
msg string
}
// Next reads and returns the next log entry from the underlying reader.
//
// An entry is one start line (a line carrying a full header) followed by zero
// or more continuation lines; the text of each continuation line is appended
// to the entry's msg. Trailing "\r" and "\n" characters are trimmed from the
// resulting msg.
//
// Next returns io.EOF when no input is left. It returns a descriptive error
// when a line is neither a start line nor a continuation line, or when a
// continuation line is met before any start line has been read in this call.
// On a read error the entry assembled so far is returned alongside the error.
func (r *LogReader) Next() (entry logEntry, err error) {
	for {
		line, readErr := r.reader.ReadString('\n')
		// ReadString reports io.EOF together with data when the last line of
		// the input has no trailing newline. That line is still processed;
		// io.EOF is only surfaced once nothing at all could be read.
		if readErr != nil && !(readErr == io.EOF && len(line) > 0) {
			err = readErr
			break
		}

		lineType, lineEntry := determineLineType(line)
		if lineType == unknown {
			return entry, fmt.Errorf("Malformatted log file. Cannot parse line: %q", line)
		}
		// A continuation is only meaningful after a start line. The check has
		// to look at the entry accumulated so far: the header of a parsed
		// continuation line is always the continuation marker, so testing
		// lineEntry.header could never detect this condition.
		if lineType == entryCont && len(entry.header) == 0 {
			return entry, fmt.Errorf("Starting in the middle of a log file: %q", line)
		}

		if lineType == entryStart {
			entry = lineEntry
		} else {
			entry.msg += lineEntry.msg
		}

		// Keep going only if the following line begins with a space, i.e. it
		// may continue this entry. The Peek error is deliberately ignored: a
		// failed or empty peek simply ends the current entry.
		next, _ := r.reader.Peek(1)
		if len(next) == 0 || next[0] != ' ' {
			break
		}
	}
	entry.msg = strings.TrimRight(entry.msg, "\r\n")
	return entry, err
}
// Line classifications returned by determineLineType.
const (
// unknown marks a line that is neither the start of an entry nor a
// continuation line.
unknown = iota
// entryStart marks a line that begins with a full entry header.
entryStart
// entryCont marks a line that begins with the continuation marker.
entryCont
)
// entryStartRegexp matches the header at the beginning of an entry's first
// line. Its capture groups are, in order: the type letter (I, D or E); a
// four-digit field, a space, a 12-character field of digits, colons and dots,
// and a suffix made of digits, '+', '-' or 'Z' (stored by determineLineType as
// the entry's ts); the file name; the line number; and the text between the
// parentheses.
var entryStartRegexp = regexp.MustCompile(`^(I|D|E)(\d{4} [\d:\.]{12}[-+\dZ]+) ([\w.]+):(\d+) \((.*)\): `)
// determineLineType classifies a single line of a log file and splits it into
// its parts.
//
// The result is one of:
//   - unknown with a zero logEntry, when the line is shorter than the
//     continuation marker or matches neither recognised form;
//   - entryCont, when the line begins with the continuation marker: header is
//     the marker and msg is the remainder of the line;
//   - entryStart, when entryStartRegexp matches: the capture groups populate
//     typ, ts, file, line and context, and msg is everything after the header.
func determineLineType(line string) (lineType int, e logEntry) {
	marker := string(continuation)
	if len(line) < len(marker) {
		return unknown, e
	}
	if line[:len(marker)] == marker {
		return entryCont, logEntry{
			full:   line,
			header: marker,
			msg:    line[len(marker):],
		}
	}

	groups := entryStartRegexp.FindStringSubmatch(line)
	if groups == nil {
		return unknown, e
	}

	// The pattern is anchored at the start of the line, so the length of the
	// whole match is also the offset at which the message text begins.
	header := groups[0]
	e.full = line
	e.header = header
	e.typ = groups[1][0]
	e.ts = groups[2]
	e.file = groups[3]
	e.line = groups[4]
	e.context = groups[5]
	e.msg = line[len(header):]
	return entryStart, e
}