版权声明:1911907658 https://blog.csdn.net/qq_33598343/article/details/85160133
需求:
producer: 发送例如 aa-zz
consumer:收到zz
通过-切分得到后面的,如果没有-就正常输出
Processor
public class LogProcessor implements Processor<byte [],byte []> {
ProcessorContext context ;
//初始化
public void init(ProcessorContext context) {
//传输
this.context = context;
}
//具体业务逻辑
public void process(byte[] key, byte[] value) {
//获取数据
String line = new String(value);
//切分数据
if (line.contains("-")){
line = line.split("-")[1];
}
//输出数据
context.forward(key,line.getBytes());
}
//释放资源
public void close() {
}
}
application
public class Application {
public static void main(String[] args) {
String onetopic = "t1";
String twotopic = "t2";
Properties prop = new Properties();
prop.put(StreamsConfig.APPLICATION_ID_CONFIG,"logProcessor");
prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.232.132:9092,192.168.232.133:9092,192.168.232.134:9092");
StreamsConfig s = new StreamsConfig(prop);
//初始化拓扑
Topology Builder = new Topology();
Builder.addSource("source",onetopic).addProcessor("processor", new ProcessorSupplier() {
public Processor<byte[],byte[]> get() {
return new LogProcessor();
}
},"source").addSink("Sink",twotopic,"processor");
//输出
KafkaStreams ka = new KafkaStreams(Builder,prop); //可以是Builder,s,我试过,好像过时了.....
ka.start();
}
}
kafka可以做流计算,但是不适合