-
Notifications
You must be signed in to change notification settings - Fork 669
/
Copy pathScheduledThreadPoolExecutor.java
1773 lines (1526 loc) · 74.7 KB
/
ScheduledThreadPoolExecutor.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/*
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
* particular file as subject to the "Classpath" exception as provided
* by Oracle in the LICENSE file that accompanied this code.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
/*
* This file is available under and governed by the GNU General Public
* License version 2 only, as published by the Free Software Foundation.
* However, the following notice accompanied the original version of this
* file:
*
* Written by Doug Lea with assistance from members of JCP JSR-166
* Expert Group and released to the public domain, as explained at
* http://creativecommons.org/publicdomain/zero/1.0/
*/
package java.util.concurrent;
import java.util.AbstractQueue;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
/**
* A {@link ThreadPoolExecutor} that can additionally schedule
* commands to run after a given delay, or to execute periodically.
* This class is preferable to {@link java.util.Timer} when multiple
* worker threads are needed, or when the additional flexibility or
* capabilities of {@link ThreadPoolExecutor} (which this class
* extends) are required.
*
* <p>Delayed tasks execute no sooner than they are enabled, but
* without any real-time guarantees about when, after they are
* enabled, they will commence. Tasks scheduled for exactly the same
* execution time are enabled in first-in-first-out (FIFO) order of
* submission.
*
* <p>When a submitted task is cancelled before it is run, execution
* is suppressed. By default, such a cancelled task is not
* automatically removed from the work queue until its delay elapses.
* While this enables further inspection and monitoring, it may also
* cause unbounded retention of cancelled tasks. To avoid this, use
* {@link #setRemoveOnCancelPolicy} to cause tasks to be immediately
* removed from the work queue at time of cancellation.
*
* <p>Successive executions of a periodic task scheduled via
* {@link #scheduleAtFixedRate scheduleAtFixedRate} or
* {@link #scheduleWithFixedDelay scheduleWithFixedDelay}
* do not overlap. While different executions may be performed by
* different threads, the effects of prior executions
* <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
* those of subsequent ones.
*
* <p>While this class inherits from {@link ThreadPoolExecutor}, a few
* of the inherited tuning methods are not useful for it. In
* particular, because it acts as a fixed-sized pool using
* {@code corePoolSize} threads and an unbounded queue, adjustments
* to {@code maximumPoolSize} have no useful effect. Additionally, it
* is almost never a good idea to set {@code corePoolSize} to zero or
* use {@code allowCoreThreadTimeOut} because this may leave the pool
* without threads to handle tasks once they become eligible to run.
*
* <p>As with {@code ThreadPoolExecutor}, if not otherwise specified,
* this class uses {@link Executors#defaultThreadFactory} as the
* default thread factory, and {@link ThreadPoolExecutor.AbortPolicy}
* as the default rejected execution handler.
*
* <p><b>Extension notes:</b> This class overrides the
* {@link ThreadPoolExecutor#execute(Runnable) execute} and
* {@link AbstractExecutorService#submit(Runnable) submit}
* methods to generate internal {@link ScheduledFuture} objects to
* control per-task delays and scheduling. To preserve
* functionality, any further overrides of these methods in
* subclasses must invoke superclass versions, which effectively
* disables additional task customization. However, this class
* provides alternative protected extension method
* {@code decorateTask} (one version each for {@code Runnable} and
* {@code Callable}) that can be used to customize the concrete task
* types used to execute commands entered via {@code execute},
* {@code submit}, {@code schedule}, {@code scheduleAtFixedRate},
* and {@code scheduleWithFixedDelay}. By default, a
* {@code ScheduledThreadPoolExecutor} uses a task type extending
* {@link FutureTask}. However, this may be modified or replaced using
* subclasses of the form:
*
* <pre> {@code
* public class CustomScheduledExecutor extends ScheduledThreadPoolExecutor {
*
* static class CustomTask<V> implements RunnableScheduledFuture<V> { ... }
*
* protected <V> RunnableScheduledFuture<V> decorateTask(
* Runnable r, RunnableScheduledFuture<V> task) {
* return new CustomTask<V>(r, task);
* }
*
* protected <V> RunnableScheduledFuture<V> decorateTask(
* Callable<V> c, RunnableScheduledFuture<V> task) {
* return new CustomTask<V>(c, task);
* }
* // ... add constructors, etc.
* }}</pre>
*
* @since 1.5
* @author Doug Lea
*/
// Scheduled-task thread pool: runs one-shot or periodic tasks, optionally after a delay
public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor implements ScheduledExecutorService {
/*
* This class specializes ThreadPoolExecutor implementation by
*
* 1. Using a custom task type ScheduledFutureTask, even for tasks
* that don't require scheduling because they are submitted
* using ExecutorService rather than ScheduledExecutorService
* methods, which are treated as tasks with a delay of zero.
*
* 2. Using a custom queue (DelayedWorkQueue), a variant of
* unbounded DelayQueue. The lack of capacity constraint and
* the fact that corePoolSize and maximumPoolSize are
* effectively identical simplifies some execution mechanics
* (see delayedExecute) compared to ThreadPoolExecutor.
*
* 3. Supporting optional run-after-shutdown parameters, which
* leads to overrides of shutdown methods to remove and cancel
* tasks that should NOT be run after shutdown, as well as
* different recheck logic when task (re)submission overlaps
* with a shutdown.
*
* 4. Task decoration methods to allow interception and
* instrumentation, which are needed because subclasses cannot
* otherwise override submit methods to get this effect. These
* don't have any impact on pool control logic though.
*/
/**
* The default keep-alive time for pool threads.
*
* Normally, this value is unused because all pool threads will be
* core threads, but if a user creates a pool with a corePoolSize
* of zero (against our advice), we keep a thread alive as long as
* there are queued tasks. If the keep alive time is zero (the
* historic value), we end up hot-spinning in getTask, wasting a
* CPU. But on the other hand, if we set the value too high, and
* users create a one-shot pool which they don't cleanly shutdown,
* the pool's non-daemon threads will prevent JVM termination. A
* small but non-zero value (relative to a JVM's lifetime) seems
* best.
*/
// Keep-alive time, in milliseconds, that the constructors below pass to the superclass (see keepAliveTime in ThreadPoolExecutor)
private static final long DEFAULT_KEEPALIVE_MILLIS = 10L;
/**
* False if should cancel/suppress periodic tasks on shutdown.
*/
/*
* Whether periodic tasks may keep running once the pool has been shut down; never assigned at declaration, so it starts out false
*/
private volatile boolean continueExistingPeriodicTasksAfterShutdown;
/**
* False if should cancel non-periodic not-yet-expired tasks on shutdown.
*/
/*
* Whether one-shot (non-periodic) delayed tasks may still run once the pool has been shut down; starts out true
*/
private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
/**
* True if ScheduledFutureTask.cancel should remove from queue.
*/
// Whether a cancelled task should be removed from the work queue
volatile boolean removeOnCancel;
/**
* Sequence number to break scheduling ties, and in turn to
* guarantee FIFO order among tied entries.
*/
// Source of the sequence number handed to each task created by the schedule methods
private static final AtomicLong sequencer = new AtomicLong();
/*▼ Constructors ████████████████████████████████████████████████████████████████████████████████┓ */
/**
* Creates a new {@code ScheduledThreadPoolExecutor} with the
* given core pool size.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
*
* @throws IllegalArgumentException if {@code corePoolSize < 0}
*/
public ScheduledThreadPoolExecutor(int corePoolSize) {
    // Integer.MAX_VALUE as the maximum pool size, the default keep-alive, and a
    // fresh delay queue; thread factory and rejection handler are not supplied here.
    super(
        corePoolSize,
        Integer.MAX_VALUE,
        DEFAULT_KEEPALIVE_MILLIS,
        MILLISECONDS,
        new DelayedWorkQueue());
}
/**
* Creates a new {@code ScheduledThreadPoolExecutor} with the
* given initial parameters.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param threadFactory the factory to use when the executor
* creates a new thread
*
* @throws IllegalArgumentException if {@code corePoolSize < 0}
* @throws NullPointerException if {@code threadFactory} is null
*/
public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) {
    // Same fixed settings as the single-argument constructor, plus the
    // caller-supplied thread factory.
    super(
        corePoolSize,
        Integer.MAX_VALUE,
        DEFAULT_KEEPALIVE_MILLIS,
        MILLISECONDS,
        new DelayedWorkQueue(),
        threadFactory);
}
/**
* Creates a new {@code ScheduledThreadPoolExecutor} with the
* given initial parameters.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param handler the handler to use when execution is blocked
* because the thread bounds and queue capacities are reached
*
* @throws IllegalArgumentException if {@code corePoolSize < 0}
* @throws NullPointerException if {@code handler} is null
*/
public ScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler) {
    // Same fixed settings as the single-argument constructor, plus the
    // caller-supplied rejected-execution handler.
    super(
        corePoolSize,
        Integer.MAX_VALUE,
        DEFAULT_KEEPALIVE_MILLIS,
        MILLISECONDS,
        new DelayedWorkQueue(),
        handler);
}
/**
* Creates a new {@code ScheduledThreadPoolExecutor} with the
* given initial parameters.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param threadFactory the factory to use when the executor
* creates a new thread
* @param handler the handler to use when execution is blocked
* because the thread bounds and queue capacities are reached
*
* @throws IllegalArgumentException if {@code corePoolSize < 0}
* @throws NullPointerException if {@code threadFactory} or
* {@code handler} is null
*/
public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
    // Same fixed settings as the single-argument constructor, plus both the
    // caller-supplied thread factory and rejected-execution handler.
    super(
        corePoolSize,
        Integer.MAX_VALUE,
        DEFAULT_KEEPALIVE_MILLIS,
        MILLISECONDS,
        new DelayedWorkQueue(),
        threadFactory,
        handler);
}
/*▲ Constructors ████████████████████████████████████████████████████████████████████████████████┛ */
/*▼ Creating / executing / cleaning up tasks ████████████████████████████████████████████████████████████████████████████████┓ */
/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
// Submits a Runnable as a one-shot task with zero delay and returns the resulting future
public Future<?> submit(Runnable command) {
    // Zero-delay scheduling is how this pool expresses "run as soon as possible".
    final ScheduledFuture<?> scheduled = schedule(command, 0L, NANOSECONDS);
    return scheduled;
}
/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
// Submits a Runnable as a one-shot task with zero delay and returns the resulting future;
// `result` is the preset value the future reports once the Runnable has completed
public <T> Future<T> submit(Runnable command, T result) {
    // Wrap the Runnable so that it produces the caller's preset result.
    final Callable<T> adapter = Executors.callable(command, result);
    return schedule(adapter, 0L, NANOSECONDS);
}
/**
* Executes {@code command} with zero required delay.
* This has effect equivalent to
* {@link #schedule(Runnable, long, TimeUnit) schedule(command, 0, anyUnit)}.
* Note that inspections of the queue and of the list returned by
* {@code shutdownNow} will access the zero-delayed
* {@link ScheduledFuture}, not the {@code command} itself.
*
* <p>A consequence of the use of {@code ScheduledFuture} objects is
* that {@link ThreadPoolExecutor#afterExecute afterExecute} is always
* called with a null second {@code Throwable} argument, even if the
* {@code command} terminated abruptly. Instead, the {@code Throwable}
* thrown by such a task can be obtained via {@link Future#get}.
*
* @throws RejectedExecutionException at discretion of
* {@code RejectedExecutionHandler}, if the task
* cannot be accepted for execution because the
* executor has been shut down
* @throws NullPointerException {@inheritDoc}
*/
// Runs a Runnable as a one-shot task with zero delay
public void execute(Runnable command) {
    // The future created by schedule(...) is deliberately discarded here.
    schedule(command, 0L, NANOSECONDS);
}
/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
// Submits a Callable as a one-shot task with zero delay and returns the resulting future
public <T> Future<T> submit(Callable<T> command) {
    final ScheduledFuture<T> scheduled = schedule(command, 0L, NANOSECONDS);
    return scheduled;
}
/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
// Schedules a one-shot Runnable that becomes eligible to run once initialDelay
// (expressed in unit) has elapsed, and returns the (possibly decorated) task
public ScheduledFuture<?> schedule(Runnable command, long initialDelay, TimeUnit unit) {
    if (command == null) {
        throw new NullPointerException();
    }
    if (unit == null) {
        throw new NullPointerException();
    }

    // Trigger time first, sequence number second: same order as the argument list.
    final long triggerAt = triggerTime(initialDelay, unit);
    final long sequence = sequencer.getAndIncrement();

    // Build the one-shot task and let subclasses wrap or replace it.
    final RunnableScheduledFuture<Void> decorated =
        decorateTask(command, new ScheduledFutureTask<Void>(command, null, triggerAt, sequence));

    // Hand the task over for (delayed) execution.
    delayedExecute(decorated);
    return decorated;
}
/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
// Schedules a one-shot Callable that becomes eligible to run once initialDelay
// (expressed in unit) has elapsed, and returns the (possibly decorated) task
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long initialDelay, TimeUnit unit) {
    if (callable == null) {
        throw new NullPointerException();
    }
    if (unit == null) {
        throw new NullPointerException();
    }

    // Trigger time first, sequence number second: same order as the argument list.
    final long triggerAt = triggerTime(initialDelay, unit);
    final long sequence = sequencer.getAndIncrement();

    // Build the one-shot task and let subclasses wrap or replace it.
    final RunnableScheduledFuture<V> decorated =
        decorateTask(callable, new ScheduledFutureTask<V>(callable, triggerAt, sequence));

    // Hand the task over for (delayed) execution.
    delayedExecute(decorated);
    return decorated;
}
/**
* Submits a periodic action that becomes enabled first after the
* given initial delay, and subsequently with the given period;
* that is, executions will commence after
* {@code initialDelay}, then {@code initialDelay + period}, then
* {@code initialDelay + 2 * period}, and so on.
*
* <p>The sequence of task executions continues indefinitely until
* one of the following exceptional completions occur:
* <ul>
* <li>The task is {@linkplain Future#cancel explicitly cancelled}
* via the returned future.
* <li>Method {@link #shutdown} is called and the {@linkplain
* #getContinueExistingPeriodicTasksAfterShutdownPolicy policy on
* whether to continue after shutdown} is not set true, or method
* {@link #shutdownNow} is called; also resulting in task
* cancellation.
* <li>An execution of the task throws an exception. In this case
* calling {@link Future#get() get} on the returned future will throw
* {@link ExecutionException}, holding the exception as its cause.
* </ul>
* Subsequent executions are suppressed. Subsequent calls to
* {@link Future#isDone isDone()} on the returned future will
* return {@code true}.
*
* <p>If any execution of this task takes longer than its period, then
* subsequent executions may start late, but will not concurrently
* execute.
*
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
* @throws IllegalArgumentException {@inheritDoc}
*/
// Schedules a periodic fixed-rate Runnable and returns the (possibly decorated) task:
// the first run becomes eligible after initialDelay, later ones nominally one `rate` apart.
// Runs of the same task do not overlap; a run that overshoots its period only makes the
// following runs start late.
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long rate, TimeUnit unit) {
    if (command == null) {
        throw new NullPointerException();
    }
    if (unit == null) {
        throw new NullPointerException();
    }
    if (rate <= 0L) {
        throw new IllegalArgumentException();
    }

    // Evaluated in the same order as the constructor's argument list.
    final long firstTrigger = triggerTime(initialDelay, unit);
    final long periodNanos = unit.toNanos(rate);
    final long sequence = sequencer.getAndIncrement();

    // Build the periodic task and let subclasses wrap or replace it.
    final ScheduledFutureTask<Void> periodicTask =
        new ScheduledFutureTask<Void>(command, null, firstTrigger, periodNanos, sequence);
    final RunnableScheduledFuture<Void> decorated = decorateTask(command, periodicTask);

    // Let the inner task know its decorated form
    // (presumably the object that gets re-queued for later runs - confirm in ScheduledFutureTask).
    periodicTask.outerTask = decorated;

    // Hand the task over for (delayed) execution.
    delayedExecute(decorated);
    return decorated;
}
/**
* Submits a periodic action that becomes enabled first after the
* given initial delay, and subsequently with the given delay
* between the termination of one execution and the commencement of
* the next.
*
* <p>The sequence of task executions continues indefinitely until
* one of the following exceptional completions occur:
* <ul>
* <li>The task is {@linkplain Future#cancel explicitly cancelled}
* via the returned future.
* <li>Method {@link #shutdown} is called and the {@linkplain
* #getContinueExistingPeriodicTasksAfterShutdownPolicy policy on
* whether to continue after shutdown} is not set true, or method
* {@link #shutdownNow} is called; also resulting in task
* cancellation.
* <li>An execution of the task throws an exception. In this case
* calling {@link Future#get() get} on the returned future will throw
* {@link ExecutionException}, holding the exception as its cause.
* </ul>
* Subsequent executions are suppressed. Subsequent calls to
* {@link Future#isDone isDone()} on the returned future will
* return {@code true}.
*
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
* @throws IllegalArgumentException {@inheritDoc}
*/
// Schedules a periodic fixed-delay Runnable and returns the (possibly decorated) task:
// the first run becomes eligible after initialDelay, and each later run starts `delay`
// after the previous run has finished.
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
    if (command == null) {
        throw new NullPointerException();
    }
    if (unit == null) {
        throw new NullPointerException();
    }
    if (delay <= 0L) {
        throw new IllegalArgumentException();
    }

    // Evaluated in the same order as the constructor's argument list.
    final long firstTrigger = triggerTime(initialDelay, unit);
    // The period is passed negated (presumably how ScheduledFutureTask tells
    // fixed-delay from fixed-rate tasks - confirm in that class).
    final long negatedDelayNanos = -unit.toNanos(delay);
    final long sequence = sequencer.getAndIncrement();

    // Build the periodic task and let subclasses wrap or replace it.
    final ScheduledFutureTask<Void> periodicTask =
        new ScheduledFutureTask<Void>(command, null, firstTrigger, negatedDelayNanos, sequence);
    final RunnableScheduledFuture<Void> decorated = decorateTask(command, periodicTask);

    // Let the inner task know its decorated form
    // (presumably the object that gets re-queued for later runs - confirm in ScheduledFutureTask).
    periodicTask.outerTask = decorated;

    // Hand the task over for (delayed) execution.
    delayedExecute(decorated);
    return decorated;
}
/**
* Modifies or replaces the task used to execute a runnable.
* This method can be used to override the concrete
* class used for managing internal tasks.
* The default implementation simply returns the given task.
*
* @param runnable the submitted Runnable
* @param task the task created to execute the runnable
* @param <V> the type of the task's result
*
* @return a task that can execute the runnable
*
* @since 1.6
*/
// Extension hook for subclasses: `runnable` is the command as submitted, `task` is the
// wrapper created for it. This default implementation changes nothing.
protected <V> RunnableScheduledFuture<V> decorateTask(Runnable runnable, RunnableScheduledFuture<V> task) {
    // No decoration by default: hand the wrapper back untouched.
    return task;
}
/**
* Modifies or replaces the task used to execute a callable.
* This method can be used to override the concrete
* class used for managing internal tasks.
* The default implementation simply returns the given task.
*
* @param callable the submitted Callable
* @param task the task created to execute the callable
* @param <V> the type of the task's result
*
* @return a task that can execute the callable
*
* @since 1.6
*/
// Extension hook for subclasses: `callable` is the command as submitted, `task` is the
// wrapper created for it. This default implementation changes nothing.
protected <V> RunnableScheduledFuture<V> decorateTask(Callable<V> callable, RunnableScheduledFuture<V> task) {
    // No decoration by default: hand the wrapper back untouched.
    return task;
}
/**
* Main execution method for delayed or periodic tasks.
* If pool is shut down, rejects the task. Otherwise adds task to queue
* and starts a thread, if necessary, to run it. (We cannot
* prestart the thread to run the task because the task (probably)
* shouldn't be run yet.) If the pool is shut down while the task
* is being added, cancel and remove it if required by state and
* run-after-shutdown parameters.
*
* @param task the task
*/
// Queues a newly created task for execution, or rejects it when the pool is already shut down
private void delayedExecute(RunnableScheduledFuture<?> task) {
    if (!isShutdown()) {
        // The pool was not shut down when checked: put the task on the work queue first.
        super.getQueue().add(task);

        // Re-check after the add, because a shutdown may have begun in the meantime.
        if (canRunInCurrentRunState(task) || !remove(task)) {
            // Either the task may run, or it may not but could not be taken back out of
            // the queue: make sure a thread is started, if necessary, to service the queue.
            ensurePrestart();
        } else {
            // The task may no longer run and was successfully removed:
            // cancel it without interrupting any thread.
            task.cancel(false);
        }
    } else {
        // Already shut down: apply the rejection policy to the task.
        reject(task);
    }
}
/**
* Returns true if can run a task given current run state and run-after-shutdown parameters.
*/
// Decides whether the given task may run in the pool's current run state
boolean canRunInCurrentRunState(RunnableScheduledFuture<?> task) {
    // Not shut down: every task may run.
    if (!isShutdown()) {
        return true;
    }
    // Stopped: nothing may run.
    if (isStopped()) {
        return false;
    }

    /*
     * Shut down but not stopped: the run-after-shutdown policies decide.
     *  - periodic task: allowed only if the periodic policy says so (false unless changed);
     *  - one-shot task: allowed if the delayed-task policy says so (true unless changed),
     *    and otherwise still allowed when its delay has already elapsed, so only
     *    not-yet-due one-shot tasks end up being refused.
     */
    return task.isPeriodic()
        ? continueExistingPeriodicTasksAfterShutdown
        : (executeExistingDelayedTasksAfterShutdown || task.getDelay(NANOSECONDS) <= 0);
}
/**
* Requeues a periodic task unless current run state precludes it.
* Same idea as delayedExecute except drops task rather than rejecting.
*
* @param task the task
*/
// Puts a periodic task back on the work queue for its next run, or cancels it when the
// current run state no longer allows it to run
void reExecutePeriodic(RunnableScheduledFuture<?> task) {
    boolean keepScheduled = false;

    if (canRunInCurrentRunState(task)) {
        // Allowed to run: re-queue the task.
        super.getQueue().add(task);
        // Re-check after the add. The task stays scheduled if it is still allowed to run,
        // or if it is not allowed but could not be taken back out of the queue.
        keepScheduled = canRunInCurrentRunState(task) || !remove(task);
    }

    if (keepScheduled) {
        // Make sure a thread is started, if necessary, to service the queue.
        ensurePrestart();
    } else {
        // Cancel the task without interrupting any thread.
        task.cancel(false);
    }
}
/*▲ Creating / executing / cleaning up tasks ████████████████████████████████████████████████████████████████████████████████┛ */
/*▼ Run state ████████████████████████████████████████████████████████████████████████████████┓ */
/**
* Initiates an orderly shutdown in which previously submitted
* tasks are executed, but no new tasks will be accepted.
* Invocation has no additional effect if already shut down.
*
* <p>This method does not wait for previously submitted tasks to
* complete execution. Use {@link #awaitTermination awaitTermination}
* to do that.
*
* <p>If the {@code ExecuteExistingDelayedTasksAfterShutdownPolicy}
* has been set {@code false}, existing delayed tasks whose delays
* have not yet elapsed are cancelled. And unless the {@code
* ContinueExistingPeriodicTasksAfterShutdownPolicy} has been set
* {@code true}, future executions of existing periodic tasks will
* be cancelled.
*
* @throws SecurityException {@inheritDoc}
*/
/*
 * Orderly shutdown. This override adds no logic of its own and delegates entirely to
 * the superclass; the scheduling-specific clean-up happens in onShutdown(), which is
 * invoked from within super.shutdown().
 */
