前言
当前在互联网上的言论众多,如何对用户的言论进行实时监控和报警呢?并且,对于监控的敏感词经常要进行变更,如何实现动态更新呢?下面,笔者就带领大家进入实战环节。
用户数据写入
假设从 kafka 接收的消息格式如下:
{
"userId": 10010,
"content": "我爱你中国",
"createTime": "2020-01-01 12:00:00"
}
由于用户发表的言论众多,有可能需要对用户触发敏感词的言论次数做监控,所以我们对消息根据 userId 做分组
FlinkKafkaConsumer consumer = new FlinkKafkaConsumer(kafkaProps.getProperty("kafka.user.opinion.topic"), new SimpleStringSchema(), properties);
DataStreamSource<String> transaction = env.addSource(consumer);
KeyedStream<UserOpinionData, Tuple> sourceStream = transaction.map(s -> UserOpinionData.buildUserOpinionData(s))
……
.uid("user-opition-source")
.keyBy("userId");
敏感词更新写入
我们将需要监控的敏感词存放在 mysql,当启动任务时,从 mysql 加载敏感词,而当需要更新敏感词时,则更新数据库保存的敏感词信息。要实现敏感词的动态更新,需要使用广播,再监听一个 rabbitmq,用于写入更新的敏感词数据。
// 敏感词信息
RMQConnectionConfig configConnection = new RMQConnectionConfig.Builder()
……
// 广播流
BroadcastStream configBroadcastStream = configStream
……
.broadcast(new MapStateDescriptor(
"user-opinion-broadcast-state-desc",
BasicTypeInfo.STRING_TYPE_INFO,
TypeInformation.of(new TypeHint<SensitiveWordConfig>(){})
));
// 数据处理
sourceStream
.connect(configBroadcastStream)
.process(new KafkaUserOpinionBroadcastProcessFunction(parameterTool))
.uid("broadcast-connect-process");
在设计数据库表时,需要特别注意,因为 flink task 通常是多并发的,会造成写入数据库的敏感词重复。解决办法就是将敏感词字段设置为唯一索引,由数据库来控制敏感词的唯一性。
数据处理
敏感词匹配使用的是 hutoo-dfa 工具类,是基于 DFA 算法,能够快速查找需要匹配的词。
public class KafkaUserOpinionBroadcastProcessFunction extends KeyedBroadcastProcessFunction<Tuple, UserOpinionData, SensitiveWordConfig, String> implements Serializable {
@Override
public void open(Configuration parameters) throws Exception {
// 加载敏感词列表
// 构建敏感词树
}
@Override
public void processElement(UserOpinionData userOpinionData, ReadOnlyContext readOnlyContext, Collector<String> collector) throws Exception {
// 用户言论信息监控,全词匹配所有匹配的敏感词
List<String> matchSensitiveWord(wordTree, userOpinionData.getContent());
// 其他处理
}
/**
* 敏感词匹配
* */
public List<String> matchSensitiveWord(WordTree wordTree, String word) {
List<String> matchList = new ArrayList<>();
if(wordTree == null || StringUtils.isEmpty(word)) {
return matchList;
}
return wordTree.matchAll(word, -1, true, true);
}
@Override
public void processBroadcastElement(SensitiveWordConfig sensitiveWordConfig, Context ctx, Collector<String> collector) throws Exception {
// 敏感词更新,更新到 mysql
}
}
在对用户的言论进行敏感词匹配前,往往需要先对言论进行特殊字符的过滤处理,笔者使用的是正则过滤。
hutool-dfa 敏感词匹配的模式有多种,大家可以根据需要进行设置。
告警通知
当用户的言论触发了监控规则,则可以通过 http 或者 写入队列等方式,将触发敏感词的用户言论通知下游处理。
如果有写的不对的地方,欢迎大家指正。有什么疑问,欢迎加QQ群:176098255