-
Notifications
You must be signed in to change notification settings - Fork 5
/
Transfer Virtual Connection.go
133 lines (106 loc) · 4.01 KB
/
Transfer Virtual Connection.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
/*
File Username: Transfer Virtual Connection.go
Copyright: 2021 Peernet s.r.o.
Author: Peter Kleissner
This file defines a virtual connection between a transfer protocol and Peernet messages.
If either the downstream transfer protocol or upstream Peernet messages indicate termination, the virtual connection ceases to exist.
*/
package core
import (
"sync"
"github.com/google/uuid"
)
// VirtualPacketConn is a virtual connection.
type VirtualPacketConn struct {
Peer *PeerInfo
// Stats are maintained by the caller
Stats interface{}
// function to send data to the remote peer
sendData func(data []byte, sequenceNumber uint32, transferID uuid.UUID)
// Sequence number from the first outgoing or incoming packet.
sequenceNumber uint32
// Transfer ID represents a session ID valid only for the duration of the transfer.
transferID uuid.UUID
// data channel
incomingData chan []byte
outgoingData chan []byte
// internal data
closed bool
terminationSignal chan struct{} // The termination signal shall be used by the underlying protocol to detect upstream termination.
reason int // Reason why it was closed
sync.Mutex
}
// newVirtualPacketConn creates a new virtual connection (both incoming and outgoing).
func newVirtualPacketConn(peer *PeerInfo, sendData func(data []byte, sequenceNumber uint32, transferID uuid.UUID)) (v *VirtualPacketConn) {
v = &VirtualPacketConn{
Peer: peer,
sendData: sendData,
incomingData: make(chan []byte, 512),
outgoingData: make(chan []byte),
terminationSignal: make(chan struct{}),
}
go v.writeForward()
return
}
// writeForward forwards outgoing messages
func (v *VirtualPacketConn) writeForward() {
for {
select {
case data := <-v.outgoingData:
v.sendData(data, v.sequenceNumber, v.transferID)
case <-v.terminationSignal:
return
}
}
}
// receiveData receives incoming data via an external message. Non-blocking.
func (v *VirtualPacketConn) receiveData(data []byte) {
if v.IsTerminated() {
return
}
// pass the data on
select {
case v.incomingData <- data:
case <-v.terminationSignal:
default:
// packet lost
}
}
// Terminate closes the connection. Do not call this function manually. Use the underlying protocol's function to close the connection.
// Reason: 404 = Remote peer does not store file (upstream), 2 = Remote termination signal (upstream), 3 = Sequence invalidation or expiration (upstream), 1000+ = Transfer protocol indicated closing (downstream)
func (v *VirtualPacketConn) Terminate(reason int) (err error) {
v.Lock()
defer v.Unlock()
if v.closed { // if already closed, take no action
return
}
v.closed = true
v.reason = reason
close(v.terminationSignal)
return
}
// IsTerminated checks if the connection is terminated
func (v *VirtualPacketConn) IsTerminated() bool {
return v.closed
}
// sequenceTerminate is a wrapper for sequenece termination (invalidation or expiration)
func (v *VirtualPacketConn) sequenceTerminate() {
v.Terminate(3)
}
// Close provides a Close function to be called by the underlying transfer protocol.
// Do not call the function manually; otherwise the underlying transfer protocol may not have time to send a termination message (and the remote peer would subsequently try to reconnect).
// Rather, use the underlying transfer protocol's close function.
func (v *VirtualPacketConn) Close(reason int) (err error) {
v.Peer.Backend.networks.Sequences.InvalidateSequence(v.Peer.PublicKey, v.sequenceNumber, true)
return v.Terminate(reason)
}
// CloseLinger is to be called by the underlying transfer protocol when it will close the socket soon after lingering around.
// Lingering happens to resend packets at the end of transfer, when it is not immediately known whether the remote peer received all packets.
func (v *VirtualPacketConn) CloseLinger(reason int) (err error) {
v.reason = reason
return nil
}
// GetTerminateReason returns the termination reason. 0 = Not yet terminated.
func (v *VirtualPacketConn) GetTerminateReason() int {
return v.reason
}