public void shutdown() {
    super.shutdown();
}
/**
* Attempts to stop all actively executing tasks, halts the
* processing of waiting tasks, and returns a list of the tasks
* that were awaiting execution. These tasks are drained (removed)
* from the task queue upon return from this method.
*
* <p>This method does not wait for actively executing tasks to
* terminate. Use {@link #awaitTermination awaitTermination} to
* do that.
*
* <p>There are no guarantees beyond best-effort attempts to stop
* processing actively executing tasks. This implementation
* interrupts tasks via {@link Thread#interrupt}; any task that
* fails to respond to interrupts may never terminate.
*
* @return list of tasks that never commenced execution.
* Each element of this list is a {@link ScheduledFuture}.
* For tasks submitted via one of the {@code schedule}
* methods, the element will be identical to the returned
* {@code ScheduledFuture}. For tasks submitted using
* {@link #execute execute}, the element will be a
* zero-delay {@code ScheduledFuture}.
*
* @throws SecurityException {@inheritDoc}
*/
/*
 * Immediate shutdown. This override adds no logic of its own: it delegates entirely to
 * the superclass and returns the list the superclass produces (the tasks that never
 * commenced execution).
 */
public List<Runnable> shutdownNow() {
    final List<Runnable> neverStarted = super.shutdownNow();
    return neverStarted;
}
/**
* Cancels and clears the queue of all tasks that should not be run due to shutdown policy.
* Invoked within super.shutdown.
*/
// Shutdown callback: removes and cancels every queued task that the run-after-shutdown
// policies no longer allow (plus tasks that are already cancelled), then tries to terminate
@Override
void onShutdown() {
    final BlockingQueue<Runnable> queue = super.getQueue();
    // May one-shot delayed tasks still run after shutdown? (true unless changed)
    final boolean keepDelayed = getExecuteExistingDelayedTasksAfterShutdownPolicy();
    // May periodic tasks still run after shutdown? (false unless changed)
    final boolean keepPeriodic = getContinueExistingPeriodicTasksAfterShutdownPolicy();

    /* Traverse snapshot to avoid iterator exceptions */
    // TODO: implement and use efficient removeIf
    // super.getQueue().removeIf(...);
    for (Object entry : queue.toArray()) {
        // Only scheduled-future entries are examined; anything else is left in place.
        if (entry instanceof RunnableScheduledFuture) {
            final RunnableScheduledFuture<?> scheduled = (RunnableScheduledFuture<?>) entry;

            // Periodic tasks go when the periodic policy is off; one-shot tasks go only
            // when the delayed policy is off AND their delay has not yet elapsed.
            final boolean disallowed = scheduled.isPeriodic()
                ? !keepPeriodic
                : (!keepDelayed && scheduled.getDelay(NANOSECONDS) > 0);

            // Also remove if already cancelled. cancel(false) is issued only after a
            // successful removal and does not interrupt any thread.
            if ((disallowed || scheduled.isCancelled()) && queue.remove(scheduled)) {
                scheduled.cancel(false);
            }
        }
    }

    // With the queue pruned, let the pool try to reach its terminated state.
    tryTerminate();
}
/*▲ Run state ████████████████████████████████████████████████████████████████████████████████┛ */
/*▼ Scheduled-pool properties ████████████████████████████████████████████████████████████████████████████████┓ */
/**
* Gets the policy on whether to continue executing existing
* periodic tasks even when this executor has been {@code shutdown}.
* In this case, executions will continue until {@code shutdownNow}
* or the policy is set to {@code false} when already shutdown.
* This value is by default {@code false}.
*
* @return {@code true} if will continue after shutdown
*
* @see #setContinueExistingPeriodicTasksAfterShutdownPolicy
*/
// Returns whether periodic tasks may keep running once the pool has been shut down (false unless changed)
public boolean getContinueExistingPeriodicTasksAfterShutdownPolicy() {
    final boolean keepPeriodic = this.continueExistingPeriodicTasksAfterShutdown;
    return keepPeriodic;
}
/**
* Sets the policy on whether to continue executing existing
* periodic tasks even when this executor has been {@code shutdown}.
* In this case, executions will continue until {@code shutdownNow}
* or the policy is set to {@code false} when already shutdown.
* This value is by default {@code false}.
*
* @param value if {@code true}, continue after shutdown, else don't
*
* @see #getContinueExistingPeriodicTasksAfterShutdownPolicy
*/
// Sets whether periodic tasks may keep running once the pool has been shut down
public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) {
    continueExistingPeriodicTasksAfterShutdown = value;

    // Nothing more to do when the policy is being enabled or the pool is not shut down.
    if (value || !isShutdown()) {
        return;
    }

    // Policy switched off on a pool that is already shut down: prune the queued tasks
    // that are no longer allowed and attempt termination.
    onShutdown();
}
/**
* Gets the policy on whether to execute existing delayed
* tasks even when this executor has been {@code shutdown}.
* In this case, these tasks will only terminate upon
* {@code shutdownNow}, or after setting the policy to
* {@code false} when already shutdown.
* This value is by default {@code true}.
*
* @return {@code true} if will execute after shutdown
*
* @see #setExecuteExistingDelayedTasksAfterShutdownPolicy
*/
// Returns whether one-shot delayed tasks may still run once the pool has been shut down (true unless changed)
public boolean getExecuteExistingDelayedTasksAfterShutdownPolicy() {
    final boolean keepDelayed = this.executeExistingDelayedTasksAfterShutdown;
    return keepDelayed;
}
/**
* Sets the policy on whether to execute existing delayed
* tasks even when this executor has been {@code shutdown}.
* In this case, these tasks will only terminate upon
* {@code shutdownNow}, or after setting the policy to
* {@code false} when already shutdown.
* This value is by default {@code true}.
*
* @param value if {@code true}, execute after shutdown, else don't
*
* @see #getExecuteExistingDelayedTasksAfterShutdownPolicy
*/
// Sets whether one-shot delayed tasks may still run once the pool has been shut down
public void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) {
    executeExistingDelayedTasksAfterShutdown = value;

    // Nothing more to do when the policy is being enabled or the pool is not shut down.
    if (value || !isShutdown()) {
        return;
    }

    // Policy switched off on a pool that is already shut down: prune the queued tasks
    // that are no longer allowed and attempt termination.
    onShutdown();
}
/**
 * Gets the policy on whether cancelled tasks should be removed
 * from the work queue immediately at the time of cancellation.
 * This value is by default {@code false}.
 *
 * @return {@code true} if cancelled tasks are immediately removed
 *         from the queue
 *
 * @see #setRemoveOnCancelPolicy
 * @since 1.7
 */
