// server.go (229 lines, 4.6 KB) — forked from moriyoshi/s3-sftp-proxy.

package main
import (
"context"
"fmt"
"io"
"net"
"sync"
"time"
"github.com/pkg/sftp"
"golang.org/x/crypto/ssh"
)
// Server holds the SSH configuration, bucket state and tuning parameters
// shared by every client connection and SFTP session handled by this process.
type Server struct {
	// Embedded SSH server configuration; HandleClient passes it to
	// ssh.NewServerConn to perform the handshake.
	*ssh.ServerConfig
	// Embedded bucket registry. NOTE(review): presumably the source of the
	// promoted UserToBucketMap field used by HandleClient — confirm against
	// the S3Buckets declaration.
	*S3Buckets
	// Embedded phantom object map; forwarded to each S3BucketIO created by
	// HandleChannel.
	*PhantomObjectMap
	// The three size settings below are copied unchanged into each
	// S3BucketIO created by HandleChannel.
	ReaderLookbackBufferSize int
	ReaderMinChunkSize       int
	ListerLookbackBufferSize int
	// Log is the logger used by the server and handed to each S3BucketIO.
	Log interface {
		DebugLogger
		InfoLogger
		ErrorLogger
	}
	// Now is a clock function forwarded to each S3BucketIO.
	Now func() time.Time
}
// asHandlers builds an sftp.Handlers in which a single value serves all four
// roles (read, write, command and list).
//
// The fields are set by name rather than by position so that the wiring does
// not depend on the field order of the imported struct.
func asHandlers(handlers interface {
	sftp.FileReader
	sftp.FileWriter
	sftp.FileCmder
	sftp.FileLister
}) sftp.Handlers {
	return sftp.Handlers{
		FileGet:  handlers,
		FilePut:  handlers,
		FileCmd:  handlers,
		FileList: handlers,
	}
}
// HandleChannel serves SFTP on one accepted SSH session channel and blocks
// until the session is over.
//
// Two goroutines are started and waited for:
//   - a request handler that answers channel requests arriving on reqs,
//     accepting only a "subsystem" request naming "sftp" and rejecting
//     everything else;
//   - the sftp request server, backed by an S3BucketIO bound to bucket.
//
// Both share a context derived from ctx. Whichever goroutine ends first
// cancels it, which stops the request handler and closes the request server.
func (s *Server) HandleChannel(ctx context.Context, bucket *S3Bucket, sshCh ssh.Channel, reqs <-chan *ssh.Request) {
	defer s.Log.Debug("HandleChannel ended")
	server := sftp.NewRequestServer(
		sshCh,
		asHandlers(
			&S3BucketIO{
				Ctx:                      ctx,
				Bucket:                   bucket,
				ReaderLookbackBufferSize: s.ReaderLookbackBufferSize,
				ReaderMinChunkSize:       s.ReaderMinChunkSize,
				ListerLookbackBufferSize: s.ListerLookbackBufferSize,
				Log:                      s.Log,
				PhantomObjectMap:         s.PhantomObjectMap,
				Perms:                    bucket.Perms,
				ServerSideEncryption:     &bucket.ServerSideEncryption,
				Now:                      s.Now,
			},
		),
	)

	innerCtx, cancel := context.WithCancel(ctx)
	defer cancel()

	var wg sync.WaitGroup

	wg.Add(1)
	go func() {
		defer s.Log.Debug("HandleChannel.discardRequest ended")
		defer wg.Done()
		defer cancel()
		for {
			select {
			case <-innerCtx.Done():
				return
			case req := <-reqs:
				if req == nil {
					// reqs was closed: the channel is gone.
					return
				}
				// The subsystem name is compared after skipping the first
				// four payload bytes (the length prefix of an SSH string).
				// The payload comes from the client, so check its length
				// first: slicing a shorter payload would panic.
				ok := req.Type == "subsystem" &&
					len(req.Payload) >= 4 &&
					string(req.Payload[4:]) == "sftp"
				// Best effort: a failed reply means the channel is already
				// broken, which the serve goroutine will notice on its own.
				_ = req.Reply(ok, nil)
			}
		}
	}()

	wg.Add(1)
	go func() {
		defer s.Log.Debug("HandleChannel.serve ended")
		defer wg.Done()
		defer cancel()
		// Close the request server as soon as the session context ends so
		// that Serve below returns.
		go func() {
			<-innerCtx.Done()
			server.Close()
		}()
		// io.EOF is the normal end of a session; a nil error is guarded
		// against so that logging can never dereference a nil error.
		if err := server.Serve(); err != nil && err != io.EOF {
			s.Log.Error(err.Error())
		}
	}()

	wg.Wait()
}
// HandleClient runs the whole lifetime of one client connection: the SSH
// handshake, the lookup of the bucket assigned to the authenticated user, and
// the dispatch of every "session" channel to HandleChannel.
//
// conn is always closed before HandleClient returns. An error is returned
// when the handshake fails or when no bucket is mapped to the user; otherwise
// the function blocks until all channels have finished and returns nil.
func (s *Server) HandleClient(ctx context.Context, conn *net.TCPConn) error {
	defer s.Log.Debug("HandleClient ended")
	defer func() {
		F(s.Log.Info, "connection from client %s closed", conn.RemoteAddr().String())
		conn.Close()
	}()
	F(s.Log.Info, "connected from client %s", conn.RemoteAddr().String())

	innerCtx, cancel := context.WithCancel(ctx)
	defer cancel()

	// Once the context ends, set a deadline in the past so that any blocked
	// read or write on the connection fails immediately.
	go func() {
		<-innerCtx.Done()
		conn.SetDeadline(time.Unix(1, 0))
	}()

	// Before use, a handshake must be performed on the incoming net.Conn.
	sconn, chans, reqs, err := ssh.NewServerConn(conn, s.ServerConfig)
	if err != nil {
		return err
	}

	F(s.Log.Info, "user %s logged in", sconn.User())
	bucket, ok := s.UserToBucketMap[sconn.User()]
	if !ok {
		return fmt.Errorf("unknown error: no bucket designated to user %s found", sconn.User())
	}

	var wg sync.WaitGroup

	wg.Add(1)
	go func() {
		defer wg.Done()
		defer s.Log.Debug("HandleClient.requestHandler ended")
		// Global requests are not supported, but those asking for a reply
		// (e.g. client keep-alives) must still be answered; DiscardRequests
		// rejects them instead of leaving the client waiting.
		ssh.DiscardRequests(reqs)
	}()

	wg.Add(1)
	go func() {
		defer wg.Done()
		defer cancel()
		defer s.Log.Debug("HandleClient.channelHandler ended")
		for newSSHCh := range chans {
			if newSSHCh.ChannelType() != "session" {
				newSSHCh.Reject(ssh.UnknownChannelType, "unknown channel type")
				F(s.Log.Info, "unknown channel type: %s", newSSHCh.ChannelType())
				continue
			}
			F(s.Log.Info, "channel: %s", newSSHCh.ChannelType())
			sshCh, chReqs, err := newSSHCh.Accept()
			if err != nil {
				F(s.Log.Error, "could not accept channel: %s", err.Error())
				// Stop accepting channels; the deferred cancel then tears
				// the connection down.
				break
			}
			wg.Add(1)
			go func() {
				defer wg.Done()
				s.HandleChannel(innerCtx, bucket, sshCh, chReqs)
			}()
		}
	}()

	wg.Wait()
	return nil
}
// RunListenerEventLoop accepts TCP connections on lsnr and serves each one in
// its own goroutine via HandleClient until ctx is cancelled or accepting
// fails.
//
// On cancellation the listener is given a deadline in the past so that the
// pending AcceptTCP call returns. The function then waits for the accept
// goroutine and all client handlers to finish. It returns the error that
// stopped the accept loop, or nil when IsTimeout reports that error as a
// timeout (the expected result of the shutdown deadline).
func (s *Server) RunListenerEventLoop(ctx context.Context, lsnr *net.TCPListener) error {
	defer s.Log.Debug("RunListenerEventLoop ended")

	var wg sync.WaitGroup
	connChan := make(chan *net.TCPConn)

	// err is written only by the accept goroutine and read only after
	// wg.Wait(), so no further synchronisation is needed.
	var err error

	wg.Add(1)
	go func() {
		defer s.Log.Debug("RunListenerEventLoop.connHandler ended")
		defer wg.Done()
		defer close(connChan)
		for {
			var conn *net.TCPConn
			conn, err = lsnr.AcceptTCP()
			if err != nil {
				return
			}
			select {
			case <-ctx.Done():
				conn.Close()
				return
			case connChan <- conn:
			}
		}
	}()

outer:
	for {
		select {
		case conn, ok := <-connChan:
			if !ok {
				// The accept goroutine has stopped (AcceptTCP failed).
				// Without this check the closed channel would yield nil
				// connections forever.
				break outer
			}
			wg.Add(1)
			go func() {
				defer wg.Done()
				if clientErr := s.HandleClient(ctx, conn); clientErr != nil {
					s.Log.Error(clientErr.Error())
				}
			}()
		case <-ctx.Done():
			// Unblock the pending AcceptTCP call.
			lsnr.SetDeadline(time.Unix(1, 0))
			break outer
		}
	}

	// Drain connections that were accepted but never handed to a handler,
	// closing them so their sockets are not leaked.
	for conn := range connChan {
		conn.Close()
	}

	wg.Wait()
	if IsTimeout(err) {
		err = nil
	}
	return err
}