RetryBean重试配置
import java. io. Serializable ;
import java. util. Date ;
/**
 * Serializable bookkeeping for a single retryable task: when it was created,
 * when the next attempt is due, how many attempts have been consumed and how
 * many are allowed.
 */
public class RetryBean implements Serializable {

    /** Creation time; the no-arg constructor sets it to the current time. */
    private Date createTime;

    /** Time of the next scheduled attempt; {@code null} until a caller sets it. */
    private Date nextRetryTime;

    /** Number of attempts consumed so far; starts at 0. */
    private Integer retryTimes = 0;

    /**
     * Attempt limit compared against {@link #retryTimes} by {@link #hasChance()}.
     * Despite the name it acts as an upper bound; the default is 50.
     */
    private Integer leastRetryTimes = 50;

    /** Creates a bean stamped with the current time. */
    public RetryBean() {
        createTime = new Date();
    }

    /** @return the creation time (the stored instance, not a copy) */
    public Date getCreateTime() {
        return this.createTime;
    }

    /** @param createTime the creation time to store */
    public void setCreateTime(Date createTime) {
        this.createTime = createTime;
    }

    /** @return the next scheduled attempt time, or {@code null} if none was set */
    public Date getNextRetryTime() {
        return this.nextRetryTime;
    }

    /** @param nextRetryTime the next scheduled attempt time */
    public void setNextRetryTime(Date nextRetryTime) {
        this.nextRetryTime = nextRetryTime;
    }

    /** @return the number of attempts consumed so far */
    public Integer getRetryTimes() {
        return this.retryTimes;
    }

    /** @param retryTimes the number of attempts consumed so far */
    public void setRetryTimes(Integer retryTimes) {
        this.retryTimes = retryTimes;
    }

    /** @return the attempt limit */
    public Integer getLeastRetryTimes() {
        return this.leastRetryTimes;
    }

    /** @param leastRetryTimes the attempt limit */
    public void setLeastRetryTimes(Integer leastRetryTimes) {
        this.leastRetryTimes = leastRetryTimes;
    }

    /**
     * Reports whether another attempt is still allowed.
     *
     * @return {@code true} while the consumed attempts are below the limit
     */
    public boolean hasChance() {
        // Unboxing happens here, so a null counter or limit raises NullPointerException.
        int used = this.retryTimes;
        int limit = this.leastRetryTimes;
        return used < limit;
    }

    /** Consumes one attempt by incrementing the attempt counter. */
    public void reduceChange() {
        this.retryTimes = this.retryTimes + 1;
    }
}
重试任务回调接口
/**
 * Retry task callback interface.
 *
 * @author yanzhao
 */
public interface Callback {
/**
 * Called by RetryTask.run() right after fire() returns without throwing.
 * An exception thrown here is caught by the same try block in run(), so the
 * task is handed back to RetryTaskManager.addRetryTask for another attempt.
 *
 * @throws Exception if the callback fails
 */
void onceRetry ( ) throws Exception ;
/**
 * Called by RetryTask.hasChance() when the retry bean reports that no
 * attempts remain. Exceptions thrown here are logged by RetryTask and not
 * propagated.
 *
 * @throws Exception if the callback fails
 */
void exceedTimesRetry ( ) throws Exception ;
/**
 * Called by RetryTask.interrupt(), at most once per RetryTask (guarded by a
 * compare-and-set flag). RetryTaskManager.addRetryTask triggers it when the
 * retry thread pool is already shut down. Exceptions thrown here are logged
 * by RetryTask and not propagated.
 *
 * @throws Exception if the callback fails
 */
void interrupt ( ) throws Exception ;
}
用户自定义Task接口
/**
 * User-defined unit of work that can be retried. It inherits the lifecycle
 * callbacks of {@link Callback}; RetryTask wraps an instance for scheduling.
 */
public interface Task extends Callback {
/**
 * Performs the work. RetryTask.run() calls this method; if it throws, the
 * task is passed to RetryTaskManager.addRetryTask for another attempt.
 *
 * @throws Exception to signal that this attempt failed
 */
void fire ( ) throws Exception ;
/**
 * Text used to identify this task in the retry framework's log messages.
 *
 * @return the task's textual identifier
 */
String toString ( ) ;
}
重试间隔RetryInterval接口
import java. util. Date ;
/**
 * Strategy that decides when the next retry attempt should run.
 */
