Skip to content

Commit

Permalink
Use URLs in configuration instead of a token field
Browse files Browse the repository at this point in the history
  • Loading branch information
Daniel Farina committed Jul 15, 2014
1 parent 8984e22 commit 3a190f2
Show file tree
Hide file tree
Showing 12 changed files with 103 additions and 86 deletions.
2 changes: 1 addition & 1 deletion Godeps/Godeps.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

This file was deleted.

26 changes: 6 additions & 20 deletions Godeps/_workspace/src/github.com/logplex/logplexc/grind_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 3 additions & 4 deletions Godeps/_workspace/src/github.com/logplex/logplexc/logplexc.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

25 changes: 17 additions & 8 deletions Godeps/_workspace/src/github.com/logplex/logplexc/minimal.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 7 additions & 6 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,17 @@ setting up most of what one needs to demonstrate the entire system
end-to-end. It installs everything into a subdirectory
``integration/tmp``. An example is provided below::

$ PGSRC=git-repo-directory-for-postgres ./integration/Makefile

$ godep go build ./integration/logplexd
$ ./logplexd &
https://127.0.0.1:44786 # (dynamically generated)
# (dynamically generated)
https://token:[email protected]:32906

$ PGSRC=git-repo-directory-for-postgres \
LOGPLEX_URL=https://token:[email protected]:32096 \
./integration/Makefile

$ godep go build
$ LOGPLEX_URL=https://127.0.0.1:44786 \
SERVE_DB_DIR=./integration/tmp \
./pg_logplexcollector &
$ SERVE_DB_DIR=./integration/tmp ./pg_logplexcollector &

$ ./integration/tmp/postgres/bin/postgres -D ./integration/tmp/testdb &
[...messages from logplexd, postgres, and pg_logplexcollector here...]
Expand Down
4 changes: 2 additions & 2 deletions integration/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ postgres: $(TMP)postgres/bin/pg_config
pg_logfebe: $(TMP)postgres/lib/pg_logfebe.so

$(TMP)serves.new:
printf '{"serves": [{"i": "test identity", "t": "test-token", "p": "%s", "name": "humanname"}]}\n' \
$(realpath $(TMP))/testdb/log.sock > $@
printf '{"serves": [{"i": "test identity", "url": "%s", "p": "%s", "name": "humanname"}]}\n' \
$(LOGPLEX_URL) $(realpath $(TMP))/testdb/log.sock > $@

# Copy files from other projects useful for testing.

