diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index b7b662f..7b5f9cf 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -11,7 +11,7 @@ }, { "ImportPath": "github.com/logplex/logplexc", - "Rev": "2180ec7d8e887d3fde2f7a28ee315baaf2ac0c8d" + "Rev": "502c237239b3264b15e83ef411be39a1e362dc36" } ] } diff --git a/Godeps/_workspace/src/github.com/deafbybeheading/femebe/.gitignore b/Godeps/_workspace/src/github.com/deafbybeheading/femebe/.gitignore deleted file mode 100644 index 1e259b3..0000000 --- a/Godeps/_workspace/src/github.com/deafbybeheading/femebe/.gitignore +++ /dev/null @@ -1 +0,0 @@ -perf-smoke.config diff --git a/Godeps/_workspace/src/github.com/logplex/logplexc/grind_test.go b/Godeps/_workspace/src/github.com/logplex/logplexc/grind_test.go index d63b520..d791ef8 100644 --- a/Godeps/_workspace/src/github.com/logplex/logplexc/grind_test.go +++ b/Godeps/_workspace/src/github.com/logplex/logplexc/grind_test.go @@ -36,7 +36,7 @@ func (n *NoopTripper) RoundTrip(req *http.Request) (*http.Response, error) { var BogusLogplexUrl url.URL func init() { - url, err := url.Parse("https://locahost:23456") + url, err := url.Parse("https://token:a-token@locahost:23456") if err != nil { log.Fatalf("Could not parse url: %v", err) } @@ -55,7 +55,6 @@ func BenchmarkStartup(b *testing.B) { RequestSizeTrigger: 100, Concurrency: 3, Period: 3 * time.Second, - Token: "a-token", } for i := 0; i < b.N; i += 1 { @@ -101,7 +100,8 @@ comparison only.`) for i := 0; i < inputConcur; i += 1 { go func() { for i := 0; i < perGoroutinePayload; i += 1 { - c.BufferMessage(t, "UK", "CharlesDickens", log) + c.BufferMessage(134, t, "UK", "CharlesDickens", + log) } done <- true @@ -135,7 +135,6 @@ func NewNoopClient(f interface { RequestSizeTrigger: sizeTrigger, Concurrency: 3, Period: 3 * time.Second, - Token: "a-token", } c, err := NewClient(&cfg) @@ -163,17 +162,15 @@ func BenchmarkFanInOut(b *testing.B) { } // Try logging to a real, live endpoint URL and token, specified by -// LOGPLEX_URL and LOGPLEX_TOKEN. +// LOGPLEX_URL. // // This is deceptively fast because dropping will be very common, even // on localhost. func BenchmarkToUrl(b *testing.B) { b.StopTimer() - if os.Getenv("LOGPLEX_URL") == "" || - os.Getenv("LOGPLEX_TOKEN") == "" { - b.Fatal("Skipping, no LOGPLEX_URL and LOGPLEX_TOKEN " + - "environment variable set") + if os.Getenv("LOGPLEX_URL") == "" { + b.Fatal("Skipping, no LOGPLEX_URL environment variable set") return } @@ -183,16 +180,6 @@ func BenchmarkToUrl(b *testing.B) { os.Getenv("LOGPLEX_URL"), err) } - token := os.Getenv("LOGPLEX_TOKEN") - if token == "" { - b.Fatalf("Invalid LOGPLEX_TOKEN set: %q", token) - } - - if err != nil { - b.Fatalf("Could not parse logplex endpoint %q: %v", - os.Getenv("LOGPLEX_URL"), err) - } - client := *http.DefaultClient client.Transport = &http.Transport{ TLSClientConfig: &tls.Config{ @@ -206,7 +193,6 @@ func BenchmarkToUrl(b *testing.B) { RequestSizeTrigger: 100 * KB, Concurrency: 3, Period: 3 * time.Second, - Token: token, } c, err := NewClient(&cfg) diff --git a/Godeps/_workspace/src/github.com/logplex/logplexc/logplexc.go b/Godeps/_workspace/src/github.com/logplex/logplexc/logplexc.go index c1950fb..f1f75bd 100644 --- a/Godeps/_workspace/src/github.com/logplex/logplexc/logplexc.go +++ b/Godeps/_workspace/src/github.com/logplex/logplexc/logplexc.go @@ -81,7 +81,6 @@ type Client struct { type Config struct { Logplex url.URL - Token string HttpClient http.Client RequestSizeTrigger int Concurrency int @@ -96,7 +95,6 @@ func NewClient(cfg *Config) (*Client, error) { c, err := NewMiniClient( &MiniConfig{ Logplex: cfg.Logplex, - Token: cfg.Token, HttpClient: cfg.HttpClient, }) @@ -182,9 +180,10 @@ func (m *Client) Close() { } func (m *Client) BufferMessage( - when time.Time, host string, procId string, log []byte) error { + priority int, when time.Time, host string, procId string, + log []byte) error { - s := m.c.BufferMessage(when, host, procId, log) + s := m.c.BufferMessage(priority, when, host, procId, log) if s.Buffered >= m.RequestSizeTrigger || m.timeTrigger == TimeTriggerImmediate { m.maybeWork() diff --git a/Godeps/_workspace/src/github.com/logplex/logplexc/minimal.go b/Godeps/_workspace/src/github.com/logplex/logplexc/minimal.go index c118b2a..aba24e8 100644 --- a/Godeps/_workspace/src/github.com/logplex/logplexc/minimal.go +++ b/Godeps/_workspace/src/github.com/logplex/logplexc/minimal.go @@ -2,6 +2,7 @@ package logplexc import ( "bytes" + "errors" "fmt" "net/http" "net/url" @@ -25,7 +26,6 @@ type MiniStats struct { // constructing a client instance. type MiniConfig struct { Logplex url.URL - Token string HttpClient http.Client } @@ -46,6 +46,9 @@ type MiniClient struct { // Configuration that should not be mutated after creation MiniConfig + // Cached copy of the token, extracted from the Logplex URL. + token string + reqInFlight sync.WaitGroup // Messages that have been collected but not yet sent. @@ -61,12 +64,17 @@ func NewMiniClient(cfg *MiniConfig) (client *MiniClient, err error) { // Make a private copy c.MiniConfig = *cfg - // If the username and password weren't part of the URL, use - // the logplex-token as the password - if c.Logplex.User == nil { - c.Logplex.User = url.UserPassword("token", c.Token) + if c.MiniConfig.Logplex.User == nil { + return nil, errors.New("No logplex user information provided") + } + + token, ok := c.MiniConfig.Logplex.User.Password() + if !ok { + return nil, errors.New("No logplex password provided") } + c.token = token + return &c, nil } @@ -97,10 +105,11 @@ func (c *MiniClient) Statistics() MiniStats { // context, so at worst it seems a buggy or malicious emitter of logs // can cause problems for themselves only. func (c *MiniClient) BufferMessage( - when time.Time, host string, procId string, log []byte) MiniStats { + priority int, when time.Time, host string, procId string, + log []byte) MiniStats { ts := when.UTC().Format(time.RFC3339) - syslogPrefix := "<134>1 " + ts + " " + host + " " + - c.Token + " " + procId + " - - " + syslogPrefix := "<" + strconv.Itoa(priority) + ">1 " + ts + " " + + host + " " + c.token + " " + procId + " - - " msgLen := len(syslogPrefix) + len(log) // Avoid racing against other operations that may want to swap diff --git a/README.rst b/README.rst index 0c463a2..e34e0c5 100644 --- a/README.rst +++ b/README.rst @@ -24,16 +24,17 @@ setting up most of what one needs to demonstrate the entire system end-to-end. It installs everything into a subdirectory ``integration/tmp``. An example is provided below:: - $ PGSRC=git-repo-directory-for-postgres ./integration/Makefile - $ godep go build ./integration/logplexd $ ./logplexd & - https://127.0.0.1:44786 # (dynamically generated) + # (dynamically generated) + https://token:t.9d19ac58-0597-4ea0-94b0-45778803597c@127.0.0.1:32906 + + $ PGSRC=git-repo-directory-for-postgres \ + LOGPLEX_URL=https://token:t.9d19ac58-0597-4ea0-94b0-45778803597c@127.0.0.1:32096 \ + ./integration/Makefile $ godep go build - $ LOGPLEX_URL=https://127.0.0.1:44786 \ - SERVE_DB_DIR=./integration/tmp \ - ./pg_logplexcollector & + $ SERVE_DB_DIR=./integration/tmp ./pg_logplexcollector & $ ./integration/tmp/postgres/bin/postgres -D ./integration/tmp/testdb & [...messages from logplexd, postgres, and pg_logplexcollector here...] diff --git a/integration/Makefile b/integration/Makefile index 4968cd4..d699bdc 100755 --- a/integration/Makefile +++ b/integration/Makefile @@ -29,8 +29,8 @@ postgres: $(TMP)postgres/bin/pg_config pg_logfebe: $(TMP)postgres/lib/pg_logfebe.so $(TMP)serves.new: - printf '{"serves": [{"i": "test identity", "t": "test-token", "p": "%s", "name": "humanname"}]}\n' \ - $(realpath $(TMP))/testdb/log.sock > $@ + printf '{"serves": [{"i": "test identity", "url": "%s", "p": "%s", "name": "humanname"}]}\n' \ + $(LOGPLEX_URL) $(realpath $(TMP))/testdb/log.sock > $@ # Copy files from other projects useful for testing. diff --git a/integration/logplexd/logplexd.go b/integration/logplexd/logplexd.go index 693e21a..e04b0c1 100644 --- a/integration/logplexd/logplexd.go +++ b/integration/logplexd/logplexd.go @@ -6,6 +6,7 @@ import ( "net/http" "net/http/httptest" "net/http/httputil" + "net/url" "os" "os/signal" ) @@ -26,7 +27,14 @@ func (*LogplexPrint) ServeHTTP(w http.ResponseWriter, r *http.Request) { func main() { s := httptest.NewTLSServer(&LogplexPrint{}) - fmt.Println(s.URL) + u, err := url.Parse(s.URL) + if err != nil { + log.Printf("httptest generated a bad URL: %v", s.URL) + } + + u.User = url.UserPassword("token", + "t.9d19ac58-0597-4ea0-94b0-45778803597c") + fmt.Println(u) // Signal handling: sigch := make(chan os.Signal) diff --git a/pg_logplexcollector.go b/pg_logplexcollector.go index 0b2dfa0..9ed8019 100644 --- a/pg_logplexcollector.go +++ b/pg_logplexcollector.go @@ -7,7 +7,6 @@ import ( "log" "net" "net/http" - "net/url" "os" "os/signal" "strconv" @@ -174,7 +173,7 @@ func processLogRec(lr *logRecord, lpc *logplexc.Client, sr *serveRecord, catOptionalField("Hint", lr.ErrHint) catOptionalField("Query", lr.UserQuery) - err := lpc.BufferMessage(time.Now(), + err := lpc.BufferMessage(134, time.Now(), "postgres", "postgres."+strconv.Itoa(int(lr.Pid)), msgFmtBuf.Bytes()) @@ -239,7 +238,7 @@ func logWorker(die dieCh, rwc io.ReadWriteCloser, cfg logplexc.Config, } // Set up client with serve - cfg.Token = sr.T + cfg.Logplex = sr.u client, err := logplexc.NewClient(&cfg) if err != nil { exit(err) @@ -253,7 +252,7 @@ func logWorker(die dieCh, rwc io.ReadWriteCloser, cfg logplexc.Config, processLogMsg(die, client, msgInit, sr, exit) } -func listen(die dieCh, logplexUrl url.URL, sr *serveRecord) { +func listen(die dieCh, sr *serveRecord) { // Begin listening l, err := net.Listen("unix", sr.P) if err != nil { @@ -293,15 +292,10 @@ func listen(die dieCh, logplexUrl url.URL, sr *serveRecord) { } templateConfig := logplexc.Config{ - Logplex: logplexUrl, HttpClient: client, RequestSizeTrigger: 100 * KB, Concurrency: 3, Period: time.Second / 4, - - // Set at connection start-up when the client - // self-identifies. - Token: "", } for { @@ -352,16 +346,6 @@ func main() { } }() - if os.Getenv("LOGPLEX_URL") == "" { - log.Fatal("LOGPLEX_URL is unset") - } - - logplexUrl, err := url.Parse(os.Getenv("LOGPLEX_URL")) - if err != nil { - log.Fatalf("LOGPLEX_URL: could not parse: %q", - os.Getenv("LOGPLEX_URL")) - } - // Set up serve database and perform its input checking sdbDir := os.Getenv("SERVE_DB_DIR") if sdbDir == "" { @@ -408,7 +392,7 @@ func main() { snap := sdb.Snapshot() for i := range snap { os.Remove(snap[i].P) - go listen(die, *logplexUrl, &snap[i]) + go listen(die, &snap[i]) } } diff --git a/serve_db.go b/serve_db.go index bd3d6fb..c9e7c1a 100644 --- a/serve_db.go +++ b/serve_db.go @@ -39,9 +39,13 @@ // serves.new must have at least the following structure: // // {"serves": [ -// {"i": "identity1": "t": "token1", "p": "/var/run/cluster1/log.sock"}, -// {"i": "identity2": "t": "token2", "p": "/var/run/cluster2/log.sock"} -// ] +// {"i": "identity1", +// "url": "https://token:token-1@some-host.io/logs", +// "p": "/var/run/cluster1/log.sock"}, +// {"i": "identity1", +// "url": "https://token:token-2@some-host.io/logs", +// "p": "/var/run/cluster2/log.sock"}, +// ] // } // // Any other auxiliary keys and values as siblings to the "serves" key @@ -54,6 +58,7 @@ import ( "encoding/json" "fmt" "io/ioutil" + "net/url" "os" "path" "sync" @@ -66,7 +71,7 @@ type sKey struct { type serveRecord struct { sKey - T string + u url.URL // Auxiliary fields for formatting Name string @@ -376,7 +381,12 @@ func projectFromJson(v interface{}) (*serveRecord, error) { return nil, err } - tok, err := lookup("t") + urlText, err := lookup("url") + if err != nil { + return nil, err + } + + u, err := url.Parse(urlText) if err != nil { return nil, err } @@ -390,7 +400,7 @@ func projectFromJson(v interface{}) (*serveRecord, error) { name, _ := lookup("name") return &serveRecord{sKey: sKey{P: path, I: ident}, - T: tok, Name: name}, nil + u: *u, Name: name}, nil } func (t *serveDb) parse(contents []byte) (map[sKey]*serveRecord, error) { diff --git a/serve_db_test.go b/serve_db_test.go index 4a75082..e3aac8e 100644 --- a/serve_db_test.go +++ b/serve_db_test.go @@ -2,7 +2,9 @@ package main import ( "io/ioutil" + "net/url" "os" + "reflect" "testing" ) @@ -18,39 +20,57 @@ func (f *fixturePair) check(t *testing.T, sdb *serveDb) { t.Fatalf("Expected to find identifier %q", triplet.I) } - if triplet.T != rec.T { - t.Fatalf("Expected to resolve to %v, "+ - "but got %v instead", triplet.T, rec.T) + if !reflect.DeepEqual(triplet.u, rec.u) { + t.Fatalf("Expected to resolve to %+v, "+ + "but got %v instead", triplet.u, rec.u) } } } +func mustParseURL(us string) url.URL { + u, err := url.Parse(us) + if err != nil { + panic(err) + } + return *u +} + var fixtures = []fixturePair{ { json: []byte(`{"serves": ` + - `[{"i": "apple", "t": "chocolate", ` + + `[{"i": "apple", "url": "https://token:chocolate@localhost", ` + `"p": "/p1/log.sock"}, ` + - `{"i": "banana", "t": "vanilla", ` + + `{"i": "banana", "url": "https://token:vanilla@localhost", ` + `"p": "/p2/log.sock"}]}`), triplets: []serveRecord{ - {sKey{I: "apple", P: "/p1/log.sock"}, "chocolate", + {sKey{I: "apple", P: "/p1/log.sock"}, + mustParseURL( + "https://token:chocolate@localhost"), "brown"}, - {sKey{I: "banana", P: "/p2/log.sock"}, "vanilla", + {sKey{I: "banana", P: "/p2/log.sock"}, + mustParseURL( + "https://token:vanilla@localhost"), "white"}, }, }, { json: []byte(`{"serves": ` + - `[{"i": "bed", "t": "pillow", ` + + `[{"i": "bed", ` + + `"url": "https://token:pillow@localhost", ` + `"p": "/p1/log.sock"}, ` + - `{"i": "nightstand", "t": "alarm clock", ` + + `{"i": "nightstand", ` + + `"url": "https://token:alarm-clock@localhost", ` + `"p": "/p2/log.sock"}]}`), triplets: []serveRecord{ {sKey{I: "bed", P: "/p1/log.sock"}, - "pillow", "white"}, + mustParseURL( + "https://token:pillow@localhost"), + "white"}, {sKey{I: "nightstand", P: "/p2/log.sock"}, - "alarm clock", "black"}, + mustParseURL( + "https://token:alarm-clock@localhost"), + "black"}, }, }, } diff --git a/version_message_test.go b/version_message_test.go index 946f665..e08c493 100644 --- a/version_message_test.go +++ b/version_message_test.go @@ -9,7 +9,8 @@ import ( "errors" "testing" - "github.com/deafbybeheading/femebe" + "github.com/deafbybeheading/femebe/buf" + "github.com/deafbybeheading/femebe/core" ) var versionCheckTests = []struct { @@ -29,10 +30,10 @@ var versionCheckTests = []struct { func TestVersionCheck(t *testing.T) { for i, tt := range versionCheckTests { - msgInit := func(dst *femebe.Message, exit exitFn) { - buf := bytes.Buffer{} - femebe.WriteCString(&buf, tt.Version) - dst.InitFromBytes('V', buf.Bytes()) + msgInit := func(dst *core.Message, exit exitFn) { + b := bytes.Buffer{} + buf.WriteCString(&b, tt.Version) + dst.InitFromBytes('V', b.Bytes()) } ok := true @@ -50,7 +51,7 @@ func TestVersionCheck(t *testing.T) { func TestVersionMsgInitErr(t *testing.T) { theErr := errors.New("An error; e.g. network difficulties") - msgInit := func(dst *femebe.Message, exit exitFn) { + msgInit := func(dst *core.Message, exit exitFn) { exit(theErr) }