diff --git a/cmd/chunks-inspect/go.mod b/cmd/chunks-inspect/go.mod deleted file mode 100644 index 6f197091f44e8..0000000000000 --- a/cmd/chunks-inspect/go.mod +++ /dev/null @@ -1,10 +0,0 @@ -module github.com/grafana/loki/cmd/chunks-inspect - -go 1.15 - -require ( - github.com/frankban/quicktest v1.7.2 // indirect - github.com/golang/snappy v0.0.1 - github.com/klauspost/compress v1.11.7 - github.com/pierrec/lz4 v2.3.0+incompatible -) diff --git a/cmd/chunks-inspect/go.sum b/cmd/chunks-inspect/go.sum deleted file mode 100644 index 4800a9690ba55..0000000000000 --- a/cmd/chunks-inspect/go.sum +++ /dev/null @@ -1,15 +0,0 @@ -github.com/frankban/quicktest v1.7.2 h1:2QxQoC1TS09S7fhCPsrvqYdvP1H5M1P1ih5ABm3BTYk= -github.com/frankban/quicktest v1.7.2/go.mod h1:jaStnuzAqU1AJdCO0l53JDCJrVDKcS03DbaAcR7Ks/o= -github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4= -github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= -github.com/google/go-cmp v0.3.1 h1:Xye71clBPdm5HgqGwUkwhbynsUJZhDbS20FvLhQ2izg= -github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= -github.com/klauspost/compress v1.11.7 h1:0hzRabrMN4tSTvMfnL3SCv1ZGeAP23ynzodBgaHeMeg= -github.com/klauspost/compress v1.11.7/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= -github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= -github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= -github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= -github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= -github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= -github.com/pierrec/lz4 v2.3.0+incompatible h1:CZzRn4Ut9GbUkHlQ7jqBXeZQV41ZSKWFc302ZU6lUTk= -github.com/pierrec/lz4 v2.3.0+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= diff --git a/cmd/chunks-inspect/loki.go b/cmd/chunks-inspect/loki.go index d8fd5d0a913fd..596bdcd24467c 100644 --- a/cmd/chunks-inspect/loki.go +++ b/cmd/chunks-inspect/loki.go @@ -1,97 +1,19 @@ package main import ( - "bytes" - "compress/gzip" "encoding/binary" "fmt" - "hash/crc32" "io" - "github.com/golang/snappy" - "github.com/klauspost/compress/flate" - "github.com/klauspost/compress/zstd" - "github.com/pierrec/lz4" -) - -type Encoding struct { - code int - name string - readerFn func(io.Reader) (io.Reader, error) -} - -func (e Encoding) String() string { - return e.name -} - -// The table gets initialized with sync.Once but may still cause a race -// with any other use of the crc32 package anywhere. Thus we initialize it -// before. -var castagnoliTable *crc32.Table - -func init() { - castagnoliTable = crc32.MakeTable(crc32.Castagnoli) -} - -var ( - encNone = Encoding{code: 0, name: "none", readerFn: func(reader io.Reader) (io.Reader, error) { return reader, nil }} - encGZIP = Encoding{code: 1, name: "gzip", readerFn: func(reader io.Reader) (io.Reader, error) { return gzip.NewReader(reader) }} - encDumb = Encoding{code: 2, name: "dumb", readerFn: func(reader io.Reader) (io.Reader, error) { return reader, nil }} - encLZ4 = Encoding{code: 3, name: "lz4", readerFn: func(reader io.Reader) (io.Reader, error) { return lz4.NewReader(reader), nil }} - encSnappy = Encoding{code: 4, name: "snappy", readerFn: func(reader io.Reader) (io.Reader, error) { return snappy.NewReader(reader), nil }} - enclz4_256k = Encoding{code: 5, name: "lz4-256k", readerFn: func(reader io.Reader) (io.Reader, error) { return lz4.NewReader(reader), nil }} - enclz4_1M = Encoding{code: 6, name: "lz4-1M", readerFn: func(reader io.Reader) (io.Reader, error) { return lz4.NewReader(reader), nil }} - enclz4_4M = Encoding{code: 7, name: "lz4-4M", readerFn: func(reader io.Reader) (io.Reader, error) { return lz4.NewReader(reader), nil }} - encFlate = Encoding{code: 8, name: "flate", readerFn: func(reader io.Reader) (io.Reader, error) { return flate.NewReader(reader), nil }} - encZstd = Encoding{code: 9, name: "zstd", readerFn: func(reader io.Reader) (io.Reader, error) { - r, err := zstd.NewReader(reader) - if err != nil { - panic(err) - } - return r, nil - }} - - Encodings = []Encoding{encNone, encGZIP, encDumb, encLZ4, encSnappy, enclz4_256k, enclz4_1M, enclz4_4M, encFlate, encZstd} -) - -const ( - _ byte = iota - chunkFormatV1 - chunkFormatV2 - chunkFormatV3 + "github.com/grafana/loki/pkg/chunkenc" ) type LokiChunk struct { - format byte - encoding Encoding - - blocks []LokiBlock - - metadataChecksum uint32 - computedMetadataChecksum uint32 -} - -type LokiBlock struct { - numEntries uint64 // number of log lines in this block - minT int64 // minimum timestamp, unix nanoseconds - maxT int64 // max timestamp, unix nanoseconds - - dataOffset uint64 // offset in the data-part of chunks file - - uncompSize uint64 // size of the original data uncompressed - - rawData []byte // data as stored in chunk file, compressed - originalData []byte // data uncompressed from rawData - - // parsed rawData - entries []LokiEntry - storedChecksum uint32 - computedChecksum uint32 -} - -type LokiEntry struct { - timestamp int64 - line string + format byte + encoding chunkenc.Encoding + compressedSize int + uncompressedSize int + blocks []chunkenc.Block } func parseLokiChunk(chunkHeader *ChunkHeader, r io.Reader) (*LokiChunk, error) { @@ -122,6 +44,18 @@ func parseLokiChunk(chunkHeader *ChunkHeader, r io.Reader) (*LokiChunk, error) { return nil, fmt.Errorf("failed to read rawData for Loki chunk into memory: %w", err) } + c, _ := chunkenc.NewByteChunk(data, 0, 0) + encoding := c.Encoding() + compressedSize := c.CompressedSize() + uncompressedSize := c.UncompressedSize() + from, through := c.Bounds() + + bs := c.Blocks(from, through) + err := c.Close() + if err != nil { + return nil, err + } + if num := binary.BigEndian.Uint32(data[0:4]); num != 0x012EE56A { return nil, fmt.Errorf("invalid magic number: %0x", num) } @@ -129,135 +63,13 @@ func parseLokiChunk(chunkHeader *ChunkHeader, r io.Reader) (*LokiChunk, error) { // Chunk format is at position 4 f := data[4] - compression, err := getCompression(f, data[5]) - if err != nil { - return nil, fmt.Errorf("failed to read compression: %w", err) - } - - // return &LokiChunk{encoding: compression}, nil - - metasOffset := binary.BigEndian.Uint64(data[len(data)-8:]) - - metadata := data[metasOffset : len(data)-(8+4)] - - metaChecksum := binary.BigEndian.Uint32(data[len(data)-12 : len(data)-8]) - computedMetaChecksum := crc32.Checksum(metadata, castagnoliTable) - - blocks, n := binary.Uvarint(metadata) - if n <= 0 { - return nil, fmt.Errorf("failed to read number of blocks") - } - metadata = metadata[n:] - lokiChunk := &LokiChunk{ - format: f, - encoding: compression, - metadataChecksum: metaChecksum, - computedMetadataChecksum: computedMetaChecksum, - } - - for ix := 0; ix < int(blocks); ix++ { - block := LokiBlock{} - block.numEntries, metadata, err = readUvarint(err, metadata) - block.minT, metadata, err = readVarint(err, metadata) - block.maxT, metadata, err = readVarint(err, metadata) - block.dataOffset, metadata, err = readUvarint(err, metadata) - if f >= chunkFormatV3 { - block.uncompSize, metadata, err = readUvarint(err, metadata) - } - dataLength := uint64(0) - dataLength, metadata, err = readUvarint(err, metadata) - - if err != nil { - return nil, err - } - - block.rawData = data[block.dataOffset : block.dataOffset+dataLength] - block.storedChecksum = binary.BigEndian.Uint32(data[block.dataOffset+dataLength : block.dataOffset+dataLength+4]) - block.computedChecksum = crc32.Checksum(block.rawData, castagnoliTable) - block.originalData, block.entries, err = parseLokiBlock(compression, block.rawData) - lokiChunk.blocks = append(lokiChunk.blocks, block) + format: f, + encoding: encoding, + compressedSize: compressedSize, + uncompressedSize: uncompressedSize, + blocks: bs, } return lokiChunk, nil } - -func parseLokiBlock(compression Encoding, data []byte) ([]byte, []LokiEntry, error) { - r, err := compression.readerFn(bytes.NewReader(data)) - if err != nil { - return nil, nil, err - } - - decompressed, err := io.ReadAll(r) - origDecompressed := decompressed - if err != nil { - return nil, nil, err - } - - entries := []LokiEntry(nil) - for len(decompressed) > 0 { - var timestamp int64 - var lineLength uint64 - - timestamp, decompressed, err = readVarint(err, decompressed) - lineLength, decompressed, err = readUvarint(err, decompressed) - if err != nil { - return origDecompressed, nil, err - } - - if len(decompressed) < int(lineLength) { - return origDecompressed, nil, fmt.Errorf("not enough line data, need %d, got %d", lineLength, len(decompressed)) - } - - entries = append(entries, LokiEntry{ - timestamp: timestamp, - line: string(decompressed[0:lineLength]), - }) - - decompressed = decompressed[lineLength:] - } - - return origDecompressed, entries, nil -} - -func readVarint(prevErr error, buf []byte) (int64, []byte, error) { - if prevErr != nil { - return 0, buf, prevErr - } - - val, n := binary.Varint(buf) - if n <= 0 { - return 0, nil, fmt.Errorf("varint: %d", n) - } - return val, buf[n:], nil -} - -func readUvarint(prevErr error, buf []byte) (uint64, []byte, error) { - if prevErr != nil { - return 0, buf, prevErr - } - - val, n := binary.Uvarint(buf) - if n <= 0 { - return 0, nil, fmt.Errorf("varint: %d", n) - } - return val, buf[n:], nil -} - -func getCompression(format byte, code byte) (Encoding, error) { - if format == chunkFormatV1 { - return encGZIP, nil - } - - if format >= chunkFormatV2 { - for _, e := range Encodings { - if e.code == int(code) { - return e, nil - } - } - - return encNone, fmt.Errorf("unknown encoding: %d", code) - } - - return encNone, fmt.Errorf("unknown format: %d", format) -} diff --git a/cmd/chunks-inspect/main.go b/cmd/chunks-inspect/main.go index c25f621845b06..1a3593f936b18 100644 --- a/cmd/chunks-inspect/main.go +++ b/cmd/chunks-inspect/main.go @@ -1,13 +1,17 @@ package main import ( - "crypto/sha256" + "context" "flag" "fmt" "log" "os" "strings" "time" + + "github.com/grafana/loki/pkg/chunkenc" + + logql "github.com/grafana/loki/pkg/logql/log" ) const format = "2006-01-02 15:04:05.000000 MST" @@ -17,15 +21,15 @@ var timezone = time.UTC func main() { blocks := flag.Bool("b", false, "print block details") lines := flag.Bool("l", false, "print log lines") - storeBlocks := flag.Bool("s", false, "store blocks, using input filename, and appending block index to it") + export := flag.Bool("e", false, "export log lines to file") flag.Parse() for _, f := range flag.Args() { - printFile(f, *blocks, *lines, *storeBlocks) + printFile(f, *blocks, *lines, *export) } } -func printFile(filename string, blockDetails, printLines, storeBlocks bool) { +func printFile(filename string, blockDetails, printLines bool, exportLogLines bool) { f, err := os.Open(filename) if err != nil { log.Printf("%s: %v", filename, err) @@ -33,12 +37,6 @@ func printFile(filename string, blockDetails, printLines, storeBlocks bool) { } defer f.Close() - si, err := f.Stat() - if err != nil { - log.Println("failed to stat file", err) - return - } - h, err := DecodeHeader(f) if err != nil { log.Printf("%s: %v", filename, err) @@ -67,65 +65,64 @@ func printFile(filename string, blockDetails, printLines, storeBlocks bool) { fmt.Println("Format (Version):", lokiChunk.format) fmt.Println("Encoding:", lokiChunk.encoding) - fmt.Print("Blocks Metadata Checksum: ", fmt.Sprintf("%08x", lokiChunk.metadataChecksum)) - if lokiChunk.metadataChecksum == lokiChunk.computedMetadataChecksum { - fmt.Println(" OK") - } else { - fmt.Println(" BAD, computed checksum:", fmt.Sprintf("%08x", lokiChunk.computedMetadataChecksum)) - } if blockDetails { fmt.Println("Found", len(lokiChunk.blocks), "block(s)") } else { fmt.Println("Found", len(lokiChunk.blocks), "block(s), use -b to show block details") } - if len(lokiChunk.blocks) > 0 { - fmt.Println("Minimum time (from first block):", time.Unix(0, lokiChunk.blocks[0].minT).In(timezone).Format(format)) - fmt.Println("Maximum time (from last block):", time.Unix(0, lokiChunk.blocks[len(lokiChunk.blocks)-1].maxT).In(timezone).Format(format)) - } if blockDetails { fmt.Println() } - totalSize := 0 - + pipeline := logql.NewNoopPipeline() for ix, b := range lokiChunk.blocks { if blockDetails { - cksum := "" - if b.storedChecksum == b.computedChecksum { - cksum = fmt.Sprintf("%08x OK", b.storedChecksum) - } else { - cksum = fmt.Sprintf("%08x BAD (computed: %08x)", b.storedChecksum, b.computedChecksum) - } - fmt.Printf("Block %4d: position: %8d, original length: %6d (stored: %6d, ratio: %.2f), minT: %v maxT: %v, checksum: %s\n", - ix, b.dataOffset, len(b.originalData), len(b.rawData), float64(len(b.originalData))/float64(len(b.rawData)), - time.Unix(0, b.minT).In(timezone).Format(format), time.Unix(0, b.maxT).In(timezone).Format(format), - cksum) - fmt.Printf("Block %4d: digest compressed: %02x, original: %02x\n", ix, sha256.Sum256(b.rawData), sha256.Sum256(b.originalData)) + fmt.Printf("Block %4d: position: %8d, minT: %v maxT: %v\n", + ix, b.Offset(), + time.Unix(0, b.MinTime()).In(timezone).Format(format), + time.Unix(0, b.MaxTime()).In(timezone).Format(format), + ) } - totalSize += len(b.originalData) - if printLines { - for _, l := range b.entries { - fmt.Printf("%v\t%s\n", time.Unix(0, l.timestamp).In(timezone).Format(format), strings.TrimSpace(l.line)) + iter := b.Iterator(context.Background(), pipeline.ForStream(nil)) + for iter.Next() { + e := iter.Entry() + fmt.Printf("%v\t%s\n", e.Timestamp.In(timezone).Format(format), strings.TrimSpace(e.Line)) + if e.StructuredMetadata != nil { + fmt.Println("Structured Metadata:") + for _, meta := range e.StructuredMetadata { + fmt.Println("\t", meta.Name, "=", meta.Value) + } + } } } - - if storeBlocks { - writeBlockToFile(b.rawData, ix, fmt.Sprintf("%s.block.%d", filename, ix)) - writeBlockToFile(b.originalData, ix, fmt.Sprintf("%s.original.%d", filename, ix)) - } } - fmt.Println("Total size of original data:", totalSize, "file size:", si.Size(), "ratio:", fmt.Sprintf("%0.3g", float64(totalSize)/float64(si.Size()))) + if exportLogLines { + exportLogLinesToFile(lokiChunk.blocks, fmt.Sprintf("%s.log", filename)) + } } -func writeBlockToFile(data []byte, blockIndex int, filename string) { - err := os.WriteFile(filename, data, 0644) +func exportLogLinesToFile(blocks []chunkenc.Block, filename string) { + + f, err := os.OpenFile(filename, os.O_CREATE|os.O_WRONLY, 0644) if err != nil { - log.Println("Failed to store block", blockIndex, "to file", filename, "due to error:", err) - } else { - log.Println("Stored block", blockIndex, "to file", filename) + log.Printf("Failed to open (create) file '%s' for writing.\n", filename) + } + defer f.Close() + + pipeline := logql.NewNoopPipeline() + for _, b := range blocks { + iter := b.Iterator(context.Background(), pipeline.ForStream(nil)) + for iter.Next() { + e := iter.Entry() + _, err := f.Write([]byte(e.Line + "\n")) + if err != nil { + log.Printf("Failed to write to file '%s'.\n", filename) + return + } + } } } diff --git a/cmd/chunks-inspect/time.go b/cmd/chunks-inspect/time.go index aac421cc89123..b6c255f5f00b0 100644 --- a/cmd/chunks-inspect/time.go +++ b/cmd/chunks-inspect/time.go @@ -34,14 +34,14 @@ func (t *Time) UnmarshalJSON(b []byte) error { p := strings.Split(string(b), ".") switch len(p) { case 1: - v, err := strconv.ParseInt(string(p[0]), 10, 64) + v, err := strconv.ParseInt(p[0], 10, 64) if err != nil { return err } *t = Time(v * second) case 2: - v, err := strconv.ParseInt(string(p[0]), 10, 64) + v, err := strconv.ParseInt(p[0], 10, 64) if err != nil { return err }