基于 Java 延时队列(DelayQueue + Delayed)实现的失败重试任务(可以直接使用)

RetryBean重试配置

import java.io.Serializable;
import java.util.Date;

/**
 * Retry bookkeeping for a single task: when it was created, when it should
 * run next, how many retries have been counted and how many are allowed.
 *
 * @author yanzhao
 */
public class RetryBean implements Serializable {

    /** Creation time; set to "now" by the no-arg constructor. */
    private Date createTime;

    /** Time of the next retry; stays {@code null} until a caller sets it. */
    private Date nextRetryTime;

    /** Number of retries counted so far, starting at 0. */
    private Integer retryTimes = 0;

    /** Retry limit (the upper bound, despite the "least" in the name); 50 by default. */
    private Integer leastRetryTimes = 50;

    /** Creates a bean stamped with the current time. */
    public RetryBean() {
        createTime = new Date();
    }

    public Date getCreateTime() {
        return createTime;
    }

    public void setCreateTime(Date createTime) {
        this.createTime = createTime;
    }

    public Date getNextRetryTime() {
        return nextRetryTime;
    }

    public void setNextRetryTime(Date nextRetryTime) {
        this.nextRetryTime = nextRetryTime;
    }

    public Integer getRetryTimes() {
        return retryTimes;
    }

    public void setRetryTimes(Integer retryTimes) {
        this.retryTimes = retryTimes;
    }

    public Integer getLeastRetryTimes() {
        return leastRetryTimes;
    }

    public void setLeastRetryTimes(Integer leastRetryTimes) {
        this.leastRetryTimes = leastRetryTimes;
    }

    /**
     * Tells whether another retry is still allowed.
     *
     * @return {@code true} while the retry count is below the retry limit
     */
    public boolean hasChance() {
        return leastRetryTimes > retryTimes;
    }

    /** Uses up one retry chance by incrementing the retry count. */
    public void reduceChange() {
        retryTimes = retryTimes + 1;
    }
}

重试任务回调接口

/**
 * Callbacks invoked by the retry machinery at specific points of a task's life.
 *
 * @author yanzhao
 */
public interface Callback {

    /**
     * Per-retry callback.
     *
     * @throws Exception if the callback fails
     */
    void onceRetry() throws Exception;

    /**
     * Callback for the case where the retry limit has been exceeded.
     *
     * @throws Exception if the callback fails
     */
    void exceedTimesRetry() throws Exception;

    /**
     * Callback for the case where the task is interrupted.
     *
     * @throws Exception if the callback fails
     */
    void interrupt() throws Exception;
}

用户自定义Task接口

/**
 * A user-defined task: the business logic plus the retry callbacks
 * inherited from {@link Callback}.
 */
public interface Task extends Callback {

    /**
     * Runs the business logic.
     *
     * @throws Exception if the business logic fails
     */
    void fire() throws Exception;

    /**
     * Implementations should override this to return something useful for
     * logs (an id, for example); the value is written to the log whenever the
     * task is executed.
     *
     * @return a log-friendly description of this task
     */
    String toString();
}

重试间隔RetryInterval接口

import java.util.Date;

/**
 * Strategy that derives a retry time from a given point in time.
 */
public interface RetryInterval {

    /**
     * Maps a base time to the time a retry should happen.
     *
     * @param date the base time
     * @return the time derived from {@code date}
     */
    Date interval(Date date);
}

默认重试间隔DefaultRetryInterval实现

import java.util.Date;

/**
 * Default interval: the retry time is the given time plus 5 seconds.
 */
public class DefaultRetryInterval implements RetryInterval {

    /**
     * Shifts the given time 5 seconds into the future.
     *
     * @param date the base time
     * @return a new {@code Date} 5000 ms after {@code date}
     */
    @Override
    public Date interval(Date date) {
        return new Date(date.getTime() + 5 * 1000);
    }
}

重试任务管理器RetryTaskManager

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.DelayQueue;

