读取CSV文件
package batch;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple3;
/**
 * Batch example: reads selected columns of a CSV file into a typed tuple
 * data set and prints the parsed rows.
 */
public class ReadCsvFile {

    /** Location of the sample CSV file on the local disk. */
    private static final String USER_CSV_PATH = "D:\\Downloads\\000代码+PPT\\数据源\\user.csv";

    /** One flag per CSV column: '1' means the column is read, '0' means it is skipped. */
    private static final String COLUMN_MASK = "11100";

    public static void main(String[] args) throws Exception {
        // Obtain an execution environment.
        ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();

        DataSource<Tuple3<Integer, Integer, String>> users = loadUsers(batchEnv);
        users.print();
    }

    /**
     * Configures the CSV reader and declares the types of the three selected columns.
     *
     * @param batchEnv environment used to create the source
     * @return source producing one (Integer, Integer, String) tuple per accepted line
     */
    private static DataSource<Tuple3<Integer, Integer, String>> loadUsers(ExecutionEnvironment batchEnv) {
        return batchEnv.readCsvFile(USER_CSV_PATH)
                .lineDelimiter("\n")      // row separator
                .fieldDelimiter(",")      // column separator
                .ignoreFirstLine()        // skip the first line of the file
                .ignoreComments("##")     // skip lines marked with the ## comment prefix
                .ignoreInvalidLines()     // skip lines that are not valid instead of failing
                .includeFields(COLUMN_MASK)
                .types(Integer.class, Integer.class, String.class); // types of the selected columns
    }
}
递归读取目录下的文件
package batch;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.configuration.Configuration;
/**
 * Batch example: reads the text files under a directory, including nested
 * sub-directories, and prints every line.
 */
public class ReursionReadDemo {

    /** Configuration key that switches on recursive enumeration of the input directory. */
    private static final String RECURSIVE_ENUMERATION_KEY = "recursive.file.enumeration";

    /** Root directory to read from. */
    private static final String INPUT_DIR = "D:\\testData";

    public static void main(String[] args) throws Exception {
        // Set the recursion parameter that is handed to the file source.
        Configuration readerOptions = new Configuration();
        readerOptions.setBoolean(RECURSIVE_ENUMERATION_KEY, true);

        ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();
        DataSource<String> lines = batchEnv.readTextFile(INPUT_DIR).withParameters(readerOptions);
        lines.print();
    }
}
Union
将两个数据集合并成一个数据集(两个数据集的元素类型必须相同,合并时不会去重)
package batch;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import java.util.ArrayList;
/**
 * Batch example: merges two data sets of the same tuple type with
 * {@code union} and prints the combined result.
 */
public class UnionDemo {

    public static void main(String[] args) throws Exception {
        // Obtain the execution environment.
        ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Tuple2<Integer, String>> firstSet = batchEnv.fromCollection(firstBatch());
        DataSet<Tuple2<Integer, String>> secondSet = batchEnv.fromCollection(secondBatch());

        firstSet.union(secondSet).print();
    }

    /** Builds the (id, name) rows of the first input. */
    private static ArrayList<Tuple2<Integer, String>> firstBatch() {
        ArrayList<Tuple2<Integer, String>> rows = new ArrayList<>();
        rows.add(new Tuple2<>(101, "lily"));
        rows.add(new Tuple2<>(102, "lucy"));
        rows.add(new Tuple2<>(103, "tom"));
        return rows;
    }