Expand Down
10 changes: 9 additions & 1 deletion integration/logplexd/logplexd.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"net/http"
"net/http/httptest"
"net/http/httputil"
"net/url"
"os"
"os/signal"
)
Expand All @@ -26,7 +27,14 @@ func (*LogplexPrint) ServeHTTP(w http.ResponseWriter, r *http.Request) {

func main() {
s := httptest.NewTLSServer(&LogplexPrint{})
fmt.Println(s.URL)
u, err := url.Parse(s.URL)
if err != nil {
log.Printf("httptest generated a bad URL: %v", s.URL)
}

u.User = url.UserPassword("token",
"t.9d19ac58-0597-4ea0-94b0-45778803597c")
fmt.Println(u)

// Signal handling:
sigch := make(chan os.Signal)
Expand Down
24 changes: 4 additions & 20 deletions pg_logplexcollector.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"log"
"net"
"net/http"
"net/url"
"os"
"os/signal"
"strconv"
Expand Down Expand Up @@ -174,7 +173,7 @@ func processLogRec(lr *logRecord, lpc *logplexc.Client, sr *serveRecord,
catOptionalField("Hint", lr.ErrHint)
catOptionalField("Query", lr.UserQuery)

err := lpc.BufferMessage(time.Now(),
err := lpc.BufferMessage(134, time.Now(),
"postgres",
"postgres."+strconv.Itoa(int(lr.Pid)),
msgFmtBuf.Bytes())
Expand Down Expand Up @@ -239,7 +238,7 @@ func logWorker(die dieCh, rwc io.ReadWriteCloser, cfg logplexc.Config,
}

// Set up client with serve
cfg.Token = sr.T
cfg.Logplex = sr.u
client, err := logplexc.NewClient(&cfg)
if err != nil {
exit(err)
Expand All @@ -253,7 +252,7 @@ func logWorker(die dieCh, rwc io.ReadWriteCloser, cfg logplexc.Config,
processLogMsg(die, client, msgInit, sr, exit)
}

func listen(die dieCh, logplexUrl url.URL, sr *serveRecord) {
func listen(die dieCh, sr *serveRecord) {
// Begin listening
l, err := net.Listen("unix", sr.P)
if err != nil {
Expand Down Expand Up @@ -293,15 +292,10 @@ func listen(die dieCh, logplexUrl url.URL, sr *serveRecord) {
}

templateConfig := logplexc.Config{
Logplex: logplexUrl,
HttpClient: client,
RequestSizeTrigger: 100 * KB,
Concurrency: 3,
Period: time.Second / 4,

// Set at connection start-up when the client
// self-identifies.
Token: "",
}

for {
Expand Down Expand Up @@ -352,16 +346,6 @@ func main() {
}
}()

if os.Getenv("LOGPLEX_URL") == "" {
log.Fatal("LOGPLEX_URL is unset")
}

logplexUrl, err := url.Parse(os.Getenv("LOGPLEX_URL"))
if err != nil {
log.Fatalf("LOGPLEX_URL: could not parse: %q",
os.Getenv("LOGPLEX_URL"))
}

// Set up serve database and perform its input checking
sdbDir := os.Getenv("SERVE_DB_DIR")
if sdbDir == "" {
Expand Down Expand Up @@ -408,7 +392,7 @@ func main() {
snap := sdb.Snapshot()
for i := range snap {
os.Remove(snap[i].P)
go listen(die, *logplexUrl, &snap[i])
go listen(die, &snap[i])
}
}

Expand Down
22 changes: 16 additions & 6 deletions serve_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,13 @@
// serves.new must have at least the following structure:
//
// {"serves": [
// {"i": "identity1": "t": "token1", "p": "/var/run/cluster1/log.sock"},
// {"i": "identity2": "t": "token2", "p": "/var/run/cluster2/log.sock"}
// ]
// {"i": "identity1",
// "url": "https://token:[email protected]/logs",
// "p": "/var/run/cluster1/log.sock"},
// {"i": "identity1",
// "url": "https://token:[email protected]/logs",
// "p": "/var/run/cluster2/log.sock"},
// ]
// }
//
// Any other auxiliary keys and values as siblings to the "serves" key
Expand All @@ -54,6 +58,7 @@ import (
"encoding/json"
"fmt"
"io/ioutil"
"net/url"
"os"
"path"
"sync"
Expand All @@ -66,7 +71,7 @@ type sKey struct {

type serveRecord struct {
sKey
T string
u url.URL

// Auxiliary fields for formatting
Name string
Expand Down Expand Up @@ -376,7 +381,12 @@ func projectFromJson(v interface{}) (*serveRecord, error) {
return nil, err
}

tok, err := lookup("t")
urlText, err := lookup("url")
if err != nil {
return nil, err
}

u, err := url.Parse(urlText)
if err != nil {
return nil, err
}
Expand All @@ -390,7 +400,7 @@ func projectFromJson(v interface{}) (*serveRecord, error) {
name, _ := lookup("name")

return &serveRecord{sKey: sKey{P: path, I: ident},
T: tok, Name: name}, nil
u: *u, Name: name}, nil
}

func (t *serveDb) parse(contents []byte) (map[sKey]*serveRecord, error) {
Expand Down
Loading

0 comments on commit 3a190f2

Please sign in to comment.