(1) Map in Detail
Performs a map-side transformation on the DataSet: each record is transformed in parallel into exactly one new record, and the data partitioning does not change.
Java implementation:
package com.aikfk.flink.dataset.transform;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

/**
 * @author caizhengjie
 * @date 2021/3/8 9:58 AM
 */
public class MapJava {
    public static void main(String[] args) throws Exception {
        // Set up the batch execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<String> dataSource = env.fromElements(
                "java java spark hive",
                "hive java java spark",
                "java java hadoop"
        );

        /**
         * map: uppercase each line
         */
        DataSet<String> mapSource = dataSource.map(new MapFunction<String, String>() {
            @Override
            public String map(String line) throws Exception {
                return line.toUpperCase();
            }
        });

        mapSource.print();
        /**
         * JAVA JAVA SPARK HIVE
         * HIVE JAVA JAVA SPARK
         * JAVA JAVA HADOOP
         */
    }
}
Scala implementation:
package com.aikfk.flink.dataset.transform

import org.apache.flink.api.scala._

object MapScala {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    env.fromElements(
        "java java spark hive",
        "hive java java spark",
        "java java hadoop")
      .map(line => line.toUpperCase)
      .print()
  }
}
(2) FlatMap in Detail
Transforms each incoming record into zero or more output records (possibly none at all), such as the line splitting we covered earlier.
Java implementation:
package com.aikfk.flink.dataset.transform;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

/**
 * @author caizhengjie
 * @date 2021/3/7 8:26 PM
 */
public class FlatMapJava {
    public static void main(String[] args) throws Exception {
        // Set up the batch execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<String> dataSource = env.fromElements(
                "java java spark hive",
                "hive java java spark",
                "java java hadoop"
        );

        /**
         * map: uppercase each line
         */
        DataSet<String> mapSource = dataSource.map(new MapFunction<String, String>() {
            @Override
            public String map(String line) throws Exception {
                return line.toUpperCase();
            }
        });

        /**
         * flatmap: split each line into (word, 1) pairs
         */
        DataSet<Tuple2<String, Integer>> flatmapSource = mapSource.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                for (String word : s.split(" ")) {
                    collector.collect(new Tuple2<>(word, 1));
                }
            }
        });

        flatmapSource.print();
        /**
         * (JAVA,1)
         * (JAVA,1)
         * (SPARK,1)
         * (HIVE,1)
         * ..
         */
    }
}
Scala implementation:
package com.aikfk.flink.dataset.transform

import org.apache.flink.api.scala._
import org.apache.flink.util.Collector

object FlatMapScala {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    env.fromElements(
        "java java spark hive",
        "hive java java spark",
        "java java hadoop")
      .map(line => line.toUpperCase)
      // Emit a (word, 1) pair for every word in the line
      .flatMap((line: String, collector: Collector[(String, Int)]) =>
        line.split(" ").foreach(word => collector.collect((word, 1))))
      .print()
  }
}
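If you prefer Java 8 lambdas to anonymous classes for flatMap, note that type erasure strips the Tuple2 type arguments from the lambda, so Flink cannot infer the output type on its own and needs an explicit hint. A minimal sketch using the Types utility from org.apache.flink.api.common.typeinfo (the class name FlatMapLambdaJava is just for illustration):

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class FlatMapLambdaJava {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple2<String, Integer>> words = env
                .fromElements("java java spark hive", "hive java java spark")
                .flatMap((String line, Collector<Tuple2<String, Integer>> out) -> {
                    for (String word : line.split(" ")) {
                        out.collect(new Tuple2<>(word, 1));
                    }
                })
                // Erasure removes the Tuple2 type parameters from the lambda,
                // so Flink needs this hint to build the output TypeInformation.
                .returns(Types.TUPLE(Types.STRING, Types.INT));
        words.print();
    }
}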
(3) MapPartition: An Optimization of Map
Functionally similar to Map, except that MapPartition processes the DataSet partition by partition: the function is invoked once per partition, receives that partition's records through an Iterator, and may return any number of result values. This is a small optimization over Map.
Java implementation:
package com.aikfk.flink.dataset.transform;

import org.apache.flink.api.common.functions.MapPartitionFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * @author caizhengjie
 * @date 2021/3/7 8:48 PM
 */
public class MapPartitionJava {
    public static void main(String[] args) throws Exception {
        // Set up the batch execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<String> dataSource = env.fromElements(
                "java java spark hive",
                "hive java java spark",
                "java java hadoop"
        );

        /**
         * mapPartition: called once per partition with all of its records
         */
        DataSet<String> mapPartition = dataSource.mapPartition(new MapPartitionFunction<String, String>() {
            @Override
            public void mapPartition(Iterable<String> iterable, Collector<String> collector) throws Exception {
                for (String line : iterable) {
                    collector.collect(line.toUpperCase());
                }
            }
        });

        mapPartition.print();
        /**
         * JAVA JAVA SPARK HIVE
         * HIVE JAVA JAVA SPARK
         * JAVA JAVA HADOOP
         */
    }
}
Scala implementation:
package com.aikfk.flink.dataset.transform

import org.apache.flink.api.scala._
import org.apache.flink.util.Collector

object MapPartitionScala {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    env.fromElements(
        "java java spark hive",
        "hive java java spark",
        "java java hadoop")
      .mapPartition((iterator: Iterator[String], collector: Collector[String]) => {
        for (line <- iterator) {
          collector.collect(line.toUpperCase)
        }
      })
      .print()
  }
}
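The per-partition call pattern is what makes mapPartition an optimization: any expensive setup can be done once per partition instead of once per record. A minimal runnable sketch of that idea (the class name MapPartitionSetupJava is illustrative), paying the cost of compiling a regex once per partition rather than per line:

import org.apache.flink.api.common.functions.MapPartitionFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.regex.Pattern;

public class MapPartitionSetupJava {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<String> lines = env.fromElements(
                "java java spark hive",
                "hive java java spark");
        DataSet<String> result = lines.mapPartition(new MapPartitionFunction<String, String>() {
            @Override
            public void mapPartition(Iterable<String> records, Collector<String> out) {
                // Expensive setup happens once per partition, not once per record.
                Pattern whitespace = Pattern.compile("\\s+");
                for (String record : records) {
                    out.collect(String.join(",", whitespace.split(record)));
                }
            }
        });
        result.print();
    }
}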
(4) Filter in Detail
Filters the incoming records against a predicate; a data element is forwarded to the downstream DataSet only when the predicate returns true.
Java implementation:
package com.aikfk.flink.dataset.transform;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

/**
 * @author caizhengjie
 * @date 2021/3/7 8:26 PM
 */
public class FilterJava {
    public static void main(String[] args) throws Exception {
        // Set up the batch execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<String> dataSource = env.fromElements(
                "java java spark hive",
                "hive java java spark",
                "java java hadoop"
        );

        /**
         * map: uppercase each line
         */
        DataSet<String> mapSource = dataSource.map(new MapFunction<String, String>() {
            @Override
            public String map(String line) throws Exception {
                return line.toUpperCase();
            }
        });

        /**
         * flatmap: split each line into (word, 1) pairs
         */
        DataSet<Tuple2<String, Integer>> flatmapSource = mapSource.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                for (String word : s.split(" ")) {
                    collector.collect(new Tuple2<>(word, 1));
                }
            }
        });

        /**
         * filter: keep only the (SPARK,1) pairs
         */
        DataSet<Tuple2<String, Integer>> filterSource = flatmapSource.filter(new FilterFunction<Tuple2<String, Integer>>() {
            @Override
            public boolean filter(Tuple2<String, Integer> tuple2) throws Exception {
                return "SPARK".equals(tuple2.f0);
            }
        });

        filterSource.print();
        /**
         * (SPARK,1)
         * (SPARK,1)
         */
    }
}
Scala implementation:
package com.aikfk.flink.dataset.transform

import org.apache.flink.api.scala._
import org.apache.flink.util.Collector

object FilterScala {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    env.fromElements(
        "java java spark hive",
        "hive java java spark",
        "java java hadoop")
      .map(line => line.toUpperCase)
      .flatMap((line: String, collector: Collector[(String, Int)]) =>
        line.split(" ").foreach(word => collector.collect((word, 1))))
      // Keep only the (SPARK,1) pairs
      .filter(tuple2 => tuple2._1.equals("SPARK"))
      .print()
  }
}
(5) Reduce in Detail
Merges the elements of a dataset into a single element by combining them two at a time; it can be applied to the entire dataset or to each group.
Reduce aggregates incrementally, reading one record at a time: for example, the six (JAVA,1) pairs below are folded pairwise into (JAVA,6).
Java implementation:
package com.aikfk.flink.dataset.transform;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

/**
 * @author caizhengjie
 * @date 2021/3/7 8:26 PM
 */
public class ReduceJava {
    public static void main(String[] args) throws Exception {
        // Set up the batch execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<String> dataSource = env.fromElements(
                "java java spark hive",
                "hive java java spark",
                "java java hadoop"
        );

        /**
         * map: uppercase each line
         */
        DataSet<String> mapSource = dataSource.map(new MapFunction<String, String>() {
            @Override
            public String map(String line) throws Exception {
                return line.toUpperCase();
            }
        });

        /**
         * flatmap: split each line into (word, 1) pairs
         */
        DataSet<Tuple2<String, Integer>> flatmapSource = mapSource.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                for (String word : s.split(" ")) {
                    collector.collect(new Tuple2<>(word, 1));
                }
            }
        });

        /**
         * reduce: group by the word (field 0) and merge counts pairwise
         */
        DataSet<Tuple2<String, Integer>> reduceSource = flatmapSource.groupBy(0).reduce(new ReduceFunction<Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> reduce(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) throws Exception {
                return new Tuple2<>(t1.f0, t1.f1 + t2.f1);
            }
        });

        reduceSource.print();
        /**
         * (HIVE,2)
         * (HADOOP,1)
         * (JAVA,6)
         * (SPARK,2)
         */
    }
}
Scala implementation:
package com.aikfk.flink.dataset.transform

import org.apache.flink.api.scala._
import org.apache.flink.util.Collector

object ReduceScala {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    env.fromElements(
        "java java spark hive",
        "hive java java spark",
        "java java hadoop")
      .map(line => line.toUpperCase)
      .flatMap((line: String, collector: Collector[(String, Int)]) =>
        line.split(" ").foreach(word => collector.collect((word, 1))))
      .groupBy(0)
      // Merge counts pairwise within each group
      .reduce((x, y) => (x._1, x._2 + y._2))
      .print()
  }
}
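For a simple numeric aggregation like this word count, the hand-written ReduceFunction can also be replaced by the DataSet API's built-in field aggregations. A minimal sketch (the class name SumJava is illustrative) that groups on the word and sums the counts:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class SumJava {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple2<String, Integer>> words = env
                .fromElements("java java spark hive", "hive java java spark")
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
                        for (String word : line.split(" ")) {
                            out.collect(new Tuple2<>(word.toUpperCase(), 1));
                        }
                    }
                });
        // Built-in aggregation: group on the word (field 0), sum the counts (field 1).
        words.groupBy(0).sum(1).print();
    }
}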
(6) ReduceGroup in Detail
The GroupReduce operator is applied to a grouped DataSet and invokes the user-defined group-reduce function once for each group. It differs from Reduce in that the user-defined function receives the entire group at once.
It merges a group of elements into one or more elements and can also be used on the entire dataset. This is a small optimization over the reduce program.
Whereas reduce aggregates record by record, reduceGroup reads a batch at once, say 10 records at a time, and aggregates them in a single call.
Java implementation:
package com.aikfk.flink.dataset.transform;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

/**
 * @author caizhengjie
 * @date 2021/3/7 8:26 PM
 */
public class ReduceGroupJava {
    public static void main(String[] args) throws Exception {
        // Set up the batch execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<String> dataSource = env.fromElements(
                "java java spark hive",
                "hive java java spark",
                "java java hadoop"
        );

        /**
         * map: uppercase each line
         */
        DataSet<String> mapSource = dataSource.map(new MapFunction<String, String>() {
            @Override
            public String map(String line) throws Exception {
                return line.toUpperCase();
            }
        });

        /**
         * flatmap: split each line into (word, 1) pairs
         */
        DataSet<Tuple2<String, Integer>> flatmapSource = mapSource.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                for (String word : s.split(" ")) {
                    collector.collect(new Tuple2<>(word, 1));
                }
            }
        });

        /**
         * reduceGroup: the function receives all records of a group at once
         */
        DataSet<Tuple2<String, Integer>> reduceSource = flatmapSource.groupBy(0).reduceGroup(
                new GroupReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
            @Override
            public void reduce(Iterable<Tuple2<String, Integer>> iterable, Collector<Tuple2<String, Integer>> collector) throws Exception {
                String key = null;
                int count = 0;
                for (Tuple2<String, Integer> tuple2 : iterable) {
                    key = tuple2.f0;
                    count = count + tuple2.f1;
                }
                collector.collect(new Tuple2<>(key, count));
            }
        });

        reduceSource.print();
        /**
         * (HIVE,2)
         * (HADOOP,1)
         * (JAVA,6)
         * (SPARK,2)
         */
    }
}
Scala implementation:
package com.aikfk.flink.dataset.transform

import org.apache.flink.api.scala._
import org.apache.flink.util.Collector

object ReduceGroupScala {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    env.fromElements(
        "java java spark hive",
        "hive java java spark",
        "java java hadoop")
      .map(line => line.toUpperCase)
      .flatMap((line: String, collector: Collector[(String, Int)]) =>
        line.split(" ").foreach(word => collector.collect((word, 1))))
      .groupBy(0)
      // The function receives the whole group at once
      .reduceGroup(group => group.reduce((t1, t2) => (t1._1, t1._2 + t2._2)))
      .print()
  }
}
(7) CombineGroup: An Optimization of ReduceGroup
With CombineGroup we can pre-aggregate on each machine first, and then use ReduceGroup to merge the outputs of the per-machine CombineGroup. This leaves ReduceGroup far less data to gather, which speeds up the computation.
Java implementation:
package com.aikfk.flink.dataset.transform;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

/**
 * @author caizhengjie
 * @date 2021/3/7 8:26 PM
 */
public class CombineGroupJava {
    public static void main(String[] args) throws Exception {
        // Set up the batch execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<String> dataSource = env.fromElements(
                "java java spark hive",
                "hive java java spark",
                "java java hadoop"
        );

        /**
         * map: uppercase each line
         */
        DataSet<String> mapSource = dataSource.map(new MapFunction<String, String>() {
            @Override
            public String map(String line) throws Exception {
                return line.toUpperCase();
            }
        });

        /**
         * flatmap: split each line into (word, 1) pairs
         */
        DataSet<Tuple2<String, Integer>> flatmapSource = mapSource.flatMap(
                new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                for (String word : s.split(" ")) {
                    collector.collect(new Tuple2<>(word, 1));
                }
            }
        });

        /**
         * combineGroup: pre-aggregate locally before the shuffle
         */
        DataSet<Tuple2<String, Integer>> combineGroupSource = flatmapSource.groupBy(0).combineGroup(
                new GroupCombineFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
            @Override
            public void combine(Iterable<Tuple2<String, Integer>> iterable,
                                Collector<Tuple2<String, Integer>> collector) throws Exception {
                String key = null;
                int count = 0;
                for (Tuple2<String, Integer> tuple2 : iterable) {
                    key = tuple2.f0;
                    count = count + tuple2.f1;
                }
                collector.collect(new Tuple2<>(key, count));
            }
        });

        /**
         * reduceGroup: merge the pre-aggregated partial counts
         */
        DataSet<Tuple2<String, Integer>> reduceSource = combineGroupSource.groupBy(0).reduceGroup(
                new GroupReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
            @Override
            public void reduce(Iterable<Tuple2<String, Integer>> iterable,
                               Collector<Tuple2<String, Integer>> collector) throws Exception {
                String key = null;
                int count = 0;
                for (Tuple2<String, Integer> tuple2 : iterable) {
                    key = tuple2.f0;
                    count = count + tuple2.f1;
                }
                collector.collect(new Tuple2<>(key, count));
            }
        });

        reduceSource.print();
        /**
         * (HIVE,2)
         * (HADOOP,1)
         * (JAVA,6)
         * (SPARK,2)
         */
    }
}
Scala implementation:
package com.aikfk.flink.dataset.transform

import org.apache.flink.api.scala._
import org.apache.flink.util.Collector

object CombineGroupScala {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    env.fromElements(
        "java java spark hive",
        "hive java java spark",
        "java java hadoop")
      .map(line => line.toUpperCase)
      .flatMap((line: String, collector: Collector[(String, Int)]) =>
        line.split(" ").foreach(word => collector.collect((word, 1))))
      .groupBy(0)
      // Pre-aggregate within each partition before the shuffle
      .combineGroup((iterator, combineCollector: Collector[(String, Int)]) =>
        combineCollector.collect(iterator.reduce((t1, t2) => (t1._1, t1._2 + t2._2))))
      .groupBy(0)
      // Merge the pre-aggregated partial counts
      .reduceGroup(group => group.reduce((t1, t2) => (t1._1, t1._2 + t2._2)))
      .print()
  }
}
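As an alternative to wiring combineGroup and reduceGroup together by hand, a single function can serve both phases: if a GroupReduceFunction also implements GroupCombineFunction, the DataSet API can use it as a combiner before the shuffle as well as the final reducer. A sketch under that assumption (the class name WordCountGroupReducer is illustrative); it would be applied as flatmapSource.groupBy(0).reduceGroup(new WordCountGroupReducer()):

import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

// Implements both interfaces, so Flink may run it as a pre-shuffle combiner
// and as the final group reducer, mirroring the two-step pipeline above.
public class WordCountGroupReducer implements
        GroupReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>,
        GroupCombineFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {

    @Override
    public void reduce(Iterable<Tuple2<String, Integer>> values,
                       Collector<Tuple2<String, Integer>> out) {
        combine(values, out); // counting is the same logic in both phases
    }

    @Override
    public void combine(Iterable<Tuple2<String, Integer>> values,
                        Collector<Tuple2<String, Integer>> out) {
        String key = null;
        int count = 0;
        for (Tuple2<String, Integer> tuple2 : values) {
            key = tuple2.f0;
            count += tuple2.f1;
        }
        out.collect(new Tuple2<>(key, count));
    }
}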