// Return value: whether a cancelled task is removed from the queue at cancellation time.
public boolean getRemoveOnCancelPolicy() {
    return this.removeOnCancel;
}
/**
 * Sets the policy on whether cancelled tasks should be removed
 * from the work queue immediately at the time of cancellation.
 * This value is by default {@code false}.
 *
 * @param value if {@code true}, remove on cancellation, else don't
 *
 * @see #getRemoveOnCancelPolicy
 * @since 1.7
 */
// Sets whether a cancelled task is removed from the queue at cancellation time.
// Only the flag is stored here; this method does nothing else.
public void setRemoveOnCancelPolicy(boolean value) {
    this.removeOnCancel = value;
}
/**
 * Returns the task queue used by this executor. Access to the
 * task queue is intended primarily for debugging and monitoring.
 * This queue may be in active use. Retrieving the task queue
 * does not prevent queued tasks from executing.
 *
 * <p>Each element of this queue is a {@link ScheduledFuture}.
 * For tasks submitted via one of the {@code schedule} methods, the
 * element will be identical to the returned {@code ScheduledFuture}.
 * For tasks submitted using {@link #execute execute}, the element
 * will be a zero-delay {@code ScheduledFuture}.
 *
 * <p>Iteration over this queue is <em>not</em> guaranteed to traverse
 * tasks in the order in which they will execute.
 *
 * @return the task queue
 */
