WIP rhp3 #178
Draft: wants to merge 1 commit into base: master
108 changes: 97 additions & 11 deletions renterhost/mux/mux.go
@@ -229,11 +229,9 @@ func (m *Mux) AcceptStream() (*Stream, error) {
}
}

// DialStream creates a new Stream.
//
// Unlike e.g. net.Dial, this does not perform any I/O; the peer will not be
// aware of the new Stream until Write is called.
func (m *Mux) DialStream() (*Stream, error) {
// NewStream creates a new Stream. The peer will not be aware of the new Stream
// until Write is called.
func (m *Mux) NewStream() (*Stream, error) {
m.mu.Lock()
defer m.mu.Unlock()
if m.err != nil {
@@ -284,7 +282,18 @@ func Dial(conn net.Conn, theirKey ed25519.PublicKey) (*Mux, error) {
if err != nil {
return nil, fmt.Errorf("encryption handshake failed: %w", err)
}
settings, err := initiateSettingsHandshake(conn, defaultConnSettings, aead)

// the siad implementation determines the initial packet size for the handshake
// based on the connection's IP address instead of hardcoding it.
ours := defaultConnSettings
host, _, err := net.SplitHostPort(conn.RemoteAddr().String())
if err != nil {
return nil, fmt.Errorf("could not parse remote address: %w", err)
} else if net.ParseIP(host).To4() != nil {
ours.RequestedPacketSize = 1460 // IPv4 MTU
}

settings, err := initiateSettingsHandshake(conn, ours, aead)
if err != nil {
return nil, fmt.Errorf("settings handshake failed: %w", err)
}
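
Presumably the 1460 above is the payload that fits in a single Ethernet frame over IPv4: the standard 1500-byte MTU less 20 bytes each for the minimal IPv4 and TCP headers (IPv6 connections keep the default settings). A named constant would make that derivation explicit; a sketch only, since `ipv4PacketSize` is not defined in this diff:

```go
const (
	ethernetMTU    = 1500
	ipv4HeaderSize = 20 // minimal IPv4 header, no options
	tcpHeaderSize  = 20 // minimal TCP header, no options

	// ipv4PacketSize illustrates where the 1460 figure presumably comes from.
	ipv4PacketSize = ethernetMTU - ipv4HeaderSize - tcpHeaderSize // 1460
)
```
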
@@ -320,6 +329,20 @@ type Stream struct {
err error
readBuf []byte
rd, wd time.Time // deadlines

// SiaMux handles the subscriber handshake asynchronously; writes to the peer
// are expected to continue uninterrupted before the handshake is complete.
// The subscriber must be prepended to the first call to Write(), but the
// response may not be sent until future calls to Write() have completed. This
// means the response must be read from the read buffer before other calls to
// Read() can complete.
writeMu sync.Mutex // guards the lazy write buffer
writeBuf []byte
// readMu guards calls to Read(). Calls to Read() take an RLock on the mutex;
// exclusiveReader() takes a WLock to prevent other calls to Read() while the
// exclusive reader is open. A mutex benchmarked better than the channel that
// the existing siamux uses.
readMu sync.RWMutex
}

// LocalAddr returns the underlying connection's LocalAddr.
Expand Down Expand Up @@ -364,7 +387,7 @@ func (s *Stream) SetWriteDeadline(t time.Time) error {
}

// consumeFrame stores a frame in s.read and waits for it to be consumed by
// (*Stream).Read calls.
// (*Stream).read calls.
func (s *Stream) consumeFrame(h frameHeader, payload []byte) {
s.cond.L.Lock()
defer s.cond.L.Unlock()
@@ -391,8 +414,8 @@ func (s *Stream) consumeFrame(h frameHeader, payload []byte) {
}
}

// Read reads data from the Stream.
func (s *Stream) Read(p []byte) (int, error) {
// read reads data from the Stream.
func (s *Stream) read(p []byte) (int, error) {
s.cond.L.Lock()
defer s.cond.L.Unlock()
if !s.rd.IsZero() {
@@ -417,7 +440,7 @@ func (s *Stream) Read(p []byte) (int, error) {
}

// Write writes data to the Stream.
func (s *Stream) Write(p []byte) (int, error) {
func (s *Stream) write(p []byte) (int, error) {
buf := bytes.NewBuffer(p)
for buf.Len() > 0 {
// check for error
@@ -437,14 +460,53 @@ func (s *Stream) Write(p []byte) (int, error) {
return len(p), nil
}

// lazyWrite buffers data to be prepended to the next Write to the stream.
func (s *Stream) lazyWrite(p []byte) {
s.writeMu.Lock()
s.writeBuf = append(s.writeBuf, p...)
s.writeMu.Unlock()
}

// Read reads data from the Stream.
func (s *Stream) Read(p []byte) (int, error) {
s.readMu.RLock()
defer s.readMu.RUnlock()
return s.read(p)
}

// Write writes data to the Stream.
func (s *Stream) Write(p []byte) (n int, err error) {
s.writeMu.Lock()
var m int
if len(s.writeBuf) != 0 {
m = len(s.writeBuf)
p = append(s.writeBuf, p...)
s.writeBuf = nil
}
s.writeMu.Unlock()
n, err = s.write(p)
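// n counts the prepended writeBuf bytes as well; subtract them so the
// caller only sees how many of its own bytes were written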
if n >= m {
n -= m
}
return
}

// exclusiveReader returns a reader with exclusive access to the stream's read
// buffer. Direct reads from the stream are blocked until the returned reader
// is closed.
func (s *Stream) exclusiveReader() io.ReadCloser {
s.readMu.Lock()
return &exclusiveReader{s: s}
}

// Close closes the Stream. The underlying connection is not closed.
func (s *Stream) Close() error {
h := frameHeader{
id: s.id,
flags: flagFinal,
}
err := s.m.bufferFrame(h, nil, s.wd)
if err == ErrPeerClosedStream {
if errors.Is(err, ErrPeerClosedStream) {
err = nil
}

@@ -461,4 +523,28 @@ func (s *Stream) Close() error {
return err
}

// exclusiveReader is a reader that provides exclusive read access to a stream's
// underlying read buffer. Any other calls to Read() on the underlying stream
// will be blocked until the exclusive reader is closed.
type exclusiveReader struct {
s *Stream
closed bool
}

// Read reads data from the underlying stream. Implements io.Reader.
func (r *exclusiveReader) Read(p []byte) (int, error) {
if r.closed {
return 0, ErrClosedStream
}
return r.s.read(p)
}

// Close closes the reader and unlocks the underlying stream. The underlying
// stream is not closed.
func (r *exclusiveReader) Close() error {
r.closed = true
r.s.readMu.Unlock()
return nil
}

var _ net.Conn = (*Stream)(nil)
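
Taken together, the lazy-write buffer and the exclusive reader above appear intended for the RHP3 subscriber handshake: the subscriber name rides along with the first real Write, and its response is read without racing other readers. A minimal sketch of that flow as a same-package helper; the subscriber encoding and the fixed 16-byte response are assumptions, not part of this diff:

```go
package mux

import "io"

// openSubscriberStream is a hypothetical helper showing how the new
// primitives compose.
func openSubscriberStream(m *Mux, subscriber string, payload []byte) ([]byte, error) {
	s, err := m.NewStream()
	if err != nil {
		return nil, err
	}
	defer s.Close()

	// buffer the subscriber; nothing is sent on the wire yet
	s.lazyWrite([]byte(subscriber))

	// the first Write flushes the buffered subscriber followed by the payload
	if _, err := s.Write(payload); err != nil {
		return nil, err
	}

	// read the handshake response while blocking other Read calls
	r := s.exclusiveReader()
	defer r.Close()
	resp := make([]byte, 16) // assumed response size
	if _, err := io.ReadFull(r, resp); err != nil {
		return nil, err
	}
	return resp, nil
}
```
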
152 changes: 6 additions & 146 deletions renterhost/mux/mux_test.go
@@ -7,7 +7,6 @@ import (
"fmt"
"io"
"io/ioutil"
"math"
"net"
"os"
"sync"
@@ -18,7 +17,6 @@ import (
"gitlab.com/NebulousLabs/siamux/mux"
"golang.org/x/crypto/chacha20poly1305"
"lukechampine.com/frand"
"lukechampine.com/us/renterhost"
)

func TestMux(t *testing.T) {
@@ -64,7 +62,7 @@ func TestMux(t *testing.T) {
t.Fatal(err)
}
defer m.Close()
s, err := m.DialStream()
s, err := m.NewStream()
if err != nil {
t.Fatal(err)
}
@@ -132,7 +130,7 @@ func TestManyStreams(t *testing.T) {
wg.Add(1)
go func(i int) {
defer wg.Done()
s, err := m.DialStream()
s, err := m.NewStream()
if err != nil {
errChan <- err
return
@@ -220,7 +218,7 @@ func TestDeadline(t *testing.T) {
defer m.Close()

// a Read deadline should not timeout a Write
s, err := m.DialStream()
s, err := m.NewStream()
if err != nil {
t.Fatal(err)
}
@@ -270,7 +268,7 @@ func TestDeadline(t *testing.T) {
}
for i, test := range tests {
err := func() error {
s, err := m.DialStream()
s, err := m.NewStream()
if err != nil {
return err
}
@@ -346,7 +344,7 @@ func TestCompatibility(t *testing.T) {
t.Fatal(err)
}
defer m.Close()
s, err := m.DialStream()
s, err := m.NewStream()
if err != nil {
t.Fatal(err)
}
@@ -482,7 +480,7 @@ func BenchmarkMux(b *testing.B) {
for j := 0; j < numStreams; j++ {
go func() {
defer wg.Done()
s, err := m.DialStream()
s, err := m.NewStream()
if err != nil {
panic(err)
}
@@ -552,141 +550,3 @@ func BenchmarkConn(b *testing.B) {
}
}
}

func BenchmarkSiaMux(b *testing.B) {
for _, numStreams := range []int{1, 2, 10, 100, 500, 1000} {
b.Run(fmt.Sprint(numStreams), func(b *testing.B) {
sk, pk := mux.GenerateED25519KeyPair()
l, err := net.Listen("tcp", ":0")
if err != nil {
b.Fatal(err)
}
defer l.Close()
serverCh := make(chan error, 1)
go func() {
serverCh <- func() error {
conn, err := l.Accept()
if err != nil {
return err
}
server, err := mux.NewServerMux(context.Background(), conn, pk, sk, log.DiscardLogger, func(*mux.Mux) {}, func(*mux.Mux) {})
if err != nil {
panic(err)
}
defer server.Close()
for {
s, err := server.AcceptStream()
if err != nil {
return err
}
go func() {
io.Copy(ioutil.Discard, s)
s.Close()
}()
}
}()
}()
defer func() {
if err := <-serverCh; err != nil && err != io.EOF {
b.Fatal(err)
}
}()

conn, err := net.Dial("tcp", l.Addr().String())
if err != nil {
b.Fatal(err)
}
client, err := mux.NewClientMux(context.Background(), conn, pk, log.DiscardLogger, func(*mux.Mux) {}, func(*mux.Mux) {})
if err != nil {
b.Fatal(err)
}
defer client.Close()

// open each stream in a separate goroutine
bufSize := defaultConnSettings.maxPayloadSize()
buf := make([]byte, bufSize)
b.ResetTimer()
b.SetBytes(int64(bufSize * numStreams))
b.ReportAllocs()
start := time.Now()
var wg sync.WaitGroup
wg.Add(numStreams)
for j := 0; j < numStreams; j++ {
go func() {
defer wg.Done()
stream, err := client.NewStream()
if err != nil {
panic(err)
}
defer stream.Close()
for i := 0; i < b.N; i++ {
if _, err := stream.Write(buf); err != nil {
panic(err)
}
}
}()
}
wg.Wait()
b.ReportMetric(float64(b.N*numStreams)/time.Since(start).Seconds(), "conns/sec")
})
}
}

func BenchmarkRHP2(b *testing.B) {
serverKey := ed25519.NewKeyFromSeed(frand.Bytes(ed25519.SeedSize))
l, err := net.Listen("tcp", ":0")
if err != nil {
b.Fatal(err)
}
defer l.Close()
serverCh := make(chan error, 1)
go func() {
serverCh <- func() error {
conn, err := l.Accept()
if err != nil {
return err
}
s, err := renterhost.NewHostSession(conn, serverKey)
if err != nil {
return err
}
defer s.Close()
var req renterhost.RPCWriteRequest
for i := 0; i < b.N; i++ {
if err := s.ReadResponse(&req, math.MaxUint64); err != nil {
return err
}
}
return nil
}()
}()
defer func() {
if err := <-serverCh; err != nil {
b.Log(err)
}
}()

conn, err := net.Dial("tcp", l.Addr().String())
if err != nil {
b.Fatal(err)
}
s, err := renterhost.NewRenterSession(conn, serverKey.Public().(ed25519.PublicKey))
if err != nil {
b.Fatal(err)
}
defer s.Close()

req := &renterhost.RPCWriteRequest{
Actions: []renterhost.RPCWriteAction{{
Data: make([]byte, defaultConnSettings.maxPayloadSize()),
}},
}
b.ResetTimer()
b.SetBytes(int64(len(req.Actions[0].Data)))
b.ReportAllocs()
for i := 0; i < b.N; i++ {
if err := s.WriteResponse(req, nil); err != nil {
b.Fatal(err)
}
}
}