/**
-
异步消费kafka消息任务
*/
@Slf4j
@Component
public class ConsumeKafkaMessageTask {/**
-
消费kafka消息
-
@param message
-
@author albert.ding
*/
@Async
public void consume(String message) {long start = System.currentTimeMillis();
boolean isSuccess = false;//本次消息处理结果
int cmd = Cmd.UNKNOWN_CMD.getCmd();//默认为不支持的业务类型try {
//解析消息,找cmd区分业务
cmd = JsonPath.read(message, “$.cmd”);
log.info("—>本次业务cmd=[{}]", cmd);//验证业务是否支持 Cmd type = Cmd.getCmd(cmd); if(null == type){ throw new KmsException("不支持的业务" + cmd); } //找处理器 BaseSenderService handler = (BaseSenderService) ApplicationContextUtil.getBean(type.getHandler()); if(null == handler){ throw new KmsException("缺少相应的处理器" + type.getHandler()); } //具体执行 log.info("--->本次业务处理器=[{}]", handler.getClass().getSimpleName()); handler.doBusiness(message); isSuccess = true;
} catch (JsonPathException e) {
log.info(“消息处理失败,报文中缺少必要字段cmd…{}”, Throwables.getStackTraceAsString(e));
} catch (KmsException e) {
log.info(“消息处理失败[{}],将丢弃…”, e.getMessage());
} catch (Exception e) {
log.error(“消息处理异常,将丢弃…{}”, Throwables.getStackTraceAsString(e));
} finally {
log.info(“消息处理[{}],耗时[{}],消息Id=[{}]…”, isSuccess? “成功” : “失败”, System.currentTimeMillis() - start, cmd);
}
}
}
-