-
Notifications
You must be signed in to change notification settings - Fork 668
/
SubmissionPublisher.java
2100 lines (1793 loc) · 89.9 KB
/
SubmissionPublisher.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/*
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
* particular file as subject to the "Classpath" exception as provided
* by Oracle in the LICENSE file that accompanied this code.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
/*
* This file is available under and governed by the GNU General Public
* License version 2 only, as published by the Free Software Foundation.
* However, the following notice accompanied the original version of this
* file:
*
* Written by Doug Lea with assistance from members of JCP JSR-166
* Expert Group and released to the public domain, as explained at
* http://creativecommons.org/publicdomain/zero/1.0/
*/
package java.util.concurrent;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.locks.LockSupport;
import java.util.function.BiConsumer;
import java.util.function.BiPredicate;
import java.util.function.Consumer;
import static java.util.concurrent.Flow.Publisher;
import static java.util.concurrent.Flow.Subscriber;
import static java.util.concurrent.Flow.Subscription;
/**
* A {@link Flow.Publisher} that asynchronously issues submitted
* (non-null) items to current subscribers until it is closed. Each
* current subscriber receives newly submitted items in the same order
* unless drops or exceptions are encountered. Using a
* SubmissionPublisher allows item generators to act as compliant <a
* href="http://www.reactive-streams.org/"> reactive-streams</a>
* Publishers relying on drop handling and/or blocking for flow
* control.
*
* <p>A SubmissionPublisher uses the {@link Executor} supplied in its
* constructor for delivery to subscribers. The best choice of
* Executor depends on expected usage. If the generator(s) of
* submitted items run in separate threads, and the number of
* subscribers can be estimated, consider using a {@link
* Executors#newFixedThreadPool}. Otherwise consider using the
* default, normally the {@link ForkJoinPool#commonPool}.
*
* <p>Buffering allows producers and consumers to transiently operate
* at different rates. Each subscriber uses an independent buffer.
* Buffers are created upon first use and expanded as needed up to the
* given maximum. (The enforced capacity may be rounded up to the
* nearest power of two and/or bounded by the largest value supported
* by this implementation.) Invocations of {@link
* Flow.Subscription#request(long) request} do not directly result in
* buffer expansion, but risk saturation if unfilled requests exceed
* the maximum capacity. The default value of {@link
* Flow#defaultBufferSize()} may provide a useful starting point for
* choosing a capacity based on expected rates, resources, and usages.
*
* <p>A single SubmissionPublisher may be shared among multiple
* sources. Actions in a source thread prior to publishing an item or
* issuing a signal <a href="package-summary.html#MemoryVisibility">
* <i>happen-before</i></a> actions subsequent to the corresponding
* access by each subscriber. But reported estimates of lag and demand
* are designed for use in monitoring, not for synchronization
* control, and may reflect stale or inaccurate views of progress.
*
* <p>Publication methods support different policies about what to do
* when buffers are saturated. Method {@link #submit(Object) submit}
* blocks until resources are available. This is simplest, but least
* responsive. The {@code offer} methods may drop items (either
* immediately or with bounded timeout), but provide an opportunity to
* interpose a handler and then retry.
*
* <p>If any Subscriber method throws an exception, its subscription
* is cancelled. If a handler is supplied as a constructor argument,
* it is invoked before cancellation upon an exception in method
* {@link Flow.Subscriber#onNext onNext}, but exceptions in methods
* {@link Flow.Subscriber#onSubscribe onSubscribe},
* {@link Flow.Subscriber#onError(Throwable) onError} and
* {@link Flow.Subscriber#onComplete() onComplete} are not recorded or
* handled before cancellation. If the supplied Executor throws
* {@link RejectedExecutionException} (or any other RuntimeException
* or Error) when attempting to execute a task, or a drop handler
* throws an exception when processing a dropped item, then the
* exception is rethrown. In these cases, not all subscribers will
* have been issued the published item. It is usually good practice to
* {@link #closeExceptionally closeExceptionally} in these cases.
*
* <p>Method {@link #consume(Consumer)} simplifies support for a
* common case in which the only action of a subscriber is to request
* and process all items using a supplied function.
*
* <p>This class may also serve as a convenient base for subclasses
* that generate items, and use the methods in this class to publish
* them. For example here is a class that periodically publishes the
* items generated from a supplier. (In practice you might add methods
* to independently start and stop generation, to share Executors
* among publishers, and so on, or use a SubmissionPublisher as a
* component rather than a superclass.)
*
* <pre> {@code
* class PeriodicPublisher<T> extends SubmissionPublisher<T> {
* final ScheduledFuture<?> periodicTask;
* final ScheduledExecutorService scheduler;
* PeriodicPublisher(Executor executor, int maxBufferCapacity,
* Supplier<? extends T> supplier,
* long period, TimeUnit unit) {
* super(executor, maxBufferCapacity);
* scheduler = new ScheduledThreadPoolExecutor(1);
* periodicTask = scheduler.scheduleAtFixedRate(
* () -> submit(supplier.get()), 0, period, unit);
* }
* public void close() {
* periodicTask.cancel(false);
* scheduler.shutdown();
* super.close();
* }
* }}</pre>
*
* <p>Here is an example of a {@link Flow.Processor} implementation.
* It uses single-step requests to its publisher for simplicity of
* illustration. A more adaptive version could monitor flow using the
* lag estimate returned from {@code submit}, along with other utility
* methods.
*
* <pre> {@code
* class TransformProcessor<S,T> extends SubmissionPublisher<T>
* implements Flow.Processor<S,T> {
* final Function<? super S, ? extends T> function;
* Flow.Subscription subscription;
* TransformProcessor(Executor executor, int maxBufferCapacity,
* Function<? super S, ? extends T> function) {
* super(executor, maxBufferCapacity);
* this.function = function;
* }
* public void onSubscribe(Flow.Subscription subscription) {
* (this.subscription = subscription).request(1);
* }
* public void onNext(S item) {
* subscription.request(1);
* submit(function.apply(item));
* }
* public void onError(Throwable ex) { closeExceptionally(ex); }
* public void onComplete() { close(); }
* }}</pre>
*
* @param <T> the published item type
*
* @author Doug Lea
* @since 9
*/
/*
* 生产/发布/推送者,是Flow.Publisher的一个实现
*/
public class SubmissionPublisher<T> implements Publisher<T>, AutoCloseable {
/*
* Most mechanics are handled by BufferedSubscription. This class
* mainly tracks subscribers and ensures sequentiality, by using
* built-in synchronization locks across public methods. Using
* built-in locks works well in the most typical case in which
* only one thread submits items. We extend this idea in
* submission methods by detecting single-ownership to reduce
* producer-consumer synchronization strength.
*/
/** The largest possible power of two array size. */
static final int BUFFER_CAPACITY_LIMIT = 1 << 30;
/**
* Initial buffer capacity used when maxBufferCapacity is
* greater. Must be a power of two.
*/
static final int INITIAL_CAPACITY = 32;
// default Executor setup; nearly the same as CompletableFuture
/**
* Default executor -- ForkJoinPool.commonPool() unless it cannot support parallelism.
*/
// 默认的任务执行器
private static final Executor ASYNC_POOL = (ForkJoinPool.getCommonPoolParallelism()>1) ? ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
/**
* Clients (BufferedSubscriptions) are maintained in a linked list
* (via their "next" fields). This works well for publish loops.
* It requires O(n) traversal to check for duplicate subscribers,
* but we expect that subscribing is much less common than
* publishing. Unsubscribing occurs only during traversal loops,
* when BufferedSubscription methods return negative values
* signifying that they have been closed. To reduce
* head-of-line blocking, submit and offer methods first call
* BufferedSubscription.offer on each subscriber, and place
* saturated ones in retries list (using nextRetry field), and
* retry, possibly blocking or dropping.
*/
// 注册的中介链表,先注册的排在开头
BufferedSubscription<T> clients;
/** Run status, updated only within locks */
// 标记中介是否已经关闭
volatile boolean closed;
/** Set true on first call to subscribe, to initialize possible owner */
// 当首次注册消费者之后置为true
boolean subscribed;
/** The first caller thread to subscribe, or null if thread ever changed */
// 首次注册消费者时生产者所在线程
Thread owner;
/** If non-null, the exception in closeExceptionally */
// 调用closeExceptionally(Throwable)异常关闭中介时传入的异常信息
volatile Throwable closedException;
// 执行消费任务的任务执行器
final Executor executor;
// 如果消息在消费过程中出现异常,设置handler可以让消费者决定下一步该采取什么操作,包括如何处理异常
final BiConsumer<? super Subscriber<? super T>, ? super Throwable> onNextHandler;
// 消息队列最大容量
final int maxBufferCapacity;
/*▼ 构造器 ████████████████████████████████████████████████████████████████████████████████┓ */
/**
* Creates a new SubmissionPublisher using the {@link
* ForkJoinPool#commonPool()} for async delivery to subscribers
* (unless it does not support a parallelism level of at least two,
* in which case, a new Thread is created to run each task), with
* maximum buffer capacity of {@link Flow#defaultBufferSize}, and no
* handler for Subscriber exceptions in method {@link
* Flow.Subscriber#onNext(Object) onNext}.
*/
public SubmissionPublisher() {
this(ASYNC_POOL, Flow.defaultBufferSize(), null);
}
/**
* Creates a new SubmissionPublisher using the given Executor for
* async delivery to subscribers, with the given maximum buffer size
* for each subscriber, and no handler for Subscriber exceptions in
* method {@link Flow.Subscriber#onNext(Object) onNext}.
*
* @param executor the executor to use for async delivery,
* supporting creation of at least one independent thread
* @param maxBufferCapacity the maximum capacity for each
* subscriber's buffer (the enforced capacity may be rounded up to
* the nearest power of two and/or bounded by the largest value
* supported by this implementation; method {@link #getMaxBufferCapacity}
* returns the actual value)
*
* @throws NullPointerException if executor is null
* @throws IllegalArgumentException if maxBufferCapacity not
* positive
*/
public SubmissionPublisher(Executor executor, int maxBufferCapacity) {
this(executor, maxBufferCapacity, null);
}
/**
* Creates a new SubmissionPublisher using the given Executor for
* async delivery to subscribers, with the given maximum buffer size
* for each subscriber, and, if non-null, the given handler invoked
* when any Subscriber throws an exception in method {@link
* Flow.Subscriber#onNext(Object) onNext}.
*
* @param executor the executor to use for async delivery,
* supporting creation of at least one independent thread
* @param maxBufferCapacity the maximum capacity for each
* subscriber's buffer (the enforced capacity may be rounded up to
* the nearest power of two and/or bounded by the largest value
* supported by this implementation; method {@link #getMaxBufferCapacity}
* returns the actual value)
* @param handler if non-null, procedure to invoke upon exception
* thrown in method {@code onNext}
*
* @throws NullPointerException if executor is null
* @throws IllegalArgumentException if maxBufferCapacity not
* positive
*/
/**
* executor - 该任务执行器用在执行消费动作中
* maxBufferCapacity - 消息队列最大容量,会被预处理之后再存到maxBufferCapacity域
* handler - 如果消息在消费过程中出现异常,设置handler可以让消费者决定下一步该采取什么操作,包括如何处理异常
*/
public SubmissionPublisher(Executor executor, int maxBufferCapacity, BiConsumer<? super Subscriber<? super T>, ? super Throwable> handler) {
if(executor == null) {
throw new NullPointerException();
}
if(maxBufferCapacity<=0) {
throw new IllegalArgumentException("capacity must be positive");
}
this.executor = executor;
this.onNextHandler = handler;
// 适当扩大maxBufferCapacity(扩大倍数不超过2)
this.maxBufferCapacity = roundCapacity(maxBufferCapacity);
}
/*▲ 构造器 ████████████████████████████████████████████████████████████████████████████████┛ */
/*▼ 注册 ████████████████████████████████████████████████████████████████████████████████┓ */
/**
* Adds the given Subscriber unless already subscribed.
* If already subscribed, the Subscriber's {@link
* Flow.Subscriber#onError(Throwable) onError} method is invoked on
* the existing subscription with an {@link IllegalStateException}.
* Otherwise, upon success, the Subscriber's {@link
* Flow.Subscriber#onSubscribe onSubscribe} method is invoked
* asynchronously with a new {@link Flow.Subscription}. If {@link
* Flow.Subscriber#onSubscribe onSubscribe} throws an exception, the
* subscription is cancelled. Otherwise, if this SubmissionPublisher
* was closed exceptionally, then the subscriber's {@link
* Flow.Subscriber#onError onError} method is invoked with the
* corresponding exception, or if closed without exception, the
* subscriber's {@link Flow.Subscriber#onComplete() onComplete}
* method is invoked. Subscribers may enable receiving items by
* invoking the {@link Flow.Subscription#request(long) request}
* method of the new Subscription, and may unsubscribe by invoking
* its {@link Flow.Subscription#cancel() cancel} method.
*
* @param subscriber the subscriber
*
* @throws NullPointerException if subscriber is null
*/
// 注册消费者。其执行过程是先向生产者注册中介,再向中介注册消费者,是一个间接注册过程
public void subscribe(Subscriber<? super T> subscriber) {
if(subscriber == null) {
throw new NullPointerException();
}
int max = maxBufferCapacity; // allocate initial array
// 中介内部的消息队列(按最大容量分配)
Object[] array = new Object[max<INITIAL_CAPACITY ? max : INITIAL_CAPACITY];
// 这里将消息队列的初始容量设置的跟最大容量一样大
BufferedSubscription<T> subscription = new BufferedSubscription<T>(subscriber, executor, onNextHandler, array, max);
synchronized(this) {
// 如果是首次注册消费者
if(!subscribed) {
subscribed = true;
// 首次注册消费者时生产者所在线程
owner = Thread.currentThread();
}
// 遍历注册的所有中介(消费者)
for(BufferedSubscription<T> b = clients, pred = null; ; ) {
// 如果遍历到了结尾
if(b == null) {
Throwable ex;
// 将中介注册到生产者时的回调
subscription.onSubscribe();
// 如果中介已被异常关闭
if((ex = closedException) != null) {
// 使用已经异常关闭的中介
subscription.onError(ex);
// 如果中介已被正常关闭
} else if(closed) {
// 使用已经正常关闭的中介
subscription.onComplete();
// 如果是添加首个注册的中介
} else if(pred == null) {
clients = subscription;
// 追加中介
} else {
pred.next = subscription;
}
break;
}
BufferedSubscription<T> next = b.next;
// 如果中介已被关闭,
if(b.isClosed()) { // remove
b.next = null; // detach
if(pred == null) {
clients = next;
} else {
pred.next = next;
}
// 不允许重复注册同一个消费者
} else if(subscriber.equals(b.subscriber)) {
// 使用中介时发生异常
b.onError(new IllegalStateException("Duplicate subscribe"));
break;
} else {
pred = b;
}
b = next;
}// for
}
}
/*▲ 注册 ████████████████████████████████████████████████████████████████████████████████┛ */
/*▼ 生产/推送/发布消息 ████████████████████████████████████████████████████████████████████████████████┓ */
/**
* Publishes the given item to each current subscriber by
* asynchronously invoking its {@link Flow.Subscriber#onNext(Object)
* onNext} method, blocking uninterruptibly while resources for any
* subscriber are unavailable. This method returns an estimate of
* the maximum lag (number of items submitted but not yet consumed)
* among all current subscribers. This value is at least one
* (accounting for this submitted item) if there are any
* subscribers, else zero.
*
* <p>If the Executor for this publisher throws a
* RejectedExecutionException (or any other RuntimeException or
* Error) when attempting to asynchronously notify subscribers,
* then this exception is rethrown, in which case not all
* subscribers will have been issued this item.
*
* @param item the (non-null) item to publish
*
* @return the estimated maximum lag among subscribers
*
* @throws IllegalStateException if closed
* @throws NullPointerException if item is null
* @throws RejectedExecutionException if thrown by Executor
*/
/*
* 生产者向中介推送消息(存储到消息队列中以便消费者去消费),推送失败后中介/生产者进入无限期阻塞,直到被主动唤醒,醒来后只重试一次
*
* item 待发布的消息
*
* 返回值:
* 如果为负数,其绝对值代表本次发布错过的中介数量(因为满了无法接受)
* 如果为正数,代表各中介内消息队列的最大长度,即衡量估计需要最长等待的时间
*/
public int submit(T item) {
return doOffer(item, Long.MAX_VALUE, null);
}
/**
* Publishes the given item, if possible, to each current subscriber
* by asynchronously invoking its {@link
* Flow.Subscriber#onNext(Object) onNext} method. The item may be
* dropped by one or more subscribers if resource limits are
* exceeded, in which case the given handler (if non-null) is
* invoked, and if it returns true, retried once. Other calls to
* methods in this class by other threads are blocked while the
* handler is invoked. Unless recovery is assured, options are
* usually limited to logging the error and/or issuing an {@link
* Flow.Subscriber#onError(Throwable) onError} signal to the
* subscriber.
*
* <p>This method returns a status indicator: If negative, it
* represents the (negative) number of drops (failed attempts to
* issue the item to a subscriber). Otherwise it is an estimate of
* the maximum lag (number of items submitted but not yet
* consumed) among all current subscribers. This value is at least
* one (accounting for this submitted item) if there are any
* subscribers, else zero.
*
* <p>If the Executor for this publisher throws a
* RejectedExecutionException (or any other RuntimeException or
* Error) when attempting to asynchronously notify subscribers, or
* the drop handler throws an exception when processing a dropped
* item, then this exception is rethrown.
*
* @param item the (non-null) item to publish
* @param onDrop if non-null, the handler invoked upon a drop to a
* subscriber, with arguments of the subscriber and item; if it
* returns true, an offer is re-attempted (once)
*
* @return if negative, the (negative) number of drops; otherwise an estimate of maximum lag
*
* @throws IllegalStateException if closed
* @throws NullPointerException if item is null
* @throws RejectedExecutionException if thrown by Executor
*/
/*
* 生产者向中介推送消息(存储到消息队列中以便消费者去消费),推送失败后立即重试
*
* item 待发布的消息
* onDrop 第一次重试失败后调用,onDrop的返回值指示是否需要进行第二次尝试
*
* 返回值:
* 如果为负数,其绝对值代表本次发布错过的中介数量(因为满了无法接受)
* 如果为正数,代表各中介内消息队列的最大长度,即衡量估计需要最长等待的时间
*/
public int offer(T item, BiPredicate<Subscriber<? super T>, ? super T> onDrop) {
return doOffer(item, 0L, onDrop);
}
/**
* Publishes the given item, if possible, to each current subscriber
* by asynchronously invoking its {@link
* Flow.Subscriber#onNext(Object) onNext} method, blocking while
* resources for any subscription are unavailable, up to the
* specified timeout or until the caller thread is interrupted, at
* which point the given handler (if non-null) is invoked, and if it
* returns true, retried once. (The drop handler may distinguish
* timeouts from interrupts by checking whether the current thread
* is interrupted.) Other calls to methods in this class by other
* threads are blocked while the handler is invoked. Unless
* recovery is assured, options are usually limited to logging the
* error and/or issuing an {@link Flow.Subscriber#onError(Throwable)
* onError} signal to the subscriber.
*
* <p>This method returns a status indicator: If negative, it
* represents the (negative) number of drops (failed attempts to
* issue the item to a subscriber). Otherwise it is an estimate of
* the maximum lag (number of items submitted but not yet
* consumed) among all current subscribers. This value is at least
* one (accounting for this submitted item) if there are any
* subscribers, else zero.
*
* <p>If the Executor for this publisher throws a
* RejectedExecutionException (or any other RuntimeException or
* Error) when attempting to asynchronously notify subscribers, or
* the drop handler throws an exception when processing a dropped
* item, then this exception is rethrown.
*
* @param item the (non-null) item to publish
* @param timeout how long to wait for resources for any subscriber
* before giving up, in units of {@code unit}
* @param unit a {@code TimeUnit} determining how to interpret the
* {@code timeout} parameter
* @param onDrop if non-null, the handler invoked upon a drop to a
* subscriber, with arguments of the subscriber and item; if it
* returns true, an offer is re-attempted (once)
*
* @return if negative, the (negative) number of drops; otherwise an estimate of maximum lag
*
* @throws IllegalStateException if closed
* @throws NullPointerException if item is null
* @throws RejectedExecutionException if thrown by Executor
*/
/*
* 生产者向中介推送消息(存储到消息队列中以便消费者去消费),推送失败后阻塞timeout时长后再重试
*
* item 待发布的消息
* timeout 发布失败并重试前需要等待的时长
* unit timeout的时间单位
* onDrop 第一次重试失败后调用,onDrop的返回值指示是否需要进行第二次尝试
*
* 返回值:
* 如果为负数,其绝对值代表本次发布错过的中介数量(因为满了无法接受)
* 如果为正数,代表各中介内消息队列的最大长度,即衡量估计需要最长等待的时间
*/
public int offer(T item, long timeout, TimeUnit unit, BiPredicate<Subscriber<? super T>, ? super T> onDrop) {
// 将unit时间单位下的timeout换算为纳秒
long nanos = unit.toNanos(timeout);
// distinguishes from untimed (only wrt interrupt policy)
if(nanos == Long.MAX_VALUE) {
--nanos;
}
return doOffer(item, nanos, onDrop);
}
/**
* Common implementation for all three forms of submit and offer.
* Acts as submit if nanos == Long.MAX_VALUE, else offer.
*/
/*
* 生产者向中介推送消息(存储到消息队列中以便消费者去消费),推送失败后阻塞nanos时长后再重试
*
* item 待发布的消息
* timeout 发布失败并重试前需要等待的时长,单位纳秒
* onDrop 第一次重试失败后调用,onDrop的返回值指示是否需要进行第二次尝试
*
* 返回值:
* 如果为负数,其绝对值代表本次发布错过的中介数量(因为满了无法接受)
* 如果为正数,代表各中介内消息队列的最大长度,即衡量估计需要最长等待的时间
*/
private int doOffer(T item, long nanos, BiPredicate<Subscriber<? super T>, ? super T> onDrop) {
if(item == null) {
throw new NullPointerException();
}
// 记录各中介内消息队列的最大长度
int lag = 0;
// true:生产行为没有发生在首次注册消费者时生产者所在线程中
boolean unowned;
boolean complete;
synchronized(this) {
Thread t = Thread.currentThread();
Thread o;
// 中介链
BufferedSubscription<T> b = clients;
// 如果生产者当前所在线程不是首次注册消费者时所在的线程
if((unowned = ((o = owner) != t)) && o != null) {
owner = null; // disable bias
}
// 如果当前没有注册的中介(消费者)
if(b == null) {
complete = closed;
} else {
complete = false;
boolean cleanMe = false;
// 指向需要重试接收消息的中介
BufferedSubscription<T> retries = null;
BufferedSubscription<T> rtail = null;
BufferedSubscription<T> next;
// 遍历中介链,将消息推送到每个中介的消息队列,以便中介关联的消费者去消费
do {
// 下一个中介
next = b.next;
/*
* 将消息item添加到中介的消息队列中以待消费
* 返回值表示消息队列长度,返回0表示中介的消息队列已满,返回负数表示中介已关闭
*/
int stat = b.offer(item, unowned);
// 如果消息队列已满
if(stat == 0) { // saturated; add to retry list
b.nextRetry = null; // avoid garbage on exceptions
// 记录该中介,以待后续重试接收消息
if(rtail == null) {
retries = b;
} else {
rtail.nextRetry = b;
}
rtail = b;
// 如果中介已关闭
} else if(stat<0) { // closed
// 标记该中介为允许清理
cleanMe = true; // remove later
} else if(stat>lag) {
// 记录积压的更长的消息队列长度
lag = stat;
}
// 如果存在下一个中介
} while((b = next) != null);
// 每完成一次生产/推送,都需要把推送失败的重试一下
if(retries != null || cleanMe) {
// 处理推送失败的中介(阻塞并重试,或者清理)
lag = retryOffer(item, nanos, onDrop, retries, lag, cleanMe);
}
}
}
// 如果待接收消息的中介已关闭,则抛异常
if(complete) {
throw new IllegalStateException("Closed");
}
return lag;
}
/**
* Helps, (timed) waits for, and/or drops buffers on list;
* returns lag or negative drops (for use in offer).
*/
// 处理推送失败的中介(阻塞并重试)
private int retryOffer(T item, long nanos, BiPredicate<Subscriber<? super T>, ? super T> onDrop, BufferedSubscription<T> retries, int lag, boolean cleanMe) {
// 如果存在需要重试的中介
for(BufferedSubscription<T> r = retries; r != null; ) {
BufferedSubscription<T> nextRetry = r.nextRetry;
r.nextRetry = null;
/*
* 推送失败并重试之前需要阻塞/等待
* 如果nanos>0,说明需要阻塞一会儿(等待消费者消费)之后再去重试
*/
if(nanos>0L) {
r.awaitSpace(nanos);
}
// 尝试重新推送/生产消息,返回值代表消息队列长度,返回0表示推送/生产失败(比如满了),返回负数说明中介已关闭
int stat = r.retryOffer(item);
// 如果消息队列为null(比如满了),且需要重试(由onDrop的实现觉得),则再次尝试
if(stat == 0 && onDrop != null && onDrop.test(r.subscriber, item)) {
// 再次尝试重新推送/生产消息,返回值代表消息队列长度,返回0表示推送/生产失败(比如满了),返回负数说明中介已关闭
stat = r.retryOffer(item);
}
// 消息队列满了
if(stat == 0) {
lag = (lag >= 0) ? -1 : lag - 1;
// 中介已关闭,稍后需要清理
} else if(stat<0) {
cleanMe = true;
} else if(lag >= 0 && stat>lag) {
// 记录积压的更长的消息队列长度
lag = stat;
}
r = nextRetry;
}
// 如果有需要清理的中介(已经关闭的中介)
if(cleanMe) {
// 清理已经关闭的中介,并返回未关闭的中介数量
cleanAndCount();
}
return lag;
}
/**
* Returns current list count after removing closed subscribers.
* Call only while holding lock. Used mainly by retryOffer for cleanup.
*/
// 清理已经关闭的中介。返回未关闭的中介数量
private int cleanAndCount() {
int count = 0;
BufferedSubscription<T> pred = null, next;
for(BufferedSubscription<T> b = clients; b != null; b = next) {
next = b.next;
if(b.isClosed()) {
b.next = null;
if(pred == null) {
clients = next;
} else {
pred.next = next;
}
} else {
pred = b;
++count;
}
}
return count;
}
/*▲ 生产/推送/发布消息 ████████████████████████████████████████████████████████████████████████████████┛ */
/*▼ 关闭中介 ████████████████████████████████████████████████████████████████████████████████┓ */
/**
* Unless already closed, issues {@link Flow.Subscriber#onComplete() onComplete} signals to current subscribers,
* and disallows subsequent attempts to publish.
* Upon return, this method does NOT guarantee that all subscribers have yet completed.
*/
// 正常关闭中介,阻断生产者向其推送消息(关闭后,不保证消息队列中的消息已被全部处理)
public void close() {
if(!closed) {
BufferedSubscription<T> b;
synchronized(this) {
// no need to re-check closed here
b = clients;
clients = null;
owner = null;
closed = true;
}
while(b != null) {
BufferedSubscription<T> next = b.next;
b.next = null;
// 正常关闭中介
b.onComplete();
b = next;
}
}
}
/**
* Unless already closed, issues {@link Flow.Subscriber#onError(Throwable) onError} signals to current subscribers with the given error,
* and disallows subsequent attempts to publish.
* Future subscribers also receive the given error.
* Upon return, this method does <em>NOT</em> guarantee that all subscribers have yet completed.
*
* @param error the {@code onError} argument sent to subscribers
*
* @throws NullPointerException if error is null
*/
// 异常关闭中介(传入一个异常信息),阻断生产者向其推送消息(关闭后,不保证消息队列中的消息已被全部处理)
public void closeExceptionally(Throwable error) {
if(error == null) {
throw new NullPointerException();
}
if(!closed) {
BufferedSubscription<T> b;
synchronized(this) {
b = clients;
if(!closed) { // don't clobber racing close
closedException = error;
clients = null;
owner = null;
closed = true;
}
}
while(b != null) {
BufferedSubscription<T> next = b.next;
b.next = null;
// 异常关闭中介
b.onError(error);
b = next;
}
}
}
/**
* Returns true if this publisher is not accepting submissions.
*
* @return true if closed
*/
// 判断中介是否已经关闭
public boolean isClosed() {
return closed;
}
/**
* Returns the exception associated with {@link
* #closeExceptionally(Throwable) closeExceptionally}, or null if
* not closed or if closed normally.
*
* @return the exception, or null if none
*/
// 获取引起异常关闭中介的异常信息
public Throwable getClosedException() {
return closedException;
}
/*▲ 关闭中介 ████████████████████████████████████████████████████████████████████████████████┛ */
/*▼ 属性 ████████████████████████████████████████████████████████████████████████████████┓ */
/**
* Returns true if this publisher has any subscribers.
*
* @return true if this publisher has any subscribers
*/
// 判断是否存在未关闭的中介(顺便将查过过程中遇到的已关闭的中介从中介链上移除)
public boolean hasSubscribers() {
boolean nonEmpty = false;
synchronized(this) {
for(BufferedSubscription<T> b = clients; b != null; ) {
BufferedSubscription<T> next = b.next;
if(b.isClosed()) {
b.next = null;
b = clients = next;
} else {
nonEmpty = true;
break;
}
}
}
return nonEmpty;
}
/**
* Returns a list of current subscribers for monitoring and
* tracking purposes, not for invoking {@link Flow.Subscriber}
* methods on the subscribers.
*
* @return list of current subscribers
*/
// 获取未关闭的中介列表
public List<Subscriber<? super T>> getSubscribers() {
ArrayList<Subscriber<? super T>> subs = new ArrayList<>();
synchronized(this) {
BufferedSubscription<T> pred = null, next;
for(BufferedSubscription<T> b = clients; b != null; b = next) {
next = b.next;
if(b.isClosed()) {
b.next = null;
if(pred == null) {
clients = next;
} else {
pred.next = next;
}
} else {
subs.add(b.subscriber);
pred = b;
}
}
}
return subs;
}
/**
* Returns true if the given Subscriber is currently subscribed.
*
* @param subscriber the subscriber
*
* @return true if currently subscribed
*
* @throws NullPointerException if subscriber is null
*/
// 判断是否存在未关闭的中介subscriber
public boolean isSubscribed(Subscriber<? super T> subscriber) {
if(subscriber == null) {
throw new NullPointerException();
}
if(!closed) {
synchronized(this) {
BufferedSubscription<T> pred = null, next;
for(BufferedSubscription<T> b = clients; b != null; b = next) {
next = b.next;
if(b.isClosed()) {
b.next = null;
if(pred == null) {
clients = next;
} else {
pred.next = next;
}
} else if(subscriber.equals(b.subscriber)) {
return true;
} else {
pred = b;
}
}
}
}
return false;
}
/**
* Returns the number of current subscribers.
*
* @return the number of current subscribers
*/
// 清理已经关闭的中介,并返回未关闭的中介数量
public int getNumberOfSubscribers() {
synchronized(this) {
return cleanAndCount();
}
}
/**
* Returns the Executor used for asynchronous delivery.
*
* @return the Executor used for asynchronous delivery
*/
// 获取执行消费任务的任务执行器
public Executor getExecutor() {
return executor;
}
/**
* Returns the maximum per-subscriber buffer capacity.
*
* @return the maximum per-subscriber buffer capacity