Flink示例——Source
其他
2020-03-04 10:51:26
阅读次数: 0
Flink示例——Source
版本信息
产品
版本
Flink
1.7.2
Java
1.8.0_231
Scala
2.11.12
Mavan依赖
Collection/Elements Source 示例
代码 CollectionElementsSourceDemopublic class CollectionElementsSourceDemo {
public static void main ( String[ ] args) {
StreamExecutionEnvironment env = StreamExecutionEnvironment. getExecutionEnvironment ( ) ;
ArrayList< String> list = new ArrayList < > (
Arrays. asList ( "name1" , "name2" , "name3" )
) ;
env. fromCollection ( list)
. print ( "stream" ) ;
try {
env. execute ( ) ;
} catch ( Exception e) {
e. printStackTrace ( ) ;
}
}
}
Text Source 示例
Socket Source 示例
测试建议
下载 netcat
启动命令 nc.exe -L -v -p 23333
启动Flink应用
在启动的nc程序中发送字符串
代码 SocketSourceDemopublic class SocketSourceDemo {
public static void main ( String[ ] args) {
StreamExecutionEnvironment env = StreamExecutionEnvironment. getExecutionEnvironment ( ) ;
DataStreamSource< String> dataDS = env. socketTextStream ( "localhost" , 23333 ) ;
dataDS. print ( "stream" ) ;
try {
env. execute ( ) ;
} catch ( Exception e) {
e. printStackTrace ( ) ;
}
}
}
Kafka Source 示例
Maven导包 pom.xml< dependency>
< groupId> org.apache.flink</ groupId>
< artifactId> flink-connector-kafka-0.11_2.11</ artifactId>
< version> ${flink.version}</ version>
</ dependency>
代码 KafkaSourceDemopublic class KafkaSourceDemo {
public static void main ( String[ ] args) {
StreamExecutionEnvironment env = StreamExecutionEnvironment. getExecutionEnvironment ( ) ;
FlinkKafkaConsumer011< String> kafkaConsumer011 = generateKafkaConsumer ( ) ;
DataStreamSource< String> kafkaDS = env. addSource ( kafkaConsumer011) ;
kafkaDS. print ( ) ;
try {
env. execute ( ) ;
} catch ( Exception e) {
e. printStackTrace ( ) ;
}
}
private static FlinkKafkaConsumer011< String> generateKafkaConsumer ( ) {
Properties props = new Properties ( ) ;
props. setProperty ( "bootstrap.servers" , "192.168.0.101:9092,192.168.0.102:9092,192.168.0.103:9092" ) ;
props. setProperty ( "group.id" , "consumer-group-1" ) ;
props. setProperty ( "auto.offset" , "latest" ) ;
FlinkKafkaConsumer011< String> kafkaConsumer011 = new FlinkKafkaConsumer011 < > (
"topic_1" , new SimpleStringSchema ( ) , props
) ;
return kafkaConsumer011;
}
}
自定义 Source 示例
代码 CustomSourceFunctionpublic class CustomSourceFunction extends RichSourceFunction < Tuple2< String, Long> > {
private boolean flag = true ;
@Override
public void run ( SourceContext< Tuple2< String, Long> > ctx) throws Exception {
List< String> data = Arrays. asList ( "a" , "b" , "c" , "d" , "e" , "f" , "g" ) ;
Random random = new Random ( ) ;
while ( flag) {
Thread. sleep ( 100 ) ;
String key = data. get ( random. nextInt ( data. size ( ) ) ) ;
long value = System. currentTimeMillis ( ) ;
ctx. collect ( Tuple2. of ( key, value) ) ;
}
}
@Override
public void cancel ( ) {
flag = false ;
}
}
代码 CustomSourceDemopublic class CustomSourceDemo {
public static void main ( String[ ] args) {
StreamExecutionEnvironment env = StreamExecutionEnvironment. getExecutionEnvironment ( ) ;
CustomSourceFunction sourceFunction = new CustomSourceFunction ( ) ;
DataStreamSource< Tuple2< String, Long> > customDS = env. addSource ( sourceFunction) ;
customDS. print ( ) ;
try {
env. execute ( ) ;
} catch ( Exception e) {
e. printStackTrace ( ) ;
}
}
}
发布了146 篇原创文章 ·
获赞 54 ·
访问量 17万+
转载自 blog.csdn.net/alionsss/article/details/104247693