のらぬこの日常を描く

ノージャンルのお役立ち情報やアニメとゲームの話、ソフトウェア開発に関する話などを中心としたブログです。

android向けの軽量Job Systemを作った話

こんにちは。のらぬこです。

今回は、いま開発中のandroidアプリで必要になった、指定されたメソッドを順にバックグラウンドで実行してくれるJobQueueの仕組みを実装した話をします。

既存のものもいくつか調べたのですが、僕が欲しかった機能を満たしているものがぱっと見でなさそうだったので自作することにしました。

欲しかった機能

  • Jobの優先順位がつけられる
  • 失敗時や例外発生時にリトライができる
  • 定期的に繰り返し実行されるJobとしてQueueに登録できる
  • とにかくスケジューラが軽量
    • 短時間で終わるJobを数千単位で登録するような用途のため
  • Jobの登録はマルチスレッド対応

できれば欲しかった機能

  • Jobのグループ化
    • 同一グループのJobを、一括で有効/無効化したい
      • ネットワークが切れたら特定の種類のJobの起動を抑制したい等の用途

特にいらなかった機能

  • 複数Jobの並列実行(あってもよかったかなー)
  • Queueのシリアライズ

実装してみた

てことで、欲しかった機能+できれば欲しかった機能を詰め込んで実装してみました。 android固有のclassやmethodはログ出力の部分くらいしか使っていないので、ほぼ何も変更することなく、他の用途でも使えると思います。

そのうちちゃんとプロジェクト作ってgithubとかに載せるかもしれませんが、とりあえずソースをベタっと貼っておきます。

とりあえず、ソースはgithubにあげました。readmeやサンプルなどは近々更新します(あくまで予定です)。

GitHub - noranuk0/panther-job: Simple and lightweight job system with priorities and group ideas

JobManager.instnce() で JobManagerのインスタンスを取得したら、Job classの派生クラスを Jobとしてガンガン登録すればあとはなんか良きに計らってくれます。

Jobグループ有効化、無効化のメソッドなどもこのclassに用意されています。

リトライ(再実行)の設定は、Jobが完了したとき、例外で落ちたときに呼ばれるCallbackクラスで設定します。
RepeatedlyRunCallback クラスの実装を見てください。

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * JobManager.java
 * <p/>
 * Copyright (c) 2016 Hiroyuki Mizuhara
 * <p/>
 * This software is released under the MIT License.
 * http://opensource.org/licenses/mit-license.php
 */
public class JobManager {
    public static final int PRIORITY_MIN = 0;
    public static final int PRIORITY_BELOW_NORMAL = 1;
    public static final int PRIORITY_NORMAL = 2;
    public static final int PRIORITY_ABOVE_NORMAL = 3;
    public static final int PRIORITY_MAX = 4;

    public static final int MAX_JOB_GROUP_ID = 30;

    @SuppressWarnings("unchecked")
    private final List<Job>[] priorityJobList = new ArrayList[PRIORITY_MAX + 1];

    private final long[] jobGroupSuspendLimitTimes = new long[MAX_JOB_GROUP_ID];
    private int currentEnabledJobGroupMask = 0x7fffffff;

    private static JobManager instance;
    private Worker worker;

    private static void initialize() {
        instance = new JobManager();
    }

    public static JobManager instance() {
        if (instance == null) {
            initialize();
        }
        return instance;
    }

    private class Worker extends Thread {
        public Worker() {
            super("Job#WorkerThread");
        }

