Skip to content

Commit

Permalink
add arbitrary metadata scan (+ display on monitoring screen)
Browse files Browse the repository at this point in the history
  • Loading branch information
pyke369 committed Sep 26, 2016
1 parent bb78fff commit 3f0d66e
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 24 deletions.
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ You may optionally build a Debian package by typing the following command at the
The `tcpsplice` binary will be installed by the package in the `/usr/sbin` directory, with additional
startup scripts and a default configuration file in `/etc/tcpsplice.conf`.

$ sudo dpkg -i tcpsplice_1.0.1_amd64.deb
$ sudo dpkg -i tcpsplice_1.0.2_amd64.deb
Selecting previously unselected package tcpsplice.
Unpacking tcpsplice (from tcpsplice_1.0.1_amd64.deb) ...
Setting up tcpsplice (1.0.1) ...
Unpacking tcpsplice (from tcpsplice_1.0.2_amd64.deb) ...
Setting up tcpsplice (1.0.2) ...
6 changes: 6 additions & 0 deletions debian/changelog
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
tcpsplice (1.0.2) stable; urgency=medium

* add arbitrary metadata scan (+ display on monitoring screen)

-- Pierre-Yves Kerembellec <[email protected]> Mon, 26 Sep 2016 22:20:56 +0200

tcpsplice (1.0.1) stable; urgency=medium

* initial version
Expand Down
45 changes: 33 additions & 12 deletions monitor.html
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
float: left;
clear: both;
width: 100%;
min-width: 1000px;
min-width: 1180px;
height: 30px;
margin-bottom: 20px;
background-color: #f0f0f0;
Expand All @@ -38,7 +38,7 @@
float: left;
clear: both;
width: 100%;
min-width: 1000px;
min-width: 1180px;
height: 30px;
border: 1px solid #e0e0e0;
border-radius: 4px 4px 0px 0px;
Expand All @@ -58,18 +58,22 @@
{
clear: both;
width: 100%;
min-width: 1000px;
min-width: 1180px;
min-height: 38px;
margin-bottom: 10px;
border: 1px solid #e0e0e0;
border-top: none;
border-radius: 0px 0px 4px 4px;
font-size: 26px;
color: #e0e0e0;
}
.session
{
width: 100%;
border-bottom: 1px solid #e0e0e0;
font-family: arial, verdana;
font-size: 13px;
color: #404040;
}
.session:nth-child(even)
{
Expand All @@ -79,14 +83,19 @@
{
background-color: #f8f8f8;
}
.session:last-child
{
border-radius: 0px 0px 4px 4px;
}
.session:hover
{
background-color: #d0d0d0;
}
.session:first-child:hover
{
background-color: #f8f8f8 !important;
}
.session:last-child
{
border-radius: 0px 0px 4px 4px;
border-style: none !important;
}
.session span
{
display: inline-block;
Expand Down Expand Up @@ -116,7 +125,7 @@
}
.small
{
width: 50px;
width: 70px;
text-align: center !important;
}
</style>
Expand Down Expand Up @@ -146,7 +155,7 @@

function terminate(id)
{
$.ajax({ url: sprintf('/close/%s', id) });
$.ajax({ url: sprintf('/abort/%s', id) });
return false;
}

Expand Down Expand Up @@ -178,19 +187,31 @@
'<span style="float:right">%d connection%s</span>' +
'</div><div class="sessions">',
service, total[1], total[0], ids.length, ids.length > 1 ? "s" : "");
if (ids.length > 0)
{
services += '<div class="session"><span class="large">source address</span><span class="large">target address</span>' +
'<span class="medium">duration</span><span class="xlarge">uplink &#9650;</span>' +
'<span class="xlarge">downlink &#9660;</span><span class="large">metadata</span><span class="small">abort</span></div>';
}
else
{
services += '<div style="text-align:center;">no active session</div>';
}
$.each(ids, function(index, session)
{
info = sessions[session[0]];
services += sprintf('<div class="session%s">' +
'<span class="large">%s</span><span class="large">%s</span><span class="medium">%s</span>' +
'<span class="xlarge">%s - %.1fMbps - %.1fMbps &#9650;</span>' +
'<span class="xlarge">%s - %.1fMbps - %.1fMbps &#9660;</span>' +
'<span class="xlarge">%s - %.1fMbps - %.1fMbps</span>' +
'<span class="xlarge">%s - %.1fMbps - %.1fMbps</span>' +
'<span class="large">%s</span>' +
'<span class="small"><a href="#" onclick="return terminate(\'%s\');">&#10060;</a></span>' +
'</div>',
info.done ? " done" : "",
info.source, info.target, age(info.duration),
size(info.bytes[0]), info.mean[0], info.last[0],
size(info.bytes[1]), info.mean[1], info.last[1], session[0]);
size(info.bytes[1]), info.mean[1], info.last[1],
info.meta, session[0]);
});
services += '</div>';
});
Expand Down
52 changes: 43 additions & 9 deletions tcpsplice.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,23 @@ import (
"net/http"
"os"
"path/filepath"
"regexp"
"strings"
"sync"
"time"
)

const progname = "tcpsplice"
const version = "1.0.1"
const version = "1.0.2"

type Session struct {
id, service, source, target string
id, service, source, target, meta string
started, active, last time.Time
sourceRead, targetWritten, targetRead, sourceWritten int64
sourceMeanThroughput, targetMeanThroughput float64
sourceLastThroughput, targetLastThroughput float64
loggued, done bool
closer chan bool
abort chan bool
}

