-
Notifications
You must be signed in to change notification settings - Fork 668
/
SinkChannelImpl.java
246 lines (197 loc) · 10.6 KB
/
SinkChannelImpl.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
/*
* Copyright (c) 2002, 2018, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
* particular file as subject to the "Classpath" exception as provided
* by Oracle in the LICENSE file that accompanied this code.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
/*
*/
package sun.nio.ch;
import java.io.FileDescriptor;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousCloseException;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.SelectorProvider;
/**
* Pipe.SinkChannel implementation based on socket connection.
*/
// 管道中的写通道的实现类,向这里写入数据。读写通道往往共享一个Buffer来直接通信
class SinkChannelImpl extends Pipe.SinkChannel implements SelChImpl {
/** The SocketChannel assoicated with this pipe */
// 实际使用的通道
final SocketChannel socketChannel;
/*▼ 构造器 ████████████████████████████████████████████████████████████████████████████████┓ */
SinkChannelImpl(SelectorProvider provider, SocketChannel socketChannel) {
super(provider);
this.socketChannel = socketChannel;
}
/*▲ 构造器 ████████████████████████████████████████████████████████████████████████████████┛ */
/*▼ 关闭 ████████████████████████████████████████████████████████████████████████████████┓ */
// 实现对"可选择"通道的关闭操作
protected void implCloseSelectableChannel() throws IOException {
// 如果当前通道上已经没有已注册的SelectionKey,则销毁通道
if(!isRegistered()) {
kill();
}
}
// 销毁当前通道,即释放对Socket文件描述符的引用
public void kill() throws IOException {
socketChannel.close();
}
/*▲ 关闭 ████████████████████████████████████████████████████████████████████████████████┛ */
/*▼ 读/写操作 ████████████████████████████████████████████████████████████████████████████████┓ */
// 向当前通道中写入src包含的内容
public int write(ByteBuffer src) throws IOException {
try {
return socketChannel.write(src);
} catch(AsynchronousCloseException x) {
close();
throw x;
}
}
// 向当前通道中写入srcs中各个缓冲区包含的内容
public long write(ByteBuffer[] srcs) throws IOException {
try {
return socketChannel.write(srcs);
} catch(AsynchronousCloseException x) {
close();
throw x;
}
}
// 向当前通道中写入srcs[offset, offset+length-1]中各个缓冲区包含的内容
public long write(ByteBuffer[] srcs, int offset, int length) throws IOException {
if((offset<0) || (length<0) || (offset>srcs.length - length)) {
throw new IndexOutOfBoundsException();
}
try {
return write(Util.subsequence(srcs, offset, length));
} catch(AsynchronousCloseException x) {
close();
throw x;
}
}
/*▲ 读/写操作 ████████████████████████████████████████████████████████████████████████████████┛ */
/*▼ 监听参数/事件 ████████████████████████████████████████████████████████████████████████████████┓ */
/*
* 翻译Sink通道监听的事件,返回对ops的翻译结果
*
* 方向:Java层 --> native层
* SelectionKey.XXX --> Net.XXX
*/
public int translateInterestOps(int ops) {
int newOps = 0;
if((ops & SelectionKey.OP_WRITE) != 0) {
newOps |= Net.POLLOUT;
}
return newOps;
}
/*▲ 监听参数/事件 ████████████████████████████████████████████████████████████████████████████████┛ */
/*▼ 就绪参数/事件 ████████████████████████████████████████████████████████████████████████████████┓ */
/*
*【增量更新】已就绪事件
*
* 将本地(native)反馈的就绪信号ops翻译并存储到Java层的就绪事件readyOps中,
* 返回值指示上一次反馈的事件与本次反馈的事件是否发生了改变。
*
* 通道收到有效的反馈事件后,会【增量更新】上次记录的已就绪事件,
* 如果本地(native)反馈了错误或挂起信号,则将已就绪事件直接设置为通道注册的监听事件。
*
* 方向:native层 --> Java层
* Net.XXX --> SelectionKey.XXX
*
* 参见:SelectionKeyImpl#readyOps
*/
public boolean translateAndUpdateReadyOps(int ops, SelectionKeyImpl selectionKey) {
return translateReadyOps(ops, selectionKey.nioReadyOps(), selectionKey);
}
/*
*【覆盖更新】已就绪事件
*
* 将本地(native)反馈的就绪信号ops翻译并存储到Java层的就绪事件readyOps中,
* 返回值指示上一次反馈的事件与本次反馈的事件是否发生了改变。
*
* 通道收到有效的反馈事件后,会【覆盖】(selectionKey中)上次记录的已就绪事件,
* 如果本地(native)反馈了错误或挂起信号,则将已就绪事件直接设置为通道注册的监听事件。
*
* 方向:native层 --> Java层
* Net.XXX --> SelectionKey.XXX
*
* 参见:SelectionKeyImpl#readyOps
*/
public boolean translateAndSetReadyOps(int ops, SelectionKeyImpl selectionKey) {
return translateReadyOps(ops, 0, selectionKey);
}
/*
*【增量更新】已就绪事件(基于initialOps叠加)
*
* 将本地(native)反馈的就绪信号ops翻译并存储到Java层的就绪事件readyOps中,
* 返回值指示上一次反馈的事件与本次反馈的事件是否发生了改变。
*
* 通道收到有效的反馈事件后,会将其【叠加】在initialOps上,换句话说是对已就绪事件的增量更新。
* 如果本地(native)反馈了错误或挂起信号,则将已就绪事件直接设置为通道注册的监听事件
*
* 方向:native层 --> Java层
* Net.XXX --> SelectionKey.XXX
*
* 参见:SelectionKeyImpl#readyOps
*/
public boolean translateReadyOps(int ops, int initialOps, SelectionKeyImpl selectionKey) {
// 不合规的文件描述符
if((ops & Net.POLLNVAL) != 0) {
throw new Error("POLLNVAL detected");
}
// 获取注册的监听事件:SelectionKey.XXX(不会验证当前"选择键"是否有效)
int intOps = selectionKey.nioInterestOps();
// 获取已就绪事件
int oldOps = selectionKey.nioReadyOps();
int newOps = initialOps;
// 本地(native)反馈了错误或挂起的信号
if((ops & (Net.POLLERR | Net.POLLHUP)) != 0) {
newOps = intOps;
// 直接将通道注册的监听事件设置为已就绪事件
selectionKey.nioReadyOps(newOps);
return (newOps & ~oldOps) != 0;
}
// 该通道监听了"可写"事件,且可以向通道写入数据
if(((ops & Net.POLLOUT) != 0) && ((intOps & SelectionKey.OP_WRITE) != 0)) {
newOps |= SelectionKey.OP_WRITE;
}
// 将newOps设置为已就绪事件
selectionKey.nioReadyOps(newOps);
return (newOps & ~oldOps) != 0;
}
/*▲ 就绪参数/事件 ████████████████████████████████████████████████████████████████████████████████┛ */
// 是否设置当前通道为阻塞模式
protected void implConfigureBlocking(boolean block) throws IOException {
socketChannel.configureBlocking(block);
}
// 返回通道在Java层的文件描述符
public FileDescriptor getFD() {
return ((SocketChannelImpl) socketChannel).getFD();
}
// 返回通道在本地(native层)的文件描述符
public int getFDVal() {
return ((SocketChannelImpl) socketChannel).getFDVal();
}
}