Skip to content

Commit

Permalink
Merge pull request #149 from m-lab/sandbox-soltesz-unified-schema
Browse files Browse the repository at this point in the history
Unify ndt5 and ndt7 result schema
  • Loading branch information
stephen-soltesz authored Jul 17, 2019
2 parents e64c572 + f6b9529 commit d9ebeca
Show file tree
Hide file tree
Showing 18 changed files with 246 additions and 179 deletions.
24 changes: 16 additions & 8 deletions data/result.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@ import (
"time"

"github.com/m-lab/ndt-server/ndt5/c2s"
ndt5data "github.com/m-lab/ndt-server/ndt5/data"
"github.com/m-lab/ndt-server/ndt5/meta"
"github.com/m-lab/ndt-server/ndt5/control"
"github.com/m-lab/ndt-server/ndt5/s2c"

"github.com/m-lab/ndt-server/ndt7/model"
)

// NDTResult is the struct that is serialized as JSON to disk as the archival record of an NDT test.
// NDTResult is the struct that is serialized as JSON to disk as the archival
// record of an NDT test.
//
// This struct is dual-purpose. It contains the necessary data to allow joining
// with tcp-info data and traceroute-caller data as well as any other UUID-based
Expand All @@ -18,19 +20,25 @@ import (
type NDTResult struct {
// GitShortCommit is the Git commit (short form) of the running server code.
GitShortCommit string
// Version is the symbolic version (if any) of the running server code.
Version string

// All data members should all be self-describing. In the event of confusion,
// rename them to add clarity rather than adding a comment.
NDT5Metadata *ndt5data.Metadata `json:",omitempty"`

ServerIP string
ServerPort int
ClientIP string
ClientPort int

StartTime time.Time
EndTime time.Time
C2S *c2s.ArchivalData `json:",omitempty"`
S2C *s2c.ArchivalData `json:",omitempty"`
Meta meta.ArchivalData `json:",omitempty"`

// ndt5
Control *control.ArchivalData `json:",omitempty"`
C2S *c2s.ArchivalData `json:",omitempty"`
S2C *s2c.ArchivalData `json:",omitempty"`

// ndt7
Upload *model.ArchivalData `json:",omitempty"`
Download *model.ArchivalData `json:",omitempty"`
}
14 changes: 8 additions & 6 deletions ndt5/c2s/c2s.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@ type ArchivalData struct {
// The server and client IP are here as well as in the containing struct
// because happy eyeballs means that we may have a IPv4 control connection
// causing a IPv6 connection to the test port or vice versa.
ServerIP string
ClientIP string
ServerIP string
ServerPort int
ClientIP string
ClientPort int

// This is the only field that is really required.
TestConnectionUUID string
UUID string

// These fields are here to enable analyses that don't require joining with tcp-info data.
StartTime time.Time
Expand Down Expand Up @@ -79,9 +81,9 @@ func ManageTest(ctx context.Context, controlConn protocol.Connection, s ndt.Serv
}()
}()

record.TestConnectionUUID = testConn.UUID()
record.ServerIP = testConn.ServerIP()
record.ClientIP = testConn.ClientIP()
record.UUID = testConn.UUID()
record.ServerIP, record.ServerPort = testConn.ServerIPAndPort()
record.ClientIP, record.ClientPort = testConn.ClientIPAndPort()

err = m.SendMessage(protocol.TestStart, []byte{})
if err != nil {
Expand Down
15 changes: 15 additions & 0 deletions ndt5/control/data.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package control

import (
"github.com/m-lab/ndt-server/ndt5/meta"
"github.com/m-lab/ndt-server/ndt5/ndt"
)

type ArchivalData struct {
// These data members should all be self-describing. In the event of confusion,
// rename them to add clarity rather than adding a comment.
UUID string
Protocol ndt.ConnectionType
MessageProtocol string
ClientMetadata meta.ArchivalData `json:",omitempty" bigquery:"-"`
}
11 changes: 0 additions & 11 deletions ndt5/data/data.go

This file was deleted.

5 changes: 0 additions & 5 deletions ndt5/meta/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,6 @@ var maxClientMessages = 20
// ArchivalData contains all meta data reported by the client.
type ArchivalData map[string]string

type archiveErr struct {
archivalData ArchivalData
err error
}

// ManageTest runs the meta tests. If the given ctx is canceled or the meta test
// takes longer than 15sec, then ManageTest will return after the next ReceiveMessage.
// The given protocolMessager should have its own connection timeout to prevent
Expand Down
24 changes: 15 additions & 9 deletions ndt5/ndt5.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,15 @@ import (
"time"

"github.com/m-lab/go/rtx"
"github.com/m-lab/ndt-server/ndt5/control"
"github.com/m-lab/ndt-server/version"

"github.com/m-lab/go/prometheusx"
"github.com/m-lab/go/warnonerror"

"github.com/m-lab/ndt-server/data"
"github.com/m-lab/ndt-server/metrics"
"github.com/m-lab/ndt-server/ndt5/c2s"
ndt5data "github.com/m-lab/ndt-server/ndt5/data"
"github.com/m-lab/ndt-server/ndt5/meta"
ndt5metrics "github.com/m-lab/ndt-server/ndt5/metrics"
"github.com/m-lab/ndt-server/ndt5/ndt"
Expand Down Expand Up @@ -48,7 +49,7 @@ func SaveData(record *data.NDTResult, datadir string) {
log.Printf("Could not create directory %s: %v\n", dir, err)
return
}
file, err := protocol.UUIDToFile(dir, record.NDT5Metadata.ControlChannelUUID)
file, err := protocol.UUIDToFile(dir, record.Control.UUID)
if err != nil {
log.Println("Could not open file:", err)
return
Expand Down Expand Up @@ -117,15 +118,20 @@ func handleControlChannel(conn protocol.Connection, s ndt.Server) {
log.Println("Handling connection", conn)
defer warnonerror.Close(conn, "Could not close "+conn.String())

sIP, sPort := conn.ServerIPAndPort()
cIP, cPort := conn.ClientIPAndPort()
record := &data.NDTResult{
GitShortCommit: prometheusx.GitShortCommit,
Version: version.Version,
StartTime: time.Now(),
NDT5Metadata: &ndt5data.Metadata{
ControlChannelUUID: conn.UUID(),
Protocol: s.ConnectionType(),
Control: &control.ArchivalData{
UUID: conn.UUID(),
Protocol: s.ConnectionType(),
},
ServerIP: conn.ServerIP(),
ClientIP: conn.ClientIP(),
ServerIP: sIP,
ServerPort: sPort,
ClientIP: cIP,
ClientPort: cPort,
}
defer func() {
record.EndTime = time.Now()
Expand Down Expand Up @@ -174,7 +180,7 @@ func handleControlChannel(conn protocol.Connection, s ndt.Server) {
ndt5metrics.ClientRequestedTestSuites.WithLabelValues(strings.Join(suites, "-")).Inc()

m := conn.Messager()
record.NDT5Metadata.MessageProtocol = m.Encoding().String()
record.Control.MessageProtocol = m.Encoding().String()
rtx.PanicOnError(
m.SendMessage(protocol.SrvQueue, []byte("0")),
"SrvQueue - Could not send SrvQueue")
Expand Down Expand Up @@ -203,7 +209,7 @@ func handleControlChannel(conn protocol.Connection, s ndt.Server) {
rtx.PanicOnError(err, "S2C - Could not run s2c test")
}
if runMeta {
record.Meta, err = meta.ManageTest(ctx, m)
record.Control.ClientMetadata, err = meta.ManageTest(ctx, m)
rtx.PanicOnError(err, "META - Could not run meta test")
}
speedMsg := fmt.Sprintf("You uploaded at %.4f and downloaded at %.4f", c2sRate*1000, s2cRate*1000)
Expand Down
20 changes: 10 additions & 10 deletions ndt5/protocol/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,8 @@ type Connection interface {
ReadBytes() (count int64, err error)
WriteMessage(messageType int, data []byte) error
FillUntil(t time.Time, buffer []byte) (bytesWritten int64, err error)
ServerIP() string
ClientIP() string
ServerIPAndPort() (string, int)
ClientIPAndPort() (string, int)
Close() error
UUID() string
String() string
Expand Down Expand Up @@ -190,14 +190,14 @@ func (ws *wsConnection) UUID() string {
return id
}

func (ws *wsConnection) ServerIP() string {
func (ws *wsConnection) ServerIPAndPort() (string, int) {
localAddr := ws.UnderlyingConn().LocalAddr().(*net.TCPAddr)
return localAddr.IP.String()
return localAddr.IP.String(), localAddr.Port
}

func (ws *wsConnection) ClientIP() string {
func (ws *wsConnection) ClientIPAndPort() (string, int) {
remoteAddr := ws.UnderlyingConn().RemoteAddr().(*net.TCPAddr)
return remoteAddr.IP.String()
return remoteAddr.IP.String(), remoteAddr.Port
}

// ReadBytes reads some bytes and discards them. This method is in service of
Expand Down Expand Up @@ -283,14 +283,14 @@ func (nc *netConnection) UUID() string {
return id
}

func (nc *netConnection) ServerIP() string {
func (nc *netConnection) ServerIPAndPort() (string, int) {
localAddr := nc.LocalAddr().(*net.TCPAddr)
return localAddr.IP.String()
return localAddr.IP.String(), localAddr.Port
}

func (nc *netConnection) ClientIP() string {
func (nc *netConnection) ClientIPAndPort() (string, int) {
remoteAddr := nc.RemoteAddr().(*net.TCPAddr)
return remoteAddr.IP.String()
return remoteAddr.IP.String(), remoteAddr.Port
}

func (nc *netConnection) String() string {
Expand Down
14 changes: 8 additions & 6 deletions ndt5/s2c/s2c.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,18 @@ import (
// data, then they should use the UUID to get deeper data from tcp-info.
type ArchivalData struct {
// This is the only field that is really required.
TestConnectionUUID string
UUID string

// All subsequent fields are here to enable analyses that don't require joining
// with tcp-info data.

// The server and client IP are here as well as in the containing struct
// because happy eyeballs means that we may have a IPv4 control connection
// causing a IPv6 connection to the test port or vice versa.
ServerIP string
ClientIP string
ServerIP string
ServerPort int
ClientIP string
ClientPort int

StartTime time.Time
EndTime time.Time
Expand Down Expand Up @@ -73,9 +75,9 @@ func ManageTest(ctx context.Context, controlConn protocol.Connection, s ndt.Serv
return record, err
}
defer warnonerror.Close(testConn, "Could not close test connection")
record.TestConnectionUUID = testConn.UUID()
record.ServerIP = testConn.ServerIP()
record.ClientIP = testConn.ClientIP()
record.UUID = testConn.UUID()
record.ServerIP, record.ServerPort = testConn.ServerIPAndPort()
record.ClientIP, record.ClientPort = testConn.ClientIPAndPort()

dataToSend := make([]byte, 8192)
for i := range dataToSend {
Expand Down
4 changes: 3 additions & 1 deletion ndt7/download/measurer/measurer.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ func loop(
}
defer sockfp.Close()
t0 := time.Now()
resultsfp.StartTest()
defer resultsfp.EndTest()
ticker := time.NewTicker(spec.MinMeasurementInterval)
defer ticker.Stop()
sentConnectionInfo := false
Expand All @@ -80,7 +82,7 @@ func loop(
measurement.ConnectionInfo = &model.ConnectionInfo{
Client: conn.RemoteAddr().String(),
Server: conn.LocalAddr().String(),
UUID: "urn:mlab:" + resultsfp.UUID,
UUID: resultsfp.Data.UUID,
}
sentConnectionInfo = true
}
Expand Down
48 changes: 45 additions & 3 deletions ndt7/handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,20 @@ package handler
import (
"context"
"fmt"
"net"
"net/http"
"time"

"github.com/gorilla/websocket"
"github.com/m-lab/go/prometheusx"
"github.com/m-lab/go/warnonerror"
"github.com/m-lab/ndt-server/data"
"github.com/m-lab/ndt-server/logging"
"github.com/m-lab/ndt-server/ndt7/download"
"github.com/m-lab/ndt-server/ndt7/results"
"github.com/m-lab/ndt-server/ndt7/spec"
"github.com/m-lab/ndt-server/ndt7/upload"
"github.com/m-lab/ndt-server/version"
)

// Handler handles ndt7 subtests.
Expand All @@ -21,7 +26,7 @@ type Handler struct {
Upgrader websocket.Upgrader

// DataDir is the directory where results are saved.
DataDir string
DataDir string
}

// warnAndClose emits message as a warning and the sends a Bad Request
Expand Down Expand Up @@ -65,13 +70,50 @@ func (h Handler) downloadOrUpload(writer http.ResponseWriter, request *http.Requ
// and close(2), we'll end up keeping sockets that caused an error in the
// code above (e.g. because the handshake was not okay) alive for the time
// in which the corresponding *os.File is kept in cache.
defer warnonerror.Close(conn, "download: ignoring conn.Close result")
defer warnonerror.Close(conn, "downloadOrUpload: ignoring conn.Close result")
logging.Logger.Debug("downloadOrUpload: opening results file")
resultfp, err := results.OpenFor(request, conn, h.DataDir, kind)
if err != nil {
return // error already printed
}
defer warnonerror.Close(resultfp, "download: ignoring resultfp.Close result")

// Collect test metadata.
// NOTE: unless we plan to run the NDT server over different protocols than TCP,
// then we expect RemoteAddr and LocalAddr to always return net.TCPAddr types.
clientAddr, ok := conn.RemoteAddr().(*net.TCPAddr)
if !ok {
clientAddr = &net.TCPAddr{IP: net.ParseIP("::1"), Port: 1}
}
serverAddr, ok := conn.LocalAddr().(*net.TCPAddr)
if !ok {
serverAddr = &net.TCPAddr{IP: net.ParseIP("::1"), Port: 1}
}
result := &data.NDTResult{
GitShortCommit: prometheusx.GitShortCommit,
Version: version.Version,
ClientIP: clientAddr.IP.String(),
ClientPort: clientAddr.Port,
ServerIP: serverAddr.IP.String(),
ServerPort: serverAddr.Port,
StartTime: time.Now(),
}
// Guarantee that we record an end time, even if tester panics.
defer func() {
// TODO(m-lab/ndt-server/issues/152): Simplify interface between result.File and data.NDTResult.
result.EndTime = time.Now()
if kind == spec.SubtestDownload {
result.Download = resultfp.Data
} else if kind == spec.SubtestUpload {
result.Upload = resultfp.Data
} else {
logging.Logger.Warn(string(kind) + ": data not saved")
}
if err := resultfp.WriteResult(result); err != nil {
logging.Logger.WithError(err).Warn("failed to write result")
}
warnonerror.Close(resultfp, string(kind)+": ignoring resultfp.Close error")
}()

tester(request.Context(), conn, resultfp)
}

Expand Down
14 changes: 14 additions & 0 deletions ndt7/model/archivaldata.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package model

import "time"

// ArchivalData saves all instantaneous measurements over the lifetime of a test.
type ArchivalData struct {
UUID string
StartTime time.Time
EndTime time.Time
ServerMeasurements []Measurement
ClientMeasurements []Measurement
// TODO(m-lab/ndt-server/issues/151): remove bigquery tag.
ClientMetadata map[string]string `json:",omitempty" bigquery:"-"`
}
2 changes: 1 addition & 1 deletion ndt7/model/measurement.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ type Measurement struct {
AppInfo *AppInfo `json:"app_info,omitempty"`

// ConnectionInfo contains connection information.
ConnectionInfo *ConnectionInfo `json:"connection_info,omitempty"`
ConnectionInfo *ConnectionInfo `json:"connection_info,omitempty" bigquery:"-"`

// BBRInfo is the data measured using TCP BBR instrumentation.
BBRInfo *BBRInfo `json:"bbr_info,omitempty"`
Expand Down
Loading

0 comments on commit d9ebeca

Please sign in to comment.