/**
 * Static entry point for the retry queue: owns the shared delay queue and
 * decides whether a task may be put into it.
 *
 * @author yanzhao
 */
public class RetryTaskManager {

    private static final Logger LOGGER = LoggerFactory.getLogger(RetryTaskManager.class);

    private static DelayQueue<RetryTask> retryTaskQueue = new DelayQueue<RetryTask>();

    /**
     * Exposes the shared delay queue that holds pending retry tasks.
     *
     * @return the queue instance used by this manager
     */
    public static DelayQueue<RetryTask> getRetryTaskQueue() {
        return retryTaskQueue;
    }

    /**
     * Schedules a task for a retry when that is currently possible.
     * <p>
     * A {@code null} task is ignored. If the retry pool reports shutdown the
     * task's interrupt hook is invoked instead. If the task has no retry
     * chance left nothing is queued. Otherwise the task's next execution time
     * is computed and the task is put into the delay queue.
     *
     * @param retryTask the task to schedule; may be {@code null}
     */
    public static void addRetryTask(RetryTask retryTask) {
        if (retryTask == null) {
            LOGGER.info("retry task is null, return;");
            return;
        }

        if (RetryQueueStarter.getInstance().isShutdown()) {
            LOGGER.info("retry thread pool is shutdown, retry task {} put failed", retryTask.toString());
            retryTask.interrupt();
        } else if (!retryTask.hasChance()) {
            LOGGER.info("retry task {} has no chance, return;", retryTask.toString());
        } else {
            // The execution time must be fixed before the task enters the queue.
            retryTask.nextTime();
            RetryTaskManager.getRetryTaskQueue().put(retryTask);
            LOGGER.info("retry task {} put success", retryTask.toString());
        }
    }
}

基于Delayed的延时任务RetryTask

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Date;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * A delayed, retryable wrapper around a {@link Task}.
 * <p>
 * Instances are ordered by their scheduled execution time so they can be held
 * in a delay queue, and are {@link Runnable} so they can be handed to an
 * executor once their delay has elapsed.
 *
 * @author yanzhao
 */
public class RetryTask implements Delayed, Runnable {

    private static final Logger LOGGER = LoggerFactory.getLogger(RetryTask.class);

    private final RetryBean retryBean = new RetryBean();

    private final RetryInterval retryInterval;

    private final Task task;

    /** Scheduled execution time in epoch milliseconds; 0 until {@link #nextTime()} runs. */
    private long execTime;

    /**
     * Whether this task has already been interrupted; guards the interrupt
     * callback so it fires at most once.
     */
    private final AtomicBoolean isInterrupted = new AtomicBoolean(false);

    /**
     * Creates a retry task that uses {@link DefaultRetryInterval}.
     *
     * @param task the work to execute; must not be {@code null}
     */
    public RetryTask(Task task) {
        this(task, new DefaultRetryInterval());
    }

    /**
     * Creates a retry task with a custom interval strategy.
     *
     * @param task          the work to execute; must not be {@code null}
     * @param retryInterval strategy for the next execution time; {@code null}
     *                      falls back to {@link DefaultRetryInterval}
     * @throws NullPointerException if {@code task} is {@code null}
     */
    public RetryTask(Task task, RetryInterval retryInterval) {
        // Fail fast: every method of this class dereferences the task.
        if (task == null) {
            throw new NullPointerException("task must not be null");
        }
        this.task = task;
        this.retryInterval = retryInterval != null ? retryInterval : new DefaultRetryInterval();
    }

    /**
     * Sets the maximum number of retries; non-positive values are ignored.
     *
     * @param times the new retry limit
     */
    public void setMaxRetryTimes(int times) {
        if (times > 0) {
            this.retryBean.setLeastRetryTimes(times);
        }
    }

