Skip to content

Commit

Permalink
Merge branch 'main' into shantanu/remove-ruler-configs
Browse files Browse the repository at this point in the history
  • Loading branch information
shantanualsi committed Nov 16, 2023
2 parents 23a7a64 + 8328345 commit 95af9dc
Show file tree
Hide file tree
Showing 126 changed files with 8,237 additions and 3,455 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/doc-validator.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ jobs:
doc-validator:
runs-on: "ubuntu-latest"
container:
image: "grafana/doc-validator:v3.0.0"
image: "grafana/doc-validator:v4.0.0"
steps:
- name: "Checkout code"
uses: "actions/checkout@v3"
Expand Down
6 changes: 6 additions & 0 deletions .github/workflows/vulnerability-scan.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ jobs:
snyk:
name: Snyk Scan
runs-on: ubuntu-latest
permissions:
issues: write
pull-requests: write
steps:
- name: Checkout code
uses: actions/checkout@master
Expand Down Expand Up @@ -47,6 +50,9 @@ jobs:
trivy:
name: Trivy Scan
runs-on: ubuntu-20.04
permissions:
issues: write
pull-requests: write
steps:
- name: Checkout code
uses: actions/checkout@v3
Expand Down
2 changes: 2 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ issues:
- Error return value of .*.Log.* is not checked
- Error return value of `` is not checked
exclude-rules:
- path: pkg/scheduler/scheduler.go
text: 'SA1019: msg.GetHttpRequest is deprecated: Do not use'
- path: '(.+)_test\.go'
linters:
- goconst
Expand Down
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
* [10733](https://github.com/grafana/loki/pull/10733) **shantanualsi** Add support for case-insensitive logql functions
* [10727](https://github.com/grafana/loki/pull/10727) **sandeepsukhani** Native otlp ingestion support
* [11051](https://github.com/grafana/loki/pull/11051) Refactor to not use global logger in modules
* [10956](https://github.com/grafana/loki/pull/10956) **jeschkies** do not wrap requests but send pure Protobuf from frontend v2 via scheduler to querier when `-frontend.encoding=protobuf`.

##### Fixes
* [11074](https://github.com/grafana/loki/pull/11074) **hainenber** Fix panic in lambda-promtail due to mishandling of empty DROP_LABELS env var.

Expand Down Expand Up @@ -171,6 +173,7 @@
* [10341](https://github.com/grafana/loki/pull/10341) **ashwanthgoli** Deprecate older index types and non-object stores - `aws-dynamo, gcp, gcp-columnkey, bigtable, bigtable-hashed, cassandra, grpc`
* [10344](https://github.com/grafana/loki/pull/10344) **ashwanthgoli** Compactor: deprecate `-boltdb.shipper.compactor.` prefix in favor of `-compactor.`.
* [10073](https://github.com/grafana/loki/pull/10073) **sandeepsukhani,salvacorts,vlad-diachenko** Support attaching structured metadata to log lines.
* [11151](https://github.com/grafana/loki/pull/11151) **ashwanthgoli**: Removes already deprecated configs: `ruler.evaluation-delay-duration`, `boltdb.shipper.compactor.deletion-mode`, `validation.enforce-metric-name` and flags with prefix `-boltdb.shipper.compactor.*`.

##### Fixes

Expand Down
13 changes: 8 additions & 5 deletions clients/cmd/logstash/lib/logstash/outputs/loki.rb
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ class LogStash::Outputs::Loki < LogStash::Outputs::Base
## 'An array of fields to map to labels, if defined only fields in this list will be mapped.'
config :include_fields, :validate => :array, :default => [], :required => false

## 'An array of fields to map to structured metadata, if defined only fields in this list will be mapped.'
config :metadata_fields, :validate => :array, :default => [], :required => false

## 'Backoff configuration. Maximum backoff time between retries. Default 300s'
config :max_delay, :validate => :number, :default => 300, :required => false

Expand All @@ -71,7 +74,7 @@ def register
@logger.info("Loki output plugin", :class => self.class.name)

# initialize Queue and Mutex
@entries = Queue.new
@entries = Queue.new
@mutex = Mutex.new
@stop = false

Expand All @@ -94,7 +97,7 @@ def max_batch_size
@mutex.synchronize do
return if @stop
end

e = @entries.deq
return if e.nil?

Expand Down Expand Up @@ -201,13 +204,13 @@ def is_batch_expired
## Receives logstash events
public
def receive(event)
@entries << Entry.new(event, @message_field, @include_fields)
@entries << Entry.new(event, @message_field, @include_fields, @metadata_fields)
end

def close
@entries.close
@mutex.synchronize do
@stop = true
@mutex.synchronize do
@stop = true
end
@batch_wait_thread.join
@batch_size_thread.join
Expand Down
14 changes: 13 additions & 1 deletion clients/cmd/logstash/lib/logstash/outputs/loki/batch.rb
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,19 @@ def to_json
def build_stream(stream)
values = []
stream['entries'].each { |entry|
values.append([entry['ts'].to_s, entry['line']])
if entry.key?('metadata')
sorted_metadata = entry['metadata'].sort.to_h
values.append([
entry['ts'].to_s,
entry['line'],
sorted_metadata
])
else
values.append([
entry['ts'].to_s,
entry['line']
])
end
}
return {
'stream'=>stream['labels'],
Expand Down
18 changes: 17 additions & 1 deletion clients/cmd/logstash/lib/logstash/outputs/loki/entry.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ def to_ns(s)
class Entry
include Loki
attr_reader :labels, :entry
def initialize(event,message_field,include_fields)
def initialize(event,message_field,include_fields,metadata_fields)
@entry = {
"ts" => to_ns(event.get("@timestamp")),
"line" => event.get(message_field).to_s
Expand All @@ -21,6 +21,22 @@ def initialize(event,message_field,include_fields)
next if include_fields.length() > 0 and not include_fields.include?(key)
@labels[key] = value.to_s
}

# Unlike include_fields we should skip if no metadata_fields provided
if metadata_fields.length() > 0
@metadata = {}
event.to_hash.each { |key,value|
next if key.start_with?('@')
next if value.is_a?(Hash)
next if metadata_fields.length() > 0 and not metadata_fields.include?(key)
@metadata[key] = value.to_s
}

# Add @metadata to @entry if there was a match
if @metadata.size > 0
@entry.merge!('metadata' => @metadata)
end
end
end
end
end
2 changes: 1 addition & 1 deletion clients/cmd/logstash/logstash-output-loki.gemspec
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Gem::Specification.new do |s|
s.name = 'logstash-output-loki'
s.version = '1.1.0'
s.version = '1.2.0'
s.authors = ['Aditya C S','Cyril Tovena']
s.email = ['[email protected]','[email protected]']

Expand Down
15 changes: 15 additions & 0 deletions clients/cmd/logstash/loki-metadata.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# Sample Logstash pipeline demonstrating the Loki output plugin's
# structured-metadata support (metadata_fields, added in v1.2.0).
input {
# Synthetic input: emits 10 identical "Hello world!" events, each
# carrying three extra fields used below to exercise label vs. metadata routing.
generator {
message => "Hello world!"
count => 10
add_field => {cluster=> "foo" namespace=>"bar" trace_id=> "trace_001"}
}
}

output {
loki {
# Loki push endpoint (no /loki/api/v1/push suffix; the plugin appends it).
url => "http://localhost:3100"
# Only "cluster" becomes a Loki stream label; "namespace" is dropped
# because include_fields is a strict allow-list when non-empty.
include_fields => ["cluster"]
# "trace_id" is attached as structured metadata on each log line
# rather than as a (high-cardinality) stream label.
metadata_fields => ["trace_id"]
}
}
3 changes: 3 additions & 0 deletions clients/cmd/logstash/loki.conf
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ output {
# If include_fields is set, only fields in this list will be sent to Loki as labels.
#include_fields => ["service","host","app","env"] #default empty array, all labels included.

# If metadata_fields is set, fields in this list will be sent to Loki as structured metadata for the associated log.
#metadata_fields => ["trace_id"] #default empty array, no structured metadata will be included

#batch_wait => 1 ## in seconds #default 1 second

#batch_size => 102400 #bytes #default 102400 bytes
Expand Down
21 changes: 15 additions & 6 deletions clients/cmd/logstash/spec/outputs/loki/entry_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,31 +21,40 @@
{'@path' => '/path/to/file.log'},
},
'host' => '172.0.0.1',
'trace_id' => 'trace_001',
'@timestamp' => Time.now
}
)
}

it 'labels extracted should not contains object and metadata or timestamp' do
entry = Entry.new(event,"message", [])
expect(entry.labels).to eql({ 'agent' => 'filebeat', 'host' => '172.0.0.1', 'foo'=>'5'})
entry = Entry.new(event,"message", [], [])
expect(entry.labels).to eql({ 'agent' => 'filebeat', 'host' => '172.0.0.1', 'foo'=>'5', 'trace_id'=>'trace_001'})
expect(entry.entry['ts']).to eql to_ns(event.get("@timestamp"))
expect(entry.entry['line']).to eql 'hello'
end

it 'labels extracted should only contain allowlisted labels' do
entry = Entry.new(event, "message", %w[agent foo])
entry = Entry.new(event, "message", %w[agent foo], [])
expect(entry.labels).to eql({ 'agent' => 'filebeat', 'foo'=>'5'})
expect(entry.entry['ts']).to eql to_ns(event.get("@timestamp"))
expect(entry.entry['line']).to eql 'hello'
end

it 'labels and structured metadata extracted should only contain allow listed labels and metadata' do
entry = Entry.new(event, "message", %w[agent foo], %w[trace_id])
expect(entry.labels).to eql({ 'agent' => 'filebeat', 'foo'=>'5'})
expect(entry.entry['ts']).to eql to_ns(event.get("@timestamp"))
expect(entry.entry['line']).to eql 'hello'
expect(entry.entry['metadata']).to eql({'trace_id' => 'trace_001'})
end
end

context 'test batch generation with label order' do
let (:entries) {[
Entry.new(LogStash::Event.new({"message"=>"foobuzz","buzz"=>"bar","cluster"=>"us-central1","@timestamp"=>Time.at(1)}),"message", []),
Entry.new(LogStash::Event.new({"log"=>"foobar","bar"=>"bar","@timestamp"=>Time.at(2)}),"log", []),
Entry.new(LogStash::Event.new({"cluster"=>"us-central1","message"=>"foobuzz","buzz"=>"bar","@timestamp"=>Time.at(3)}),"message", []),
Entry.new(LogStash::Event.new({"message"=>"foobuzz","buzz"=>"bar","cluster"=>"us-central1","@timestamp"=>Time.at(1)}),"message", [], []),
Entry.new(LogStash::Event.new({"log"=>"foobar","bar"=>"bar","@timestamp"=>Time.at(2)}),"log", [], []),
Entry.new(LogStash::Event.new({"cluster"=>"us-central1","message"=>"foobuzz","buzz"=>"bar","@timestamp"=>Time.at(3)}),"message", [], []),

]}
let (:expected) {
Expand Down
55 changes: 49 additions & 6 deletions clients/cmd/logstash/spec/outputs/loki_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,15 @@

context 'when adding en entry to the batch' do
let (:simple_loki_config) {{'url' => 'http://localhost:3100'}}
let (:entry) {Entry.new(LogStash::Event.new({"message"=>"foobuzz","buzz"=>"bar","cluster"=>"us-central1","@timestamp"=>Time.at(1)}),"message", [])}
let (:entry) {Entry.new(LogStash::Event.new({"message"=>"foobuzz","buzz"=>"bar","cluster"=>"us-central1","@timestamp"=>Time.at(1)}),"message", [], [])}
let (:lbs) {{"buzz"=>"bar","cluster"=>"us-central1"}.sort.to_h}
let (:include_loki_config) {{ 'url' => 'http://localhost:3100', 'include_fields' => ["cluster"] }}
let (:include_entry) {Entry.new(LogStash::Event.new({"message"=>"foobuzz","buzz"=>"bar","cluster"=>"us-central1","@timestamp"=>Time.at(1)}),"message", ["cluster"])}
let (:include_entry) {Entry.new(LogStash::Event.new({"message"=>"foobuzz","buzz"=>"bar","cluster"=>"us-central1","@timestamp"=>Time.at(1)}),"message", ["cluster"], [])}
let (:include_lbs) {{"cluster"=>"us-central1"}.sort.to_h}

it 'should not add empty line' do
plugin = LogStash::Plugin.lookup("output", "loki").new(simple_loki_config)
emptyEntry = Entry.new(LogStash::Event.new({"message"=>"foobuzz","buzz"=>"bar","cluster"=>"us-central1","@timestamp"=>Time.at(1)}),"foo", [])
emptyEntry = Entry.new(LogStash::Event.new({"message"=>"foobuzz","buzz"=>"bar","cluster"=>"us-central1","@timestamp"=>Time.at(1)}),"foo", [], [])
expect(plugin.add_entry_to_batch(emptyEntry)).to eql true
expect(plugin.batch).to eql nil
end
Expand Down Expand Up @@ -83,8 +83,51 @@
end
end

context 'when building json from batch to send' do
let (:basic_loki_config) {{'url' => 'http://localhost:3100'}}
let (:basic_entry) {Entry.new(LogStash::Event.new({"message"=>"foobuzz","buzz"=>"bar","cluster"=>"us-central1","trace_id"=>"trace_001","@timestamp"=>Time.at(1)}),"message", [], [])}
let (:include_loki_config) {{ 'url' => 'http://localhost:3100', 'include_fields' => ["cluster"] }}
let (:include_entry) {Entry.new(LogStash::Event.new({"message"=>"foobuzz","buzz"=>"bar","cluster"=>"us-central1","trace_id"=>"trace_001","@timestamp"=>Time.at(1)}),"message", ["cluster"], [])}
let (:metadata_loki_config) {{ 'url' => 'http://localhost:3100', 'include_fields' => ["cluster"], 'metadata_fields' => ["trace_id"] }}
let (:metadata_entry) {Entry.new(LogStash::Event.new({"message"=>"foobuzz","buzz"=>"bar","cluster"=>"us-central1","trace_id"=>"trace_001","@timestamp"=>Time.at(1)}),"message", ["cluster"], ["trace_id"])}
let (:metadata_multi_loki_config) {{ 'url' => 'http://localhost:3100', 'include_fields' => ["cluster"], 'metadata_fields' => ["trace_id", "user_id"] }}
let (:metadata_multi_entry) {Entry.new(LogStash::Event.new({"message"=>"foobuzz","buzz"=>"bar","cluster"=>"us-central1","trace_id"=>"trace_001","user_id"=>"user_001","@timestamp"=>Time.at(1)}),"message", ["cluster"], ["trace_id", "user_id"])}

it 'should not include labels or metadata' do
plugin = LogStash::Plugin.lookup("output", "loki").new(basic_loki_config)
expect(plugin.batch).to eql nil
expect(plugin.add_entry_to_batch(basic_entry)).to eql true
expect(plugin.batch).not_to be_nil
expect(plugin.batch.to_json).to eq '{"streams":[{"stream":{"buzz":"bar","cluster":"us-central1","trace_id":"trace_001"},"values":[["1000000000","foobuzz"]]}]}'
end

it 'should include metadata with no labels' do
plugin = LogStash::Plugin.lookup("output", "loki").new(metadata_loki_config)
expect(plugin.batch).to eql nil
expect(plugin.add_entry_to_batch(metadata_entry)).to eql true
expect(plugin.batch).not_to be_nil
expect(plugin.batch.to_json).to eq '{"streams":[{"stream":{"cluster":"us-central1"},"values":[["1000000000","foobuzz",{"trace_id":"trace_001"}]]}]}'
end

it 'should include labels with no metadata' do
plugin = LogStash::Plugin.lookup("output", "loki").new(include_loki_config)
expect(plugin.batch).to eql nil
expect(plugin.add_entry_to_batch(include_entry)).to eql true
expect(plugin.batch).not_to be_nil
expect(plugin.batch.to_json).to eq '{"streams":[{"stream":{"cluster":"us-central1"},"values":[["1000000000","foobuzz"]]}]}'
end

it 'should include labels with multiple metadata' do
plugin = LogStash::Plugin.lookup("output", "loki").new(metadata_multi_loki_config)
expect(plugin.batch).to eql nil
expect(plugin.add_entry_to_batch(metadata_multi_entry)).to eql true
expect(plugin.batch).not_to be_nil
expect(plugin.batch.to_json).to eq '{"streams":[{"stream":{"cluster":"us-central1"},"values":[["1000000000","foobuzz",{"trace_id":"trace_001","user_id":"user_001"}]]}]}'
end
end

context 'batch expiration' do
let (:entry) {Entry.new(LogStash::Event.new({"message"=>"foobuzz","buzz"=>"bar","cluster"=>"us-central1","@timestamp"=>Time.at(1)}),"message", [])}
let (:entry) {Entry.new(LogStash::Event.new({"message"=>"foobuzz","buzz"=>"bar","cluster"=>"us-central1","@timestamp"=>Time.at(1)}),"message", [], [])}

it 'should not expire if empty' do
loki = LogStash::Outputs::Loki.new(simple_loki_config.merge!({'batch_wait'=>0.5}))
Expand Down Expand Up @@ -147,13 +190,13 @@
loki.receive(event)
sent.deq
sleep(0.01) # Adding a minimal sleep. In few cases @batch=nil might happen after evaluating for nil
expect(loki.batch).to be_nil
expect(loki.batch).to be_nil
loki.close
end
end

context 'http requests' do
let (:entry) {Entry.new(LogStash::Event.new({"message"=>"foobuzz","buzz"=>"bar","cluster"=>"us-central1","@timestamp"=>Time.at(1)}),"message", [])}
let (:entry) {Entry.new(LogStash::Event.new({"message"=>"foobuzz","buzz"=>"bar","cluster"=>"us-central1","@timestamp"=>Time.at(1)}),"message", [], [])}

it 'should send credentials' do
conf = {
Expand Down
2 changes: 1 addition & 1 deletion clients/cmd/promtail/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func main() {
exit(1)
}
serverCfg := &config.Config.ServerConfig.Config
serverCfg.Log = util_log.InitLogger(serverCfg, prometheus.DefaultRegisterer, true, false)
serverCfg.Log = util_log.InitLogger(serverCfg, prometheus.DefaultRegisterer, false)

// Use Stderr instead of files for the klog.
klog.SetOutput(os.Stderr)
Expand Down
2 changes: 1 addition & 1 deletion clients/pkg/logentry/stages/drop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func Test_dropStage_Process(t *testing.T) {
// Enable debug logging
cfg := &ww.Config{}
require.Nil(t, cfg.LogLevel.Set("debug"))
util_log.InitLogger(cfg, nil, true, false)
util_log.InitLogger(cfg, nil, false)
Debug = true

tests := []struct {
Expand Down
2 changes: 1 addition & 1 deletion clients/pkg/logentry/stages/labelallow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ func Test_addLabelStage_Process(t *testing.T) {
// Enable debug logging
cfg := &ww.Config{}
require.Nil(t, cfg.LogLevel.Set("debug"))
util_log.InitLogger(cfg, nil, true, false)
util_log.InitLogger(cfg, nil, false)
Debug = true

tests := []struct {
Expand Down
2 changes: 1 addition & 1 deletion clients/pkg/logentry/stages/labeldrop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ func Test_dropLabelStage_Process(t *testing.T) {
// Enable debug logging
cfg := &ww.Config{}
require.Nil(t, cfg.LogLevel.Set("debug"))
util_log.InitLogger(cfg, nil, true, false)
util_log.InitLogger(cfg, nil, false)
Debug = true

tests := []struct {
Expand Down
6 changes: 3 additions & 3 deletions clients/pkg/logentry/stages/multiline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func Test_multilineStage_Process(t *testing.T) {
// Enable debug logging
cfg := &ww.Config{}
require.Nil(t, cfg.LogLevel.Set("debug"))
util_log.InitLogger(cfg, nil, true, false)
util_log.InitLogger(cfg, nil, false)
Debug = true

mcfg := &MultilineConfig{Expression: ptrFromString("^START"), MaxWaitTime: ptrFromString("3s")}
Expand Down Expand Up @@ -52,7 +52,7 @@ func Test_multilineStage_MultiStreams(t *testing.T) {
// Enable debug logging
cfg := &ww.Config{}
require.Nil(t, cfg.LogLevel.Set("debug"))
util_log.InitLogger(cfg, nil, true, false)
util_log.InitLogger(cfg, nil, false)
Debug = true

mcfg := &MultilineConfig{Expression: ptrFromString("^START"), MaxWaitTime: ptrFromString("3s")}
Expand Down Expand Up @@ -97,7 +97,7 @@ func Test_multilineStage_MaxWaitTime(t *testing.T) {
// Enable debug logging
cfg := &ww.Config{}
require.Nil(t, cfg.LogLevel.Set("debug"))
util_log.InitLogger(cfg, nil, true, false)
util_log.InitLogger(cfg, nil, false)
Debug = true

maxWait := 2 * time.Second
Expand Down
2 changes: 1 addition & 1 deletion clients/pkg/logentry/stages/pack_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func Test_packStage_Run(t *testing.T) {
// Enable debug logging
cfg := &ww.Config{}
require.Nil(t, cfg.LogLevel.Set("debug"))
util_log.InitLogger(cfg, nil, true, false)
util_log.InitLogger(cfg, nil, false)
Debug = true

tests := []struct {
Expand Down
Loading

0 comments on commit 95af9dc

Please sign in to comment.