@Service("codBillingSyncService")
public class CodBillingSyncServiceImpl implements ICodBillingSyncService {
/**
* 用于管理线程的线程池
*/
static Integer corePoolSize = Integer.parseInt(Property.getProperty("COD_TO_BIL_COREPOOLSIZE"));
static Integer maximumPoolSize = Integer.parseInt(Property.getProperty("COD_TO_BIL_MAXIMUMPOOLSIZE"));
static Integer keepaliveTime = Integer.parseInt(Property.getProperty("COD_TO_BIL_KEEPALIVETIME"));
static Integer workqueueSize = Integer.parseInt(Property.getProperty("COD_TO_BIL_WORKQUEUESIZE"));
private static ThreadPoolExecutor exec = new ThreadPoolExecutor(
corePoolSize, maximumPoolSize, keepaliveTime,
TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(
workqueueSize), new ThreadPoolExecutor.CallerRunsPolicy());
@Override
public void sendCodInfoToBillingReal(final OfflineBusiness business,
final BusinessTypeToBil bt, final ReceiveTypeToBil rt,
final TradeTypeToBil tt) {
final CodToBil ctb = OrderUtils.getCodToBil(business, bt, rt, tt);
ctb.setSendStatus(SendStatus.N);
codBillingSyncService.createCodToBil(ctb);
// 如果线程池未满,或缓存队列未满,则执行以下代码.
synchronized (exec) {
if (exec.getPoolSize() < exec.getMaximumPoolSize() || exec.getQueue().size() < workqueueSize) {
// 执行线程
exec.execute(new Runnable() {
@Override
public void run() {
boolean f = false;// 发送是否成功的标志位
CodTradeItem cti = null;
// 发送MQ消息
try {
cti = OrderUtils.getCodTradeItem(ctb);
codBillingSyncService.sendCodInfoToBillingReal(cti);
f = true;
} catch (Exception e) {
logger.error("sendCodCancelInfoToBillingReal error: ", e);
}
// 新增相应的Cod入Bil记录
try {
if (ctb != null) {
if (f) {
//更新状态
codBillingSyncService.updateSendStatus(ctb.getId(), SendStatus.Y);
}
}
} catch (Exception e) {
logger.error("createCodToBil error: ", e);
}
}
});
}
}
}
}