对于ScheduleExecutorService来说,你给我设定的调度周期是1秒,那么我当然1秒就会去运行一次你,但是运行1秒后发现你还在运行,那我是再次运行你还是等你运行完成再调度你运行?
当然,这都是我的主观臆断来猜测ScheduleExecutorService的原理,ScheduleExecutorService的真正原理需要去阅读源码来理解,下面带着这个问题,以解决这个问题为目标去看一下ScheduleExecutorService的源码吧。
首先,我们使用下面的代码作为测试:
private static Runnable blockRunner = () -> { try { TimeUnit.SECONDS.sleep(2); System.out.println("one round:" + new Date()); } catch (InterruptedException e) { e.printStackTrace(); } }; private static ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2); public static void main(String ... args) { scheduledExecutorService .scheduleAtFixedRate(blockRunner, 0, 100, TimeUnit.MILLISECONDS); }
先来看一下scheduleAtFixedRate这个方法:
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); if (period <= 0) throw new IllegalArgumentException(); ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit), unit.toNanos(period)); RunnableScheduledFuture<Void> t = decorateTask(command, sft); sft.outerTask = t; delayedExecute(t); return t; }
我们的任务命令被包装了两次,一次变成了一个ScheduledFutureTask类型的对象,然后又变成了RunnableScheduledFuture类型的对象。然后执行了一个方法delayedExecute,这个方法字面意思上看起来像是延时执行的意思,看一下它的代码:
private void delayedExecute(RunnableScheduledFuture<?> task) { if (isShutdown()) reject(task); else { super.getQueue().add(task); if (isShutdown() && !canRunInCurrentRunState(task.isPeriodic()) && remove(task)) task.cancel(false); else ensurePrestart(); } }
它的执行逻辑是:如果线程池被关闭了,那么拒绝提交的任务,否则,将该任务添加队列中去这个队列就是的ThreadPoolExecutor中的工作队列,而这个工作队列是在的ThreadPoolExecutor的构造函数中被初始化的,也就是下面这关键的一句:
public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), threadFactory, handler); }
也就是说,我们的任务被添加到了一个DelayedWorkQueue队列中去了,我们最后找到ScheduledFutureTask实现了延迟的getDelay方法。
public long getDelay(TimeUnit unit) { return unit.convert(time - now(), NANOSECONDS); }
时间变量是什么?原来是延迟,好像和周期无关啊!分析了这么久,发现这是第一次执行任务的逻辑啊,我想知道的是第二次,第三次以后和初始的延迟无关之后的周期调度的情况啊,继续找吧!
然后发现了ScheduledFutureTask的运行方法,很明显这就是任务调度被执行的关键所在,看下代码:
public void run() { boolean periodic = isPeriodic(); if (!canRunInCurrentRunState(periodic)) cancel(false); else if (!periodic) ScheduledFutureTask.super.run(); else if (ScheduledFutureTask.super.runAndReset()) { setNextRunTime(); reExecutePeriodic(outerTask); } } }
最为关键的地方在于:
else if (ScheduledFutureTask.super.runAndReset()) { setNextRunTime(); reExecutePeriodic(outerTask); }
首先是:runAndReset()这个方法,然后是setNextRunTime()这个方法,然后是reExecutePeriodic(outerTask)这个方法。
第一个方法runAndReset()貌似是执行我们的提交的任务的,我们看下代码:
protected boolean runAndReset() { if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return false; boolean ran = false; int s = state; try { Callable<V> c = callable; if (c != null && s == NEW) { try { c.call(); // don't set result ran = true; } catch (Throwable ex) { setException(ex); } } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } return ran && s == NEW; }
关键的地方是c.call()这一句,这个Ç就是我们提交的任务。
第二个方法setNextRunTime()的意思是设置下次执行的时间,下面是他的代码细节:
private void setNextRunTime() { long p = period; if (p > 0) time += p; else time = triggerTime(-p); }
我们只需要看P> 0这个分支就可以了,其实这是两种策略。我们的示例对应了第一个分支的策略,所以很显然,时间这个变量会被加P,而p则是我们设。定好的周期下面我们找一下这个时间是在哪里初始化的,回忆一下scheduleAtFixedRate这个方法的内,我们说我们的任务被包装了两次,而时间就是在这里被初始化的:
/** * Returns the trigger time of a delayed action. */ private long triggerTime(long delay, TimeUnit unit) { return triggerTime(unit.toNanos((delay < 0) ? 0 : delay)); } /** * Returns the trigger time of a delayed action. */ long triggerTime(long delay) { return now() + ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay)); }
无论如何,我们知道一个任务会被运行完一次之后再次设置时间,然后线程池会获取任务来执行,而任务队列是一个延时阻塞队列,所以也就造成了周期性运行的假象。可以看下下面获取任务的采取方法:
public RunnableScheduledFuture<?> take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { RunnableScheduledFuture<?> first = queue[0]; if (first == null) available.await(); else { long delay = first.getDelay(NANOSECONDS); if (delay <= 0) return finishPoll(first); first = null; // don't retain ref while waiting if (leader != null) available.await(); else { Thread thisThread = Thread.currentThread(); leader = thisThread; try { available.awaitNanos(delay); } finally { if (leader == thisThread) leader = null; } } } } } finally { if (leader == null && queue[0] != null) available.signal(); lock.unlock(); } }
可以看到,如果延迟小于等于0,那么就是说需要被立即调度,否则延时延迟这样一段时间。也就是延时消费。
结论就是,一个任务会被重复添加到一个延时任务队列,所以同一时间任务队列中会有多个任务待调度,线程池会首先获取优先级高的任务执行。
登录 | 立即注册