(1) Flink DataSet: reading files and outer joins

Reading a CSV file

package batch;

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple3;

public class ReadCsvFile {
    public static void main(String[] args) throws Exception {
        //get an execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        //read the input data
        DataSource<Tuple3<Integer, Integer, String>> csvDS = env.readCsvFile("D:\\Downloads\\000代码+PPT\\数据源\\user.csv")
                .includeFields("11100")     //which CSV fields to read: 1 = read, 0 = skip
                .ignoreFirstLine()          //skip the first line (e.g. a header)
                .ignoreInvalidLines()       //skip malformed lines instead of failing
                .ignoreComments("##")       //ignore lines starting with ##
                .lineDelimiter("\n")        //line delimiter
                .fieldDelimiter(",")        //field delimiter
                .types(Integer.class, Integer.class, String.class);       //types of the parsed fields

        csvDS.print();
    }
}
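
As a side note, the CsvReader can also map each row directly onto a POJO instead of a tuple via pojoType. The following is only a minimal sketch: the User class and its field names (id, age, name) are assumptions chosen to match the three included columns, and the class needs a no-arg constructor plus public fields (or getters/setters) to qualify as a Flink POJO.

//sketch only: User is a hypothetical POJO with public fields id, age, name and a no-arg constructor
DataSource<User> userDS = env.readCsvFile("D:\\Downloads\\000代码+PPT\\数据源\\user.csv")
        .includeFields("11100")
        .ignoreFirstLine()
        .pojoType(User.class, "id", "age", "name");
userDS.print();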

Recursively reading files in a directory

package batch;

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.configuration.Configuration;


public class ReursionReadDemo {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        Configuration conf = new Configuration();

        //enable recursive enumeration of files in nested directories
        conf.setBoolean("recursive.file.enumeration",true);
        DataSource<String> ds = env.readTextFile("D:\\testData").withParameters(conf);
        ds.print();
    }
}
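
The recursive.file.enumeration flag is read by Flink's FileInputFormat, so the same configuration should also apply to other file-based sources such as readCsvFile. A hedged sketch (the directory and the two column types here are assumptions; Tuple2 imported as in the earlier examples):

//sketch: pass the same configuration to a CSV source via withParameters on the resulting DataSource
Configuration conf = new Configuration();
conf.setBoolean("recursive.file.enumeration", true);
DataSource<Tuple2<Integer, String>> csvDS = env.readCsvFile("D:\\testData")
        .types(Integer.class, String.class)
        .withParameters(conf);
csvDS.print();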

Union

Combines two data sets into one data set (the element types must be the same).

package batch;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

import java.util.ArrayList;

public class UnionDemo {

    public static void main(String[] args) throws Exception {
        //get the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        ArrayList<Tuple2<Integer,String>> list1 = new ArrayList<>();
        list1.add(new Tuple2<>(101,"lily"));
        list1.add(new Tuple2<>(102,"lucy"));
        list1.add(new Tuple2<>(103,"tom"));

        ArrayList<Tuple2<Integer,String>> list2 = new ArrayList<>();
        list2.add(new Tuple2<>(101,"lili"));
        list2.add(new Tuple2<>(102,"jack"));
        list2.add(new Tuple2<>(103,"jetty"));

        DataSet<Tuple2<Integer, String>> ds1 = env.fromCollection(list1);
        DataSet<Tuple2<Integer, String>> ds2 = env.fromCollection(list2);

        DataSet<Tuple2<Integer, String>> union = ds1.union(ds2);

        union.print();
    }
}

Output

(101,lily)
(101,lili)
(102,lucy)
(102,jack)
(103,tom)
(103,jetty)
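
Note that, unlike SQL's UNION, the DataSet union keeps duplicate records. If duplicates need to be removed, distinct() can be chained afterwards; a minimal sketch continuing with ds1 and ds2 from the example above:

//sketch: remove exact duplicates after the union
DataSet<Tuple2<Integer, String>> deduped = ds1.union(ds2).distinct();
deduped.print();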

Inner join

The default join produces a new DataSet of two-field tuples. Each tuple holds an element of the first input DataSet in its first field and the matching element of the second input DataSet in its second field.

package batch;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

import java.util.ArrayList;

public class DefaultJoinDemo {
    public static void main(String[] args) throws Exception {
        //get the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        ArrayList<Tuple2<Integer,String>> list1 = new ArrayList<>();
        list1.add(new Tuple2<>(1,"lily"));
        list1.add(new Tuple2<>(2,"lucy"));
        list1.add(new Tuple2<>(3,"tom"));
        list1.add(new Tuple2<>(4,"jack"));

        ArrayList<Tuple2<Integer,String>> list2 = new ArrayList<>();
        list2.add(new Tuple2<>(1,"beijing"));
        list2.add(new Tuple2<>(2,"shanghai"));
        list2.add(new Tuple2<>(3,"guangzhou"));

        DataSet<Tuple2<Integer, String>> ds1 = env.fromCollection(list1);
        DataSet<Tuple2<Integer, String>> ds2 = env.fromCollection(list2);

        DataSet<Tuple2<Tuple2<Integer,String>, Tuple2<Integer,String>>> joinedData = ds1.join(ds2).where(0).equalTo(0);
        joinedData.print();
    }
}

Output

((3,tom),(3,guangzhou))
((1,lily),(1,beijing))
((2,lucy),(2,shanghai))
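
If only a few fields of the joined tuples are needed, a projection join avoids the nested tuple result. A minimal sketch continuing with ds1 and ds2 from the example above (id and name from the first input, address from the second; assumes the Tuple3 import):

//sketch: project selected fields of both inputs into a flat Tuple3
DataSet<Tuple3<Integer, String, String>> projected = ds1.join(ds2)
        .where(0)
        .equalTo(0)
        .projectFirst(0, 1)     //fields 0 and 1 of the first input
        .projectSecond(1);      //field 1 of the second input
projected.print();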

Join combined with a custom object

Add the Lombok dependency

<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.0</version>
    <!--<scope>provided</scope>-->
</dependency>

package batch;

import lombok.Data;
import lombok.ToString;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.JoinOperator;
import org.apache.flink.api.java.tuple.Tuple2;

import java.util.ArrayList;

public class JoinWithObject {
    public static void main(String[] args) throws Exception {
        //get the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        ArrayList<Tuple2<Integer, String>> list1 = new ArrayList<>();
        list1.add(new Tuple2<>(1, "lily"));
        list1.add(new Tuple2<>(2, "lucy"));
        list1.add(new Tuple2<>(3, "tom"));
        list1.add(new Tuple2<>(4, "jack"));

        ArrayList<Tuple2<Integer, String>> list2 = new ArrayList<>();
        list2.add(new Tuple2<>(1, "beijing"));
        list2.add(new Tuple2<>(2, "shanghai"));
        list2.add(new Tuple2<>(3, "guangzhou"));

        DataSet<Tuple2<Integer, String>> ds1 = env.fromCollection(list1);
        DataSet<Tuple2<Integer, String>> ds2 = env.fromCollection(list2);

        JoinOperator.EquiJoin<Tuple2<Integer, String>, Tuple2<Integer, String>, UserInfo> userObj = ds1.join(ds2).where(0).equalTo(0)
                .with(new UserInfoJoinFun());
        userObj.print();
    }

    public static class UserInfoJoinFun implements JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, UserInfo> {
        @Override
        public UserInfo join(Tuple2<Integer, String> f, Tuple2<Integer, String> s) throws Exception {
           return new UserInfo(f.f0, f.f1, s.f1);
        }
    }

    @Data
    @ToString
    public static class UserInfo {
        private Integer userId;
        private String userName;
        private String address;

        public UserInfo(Integer userId, String userName, String address) {
            this.userId = userId;
            this.userName = userName;
            this.address = address;
        }

    }
}

Output

JoinWithObject.UserInfo(userId=3, userName=tom, address=guangzhou)
JoinWithObject.UserInfo(userId=1, userName=lily, address=beijing)
JoinWithObject.UserInfo(userId=2, userName=lucy, address=shanghai)
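
When one input is known to be much smaller than the other, a join hint can be passed so the optimizer broadcasts it. A hedged sketch continuing with ds1, ds2 and UserInfoJoinFun from the example above (JoinHint comes from org.apache.flink.api.common.operators.base.JoinOperatorBase):

//sketch: hint that the second input is small enough to broadcast
ds1.join(ds2, JoinOperatorBase.JoinHint.BROADCAST_HASH_SECOND)
        .where(0)
        .equalTo(0)
        .with(new UserInfoJoinFun())
        .print();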

Left outer join, right outer join, full outer join

import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;

import java.util.ArrayList;

/**
 * Outer joins
 *
 * @author dajiangtai
 * @create 2019-07-29-11:50
 */
public class OuterJoinDemo {
    public static void main(String[] args) throws Exception {
        //get the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        ArrayList<Tuple2<Integer,String>> list1 = new ArrayList<>();
        list1.add(new Tuple2<>(1,"lily"));
        list1.add(new Tuple2<>(2,"lucy"));
        list1.add(new Tuple2<>(4,"jack"));

        ArrayList<Tuple2<Integer,String>> list2 = new ArrayList<>();
        list2.add(new Tuple2<>(1,"beijing"));
        list2.add(new Tuple2<>(2,"shanghai"));
        list2.add(new Tuple2<>(3,"guangzhou"));

        DataSet<Tuple2<Integer, String>> ds1 = env.fromCollection(list1);
        DataSet<Tuple2<Integer, String>> ds2 = env.fromCollection(list2);

        /**
         * Left outer join
         * Note: second may be null (no matching element in ds2)
         */
//        ds1.leftOuterJoin(ds2)
//                .where(0)
//                .equalTo(0)
//                .with(new JoinFunction<Tuple2<Integer, String>,Tuple2<Integer, String>,Tuple3<Integer,String,String>>(){
//                    @Override
//                    public Tuple3<Integer, String, String> join(Tuple2<Integer, String> first, Tuple2<Integer, String> second) throws Exception {
//                       if(second == null){
//                           return new Tuple3<>(first.f0,first.f1,"null");
//                       }else{
//                           return new Tuple3<>(first.f0,first.f1,second.f1);
//                       }
//                    }
//                }).print();
        /**
         * Right outer join
         * Note: first may be null (no matching element in ds1)
         */
//        ds1.rightOuterJoin(ds2)
//                .where(0)
//                .equalTo(0)
//                .with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple3<Integer,String,String>>() {
//                    @Override
//                    public Tuple3<Integer, String, String> join(Tuple2<Integer, String> first, Tuple2<Integer, String> second) throws Exception {
//                        if(first == null){
//                            return new Tuple3<>(second.f0,"null",second.f1);
//                        }else{
//                            return new Tuple3<>(first.f0,first.f1,second.f1);
//                        }
//                    }
//                }).print();
        /**
         * Full outer join
         * Note: either first or second may be null
         */
        ds1.fullOuterJoin(ds2)
                .where(0)
                .equalTo(0)
                .with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple3<Integer,String,String>>() {
                    @Override
                    public Tuple3<Integer, String, String> join(Tuple2<Integer, String> first, Tuple2<Integer, String> second) throws Exception {
                       if(first == null){
                           return new Tuple3<>(second.f0,"null",second.f1);
                       }else if(second == null){
                           return  new Tuple3<>(first.f0,first.f1,"null");
                       }else{
                           return new Tuple3<>(first.f0,first.f1,second.f1);
                       }
                    }
                }).print();
    }
}
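
The with() call of an outer join also accepts a FlatJoinFunction, which may emit zero or more records per pair; this is handy for anti-join style queries. A minimal sketch continuing with ds1 and ds2 from above (it keeps only the ds1 rows without a match in ds2, and assumes imports of FlatJoinFunction and org.apache.flink.util.Collector):

//sketch: keep only left-side rows that have no match on the right side
ds1.leftOuterJoin(ds2)
        .where(0)
        .equalTo(0)
        .with(new FlatJoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>>() {
            @Override
            public void join(Tuple2<Integer, String> first, Tuple2<Integer, String> second,
                             Collector<Tuple2<Integer, String>> out) throws Exception {
                if (second == null) {       //no matching key in ds2
                    out.collect(first);
                }
            }
        }).print();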

Reposted from blog.csdn.net/wwwzydcom/article/details/103827834