Particle Physics Afterstory: Building a Stream Computing Topology [Finale]



Motivation

A topology is a graph of structurally connected processing units. In this finale I build a simple topology: "collect energy data -> (publish energy data) -> receive energy data and compute the expectation -> (hand off further downstream) -> receive the result and publish it". By extending it, we can build arbitrarily complex task-flow pipelines with any processing stages we like.


More dependencies

Here we need to add more dependencies to Maven's project object model file, pom.xml; the final version looks like this:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.hanss.acc</groupId>
  <artifactId>Acc</artifactId>
  <packaging>jar</packaging>
  <version>1.0-SNAPSHOT</version>
  <name>Acc</name>
  <url>http://maven.apache.org</url>
  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.11</artifactId>
      <version>0.10.1.0</version>
      <exclusions>
        <exclusion>
          <groupId>com.101tec</groupId>
          <artifactId>zkclient</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>0.10.1.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-streams</artifactId>
      <version>0.10.1.0</version>
    </dependency>
    <dependency>
      <groupId>commons-io</groupId>
      <artifactId>commons-io</artifactId>
      <version>2.5</version>
    </dependency>
  </dependencies>
  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.2</version>
        <configuration>
          <source>1.8</source>
          <target>1.8</target>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>

The stream processing unit: Processor

Processor is Kafka's low-level processor API; custom processing logic is defined by implementing the Processor interface. The interface has two main methods: process is invoked on every record received, and punctuate is executed periodically as time passes. In addition, a processor can keep the ProcessorContext instance handed to init in order to maintain the current context, and use that context to schedule periodic tasks (context().schedule), push new or modified key-value pairs to downstream processors (context().forward), commit the current processing progress (context().commit), and so on.

The first Processor, "EProcessor", reads energy values from the upstream topic, computes the expectation, and hands the result downstream.

package com.hanss.acc;


import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

public class EProcessor implements Processor<String, String> {
    private ProcessorContext context;
    private KeyValueStore<String, Integer> kvStore;

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        this.context = context;
        this.context.schedule(1000); // invoke punctuate every 1000 ms
        this.kvStore = (KeyValueStore<String, Integer>) context.getStateStore("Counts"); // state store registered in the topology
    }



    @Override
    public void process(String key, String value) {
        // expectation: E = 0.5 * energy + 22, truncated to an integer
        double thisEnergyValue = Double.parseDouble(value) * 0.5 + 22;
        int thisEnergyValueInt = (int) thisEnergyValue;
        System.out.println("EP2 computing");
        System.out.println("EP2 scattering energy expectation: " + thisEnergyValueInt + " Mev");
        this.kvStore.put(value, thisEnergyValueInt);
    }

    @Override
    public void punctuate(long timestamp) {
        try (KeyValueIterator<String, Integer> iterator = this.kvStore.all()) {
            iterator.forEachRemaining(entry -> {
                context.forward(entry.key, entry.value); // push the pair downstream
                this.kvStore.delete(entry.key);          // then drain it from the store
            });
        }
        context.commit();
    }

    @Override
    public void close() {
        this.kvStore.close();
    }
}
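The expectation formula applied inside process above can be checked in isolation. This is a standalone sketch; the class name ExpectationDemo and its method are my own illustration, not part of the project:

```java
// Standalone check of EProcessor's arithmetic: E = 0.5 * energy + 22,
// truncated to an integer, matching the "EP2" lines in the log below.
public class ExpectationDemo {
    static int expectedEnergy(String rawValue) {
        double e = Double.parseDouble(rawValue) * 0.5 + 22; // same arithmetic as process()
        return (int) e;                                     // truncation, not rounding
    }

    public static void main(String[] args) {
        System.out.println(expectedEnergy("1733.223") + " Mev"); // 888 Mev
        System.out.println(expectedEnergy("569.331") + " Mev");  // 306 Mev
    }
}
```

Note that the cast truncates toward zero, so 888.6115 becomes 888 rather than 889.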

The second Processor, "SProcessor", reads the expectation value from the upstream processor and publishes the result.

package com.hanss.acc;


import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

public class SProcessor implements Processor<String, Integer> {
    private ProcessorContext context;
    private KeyValueStore<String, String> kvStore;

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        this.context = context;
        this.context.schedule(1000); // invoke punctuate every 1000 ms
        this.kvStore = (KeyValueStore<String, String>) context.getStateStore("Sprint"); // state store registered in the topology
    }

    @Override
    public void process(String key, Integer value) {
        System.out.println("SP3 received result: " + value + " Mev");
        this.kvStore.put("res", Integer.toString(value));
    }

    @Override
    public void punctuate(long timestamp) {
        try (KeyValueIterator<String, String> iterator = this.kvStore.all()) {
            iterator.forEachRemaining(entry -> {
                context.forward(entry.key, entry.value); // push the pair downstream
                this.kvStore.delete(entry.key);          // then drain it from the store
            });
        }
        context.commit();
    }

    @Override
    public void close() {
        this.kvStore.close();
    }
}
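Both processors' punctuate methods follow the same drain pattern: iterate the store, forward each entry downstream, then delete it. The pattern can be sketched with a plain HashMap standing in for the Kafka state store (DrainDemo and the forwarded list are my illustration, not Kafka API):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Illustrates the forward-then-delete drain used in punctuate(),
// with a HashMap standing in for the Kafka KeyValueStore.
public class DrainDemo {
    static List<String> drain(Map<String, Integer> store) {
        List<String> forwarded = new ArrayList<>();
        Iterator<Map.Entry<String, Integer>> it = store.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Integer> entry = it.next();
            forwarded.add(entry.getKey() + "=" + entry.getValue()); // stands in for context.forward
            it.remove();                                            // stands in for kvStore.delete
        }
        return forwarded;
    }

    public static void main(String[] args) {
        Map<String, Integer> store = new HashMap<>();
        store.put("1733.223", 888);
        List<String> out = drain(store);
        System.out.println(out);             // every buffered entry is forwarded
        System.out.println(store.isEmpty()); // true: the store is left empty
    }
}
```

The point of the pattern is that each punctuate tick flushes everything buffered since the last tick, so the store never grows without bound.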

Wiring it together: Topology

With the processors defined via the low-level Processor API, we can use TopologyBuilder to connect them into a processor topology.
In fact, I also make the class extend Thread so that it can be started from a higher-level main function.

package com.hanss.acc;

import java.io.IOException;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.kafka.streams.state.Stores;


public class StatTopology extends Thread{

   @Override
   public void run() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "energy-stat-processor");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "127.0.0.1:2181");
        props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
        //props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        TopologyBuilder builder = new TopologyBuilder();
        builder.addSource("SOURCE", new StringDeserializer(), new StringDeserializer(), "EnergyInfo")
               .addProcessor("EProcessor", EProcessor::new, "SOURCE")
               .addStateStore(Stores.create("Counts").withStringKeys().withIntegerValues().inMemory().build(), "EProcessor")
               //.connectProcessorAndStateStores("EProcessor", "Counts")
               .addProcessor("SProcessor", SProcessor::new, "EProcessor")
               .addStateStore(Stores.create("Sprint").withStringKeys().withStringValues().inMemory().build(), "SProcessor") // String values to match SProcessor's store type
               .addSink("SINK", "EnergyCount", new StringSerializer(), new StringSerializer(), "SProcessor");

        KafkaStreams stream = new KafkaStreams(builder, props);
        stream.start();
        try { System.in.read(); } catch (IOException e) { e.printStackTrace(); } // block until a key is pressed
        stream.close();
        stream.cleanUp();
    }
}

The main function

Organized in the order "collect energy data -> (publish energy data) -> receive energy data and compute the expectation -> (hand off further downstream) -> receive the result and publish it":

package com.hanss.acc;

import com.hanss.acc.ConfigureAPI.KafkaProperties;
import com.hanss.acc.JConsumer;
import com.hanss.acc.JProducer;
import com.hanss.acc.StatTopology;

/**
 * @Date Nov 20, 2017
 *
 * @Author hanss401
 *
 * @Note To run Kafka Code
 */
public class KafkaClient {

    public static void main(String[] args) {
        JProducer pro = new JProducer("EnergyInfo");
        pro.start();

        StatTopology topo = new StatTopology();
        topo.start();    

        //sleep(3*1000);
        JConsumer con = new JConsumer("EnergyCount");
        con.start();
    }
}

Epilogue

P1: the message publisher;
EP2: the first compute node;
SP3: the second compute node;

[root@master AccBak]# mvn exec:java -Dexec.mainClass="com.hanss.acc.KafkaClient"
[INFO] Scanning for projects...
[INFO] 
[INFO] ------------------------------------------------------------------------
[INFO] Building Acc 1.0-SNAPSHOT
[INFO] ------------------------------------------------------------------------
[INFO] 
[INFO] --- exec-maven-plugin:1.6.0:java (default-cli) @ Acc ---
log4j:WARN No appenders could be found for logger (kafka.utils.VerifiableProperties).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
P1 sent message -> [Particle detected: 1733.223  Mev]
P1 message topic: particle info collection
EP2 computing
EP2 scattering energy expectation: 888 Mev
SP3 received result: 888 Mev
P1 sent message -> [Particle detected: 569.331  Mev]
P1 message topic: particle info collection
EP2 computing
EP2 scattering energy expectation: 306 Mev
SP3 received result: 306 Mev
P1 sent message -> [Particle detected: 667.824  Mev]
P1 message topic: particle info collection
EP2 computing
EP2 scattering energy expectation: 355 Mev
SP3 received result: 355 Mev
P1 sent message -> [Particle detected: 1217.001  Mev]
P1 message topic: particle info collection
EP2 computing
EP2 scattering energy expectation: 630 Mev
SP3 received result: 630 Mev
P1 sent message -> [Particle detected: 1733.223  Mev]
P1 message topic: particle info collection
EP2 computing
EP2 scattering energy expectation: 888 Mev
SP3 received result: 888 Mev
P1 sent message -> [Particle detected: 569.331  Mev]
P1 message topic: particle info collection
EP2 computing
EP2 scattering energy expectation: 306 Mev
SP3 received result: 306 Mev
... ...

Afterword

1. Kafka may well take Storm's place;
2. Kafka will bring even more surprising features;
3. An ever more virtual world... ...

Reposted from blog.csdn.net/hanss2/article/details/78629082