概述
Trident是一个高级抽象,用于在Storm之上进行实时计算。它允许您无缝混合高吞吐量(每秒数百万条消息),有状态流处理和低延迟分布式查询。如果您熟悉Pig或Cascading等高级批处理工具,Trident的概念将非常熟悉 - Trident具有连接,聚合,分组,功能和过滤器。除此之外,Trident还添加了基元,用于在任何数据库或持久性存储之上执行有状态的增量处理。 Trident具有一致, exactly-once 的语义,因此很容易推理Trident拓扑。
Trident和kafka集成
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-kafka-client</artifactId>
<version>2.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.2.0</version>
</dependency>
public class KafkaSpoutUtils {
public static KafkaSpout<String, String> buildKafkaSpout(String boostrapServers, String topic){
KafkaSpoutConfig<String,String> kafkaspoutConfig=KafkaSpoutConfig.builder(boostrapServers,topic)
.setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer")
.setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer")
.setProp(ConsumerConfig.GROUP_ID_CONFIG,"g1")
.setEmitNullTuples(false)
.setFirstPollOffsetStrategy(FirstPollOffsetStrategy.LATEST)
.setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_LEAST_ONCE)
.setMaxUncommittedOffsets(10)//一旦分区积压有10个未提交offset,Spout停止poll数据,解决Storm背压问题
.build();
return new KafkaSpout<String, String>(kafkaspoutConfig);
}
//可以保证精准一次更新,推荐使用
public static KafkaTridentSpoutOpaque<String,String> buildKafkaSpoutOpaque(String boostrapServers, String topic){
KafkaTridentSpoutConfig<String, String> kafkaOpaqueSpoutConfig = KafkaTridentSpoutConfig.builder(boostrapServers, topic)
.setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer")
.setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer")
.setProp(ConsumerConfig.GROUP_ID_CONFIG,"g1")
.setFirstPollOffsetStrategy(FirstPollOffsetStrategy.LATEST)
.setRecordTranslator(new Func<ConsumerRecord<String, String>, List<Object>>() {
public List<Object> apply(ConsumerRecord<String, String> record) {
return new Values(record.key(),record.value(),record.timestamp());
}
},new Fields("key","value","timestamp"))
.build();
return new KafkaTridentSpoutOpaque<String, String>(kafkaOpaqueSpoutConfig);
}
}
public static void main(String[] args) throws Exception {
TridentTopology tridentTopology=new TridentTopology();
tridentTopology.newStream("KafkaSpoutOpaque",KafkaSpoutUtils.buildKafkaSpoutOpaque("CentOSA:9092,CentOSB:9092,CentOSC:9092","topic01"))
.peek((TridentTuple input) ->{
System.out.println(input);
});
new LocalCluster().submitTopology("tridentTopology",new Config(),tridentTopology.build());
}
常见算子
- Map算子
将一个Tuple转换为另外一个Tuple,如果用户修改了Tuple元素的个数,需要指定输出的Fields
tridentTopology.newStream("KafkaSpoutOpaque",KafkaSpoutUtils.buildKafkaSpoutOpaque("CentOSA:9092,CentOSB:9092,CentOSC:9092","topic01"))
.map((tuple)-> new Values("Hello~"+tuple.getStringByField("value")),new Fields("name"))
.peek((tuple) -> System.out.println(tuple));
- Filter
过滤上游输入的Tuple将满足条件的Tuple向下游输出。
tridentTopology.newStream("KafkaSpoutOpaque",KafkaSpoutUtils.buildKafkaSpoutOpaque("CentOSA:9092,CentOSB:9092,CentOSC:9092","topic01"))
.filter(new Fields("value"), new BaseFilter() {
@Override
public boolean isKeep(TridentTuple tuple) {
System.out.println(tuple.getFields());
return !tuple.getStringByField("value").contains("error");
}
})
.peek((tuple) -> System.out.println(tuple));
- flatMap
将一个Tuple,转换为多个Tuple,如果修改了Tuple的数目,需要指定输出的Fields
tridentTopology.newStream("KafkaSpoutOpaque",KafkaSpoutUtils.buildKafkaSpoutOpaque("CentOSA:9092,CentOSB:9092,CentOSC:9092","topic01"))
.flatMap((tuple)->{
List<Values> list=new ArrayList<>();
String[] tokens = tuple.getStringByField("value").split("\\W+");
for (String token : tokens) {
list.add(new Values(token));
}
return list;
},new Fields("word"))
.peek((tuple) -> System.out.println(tuple));
- each
参数传递可以是BaseFunction(添加fields)和BaseFilter(等价于Filter)
basefunction
tridentTopology.newStream("KafkaSpoutOpaque",KafkaSpoutUtils.buildKafkaSpoutOpaque("CentOSA:9092,CentOSB:9092,CentOSC:9092","topic01"))
.each(new Fields("value"), new BaseFunction() {
@Override
public void execute(TridentTuple tuple, TridentCollector collector) {
collector.emit(new Values(tuple.getStringByField("value")));
}
}, new Fields("other"))
.peek((tuple) -> System.out.println(tuple));
basefilter
tridentTopology.newStream("KafkaSpoutOpaque",KafkaSpoutUtils.buildKafkaSpoutOpaque("CentOSA:9092,CentOSB:9092,CentOSC:9092","topic01"))
.each(new Fields("value"), new BaseFilter() {
@Override
public boolean isKeep(TridentTuple tuple) {
return !tuple.getStringByField("value").contains("error");
}
})
.peek((tuple) -> System.out.println(tuple));
- project
投影/过滤Tuple中无用field
tridentTopology.newStream("KafkaSpoutOpaque",KafkaSpoutUtils.buildKafkaSpoutOpaque("CentOSA:9092,CentOSB:9092,CentOSC:9092","topic01"))
.project(new Fields("value","timestamp"))
.peek((tuple) -> System.out.println(tuple));
分区和聚合
public class KafkaTridentTopology {
public static void main(String[] args) throws Exception {
TridentTopology tridentTopology=new TridentTopology();
tridentTopology.newStream("KafkaSpoutOpaque",KafkaSpoutUtils.buildKafkaSpoutOpaque("CentOSA:9092,CentOSB:9092,CentOSC:9092","topic01"))
.parallelismHint(3)
.project(new Fields("value"))
.flatMap((tuple)-> {
List<Values> list=new ArrayList<>();
String[] tokens = tuple.getStringByField("value").split("\\W+");
for (String token : tokens) {
list.add(new Values(token));
}
return list;
},new Fields("word"))
.map((tuple)->new Values(tuple.getStringByField("word"),1),new Fields("key","count"))
.partition(new PartialKeyGrouping(new Fields("key")))
.parallelismHint(5)
.partitionAggregate(new Fields("key","count"),new CountAggregater(),new Fields("word","total"))
.peek((tuple) -> System.out.println(tuple));
new LocalCluster().submitTopology("tridentTopology",new Config(),tridentTopology.build());
}
}
CountAggregater
public class CountAggregater extends BaseAggregator<Map<String,Integer>> {
@Override
public Map<String, Integer> init(Object batchId, TridentCollector collector) {
return new HashMap<>();
}
@Override
public void aggregate(Map<String, Integer> val, TridentTuple tuple, TridentCollector collector) {
String word = tuple.getStringByField("key");
Integer count=tuple.getIntegerByField("count");
if(val.containsKey(word)){
count= val.get(word)+count;
}
val.put(word,count);
}
@Override
public void complete(Map<String, Integer> val, TridentCollector collector) {
for (Map.Entry<String, Integer> entry : val.entrySet()) {
collector.emit(new Values(entry.getKey(),entry.getValue()));
}
val.clear();
}
}
Trident 状态管理
Trident以容错方式管理状态,以便在重试和失败时状态更新是幂等的。这使您可以推理Trident拓扑,就好像每条消息都被精确处理一次。 在进行状态更新时,可以实现各种级别的容错。在开始之前,让我们看一个示例,说明实现一次性语义所需的技巧。假设您正在对流进行计数聚合,并希望将运行计数存储在数据库中。现在假设您在数据库中存储了一个表示计数的值,并且每次处理新tuple时都会增加计数。
发生故障时,将重放tuple。这会在执行状态更新(或任何有副作用的事物)时出现问题 - 您不知道以前是否曾基于此tuple成功更新状态。也许你以前从未处理过tuple,在这种情况下你应该增加计数。也许你已经处理了tuple并成功递增了计数,但是tuple在另一个步骤中处理失败。在这种情况下,您不应增加计数。或许您之前看过tuple但在更新数据库时出错。在这种情况下,您应该更新数据库。
通过将计数存储在数据库中,您不知道之前是否已处理此tuple。因此,您需要更多信息才能做出正确的决定。 Trident提供以下语义,足以实现一次性处理语义:
- tuple作为小批量处理(参见教程)
- 每批元组都有一个称为“事务ID”(txid)的唯一ID。如果批量重播,则给出完全相同的txid。
- 批次之间订购状态更新。也就是说,在批处理2的状态更新成功之前,将不会应用批处理3的状态更新。
在容错方面有三种可能的喷口:“非事务性(non-transactional)”,“事务性(transactional)”和“不透明事务性(opaque transactional)”。同样,在容错方面有三种可能的状态:“非事务性(non-transactional)”,“事务性(transactional)”和“不透明事务性(opaque transactional)”。
Transactional spouts(事务)
Trident将tuple作为小批量处理,每个批次都被赋予唯一的事务ID。spout的属性根据它们可以提供的关于每批中的含量的保证而变化。事务性spout具有以下属性:
- 给定txid的批次始终相同。(失败事务ID不变且 事务中的Tuple不变)
- 批处理tuple之间没有重叠(tuple是一批或另一批,永远不是多tuple)。
- 每个tuple都在一个批处理中(没有跳过元组)
//txid不同 更新value、txid相同跳过
key - [txid,value]
流被分成永不改变的固定批次,Storm为Kafka实现了一个transactional spout。
Opaque transactional spouts(不透明的事务)
不透明的事务性spout不保证txid中的一批tuple保持不变。不透明的事务性spout具有以下属性:
- 失败事务ID不变,但是同一个批次的tuple可以不同(允许分区数据丢失)。
- 保证状态更新严格有序
窗口
- slidingWindow
public class TridentWindowDemo {
public static void main(String[] args) throws Exception {
TridentTopology tridentTopology = new TridentTopology();
tridentTopology.newStream("KafkaSpoutOpaque",
KafkaSpoutUtils.buildKafkaSpoutOpaque("CentOSA:9092,CentOSB:9092,CentOSC:9092", "topic01"))
.project(new Fields("value"))
.flatMap((tuple) -> {
List<Values> list = new ArrayList<>();
String[] tokens = tuple.getStringByField("value").split("\\W+");
for (String token : tokens) {
list.add(new Values(token));
}
return list;
}, new Fields("word"))
.map((tuple) -> new Values(tuple.getStringByField("word"), 1), new Fields("key", "count"))
.slidingWindow(
BaseWindowedBolt.Duration.seconds(10),
BaseWindowedBolt.Duration.seconds(5),
new InMemoryWindowsStoreFactory(),
new Fields("key","count"),
new WordCountAggregator(),
new Fields("key","total")
)
.peek(new Consumer() {
@Override
public void accept(TridentTuple input) {
System.out.println(input);
}
});
new LocalCluster().submitTopology("aa",new Config(),tridentTopology.build());
}
}
- TridentTopology
TridentTopology tridentTopology = new TridentTopology();
WindowConfig wc= SlidingDurationWindow.of(BaseWindowedBolt.Duration.seconds(10),
BaseWindowedBolt.Duration.seconds(5));
tridentTopology.newStream("KafkaSpoutOpaque",
KafkaSpoutUtils.buildKafkaSpoutOpaque("CentOSA:9092,CentOSB:9092,CentOSC:9092", "topic01"))
.project(new Fields("value"))
.flatMap((tuple) -> {
List<Values> list = new ArrayList<>();
String[] tokens = tuple.getStringByField("value").split("\\W+");
for (String token : tokens) {
list.add(new Values(token));
}
return list;
}, new Fields("word"))
.map((tuple) -> new Values(tuple.getStringByField("word"), 1), new Fields("key", "count"))
.window(wc,
new InMemoryWindowsStoreFactory(),
new Fields("word","count"),
new WordCountAggregator(),
new Fields("word","total")
)
.peek(new Consumer() {
@Override
public void accept(TridentTuple input) {
System.out.println(input);
}
});
new LocalCluster().submitTopology("aa",new Config(),tridentTopology.build());