Skip to content

Commit

Permalink
Save data (#112)
Browse files Browse the repository at this point in the history
Data is now saved when users run a test.
Multiple race conditions have been eliminated, as well as multiple goroutine leaks/livelocks/deadlocks.
  • Loading branch information
pboothe authored May 10, 2019
1 parent f2094d8 commit a40df81
Show file tree
Hide file tree
Showing 14 changed files with 553 additions and 161 deletions.
178 changes: 141 additions & 37 deletions legacy/c2s/c2s.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,84 +6,188 @@ import (
"strconv"
"time"

"github.com/m-lab/ndt-server/legacy/metrics"

"github.com/m-lab/go/warnonerror"
"github.com/m-lab/ndt-server/legacy/ndt"
"github.com/m-lab/ndt-server/legacy/protocol"
"github.com/m-lab/ndt-server/legacy/singleserving"
)

// ArchivalData is the data saved by the C2S test. If a researcher wants deeper
// data, then they should use the UUID to get deeper data from tcp-info.
type ArchivalData struct {
// The server and client IP are here as well as in the containing struct
// because happy eyeballs means that we may have a IPv4 control connection
// causing a IPv6 connection to the test port or vice versa.
ServerIP string
ClientIP string

// This is the only field that is really required. It identifies the test
// connection so that deeper per-connection data can be joined from tcp-info.
TestConnectionUUID string

// These fields are here to enable analyses that don't require joining with tcp-info data.
// StartTime/EndTime bracket the measurement interval; MeanThroughputMbps is
// the upload throughput computed over that interval.
StartTime time.Time
EndTime time.Time
MeanThroughputMbps float64
// TODO: Add TCPEngine (bbr, cubic, reno, etc.)

// Error holds the message of the error that ended the test early, if any.
// It is omitted from the JSON encoding when the test completed cleanly.
Error string `json:",omitempty"`
}

// ManageTest manages the c2s test lifecycle.
func ManageTest(ctx context.Context, conn protocol.Connection, f singleserving.Factory) (float64, error) {
func ManageTest(ctx context.Context, controlConn protocol.Connection, s ndt.Server) (*ArchivalData, error) {
localContext, localCancel := context.WithTimeout(ctx, 30*time.Second)
defer localCancel()
record := &ArchivalData{}

srv, err := f.SingleServingServer("c2s")
srv, err := s.SingleServingServer("c2s")
if err != nil {
log.Println("Could not start SingleServingServer", err)
return 0, err
metrics.ErrorCount.WithLabelValues("c2s", "StartSingleServingServer")
record.Error = err.Error()
return record, err
}

err = protocol.SendJSONMessage(protocol.TestPrepare, strconv.Itoa(srv.Port()), conn)
err = protocol.SendJSONMessage(protocol.TestPrepare, strconv.Itoa(srv.Port()), controlConn)
if err != nil {
log.Println("Could not send TestPrepare", err)
return 0, err
metrics.ErrorCount.WithLabelValues("c2s", "TestPrepare")
record.Error = err.Error()
return record, err
}

testConn, err := srv.ServeOnce(localContext)
if err != nil {
log.Println("Could not successfully ServeOnce", err)
return 0, err
metrics.ErrorCount.WithLabelValues("c2s", "ServeOnce")
record.Error = err.Error()
return record, err
}
defer warnonerror.Close(testConn, "Could not close test connection")

err = protocol.SendJSONMessage(protocol.TestStart, "", conn)
// When ManageTest exits, close the test connection.
defer func() {
// Allow the connection-draining goroutine to empty all buffers in support of
// poorly-written clients before we close the connection, but do not block the
// exit of ManageTest on waiting for the test connection to close.
go func() {
time.Sleep(3 * time.Second)
warnonerror.Close(testConn, "Could not close test connection")
}()
}()

record.TestConnectionUUID = testConn.UUID()
record.ServerIP = testConn.ServerIP()
record.ClientIP = testConn.ClientIP()

err = protocol.SendJSONMessage(protocol.TestStart, "", controlConn)
if err != nil {
log.Println("Could not send TestStart", err)
return 0, err
metrics.ErrorCount.WithLabelValues("c2s", "TestStart")
record.Error = err.Error()
return record, err
}

seconds := float64(10)
startTime := time.Now()
endTime := startTime.Add(10 * time.Second)
errorTime := endTime.Add(5 * time.Second)
err = testConn.SetReadDeadline(errorTime)
if err != nil {
log.Println("Could not set deadline", err)
return 0, err
}
byteCount, err := testConn.DrainUntil(endTime)
record.StartTime = time.Now()
byteCount, err := drainForeverButMeasureFor(testConn, 10*time.Second)
record.EndTime = time.Now()
log.Println("Ended C2S test on", testConn)
if err != nil {
if byteCount == 0 {
log.Println("Could not drain the test connection", byteCount, err)
return 0, err
metrics.ErrorCount.WithLabelValues("c2s", "Drain")
record.Error = err.Error()
return record, err
}
// It is possible for the client to reach 10 seconds slightly before the server does.
seconds = time.Now().Sub(startTime).Seconds()
seconds := record.EndTime.Sub(record.StartTime).Seconds()
if seconds < 9 {
log.Printf("C2S test only uploaded for %f seconds\n", seconds)
return 0, err
} else if seconds > 11 {
log.Printf("C2S test uploaded-read-loop exited late (%f seconds) because the read stalled. We will continue with the test.\n", seconds)
} else {
log.Printf("C2S test had an error after %f seconds, which is within acceptable bounds. We will continue with the test.\n", seconds)
log.Printf("C2S test client only uploaded for %f seconds\n", seconds)
metrics.ErrorCount.WithLabelValues("c2s", "EarlyExit")
record.Error = err.Error()
return record, err
}
} else {
// Empty out the buffer for poorly-behaved clients.
// TODO: ensure this behavior is required by a unit test.
testConn.DrainUntil(errorTime)
// More than 9 seconds is fine.
log.Printf("C2S test had an error (%v) after %f seconds. We will continue with the test.\n", err, seconds)
}

throughputValue := 8 * float64(byteCount) / 1000 / 10
record.MeanThroughputMbps = throughputValue / 1000 // Convert Kbps to Mbps

err = protocol.SendJSONMessage(protocol.TestMsg, strconv.FormatFloat(throughputValue, 'g', -1, 64), conn)
err = protocol.SendJSONMessage(protocol.TestMsg, strconv.FormatFloat(throughputValue, 'g', -1, 64), controlConn)
if err != nil {
log.Println("Could not send TestMsg with C2S results", err)
return 0, err
metrics.ErrorCount.WithLabelValues("c2s", "TestMsg")
record.Error = err.Error()
return record, err
}

err = protocol.SendJSONMessage(protocol.TestFinalize, "", conn)
err = protocol.SendJSONMessage(protocol.TestFinalize, "", controlConn)
if err != nil {
log.Println("Could not send TestFinalize", err)
return throughputValue, err
metrics.ErrorCount.WithLabelValues("c2s", "TestFinalize")
record.Error = err.Error()
return record, err
}

return record, nil
}

// drainForeverButMeasureFor is a generic method for draining a connection while
// measuring the connection for the first part of the drain. This method does
// not close the passed-in Connection, and starts a goroutine which runs until
// that Connection is closed. It returns the total byte count (and last read
// error, if any) observed during the first d of draining.
func drainForeverButMeasureFor(conn protocol.Connection, d time.Duration) (int64, error) {
	type measurement struct {
		totalByteCount int64
		err            error
	}
	measurements := make(chan measurement)

	// This is the "drain forever" part of this function. Read the passed-in
	// connection until the passed-in connection is closed. Only send measurements
	// on the measurement channel if the channel can be written to without
	// blocking.
	go func() {
		var totalByteCount int64
		var err error
		// Read the connection until the connection is closed. Reading on a closed
		// connection returns an error, which terminates the loop and the goroutine.
		for err == nil {
			var byteCount int64
			byteCount, err = conn.ReadBytes()
			totalByteCount += byteCount
			// Only write to the channel if it won't block, to ensure the reading process
			// goes as fast as possible.
			select {
			case measurements <- measurement{totalByteCount, err}:
			default:
			}
		}
		// No further sends can occur (this goroutine is the only sender), so
		// close the channel to tell the measuring loop below that the
		// connection is done.
		close(measurements)
	}()

	// Read the measurements channel until the timer goes off or the channel is
	// closed, whichever comes first.
	timer := time.NewTimer(d)
	defer timer.Stop()
	var bytesRead int64
	var err error
	for {
		select {
		case m, ok := <-measurements:
			if !ok {
				// The connection errored/closed before the measurement window
				// ended. Without this check, receives on the closed channel
				// would keep succeeding with zero values until the timer fired,
				// clobbering bytesRead and err with 0 and nil.
				return bytesRead, err
			}
			bytesRead = m.totalByteCount
			err = m.err
		case <-timer.C:
			return bytesRead, err
		}
	}
}
35 changes: 30 additions & 5 deletions legacy/handler/wshandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,23 @@ package handler
import (
"log"
"net/http"
"time"

"github.com/m-lab/go/warnonerror"
"github.com/m-lab/ndt-server/legacy"
"github.com/m-lab/ndt-server/legacy/ndt"
"github.com/m-lab/ndt-server/legacy/protocol"
"github.com/m-lab/ndt-server/legacy/singleserving"
"github.com/m-lab/ndt-server/legacy/ws"
)

// WSHandler is both an ndt.Server and an http.Handler to allow websocket-based
// NDT tests to be run by Go's http libraries. Anything served by Go's HTTP
// stack that must also act as an NDT test server should implement this.
type WSHandler interface {
ndt.Server
http.Handler
}

type httpFactory struct{}

func (hf *httpFactory) SingleServingServer(dir string) (singleserving.Server, error) {
Expand All @@ -20,7 +29,19 @@ func (hf *httpFactory) SingleServingServer(dir string) (singleserving.Server, er
// httpHandler handles requests that come in over HTTP or HTTPS. It should be
// created with MakeHTTPHandler() or MakeHTTPSHandler().
type httpHandler struct {
	serverFactory  singleserving.Factory
	connectionType ndt.ConnectionType
	datadir        string
}

// DataDir returns the directory under which test data is saved.
func (s *httpHandler) DataDir() string { return s.datadir }

// ConnectionType returns the connection type (ndt.WS or ndt.WSS) this handler
// was configured with by NewWS/NewWSS.
func (s *httpHandler) ConnectionType() ndt.ConnectionType { return s.connectionType }

// TestLength is the duration of a single measurement.
func (s *httpHandler) TestLength() time.Duration { return 10 * time.Second }

// TestMaxTime is the maximum wall-clock time allotted to a whole test.
func (s *httpHandler) TestMaxTime() time.Duration { return 30 * time.Second }

// SingleServingServer creates a new single-serving server for the named test
// by delegating to the configured factory.
func (s *httpHandler) SingleServingServer(dir string) (singleserving.Server, error) {
	return s.serverFactory.SingleServingServer(dir)
}

// ServeHTTP is the command channel for the NDT-WS or NDT-WSS test. All
Expand All @@ -37,13 +58,15 @@ func (s *httpHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
ws := protocol.AdaptWsConn(wsc)
defer warnonerror.Close(ws, "Could not close connection")
legacy.HandleControlChannel(ws, s.serverFactory)
legacy.HandleControlChannel(ws, s)
}

// NewWS returns a handler suitable for http-based connections.
func NewWS() http.Handler {
func NewWS(datadir string) WSHandler {
return &httpHandler{
serverFactory: &httpFactory{},
serverFactory: &httpFactory{},
connectionType: ndt.WS,
datadir: datadir,
}
}

Expand All @@ -57,11 +80,13 @@ func (hf *httpsFactory) SingleServingServer(dir string) (singleserving.Server, e
}

// NewWSS returns a handler suitable for https-based connections.
func NewWSS(certFile, keyFile string) http.Handler {
func NewWSS(datadir, certFile, keyFile string) WSHandler {
return &httpHandler{
serverFactory: &httpsFactory{
certFile: certFile,
keyFile: keyFile,
},
connectionType: ndt.WSS,
datadir: datadir,
}
}
Loading

0 comments on commit a40df81

Please sign in to comment.