spring-kafka-client
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.1.xsd"> <bean id="baseConfigure" abstract="true" class="com.laifeng.kafka.client.youku.conf.BaseConfigure"> <property name="token" value="${kafka.sdk.token}"/> <property name="zkConnTimeOut" value="60000"/> </bean> <bean id="cpsJobsTopic" class="java.lang.String"> <constructor-arg value="${kafka.topic.cpsJobs}"/> </bean> </beans>
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.1.xsd"> <import resource="classpath:applicationContext-kafka-topics.xml"/> <bean id="consumerConfigure" abstract="true" class="com.laifeng.kafka.client.youku.conf.ConsumerConfigure" parent="baseConfigure"> <property name="consumerGroupName" value="CpsJobsTask"/> <property name="messageClass" value="com.laifeng.kafka.client.message.JsonMapLFMessage"/> </bean> <bean id="consumerAdapter" class="com.laifeng.kafka.client.youku.ConsumerAdapter"> <property name="outputFromKafka"> <bean class="org.springframework.integration.channel.FixedSubscriberChannel"> <constructor-arg ref="dispatcherConsumerHandler"/> <property name="beanName" value="outputFromKafka"/> </bean> </property> <property name="consumerSet"> <set value-type="com.laifeng.kafka.client.youku.TopicConsumers"> <ref bean="cpsJobsConsumer"/> </set> </property> </bean> <bean id="dispatcherConsumerHandler" class="com.laifeng.kafka.client.youku.handler.ReceiveMessageHandler"> <property name="handlers"> <map key-type="java.lang.String" value-type="com.laifeng.kafka.client.handler.LFMessageHandler"> <entry key-ref="cpsJobsTopic"><bean class="com.laifeng.cpsjobs.handler.cps.PackageHandler"/></entry> </map> </property> </bean> <bean id="cpsJobsConsumer" class="com.laifeng.kafka.client.youku.factory.ConsumerFactoryBean" parent="consumerConfigure"> <property name="streamsCount" value="2"/> <property name="topic" ref="cpsJobsTopic"/> </bean> </beans>
ConsumerFactoryBean
public TopicConsumers getObject() throws Exception { KafkaUtil.setKafkaPropZkConnTimeout(getZkConnTimeOut()); List<IChannel> channels=new ArrayList<IChannel>(); if(streamsCount>0) MqSystem.getMqSystem().setMaxChannleQue(streamsCount); for (int i=0;i<streamsCount;i++) { IChannel iChannel=null; try { iChannel = MqSystem.getMqSystem().open(getChannelMode(), getTopicPrefix() + getTopic() + getTopicSuffix(), getToken(),getGroupPrefix()+getConsumerGroupName()+getCgroupSuffix()); }catch (Exception e){ logger.error("Failed to create kafka consumer,msg:{}",e.getMessage(),e); System.err.println("Failed to create kafka consumer,exit"); System.exit(-1); } channels.add(iChannel); } return new TopicConsumers(getTopic(),channels,messageClass); }
ConsumerAdapter
public void afterPropertiesSet() throws Exception { //初始化线程池 executorService= Executors.newCachedThreadPool(); //遍历消费者,创建任务 for (TopicConsumers topicConsumers:consumerSet){ String topicName=topicConsumers.getTopicName(); Class messageClass=topicConsumers.getMessageClass(); List<IChannel> channels=topicConsumers.getChannelList(); if(CollectionUtils.isNotEmpty(channels)){ for (IChannel iChannel : channels){ executorService.execute(new getMessageTask(iChannel,topicName,messageClass)); } } } }
getMessageTask
public void run() { for (;;){ try { String msg= iChannel.getMsg(); LFMessage payload = (LFMessage)fromJson(msg,messageClass); if(payload instanceof JsonMapLFMessage){ String msgProfile =((JsonMapLFMessage) payload).getString(ChatProfiles.BIZ_PROFILE_PARAM_KEY); if(ChatProfiles.isDistinguishENV() && !ChatProfiles.isCurrentENV(msgProfile)){ logger.warn("需要区分环境,并且本条消息不属于本环境,current:{},thisMag:{}",ChatProfiles.currentEnv(),msgProfile); continue; } } outputFromKafka.send(new KafkaMessage(payload,topicName)); } catch (MqException e) { logger.error("从kafka获取并处理消息发生异常,topicName:{}",topicName,e); }catch (Exception e){ logger.error("从kafka获取并处理消息发生异常,topicName:{}",topicName,e); } } }
PackageHandler 入口程序
public class PackageHandler implements LFMessageHandler { public void onMessage(LFMessage message) throws KafkaLFException { if (message instanceof JsonMapLFMessage) { this.packageService.runnable(taskList, this.packageService); } } }
PackageServiceImpl
public void runnable(Integer[] taskList, PackageService packageService) { for (Integer i : taskList) { List<PackageInfoTask> packageInfoTaskList = this.packageInfoTaskService.listPackageInfoTask(i); this.execute.submit(new PackageTaskRunnable(packageInfoTaskList, packageService)); } } private ExecutorService execute = Executors.newFixedThreadPool(1);
PackageTaskRunnable
public void run() { for (PackageInfoTask packageInfoTask : this.packageInfoTaskList) { try { this.packageService.executeTask(packageInfoTask); } catch (IOException e) { e.printStackTrace(); } } }
packageService.executeTask
//执行打包外部命令 try { this.executeScript( id, taskId, apkPath, cpsPosition, cpsCookie, isFirst, isFirstImg, isFirstPosition, friendChannelName, jobCode ); this.unCloseStatus(taskId, UN_LOCK_CODE); } catch (Exception e) { this.unCloseStatus(taskId, UN_LOCK_CODE); }finally { this.unCloseStatus(taskId, UN_LOCK_CODE); }
打包
String executeInfo = PackageServiceImpl.exec(parame, null).toString();
public static ExecResult exec(String command, File dir) throws IOException { ExecResult ret = new ExecResult(); Runtime rt = Runtime.getRuntime(); Process pr = rt.exec(command, null, dir); BufferedReader input = new BufferedReader(new InputStreamReader(pr.getInputStream())); String line = null; while ((line = input.readLine()) != null) { ret.output += line + "\n"; } input = new BufferedReader(new InputStreamReader(pr.getErrorStream())); while ((line = input.readLine()) != null) { ret.errout += line + "\n"; } try { ret.code = pr.waitFor(); } catch (InterruptedException e) { ret.code = -1; } return ret; } /** * 执行返回的数据结构 */ static class ExecResult { //外部程序返回的错误码,0表示执行成功 int code; //外部程序的标准输出stdout String output = ""; //外部程序的标准错误输出stderr String errout = ""; public String toString() { return "code:" + code + ",output:" + output + ",errout:" + errout; } }