Skip to content

Commit

Permalink
优化
Browse files Browse the repository at this point in the history
  • Loading branch information
456vv committed Apr 25, 2024
1 parent 6d6baeb commit 6479aca
Show file tree
Hide file tree
Showing 5 changed files with 239 additions and 193 deletions.
92 changes: 45 additions & 47 deletions d2d.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package vforward
import (
"context"
"errors"
"fmt"
"io"
"log"
"net"
Expand All @@ -28,7 +27,13 @@ type D2DSwap struct {
//
// int 实时连接数量
func (T *D2DSwap) ConnNum() int {
return T.dd.currUseConns()
var i int = int(atomic.LoadInt32(&T.dd.currUseConn))
// 1个等1个
// 2个等1个
// 3个等2个
// 4个等2个
// 5个等3个
return (i / 2) + (i % 2)
}

// Swap 开始数据交换,它是从连接池中读出空闲连接,进行双方交换数据。
Expand Down Expand Up @@ -62,6 +67,7 @@ func (T *D2DSwap) Swap() error {

// 等待
if T.dd.acp.ConnNum() <= 0 || T.dd.bcp.ConnNum() <= 0 || T.dd.backPooling.isTrue() {
// 延时
wait = delay(wait, maxWait)
continue
}
Expand All @@ -73,6 +79,7 @@ func (T *D2DSwap) Swap() error {
atomic.AddInt32(&T.dd.currUseConn, -1)
continue
}

atomic.AddInt32(&T.dd.currUseConn, 1)
connb, err := T.dd.bGetConn()
if err != nil {
Expand All @@ -84,45 +91,44 @@ func (T *D2DSwap) Swap() error {
continue
}

bufSize := T.dd.ReadBufSize
if bufSize == 0 {
bufSize = DefaultReadBufSize
}
go T.dataCopy(conna, connb)
}
}

go func(conna, connb net.Conn, dd *D2D, T *D2DSwap, bufSize int) {
defer atomic.AddInt32(&dd.currUseConn, -2)
func (T *D2DSwap) dataCopy(conna, connb net.Conn) {
bufSize := T.dd.ReadBufSize
if bufSize == 0 {
bufSize = DefaultReadBufSize
}

if T.closed.isTrue() {
conna.Close()
connb.Close()
return
}
defer atomic.AddInt32(&T.dd.currUseConn, -2)

// 记录当前连接
T.conns.Set(conna, connb)
defer T.conns.Del(conna)

//----------------------------
var err error
if T.Verify != nil {
conna, connb, err = T.Verify(conna, connb)
if err != nil {
T.dd.logf(err.Error())
return
}
}
if T.closed.isTrue() {
conna.Close()
connb.Close()
return
}

//----------------------------
go func(conna, connb net.Conn, dd *D2D, T *D2DSwap, bufSize int) {
copyData(conna, connb, bufSize)
conna.Close()
}(conna, connb, dd, T, bufSize)
// 记录当前连接
T.conns.Set(conna, connb)
defer T.conns.Del(conna)

//----------------------------
copyData(connb, conna, bufSize)
connb.Close()
}(conna, connb, T.dd, T, bufSize)
//----------------------------
var err error
if T.Verify != nil {
conna, connb, err = T.Verify(conna, connb)
if err != nil {
T.dd.logf("验证失败: %s", err)
return
}
}

go func() {
copyData(conna, connb, bufSize)
conna.Close()
}()
copyData(connb, conna, bufSize)
connb.Close()
}

// Close 关闭数据交换 .Swap(),你还可以再次使用 .Swap() 启动。
Expand Down Expand Up @@ -278,12 +284,7 @@ func (T *D2D) Close() error {

// 当前连接数量
func (T *D2D) currUseConns() int {
var i int = int(atomic.LoadInt32(&T.currUseConn))
if i%2 != 0 {
return (i / 2) + 1
} else {
return (i / 2)
}
return int(atomic.LoadInt32(&T.currUseConn)) / 2
}

// 快速取得连接
Expand Down Expand Up @@ -346,8 +347,9 @@ func (T *D2D) examineConn(cp *vconnpool.ConnPool, addr *Addr, verify *func(net.C
return
}

conn = conn.(vconnpool.Conn).RawConn()
conn = conn.(vconnpool.Conn).RawConn() // 不是从池中读取出来的,可以直接转
if *verify != nil && !(*verify)(conn) {
T.logf("%s 连接验证失败", conn.RemoteAddr().String())
conn.Close()
return
}
Expand All @@ -362,9 +364,5 @@ func (T *D2D) examineConn(cp *vconnpool.ConnPool, addr *Addr, verify *func(net.C
}

func (T *D2D) logf(format string, v ...interface{}) {
if T.ErrorLog != nil {
T.ErrorLog.Output(2, fmt.Sprintf(format+"\n", v...))
return
}
log.Printf(format+"\n", v...)
errLog(T.ErrorLog, format, v...)
}
11 changes: 11 additions & 0 deletions forward.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package vforward

import (
"errors"
"fmt"
"io"
"log"
"net"
"strings"
"sync/atomic"
Expand Down Expand Up @@ -82,3 +84,12 @@ func connectUDP(addr *Addr) (net.Conn, error) {
}
return nil, errors.New("vforward: 远程地址类型是未知的")
}

func errLog(ErrorLog *log.Logger, format string, v ...interface{}) {
str := fmt.Sprintf(format+"\n", v...)
if ErrorLog != nil {
ErrorLog.Output(2, str)
return
}
log.Print(str)
}
Loading

0 comments on commit 6479aca

Please sign in to comment.