Particle Physics, Continued: Building a Stream-Processing Topology [Finale]
Why it matters
A topology is simply a graph of structurally connected processing units. For this final installment I build a simple pipeline: collect energy data -> (publish the energy data) -> receive the energy data and compute an expectation -> (hand off further downstream) -> receive the result and publish it. By extending this pattern, we can construct task flows of arbitrary function and arbitrary complexity.
More dependencies
We need a few more dependencies in Maven's project object model file, pom.xml. The final version looks like this:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.hanss.acc</groupId>
  <artifactId>Acc</artifactId>
  <packaging>jar</packaging>
  <version>1.0-SNAPSHOT</version>
  <name>Acc</name>
  <url>http://maven.apache.org</url>
  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.11</artifactId>
      <version>0.10.1.0</version>
      <exclusions>
        <exclusion>
          <groupId>com.101tec</groupId>
          <artifactId>zkclient</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>0.10.1.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-streams</artifactId>
      <version>0.10.1.0</version>
    </dependency>
    <dependency>
      <groupId>commons-io</groupId>
      <artifactId>commons-io</artifactId>
      <version>2.5</version>
    </dependency>
  </dependencies>
  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.2</version>
        <configuration>
          <source>1.8</source>
          <target>1.8</target>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>
The stream-processing unit: Processor
The Processor is Kafka's low-level processor API. Custom processing logic is defined by implementing the Processor interface, which has two main methods: process() is invoked on every record received, and punctuate() runs periodically, driven by the passage of time. A processor can also keep the ProcessorContext instance handed to init() to maintain the current context, and use it to schedule periodic tasks (context().schedule()), push new or modified key-value pairs to downstream processors (context().forward()), commit the current processing progress (context().commit()), and so on.
The first Processor, "EProcessor", reads energy values from the upstream topic, computes their expectation, and hands the result downstream.
package com.hanss.acc;

import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

public class EProcessor implements Processor<String, String> {
    private ProcessorContext context;
    private KeyValueStore<String, Integer> kvStore;

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        this.context = context;
        this.context.schedule(1000); // call punctuate() roughly once per second
        this.kvStore = (KeyValueStore<String, Integer>) context.getStateStore("Counts");
    }

    @Override
    public void process(String key, String value) {
        double energy = Double.parseDouble(value);
        int expected = (int) (energy * 0.5 + 22); // expectation of the scattered energy
        System.out.println("EP2 computing");
        System.out.println("EP2 expected scattering energy: " + expected + " MeV");
        this.kvStore.put(value, expected); // keyed by the raw reading
    }

    @Override
    public void punctuate(long timestamp) {
        // drain the store: forward every entry downstream, then delete it
        try (KeyValueIterator<String, Integer> iterator = this.kvStore.all()) {
            iterator.forEachRemaining(entry -> {
                context.forward(entry.key, entry.value);
                this.kvStore.delete(entry.key);
            });
        }
        context.commit();
    }

    @Override
    public void close() {
        this.kvStore.close();
    }
}
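The arithmetic inside process() can be checked on its own: the "expectation" here is just E = 0.5·x + 22, truncated to an int, which reproduces the numbers in the run log at the end of this post. A minimal standalone check (the EnergyExpectation class name is mine, for illustration only):

```java
public class EnergyExpectation {
    // same arithmetic as EProcessor.process(): expectation = 0.5*x + 22, truncated to int
    static int expected(String reading) {
        double energy = Double.parseDouble(reading);
        return (int) (energy * 0.5 + 22);
    }

    public static void main(String[] args) {
        System.out.println(expected("1733.223")); // 888
        System.out.println(expected("569.331"));  // 306
        System.out.println(expected("667.824"));  // 355
    }
}
```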
The second Processor, "SProcessor", reads the expectation values from the upstream processor and publishes the result.
package com.hanss.acc;

import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

public class SProcessor implements Processor<String, Integer> {
    private ProcessorContext context;
    private KeyValueStore<String, String> kvStore;

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        this.context = context;
        this.context.schedule(1000); // call punctuate() roughly once per second
        this.kvStore = (KeyValueStore<String, String>) context.getStateStore("Sprint");
    }

    @Override
    public void process(String key, Integer value) {
        System.out.println("SP3 received result: " + value + " MeV");
        this.kvStore.put("res", Integer.toString(value));
    }

    @Override
    public void punctuate(long timestamp) {
        // drain the store: forward every entry to the sink, then delete it
        try (KeyValueIterator<String, String> iterator = this.kvStore.all()) {
            iterator.forEachRemaining(entry -> {
                context.forward(entry.key, entry.value);
                this.kvStore.delete(entry.key);
            });
        }
        context.commit();
    }

    @Override
    public void close() {
        this.kvStore.close();
    }
}
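Both processors share the same punctuate() pattern: iterate the state store, forward each entry downstream, then delete it so nothing is emitted twice. The pattern can be sketched independently of Kafka, with a plain map standing in for the KeyValueStore and a list standing in for context.forward() (DrainDemo and these stand-ins are illustrative, not part of the Kafka API):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class DrainDemo {
    public static void main(String[] args) {
        // stand-in for the KeyValueStore: raw reading -> expected energy
        Map<String, Integer> store = new LinkedHashMap<>();
        store.put("1733.223", 888);
        store.put("569.331", 306);

        // stand-in for context.forward(): collect what would go downstream
        List<String> forwarded = new ArrayList<>();

        // the punctuate() drain: forward every entry, then delete it
        for (Map.Entry<String, Integer> entry : new ArrayList<>(store.entrySet())) {
            forwarded.add(entry.getKey() + "=" + entry.getValue());
            store.remove(entry.getKey());
        }

        System.out.println(forwarded);        // [1733.223=888, 569.331=306]
        System.out.println(store.isEmpty());  // true
    }
}
```

Forwarding and deleting in the same pass is what turns the store into a once-per-interval buffer rather than an ever-growing table.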
Wiring it together: the Topology
With the processors defined against the processor API, a TopologyBuilder can connect them into a processor topology.
In practice I also have the topology class extend Thread, so it can be started from the higher-level main function.
package com.hanss.acc;

import java.io.IOException;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.kafka.streams.state.Stores;

public class StatTopology extends Thread {
    @Override
    public void run() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "energy-stat-processor");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "127.0.0.1:2181");
        props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());

        TopologyBuilder builder = new TopologyBuilder();
        builder.addSource("SOURCE", new StringDeserializer(), new StringDeserializer(), "EnergyInfo")
               .addProcessor("EProcessor", EProcessor::new, "SOURCE")
               .addStateStore(Stores.create("Counts").withStringKeys().withIntegerValues().inMemory().build(), "EProcessor")
               .addProcessor("SProcessor", SProcessor::new, "EProcessor")
               // SProcessor stores String values, so "Sprint" must be built with withStringValues()
               .addStateStore(Stores.create("Sprint").withStringKeys().withStringValues().inMemory().build(), "SProcessor")
               .addSink("SINK", "EnergyCount", new StringSerializer(), new StringSerializer(), "SProcessor");

        KafkaStreams stream = new KafkaStreams(builder, props);
        stream.start();
        try {
            System.in.read(); // block until a key is pressed
        } catch (IOException e) {
            e.printStackTrace();
        }
        stream.close();
        stream.cleanUp();
    }
}
The main function
Organized in the order "collect energy data -> (publish the energy data) -> receive the energy data and compute the expectation -> (hand off downstream) -> receive and publish the result":
package com.hanss.acc;

/**
 * @Date Nov 20, 2017
 *
 * @Author hanss401
 *
 * @Note Entry point that wires the producer, the topology, and the consumer together
 */
public class KafkaClient {
    public static void main(String[] args) {
        JProducer pro = new JProducer("EnergyInfo");  // publishes energy readings to "EnergyInfo"
        pro.start();
        StatTopology topo = new StatTopology();       // EProcessor -> SProcessor pipeline
        topo.start();
        JConsumer con = new JConsumer("EnergyCount"); // consumes the published results
        con.start();
    }
}
Closing run
P1: the message publisher;
EP2: the first compute node;
SP3: the second compute node;
[root@master AccBak]# mvn exec:java -Dexec.mainClass="com.hanss.acc.KafkaClient"
[INFO] Scanning for projects...
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] Building Acc 1.0-SNAPSHOT
[INFO] ------------------------------------------------------------------------
[INFO]
[INFO] --- exec-maven-plugin:1.6.0:java (default-cli) @ Acc ---
log4j:WARN No appenders could be found for logger (kafka.utils.VerifiableProperties).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
P1 sent message -> [Particle detected: 1733.223 MeV]
P1 message topic: particle data collection
EP2 computing
EP2 expected scattering energy: 888 MeV
SP3 received result: 888 MeV
P1 sent message -> [Particle detected: 569.331 MeV]
P1 message topic: particle data collection
EP2 computing
EP2 expected scattering energy: 306 MeV
SP3 received result: 306 MeV
P1 sent message -> [Particle detected: 667.824 MeV]
P1 message topic: particle data collection
EP2 computing
EP2 expected scattering energy: 355 MeV
SP3 received result: 355 MeV
P1 sent message -> [Particle detected: 1217.001 MeV]
P1 message topic: particle data collection
EP2 computing
EP2 expected scattering energy: 630 MeV
SP3 received result: 630 MeV
P1 sent message -> [Particle detected: 1733.223 MeV]
P1 message topic: particle data collection
EP2 computing
EP2 expected scattering energy: 888 MeV
SP3 received result: 888 MeV
P1 sent message -> [Particle detected: 569.331 MeV]
P1 message topic: particle data collection
EP2 computing
EP2 expected scattering energy: 306 MeV
SP3 received result: 306 MeV
... ...
Afterword
1. Kafka may well come to replace Storm;
2. Kafka will keep gaining surprising new features;
3. An ever more virtual world... ...