From a40df81df8cc273ed3456307e48b8cc2a0c11d4d Mon Sep 17 00:00:00 2001 From: Peter Boothe Date: Fri, 10 May 2019 17:32:20 -0400 Subject: [PATCH] Save data (#112) Data is now saved when users run a test. Multiple race conditions have been eliminated, as well as multiple goroutine leaks/livelocks/deadlocks. --- legacy/c2s/c2s.go | 178 ++++++++++++++++++++++++++------- legacy/handler/wshandler.go | 35 ++++++- legacy/legacy.go | 107 +++++++++++++++----- legacy/meta/meta.go | 31 ++++++ legacy/metrics/metrics.go | 10 ++ legacy/ndt/server.go | 31 ++++++ legacy/plain/plain.go | 11 +- legacy/plain/plain_test.go | 16 ++- legacy/protocol/protocol.go | 117 +++++++++++++++++----- legacy/s2c/s2c.go | 118 +++++++++++++++++----- legacy/singleserving/server.go | 11 +- ndt-server.go | 11 +- ndt-server_test.go | 21 +--- ndt7/listener/listener.go | 17 ++-- 14 files changed, 553 insertions(+), 161 deletions(-) create mode 100644 legacy/meta/meta.go create mode 100644 legacy/ndt/server.go diff --git a/legacy/c2s/c2s.go b/legacy/c2s/c2s.go index 7410ec29..e2896c94 100644 --- a/legacy/c2s/c2s.go +++ b/legacy/c2s/c2s.go @@ -6,84 +6,188 @@ import ( "strconv" "time" + "github.com/m-lab/ndt-server/legacy/metrics" + "github.com/m-lab/go/warnonerror" + "github.com/m-lab/ndt-server/legacy/ndt" "github.com/m-lab/ndt-server/legacy/protocol" - "github.com/m-lab/ndt-server/legacy/singleserving" ) +// ArchivalData is the data saved by the C2S test. If a researcher wants deeper +// data, then they should use the UUID to get deeper data from tcp-info. +type ArchivalData struct { + // The server and client IP are here as well as in the containing struct + // because happy eyeballs means that we may have a IPv4 control connection + // causing a IPv6 connection to the test port or vice versa. + ServerIP string + ClientIP string + + // This is the only field that is really required. + TestConnectionUUID string + + // These fields are here to enable analyses that don't require joining with tcp-info data. + StartTime time.Time + EndTime time.Time + MeanThroughputMbps float64 + // TODO: Add TCPEngine (bbr, cubic, reno, etc.) + + Error string `json:",omitempty"` +} + // ManageTest manages the c2s test lifecycle. -func ManageTest(ctx context.Context, conn protocol.Connection, f singleserving.Factory) (float64, error) { +func ManageTest(ctx context.Context, controlConn protocol.Connection, s ndt.Server) (*ArchivalData, error) { localContext, localCancel := context.WithTimeout(ctx, 30*time.Second) defer localCancel() + record := &ArchivalData{} - srv, err := f.SingleServingServer("c2s") + srv, err := s.SingleServingServer("c2s") if err != nil { log.Println("Could not start SingleServingServer", err) - return 0, err + metrics.ErrorCount.WithLabelValues("c2s", "StartSingleServingServer") + record.Error = err.Error() + return record, err } - err = protocol.SendJSONMessage(protocol.TestPrepare, strconv.Itoa(srv.Port()), conn) + err = protocol.SendJSONMessage(protocol.TestPrepare, strconv.Itoa(srv.Port()), controlConn) if err != nil { log.Println("Could not send TestPrepare", err) - return 0, err + metrics.ErrorCount.WithLabelValues("c2s", "TestPrepare") + record.Error = err.Error() + return record, err } testConn, err := srv.ServeOnce(localContext) if err != nil { log.Println("Could not successfully ServeOnce", err) - return 0, err + metrics.ErrorCount.WithLabelValues("c2s", "ServeOnce") + record.Error = err.Error() + return record, err } - defer warnonerror.Close(testConn, "Could not close test connection") - err = protocol.SendJSONMessage(protocol.TestStart, "", conn) + // When ManageTest exits, close the test connection. + defer func() { + // Allow the connection-draining goroutine to empty all buffers in support of + // poorly-written clients before we close the connection, but do not block the + // exit of ManageTest on waiting for the test connection to close. + go func() { + time.Sleep(3 * time.Second) + warnonerror.Close(testConn, "Could not close test connection") + }() + }() + + record.TestConnectionUUID = testConn.UUID() + record.ServerIP = testConn.ServerIP() + record.ClientIP = testConn.ClientIP() + + err = protocol.SendJSONMessage(protocol.TestStart, "", controlConn) if err != nil { log.Println("Could not send TestStart", err) - return 0, err + metrics.ErrorCount.WithLabelValues("c2s", "TestStart") + record.Error = err.Error() + return record, err } - seconds := float64(10) - startTime := time.Now() - endTime := startTime.Add(10 * time.Second) - errorTime := endTime.Add(5 * time.Second) - err = testConn.SetReadDeadline(errorTime) - if err != nil { - log.Println("Could not set deadline", err) - return 0, err - } - byteCount, err := testConn.DrainUntil(endTime) + record.StartTime = time.Now() + byteCount, err := drainForeverButMeasureFor(testConn, 10*time.Second) + record.EndTime = time.Now() + log.Println("Ended C2S test on", testConn) if err != nil { if byteCount == 0 { log.Println("Could not drain the test connection", byteCount, err) - return 0, err + metrics.ErrorCount.WithLabelValues("c2s", "Drain") + record.Error = err.Error() + return record, err } // It is possible for the client to reach 10 seconds slightly before the server does. - seconds = time.Now().Sub(startTime).Seconds() + seconds := record.EndTime.Sub(record.StartTime).Seconds() if seconds < 9 { - log.Printf("C2S test only uploaded for %f seconds\n", seconds) - return 0, err - } else if seconds > 11 { - log.Printf("C2S test uploaded-read-loop exited late (%f seconds) because the read stalled. We will continue with the test.\n", seconds) - } else { - log.Printf("C2S test had an error after %f seconds, which is within acceptable bounds. We will continue with the test.\n", seconds) + log.Printf("C2S test client only uploaded for %f seconds\n", seconds) + metrics.ErrorCount.WithLabelValues("c2s", "EarlyExit") + record.Error = err.Error() + return record, err } - } else { - // Empty out the buffer for poorly-behaved clients. - // TODO: ensure this behavior is required by a unit test. - testConn.DrainUntil(errorTime) + // More than 9 seconds is fine. + log.Printf("C2S test had an error (%v) after %f seconds. We will continue with the test.\n", err, seconds) } + throughputValue := 8 * float64(byteCount) / 1000 / 10 + record.MeanThroughputMbps = throughputValue / 1000 // Convert Kbps to Mbps - err = protocol.SendJSONMessage(protocol.TestMsg, strconv.FormatFloat(throughputValue, 'g', -1, 64), conn) + err = protocol.SendJSONMessage(protocol.TestMsg, strconv.FormatFloat(throughputValue, 'g', -1, 64), controlConn) if err != nil { log.Println("Could not send TestMsg with C2S results", err) - return 0, err + metrics.ErrorCount.WithLabelValues("c2s", "TestMsg") + record.Error = err.Error() + return record, err } - err = protocol.SendJSONMessage(protocol.TestFinalize, "", conn) + err = protocol.SendJSONMessage(protocol.TestFinalize, "", controlConn) if err != nil { log.Println("Could not send TestFinalize", err) - return throughputValue, err + metrics.ErrorCount.WithLabelValues("c2s", "TestFinalize") + record.Error = err.Error() + return record, err + } + + return record, nil +} + +// drainForeverButMeasureFor is a generic method for draining a connection while +// measuring the connection for the first part of the drain. This method does +// not close the passed-in Connection, and starts a goroutine which runs until +// that Connection is closed. +func drainForeverButMeasureFor(conn protocol.Connection, d time.Duration) (int64, error) { + type measurement struct { + totalByteCount int64 + err error } + measurements := make(chan measurement) - return throughputValue, nil + // This is the "drain forever" part of this function. Read the passed-in + // connection until the passed-in connection is closed. Only send measurements + // on the measurement channel if the channel can be written to without + // blocking. + go func() { + var totalByteCount int64 + var err error + // Read the connections until the connection is closed. Reading on a closed + // connection returns an error, which terminates the loop and the goroutine. + for err == nil { + var byteCount int64 + byteCount, err = conn.ReadBytes() + totalByteCount += byteCount + // Only write to the channel if it won't block, to ensure the reading process + // goes as fast as possible. + select { + case measurements <- measurement{totalByteCount, err}: + default: + } + } + // After we get an error, drain the channel and then close it. + fullChannel := true + for fullChannel { + select { + case <-measurements: + default: + fullChannel = false + } + } + close(measurements) + }() + + // Read the measurements channel until the timer goes off. + timer := time.NewTimer(d) + var bytesRead int64 + var err error + timerActive := true + for timerActive { + select { + case m := <-measurements: + bytesRead = m.totalByteCount + err = m.err + case <-timer.C: + timerActive = false + } + } + return bytesRead, err } diff --git a/legacy/handler/wshandler.go b/legacy/handler/wshandler.go index 5661c300..87097a3d 100644 --- a/legacy/handler/wshandler.go +++ b/legacy/handler/wshandler.go @@ -3,14 +3,23 @@ package handler import ( "log" "net/http" + "time" "github.com/m-lab/go/warnonerror" "github.com/m-lab/ndt-server/legacy" + "github.com/m-lab/ndt-server/legacy/ndt" "github.com/m-lab/ndt-server/legacy/protocol" "github.com/m-lab/ndt-server/legacy/singleserving" "github.com/m-lab/ndt-server/legacy/ws" ) +// WSHandler is both an ndt.Server and an http.Handler to allow websocket-based +// NDT tests to be run by Go's http libraries. +type WSHandler interface { + ndt.Server + http.Handler +} + type httpFactory struct{} func (hf *httpFactory) SingleServingServer(dir string) (singleserving.Server, error) { @@ -20,7 +29,19 @@ func (hf *httpFactory) SingleServingServer(dir string) (singleserving.Server, er // httpHandler handles requests that come in over HTTP or HTTPS. It should be // created with MakeHTTPHandler() or MakeHTTPSHandler(). type httpHandler struct { - serverFactory singleserving.Factory + serverFactory singleserving.Factory + connectionType ndt.ConnectionType + datadir string +} + +func (s *httpHandler) DataDir() string { return s.datadir } +func (s *httpHandler) ConnectionType() ndt.ConnectionType { return s.connectionType } + +func (s *httpHandler) TestLength() time.Duration { return 10 * time.Second } +func (s *httpHandler) TestMaxTime() time.Duration { return 30 * time.Second } + +func (s *httpHandler) SingleServingServer(dir string) (singleserving.Server, error) { + return s.serverFactory.SingleServingServer(dir) } // ServeHTTP is the command channel for the NDT-WS or NDT-WSS test. All @@ -37,13 +58,15 @@ func (s *httpHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } ws := protocol.AdaptWsConn(wsc) defer warnonerror.Close(ws, "Could not close connection") - legacy.HandleControlChannel(ws, s.serverFactory) + legacy.HandleControlChannel(ws, s) } // NewWS returns a handler suitable for http-based connections. -func NewWS() http.Handler { +func NewWS(datadir string) WSHandler { return &httpHandler{ - serverFactory: &httpFactory{}, + serverFactory: &httpFactory{}, + connectionType: ndt.WS, + datadir: datadir, } } @@ -57,11 +80,13 @@ func (hf *httpsFactory) SingleServingServer(dir string) (singleserving.Server, e } // NewWSS returns a handler suitable for https-based connections. -func NewWSS(certFile, keyFile string) http.Handler { +func NewWSS(datadir, certFile, keyFile string) WSHandler { return &httpHandler{ serverFactory: &httpsFactory{ certFile: certFile, keyFile: keyFile, }, + connectionType: ndt.WSS, + datadir: datadir, } } diff --git a/legacy/legacy.go b/legacy/legacy.go index a2b5662b..64d14fa6 100644 --- a/legacy/legacy.go +++ b/legacy/legacy.go @@ -2,17 +2,24 @@ package legacy import ( "context" + "encoding/json" "fmt" "log" + "os" + "path" "strconv" "strings" "time" + "github.com/m-lab/go/prometheusx" + "github.com/m-lab/go/warnonerror" + "github.com/m-lab/ndt-server/legacy/c2s" + "github.com/m-lab/ndt-server/legacy/meta" legacymetrics "github.com/m-lab/ndt-server/legacy/metrics" + "github.com/m-lab/ndt-server/legacy/ndt" "github.com/m-lab/ndt-server/legacy/protocol" "github.com/m-lab/ndt-server/legacy/s2c" - "github.com/m-lab/ndt-server/legacy/singleserving" ) const ( @@ -21,25 +28,55 @@ const ( cTestStatus = 16 ) -// TODO: run meta test. -func runMetaTest(ws protocol.Connection) { - var err error - var message *protocol.JSONMessage +// NDTResult is the struct that is serialized as JSON to disk as the archival record of an NDT test. +// +// This struct is dual-purpose. It contains the necessary data to allow joining +// with tcp-info data and traceroute-caller data as well as any other UUID-based +// data. It also contains enough data for interested parties to perform +// lightweight data analysis without needing to join with other tools. +type NDTResult struct { + // GitShortCommit is the Git commit (short form) of the running server code. + GitShortCommit string - protocol.SendJSONMessage(protocol.TestPrepare, "", ws) - protocol.SendJSONMessage(protocol.TestStart, "", ws) - for { - message, err = protocol.ReceiveJSONMessage(ws, protocol.TestMsg) - if message.Msg == "" || err != nil { - break - } - log.Println("Meta message: ", message) + // These data members should all be self-describing. In the event of confusion, + // rename them to add clarity rather than adding a comment. + ControlChannelUUID string + Protocol ndt.ConnectionType + ServerIP string + ClientIP string + + StartTime time.Time + EndTime time.Time + C2S *c2s.ArchivalData `json:",omitempty"` + S2C *s2c.ArchivalData `json:",omitempty"` + Meta *meta.ArchivalData `json:",omitempty"` +} + +// SaveData archives the data to disk. +func SaveData(record *NDTResult, datadir string) { + if record == nil { + log.Println("nil record won't be saved") + return } + dir := path.Join(datadir, record.StartTime.Format("2006/01/02")) + err := os.MkdirAll(dir, 0777) if err != nil { - log.Println("Error reading JSON message:", err) + log.Printf("Could not create directory %s: %v\n", dir, err) + return + } + file, err := protocol.UUIDToFile(dir, record.ControlChannelUUID) + if err != nil { + log.Println("Could not open file:", err) return } - protocol.SendJSONMessage(protocol.TestFinalize, "", ws) + defer file.Close() + enc := json.NewEncoder(file) + err = enc.Encode(record) + if err != nil { + log.Println("Could not encode", record, "to", file.Name()) + return + } + log.Println("Wrote", file.Name()) } // HandleControlChannel is the "business logic" of an NDT test. It is designed @@ -47,12 +84,28 @@ func runMetaTest(ws protocol.Connection) { // connection is just a TCP socket, a WS connection, or a WSS connection. It // only needs a connection, and a factory for making single-use servers for // connections of that same type. -func HandleControlChannel(conn protocol.Connection, sf singleserving.Factory) { +func HandleControlChannel(conn protocol.Connection, s ndt.Server) { // Nothing should take more than 45 seconds, and exiting this method should // cause all resources used by the test to be reclaimed. ctx, cancel := context.WithTimeout(context.Background(), 45*time.Second) defer cancel() + log.Println("Handling connection", conn) + defer warnonerror.Close(conn, "Could not close "+conn.String()) + + record := &NDTResult{ + GitShortCommit: prometheusx.GitShortCommit, + StartTime: time.Now(), + ControlChannelUUID: conn.UUID(), + ServerIP: conn.ServerIP(), + ClientIP: conn.ClientIP(), + Protocol: s.ConnectionType(), + } + defer func() { + record.EndTime = time.Now() + SaveData(record, s.DataDir()) + }() + message, err := protocol.ReceiveJSONMessage(conn, protocol.MsgExtendedLogin) if err != nil { log.Println("Error reading JSON message:", err) @@ -84,23 +137,27 @@ func HandleControlChannel(conn protocol.Connection, sf singleserving.Factory) { var c2sRate, s2cRate float64 if runC2s { - c2sRate, err = c2s.ManageTest(ctx, conn, sf) + record.C2S, err = c2s.ManageTest(ctx, conn, s) if err != nil { log.Println("ERROR: manageC2sTest", err) - } else { - legacymetrics.TestRate.WithLabelValues("c2s").Observe(c2sRate / 1000.0) + } + if record.C2S != nil && record.C2S.MeanThroughputMbps != 0 { + c2sRate = record.C2S.MeanThroughputMbps + legacymetrics.TestRate.WithLabelValues("c2s").Observe(c2sRate) } } if runS2c { - s2cRate, err = s2c.ManageTest(ctx, conn, sf) + record.S2C, err = s2c.ManageTest(ctx, conn, s) if err != nil { log.Println("ERROR: manageS2cTest", err) - } else { - legacymetrics.TestRate.WithLabelValues("s2c").Observe(s2cRate / 1000.0) + } + if record.S2C != nil && record.S2C.MeanThroughputMbps != 0 { + s2cRate = record.S2C.MeanThroughputMbps + legacymetrics.TestRate.WithLabelValues("s2c").Observe(s2cRate) } } - log.Printf("NDT: uploaded at %.4f and downloaded at %.4f", c2sRate, s2cRate) - protocol.SendJSONMessage(protocol.MsgResults, fmt.Sprintf("You uploaded at %.4f and downloaded at %.4f", c2sRate, s2cRate), conn) + log.Printf("NDT: uploaded at %.4f Mbps and downloaded at %.4f Mbps", c2sRate, s2cRate) + // For historical reasons, clients expect results in kbps + protocol.SendJSONMessage(protocol.MsgResults, fmt.Sprintf("You uploaded at %.4f and downloaded at %.4f", c2sRate*1000, s2cRate*1000), conn) protocol.SendJSONMessage(protocol.MsgLogout, "", conn) - } diff --git a/legacy/meta/meta.go b/legacy/meta/meta.go new file mode 100644 index 00000000..0b531628 --- /dev/null +++ b/legacy/meta/meta.go @@ -0,0 +1,31 @@ +package meta + +import ( + "log" + + "github.com/m-lab/ndt-server/legacy/protocol" +) + +// TODO: Add fields here when we implement a meta test. +type ArchivalData struct{} + +// TODO: run meta test. +func ManageTest(ws protocol.Connection) { + var err error + var message *protocol.JSONMessage + + protocol.SendJSONMessage(protocol.TestPrepare, "", ws) + protocol.SendJSONMessage(protocol.TestStart, "", ws) + for { + message, err = protocol.ReceiveJSONMessage(ws, protocol.TestMsg) + if message.Msg == "" || err != nil { + break + } + log.Println("Meta message: ", message) + } + if err != nil { + log.Println("Error reading JSON message:", err) + return + } + protocol.SendJSONMessage(protocol.TestFinalize, "", ws) +} diff --git a/legacy/metrics/metrics.go b/legacy/metrics/metrics.go index 9cde3660..9d0b3b55 100644 --- a/legacy/metrics/metrics.go +++ b/legacy/metrics/metrics.go @@ -21,6 +21,8 @@ var ( []string{"direction"}, ) // TestCount exports via prometheus the number of tests run by this server. + // + // TODO: Decide what monitoring we want and transition to that. TestCount = promauto.NewCounterVec( prometheus.CounterOpts{ Name: "ndt_test_total", @@ -28,4 +30,12 @@ var ( }, []string{"direction", "code"}, ) + // ErrorCount exports the number of test failures seen by the server. + ErrorCount = promauto.NewCounterVec( + prometheus.CounterOpts{ + Name: "ndt_test_errors_total", + Help: "Number of test errors of each type for each test.", + }, + []string{"test", "error"}, + ) ) diff --git a/legacy/ndt/server.go b/legacy/ndt/server.go new file mode 100644 index 00000000..dc76e22e --- /dev/null +++ b/legacy/ndt/server.go @@ -0,0 +1,31 @@ +package ndt + +import ( + "time" + + "github.com/m-lab/ndt-server/legacy/singleserving" +) + +// ConnectionType records whether this test is performed over plain TCP, +// websockets, or secure websockets. +type ConnectionType string + +// The types of connections we support. +var ( + WS = ConnectionType("WS") + WSS = ConnectionType("WSS") + Plain = ConnectionType("PLAIN") +) + +// Server describes the methods implemented by every server of every connection +// type. +type Server interface { + singleserving.Factory + ConnectionType() ConnectionType + DataDir() string + + // TestLength allows us to create fake servers which run tests very quickly. + TestLength() time.Duration + // TestMaxTime allows us to create fake servers which run tests very quickly. + TestMaxTime() time.Duration +} diff --git a/legacy/plain/plain.go b/legacy/plain/plain.go index 59cf195d..7df739db 100644 --- a/legacy/plain/plain.go +++ b/legacy/plain/plain.go @@ -11,6 +11,7 @@ import ( "github.com/m-lab/go/warnonerror" "github.com/m-lab/ndt-server/legacy" + "github.com/m-lab/ndt-server/legacy/ndt" "github.com/m-lab/ndt-server/legacy/protocol" "github.com/m-lab/ndt-server/legacy/singleserving" "github.com/m-lab/ndt-server/metrics" @@ -23,6 +24,7 @@ type plainServer struct { wsAddr string dialer *net.Dialer listener *net.TCPListener + datadir string } func (ps *plainServer) SingleServingServer(direction string) (singleserving.Server, error) { @@ -141,6 +143,12 @@ func (ps *plainServer) ListenAndServe(ctx context.Context, addr string) error { return nil } +func (ps *plainServer) ConnectionType() ndt.ConnectionType { return ndt.Plain } +func (ps *plainServer) DataDir() string { return ps.datadir } + +func (ps *plainServer) TestLength() time.Duration { return 10 * time.Second } +func (ps *plainServer) TestMaxTime() time.Duration { return 30 * time.Second } + func (ps *plainServer) Addr() net.Addr { return ps.listener.Addr() } @@ -155,7 +163,7 @@ type Server interface { // NewServer creates a new TCP listener to serve the client. It forwards all // connection requests that look like HTTP to a different address (assumed to be // on the same host). -func NewServer(wsAddr string) Server { +func NewServer(datadir, wsAddr string) Server { return &plainServer{ wsAddr: wsAddr, // The dialer is only contacting localhost. The timeout should be set to a @@ -163,5 +171,6 @@ func NewServer(wsAddr string) Server { dialer: &net.Dialer{ Timeout: 10 * time.Millisecond, }, + datadir: datadir, } } diff --git a/legacy/plain/plain_test.go b/legacy/plain/plain_test.go index 4e7f85af..fae68f45 100644 --- a/legacy/plain/plain_test.go +++ b/legacy/plain/plain_test.go @@ -2,8 +2,10 @@ package plain_test import ( "context" + "io/ioutil" "net" "net/http" + "os" "testing" "time" @@ -13,6 +15,9 @@ import ( ) func TestNewPlainServer(t *testing.T) { + d, err := ioutil.TempDir("", "TestNewPlainServer") + rtx.Must(err, "Could not create tempdir") + defer os.RemoveAll(d) // Set up the proxied server success := 0 h := &http.ServeMux{} @@ -26,14 +31,14 @@ func TestNewPlainServer(t *testing.T) { } rtx.Must(httpx.ListenAndServeAsync(wsSrv), "Could not start server") // Sanity check that the proxied server is up and running. - _, err := http.Get("http://" + wsSrv.Addr + "/test_url") + _, err = http.Get("http://" + wsSrv.Addr + "/test_url") rtx.Must(err, "Proxied server could not respond to get") if success != 1 { t.Error("GET was unsuccessful") } // Set up the plain server - tcpS := plain.NewServer(wsSrv.Addr) + tcpS := plain.NewServer(d, wsSrv.Addr) ctx, cancel := context.WithCancel(context.Background()) defer cancel() rtx.Must(tcpS.ListenAndServe(ctx, ":0"), "Could not start tcp server") @@ -67,8 +72,11 @@ func TestNewPlainServer(t *testing.T) { } func TestNewPlainServerBrokenForwarding(t *testing.T) { + d, err := ioutil.TempDir("", "TestNewPlainServerBrokenForwarding") + rtx.Must(err, "Could not create tempdir") + defer os.RemoveAll(d) // Set up the plain server. - tcpS := plain.NewServer("127.0.0.1:1") + tcpS := plain.NewServer(d, "127.0.0.1:1") ctx, cancel := context.WithCancel(context.Background()) defer cancel() rtx.Must(tcpS.ListenAndServe(ctx, ":0"), "Could not start tcp server") @@ -76,7 +84,7 @@ func TestNewPlainServerBrokenForwarding(t *testing.T) { client := &http.Client{ Timeout: 10 * time.Millisecond, } - _, err := client.Get("http://" + tcpS.Addr().String() + "/test_url") + _, err = client.Get("http://" + tcpS.Addr().String() + "/test_url") if err == nil { t.Error("This should have failed") } diff --git a/legacy/protocol/protocol.go b/legacy/protocol/protocol.go index 9d61ca4f..8df794ff 100644 --- a/legacy/protocol/protocol.go +++ b/legacy/protocol/protocol.go @@ -6,12 +6,16 @@ import ( "errors" "fmt" "io" + "io/ioutil" + "log" "net" "os" + "path" "reflect" "time" "github.com/m-lab/ndt-server/fdcache" + "github.com/m-lab/uuid" "github.com/gorilla/websocket" "github.com/m-lab/ndt-server/legacy/web100" @@ -76,15 +80,32 @@ func (m MessageType) String() string { // Connection is a general system over which we might be able to read an NDT // message. It contains a subset of the methods of websocket.Conn, in order to -// allow non-websocket-based NDT tests in support of legacy clients, along with -// the new methods "DrainUntil" and "FillUntil". +// allow non-websocket-based NDT tests in support of legacy clients. type Connection interface { ReadMessage() (_ int, p []byte, err error) // The first value in the returned tuple should be ignored. It is included in the API for websocket.Conn compatibility. + ReadBytes() (count int64, err error) WriteMessage(messageType int, data []byte) error - DrainUntil(t time.Time) (bytesRead int64, err error) FillUntil(t time.Time, buffer []byte) (bytesWritten int64, err error) - SetReadDeadline(t time.Time) error + ServerIP() string + ClientIP() string Close() error + UUID() string + String() string +} + +var badUUID = "ERROR_DISCOVERING_UUID" + +// UUIDToFile converts a UUID into a newly-created open file with the extension '.json'. +func UUIDToFile(dir, uuid string) (*os.File, error) { + if uuid == badUUID { + f, err := ioutil.TempFile(dir, badUUID+"XXXXXX.json") + if err != nil { + log.Println("Could not create filename for data") + return nil, err + } + return f, nil + } + return os.Create(path.Join(dir, uuid+".json")) } // Measurable things can be measured over a given timeframe. @@ -134,17 +155,6 @@ func AdaptWsConn(ws *websocket.Conn) MeasuredConnection { return &wsConnection{Conn: ws, measurer: &measurer{}} } -func (ws *wsConnection) DrainUntil(t time.Time) (bytesRead int64, err error) { - for time.Now().Before(t) { - _, buffer, err := ws.ReadMessage() - if err != nil { - return bytesRead, err - } - bytesRead += int64(len(buffer)) - } - return bytesRead, nil -} - func (ws *wsConnection) FillUntil(t time.Time, bytes []byte) (bytesWritten int64, err error) { messageToSend, err := websocket.NewPreparedMessage(websocket.BinaryMessage, bytes) if err != nil { @@ -164,13 +174,49 @@ func (ws *wsConnection) StartMeasuring(ctx context.Context) { ws.measurer.StartMeasuring(ctx, fdcache.GetAndForgetFile(ws.UnderlyingConn())) } +func (ws *wsConnection) UUID() string { + id, err := fdcache.GetUUID(ws.UnderlyingConn()) + if err != nil { + log.Println("Could not discover UUID:", err) + // TODO: increment a metric + return badUUID + } + return id +} + +func (ws *wsConnection) ServerIP() string { + localAddr := ws.UnderlyingConn().LocalAddr().(*net.TCPAddr) + return localAddr.IP.String() +} + +func (ws *wsConnection) ClientIP() string { + remoteAddr := ws.UnderlyingConn().RemoteAddr().(*net.TCPAddr) + return remoteAddr.IP.String() +} + +// ReadBytes reads some bytes and discards them. This method is in service of +// the c2s test. +func (ws *wsConnection) ReadBytes() (int64, error) { + var count int64 + _, buff, err := ws.ReadMessage() + if buff != nil { + count = int64(len(buff)) + } + return count, err +} + +func (ws *wsConnection) String() string { + return ws.LocalAddr().String() + "<=WS(S)=>" + ws.RemoteAddr().String() +} + // netConnection is a utility struct that allows us to use OS sockets and // Websockets using the same set of methods. Its second element is a Reader // because we want to allow the input channel to be buffered. type netConnection struct { net.Conn *measurer - input io.Reader + input io.Reader + c2sBuffer []byte } func (nc *netConnection) ReadMessage() (int, []byte, error) { @@ -191,16 +237,9 @@ func (nc *netConnection) WriteMessage(_messageType int, data []byte) error { return err } -func (nc *netConnection) DrainUntil(t time.Time) (bytesRead int64, err error) { - buff := make([]byte, 8192) - for time.Now().Before(t) { - n, err := nc.Read(buff) - if err != nil { - return bytesRead, err - } - bytesRead += int64(n) - } - return bytesRead, nil +func (nc *netConnection) ReadBytes() (bytesRead int64, err error) { + n, err := nc.input.Read(nc.c2sBuffer) + return int64(n), err } func (nc *netConnection) FillUntil(t time.Time, bytes []byte) (bytesWritten int64, err error) { @@ -218,9 +257,33 @@ func (nc *netConnection) StartMeasuring(ctx context.Context) { nc.measurer.StartMeasuring(ctx, fdcache.GetAndForgetFile(nc)) } +func (nc *netConnection) UUID() string { + id, err := uuid.FromTCPConn(nc.Conn.(*net.TCPConn)) + if err != nil { + log.Println("Could not discover UUID") + // TODO: increment a metric + return badUUID + } + return id +} + +func (nc *netConnection) ServerIP() string { + localAddr := nc.LocalAddr().(*net.TCPAddr) + return localAddr.IP.String() +} + +func (nc *netConnection) ClientIP() string { + remoteAddr := nc.RemoteAddr().(*net.TCPAddr) + return remoteAddr.IP.String() +} + +func (nc *netConnection) String() string { + return nc.LocalAddr().String() + "<=PLAIN=>" + nc.RemoteAddr().String() +} + // AdaptNetConn turns a non-WS-based TCP connection into a protocol.MeasuredConnection. func AdaptNetConn(conn net.Conn, input io.Reader) MeasuredConnection { - return &netConnection{Conn: conn, measurer: &measurer{}, input: input} + return &netConnection{Conn: conn, measurer: &measurer{}, input: input, c2sBuffer: make([]byte, 8192)} } // ReadNDTMessage reads a single NDT message out of the connection. diff --git a/legacy/s2c/s2c.go b/legacy/s2c/s2c.go index 6540dd51..df1deb9d 100644 --- a/legacy/s2c/s2c.go +++ b/legacy/s2c/s2c.go @@ -3,109 +3,173 @@ package s2c import ( "context" "encoding/json" + "errors" "log" "strconv" "time" "github.com/m-lab/go/warnonerror" + "github.com/m-lab/ndt-server/legacy/metrics" + "github.com/m-lab/ndt-server/legacy/ndt" "github.com/m-lab/ndt-server/legacy/protocol" - "github.com/m-lab/ndt-server/legacy/singleserving" ) -// Result is the result object returned to S2C clients as JSON. -type Result struct { +// ArchivalData is the data saved by the S2C test. If a researcher wants deeper +// data, then they should use the UUID to get deeper data from tcp-info. +type ArchivalData struct { + // This is the only field that is really required. + TestConnectionUUID string + + // All subsequent fields are here to enable analyses that don't require joining + // with tcp-info data. + + // The server and client IP are here as well as in the containing struct + // because happy eyeballs means that we may have a IPv4 control connection + // causing a IPv6 connection to the test port or vice versa. + ServerIP string + ClientIP string + + StartTime time.Time + EndTime time.Time + MeanThroughputMbps float64 + MinRTT time.Duration + ClientReportedMbps float64 + // TODO: Add TCPEngine (bbr, cubic, reno, etc.), MaxThroughputKbps, and Jitter + + Error string `json:",omitempty"` +} + +// result is the result object returned to S2C clients as JSON. +type result struct { ThroughputValue string UnsentDataAmount string TotalSentByte string } -func (n *Result) String() string { +func (n *result) String() string { b, _ := json.Marshal(n) return string(b) } // ManageTest manages the s2c test lifecycle -func ManageTest(ctx context.Context, conn protocol.Connection, sf singleserving.Factory) (float64, error) { +func ManageTest(ctx context.Context, controlConn protocol.Connection, s ndt.Server) (*ArchivalData, error) { localCtx, localCancel := context.WithTimeout(ctx, 30*time.Second) defer localCancel() + record := &ArchivalData{} - srv, err := sf.SingleServingServer("s2c") + srv, err := s.SingleServingServer("s2c") if err != nil { log.Println("Could not start single serving server", err) - return 0, err + metrics.ErrorCount.WithLabelValues("s2c", "StartSingleServingServer") + record.Error = err.Error() + return record, err } - err = protocol.SendJSONMessage(protocol.TestPrepare, strconv.Itoa(srv.Port()), conn) + err = protocol.SendJSONMessage(protocol.TestPrepare, strconv.Itoa(srv.Port()), controlConn) if err != nil { log.Println("Could not send TestPrepare", err) - return 0, err + metrics.ErrorCount.WithLabelValues("s2c", "TestPrepare") + record.Error = err.Error() + return record, err } testConn, err := srv.ServeOnce(localCtx) - if err != nil { + if err != nil || testConn == nil { log.Println("Could not successfully ServeOnce", err) - return 0, err + metrics.ErrorCount.WithLabelValues("s2c", "ServeOnce") + if err == nil { + err = errors.New("nil testConn, but also a nil error") + } + record.Error = err.Error() + return record, err } defer warnonerror.Close(testConn, "Could not close test connection") + record.TestConnectionUUID = testConn.UUID() + record.ServerIP = testConn.ServerIP() + record.ClientIP = testConn.ClientIP() dataToSend := make([]byte, 8192) for i := range dataToSend { dataToSend[i] = byte(((i * 101) % (122 - 33)) + 33) } - err = protocol.SendJSONMessage(protocol.TestStart, "", conn) + err = protocol.SendJSONMessage(protocol.TestStart, "", controlConn) if err != nil { log.Println("Could not write TestStart", err) - return 0, err + metrics.ErrorCount.WithLabelValues("s2c", "TestStart") + record.Error = err.Error() + return record, err } testConn.StartMeasuring(localCtx) - + record.StartTime = time.Now() byteCount, err := testConn.FillUntil(time.Now().Add(10*time.Second), dataToSend) + record.EndTime = time.Now() if err != nil { log.Println("Could not FillUntil", err) - return 0, err + metrics.ErrorCount.WithLabelValues("s2c", "FillUntil") + record.Error = err.Error() + return record, err } - metrics, err := testConn.StopMeasuring() + web100metrics, err := testConn.StopMeasuring() if err != nil { log.Println("Could not read metrics", err) - return 0, err + metrics.ErrorCount.WithLabelValues("s2c", "web100Metrics") + record.Error = err.Error() + return record, err } bps := 8 * float64(byteCount) / 10 kbps := bps / 1000 + record.MinRTT = time.Duration(web100metrics.MinRTT) * time.Millisecond + record.MeanThroughputMbps = kbps / 1000 // Convert Kbps to Mbps // Send additional download results to the client. - resultMsg := &Result{ + resultMsg := &result{ // TODO: clean up this logic to use socket stats rather than application-level counters. ThroughputValue: strconv.FormatInt(int64(kbps), 10), UnsentDataAmount: "0", TotalSentByte: strconv.FormatInt(byteCount, 10), // TODO: use actual bytes sent. } - err = protocol.WriteNDTMessage(conn, protocol.TestMsg, resultMsg) + err = protocol.WriteNDTMessage(controlConn, protocol.TestMsg, resultMsg) if err != nil { log.Println("Could not write a TestMsg", err) - return kbps, err + metrics.ErrorCount.WithLabelValues("s2c", "TestMsgSend") + record.Error = err.Error() + return record, err } - clientRateMsg, err := protocol.ReceiveJSONMessage(conn, protocol.TestMsg) + clientRateMsg, err := protocol.ReceiveJSONMessage(controlConn, protocol.TestMsg) if err != nil { + metrics.ErrorCount.WithLabelValues("s2c", "TestMsgRcv") log.Println("Could not receive a TestMsg", err) - return kbps, err + record.Error = err.Error() + return record, err } log.Println("We measured", kbps, "and the client sent us", clientRateMsg) + clientRateKbps, err := strconv.ParseFloat(clientRateMsg.Msg, 64) + if err == nil { + record.ClientReportedMbps = clientRateKbps / 1000 + } else { + log.Println("Could not parse number sent from client") + // Being unable to parse the number should not be a fatal error, so continue. + } - err = protocol.SendMetrics(metrics, conn) + err = protocol.SendMetrics(web100metrics, controlConn) if err != nil { log.Println("Could not SendMetrics", err) - return kbps, err + metrics.ErrorCount.WithLabelValues("s2c", "SendMetrics") + record.Error = err.Error() + return record, err } - err = protocol.SendJSONMessage(protocol.TestFinalize, "", conn) + err = protocol.SendJSONMessage(protocol.TestFinalize, "", controlConn) if err != nil { log.Println("Could not send TestFinalize", err) - return kbps, err + metrics.ErrorCount.WithLabelValues("s2c", "TestFinalize") + record.Error = err.Error() + return record, err } - return kbps, nil + return record, nil } diff --git a/legacy/singleserving/server.go b/legacy/singleserving/server.go index a4e48113..143fa234 100644 --- a/legacy/singleserving/server.go +++ b/legacy/singleserving/server.go @@ -7,6 +7,7 @@ import ( "net/http" "github.com/m-lab/ndt-server/legacy/ws" + "github.com/m-lab/ndt-server/ndt7/listener" "github.com/m-lab/ndt-server/legacy/metrics" "github.com/m-lab/ndt-server/legacy/protocol" @@ -59,7 +60,9 @@ func (s *wsServer) Port() int { func (s *wsServer) ServeOnce(ctx context.Context) (protocol.MeasuredConnection, error) { // This is a single-serving server. After serving one response, shut it down. - defer s.Close() + defer func() { + go s.Close() + }() derivedCtx, derivedCancel := context.WithCancel(ctx) var closeErr error @@ -77,7 +80,7 @@ func (s *wsServer) ServeOnce(ctx context.Context) (protocol.MeasuredConnection, // ensure that the race gets resolved in just one way for the following if(). err := closeErr - if err != http.ErrServerClosed { + if err != nil && err != http.ErrServerClosed { return nil, fmt.Errorf("Server did not close correctly: %v", err) } return s.newConn, s.newConnErr @@ -106,11 +109,11 @@ func StartWS(direction string) (Server, error) { promhttp.InstrumentHandlerCounter(metrics.TestCount.MustCurryWith(prometheus.Labels{"direction": direction}), s)) // Start listening right away to ensure that subsequent connections succeed. - listener, err := net.Listen("tcp", ":0") + tcpl, err := net.Listen("tcp", ":0") if err != nil { return nil, err } - s.listener = tcplistener.RawListener{TCPListener: listener.(*net.TCPListener)} + s.listener = listener.CachingTCPKeepAliveListener{TCPListener: tcpl.(*net.TCPListener)} s.port = s.listener.Addr().(*net.TCPAddr).Port return s, nil } diff --git a/ndt-server.go b/ndt-server.go index 314bcb14..d9a4565b 100644 --- a/ndt-server.go +++ b/ndt-server.go @@ -15,7 +15,6 @@ import ( "github.com/m-lab/go/prometheusx" "github.com/m-lab/go/flagx" - "github.com/m-lab/go/httpx" "github.com/m-lab/go/rtx" legacyhandler "github.com/m-lab/ndt-server/legacy/handler" @@ -108,7 +107,7 @@ func main() { // The legacy protocol serving non-HTTP-based tests - forwards to Ws-based // server if the first three bytes are "GET". - legacyServer := plain.NewServer(*legacyWsPort) + legacyServer := plain.NewServer(*dataDir+"/legacy", *legacyWsPort) rtx.Must( legacyServer.ListenAndServe(ctx, *legacyPort), "Could not start raw server") @@ -118,13 +117,13 @@ func main() { legacyWsMux := http.NewServeMux() legacyWsMux.HandleFunc("/", defaultHandler) legacyWsMux.Handle("/static/", http.StripPrefix("/static", http.FileServer(http.Dir("html")))) - legacyWsMux.Handle("/ndt_protocol", legacyhandler.NewWS()) + legacyWsMux.Handle("/ndt_protocol", legacyhandler.NewWS(*dataDir+"/legacy")) legacyWsServer := &http.Server{ Addr: *legacyWsPort, Handler: logging.MakeAccessLogHandler(legacyWsMux), } log.Println("About to listen for unencrypted legacy NDT tests on " + *legacyWsPort) - rtx.Must(httpx.ListenAndServeAsync(legacyWsServer), "Could not start unencrypted legacy NDT server") + rtx.Must(listener.ListenAndServeAsync(legacyWsServer), "Could not start unencrypted legacy NDT server") defer legacyWsServer.Close() // Only start TLS-based services if certs and keys are provided @@ -133,13 +132,13 @@ func main() { legacyWssMux := http.NewServeMux() legacyWssMux.HandleFunc("/", defaultHandler) legacyWssMux.Handle("/static/", http.StripPrefix("/static", http.FileServer(http.Dir("html")))) - legacyWssMux.Handle("/ndt_protocol", legacyhandler.NewWSS(*certFile, *keyFile)) + legacyWssMux.Handle("/ndt_protocol", legacyhandler.NewWSS(*dataDir+"/legacy", *certFile, *keyFile)) legacyWssServer := &http.Server{ Addr: *legacyWssPort, Handler: logging.MakeAccessLogHandler(legacyWssMux), } log.Println("About to listen for legacy WsS tests on " + *legacyWssPort) - rtx.Must(httpx.ListenAndServeTLSAsync(legacyWssServer, *certFile, *keyFile), "Could not start legacy WsS server") + rtx.Must(listener.ListenAndServeTLSAsync(legacyWssServer, *certFile, *keyFile), "Could not start legacy WsS server") defer legacyWssServer.Close() // The ndt7 listener serving up NDT7 tests, likely on standard ports. diff --git a/ndt-server_test.go b/ndt-server_test.go index 6f3b2ed8..4c32a9d5 100644 --- a/ndt-server_test.go +++ b/ndt-server_test.go @@ -159,43 +159,31 @@ func Test_MainIntegrationTest(t *testing.T) { // /bin/web100clt-without-json-support # No tests disabled. // Test legacy raw JSON clients { - name: "Connect with web100clt (with JSON)", - cmd: "timeout 45s /bin/web100clt-with-json-support --name localhost --port " + legacyPort + " --disablemid --disablesfw", - ignoreData: true, + name: "Connect with web100clt (with JSON)", + cmd: "timeout 45s /bin/web100clt-with-json-support --name localhost --port " + legacyPort + " --disablemid --disablesfw", }, // Test legacy WS clients connected to the HTTP port { name: "Upload & Download legacy WS", cmd: "timeout 45s node ./testdata/unittest_client.js --server=localhost " + " --port=" + wsPort + " --protocol=ws --tests=22", - ignoreData: true, }, { name: "Upload legacy WS", cmd: "timeout 45s node ./testdata/unittest_client.js --server=localhost " + " --port=" + wsPort + " --protocol=ws --tests=18", - ignoreData: true, }, { name: "Download legacy WS", cmd: "timeout 45s node ./testdata/unittest_client.js --server=localhost " + " --port=" + wsPort + " --protocol=ws --tests=20", - ignoreData: true, }, // Test legacy WS clients connecting to the raw port { name: "Connect legacy WS (upload and download) to RAW port", cmd: "timeout 45s node ./testdata/unittest_client.js --server=localhost " + " --port=" + legacyPort + " --protocol=ws --tests=22", - ignoreData: true, }, - { - name: "Connect legacy WS", - cmd: "timeout 45s node ./testdata/unittest_client.js --server=localhost " + - " --port=" + legacyPort + " --protocol=ws --tests=16", - ignoreData: true, - }, - { // Start both tests, but kill the client during the upload test. // This causes the server to wait for a test that never comes. After the @@ -205,26 +193,22 @@ func Test_MainIntegrationTest(t *testing.T) { " --port=" + wsPort + " --protocol=ws --abort-c2s-early --tests=22 & " + "sleep 25", - ignoreData: true, }, // Test WSS clients with the legacy protocol. { name: "Upload legacy WSS", cmd: "timeout 45s node ./testdata/unittest_client.js --server=localhost " + " --port=" + wssPort + " --protocol=wss --acceptinvalidcerts --tests=18", - ignoreData: true, }, { name: "Download legacy WSS", cmd: "timeout 45s node ./testdata/unittest_client.js --server=localhost " + " --port=" + wssPort + " --protocol=wss --acceptinvalidcerts --tests=20", - ignoreData: true, }, { name: "Upload & Download legacy WSS", cmd: "timeout 45s node ./testdata/unittest_client.js --server=localhost " + " --port=" + wssPort + " --protocol=wss --acceptinvalidcerts --tests=22", - ignoreData: true, }, { // Start both tests, but kill the client during the upload test. @@ -235,7 +219,6 @@ func Test_MainIntegrationTest(t *testing.T) { " --port=" + wssPort + " --protocol=wss --acceptinvalidcerts --abort-c2s-early --tests=22 & " + "sleep 25", - ignoreData: true, }, // Test NDT7 clients { diff --git a/ndt7/listener/listener.go b/ndt7/listener/listener.go index c915109e..01feb178 100644 --- a/ndt7/listener/listener.go +++ b/ndt7/listener/listener.go @@ -23,15 +23,15 @@ var logFatalf = log.Fatalf // The code here is adapted from https://golang.org/src/net/http/server.go?s=85391:85432#L2742 -// tcpKeepAliveListener sets TCP keep-alive timeouts on accepted +// CachingTCPKeepAliveListener sets TCP keep-alive timeouts on accepted // connections. It's used by ListenAndServe and ListenAndServeTLS so // dead TCP connections (e.g. closing laptop mid-download) eventually // go away. -type tcpKeepAliveListener struct { +type CachingTCPKeepAliveListener struct { *net.TCPListener } -func (ln tcpKeepAliveListener) Accept() (net.Conn, error) { +func (ln CachingTCPKeepAliveListener) Accept() (net.Conn, error) { tc, err := ln.AcceptTCP() if err != nil { return nil, err @@ -40,6 +40,11 @@ func (ln tcpKeepAliveListener) Accept() (net.Conn, error) { tc.SetKeepAlivePeriod(3 * time.Minute) fp, err := fdcache.TCPConnToFile(tc) if err != nil { + var dest net.Addr + if tc != nil { + dest = tc.RemoteAddr() + } + log.Println("Could not save TCPConnection to fdcache, connection was to", dest) tc.Close() return nil, err } @@ -83,7 +88,7 @@ func ListenAndServeAsync(server *http.Server) error { server.Addr = listener.Addr().String() } // Serve asynchronously. - go serve(server, tcpKeepAliveListener{listener.(*net.TCPListener)}) + go serve(server, CachingTCPKeepAliveListener{listener.(*net.TCPListener)}) return nil } @@ -114,11 +119,11 @@ func ListenAndServeTLSAsync(server *http.Server, certFile, keyFile string) error // GET-able. In ipv6-only contexts it could be, for example, "[::]:3232", and // that URL can't be used for TLS because TLS needs a name or an explicit IP // and [::] doesn't qualify. It is unclear what the right thing to do is in - // this situation, because names and IPs and TLS are suffciently complicated + // this situation, because names and IPs and TLS are sufficiently complicated // that no one thing is the right thing in all situations, so we affirmatively // do nothing in an attempt to avoid making a bad situation worse. // Serve asynchronously. - go serveTLS(server, tcpKeepAliveListener{listener.(*net.TCPListener)}, certFile, keyFile) + go serveTLS(server, CachingTCPKeepAliveListener{listener.(*net.TCPListener)}, certFile, keyFile) return nil }