public interface RetryInterval {
/**
 * Computes the time of the next attempt from a reference time.
 * RetryTask.nextTime() passes the current time and uses the returned value
 * as the task's next execution time.
 *
 * @param date the reference time
 * @return the time at which the next attempt should run
 */
Date interval ( Date date) ;
}
默认重试间隔DefaultRetryInterval实现
import java. util. Date ;
/**
 * {@link RetryInterval} that schedules the next attempt a fixed delay after
 * the reference time. The no-arg constructor keeps the original 5-second
 * delay; the one-arg constructor makes the delay configurable.
 */
public class DefaultRetryInterval implements RetryInterval {

    /** Delay applied by the no-arg constructor: 5 seconds. */
    public static final long DEFAULT_DELAY_MILLIS = 5 * 1000L;

    /** Fixed delay, in milliseconds, added to the reference time. */
    private final long delayMillis;

    /** Creates an interval with the default 5-second delay. */
    public DefaultRetryInterval() {
        this(DEFAULT_DELAY_MILLIS);
    }

    /**
     * Creates an interval with a custom fixed delay.
     *
     * @param delayMillis delay in milliseconds; must not be negative
     * @throws IllegalArgumentException if {@code delayMillis} is negative
     */
    public DefaultRetryInterval(long delayMillis) {
        if (delayMillis < 0) {
            throw new IllegalArgumentException("delayMillis must not be negative: " + delayMillis);
        }
        this.delayMillis = delayMillis;
    }

    /**
     * Returns the reference time shifted forward by the configured delay.
     *
     * @param date the reference time; must not be {@code null}
     * @return a new {@link Date} that is {@code delayMillis} after {@code date}
     * @throws NullPointerException if {@code date} is {@code null}
     */
    @Override
    public Date interval(Date date) {
        return new Date(date.getTime() + delayMillis);
    }
}
重试任务管理器RetryTaskManager
import org. slf4j. Logger ;
import org. slf4j. LoggerFactory ;
import java. util. concurrent. DelayQueue ;
/**
 * Static access point to the shared delay queue of retry tasks.
 */
public class RetryTaskManager {

    private static final Logger LOGGER = LoggerFactory.getLogger(RetryTaskManager.class);

    /** Queue holding tasks until their next execution time is reached. */
    private static DelayQueue<RetryTask> retryTaskQueue = new DelayQueue<RetryTask>();

    /**
     * @return the shared delay queue (the live instance, not a copy)
     */
    public static DelayQueue<RetryTask> getRetryTaskQueue() {
        return retryTaskQueue;
    }

    /**
     * Schedules a task for its next attempt.
     *
     * <p>Nothing is queued when the task is {@code null}, when the retry pool
     * is already shut down (the task's {@code interrupt()} is invoked instead),
     * or when the task reports that it has no attempts left. Otherwise the
     * task's next execution time is advanced and it is placed on the queue.
     *
     * @param retryTask the task to schedule; may be {@code null}
     */
    public static void addRetryTask(RetryTask retryTask) {
        if (retryTask == null) {
            LOGGER.info("retry task is null, return;");
        } else if (RetryQueueStarter.getInstance().isShutdown()) {
            LOGGER.info("retry thread pool is shutdown, retry task {} put failed", retryTask.toString());
            retryTask.interrupt();
        } else if (retryTask.hasChance()) {
            // Advance the schedule first so the queue orders the task by its new time.
            retryTask.nextTime();
            getRetryTaskQueue().put(retryTask);
            LOGGER.info("retry task {} put success", retryTask.toString());
        } else {
            LOGGER.info("retry task {} has no chance, return;", retryTask.toString());
        }
    }
}
基于Delayed的延时任务RetryTask
import org. slf4j. Logger ;
import org. slf4j. LoggerFactory ;
import java. util. Date ;
import java. util. concurrent. Delayed ;
import java. util. concurrent. TimeUnit ;
import java. util. concurrent. atomic. AtomicBoolean ;
/**
 * Wraps a user {@link Task} so it can sit on a delay queue until its next
 * execution time and then be run by a worker thread. When the wrapped task
 * throws, the wrapper hands itself back to {@link RetryTaskManager} for
 * another attempt.
 */
public class RetryTask implements Delayed, Runnable {

    private static final Logger LOGGER = LoggerFactory.getLogger(RetryTask.class);

    /** Attempt counters and timestamps for this task. */
    private final RetryBean retryBean = new RetryBean();