        private Job get() {

            final int[] workJobSizeArray = new int[PRIORITY_MAX + 1];
            final long[] workJobGroupSuspendLimitTimes = new long[MAX_JOB_GROUP_ID];
            Job result = null;
            while (result == null) {
                long nextScheduledJobTime = Long.MAX_VALUE;
                long currentTimeMillis = System.currentTimeMillis();
                synchronized (priorityJobList) {
                    for (int priority = PRIORITY_MAX; priority >= PRIORITY_MIN; priority--) {
                        workJobSizeArray[priority] = priorityJobList[priority].size();
                    }
                }
                int workActiveJobGroupMask;
                synchronized (JobManager.this) {
                    workActiveJobGroupMask = JobManager.this.currentEnabledJobGroupMask;
                    System.arraycopy(
                            JobManager.this.jobGroupSuspendLimitTimes, 0,
                            workJobGroupSuspendLimitTimes,
                            0, workJobGroupSuspendLimitTimes.length);
                }
                for (int index = 0; index < MAX_JOB_GROUP_ID; index++) {
                    if (workJobGroupSuspendLimitTimes[index] <= currentTimeMillis) {
                        workJobGroupSuspendLimitTimes[index] = 0;
                    } else {
                        if (workJobGroupSuspendLimitTimes[index] < nextScheduledJobTime) {
                            nextScheduledJobTime = workJobGroupSuspendLimitTimes[index];
                        }
                        workActiveJobGroupMask &= ~(1 << index);
                    }
                }
                for (int priority = PRIORITY_MAX; priority >= PRIORITY_MIN; priority--) {
                    List<Job> targetList = priorityJobList[priority];
                    for (int index = 0; index < workJobSizeArray[priority]; index++) {
                        Job target = targetList.get(index);
                        if ((target.getJobGroupMask() & workActiveJobGroupMask)
                                != target.getJobGroupMask()) {
                            continue;
                        }
                        if ((target.getJobGroupMask() & workActiveJobGroupMask)
                                != target.getJobGroupMask()) {
                            continue;
                        }
                        if (nextScheduledJobTime > target.getNextExecuteSystemTime()) {
                            nextScheduledJobTime = target.getNextExecuteSystemTime();
                        }
                        if (target.getNextExecuteSystemTime() > currentTimeMillis) {
                            continue;
                        }
                        targetList.remove(index);
                        target.setNextScheduleTime(0);
                        result = target;
                        break;
                    }
                    if (result != null) {
                        break;
                    }
                }
                if (result == null) {
                    try {
                        synchronized (worker) {
                            if (nextScheduledJobTime < Long.MAX_VALUE) {
                                currentTimeMillis = System.currentTimeMillis();
                                wait(nextScheduledJobTime - currentTimeMillis);
                            } else {
                                wait();
                            }
                        }
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
            return result;
        }

        @Override
        public void run() {
            while (true) {
                final long systemTime = System.currentTimeMillis();
                Job target = get();
                Callback<Job> callback = target.getCallback();
                try {
                    boolean result =
                            target.execute();
                    if (result) {
                        if (callback != null) {
                            callback.success(target);
                        }
                    } else {
                        if (callback != null) {
                            callback.fail(target);
                        }
                    }
                } catch (Exception e) {
                    try {
                        if (callback != null) {
                            callback.exception(target, e);
                        }
                    } catch (Exception ex) {
                        ex.printStackTrace();
                    }
                }

                if (target.getNextExecuteSystemTime() >= systemTime) {
                    JobManager.this.register(target);
                }
            }
        }
    }

    private JobManager() {
        for (int index = PRIORITY_MIN; index <= PRIORITY_MAX; index++) {
            priorityJobList[index] = new ArrayList<>();
        }
        this.worker = new Worker();
        worker.start();
    }

    public void register(Job job) {
        synchronized (priorityJobList) {
            final List<Job> targetList = priorityJobList[job.getPriority()];
            targetList.add(job);
        }
        synchronized (worker) {
            worker.notify();
        }
    }

    public void suspendGroup(int jobGroupId, long wakeUpTime) {
        long current = System.currentTimeMillis();
        synchronized (this) {
            jobGroupSuspendLimitTimes[jobGroupId] = wakeUpTime;
        }
        if (wakeUpTime <= current) {
            synchronized (worker) {
                worker.notify();
            }
        }
    }

    public Map<Class<?>, Integer> count() {
        Map<Class<?>, Integer> result = new HashMap<>();
        synchronized(this) {
            for (List<Job> listJob : priorityJobList) {
                for (Job job : listJob) {
                    if (result.get(job.getClass()) == null) {
                        result.put(job.getClass(), 0);
                    }
                    result.put(job.getClass(), result.get(job.getClass()) + 1);
                }
            }
        }
        return result;
    }
    
    public void enableGroup(int groupId) {
        synchronized (this) {
            currentEnabledJobGroupMask |= (1 << groupId);
        }
        synchronized (worker) {
            worker.notify();
        }
    }

    public void disableGroup(int groupId) {
        synchronized (this) {
            currentEnabledJobGroupMask &= ~(1 << groupId);
        }
    }

    public void cleanUp() {
        synchronized (this) {
            JobManager.instance = null;
            JobManager.initialize();
        }
    }
}
/**
 * Job.java
 * <p/>
 * Copyright (c) 2016 Hiroyuki Mizuhara
 * <p/>
 * This software is released under the MIT License.
 * http://opensource.org/licenses/mit-license.php
 */
public abstract class Job {
    public int getPriority() {
        return priority;
    }

    public int getJobGroupMask() {
        return jobGroupMask;
    }

    public long getNextExecuteSystemTime() {
        return nextExecuteSystemTime;
    }

    public void setNextScheduleTime(long nextExecuteSystemTime) {
        this.nextExecuteSystemTime = nextExecuteSystemTime;
    }

    public Callback getCallback() {
        return callback;
    }

    public static class Builder {
        private int priority = JobManager.PRIORITY_NORMAL;
        private int jobGroupMask = 0;   /* note set */
        private long nextExecuteSystemTime = 0;
        private Callback callback;

        private Builder(int priority) {
            if (priority >= JobManager.PRIORITY_MIN && priority <= JobManager.PRIORITY_MAX) {
                this.priority = priority;
            } else {
                throw new IllegalArgumentException();
            }
        }

        public static Builder create(int priority) {
            return new Builder(priority);
        }

        public Builder nextExecuteSystemTime(long systemTime) {
            this.nextExecuteSystemTime = systemTime;
            return this;
        }

        public Builder addGroup(int addGroupIndex) {
            if (addGroupIndex >= 0 && addGroupIndex <= JobManager.MAX_JOB_GROUP_ID) {
                this.jobGroupMask |= (1 << addGroupIndex);
                return this;
            } else {
                throw new IllegalArgumentException();
            }
        }

        public Builder callback(Callback callback) {
            this.callback = callback;
            return this;
        }

        public int getPriority() {
            return priority;
        }

        public int getJobGroupMask() {
            return jobGroupMask;
        }

        public long nextExecuteSystemTime() {
            return nextExecuteSystemTime;
        }

        public Callback getCallback() {
            return callback;
        }
    }

    public Job(Builder builder) {
        this.priority = builder.getPriority();
        this.jobGroupMask = builder.getJobGroupMask();
        this.nextExecuteSystemTime = builder.nextExecuteSystemTime();
        this.callback = builder.getCallback();
    }

    protected final int priority;
    protected final int jobGroupMask;
    protected long nextExecuteSystemTime;
    protected final Callback callback;

    public String description() {
        return this.getClass().getSimpleName();
    }

    public boolean containJobGroup(int groupId) {
        return ((jobGroupMask >> groupId) & 0x1) != 0;
    }

    public abstract boolean execute() throws Exception;
}
package com.mknk6556655.noranuko.job;

/**
 * Callback.java
 * <p/>
 * Copyright (c) 2016 Hiroyuki Mizuhara
 * <p/>
 * This software is released under the MIT License.
 * http://opensource.org/licenses/mit-license.php
 */
public class Callback<T extends Job> {
    public void success(T sender) {
    }

    public void fail(T sender) {
    }

    public void exception(T sender, Exception e) {
    }

}
package com.mknk6556655.noranuko.job;

/**
 * RepeatedlyRunCallback.java
 * <p/>
 * Copyright (c) 2016 Hiroyuki Mizuhara
 * <p/>
 * This software is released under the MIT License.
 * http://opensource.org/licenses/mit-license.php
 */
public class RepeatedlyRunCallback<T extends Job> extends Callback<T> {
    private int delayTimeIfSuccessful;
    private int delayTimeIfFailed;
    private int delayTimeIfException;

    public RepeatedlyRunCallback(int delayTimeIfSuccessful,
                                 int delayTimeIfFailed,
                                 int delayTimeIfException) {
        this.delayTimeIfSuccessful = delayTimeIfSuccessful;
        this.delayTimeIfFailed = delayTimeIfFailed;
        this.delayTimeIfException = delayTimeIfException;
    }

    public void success(T sender) {
        if (delayTimeIfSuccessful > 0) {
            long current = System.currentTimeMillis();
            sender.setNextScheduleTime(current + delayTimeIfSuccessful);
        }
    }

    @Override
    public void fail(T sender) {
        if (delayTimeIfFailed > 0) {
            long current = System.currentTimeMillis();
            sender.setNextScheduleTime(current + delayTimeIfFailed);
        }
    }

    @Override
    public void exception(T sender, Exception e) {
        if (delayTimeIfException > 0) {
            long current = System.currentTimeMillis();
            sender.setNextScheduleTime(current + delayTimeIfException);
        }
    }
}

そのうちもうちょっとましな説明書くかもしれませんがとりあえず今回はこのへんで、ということで。

お読みいただいてありがとうございました。