From f13874378091a380b7953a66bc57e9ebe6973539 Mon Sep 17 00:00:00 2001 From: Edward Welch Date: Tue, 28 May 2024 21:21:59 +0000 Subject: [PATCH 1/4] add v4 chunk format to chunks-inspect tool Signed-off-by: Edward Welch --- cmd/chunks-inspect/loki.go | 131 ++++++++++++++++++++++++++++++++++--- cmd/chunks-inspect/main.go | 6 +- 2 files changed, 126 insertions(+), 11 deletions(-) diff --git a/cmd/chunks-inspect/loki.go b/cmd/chunks-inspect/loki.go index d8fd5d0a913fd..802fb7ddf8dc3 100644 --- a/cmd/chunks-inspect/loki.go +++ b/cmd/chunks-inspect/loki.go @@ -59,6 +59,7 @@ const ( chunkFormatV1 chunkFormatV2 chunkFormatV3 + chunkFormatV4 ) type LokiChunk struct { @@ -89,9 +90,15 @@ type LokiBlock struct { computedChecksum uint32 } +type label struct { + name string + val string +} + type LokiEntry struct { - timestamp int64 - line string + timestamp int64 + line string + structuredMetadata []label } func parseLokiChunk(chunkHeader *ChunkHeader, r io.Reader) (*LokiChunk, error) { @@ -122,6 +129,7 @@ func parseLokiChunk(chunkHeader *ChunkHeader, r io.Reader) (*LokiChunk, error) { return nil, fmt.Errorf("failed to read rawData for Loki chunk into memory: %w", err) } + //Magic Number if num := binary.BigEndian.Uint32(data[0:4]); num != 0x012EE56A { return nil, fmt.Errorf("invalid magic number: %0x", num) } @@ -129,6 +137,7 @@ func parseLokiChunk(chunkHeader *ChunkHeader, r io.Reader) (*LokiChunk, error) { // Chunk format is at position 4 f := data[4] + // Compression compression, err := getCompression(f, data[5]) if err != nil { return nil, fmt.Errorf("failed to read compression: %w", err) @@ -136,13 +145,77 @@ func parseLokiChunk(chunkHeader *ChunkHeader, r io.Reader) (*LokiChunk, error) { // return &LokiChunk{encoding: compression}, nil - metasOffset := binary.BigEndian.Uint64(data[len(data)-8:]) + // This was copied from memchunk.go newByteChunk() as a helper function to use here also + // In format >=v4 there are multiple metadata sections at the end of the chunk file, + // the offset and length of each is stored as the last bytes in the file + readSectionLenAndOffset := func(idx int) (uint64, uint64) { + lenAndOffsetPos := len(data) - (idx * 16) + lenAndOffset := data[lenAndOffsetPos : lenAndOffsetPos+16] + return binary.BigEndian.Uint64(lenAndOffset[:8]), binary.BigEndian.Uint64(lenAndOffset[8:]) + } + + // Chunk formats v1-v3 had a single metadata section which was stored at the end of the chunk with the last 8 bytes a pointer to the beginning of the metadata table section + metasOffset := uint64(0) + metasLen := uint64(0) + if f < chunkFormatV4 { + // Metadata table is at the end, read the last 8 bytes to get the offset to the start of the table + metasOffset = binary.BigEndian.Uint64(data[len(data)-8:]) + // Exclude the last 8 bytes which is the offset, and the 4 bytes before that which is the checksum from the length + metasLen = uint64(len(data)-(8+4)) - metasOffset + } else { + // the chunk metas section is index 1 + metasLen, metasOffset = readSectionLenAndOffset(1) + } - metadata := data[metasOffset : len(data)-(8+4)] + // Read metadata + metadata := data[metasOffset : metasOffset+metasLen] - metaChecksum := binary.BigEndian.Uint32(data[len(data)-12 : len(data)-8]) + // Read metadata block checksum, the last 4 bytes before the last 8 bytes containing the metadata table start offset + metaChecksum := binary.BigEndian.Uint32(data[metasOffset+metasLen:]) computedMetaChecksum := crc32.Checksum(metadata, castagnoliTable) + // If chunkFormat >= v4 we also need to read the structured metadata section + var structuredMetadataSymbols []string + if f >= chunkFormatV4 { + // the chunk metas section is index 2 + structuredMetadataLength, structuredMetadataOffset := readSectionLenAndOffset(2) + + //expCRC := binary.BigEndian.Uint32(data[structuredMetadataOffset+structuredMetadataLength:]) + //computedMetaChecksum := crc32.Checksum(lb, castagnoliTable) + + structuredMetadata := data[structuredMetadataOffset : structuredMetadataOffset+structuredMetadataLength] + + // Structured Metadata is "normalized" or "compressed" by storing an index to a string with each log line, and then all the strings in the chunk metadata section + // Here we need to extract the list of string from the metadata to be used for look ups when decompressing the log lines + // First we read the number of symbols + symbols, n := binary.Uvarint(structuredMetadata) + if n <= 0 { + return nil, fmt.Errorf("failed to read number of labels in structured metadata") + } + structuredMetadata = structuredMetadata[n:] + + // Next we need to decompress the list of strings + lr, err := compression.readerFn(bytes.NewReader(structuredMetadata)) + decompressed, err := io.ReadAll(lr) + if err != nil { + return nil, err + } + + structuredMetadataSymbols = make([]string, 0, symbols) + // Read every label and add it to a map for easy lookup + for i := 0; i < int(symbols); i++ { + strLen, read := binary.Uvarint(decompressed) + if read <= 0 { + return nil, fmt.Errorf("expected to find a length for a structured metadata string but did not find one") + } + //strLen will be the length of the label, we read one byte passed because the [:end] is not inclusive + //we then reslice the slice to be one byte past because the [start:] is inclusive + str := string(decompressed[:strLen+1]) + structuredMetadataSymbols = append(structuredMetadataSymbols, str) + decompressed = decompressed[strLen+1:] + } + } + blocks, n := binary.Uvarint(metadata) if n <= 0 { return nil, fmt.Errorf("failed to read number of blocks") @@ -158,13 +231,19 @@ func parseLokiChunk(chunkHeader *ChunkHeader, r io.Reader) (*LokiChunk, error) { for ix := 0; ix < int(blocks); ix++ { block := LokiBlock{} + // Read number of entries in block block.numEntries, metadata, err = readUvarint(err, metadata) + // Read block minimum time block.minT, metadata, err = readVarint(err, metadata) + // Read block max time block.maxT, metadata, err = readVarint(err, metadata) + // Read offset to block data block.dataOffset, metadata, err = readUvarint(err, metadata) if f >= chunkFormatV3 { + // Read uncompressed size block.uncompSize, metadata, err = readUvarint(err, metadata) } + // Read block length dataLength := uint64(0) dataLength, metadata, err = readUvarint(err, metadata) @@ -175,14 +254,14 @@ func parseLokiChunk(chunkHeader *ChunkHeader, r io.Reader) (*LokiChunk, error) { block.rawData = data[block.dataOffset : block.dataOffset+dataLength] block.storedChecksum = binary.BigEndian.Uint32(data[block.dataOffset+dataLength : block.dataOffset+dataLength+4]) block.computedChecksum = crc32.Checksum(block.rawData, castagnoliTable) - block.originalData, block.entries, err = parseLokiBlock(compression, block.rawData) + block.originalData, block.entries, err = parseLokiBlock(compression, block.rawData, structuredMetadataSymbols) lokiChunk.blocks = append(lokiChunk.blocks, block) } return lokiChunk, nil } -func parseLokiBlock(compression Encoding, data []byte) ([]byte, []LokiEntry, error) { +func parseLokiBlock(compression Encoding, data []byte, symbols []string) ([]byte, []LokiEntry, error) { r, err := compression.readerFn(bytes.NewReader(data)) if err != nil { return nil, nil, err @@ -208,13 +287,45 @@ func parseLokiBlock(compression Encoding, data []byte) ([]byte, []LokiEntry, err if len(decompressed) < int(lineLength) { return origDecompressed, nil, fmt.Errorf("not enough line data, need %d, got %d", lineLength, len(decompressed)) } + line := string(decompressed[0:lineLength]) + decompressed = decompressed[lineLength:] + + var structuredMetdata []label + if symbols != nil && len(symbols) > 0 { + // The length of the symbols section is encoded first, but we don't really need it here because everything is a Uvarint + // Read it to advance the buffer to the next element. + _, decompressed, err = readUvarint(err, decompressed) + + // Read number of structured metadata pairs + var structuredMetadataPairs uint64 + structuredMetadataPairs, decompressed, err = readUvarint(err, decompressed) + if err != nil { + return origDecompressed, nil, err + } + structuredMetdata = make([]label, 0, structuredMetadataPairs) + // Read all the pairs + for i := 0; i < int(structuredMetadataPairs); i++ { + var nameIdx uint64 + nameIdx, decompressed, err = readUvarint(err, decompressed) + if err != nil { + return origDecompressed, nil, err + } + var valIdx uint64 + valIdx, decompressed, err = readUvarint(err, decompressed) + if err != nil { + return origDecompressed, nil, err + } + lbl := label{name: symbols[nameIdx], val: symbols[valIdx]} + structuredMetdata = append(structuredMetdata, lbl) + } + } entries = append(entries, LokiEntry{ - timestamp: timestamp, - line: string(decompressed[0:lineLength]), + timestamp: timestamp, + line: line, + structuredMetadata: structuredMetdata, }) - decompressed = decompressed[lineLength:] } return origDecompressed, entries, nil diff --git a/cmd/chunks-inspect/main.go b/cmd/chunks-inspect/main.go index c8202e18b0a67..5152711489181 100644 --- a/cmd/chunks-inspect/main.go +++ b/cmd/chunks-inspect/main.go @@ -108,7 +108,11 @@ func printFile(filename string, blockDetails, printLines, storeBlocks bool) { if printLines { for _, l := range b.entries { - fmt.Printf("%v\t%s\n", time.Unix(0, l.timestamp).In(timezone).Format(format), strings.TrimSpace(l.line)) + fmt.Printf("%v\t%s", time.Unix(0, l.timestamp).In(timezone).Format(format), strings.TrimSpace(l.line)) + for _, s := range l.structuredMetadata { + fmt.Printf("\t%s=%s", s.name, s.val) + } + fmt.Println() } } From 4abfce5390bdb3156d9634d2d5007ff7cb6a7995 Mon Sep 17 00:00:00 2001 From: Edward Welch Date: Tue, 28 May 2024 21:42:08 +0000 Subject: [PATCH 2/4] fix Signed-off-by: Edward Welch --- cmd/chunks-inspect/loki.go | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/cmd/chunks-inspect/loki.go b/cmd/chunks-inspect/loki.go index 802fb7ddf8dc3..2f74ab6ea98aa 100644 --- a/cmd/chunks-inspect/loki.go +++ b/cmd/chunks-inspect/loki.go @@ -204,15 +204,18 @@ func parseLokiChunk(chunkHeader *ChunkHeader, r io.Reader) (*LokiChunk, error) { structuredMetadataSymbols = make([]string, 0, symbols) // Read every label and add it to a map for easy lookup for i := 0; i < int(symbols); i++ { + // Read the length of the string strLen, read := binary.Uvarint(decompressed) if read <= 0 { return nil, fmt.Errorf("expected to find a length for a structured metadata string but did not find one") } - //strLen will be the length of the label, we read one byte passed because the [:end] is not inclusive - //we then reslice the slice to be one byte past because the [start:] is inclusive - str := string(decompressed[:strLen+1]) + decompressed = decompressed[read:] + + // Read the bytes of the string and advance the buffer + str := string(decompressed[:strLen]) + decompressed = decompressed[strLen:] + // Append to our slice of symbols structuredMetadataSymbols = append(structuredMetadataSymbols, str) - decompressed = decompressed[strLen+1:] } } From 1a31b1fecbdae25397a35cac92d12bcbcb394a5b Mon Sep 17 00:00:00 2001 From: Edward Welch Date: Wed, 29 May 2024 02:06:12 +0000 Subject: [PATCH 3/4] read two fields for structured metadata even if there is none included Signed-off-by: Edward Welch --- cmd/chunks-inspect/loki.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/cmd/chunks-inspect/loki.go b/cmd/chunks-inspect/loki.go index 2f74ab6ea98aa..5370c917b0d21 100644 --- a/cmd/chunks-inspect/loki.go +++ b/cmd/chunks-inspect/loki.go @@ -257,14 +257,14 @@ func parseLokiChunk(chunkHeader *ChunkHeader, r io.Reader) (*LokiChunk, error) { block.rawData = data[block.dataOffset : block.dataOffset+dataLength] block.storedChecksum = binary.BigEndian.Uint32(data[block.dataOffset+dataLength : block.dataOffset+dataLength+4]) block.computedChecksum = crc32.Checksum(block.rawData, castagnoliTable) - block.originalData, block.entries, err = parseLokiBlock(compression, block.rawData, structuredMetadataSymbols) + block.originalData, block.entries, err = parseLokiBlock(f, compression, block.rawData, structuredMetadataSymbols) lokiChunk.blocks = append(lokiChunk.blocks, block) } return lokiChunk, nil } -func parseLokiBlock(compression Encoding, data []byte, symbols []string) ([]byte, []LokiEntry, error) { +func parseLokiBlock(format byte, compression Encoding, data []byte, symbols []string) ([]byte, []LokiEntry, error) { r, err := compression.readerFn(bytes.NewReader(data)) if err != nil { return nil, nil, err @@ -294,7 +294,7 @@ func parseLokiBlock(compression Encoding, data []byte, symbols []string) ([]byte decompressed = decompressed[lineLength:] var structuredMetdata []label - if symbols != nil && len(symbols) > 0 { + if format >= chunkFormatV4 { // The length of the symbols section is encoded first, but we don't really need it here because everything is a Uvarint // Read it to advance the buffer to the next element. _, decompressed, err = readUvarint(err, decompressed) From b8c7f510dc6d3fbe7f7753a1a2232ac107577bf6 Mon Sep 17 00:00:00 2001 From: Edward Welch Date: Fri, 13 Dec 2024 20:57:58 +0000 Subject: [PATCH 4/4] cleanup formatting a bit Signed-off-by: Edward Welch --- cmd/chunks-inspect/main.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/cmd/chunks-inspect/main.go b/cmd/chunks-inspect/main.go index 5152711489181..ea20ecfd9e728 100644 --- a/cmd/chunks-inspect/main.go +++ b/cmd/chunks-inspect/main.go @@ -108,11 +108,11 @@ func printFile(filename string, blockDetails, printLines, storeBlocks bool) { if printLines { for _, l := range b.entries { - fmt.Printf("%v\t%s", time.Unix(0, l.timestamp).In(timezone).Format(format), strings.TrimSpace(l.line)) + fmt.Printf("TS(%v) LINE(%s) STRUCTURED_METADATA(", time.Unix(0, l.timestamp).In(timezone).Format(format), strings.TrimSpace(l.line)) for _, s := range l.structuredMetadata { - fmt.Printf("\t%s=%s", s.name, s.val) + fmt.Printf("%s=%s ", s.name, s.val) } - fmt.Println() + fmt.Println(")") } }