    /** Strategy that computes the next execution time. */
    private final RetryInterval retryInterval;

    /** The user-supplied work and callbacks. */
    private final Task task;

    /** Next execution time in epoch milliseconds; set by {@link #nextTime()}. */
    private long execTime;

    /** Ensures the task's interrupt callback fires at most once. */
    private final AtomicBoolean isInterrupted = new AtomicBoolean(false);

    /**
     * Creates a retry task that uses {@link DefaultRetryInterval}.
     *
     * @param task the work to run and retry
     */
    public RetryTask(Task task) {
        this(task, new DefaultRetryInterval());
    }

    /**
     * Creates a retry task with a custom interval strategy.
     *
     * @param task the work to run and retry
     * @param retryInterval strategy computing the next execution time
     */
    public RetryTask(Task task, RetryInterval retryInterval) {
        this.task = task;
        this.retryInterval = retryInterval;
    }

    /**
     * Overrides the attempt limit. Non-positive values are ignored.
     *
     * @param times the new attempt limit
     */
    public void setMaxRetryTimes(int times) {
        if (times > 0) {
            this.retryBean.setLeastRetryTimes(times);
        }
    }

    /**
     * @param unit the unit of the returned value
     * @return time remaining until the next execution time, converted from
     *         milliseconds to {@code unit}
     */
    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(this.execTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    /**
     * Orders tasks by next execution time.
     *
     * <p>Another {@code RetryTask} is compared by its {@code execTime}, which
     * {@link #nextTime()} keeps equal to the bean's next retry time. That
     * avoids dereferencing a retry time that has not been set yet. Any other
     * {@link Delayed} is compared by remaining delay instead of being cast.
     *
     * @param o the element to compare against
     * @return negative, zero or positive as this task is due before, at the
     *         same time as, or after {@code o}
     */
    @Override
    public int compareTo(Delayed o) {
        if (o == this) {
            return 0;
        }
        if (o instanceof RetryTask) {
            return Long.compare(this.execTime, ((RetryTask) o).execTime);
        }
        return Long.compare(getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS));
    }

    /**
     * Runs the wrapped task once and then its {@code onceRetry} callback. Any
     * exception from either call re-queues this task via
     * {@link RetryTaskManager#addRetryTask(RetryTask)}.
     */
    @Override
    public void run() {
        try {
            LOGGER.info("task {} start", this.task.toString());
            task.fire();
            LOGGER.info("task {} exec success", this.task.toString());
            task.onceRetry();
        } catch (Exception e) {
            // Log before re-queueing so the failure is recorded even if re-queueing throws;
            // the trailing throwable makes SLF4J print the stack trace.
            LOGGER.error("task {} exec error: {}", this.task.toString(), e.getMessage(), e);
            tryAgain();
        } finally {
            LOGGER.info("task {} end", this.task.toString());
        }
    }

    /** Hands this task back to the manager for another attempt. */
    private void tryAgain() {
        RetryTaskManager.addRetryTask(this);
    }

    private RetryBean getRetryBean() {
        return retryBean;
    }

    /**
     * Reports whether another attempt is allowed. When none remain, the task's
     * {@code exceedTimesRetry} callback is invoked; a failure of that callback
     * is logged and not propagated.
     *
     * @return {@code true} if another attempt is allowed
     */
    public boolean hasChance() {
        boolean remaining = this.retryBean.hasChance();
        if (!remaining) {
            LOGGER.info("task {} has retry {} times", this.task.toString(), this.getRetryBean().getLeastRetryTimes());
            try {
                task.exceedTimesRetry();
            } catch (Exception e) {
                LOGGER.error("task {} has retry {} times, callback error: {}", this.task.toString(),
                        this.getRetryBean().getLeastRetryTimes(), e.getMessage(), e);
            }
        }
        return remaining;
    }

    /**
     * Computes the next execution time from the current time, stores it in
     * both the bean and {@code execTime}, and consumes one attempt.
     */
    public void nextTime() {
        Date interval = retryInterval.interval(new Date());
        retryBean.setNextRetryTime(interval);
        this.execTime = interval.getTime();
        this.retryBean.reduceChange();
        LOGGER.info("task {} retry {} times, next exec time: {}, timestamp: {}", this.task.toString(),
                this.getRetryBean().getRetryTimes(), interval, this.execTime);
    }

