Spark 项目实战之数据服务分析(二)

业务逻辑的处理

首次收录

确定一定范围内的设备,获取指定时间内,经过这些设备的所有对象的集合(被设备所监测到并传输到系统的数据),再获取之前指定时间内经过这些设备的的集合,对比之间的数据来判断当前监测对象是否是首次出现。

在实践过程中,需要思考两个问题:
1、对两份数据如何进行处理才能得到一个公共部分;
2、如何对数据进行处理将数据的公共部分处理掉。

Join 有四种方式

  1. Inner join: 如果表中有至少一个匹配,则返回行 – 交集返回
  2. Left join: 即使右表中没有匹配,也从左表返回所有的行 – 左侧全返回,右侧不存在用null代替
  3. RIGHT JOIN: 即使左表中没有匹配,也从右表返回所有的行 – 右侧全返回,左侧不存在用null代替
  4. FULL JOIN: 只要其中一个表中存在匹配,就返回行 – 左右都返回,不匹配用null表示。

盗一张图来看看这几个join的区别:
在这里插入图片描述
从这幅图可以发现:左下图和右下图这两幅图比较适合我们的场景。也就是说对两份 RDD 进行左连接或右连接后,然后再过滤中 key != null 的情况,即可以获取我们想要的首次收录的情况的数据。

思路:

  1. 本地已有之前的数据集合,与当前的获取的数据集合进行 join 操作。比如,获取的数据与已有数据进行 leftOuterJoin 的左外连接。左外连接 leftOuterJoin 可以获取到与右表中相同的 key 值部分;如果右表与左表没有相同部分,则右表部分会返回 null。
  2. 对 join 表进行过滤,可以找出 右表部分 value 值为 null,那么可以确定当前的被监测对象是首次入镜的。

知道了采用什么方法来分析这个问题,现在我们需要针对不同的场景来处理问题。

  1. 离线方式。离线方式主要创建两个 RDD, 分别是一个时间内的数据集和一个指定时间段内的数据集。然后通过 RSFZ 关键字来进行 PairRDD 的处理。通过将指定时间段内的数据集(新数据集)与本地数据库中的数据进行左连接然后再过滤处理。或者是将本地数据库中的数据集与指定时间内的数据集进行右外连接,然后再进行过滤处理。这样就可以得到相应的数据。

  2. 实时方法。实时方式主要从 Kafka 获取实时数据流,然后再使用离线方式来进行数据的处理。

代码:离线

        JavaPairRDD<String,Tuple2<String,Optional<String>>> filterRDD =leftJoinRDD.filter(new Function<Tuple2<String, Tuple2<String, Optional<String>>>, Boolean>() {
            @Override
            public Boolean call(Tuple2<String, Tuple2<String, Optional<String>>> v1) throws Exception {
                Optional<String> opt = v1._2._2;
                return ! opt.isPresent();
            }
        });

实时数据流的处理与离线式的处理并无二致,主要麻烦的是如何获取数据流:

        String brokers = "bigdata01:9092";
        String topics = "personData";

        JavaStreamingContext ssc = new JavaStreamingContext(jsc, Durations.seconds(5));
        Collection<String> topicList = new HashSet<>(Arrays.asList(topics.split(",")));
        Map<String,Object> kafkaParams = new HashMap();

        kafkaParams.put("bootstrap.servers",brokers);
        kafkaParams.put("group.id","fps_group");
        kafkaParams.put("auto.offset.reset","latest");
        kafkaParams.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        kafkaParams.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");

        Map<TopicPartition,Long> offsets = new HashMap<TopicPartition,Long>();

        JavaInputDStream<ConsumerRecord<String,String>> lines = KafkaUtils.createDirectStream(
                ssc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.Subscribe(topicList,kafkaParams,offsets)
        );

接下来,将分别从离线和实时两个操作的步骤进行表述。

项目代码展示:
https://github.com/yy1028500451/MonitorAnalysis/tree/master/src/main/java/com/monitor/firstrecord

猜你喜欢

转载自blog.csdn.net/dec_sun/article/details/89438773