    /** Builds the (id, name) rows of the second input. */
    private static ArrayList<Tuple2<Integer, String>> secondBatch() {
        ArrayList<Tuple2<Integer, String>> rows = new ArrayList<>();
        rows.add(new Tuple2<>(101, "lili"));
        rows.add(new Tuple2<>(102, "jack"));
        rows.add(new Tuple2<>(103, "jetty"));
        return rows;
    }
}
结果(并行执行时,输出顺序不固定)
(101,lily)
(101,lili)
(102,lucy)
(102,jack)
(103,tom)
(103,jetty)
inner join
默认的 join 会生成一个由二元组(Tuple2)组成的新 DataSet:每个二元组的第一个字段保存第一个输入 DataSet 中参与连接的元素,第二个字段保存第二个输入 DataSet 中与之匹配的元素。没有匹配项的元素(例如下面示例中的 (4,jack))不会出现在结果中。
package batch;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import java.util.ArrayList;
/**
 * Batch example: inner-joins two tuple data sets on their first field using
 * the default join, which yields pairs of (left element, right element).
 */
public class DefaultJoinDemo {

    public static void main(String[] args) throws Exception {
        // Obtain the execution environment.
        ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Tuple2<Integer, String>> users = batchEnv.fromCollection(userRows());
        DataSet<Tuple2<Integer, String>> cities = batchEnv.fromCollection(cityRows());

        // Field 0 of each side is the join key.
        DataSet<Tuple2<Tuple2<Integer, String>, Tuple2<Integer, String>>> matched =
                users.join(cities)
                        .where(0)
                        .equalTo(0);

        matched.print();
    }

    /** Builds the (id, name) rows used as the left join input. */
    private static ArrayList<Tuple2<Integer, String>> userRows() {
        ArrayList<Tuple2<Integer, String>> rows = new ArrayList<>();
        rows.add(new Tuple2<>(1, "lily"));
        rows.add(new Tuple2<>(2, "lucy"));
        rows.add(new Tuple2<>(3, "tom"));
        rows.add(new Tuple2<>(4, "jack"));
        return rows;
    }

    /** Builds the (id, city) rows used as the right join input. */
    private static ArrayList<Tuple2<Integer, String>> cityRows() {
        ArrayList<Tuple2<Integer, String>> rows = new ArrayList<>();
        rows.add(new Tuple2<>(1, "beijing"));
        rows.add(new Tuple2<>(2, "shanghai"));
        rows.add(new Tuple2<>(3, "guangzhou"));
        return rows;
    }
}
结果
((3,tom),(3,guangzhou))
((1,lily),(1,beijing))
((2,lucy),(2,shanghai))
join与对象结合
引入 Lombok 依赖(用于自动生成 getter/setter 和 toString 方法)
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.0</version>
<!--<scope>provided</scope>-->
</dependency>
package batch;
import lombok.Data;
import lombok.ToString;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.JoinOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import java.util.ArrayList;
/**
 * Batch example: inner-joins two tuple data sets on their first field and
 * maps every matching pair to a {@link UserInfo} object through a
 * {@link JoinFunction}.
 */
public class JoinWithObject {

    public static void main(String[] args) throws Exception {
        // Obtain the execution environment.
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        ArrayList<Tuple2<Integer, String>> list1 = new ArrayList<>();
        list1.add(new Tuple2<>(1, "lily"));
        list1.add(new Tuple2<>(2, "lucy"));
        list1.add(new Tuple2<>(3, "tom"));
        list1.add(new Tuple2<>(4, "jack"));

        ArrayList<Tuple2<Integer, String>> list2 = new ArrayList<>();
        list2.add(new Tuple2<>(1, "beijing"));
        list2.add(new Tuple2<>(2, "shanghai"));
        list2.add(new Tuple2<>(3, "guangzhou"));

        DataSet<Tuple2<Integer, String>> ds1 = env.fromCollection(list1);
        DataSet<Tuple2<Integer, String>> ds2 = env.fromCollection(list2);

        // Field 0 of each side is the join key; the join function builds the result object.
        DataSet<UserInfo> userObj = ds1.join(ds2)
                .where(0)
                .equalTo(0)
                .with(new UserInfoJoinFun());

        userObj.print();
    }

