Skip to content

Commit

Permalink
Merge pull request #83 from akerouanton/improve-tests
Browse files Browse the repository at this point in the history
Test how the logger behaves in async mode
  • Loading branch information
tagomoris authored Aug 4, 2020
2 parents 28f6a3e + c19c097 commit 89c0329
Show file tree
Hide file tree
Showing 2 changed files with 415 additions and 165 deletions.
38 changes: 29 additions & 9 deletions fluent/fluent.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@ import (
"bytes"
"encoding/base64"
"encoding/binary"
"github.com/tinylib/msgp/msgp"
"math/rand"

"github.com/tinylib/msgp/msgp"
)

const (
Expand Down Expand Up @@ -84,6 +85,7 @@ type msgToSend struct {
type Fluent struct {
Config

dialer dialer
stopRunning chan bool
pending chan *msgToSend
wg sync.WaitGroup
Expand All @@ -93,7 +95,20 @@ type Fluent struct {
}

// New creates a new Logger.
func New(config Config) (f *Fluent, err error) {
func New(config Config) (*Fluent, error) {
if config.Timeout == 0 {
config.Timeout = defaultTimeout
}
return newWithDialer(config, &net.Dialer{
Timeout: config.Timeout,
})
}

type dialer interface {
Dial(string, string) (net.Conn, error)
}

func newWithDialer(config Config, d dialer) (f *Fluent, err error) {
if config.FluentNetwork == "" {
config.FluentNetwork = defaultNetwork
}
Expand All @@ -106,9 +121,6 @@ func New(config Config) (f *Fluent, err error) {
if config.FluentSocketPath == "" {
config.FluentSocketPath = defaultSocketPath
}
if config.Timeout == 0 {
config.Timeout = defaultTimeout
}
if config.WriteTimeout == 0 {
config.WriteTimeout = defaultWriteTimeout
}
Expand All @@ -128,15 +140,20 @@ func New(config Config) (f *Fluent, err error) {
fmt.Fprintf(os.Stderr, "fluent#New: AsyncConnect is now deprecated, please use Async instead")
config.Async = config.Async || config.AsyncConnect
}

if config.Async {
f = &Fluent{
Config: config,
dialer: d,
pending: make(chan *msgToSend, config.BufferLimit),
}
f.wg.Add(1)
go f.run()
} else {
f = &Fluent{Config: config}
f = &Fluent{
Config: config,
dialer: d,
}
err = f.connect()
}
return
Expand Down Expand Up @@ -340,12 +357,15 @@ func (f *Fluent) close(c net.Conn) {

// connect establishes a new connection using the specified transport.
func (f *Fluent) connect() (err error) {

switch f.Config.FluentNetwork {
case "tcp":
f.conn, err = net.DialTimeout(f.Config.FluentNetwork, f.Config.FluentHost+":"+strconv.Itoa(f.Config.FluentPort), f.Config.Timeout)
f.conn, err = f.dialer.Dial(
f.Config.FluentNetwork,
f.Config.FluentHost+":"+strconv.Itoa(f.Config.FluentPort))
case "unix":
f.conn, err = net.DialTimeout(f.Config.FluentNetwork, f.Config.FluentSocketPath, f.Config.Timeout)
f.conn, err = f.dialer.Dial(
f.Config.FluentNetwork,
f.Config.FluentSocketPath)
default:
err = NewErrUnknownNetwork(f.Config.FluentNetwork)
}
Expand Down
Loading

0 comments on commit 89c0329

Please sign in to comment.