Flink学习(二):实验一数据清洗

作者:chen_h
微信号 & QQ:862251340
微信公众号:coderpai


Flink学习(一):流处理介绍

Flink学习(二):实验一数据清洗


数据准备

首先我们需要下载实验需要的数据,下载地址如下:

wget http://training.ververica.com/trainingData/nycTaxiRides.gz
wget http://training.ververica.com/trainingData/nycTaxiFares.gz

我们只需要获取数据,不需要去进行解压。

Taxi 数据的格式

我们的出租车数据集(TaxiRide)包含有关纽约市各个出租车行程的信息。 每次出行都由两个事件表示:旅途开始和旅途结束事件。 每个事件包含11个字段:

rideId         : Long      // a unique id for each ride
taxiId         : Long      // a unique id for each taxi
driverId       : Long      // a unique id for each driver
isStart        : Boolean   // TRUE for ride start events, FALSE for ride end events
startTime      : DateTime  // the start time of a ride
endTime        : DateTime  // the end time of a ride,
                           //   "1970-01-01 00:00:00" for start events
startLon       : Float     // the longitude of the ride start location
startLat       : Float     // the latitude of the ride start location
endLon         : Float     // the longitude of the ride end location
endLat         : Float     // the latitude of the ride end location
passengerCnt   : Short     // number of passengers on the ride

注意:数据集包含坐标信息无效或缺失的记录(经度和纬度为0.0)。

还有一个包含出租车费用(Taxi Fare)数据的相关数据集,这些字段包括:

rideId         : Long      // a unique id for each ride
taxiId         : Long      // a unique id for each taxi
driverId       : Long      // a unique id for each driver
startTime      : DateTime  // the start time of a ride
paymentType    : String    // CSH or CRD
tip            : Float     // tip for this ride
tolls          : Float     // tolls for this ride
totalFare      : Float     // total fare collected

在Flink程序中生成出租车行驶数据流

注意:这些练习已经提供了使用这些出租车数据流的代码。

我们提供了Flink源函数(TaxiRideSource),该函数读取带有出租车记录的.gz文件并发出TaxiRide事件流。 源函数在事件时间运行。 TaxiFare事件有一个类似的源函数(TaxiFareSource)。

修改 ExerciseBase 文件

下载数据集后,在您的IDE中打开com.ververica.flinktraining.exercises.datastream_java.utils.ExerciseBase 类,然后编辑这两行以指向已下载的两个出租车数据文件:

pathToRideData = "YOUR DATA PATH";
pathToFareData = "YOUR DATA PATH";

实验要求

“出租车骑行数据清洗”练习的任务是通过删除未在纽约市开始或结束的事件来清理TaxiRide事件流。

GeoUtils实用程序类提供了一个静态方法isInNYC(float lon,float lat),以检查某个位置是否在NYC区域内。

总结一下就是:

  1. 筛选出始发和终点都在纽约的出租车数据。
  2. GeoUtils里面提供了是否在纽约的判断。
  3. 一个很简单的Filter

数据输入:

// get an ExecutionEnvironment
StreamExecutionEnvironment env =
  StreamExecutionEnvironment.getExecutionEnvironment();
// configure event-time processing
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

// get the taxi ride data stream
DataStream<TaxiRide> rides = env.addSource(
  new TaxiRideSource("/Users/XXX/Resources/2018/trainingData/nycTaxiRides.gz", maxDelay, servingSpeed));

数据输出

期望输出起始点不在纽约的数据到控制台。

完整代码:

package com.dataartisans.flinktraining.exercises.datastream_java.basics;

import com.dataartisans.flinktraining.exercises.datastream_java.datatypes.TaxiRide;
import com.dataartisans.flinktraining.exercises.datastream_java.sources.TaxiRideSource;
import com.dataartisans.flinktraining.exercises.datastream_java.utils.ExerciseBase;
import com.dataartisans.flinktraining.exercises.datastream_java.utils.GeoUtils;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * The "Ride Cleansing" exercise from the Flink training
 * (http://training.data-artisans.com).
 * The task of the exercise is to filter a data stream of taxi ride records to keep only rides that
 * start and end within New York City. The resulting stream should be printed.
 * <p>
 * Parameters:
 * -input path-to-input-file
 */
public class RideCleansingExercise extends ExerciseBase {
    public static void main(String[] args) throws Exception {

        ParameterTool params = ParameterTool.fromArgs(args);
        final String input = params.get("input", ExerciseBase.pathToRideData);

        final int maxEventDelay = 60;       // events are out of order by max 60 seconds
        final int servingSpeedFactor = 600; // events of 10 minutes are served in 1 second

        // set up streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(ExerciseBase.parallelism);

        // start the data generator
        DataStream<TaxiRide> rides = env.addSource(rideSourceOrTest(new TaxiRideSource(input, maxEventDelay, servingSpeedFactor)));

        DataStream<TaxiRide> filteredRides = rides
                // filter out rides that do not start or stop in NYC
                .filter(new NYCFilter());

        // print the filtered stream
        printOrTest(filteredRides);

        // run the cleansing pipeline
        env.execute("Taxi Ride Cleansing");
    }

    private static class NYCFilter implements FilterFunction<TaxiRide> {

        @Override
        public boolean filter(TaxiRide taxiRide) throws Exception {
            // 起点和终点都在纽约
            return GeoUtils.isInNYC(taxiRide.startLon, taxiRide.startLat) && GeoUtils.isInNYC(taxiRide.endLon, taxiRide.endLat);
        }
    }

}

我们只是修改了程序中的 filter 函数:

private static class NYCFilter implements FilterFunction<TaxiRide> {

        @Override
        public boolean filter(TaxiRide taxiRide) throws Exception {
            // 起点和终点都在纽约
            return GeoUtils.isInNYC(taxiRide.startLon, taxiRide.startLat) && GeoUtils.isInNYC(taxiRide.endLon, taxiRide.endLat);
        }
    }
发布了414 篇原创文章 · 获赞 168 · 访问量 47万+

猜你喜欢

转载自blog.csdn.net/CoderPai/article/details/104898891