开发步骤
获得一个执行环境
加载/创建初始化数据
指定操作数据的Transformation算子
指定计算好的数据的存放位置
调用execute()触发执行程序
Flink流处理开发
import org. apache. flink. api. common. functions. FlatMapFunction;
import org. apache. flink. api. java. utils. ParameterTool;
import org. apache. flink. runtime. state. filesystem. FsStateBackend;
import org. apache. flink. runtime. state. memory. MemoryStateBackend;
import org. apache. flink. streaming. api. datastream. DataStream;
import org. apache. flink. streaming. api. datastream. DataStreamSource;
import org. apache. flink. streaming. api. environment. StreamExecutionEnvironment;
import org. apache. flink. streaming. api. windowing. time. Time;
import org. apache. flink. util. Collector;
public class SocketWindowWordCountJava {
public static void main ( String[ ] args) throws Exception{
int port;
try {
ParameterTool parameterTool = ParameterTool. fromArgs ( args) ;
port = parameterTool. getInt ( "port" ) ;
} catch ( Exception e) {
System. err. println ( "No port set. use default port 9000--Java" ) ;
port = 9000 ;
}
StreamExecutionEnvironment env = StreamExecutionEnvironment. getExecutionEnvironment ( ) ;
String hostname = "localhost" ;
String delimiter = "\n" ;
DataStreamSource< String> text = env. socketTextStream ( hostname, port, delimiter) ;
DataStream< WordWithCount> windowCounts = text. flatMap ( new FlatMapFunction
< String, WordWithCount> ( ) {
public void flatMap ( String value, Collector< WordWithCount> out) throws
Exception {
String[ ] splits = value. split ( "\\s" ) ;
for ( String word : splits) {
out. collect ( new WordWithCount ( word, 1 L) ) ;
}
}
} ) . keyBy ( "word" )
. timeWindow ( Time. seconds ( 2 ) , Time. seconds ( 1 ) )
. sum ( "count" ) ;
windowCounts. print ( ) . setParallelism ( 1 ) ;
env. execute ( "Socket window count" ) ;
}
public static class WordWithCount {
public String word;
public long count;
public WordWithCount ( ) { }
public WordWithCount ( String word, long count) {
this . word = word;
this . count = count;
}
@Override
public String toString ( ) {
return "WordWithCount{" +
"word='" + word + '\'' +
", count=" + count +
'}' ;
}
}
}
import org. apache. flink. api. java. utils. ParameterTool
import org. apache. flink. streaming. api. scala. StreamExecutionEnvironment
import org. apache. flink. streaming. api. windowing. time. Time
/**
 * Streaming word count: reads '\n'-delimited lines from localhost:<port>,
 * splits on whitespace, and prints per-word counts over a 2-second sliding
 * window that advances every second.
 *
 * Pass `--port <n>` to choose the socket port (defaults to 9000).
 */
object SocketWindowWordCountScala {
  def main(args: Array[String]): Unit = {
    // FIX: the fallback value 9000 must be the last expression of the `case`
    // branch; in the original it sat outside the case (between the case body
    // and the closing brace of `catch`), which is a syntax error and would
    // otherwise leave the catch branch typed as Unit instead of Int.
    val port: Int = try {
      ParameterTool.fromArgs(args).getInt("port")
    } catch {
      case e: Exception =>
        System.err.println("No port set. use default port 9000--Scala")
        9000
    }

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    // Socket source producing one record per '\n'-delimited line.
    val text = env.socketTextStream("localhost", port, '\n')

    // Brings the implicit TypeInformation instances required by the Scala API.
    import org.apache.flink.api.scala._

    val windowCounts = text.flatMap(line => line.split("\\s"))
      .map(w => WordWithCount(w, 1))
      .keyBy("word")
      .timeWindow(Time.seconds(2), Time.seconds(1))
      .sum("count")

    // Single-parallelism sink so console output is not interleaved.
    windowCounts.print().setParallelism(1)

    env.execute("Socket window count")
  }

  /** One word plus its running count; fields addressed by name in keyBy/sum. */
  case class WordWithCount(word: String, count: Long)
}
Flink批处理开发
package batch;
import org. apache. flink. api. common. functions. FlatMapFunction;
import org. apache. flink. api. java. DataSet;
import org. apache. flink. api. java. ExecutionEnvironment;
import org. apache. flink. api. java. operators. DataSource;
import org. apache. flink. api. java. tuple. Tuple2;
import org. apache. flink. util. Collector;
/**
 * Batch word count: reads a text file, lower-cases and tokenizes every line
 * on non-word characters, and writes "(word count)" pairs as CSV
 * (newline-separated rows, space-separated fields).
 */
public class BatchWordCountJava {
    public static void main(String[] args) throws Exception {
        String inputPath = "/Users/eric/Desktop/flink-train/FlinkTech/src/main/resources/data/input";
        String outPath = "/Users/eric/Desktop/flink-train/FlinkTech/src/main/resources/data/result";

        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSource<String> lines = env.readTextFile(inputPath);

        // Tokenize, then group on field 0 (the word) and sum field 1 (the count).
        DataSet<Tuple2<String, Integer>> wordCounts =
                lines.flatMap(new Tokenizer()).groupBy(0).sum(1);

        // Parallelism 1 so the result lands in a single output file.
        wordCounts.writeAsCsv(outPath, "\n", " ").setParallelism(1);

        env.execute("batch word count");
    }

    /**
     * Splits a line into lower-case tokens on non-word characters ("\\W+")
     * and emits a (token, 1) pair for every non-empty token.
     */
    public static class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
        public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception {
            for (String token : line.toLowerCase().split("\\W+")) {
                // Splitting can yield empty strings (e.g. leading punctuation).
                if (!token.isEmpty()) {
                    out.collect(new Tuple2<String, Integer>(token, 1));
                }
            }
        }
    }
}
package batch
import org. apache. flink. api. scala. ExecutionEnvironment
/**
 * Batch word count: reads a text file, lower-cases and tokenizes each line on
 * non-word characters, and writes "(word count)" pairs as CSV (newline rows,
 * space-separated fields).
 */
object BatchWordCountScala {
  def main(args: Array[String]): Unit = {
    val inputPath = "/Users/eric/Desktop/flink-train/FlinkTech/src/main/resources/data/input"
    val outPut = "/Users/eric/Desktop/flink-train/FlinkTech/src/main/resources/data/result"

    val env = ExecutionEnvironment.getExecutionEnvironment
    val text = env.readTextFile(inputPath)

    // Brings the implicit TypeInformation instances required by the Scala API.
    import org.apache.flink.api.scala._

    // Tokenize first, then drop empty tokens produced by the split,
    // pair each word with 1, and aggregate on the word (tuple field 0).
    val tokens = text.flatMap(_.toLowerCase.split("\\W+"))
    val counts = tokens
      .filter(word => word.nonEmpty)
      .map(word => (word, 1))
      .groupBy(0)
      .sum(1)

    // Parallelism 1 so the result lands in a single output file.
    counts.writeAsCsv(outPut, "\n", " ").setParallelism(1)

    env.execute("batch word count")
  }
}