// Returns the blocking queue, exactly as provided by the superclass.
public BlockingQueue<Runnable> getQueue() {
    BlockingQueue<Runnable> taskQueue = super.getQueue();
    return taskQueue;
}
/*▲ 【定时任务线程池】属性 ████████████████████████████████████████████████████████████████████████████████┛ */
/*▼ 任务触发时间 ████████████████████████████████████████████████████████████████████████████████┓ */
/**
 * Returns the nanoTime-based trigger time of a delayed action.
 *
 * @param delay how long to wait before the action triggers, expressed in {@code unit};
 *              negative values are treated as zero
 * @param unit  the time unit of {@code delay}
 * @return the trigger time computed by {@link #triggerTime(long)} for the delay in nanoseconds
 */
// Computes the task's trigger time (= current time + delay, where delay is given in `unit`).
private long triggerTime(long delay, TimeUnit unit) {
    // Clamp negative delays to zero before converting to nanoseconds.
    long nonNegativeDelay = Math.max(delay, 0L);
    long delayNanos = unit.toNanos(nonNegativeDelay);
    return triggerTime(delayNanos);
}
/**
 * Returns the nanoTime-based trigger time of a delayed action.
 *
 * @param delay the delay in nanoseconds
 * @return {@code System.nanoTime()} plus the (possibly overflow-adjusted) delay
 */
