Skip to content

Commit

Permalink
DialUDP function
Browse files Browse the repository at this point in the history
  • Loading branch information
DarienRaymond committed Jan 5, 2019
1 parent 21f8bfe commit b52725c
Show file tree
Hide file tree
Showing 8 changed files with 285 additions and 10 deletions.
6 changes: 4 additions & 2 deletions app/dns/udpns.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ import (

"golang.org/x/net/dns/dnsmessage"
"v2ray.com/core/common"
"v2ray.com/core/common/buf"
"v2ray.com/core/common/net"
"v2ray.com/core/common/protocol/dns"
udp_proto "v2ray.com/core/common/protocol/udp"
"v2ray.com/core/common/session"
"v2ray.com/core/common/signal/pubsub"
"v2ray.com/core/common/task"
Expand Down Expand Up @@ -101,7 +101,9 @@ func (s *ClassicNameServer) Cleanup() error {
return nil
}

func (s *ClassicNameServer) HandleResponse(ctx context.Context, payload *buf.Buffer) {
func (s *ClassicNameServer) HandleResponse(ctx context.Context, packet *udp_proto.Packet) {
payload := packet.Payload

var parser dnsmessage.Parser
header, err := parser.Start(payload.Bytes())
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion app/proxyman/inbound/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ func (w *udpWorker) removeConn(id connID) {
func (w *udpWorker) handlePackets() {
receive := w.hub.Receive()
for payload := range receive {
w.callback(payload.Content, payload.Source, payload.OriginalDestination)
w.callback(payload.Payload, payload.Source, payload.Target)
}
}

Expand Down
15 changes: 15 additions & 0 deletions functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"v2ray.com/core/common"
"v2ray.com/core/common/net"
"v2ray.com/core/features/routing"
"v2ray.com/core/transport/internet/udp"
)

// CreateObject creates a new object based on the given V2Ray instance and config. The V2Ray instance may be nil.
Expand Down Expand Up @@ -54,3 +55,17 @@ func Dial(ctx context.Context, v *Instance, dest net.Destination) (net.Conn, err
}
return net.NewConnection(net.ConnectionInputMulti(r.Writer), net.ConnectionOutputMulti(r.Reader)), nil
}

// DialUDP provides a way to exchange UDP packets through V2Ray instance to remote servers.
// Since it is under a proxy context, the LocalAddr() in returned PacketConn will not show the real address.
//
// TODO: SetDeadline() / SetReadDeadline() / SetWriteDeadline() are not implemented.
//
// v2ray:api:beta
func DialUDP(ctx context.Context, v *Instance) (net.PacketConn, error) {
dispatcher := v.GetFeature(routing.DispatcherType())
if dispatcher == nil {
return nil, newError("routing.Dispatcher is not registered in V2Ray core")
}
return udp.DialDispatcher(ctx, dispatcher.(routing.Dispatcher))
}
170 changes: 170 additions & 0 deletions functions_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
package core_test

import (
"context"
"crypto/rand"
"io"
"testing"

"github.com/golang/protobuf/proto"
"github.com/google/go-cmp/cmp"

"v2ray.com/core"
"v2ray.com/core/app/dispatcher"
"v2ray.com/core/app/proxyman"
"v2ray.com/core/common"
"v2ray.com/core/common/net"
"v2ray.com/core/common/serial"
"v2ray.com/core/proxy/freedom"
"v2ray.com/core/testing/servers/tcp"
"v2ray.com/core/testing/servers/udp"
)

func xor(b []byte) []byte {
r := make([]byte, len(b))
for i, v := range b {
r[i] = v ^ 'c'
}
return r
}

func xor2(b []byte) []byte {
r := make([]byte, len(b))
for i, v := range b {
r[i] = v ^ 'd'
}
return r
}

func TestV2RayDial(t *testing.T) {
tcpServer := tcp.Server{
MsgProcessor: xor,
}
dest, err := tcpServer.Start()
common.Must(err)
defer tcpServer.Close()

config := &core.Config{
App: []*serial.TypedMessage{
serial.ToTypedMessage(&dispatcher.Config{}),
serial.ToTypedMessage(&proxyman.InboundConfig{}),
serial.ToTypedMessage(&proxyman.OutboundConfig{}),
},
Outbound: []*core.OutboundHandlerConfig{
{
ProxySettings: serial.ToTypedMessage(&freedom.Config{}),
},
},
}

cfgBytes, err := proto.Marshal(config)
common.Must(err)

server, err := core.StartInstance("protobuf", cfgBytes)
common.Must(err)
defer server.Close()

conn, err := core.Dial(context.Background(), server, dest)
common.Must(err)
defer conn.Close()

const size = 10240 * 1024
payload := make([]byte, size)
common.Must2(rand.Read(payload))

if _, err := conn.Write(payload); err != nil {
t.Fatal(err)
}

receive := make([]byte, size)
if _, err := io.ReadFull(conn, receive); err != nil {
t.Fatal("failed to read all response: ", err)
}

if r := cmp.Diff(xor(receive), payload); r != "" {
t.Error(r)
}
}

func TestV2RayDialUDP(t *testing.T) {
udpServer1 := udp.Server{
MsgProcessor: xor,
}
dest1, err := udpServer1.Start()
common.Must(err)
defer udpServer1.Close()

udpServer2 := udp.Server{
MsgProcessor: xor2,
}
dest2, err := udpServer2.Start()
common.Must(err)
defer udpServer2.Close()

config := &core.Config{
App: []*serial.TypedMessage{
serial.ToTypedMessage(&dispatcher.Config{}),
serial.ToTypedMessage(&proxyman.InboundConfig{}),
serial.ToTypedMessage(&proxyman.OutboundConfig{}),
},
Outbound: []*core.OutboundHandlerConfig{
{
ProxySettings: serial.ToTypedMessage(&freedom.Config{}),
},
},
}

cfgBytes, err := proto.Marshal(config)
common.Must(err)

server, err := core.StartInstance("protobuf", cfgBytes)
common.Must(err)
defer server.Close()

conn, err := core.DialUDP(context.Background(), server)
common.Must(err)
defer conn.Close()

const size = 1024
{
payload := make([]byte, size)
common.Must2(rand.Read(payload))

if _, err := conn.WriteTo(payload, &net.UDPAddr{
IP: dest1.Address.IP(),
Port: int(dest1.Port),
}); err != nil {
t.Fatal(err)
}

receive := make([]byte, size)
if _, _, err := conn.ReadFrom(receive); err != nil {
t.Fatal(err)
}

if r := cmp.Diff(xor(receive), payload); r != "" {
t.Error(r)
}
}

{
payload := make([]byte, size)
common.Must2(rand.Read(payload))

if _, err := conn.WriteTo(payload, &net.UDPAddr{
IP: dest2.Address.IP(),
Port: int(dest2.Port),
}); err != nil {
t.Fatal(err)
}

receive := make([]byte, size)
if _, _, err := conn.ReadFrom(receive); err != nil {
t.Fatal(err)
}

if r := cmp.Diff(xor2(receive), payload); r != "" {
t.Error(r)
}
}
}
4 changes: 3 additions & 1 deletion proxy/shadowsocks/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"v2ray.com/core/common/log"
"v2ray.com/core/common/net"
"v2ray.com/core/common/protocol"
udp_proto "v2ray.com/core/common/protocol/udp"
"v2ray.com/core/common/session"
"v2ray.com/core/common/signal"
"v2ray.com/core/common/task"
Expand Down Expand Up @@ -69,12 +70,13 @@ func (s *Server) Process(ctx context.Context, network net.Network, conn internet
}

func (s *Server) handlerUDPPayload(ctx context.Context, conn internet.Connection, dispatcher routing.Dispatcher) error {
udpServer := udp.NewDispatcher(dispatcher, func(ctx context.Context, payload *buf.Buffer) {
udpServer := udp.NewDispatcher(dispatcher, func(ctx context.Context, packet *udp_proto.Packet) {
request := protocol.RequestHeaderFromContext(ctx)
if request == nil {
return
}

payload := packet.Payload
data, err := EncodeUDPPacket(request, payload.Bytes())
payload.Release()
if err != nil {
Expand Down
4 changes: 3 additions & 1 deletion proxy/socks/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"v2ray.com/core/common/log"
"v2ray.com/core/common/net"
"v2ray.com/core/common/protocol"
udp_proto "v2ray.com/core/common/protocol/udp"
"v2ray.com/core/common/session"
"v2ray.com/core/common/signal"
"v2ray.com/core/common/task"
Expand Down Expand Up @@ -174,7 +175,8 @@ func (s *Server) transport(ctx context.Context, reader io.Reader, writer io.Writ
}

func (s *Server) handleUDPPayload(ctx context.Context, conn internet.Connection, dispatcher routing.Dispatcher) error {
udpServer := udp.NewDispatcher(dispatcher, func(ctx context.Context, payload *buf.Buffer) {
udpServer := udp.NewDispatcher(dispatcher, func(ctx context.Context, packet *udp_proto.Packet) {
payload := packet.Payload
newError("writing back UDP response with ", payload.Len(), " bytes").AtDebug().WriteToLog(session.ExportIDToError(ctx))

request := protocol.RequestHeaderFromContext(ctx)
Expand Down
91 changes: 87 additions & 4 deletions transport/internet/udp/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,23 @@ package udp

import (
"context"
"io"
"sync"
"time"

"v2ray.com/core/common/signal/done"

"v2ray.com/core/common"
"v2ray.com/core/common/buf"
"v2ray.com/core/common/net"
"v2ray.com/core/common/protocol/udp"
"v2ray.com/core/common/session"
"v2ray.com/core/common/signal"
"v2ray.com/core/features/routing"
"v2ray.com/core/transport"
)

type ResponseCallback func(ctx context.Context, payload *buf.Buffer)
type ResponseCallback func(ctx context.Context, packet *udp.Packet)

type connEntry struct {
link *transport.Link
Expand Down Expand Up @@ -70,7 +74,7 @@ func (v *Dispatcher) getInboundRay(ctx context.Context, dest net.Destination) *c
cancel: removeRay,
}
v.conns[dest] = entry
go handleInput(ctx, entry, v.callback)
go handleInput(ctx, entry, dest, v.callback)
return entry
}

Expand All @@ -89,7 +93,7 @@ func (v *Dispatcher) Dispatch(ctx context.Context, destination net.Destination,
}
}

func handleInput(ctx context.Context, conn *connEntry, callback ResponseCallback) {
func handleInput(ctx context.Context, conn *connEntry, dest net.Destination, callback ResponseCallback) {
defer conn.cancel()

input := conn.link.Reader
Expand All @@ -109,7 +113,86 @@ func handleInput(ctx context.Context, conn *connEntry, callback ResponseCallback
}
timer.Update()
for _, b := range mb {
callback(ctx, b)
callback(ctx, &udp.Packet{
Payload: b,
Source: dest,
})
}
}
}

type dispatcherConn struct {
dispatcher *Dispatcher
cache chan *udp.Packet
done *done.Instance
}

func DialDispatcher(ctx context.Context, dispatcher routing.Dispatcher) (net.PacketConn, error) {
c := &dispatcherConn{
cache: make(chan *udp.Packet, 16),
done: done.New(),
}

d := NewDispatcher(dispatcher, c.callback)
c.dispatcher = d
return c, nil
}

func (c *dispatcherConn) callback(ctx context.Context, packet *udp.Packet) {
select {
case <-c.done.Wait():
packet.Payload.Release()
return
case c.cache <- packet:
default:
packet.Payload.Release()
return
}
}

func (c *dispatcherConn) ReadFrom(p []byte) (int, net.Addr, error) {
select {
case <-c.done.Wait():
return 0, nil, io.EOF
case packet := <-c.cache:
n := copy(p, packet.Payload.Bytes())
return n, &net.UDPAddr{
IP: packet.Source.Address.IP(),
Port: int(packet.Source.Port),
}, nil
}
}

func (c *dispatcherConn) WriteTo(p []byte, addr net.Addr) (int, error) {
buffer := buf.New()
raw := buffer.Extend(buf.Size)
n := copy(raw, p)
buffer.Resize(0, int32(n))

ctx := context.Background()
c.dispatcher.Dispatch(ctx, net.DestinationFromAddr(addr), buffer)
return n, nil
}

func (c *dispatcherConn) Close() error {
return c.done.Close()
}

func (c *dispatcherConn) LocalAddr() net.Addr {
return &net.UDPAddr{
IP: []byte{0, 0, 0, 0},
Port: 0,
}
}

func (c *dispatcherConn) SetDeadline(t time.Time) error {
return nil
}

func (c *dispatcherConn) SetReadDeadline(t time.Time) error {
return nil
}

func (c *dispatcherConn) SetWriteDeadline(t time.Time) error {
return nil
}
Loading

0 comments on commit b52725c

Please sign in to comment.