    /**
     * Invokes the task's {@code interrupt} callback the first time this method
     * is called; later calls do nothing. A failure of the callback is logged
     * and not propagated.
     */
    public void interrupt() {
        try {
            if (isInterrupted.compareAndSet(false, true)) {
                task.interrupt();
                LOGGER.info("task {} interrupt success", this.task.toString());
            }
        } catch (Exception e) {
            LOGGER.error("task {} interrupt error: {} ", this.task.toString(), e.getMessage(), e);
        }
    }

    /** @return the wrapped task's string form */
    @Override
    public String toString() {
        return task.toString();
    }
}
异常重试RetryException
public class RetryException extends RuntimeException {
public RetryException ( ) {
super ( ) ;
}
public RetryException ( String message) {
super ( message) ;
}
public RetryException ( String message, Throwable cause) {
super ( message, cause) ;
}
public RetryException ( Throwable cause) {
super ( cause) ;
}
protected RetryException ( String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
super ( message, cause, enableSuppression, writableStackTrace) ;
}
}
重试队列启动器
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
 * Singleton that moves due tasks from the retry delay queue into a
 * caller-supplied thread pool. Call {@link #setRetryQueuePool} before
 * {@link #init()}; {@link #destroy()} shuts the pool down.
 */
public class RetryQueueStarter {

    private static final Logger LOGGER = LoggerFactory.getLogger(RetryQueueStarter.class);

    /**
     * How long the dispatcher waits on the queue before re-checking whether
     * the pool has been shut down.
     */
    private static final long POLL_TIMEOUT_MILLIS = 1000L;

    private static final RetryQueueStarter retryQueueStarter = new RetryQueueStarter();

    /** Pool that runs due tasks; volatile because it is set and read on different threads. */
    private volatile ThreadPoolExecutor retryQueuePool;

    private RetryQueueStarter() {
    }

    /** @return the single shared instance */
    public static RetryQueueStarter getInstance() {
        return retryQueueStarter;
    }

    /**
     * Starts the dispatcher thread.
     *
     * @throws IllegalStateException if no pool has been set
     */
    public void init() {
        requirePool();
        startThread();
    }

    /**
     * Shuts the pool down and logs the tasks still waiting in the queue.
     *
     * @throws IllegalStateException if no pool has been set
     */
    public void destroy() {
        requirePool().shutdown();
        storeTask();
    }

    /** Logs every task currently left in the retry queue. */
    public void storeTask() {
        for (Object remaining : RetryTaskManager.getRetryTaskQueue().toArray()) {
            LOGGER.info("remain task: {}", remaining.toString());
        }
    }

    /**
     * Starts the thread that submits due tasks to the pool. The loop polls
     * with a timeout instead of blocking indefinitely, so it notices a
     * shutdown even when the queue is empty, and it exits on interruption.
     */
    private void startThread() {
        LOGGER.info("init retry task delay queue ...");
        final DelayQueue<RetryTask> retryTaskQueue = RetryTaskManager.getRetryTaskQueue();
        new Thread(new Runnable() {
            @Override
            public void run() {
                while (true) {
                    try {
                        if (isShutdown()) {
                            break;
                        }
                        RetryTask task = retryTaskQueue.poll(POLL_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
                        if (task == null) {
                            // Nothing became due within the timeout; re-check the shutdown flag.
                            continue;
                        }
                        if (isShutdown()) {
                            LOGGER.info("retry thread pool is shutdown, task: {}", task.toString());
                            break;
                        }
                        requirePool().submit(task);
                    } catch (InterruptedException e) {
                        // Restore the flag and stop dispatching instead of swallowing the interrupt.
                        Thread.currentThread().interrupt();
                        break;
                    } catch (Exception e) {
                        LOGGER.error("task exec error: {}", e.getMessage(), e);
                    }
                }
            }
        }, "RetryQueueStarter-Thread").start();
    }

    /**
     * @param retryQueuePool the pool that will run due retry tasks
     */
    public void setRetryQueuePool(ThreadPoolExecutor retryQueuePool) {
        this.retryQueuePool = retryQueuePool;
    }

    /**
     * @return {@code true} if the pool has been shut down
     * @throws IllegalStateException if no pool has been set
     */
    public boolean isShutdown() {
        return requirePool().isShutdown();
    }

    /** Returns the configured pool, or fails with a descriptive error when it is missing. */
    private ThreadPoolExecutor requirePool() {
        ThreadPoolExecutor pool = this.retryQueuePool;
        if (pool == null) {
            throw new IllegalStateException("retry thread pool is not set; call setRetryQueuePool first");
        }
        return pool;
    }
}
测试任务TslSyncTask
import org. slf4j. Logger ;
import org. slf4j. LoggerFactory ;
/**
 * Sample {@link Task} used to exercise the retry framework. It fails on
 * purpose when its product id is {@code "2"} and succeeds otherwise.
 */
public class TslSyncTask implements Task {

    private static final Logger LOGGER = LoggerFactory.getLogger(TslSyncTask.class);

    private String url;
    private String tslJson;
    private String productId;

    /**
     * @param url target URL; only logged by {@link #fire()}
     * @param tslJson payload; stored but not used by {@link #fire()}
     * @param productId product identifier; also returned by {@link #toString()}
     */
    public TslSyncTask(String url, String tslJson, String productId) {
        this.url = url;
        this.tslJson = tslJson;
        this.productId = productId;
    }

    public String getUrl() {
        return url;
    }

    public void setUrl(String url) {
        this.url = url;
    }

    public String getTslJson() {
        return tslJson;
    }

    public void setTslJson(String tslJson) {
        this.tslJson = tslJson;
    }

    public String getProductId() {
        return productId;
    }

    public void setProductId(String productId) {
        this.productId = productId;
    }

    /**
     * Logs the URL and product id, then either fails deliberately or prints a
     * greeting.
     *
     * @throws RetryException when the product id is {@code "2"}
     */
    @Override
    public void fire() throws Exception {
        LOGGER.info("url: {}, productId: {}", url, productId);
        // Constant-first equals: a null product id must not raise an NPE that would count as a retryable failure.
        if ("2".equals(productId)) {
            throw new RetryException("主动重试");
        }
        System.out.println("hi");
    }

    /** No-op in this sample task. */
    @Override
    public void onceRetry() throws Exception {
    }

    /** No-op in this sample task. */
    @Override
    public void exceedTimesRetry() throws Exception {
    }

    /** No-op in this sample task. */
    @Override
    public void interrupt() throws Exception {
    }

    /** @return the product id */
    @Override
    public String toString() {
        return productId;
    }
}
测试客户端
import java. util. concurrent. ArrayBlockingQueue ;
import java. util. concurrent. ThreadFactory ;
import java. util. concurrent. ThreadPoolExecutor ;
import java. util. concurrent. TimeUnit ;
import java. util. concurrent. atomic. AtomicInteger ;
/**
 * Demo driver for the retry framework: it configures the pool, queues two
 * tasks, shuts the pool down, and then tries to queue a third task.
 */
public class Client {

    /**
     * Runs the demo and then blocks on the class monitor.
     *
     * @param args unused
     * @throws InterruptedException if a sleep or the final wait is interrupted
     */
    public static void main(String[] args) throws InterruptedException {
        String url = "http://10.8.1.22:10206/create";
        String json = "json data";

        RetryQueueStarter starter = RetryQueueStarter.getInstance();
        starter.setRetryQueuePool(buildRetryPool());
        starter.init();

        RetryTaskManager.addRetryTask(new RetryTask(new TslSyncTask(url, json, "2")));
        RetryTaskManager.addRetryTask(new RetryTask(new TslSyncTask(url, json, "1")));

        TimeUnit.SECONDS.sleep(6);
        starter.destroy();

        TimeUnit.SECONDS.sleep(1);
        // Queued after the pool was shut down.
        RetryTaskManager.addRetryTask(new RetryTask(new TslSyncTask(url, json, "3")));

        synchronized (Client.class) {
            Client.class.wait();
        }
    }

    /** Builds the two-thread pool with a 2000-slot queue and caller-runs rejection policy. */
    private static ThreadPoolExecutor buildRetryPool() {
        return new ThreadPoolExecutor(2, 2, 20, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(2000),
                new RetryThreadFactory(),
                new ThreadPoolExecutor.CallerRunsPolicy());
    }

    /** Creates threads named {@code retry-N} at normal priority. */
    private static final class RetryThreadFactory implements ThreadFactory {
        final AtomicInteger threadNumber = new AtomicInteger(1);

        @Override
        public Thread newThread(Runnable r) {
            Thread worker = new Thread(r, "retry-" + threadNumber.getAndIncrement());
            if (worker.getPriority() != Thread.NORM_PRIORITY) {
                worker.setPriority(Thread.NORM_PRIORITY);
            }
            return worker;
        }
    }
}