JobPlus知识库 IT 软件开发 文章
Java的调度线程池ScheduleExecutorService

对于ScheduleExecutorService来说,你给我设定的调度周期是1秒,那么我当然1秒就会去运行一次你,但是运行1秒后发现你还在运行,那我是再次运行你还是等你运行完成再调度你运行?

当然,这都是我的主观臆断来猜测ScheduleExecutorService的原理,ScheduleExecutorService的真正原理需要去阅读源码来理解,下面带着这个问题,以解决这个问题为目标去看一下ScheduleExecutorService的源码吧。

首先,我们使用下面的代码作为测试:

   private static Runnable blockRunner = () -> {        try {            TimeUnit.SECONDS.sleep(2);            System.out.println("one round:" + new Date());        } catch (InterruptedException e) {            e.printStackTrace();        }    };    private static ScheduledExecutorService scheduledExecutorService =            Executors.newScheduledThreadPool(2);    public static void main(String ... args) {        scheduledExecutorService                .scheduleAtFixedRate(blockRunner, 0, 100, TimeUnit.MILLISECONDS);    }

先来看一下scheduleAtFixedRate这个方法:

   public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,                                                  long initialDelay,                                                  long period,                                                  TimeUnit unit) {        if (command == null || unit == null)            throw new NullPointerException();        if (period <= 0)            throw new IllegalArgumentException();        ScheduledFutureTask<Void> sft =            new ScheduledFutureTask<Void>(command,                                          null,                                          triggerTime(initialDelay, unit),                                          unit.toNanos(period));        RunnableScheduledFuture<Void> t = decorateTask(command, sft);        sft.outerTask = t;        delayedExecute(t);        return t;    }

我们的任务命令被包装了两次,一次变成了一个ScheduledFutureTask类型的对象,然后又变成了RunnableScheduledFuture类型的对象。然后执行了一个方法delayedExecute,这个方法字面意思上看起来像是延时执行的意思,看一下它的代码:

   private void delayedExecute(RunnableScheduledFuture<?> task) {        if (isShutdown())            reject(task);        else {            super.getQueue().add(task);            if (isShutdown() &&                !canRunInCurrentRunState(task.isPeriodic()) &&                remove(task))                task.cancel(false);            else                ensurePrestart();        }    }

它的执行逻辑是:如果线程池被关闭了,那么拒绝提交的任务,否则,将该任务添加队列中去这个队列就是的ThreadPoolExecutor中的工作队列,而这个工作队列是在的ThreadPoolExecutor的构造函数中被初始化的,也就是下面这关键的一句:

   public ScheduledThreadPoolExecutor(int corePoolSize,                                       ThreadFactory threadFactory,                                       RejectedExecutionHandler handler) {        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,              new DelayedWorkQueue(), threadFactory, handler);    }

也就是说,我们的任务被添加到了一个DelayedWorkQueue队列中去了,我们最后找到ScheduledFutureTask实现了延迟的getDelay方法。

       public long getDelay(TimeUnit unit) {            return unit.convert(time - now(), NANOSECONDS);        }

时间变量是什么?原来是延迟,好像和周期无关啊!分析了这么久,发现这是第一次执行任务的逻辑啊,我想知道的是第二次,第三次以后和初始的延迟无关之后的周期调度的情况啊,继续找吧!

然后发现了ScheduledFutureTask的运行方法,很明显这就是任务调度被执行的关键所在,看下代码:

       public void run() {            boolean periodic = isPeriodic();            if (!canRunInCurrentRunState(periodic))                cancel(false);            else if (!periodic)                ScheduledFutureTask.super.run();            else if (ScheduledFutureTask.super.runAndReset()) {                setNextRunTime();                reExecutePeriodic(outerTask);            }        }    }

最为关键的地方在于:

           else if (ScheduledFutureTask.super.runAndReset()) {                setNextRunTime();                reExecutePeriodic(outerTask);            }

首先是:runAndReset()这个方法,然后是setNextRunTime()这个方法,然后是reExecutePeriodic(outerTask)这个方法。
第一个方法runAndReset()貌似是执行我们的提交的任务的,我们看下代码:

   protected boolean runAndReset() {        if (state != NEW ||            !UNSAFE.compareAndSwapObject(this, runnerOffset,                                         null, Thread.currentThread()))            return false;        boolean ran = false;        int s = state;        try {            Callable<V> c = callable;            if (c != null && s == NEW) {                try {                    c.call(); // don't set result                    ran = true;                } catch (Throwable ex) {                    setException(ex);                }            }        } finally {            // runner must be non-null until state is settled to            // prevent concurrent calls to run()            runner = null;            // state must be re-read after nulling runner to prevent            // leaked interrupts            s = state;            if (s >= INTERRUPTING)                handlePossibleCancellationInterrupt(s);        }        return ran && s == NEW;    }

关键的地方是c.call()这一句,这个Ç就是我们提交的任务。
第二个方法setNextRunTime()的意思是设置下次执行的时间,下面是他的代码细节:

       private void setNextRunTime() {            long p = period;            if (p > 0)                time += p;            else                time = triggerTime(-p);        }

我们只需要看P> 0这个分支就可以了,其实这是两种策略。我们的示例对应了第一个分支的策略,所以很显然,时间这个变量会被加P,而p则是我们设。定好的周期下面我们找一下这个时间是在哪里初始化的,回忆一下scheduleAtFixedRate这个方法的内,我们说我们的任务被包装了两次,而时间就是在这里被初始化的:

   /**     * Returns the trigger time of a delayed action.     */    private long triggerTime(long delay, TimeUnit unit) {        return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));    }    /**     * Returns the trigger time of a delayed action.     */    long triggerTime(long delay) {        return now() +            ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));    }

无论如何,我们知道一个任务会被运行完一次之后再次设置时间,然后线程池会获取任务来执行,而任务队列是一个延时阻塞队列,所以也就造成了周期性运行的假象。可以看下下面获取任务的采取方法:

 public RunnableScheduledFuture<?> take() throws InterruptedException {            final ReentrantLock lock = this.lock;            lock.lockInterruptibly();            try {                for (;;) {                    RunnableScheduledFuture<?> first = queue[0];                    if (first == null)                        available.await();                    else {                        long delay = first.getDelay(NANOSECONDS);                        if (delay <= 0)                            return finishPoll(first);                        first = null; // don't retain ref while waiting                        if (leader != null)                            available.await();                        else {                            Thread thisThread = Thread.currentThread();                            leader = thisThread;                            try {                                available.awaitNanos(delay);                            } finally {                                if (leader == thisThread)                                    leader = null;                            }                        }                    }                }            } finally {                if (leader == null && queue[0] != null)                    available.signal();                lock.unlock();            }        }

可以看到,如果延迟小于等于0,那么就是说需要被立即调度,否则延时延迟这样一段时间。也就是延时消费。

结论就是,一个任务会被重复添加到一个延时任务队列,所以同一时间任务队列中会有多个任务待调度,线程池会首先获取优先级高的任务执行。


如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!

¥ 打赏支持
169人赞 举报
分享到
用户评价(0)

暂无评价,你也可以发布评价哦:)

扫码APP

扫描使用APP

扫码使用

扫描使用小程序