Skip to content

Commit

Permalink
reduce fluentd connection churn during high event-handler load (#48909)…
Browse files Browse the repository at this point in the history
… (#49036)
  • Loading branch information
fspmarshall authored Nov 15, 2024
1 parent 1bdcb97 commit c053ba2
Show file tree
Hide file tree
Showing 6 changed files with 77 additions and 44 deletions.
10 changes: 10 additions & 0 deletions integrations/event-handler/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ type FluentdConfig struct {

// FluentdCA is a path to fluentd CA
FluentdCA string `help:"fluentd TLS CA file" type:"existingfile" env:"FDWRD_FLUENTD_CA"`

// FluentdMaxConnections caps the number of connections to fluentd. Defaults to a dynamic value
// calculated relative to app-level concurrency.
FluentdMaxConnections int `help:"Maximum number of connections to fluentd" env:"FDWRD_MAX_CONNECTIONS"`
}

// TeleportConfig is Teleport instance configuration
Expand Down Expand Up @@ -238,6 +242,11 @@ func (c *StartCmdConfig) Validate() error {
c.SkipSessionTypes = lib.SliceToAnonymousMap(c.SkipSessionTypesRaw)
c.SkipEventTypes = lib.SliceToAnonymousMap(c.SkipEventTypesRaw)

if c.FluentdMaxConnections < 1 {
// 2x concurrency is effectively uncapped.
c.FluentdMaxConnections = c.Concurrency * 2
}

return nil
}

Expand All @@ -257,6 +266,7 @@ func (c *StartCmdConfig) Dump(ctx context.Context) {
log.WithField("ca", c.FluentdCA).Info("Using Fluentd ca")
log.WithField("cert", c.FluentdCert).Info("Using Fluentd cert")
log.WithField("key", c.FluentdKey).Info("Using Fluentd key")
log.WithField("max_connections", c.FluentdMaxConnections).Info("Using Fluentd max connections")
log.WithField("window-size", c.WindowSize).Info("Using window size")

if c.TeleportIdentityFile != "" {
Expand Down
33 changes: 18 additions & 15 deletions integrations/event-handler/cli_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,12 @@ func TestStartCmdConfig(t *testing.T) {
Debug: false,
Start: StartCmdConfig{
FluentdConfig: FluentdConfig{
FluentdURL: "https://localhost:8888/test.log",
FluentdSessionURL: "https://localhost:8888/session",
FluentdCert: path.Join(wd, "testdata", "fake-file"),
FluentdKey: path.Join(wd, "testdata", "fake-file"),
FluentdCA: path.Join(wd, "testdata", "fake-file"),
FluentdURL: "https://localhost:8888/test.log",
FluentdSessionURL: "https://localhost:8888/session",
FluentdCert: path.Join(wd, "testdata", "fake-file"),
FluentdKey: path.Join(wd, "testdata", "fake-file"),
FluentdCA: path.Join(wd, "testdata", "fake-file"),
FluentdMaxConnections: 10,
},
TeleportConfig: TeleportConfig{
TeleportAddr: "localhost:3025",
Expand Down Expand Up @@ -83,11 +84,12 @@ func TestStartCmdConfig(t *testing.T) {
Debug: true,
Start: StartCmdConfig{
FluentdConfig: FluentdConfig{
FluentdURL: "https://localhost:8888/test.log",
FluentdSessionURL: "https://localhost:8888/session",
FluentdCert: path.Join(wd, "testdata", "fake-file"),
FluentdKey: path.Join(wd, "testdata", "fake-file"),
FluentdCA: path.Join(wd, "testdata", "fake-file"),
FluentdURL: "https://localhost:8888/test.log",
FluentdSessionURL: "https://localhost:8888/session",
FluentdCert: path.Join(wd, "testdata", "fake-file"),
FluentdKey: path.Join(wd, "testdata", "fake-file"),
FluentdCA: path.Join(wd, "testdata", "fake-file"),
FluentdMaxConnections: 10,
},
TeleportConfig: TeleportConfig{
TeleportAddr: "localhost:3025",
Expand Down Expand Up @@ -121,11 +123,12 @@ func TestStartCmdConfig(t *testing.T) {
Debug: true,
Start: StartCmdConfig{
FluentdConfig: FluentdConfig{
FluentdURL: "https://localhost:8888/test.log",
FluentdSessionURL: "https://localhost:8888/session",
FluentdCert: path.Join(wd, "testdata", "fake-file"),
FluentdKey: path.Join(wd, "testdata", "fake-file"),
FluentdCA: path.Join(wd, "testdata", "fake-file"),
FluentdURL: "https://localhost:8888/test.log",
FluentdSessionURL: "https://localhost:8888/session",
FluentdCert: path.Join(wd, "testdata", "fake-file"),
FluentdKey: path.Join(wd, "testdata", "fake-file"),
FluentdCA: path.Join(wd, "testdata", "fake-file"),
FluentdMaxConnections: 10,
},
TeleportConfig: TeleportConfig{
TeleportAddr: "localhost:3025",
Expand Down
7 changes: 4 additions & 3 deletions integrations/event-handler/fake_fluentd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,10 @@ func (f *FakeFluentd) createServer() error {
// GetClientConfig returns FlientdConfig to connect to this fake fluentd server instance
func (f *FakeFluentd) GetClientConfig() FluentdConfig {
return FluentdConfig{
FluentdCA: f.caCertPath,
FluentdCert: f.clientCertPath,
FluentdKey: f.clientKeyPath,
FluentdCA: f.caCertPath,
FluentdCert: f.clientCertPath,
FluentdKey: f.clientKeyPath,
FluentdMaxConnections: 3,
}
}

Expand Down
35 changes: 27 additions & 8 deletions integrations/event-handler/fluentd_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

"github.com/gravitational/trace"
log "github.com/sirupsen/logrus"
"golang.org/x/net/http2"

tlib "github.com/gravitational/teleport/integrations/lib"
)
Expand All @@ -40,6 +41,7 @@ const (
type FluentdClient struct {
// client HTTP client to send requests
client *http.Client
sem chan struct{}
}

// NewFluentdClient creates new FluentdClient
Expand All @@ -55,22 +57,34 @@ func NewFluentdClient(c *FluentdConfig) (*FluentdClient, error) {
return nil, trace.BadParameter("both fluentd_cert and fluentd_key should be specified")
}

if c.FluentdMaxConnections <= 0 {
return nil, trace.BadParameter("fluentd_max_connections should be greater than 0")
}

ca, err := getCertPool(c)
if err != nil {
return nil, trace.Wrap(err)
}

client := &http.Client{
Transport: &http.Transport{
TLSClientConfig: &tls.Config{
RootCAs: ca,
Certificates: certs,
},
transport := &http.Transport{
TLSClientConfig: &tls.Config{
RootCAs: ca,
Certificates: certs,
},
Timeout: httpTimeout,
MaxIdleConnsPerHost: c.FluentdMaxConnections,
IdleConnTimeout: httpTimeout,
}

if err := http2.ConfigureTransport(transport); err != nil {
return nil, trace.Wrap(err)
}

return &FluentdClient{client: client}, nil
client := &http.Client{
Transport: transport,
Timeout: httpTimeout,
}

return &FluentdClient{client: client, sem: make(chan struct{}, c.FluentdMaxConnections)}, nil
}

// getCertPool reads CA certificate and returns CA cert pool if passed
Expand All @@ -91,6 +105,11 @@ func getCertPool(c *FluentdConfig) (*x509.CertPool, error) {

// Send sends event to fluentd
func (f *FluentdClient) Send(ctx context.Context, url string, b []byte) error {
f.sem <- struct{}{}
defer func() {
<-f.sem
}()

log.WithField("payload", string(b)).Debug("Sending event to Fluentd")

req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(b))
Expand Down
12 changes: 6 additions & 6 deletions integrations/event-handler/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ require (
github.com/sethvargo/go-limiter v1.0.0
github.com/sirupsen/logrus v1.9.3
github.com/stretchr/testify v1.9.0
golang.org/x/net v0.30.0
golang.org/x/time v0.5.0
google.golang.org/protobuf v1.34.2
)
Expand Down Expand Up @@ -275,15 +276,14 @@ require (
go.opentelemetry.io/otel/trace v1.26.0 // indirect
go.opentelemetry.io/proto/otlp v1.2.0 // indirect
go.starlark.net v0.0.0-20230525235612-a134d8f9ddca // indirect
golang.org/x/crypto v0.24.0 // indirect
golang.org/x/crypto v0.28.0 // indirect
golang.org/x/exp v0.0.0-20240325151524-a685a6edb6d8 // indirect
golang.org/x/mod v0.17.0 // indirect
golang.org/x/net v0.26.0 // indirect
golang.org/x/oauth2 v0.20.0 // indirect
golang.org/x/sync v0.7.0 // indirect
golang.org/x/sys v0.21.0 // indirect
golang.org/x/term v0.21.0 // indirect
golang.org/x/text v0.16.0 // indirect
golang.org/x/sync v0.8.0 // indirect
golang.org/x/sys v0.26.0 // indirect
golang.org/x/term v0.25.0 // indirect
golang.org/x/text v0.19.0 // indirect
google.golang.org/api v0.177.0 // indirect
google.golang.org/genproto v0.0.0-20240401170217-c3f982113cda // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240429193739-8cf5692501f6 // indirect
Expand Down
24 changes: 12 additions & 12 deletions integrations/event-handler/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1680,8 +1680,8 @@ golang.org/x/crypto v0.11.0/go.mod h1:xgJhtzW8F9jGdVFWZESrid1U1bjeNy4zgy5cRr/CIi
golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4=
golang.org/x/crypto v0.17.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4=
golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU=
golang.org/x/crypto v0.24.0 h1:mnl8DM0o513X8fdIkmyFE/5hTYxbwYOjDS/+rK6qpRI=
golang.org/x/crypto v0.24.0/go.mod h1:Z1PMYSOR5nyMcyAVAIQSKCDwalqy85Aqn1x3Ws4L5DM=
golang.org/x/crypto v0.28.0 h1:GBDwsMXVQi34v5CCYUm2jkJvu4cbtru2U4TN2PSyQnw=
golang.org/x/crypto v0.28.0/go.mod h1:rmgy+3RHxRZMyY0jjAJShp2zgEdOqj2AO7U0pYmeQ7U=
golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20180807140117-3d87b88a115f/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
Expand Down Expand Up @@ -1808,8 +1808,8 @@ golang.org/x/net v0.9.0/go.mod h1:d48xBJpPfHeWQsugry2m+kC02ZBRGRgulfHnEXEuWns=
golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg=
golang.org/x/net v0.12.0/go.mod h1:zEVYFnQC7m/vmpQFELhcD1EWkZlX69l4oqgmer6hfKA=
golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE=
golang.org/x/net v0.26.0 h1:soB7SVo0PWrY4vPW/+ay0jKDNScG2X9wFeYlXIvJsOQ=
golang.org/x/net v0.26.0/go.mod h1:5YKkiSynbBIh3p6iOc/vibscux0x38BZDkn8sCUPxHE=
golang.org/x/net v0.30.0 h1:AcW1SDZMkb8IpzCdQUaIq2sP4sZ4zw+55h6ynffypl4=
golang.org/x/net v0.30.0/go.mod h1:2wGyMJ5iFasEhkwi13ChkO/t1ECNC4X4eBKkVFyYFlU=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
Expand Down Expand Up @@ -1857,8 +1857,8 @@ golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20220819030929-7fc1605a5dde/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220929204114-8fcdb60fdcc0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M=
golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ=
golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
Expand Down Expand Up @@ -1959,8 +1959,8 @@ golang.org/x/sys v0.10.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.21.0 h1:rF+pYz3DAGSQAxAu1CbC7catZg4ebC4UIeIhKxBZvws=
golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.26.0 h1:KHjCJyddX0LoSTb3J+vWpupP9p0oznkqVk/IfjymZbo=
golang.org/x/sys v0.26.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
Expand All @@ -1976,8 +1976,8 @@ golang.org/x/term v0.10.0/go.mod h1:lpqdcUyK/oCiQxvxVrppt5ggO2KCZ5QblwqPnfZ6d5o=
golang.org/x/term v0.13.0/go.mod h1:LTmsnFJwVN6bCy1rVCoS+qHT1HhALEFxKncY3WNNh4U=
golang.org/x/term v0.15.0/go.mod h1:BDl952bC7+uMoWR75FIrCDx79TPU9oHkTZ9yRbYOrX0=
golang.org/x/term v0.17.0/go.mod h1:lLRBjIVuehSbZlaOtGMbcMncT+aqLLLmKrsjNrUguwk=
golang.org/x/term v0.21.0 h1:WVXCp+/EBEHOj53Rvu+7KiT/iElMrO8ACK16SMZ3jaA=
golang.org/x/term v0.21.0/go.mod h1:ooXLefLobQVslOqselCNF4SxFAaoS6KujMbsGzSDmX0=
golang.org/x/term v0.25.0 h1:WtHI/ltw4NvSUig5KARz9h521QvRC8RmF/cuYqifU24=
golang.org/x/term v0.25.0/go.mod h1:RPyXicDX+6vLxogjjRxjgD2TKtmAO6NZBsBRfrOLu7M=
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
Expand All @@ -1997,8 +1997,8 @@ golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
golang.org/x/text v0.11.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4=
golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI=
golang.org/x/text v0.19.0 h1:kTxAhCbGbxhK0IwgSKiMO5awPoDQ0RpfiVYBfK860YM=
golang.org/x/text v0.19.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY=
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
Expand Down

0 comments on commit c053ba2

Please sign in to comment.