kafka的流计算框架

版权声明:1911907658 https://blog.csdn.net/qq_33598343/article/details/85160133

需求:

producer: 发送例如  aa-zz
consumer:收到zz
通过-切分得到后面的,如果没有-就正常输出

Processor

public class LogProcessor implements Processor<byte [],byte []> {
    ProcessorContext context ;
    //初始化
    public void init(ProcessorContext context) {
        //传输
        this.context = context;
    }
    //具体业务逻辑
    public void process(byte[] key, byte[] value) {
        //获取数据
        String line = new String(value);
        //切分数据
        if (line.contains("-")){
            line = line.split("-")[1];
        }

        //输出数据
        context.forward(key,line.getBytes());

    }
    //释放资源
    public void close() {

    }
}

application

public class Application {
    public static void main(String[] args) {
        String onetopic = "t1";
        String twotopic = "t2";
        Properties prop = new Properties();
        prop.put(StreamsConfig.APPLICATION_ID_CONFIG,"logProcessor");
        prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.232.132:9092,192.168.232.133:9092,192.168.232.134:9092");
        StreamsConfig s = new StreamsConfig(prop);
        //初始化拓扑
        Topology Builder = new Topology();
        Builder.addSource("source",onetopic).addProcessor("processor", new ProcessorSupplier() {
            public Processor<byte[],byte[]> get() {
                return new LogProcessor();
            }
        },"source").addSink("Sink",twotopic,"processor");

        //输出
        KafkaStreams ka = new KafkaStreams(Builder,prop); //可以是Builder,s,我试过,好像过时了.....
        ka.start();
    }
}

kafka可以做流计算,但是不适合

猜你喜欢

转载自blog.csdn.net/qq_33598343/article/details/85160133