var (
Expand All @@ -54,6 +55,8 @@ func serviceHandler(service string, listener *net.TCPListener) {
sessionMinimumSize := config.GetSizeBounds(fmt.Sprintf("services.%s.session_minimum_size", service), 1024, 0, math.MaxInt64)
writeTimeout := time.Second * time.Duration(config.GetDurationBounds(fmt.Sprintf("services.%s.write_timeout", service), 10, 2, 60))
idleTimeout := time.Second * time.Duration(config.GetDurationBounds(fmt.Sprintf("services.%s.idle_timeout", service), 60, 0, 300))
metaSize := int(config.GetSizeBounds(fmt.Sprintf("services.%s.meta_size", service), 16*1024, 0, 64*1024))
metaScan, _ := regexp.Compile(config.GetString(fmt.Sprintf("services.%s.meta_scan", service), "["))
source.SetReadBuffer(int(incomingSize))
source.SetWriteBuffer(int(incomingSize))
target.(*net.TCPConn).SetReadBuffer(int(outgoingSize))
Expand All @@ -66,7 +69,8 @@ func serviceHandler(service string, listener *net.TCPListener) {
started: time.Now(),
active: time.Now(),
last: time.Time{},
closer: make(chan bool, 1),
meta: "-",
abort: make(chan bool, 1),
}
if logMinimumSize == 0 {
session.loggued = true
Expand All @@ -77,9 +81,23 @@ func serviceHandler(service string, listener *net.TCPListener) {
}
go func() {
data := make([]byte, incomingSize)
meta := make([]byte, 0, metaSize)
for {
read, err := source.Read(data)
if read > 0 {
if metaScan != nil && session.meta == "-" && len(meta) < metaSize {
length := int(math.Min(float64(read), float64(metaSize-len(meta))))
meta = append(meta, data[:length]...)
if metaScan.Match(meta) {
session.meta = ""
for _, part := range metaScan.FindSubmatch(meta)[1:] {
session.meta += string(part)
}
if session.meta == "" {
session.meta = "-"
}
}
}
session.sourceRead += int64(read)
session.active = time.Now()
target.SetWriteDeadline(time.Now().Add(writeTimeout))
Expand All @@ -100,11 +118,25 @@ func serviceHandler(service string, listener *net.TCPListener) {
target.Close()
}()
data := make([]byte, outgoingSize)
meta := make([]byte, 0, metaSize)
close := false
for !close {
target.SetReadDeadline(time.Now().Add(time.Second))
read, err := target.Read(data)
if read > 0 {
if metaScan != nil && session.meta == "-" && len(meta) < metaSize {
length := int(math.Min(float64(read), float64(metaSize-len(meta))))
meta = append(meta, data[:length]...)
if metaScan.Match(meta) {
session.meta = ""
for _, part := range metaScan.FindSubmatch(meta)[1:] {
session.meta += string(part)
}
if session.meta == "" {
session.meta = "-"
}
}
}
session.targetRead += int64(read)
session.active = time.Now()
source.SetWriteDeadline(time.Now().Add(writeTimeout))
Expand All @@ -131,7 +163,7 @@ func serviceHandler(service string, listener *net.TCPListener) {
statistics <- session
}
select {
case close = <-session.closer:
case close = <-session.abort:
default:
}
if idleTimeout != 0 && time.Now().Sub(session.active) >= idleTimeout {
Expand Down Expand Up @@ -187,6 +219,7 @@ func monitorHandler(response http.ResponseWriter, request *http.Request) {
"bytes": [2]int64{session.sourceRead, session.targetRead},
"mean": [2]float64{session.sourceMeanThroughput, session.targetMeanThroughput},
"last": [2]float64{session.sourceLastThroughput, session.targetLastThroughput},
"meta": session.meta,
"done": session.done,
}
}
Expand All @@ -198,13 +231,13 @@ func monitorHandler(response http.ResponseWriter, request *http.Request) {
if json, err := json.Marshal(output); err == nil {
response.Write(json)
}
} else if strings.Index(request.URL.Path, "/close/") == 0 {
} else if strings.Index(request.URL.Path, "/abort/") == 0 {
sessionsLock.RLock()
for id, session := range sessions {
if id == request.URL.Path[7:] && !session.done {
session.done = true
session.closer <- true
logger.Warn(map[string]interface{}{"type": "close", "id": id, "service": session.service, "source": session.source, "target": session.target})
session.abort <- true
logger.Warn(map[string]interface{}{"type": "abort", "id": id, "service": session.service, "source": session.source, "target": session.target})
break
}
}
Expand Down Expand Up @@ -316,7 +349,7 @@ func main() {
case session := <-statistics:
sessionsLock.Lock()
if sessions[session.id] == nil {
sessions[session.id] = &Session{service: session.service, source: session.source, target: session.target, started: session.started, closer: session.closer}
sessions[session.id] = &Session{service: session.service, source: session.source, target: session.target, started: session.started, abort: session.abort}
}
sessionsLock.Unlock()
duration := math.Max(float64(time.Now().Sub(session.started))/float64(time.Second), 0.001)
Expand All @@ -332,6 +365,7 @@ func main() {
}
sessions[session.id].last = session.last
sessions[session.id].done = session.done
sessions[session.id].meta = session.meta
sessions[session.id].sourceRead = session.sourceRead
sessions[session.id].targetWritten = session.targetWritten
sessions[session.id].targetRead = session.targetRead
Expand Down

0 comments on commit 3f0d66e

Please sign in to comment.