diff --git a/README.md b/README.md index fd76df4..49c3fbe 100644 --- a/README.md +++ b/README.md @@ -37,7 +37,7 @@ You may optionally build a Debian package by typing the following command at the The `tcpsplice` binary will be installed by the package in the `/usr/sbin` directory, with additional startup scripts and a default configuration file in `/etc/tcpsplice.conf`. - $ sudo dpkg -i tcpsplice_1.0.1_amd64.deb + $ sudo dpkg -i tcpsplice_1.0.2_amd64.deb Selecting previously unselected package tcpsplice. - Unpacking tcpsplice (from tcpsplice_1.0.1_amd64.deb) ... - Setting up tcpsplice (1.0.1) ... + Unpacking tcpsplice (from tcpsplice_1.0.2_amd64.deb) ... + Setting up tcpsplice (1.0.2) ... diff --git a/debian/changelog b/debian/changelog index 9c94479..363b4c7 100644 --- a/debian/changelog +++ b/debian/changelog @@ -1,3 +1,9 @@ +tcpsplice (1.0.2) stable; urgency=medium + + * add arbitrary metadata scan (+ display on monitoring screen) + + -- Pierre-Yves Kerembellec Mon, 26 Sep 2016 22:20:56 +0200 + tcpsplice (1.0.1) stable; urgency=medium * initial version diff --git a/monitor.html b/monitor.html index a803023..109b89b 100644 --- a/monitor.html +++ b/monitor.html @@ -16,7 +16,7 @@ float: left; clear: both; width: 100%; - min-width: 1000px; + min-width: 1180px; height: 30px; margin-bottom: 20px; background-color: #f0f0f0; @@ -38,7 +38,7 @@ float: left; clear: both; width: 100%; - min-width: 1000px; + min-width: 1180px; height: 30px; border: 1px solid #e0e0e0; border-radius: 4px 4px 0px 0px; @@ -58,18 +58,22 @@ { clear: both; width: 100%; - min-width: 1000px; + min-width: 1180px; min-height: 38px; margin-bottom: 10px; border: 1px solid #e0e0e0; border-top: none; border-radius: 0px 0px 4px 4px; + font-size: 26px; + color: #e0e0e0; } .session { width: 100%; + border-bottom: 1px solid #e0e0e0; font-family: arial, verdana; font-size: 13px; + color: #404040; } .session:nth-child(even) { @@ -79,14 +83,19 @@ { background-color: #f8f8f8; } -.session:last-child -{ - border-radius: 0px 0px 4px 4px; -} .session:hover { background-color: #d0d0d0; } +.session:first-child:hover +{ + background-color: #f8f8f8 !important; +} +.session:last-child +{ + border-radius: 0px 0px 4px 4px; + border-style: none !important; +} .session span { display: inline-block; @@ -116,7 +125,7 @@ } .small { - width: 50px; + width: 70px; text-align: center !important; } @@ -146,7 +155,7 @@ function terminate(id) { - $.ajax({ url: sprintf('/close/%s', id) }); + $.ajax({ url: sprintf('/abort/%s', id) }); return false; } @@ -178,19 +187,31 @@ '%d connection%s' + '
', service, total[1], total[0], ids.length, ids.length > 1 ? "s" : ""); + if (ids.length > 0) + { + services += '
source addresstarget address' + + 'durationuplink ▲' + + 'downlink ▼metadataabort
'; + } + else + { + services += '
no active session
'; + } $.each(ids, function(index, session) { info = sessions[session[0]]; services += sprintf('
' + '%s%s%s' + - '%s - %.1fMbps - %.1fMbps ▲' + - '%s - %.1fMbps - %.1fMbps ▼' + + '%s - %.1fMbps - %.1fMbps' + + '%s - %.1fMbps - %.1fMbps' + + '%s' + '' + '
', info.done ? " done" : "", info.source, info.target, age(info.duration), size(info.bytes[0]), info.mean[0], info.last[0], - size(info.bytes[1]), info.mean[1], info.last[1], session[0]); + size(info.bytes[1]), info.mean[1], info.last[1], + info.meta, session[0]); }); services += '
'; }); diff --git a/tcpsplice.go b/tcpsplice.go index 0af9bff..23b2568 100644 --- a/tcpsplice.go +++ b/tcpsplice.go @@ -12,22 +12,23 @@ import ( "net/http" "os" "path/filepath" + "regexp" "strings" "sync" "time" ) const progname = "tcpsplice" -const version = "1.0.1" +const version = "1.0.2" type Session struct { - id, service, source, target string + id, service, source, target, meta string started, active, last time.Time sourceRead, targetWritten, targetRead, sourceWritten int64 sourceMeanThroughput, targetMeanThroughput float64 sourceLastThroughput, targetLastThroughput float64 loggued, done bool - closer chan bool + abort chan bool } var ( @@ -54,6 +55,8 @@ func serviceHandler(service string, listener *net.TCPListener) { sessionMinimumSize := config.GetSizeBounds(fmt.Sprintf("services.%s.session_minimum_size", service), 1024, 0, math.MaxInt64) writeTimeout := time.Second * time.Duration(config.GetDurationBounds(fmt.Sprintf("services.%s.write_timeout", service), 10, 2, 60)) idleTimeout := time.Second * time.Duration(config.GetDurationBounds(fmt.Sprintf("services.%s.idle_timeout", service), 60, 0, 300)) + metaSize := int(config.GetSizeBounds(fmt.Sprintf("services.%s.meta_size", service), 16*1024, 0, 64*1024)) + metaScan, _ := regexp.Compile(config.GetString(fmt.Sprintf("services.%s.meta_scan", service), "[")) source.SetReadBuffer(int(incomingSize)) source.SetWriteBuffer(int(incomingSize)) target.(*net.TCPConn).SetReadBuffer(int(outgoingSize)) @@ -66,7 +69,8 @@ func serviceHandler(service string, listener *net.TCPListener) { started: time.Now(), active: time.Now(), last: time.Time{}, - closer: make(chan bool, 1), + meta: "-", + abort: make(chan bool, 1), } if logMinimumSize == 0 { session.loggued = true @@ -77,9 +81,23 @@ func serviceHandler(service string, listener *net.TCPListener) { } go func() { data := make([]byte, incomingSize) + meta := make([]byte, 0, metaSize) for { read, err := source.Read(data) if read > 0 { + if metaScan != nil && session.meta == "-" && len(meta) < metaSize { + length := int(math.Min(float64(read), float64(metaSize-len(meta)))) + meta = append(meta, data[:length]...) + if metaScan.Match(meta) { + session.meta = "" + for _, part := range metaScan.FindSubmatch(meta)[1:] { + session.meta += string(part) + } + if session.meta == "" { + session.meta = "-" + } + } + } session.sourceRead += int64(read) session.active = time.Now() target.SetWriteDeadline(time.Now().Add(writeTimeout)) @@ -100,11 +118,25 @@ func serviceHandler(service string, listener *net.TCPListener) { target.Close() }() data := make([]byte, outgoingSize) + meta := make([]byte, 0, metaSize) close := false for !close { target.SetReadDeadline(time.Now().Add(time.Second)) read, err := target.Read(data) if read > 0 { + if metaScan != nil && session.meta == "-" && len(meta) < metaSize { + length := int(math.Min(float64(read), float64(metaSize-len(meta)))) + meta = append(meta, data[:length]...) + if metaScan.Match(meta) { + session.meta = "" + for _, part := range metaScan.FindSubmatch(meta)[1:] { + session.meta += string(part) + } + if session.meta == "" { + session.meta = "-" + } + } + } session.targetRead += int64(read) session.active = time.Now() source.SetWriteDeadline(time.Now().Add(writeTimeout)) @@ -131,7 +163,7 @@ func serviceHandler(service string, listener *net.TCPListener) { statistics <- session } select { - case close = <-session.closer: + case close = <-session.abort: default: } if idleTimeout != 0 && time.Now().Sub(session.active) >= idleTimeout { @@ -187,6 +219,7 @@ func monitorHandler(response http.ResponseWriter, request *http.Request) { "bytes": [2]int64{session.sourceRead, session.targetRead}, "mean": [2]float64{session.sourceMeanThroughput, session.targetMeanThroughput}, "last": [2]float64{session.sourceLastThroughput, session.targetLastThroughput}, + "meta": session.meta, "done": session.done, } } @@ -198,13 +231,13 @@ func monitorHandler(response http.ResponseWriter, request *http.Request) { if json, err := json.Marshal(output); err == nil { response.Write(json) } - } else if strings.Index(request.URL.Path, "/close/") == 0 { + } else if strings.Index(request.URL.Path, "/abort/") == 0 { sessionsLock.RLock() for id, session := range sessions { if id == request.URL.Path[7:] && !session.done { session.done = true - session.closer <- true - logger.Warn(map[string]interface{}{"type": "close", "id": id, "service": session.service, "source": session.source, "target": session.target}) + session.abort <- true + logger.Warn(map[string]interface{}{"type": "abort", "id": id, "service": session.service, "source": session.source, "target": session.target}) break } } @@ -316,7 +349,7 @@ func main() { case session := <-statistics: sessionsLock.Lock() if sessions[session.id] == nil { - sessions[session.id] = &Session{service: session.service, source: session.source, target: session.target, started: session.started, closer: session.closer} + sessions[session.id] = &Session{service: session.service, source: session.source, target: session.target, started: session.started, abort: session.abort} } sessionsLock.Unlock() duration := math.Max(float64(time.Now().Sub(session.started))/float64(time.Second), 0.001) @@ -332,6 +365,7 @@ func main() { } sessions[session.id].last = session.last sessions[session.id].done = session.done + sessions[session.id].meta = session.meta sessions[session.id].sourceRead = session.sourceRead sessions[session.id].targetWritten = session.targetWritten sessions[session.id].targetRead = session.targetRead