    /**
     * Returns the time left until the scheduled execution time.
     *
     * @param unit the unit of the result
     * @return remaining delay in {@code unit}; zero or negative once due
     */
    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(this.execTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    /**
     * Orders tasks by scheduled execution time, earliest first.
     * <p>
     * Both sides are compared through {@code execTime}, so comparing with a
     * task that has not been scheduled yet does not dereference a missing
     * next-retry date. Other {@link Delayed} implementations are compared by
     * remaining delay.
     *
     * @param o the other delayed element
     * @return negative, zero or positive as this task is due before, at the
     *         same time as, or after {@code o}
     */
    @Override
    public int compareTo(Delayed o) {
        if (o == this) {
            return 0;
        }
        if (o instanceof RetryTask) {
            return Long.compare(this.execTime, ((RetryTask) o).execTime);
        }
        return Long.compare(getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS));
    }

    /**
     * Executes the task once: runs {@code fire()} and then {@code onceRetry()}.
     * An exception from either call is logged and the task is handed back to
     * {@link RetryTaskManager} for another attempt.
     */
    @Override
    public void run() {
        try {
            LOGGER.info("task {} start", this.task);
            task.fire();
            LOGGER.info("task {} exec success", this.task);
            task.onceRetry();
        } catch (Exception e) {
            // Trailing throwable argument keeps the stack trace in the log.
            LOGGER.error("task {} exec error: {}", this.task, e.getMessage(), e);
            tryAgain();
        } finally {
            LOGGER.info("task {} end", this.task);
        }
    }

    /** Hands this task back to the manager for another attempt. */
    private void tryAgain() {
        RetryTaskManager.addRetryTask(this);
    }

    private RetryBean getRetryBean() {
        return retryBean;
    }

    /**
     * Checks whether this task may be retried again. When no chance is left,
     * the task's {@code exceedTimesRetry()} callback is invoked; a failure of
     * that callback is logged and not propagated.
     *
     * @return {@code true} if another retry is allowed
     */
    public boolean hasChance() {
        boolean b = this.retryBean.hasChance();
        if (!b) {
            LOGGER.info("task {} has retry {} times", this.task, this.getRetryBean().getLeastRetryTimes());
            try {
                task.exceedTimesRetry();
            } catch (Exception e) {
                LOGGER.error("task {} has retry {} times, callback error: {}", this.task,
                        this.getRetryBean().getLeastRetryTimes(), e.getMessage(), e);
            }
        }
        return b;
    }

    /**
     * Schedules the next execution: asks the interval strategy for a time
     * based on "now", records it, and counts one retry.
     */
    public void nextTime() {
        Date interval = retryInterval.interval(new Date());
        retryBean.setNextRetryTime(interval);
        this.execTime = interval.getTime();
        this.retryBean.reduceChange();
        LOGGER.info("task {} retry {} times, next exec time: {}, timestamp: {}", this.task,
                this.getRetryBean().getRetryTimes(), interval, this.execTime);
    }

    /**
     * Interrupts the task. The task's {@code interrupt()} callback runs only
     * on the first call; a failure of the callback is logged and not
     * propagated.
     */
    public void interrupt() {
        try {
            boolean b = isInterrupted.compareAndSet(false, true);
            if (b) {
                task.interrupt();
                LOGGER.info("task {} interrupt success", this.task);
            }
        } catch (Exception e) {
            LOGGER.error("task {} interrupt error: {} ", this.task, e.getMessage(), e);
        }
    }

    @Override
    public String toString() {
        return task.toString();
    }
}

异常重试RetryException

/**
 * Unchecked exception used to signal that a task should be retried.
 */
public class RetryException extends RuntimeException {

    /** Creates an exception with neither message nor cause. */
    public RetryException() {
        super();
    }

    /**
     * @param message the detail message
     */
    public RetryException(String message) {
        super(message);
    }

    /**
     * @param cause the underlying cause
     */
    public RetryException(Throwable cause) {
        super(cause);
    }

    /**
     * @param message the detail message
     * @param cause   the underlying cause
     */
    public RetryException(String message, Throwable cause) {
        super(message, cause);
    }