    /** Combines a matching (id, name) tuple and (id, address) tuple into one {@link UserInfo}. */
    public static class UserInfoJoinFun
            implements JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, UserInfo> {

        /**
         * @param f element from the first input: (user id, user name)
         * @param s matching element from the second input: (user id, address)
         * @return user record carrying the id and name of {@code f} and the address of {@code s}
         */
        @Override
        public UserInfo join(Tuple2<Integer, String> f, Tuple2<Integer, String> s) throws Exception {
            return new UserInfo(f.f0, f.f1, s.f1);
        }
    }

    /** Joined user record; accessors and {@code toString} are generated by Lombok. */
    @Data
    @ToString
    public static class UserInfo {
        private Integer userId;
        private String userName;
        private String address;

        /**
         * Public no-argument constructor. Flink only treats a class as a POJO type
         * when it has one; without it the class is handled as a generic type.
         */
        public UserInfo() {
        }

        public UserInfo(Integer userId, String userName, String address) {
            this.userId = userId;
            this.userName = userName;
            this.address = address;
        }
    }
}
结果
JoinWithObject.UserInfo(userId=3, userName=tom, address=guangzhou)
JoinWithObject.UserInfo(userId=1, userName=lily, address=beijing)
JoinWithObject.UserInfo(userId=2, userName=lucy, address=shanghai)
左外连接,右外连接,全外连接
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import java.util.ArrayList;
/**
* 外连接
*
* @author dajiangtai
* @create 2019-07-29-11:50
*/
public class OuterJoinDemo {
public static void main(String[] args) throws Exception {
//获取执行环境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
ArrayList<Tuple2<Integer,String>> list1 = new ArrayList<>();
list1.add(new Tuple2<>(1,"lily"));
list1.add(new Tuple2<>(2,"lucy"));
list1.add(new Tuple2<>(4,"jack"));
ArrayList<Tuple2<Integer,String>> list2 = new ArrayList<>();
list2.add(new Tuple2<>(1,"beijing"));
list2.add(new Tuple2<>(2,"shanghai"));
list2.add(new Tuple2<>(3,"guangzhou"));
DataSet<Tuple2<Integer, String>> ds1 = env.fromCollection(list1);
DataSet<Tuple2<Integer, String>> ds2 = env.fromCollection(list2);
/**
* 左外连接
* 注意:second tuple中的元素可能为null
*/
// ds1.leftOuterJoin(ds2)
// .where(0)
// .equalTo(0)
// .with(new JoinFunction<Tuple2<Integer, String>,Tuple2<Integer, String>,Tuple3<Integer,String,String>>(){
// @Override
// public Tuple3<Integer, String, String> join(Tuple2<Integer, String> first, Tuple2<Integer, String> second) throws Exception {
// if(second == null){
// return new Tuple3<>(first.f0,first.f1,"null");
// }else{
// return new Tuple3<>(first.f0,first.f1,second.f1);
// }
// }
// }).print();
/**
* 右外连接
* 注意:first 这个tuple中的数据可能为null
*
*/
// ds1.rightOuterJoin(ds2)
// .where(0)
// .equalTo(0)
// .with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple3<Integer,String,String>>() {
// @Override
// public Tuple3<Integer, String, String> join(Tuple2<Integer, String> first, Tuple2<Integer, String> second) throws Exception {
// if(first == null){
// return new Tuple3<>(second.f0,"null",second.f1);
// }else{
// return new Tuple3<>(first.f0,first.f1,second.f1);
// }
// }
// }).print();
/**
* 全外连接
* 注意:first 和 second 他们的tuple 都有可能为 null
*/
ds1.fullOuterJoin(ds2)
.where(0)
.equalTo(0)
.with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple3<Integer,String,String>>() {
@Override
public Tuple3<Integer, String, String> join(Tuple2<Integer, String> first, Tuple2<Integer, String> second) throws Exception {
if(first == null){
return new Tuple3<>(second.f0,"null",second.f1);
}else if(second == null){
return new Tuple3<>(first.f0,first.f1,"null");
}else{
return new Tuple3<>(first.f0,first.f1,second.f1);
}
}
}).print();
}
}