// Computes the task's trigger time (= current time + delay, in nanoseconds).
// Per the original note this is only used internally, with delay >= 0.
long triggerTime(long delay) {
    // Read the clock first, before any overflow adjustment inspects the queue.
    long now = System.nanoTime();
    long effectiveDelay;
    if (delay >= (Long.MAX_VALUE >> 1)) {
        // Very large delays are routed through overflowFree before being added.
        effectiveDelay = overflowFree(delay);
    } else {
        effectiveDelay = delay;
    }
    return now + effectiveDelay;
}
/**
 * Constrains the values of all delays in the queue to be within Long.MAX_VALUE of each other,
 * to avoid overflow in compareTo.
 * This may occur if a task is eligible to be dequeued, but has not yet been, while some other
 * task is added with a delay of Long.MAX_VALUE.
 *
 * @param delay the requested delay in nanoseconds
 * @return {@code delay} unchanged, or a reduced delay when the queue head is overdue and
 *         {@code delay - headDelay} would overflow
 */
// Overflow handling: triggerTime(long) calls this when delay >= (Long.MAX_VALUE >> 1).
private long overflowFree(long delay) {
    // Peek at the task at the head of the queue.
    Delayed first = (Delayed) super.getQueue().peek();
    if (first == null) {
        return delay;
    }
    // Time remaining until the head task triggers; negative means it is already due.
    long remaining = first.getDelay(NANOSECONDS);
    // With an overdue head, (delay - remaining) wrapping to a negative value signals that
    // the two delays are more than Long.MAX_VALUE apart.
    if (remaining < 0 && (delay - remaining < 0)) {
        // Shrink the delay so it sits exactly Long.MAX_VALUE after the head's delay.
        return Long.MAX_VALUE + remaining;
    }
    return delay;
}
/*▲ 任务触发时间 ████████████████████████████████████████████████████████████████████████████████┛ */
// A scheduled task: a FutureTask that additionally carries delay (trigger-time) semantics
private class ScheduledFutureTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V> {
/** The actual task to be re-enqueued by reExecutePeriodic */
// Initially refers to this task itself, so that it can be re-executed later
RunnableScheduledFuture<V> outerTask = this;
/**
* Index into delay queue, to support faster cancellation.
*/
// Index of this task/element within the delay queue (described as a min-heap in the original note)
int heapIndex;
/**
* Period for repeating tasks, in nanoseconds.
* A positive value indicates fixed-rate execution.
* A negative value indicates fixed-delay execution.
* A value of 0 indicates a non-repeating (one-shot) task.
*/
/*
* Repetition mode of the task:
*
* zero     : non-repeating task, executed only once
* positive : repeating task, fixed rate; presumably each trigger time is the previous trigger time plus period
* negative : repeating task, fixed delay; presumably the next start is the previous run's end time plus |period|
*
* NOTE(review): the code that computes the next run time is not in this part of the file.
* The original note also said a fixed-rate task fires even if the previous run has not
* finished; that is not established by the visible code, so confirm before relying on it.
*/
private final long period;
/** The nanoTime-based time when the task is enabled to execute. */
// Time at which the task will next be triggered (absolute nanoTime-based value, in nanoseconds)
private volatile long time;
/** Sequence number to break ties FIFO */
// Order in which the task was added to the queue
private final long sequenceNumber;
/**
 * Creates a one-shot action with the given nanoTime-based trigger time.
 *
 * @param r              the runnable handed to the {@code FutureTask} constructor
 * @param result         the result value handed to the {@code FutureTask} constructor
 * @param triggerTime    nanoTime-based time at which the task is enabled to execute
 * @param sequenceNumber sequence number used to break ties FIFO
 */
// Builds a one-shot scheduled task that becomes runnable at triggerTime.
ScheduledFutureTask(Runnable r, V result, long triggerTime, long sequenceNumber) {
    super(r, result);
    // A period of zero marks the task as non-repeating.
    this.period = 0L;
    this.sequenceNumber = sequenceNumber;
    this.time = triggerTime;
}
/**
 * Creates a one-shot action with the given nanoTime-based trigger time.
 *
 * @param callable       the callable handed to the {@code FutureTask} constructor
 * @param triggerTime    nanoTime-based time at which the task is enabled to execute
 * @param sequenceNumber sequence number used to break ties FIFO
 */
// Builds a one-shot scheduled task from a Callable.
ScheduledFutureTask(Callable<V> callable, long triggerTime, long sequenceNumber) {
    super(callable);
    // A period of zero marks the task as non-repeating.
    this.period = 0L;
    this.sequenceNumber = sequenceNumber;
    this.time = triggerTime;
}
/**
 * Creates a periodic action with the given nanoTime-based initial
 * trigger time and period.
 *
 * @param r              the runnable handed to the {@code FutureTask} constructor
 * @param result         the result value handed to the {@code FutureTask} constructor
 * @param triggerTime    nanoTime-based time of the first trigger
 * @param period         repetition period in nanoseconds, stored as given
 *                       (positive: fixed-rate, negative: fixed-delay)
 * @param sequenceNumber sequence number used to break ties FIFO
 */
// Builds a repeating scheduled task.
ScheduledFutureTask(Runnable r, V result, long triggerTime, long period, long sequenceNumber) {
    super(r, result);
    this.period = period;
    this.sequenceNumber = sequenceNumber;
    this.time = triggerTime;
}
/**
* Overrides FutureTask version so as to reset/requeue if periodic.
*/
// 执行定时任务
public void run() {
// 如果指定的任务在当前线程池状态下无法执行
if(!canRunInCurrentRunState(this)) {
// 中止任务(但不中断线程)
cancel(false);
} else if(!isPeriodic()) {
// 非重复任务,直接执行(只执行一次)
super.run();