    /**
     * @param message            the detail message
     * @param cause              the underlying cause
     * @param enableSuppression  whether suppression is enabled
     * @param writableStackTrace whether the stack trace is writable
     */
    protected RetryException(String message, Throwable cause, boolean enableSuppression,
                             boolean writableStackTrace) {
        super(message, cause, enableSuppression, writableStackTrace);
    }
}

重试队列启动器

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * Singleton that starts the thread which moves due tasks from the retry delay
 * queue into the retry thread pool.
 * <p>
 * Usage: obtain the instance, call {@link #setRetryQueuePool(ThreadPoolExecutor)},
 * then {@link #init()}; call {@link #destroy()} to shut the pool down.
 **/
public class RetryQueueStarter {

    private static final Logger LOGGER = LoggerFactory.getLogger(RetryQueueStarter.class);

    /**
     * Upper bound, in milliseconds, for one wait on the delay queue. The
     * poller re-checks the shutdown state at least this often, so it can
     * terminate instead of blocking forever on an empty queue.
     */
    private static final long POLL_TIMEOUT_MILLIS = 1000L;

    private static final RetryQueueStarter retryQueueStarter = new RetryQueueStarter();

    /** Written by the configuring thread, read by the poller thread. */
    private volatile ThreadPoolExecutor retryQueuePool;

    private RetryQueueStarter() {
    }

    public static RetryQueueStarter getInstance() {
        return retryQueueStarter;
    }

    /**
     * Starts the queue poller thread.
     *
     * @throws IllegalStateException if no thread pool has been set yet
     */
    public void init() {
        if (this.retryQueuePool == null) {
            throw new IllegalStateException(
                    "retry thread pool is not set, call setRetryQueuePool before init");
        }
        startThread();
    }

    /**
     * Shuts down the thread pool (if one is set) and logs the tasks that are
     * still waiting in the queue.
     */
    public void destroy() {
        ThreadPoolExecutor pool = this.retryQueuePool;
        if (pool != null) {
            pool.shutdown();
        }
        storeTask();
    }

    /** Logs every task currently remaining in the retry queue. */
    public void storeTask() {
        for (Object remaining : RetryTaskManager.getRetryTaskQueue().toArray()) {
            LOGGER.info("remain task: {}", remaining);
        }
    }

    /**
     * Launches the poller thread. The thread loops until the pool reports
     * shutdown or the thread is interrupted; each due task is submitted to
     * the pool.
     */
    private void startThread() {
        LOGGER.info("init retry task delay queue ...");
        final DelayQueue<RetryTask> retryTaskQueue = RetryTaskManager.getRetryTaskQueue();

        new Thread(new Runnable() {
            @Override
            public void run() {
                while (!isShutdown()) {
                    try {
                        // Timed poll rather than take(): a blocked take() would never
                        // notice a shutdown and would keep this thread alive forever.
                        RetryTask task = retryTaskQueue.poll(POLL_TIMEOUT_MILLIS,
                                java.util.concurrent.TimeUnit.MILLISECONDS);
                        if (task == null) {
                            continue;
                        }
                        ThreadPoolExecutor pool = retryQueuePool;
                        if (pool != null && !pool.isShutdown()) {
                            pool.submit(task);
                        } else {
                            LOGGER.info("retry thread pool is shutdown, task: {}", task.toString());
                        }
                    } catch (InterruptedException e) {
                        // Restore the flag and stop; do not spin on a swallowed interrupt.
                        Thread.currentThread().interrupt();
                        LOGGER.info("retry queue poller interrupted, exit");
                        return;
                    } catch (Exception e) {
                        LOGGER.error("task exec error: {}", e.getMessage(), e);
                    }
                }
            }
        }, "RetryQueueStarter-Thread").start();
    }

    public void setRetryQueuePool(ThreadPoolExecutor retryQueuePool) {
        this.retryQueuePool = retryQueuePool;
    }

    /**
     * Reports whether tasks can no longer be executed.
     *
     * @return {@code true} if the pool has been shut down, or if no pool has
     *         been set at all
     */
    public boolean isShutdown() {
        ThreadPoolExecutor pool = this.retryQueuePool;
        return pool == null || pool.isShutdown();
    }
}

测试任务TslSyncTask

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Sample task used to exercise the retry queue. Its {@code fire()} fails on
 * purpose when the product id is {@code "2"}.
 */
public class TslSyncTask implements Task {

    private static final Logger logger = LoggerFactory.getLogger(TslSyncTask.class);

    private String url;
    private String tslJson;
    private String productId;

    /**
     * @param url       target URL (only logged by this sample)
     * @param tslJson   payload carried by the task
     * @param productId product id; also used as the task's log label
     */
    public TslSyncTask(String url, String tslJson, String productId) {
        this.url = url;
        this.tslJson = tslJson;
        this.productId = productId;
    }

    public String getUrl() {
        return url;
    }

    public void setUrl(String url) {
        this.url = url;
    }

    public String getTslJson() {
        return tslJson;
    }

    public void setTslJson(String tslJson) {
        this.tslJson = tslJson;
    }

    public String getProductId() {
        return productId;
    }

    public void setProductId(String productId) {
        this.productId = productId;
    }

    /**
     * Logs the url and product id, then throws a {@link RetryException} when
     * the product id is {@code "2"}; otherwise prints {@code hi}.
     *
     * @throws Exception when a retry is requested
     */
    @Override
    public void fire() throws Exception {
        logger.info("url: {}, productId: {}", url, productId);
        // Constant-first comparison: a null productId must not cause an NPE here.
        if ("2".equals(productId)) {
            throw new RetryException("主动重试");
        }
        System.out.println("hi");
    }

    /** No-op in this sample. */
    @Override
    public void onceRetry() throws Exception {
    }

    /** No-op in this sample. */
    @Override
    public void exceedTimesRetry() throws Exception {
    }

    /** No-op in this sample. */
    @Override
    public void interrupt() throws Exception {
    }

    /**
     * @return the product id, used as this task's label in logs
     */
    @Override
    public String toString() {
        return productId;
    }
}

测试客户端

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Demo client: wires a thread pool into the retry queue starter, submits a
 * few sample tasks, shuts the pool down and then submits one more task.
 */
public class Client {

    public static void main(String[] args) throws InterruptedException {
        final String url = "http://10.8.1.22:10206/create";
        final String json = "json data";

        RetryQueueStarter starter = RetryQueueStarter.getInstance();
        starter.setRetryQueuePool(newRetryPool());
        starter.init();

        RetryTaskManager.addRetryTask(new RetryTask(new TslSyncTask(url, json, "2")));
        RetryTaskManager.addRetryTask(new RetryTask(new TslSyncTask(url, json, "1")));
        TimeUnit.SECONDS.sleep(6);

        starter.destroy();
        TimeUnit.SECONDS.sleep(1);
        // Submitted after destroy() on purpose, to exercise the shutdown path.
        RetryTaskManager.addRetryTask(new RetryTask(new TslSyncTask(url, json, "3")));

        // Park the main thread.
        synchronized (Client.class) {
            Client.class.wait();
        }
    }

    /** Builds the 2-thread pool with a 2000-slot queue used by the demo. */
    private static ThreadPoolExecutor newRetryPool() {
        return new ThreadPoolExecutor(
                2,
                2,
                20,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(2000),
                new RetryThreadFactory(),
                new ThreadPoolExecutor.CallerRunsPolicy());
    }

    /** Names threads {@code retry-1}, {@code retry-2}, ... at normal priority. */
    private static final class RetryThreadFactory implements ThreadFactory {

        private final AtomicInteger threadNumber = new AtomicInteger(1);

        @Override
        public Thread newThread(Runnable r) {
            Thread worker = new Thread(r, "retry-" + threadNumber.getAndIncrement());
            if (worker.getPriority() != Thread.NORM_PRIORITY) {
                worker.setPriority(Thread.NORM_PRIORITY);
            }
            return worker;
        }
    }
}

猜你喜欢

转载自blog.csdn.net/qq_30038111/article/details/113141106