-
Notifications
You must be signed in to change notification settings - Fork 669
/
Copy pathBlockingQueue.java
439 lines (404 loc) · 20.8 KB
/
BlockingQueue.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
/*
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
* particular file as subject to the "Classpath" exception as provided
* by Oracle in the LICENSE file that accompanied this code.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
/*
* This file is available under and governed by the GNU General Public
* License version 2 only, as published by the Free Software Foundation.
* However, the following notice accompanied the original version of this
* file:
*
* Written by Doug Lea with assistance from members of JCP JSR-166
* Expert Group and released to the public domain, as explained at
* http://creativecommons.org/publicdomain/zero/1.0/
*/
package java.util.concurrent;
import java.util.Collection;
import java.util.Queue;
/**
* A {@link Queue} that additionally supports operations that wait for
* the queue to become non-empty when retrieving an element, and wait
* for space to become available in the queue when storing an element.
*
* <p>{@code BlockingQueue} methods come in four forms, with different ways
* of handling operations that cannot be satisfied immediately, but may be
* satisfied at some point in the future:
* one throws an exception, the second returns a special value (either
* {@code null} or {@code false}, depending on the operation), the third
* blocks the current thread indefinitely until the operation can succeed,
* and the fourth blocks for only a given maximum time limit before giving
* up. These methods are summarized in the following table:
*
* <table class="plain">
* <caption>Summary of BlockingQueue methods</caption>
* <tr>
* <td></td>
* <th scope="col" style="font-weight:normal; font-style:italic">Throws exception</th>
* <th scope="col" style="font-weight:normal; font-style:italic">Special value</th>
* <th scope="col" style="font-weight:normal; font-style:italic">Blocks</th>
* <th scope="col" style="font-weight:normal; font-style:italic">Times out</th>
* </tr>
* <tr>
* <th scope="row" style="text-align:left">Insert</th>
* <td>{@link #add(Object) add(e)}</td>
* <td>{@link #offer(Object) offer(e)}</td>
* <td>{@link #put(Object) put(e)}</td>
* <td>{@link #offer(Object, long, TimeUnit) offer(e, time, unit)}</td>
* </tr>
* <tr>
* <th scope="row" style="text-align:left">Remove</th>
* <td>{@link #remove() remove()}</td>
* <td>{@link #poll() poll()}</td>
* <td>{@link #take() take()}</td>
* <td>{@link #poll(long, TimeUnit) poll(time, unit)}</td>
* </tr>
* <tr>
* <th scope="row" style="text-align:left">Examine</th>
* <td>{@link #element() element()}</td>
* <td>{@link #peek() peek()}</td>
* <td style="font-style: italic">not applicable</td>
* <td style="font-style: italic">not applicable</td>
* </tr>
* </table>
*
* <p>A {@code BlockingQueue} does not accept {@code null} elements.
* Implementations throw {@code NullPointerException} on attempts
* to {@code add}, {@code put} or {@code offer} a {@code null}. A
* {@code null} is used as a sentinel value to indicate failure of
* {@code poll} operations.
*
* <p>A {@code BlockingQueue} may be capacity bounded. At any given
* time it may have a {@code remainingCapacity} beyond which no
* additional elements can be {@code put} without blocking.
* A {@code BlockingQueue} without any intrinsic capacity constraints always
* reports a remaining capacity of {@code Integer.MAX_VALUE}.
*
* <p>{@code BlockingQueue} implementations are designed to be used
* primarily for producer-consumer queues, but additionally support
* the {@link Collection} interface. So, for example, it is
* possible to remove an arbitrary element from a queue using
* {@code remove(x)}. However, such operations are in general
* <em>not</em> performed very efficiently, and are intended for only
* occasional use, such as when a queued message is cancelled.
*
* <p>{@code BlockingQueue} implementations are thread-safe. All
* queuing methods achieve their effects atomically using internal
* locks or other forms of concurrency control. However, the
* <em>bulk</em> Collection operations {@code addAll},
* {@code containsAll}, {@code retainAll} and {@code removeAll} are
* <em>not</em> necessarily performed atomically unless specified
* otherwise in an implementation. So it is possible, for example, for
* {@code addAll(c)} to fail (throwing an exception) after adding
* only some of the elements in {@code c}.
*
* <p>A {@code BlockingQueue} does <em>not</em> intrinsically support
* any kind of "close" or "shutdown" operation to
* indicate that no more items will be added. The needs and usage of
* such features tend to be implementation-dependent. For example, a
* common tactic is for producers to insert special
* <em>end-of-stream</em> or <em>poison</em> objects, that are
* interpreted accordingly when taken by consumers.
*
* <p>
* Usage example, based on a typical producer-consumer scenario.
* Note that a {@code BlockingQueue} can safely be used with multiple
* producers and multiple consumers.
* <pre> {@code
* class Producer implements Runnable {
* private final BlockingQueue queue;
* Producer(BlockingQueue q) { queue = q; }
* public void run() {
* try {
* while (true) { queue.put(produce()); }
* } catch (InterruptedException ex) { ... handle ...}
* }
* Object produce() { ... }
* }
*
* class Consumer implements Runnable {
* private final BlockingQueue queue;
* Consumer(BlockingQueue q) { queue = q; }
* public void run() {
* try {
* while (true) { consume(queue.take()); }
* } catch (InterruptedException ex) { ... handle ...}
* }
* void consume(Object x) { ... }
* }
*
* class Setup {
* void main() {
* BlockingQueue q = new SomeQueueImplementation();
* Producer p = new Producer(q);
* Consumer c1 = new Consumer(q);
* Consumer c2 = new Consumer(q);
* new Thread(p).start();
* new Thread(c1).start();
* new Thread(c2).start();
* }
* }}</pre>
*
* <p>Memory consistency effects: As with other concurrent
* collections, actions in a thread prior to placing an object into a
* {@code BlockingQueue}
* <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
* actions subsequent to the access or removal of that element from
* the {@code BlockingQueue} in another thread.
*
* <p>This interface is a member of the
* <a href="{@docRoot}/java.base/java/util/package-summary.html#CollectionsFramework">
* Java Collections Framework</a>.
*
* @param <E> the type of elements held in this queue
*
* @author Doug Lea
* @since 1.5
*/
/*
* Single-ended blocking queue.
*
* Key characteristics:
* 1. Implementations are thread-safe: queuing methods take effect atomically.
* 2. When an insertion or removal cannot proceed immediately, the caller chooses
*    (by picking the method) whether to block the calling thread or to fail fast.
*/
public interface BlockingQueue<E> extends Queue<E> {
/*▼ Enqueue ████████████████████████████████████████████████████████████████████████████████┓ */
/**
* Inserts the specified element into this queue, waiting if necessary
* for space to become available.
*
* @param e the element to add
*
* @throws InterruptedException if interrupted while waiting
* @throws ClassCastException if the class of the specified element
* prevents it from being added to this queue
* @throws NullPointerException if the specified element is null
* @throws IllegalArgumentException if some property of the specified
* element prevents it from being added to this queue
*/
// Enqueues; when there is no room, the implementation either grows its storage or blocks the caller
void put(E e) throws InterruptedException;
/**
* Inserts the specified element into this queue if it is possible to do
* so immediately without violating capacity restrictions, returning
* {@code true} upon success and throwing an
* {@code IllegalStateException} if no space is currently available.
* When using a capacity-restricted queue, it is generally preferable to
* use {@link #offer(Object) offer}.
*
* @param e the element to add
*
* @return {@code true} (as specified by {@link Collection#add})
*
* @throws IllegalStateException if the element cannot be added at this
* time due to capacity restrictions
* @throws ClassCastException if the class of the specified element
* prevents it from being added to this queue
* @throws NullPointerException if the specified element is null
* @throws IllegalArgumentException if some property of the specified
* element prevents it from being added to this queue
*/
// Enqueues without blocking; when there is no room, the implementation either grows its storage or throws
boolean add(E e);
/**
* Inserts the specified element into this queue if it is possible to do
* so immediately without violating capacity restrictions, returning
* {@code true} upon success and {@code false} if no space is currently
* available. When using a capacity-restricted queue, this method is
* generally preferable to {@link #add}, which can fail to insert an
* element only by throwing an exception.
*
* @param e the element to add
*
* @return {@code true} if the element was added to this queue, else
* {@code false}
*
* @throws ClassCastException if the class of the specified element
* prevents it from being added to this queue
* @throws NullPointerException if the specified element is null
* @throws IllegalArgumentException if some property of the specified
* element prevents it from being added to this queue
*/
// Enqueues without blocking; when there is no room, the implementation either grows its storage or returns false
boolean offer(E e);
/**
* Inserts the specified element into this queue, waiting up to the
* specified wait time if necessary for space to become available.
*
* @param e the element to add
* @param timeout how long to wait before giving up, in units of
* {@code unit}
* @param unit a {@code TimeUnit} determining how to interpret the
* {@code timeout} parameter
*
* @return {@code true} if successful, or {@code false} if
* the specified waiting time elapses before space is available
*
* @throws InterruptedException if interrupted while waiting
* @throws ClassCastException if the class of the specified element
* prevents it from being added to this queue
* @throws NullPointerException if the specified element is null
* @throws IllegalArgumentException if some property of the specified
* element prevents it from being added to this queue
*/
// Enqueues; when there is no room, the implementation either grows its storage or blocks for up to
// the given timeout, returning false if the element still cannot be inserted once the timeout elapses
boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;
/*▲ Enqueue ████████████████████████████████████████████████████████████████████████████████┛ */
/*▼ Dequeue ████████████████████████████████████████████████████████████████████████████████┓ */
/**
* Retrieves and removes the head of this queue, waiting if necessary
* until an element becomes available.
*
* @return the head of this queue
*
* @throws InterruptedException if interrupted while waiting
*/
// Dequeues; blocks while no element is available
E take() throws InterruptedException;
/**
* Removes a single instance of the specified element from this queue,
* if it is present. More formally, removes an element {@code e} such
* that {@code o.equals(e)}, if this queue contains one or more such
* elements.
* Returns {@code true} if this queue contained the specified element
* (or equivalently, if this queue changed as a result of the call).
*
* @param o element to be removed from this queue, if present
*
* @return {@code true} if this queue changed as a result of the call
*
* @throws ClassCastException if the class of the specified element
* is incompatible with this queue
* (<a href="{@docRoot}/java.base/java/util/Collection.html#optional-restrictions">optional</a>)
* @throws NullPointerException if the specified element is null
* (<a href="{@docRoot}/java.base/java/util/Collection.html#optional-restrictions">optional</a>)
*/
// Removes the given element; returns true if it was removed (this method comes from Collection)
boolean remove(Object o);
/**
* Retrieves and removes the head of this queue, waiting up to the
* specified wait time if necessary for an element to become available.
*
* @param timeout how long to wait before giving up, in units of
* {@code unit}
* @param unit a {@code TimeUnit} determining how to interpret the
* {@code timeout} parameter
*
* @return the head of this queue, or {@code null} if the
* specified waiting time elapses before an element is available
*
* @throws InterruptedException if interrupted while waiting
*/
// Dequeues; blocks for up to the given timeout while no element is available, returning null on timeout
E poll(long timeout, TimeUnit unit) throws InterruptedException;
/**
* Removes all available elements from this queue and adds them
* to the given collection. This operation may be more
* efficient than repeatedly polling this queue. A failure
* encountered while attempting to add elements to
* collection {@code c} may result in elements being in neither,
* either or both collections when the associated exception is
* thrown. Attempts to drain a queue to itself result in
* {@code IllegalArgumentException}. Further, the behavior of
* this operation is undefined if the specified collection is
* modified while the operation is in progress.
*
* @param c the collection to transfer elements into
*
* @return the number of elements transferred
*
* @throws UnsupportedOperationException if addition of elements
* is not supported by the specified collection
* @throws ClassCastException if the class of an element of this queue
* prevents it from being added to the specified collection
* @throws NullPointerException if the specified collection is null
* @throws IllegalArgumentException if the specified collection is this
* queue, or some property of an element of this queue prevents
* it from being added to the specified collection
*/
// Removes all available elements from the queue and transfers them into the given collection
int drainTo(Collection<? super E> c);
/**
* Removes at most the given number of available elements from
* this queue and adds them to the given collection. A failure
* encountered while attempting to add elements to
* collection {@code c} may result in elements being in neither,
* either or both collections when the associated exception is
* thrown. Attempts to drain a queue to itself result in
* {@code IllegalArgumentException}. Further, the behavior of
* this operation is undefined if the specified collection is
* modified while the operation is in progress.
*
* @param c the collection to transfer elements into
* @param maxElements the maximum number of elements to transfer
*
* @return the number of elements transferred
*
* @throws UnsupportedOperationException if addition of elements
* is not supported by the specified collection
* @throws ClassCastException if the class of an element of this queue
* prevents it from being added to the specified collection
* @throws NullPointerException if the specified collection is null
* @throws IllegalArgumentException if the specified collection is this
* queue, or some property of an element of this queue prevents
* it from being added to the specified collection
*/
// Removes at most maxElements available elements from the queue and transfers them into the given collection
int drainTo(Collection<? super E> c, int maxElements);
/*▲ Dequeue ████████████████████████████████████████████████████████████████████████████████┛ */
/*▼ Containment query ████████████████████████████████████████████████████████████████████████████████┓ */
/**
* Returns {@code true} if this queue contains the specified element.
* More formally, returns {@code true} if and only if this queue contains
* at least one element {@code e} such that {@code o.equals(e)}.
*
* @param o object to be checked for containment in this queue
*
* @return {@code true} if this queue contains the specified element
*
* @throws ClassCastException if the class of the specified element
* is incompatible with this queue
* (<a href="{@docRoot}/java.base/java/util/Collection.html#optional-restrictions">optional</a>)
* @throws NullPointerException if the specified element is null
* (<a href="{@docRoot}/java.base/java/util/Collection.html#optional-restrictions">optional</a>)
*/
// Checks whether the queue contains the given element
boolean contains(Object o);
/*▲ Containment query ████████████████████████████████████████████████████████████████████████████████┛ */
/*▼ Miscellaneous ████████████████████████████████████████████████████████████████████████████████┓ */
/**
* Returns the number of additional elements that this queue can ideally
* (in the absence of memory or resource constraints) accept without
* blocking, or {@code Integer.MAX_VALUE} if there is no intrinsic
* limit.
*
* <p>Note that you <em>cannot</em> always tell if an attempt to insert
* an element will succeed by inspecting {@code remainingCapacity}
* because it may be the case that another thread is about to
* insert or remove an element.
*
* @return the remaining capacity
*/
// Reports the queue's remaining capacity
int remainingCapacity();
/*▲ Miscellaneous ████████████████████████████████████████████████████████████████████████████████┛ */
}