D:\JAVA\bin\java.exe "-javaagent:D:\IDEA\IntelliJ IDEA 2018.1.6\lib\idea_rt.jar=61787:D:\IDEA\IntelliJ IDEA 2018.1.6\bin"-Dfile.encoding=UTF-8-classpath D:\JAVA\jre\lib\charsets.jar;D:\JAVA\jre\lib\deploy.jar;D:\JAVA\jre\lib\ext\access-bridge-64.jar;D:\JAVA\jre\lib\ext\cldrdata.jar;D:\JAVA\jre\lib\ext\dnsns.jar;D:\JAVA\jre\lib\ext\jaccess.jar;D:\JAVA\jre\lib\ext\jfxrt.jar;D:\JAVA\jre\lib\ext\localedata.jar;D:\JAVA\jre\lib\ext\nashorn.jar;D:\JAVA\jre\lib\ext\sunec.jar;D:\JAVA\jre\lib\ext\sunjce_provider.jar;D:\JAVA\jre\lib\ext\sunmscapi.jar;D:\JAVA\jre\lib\ext\sunpkcs11.jar;D:\JAVA\jre\lib\ext\zipfs.jar;D:\JAVA\jre\lib\javaws.jar;D:\JAVA\jre\lib\jce.jar;D:\JAVA\jre\lib\jfr.jar;D:\JAVA\jre\lib\jfxswt.jar;D:\JAVA\jre\lib\jsse.jar;D:\JAVA\jre\lib\management-agent.jar;D:\JAVA\jre\lib\plugin.jar;D:\JAVA\jre\lib\resources.jar;D:\JAVA\jre\lib\rt.jar;D:\new\flink\target\classes;D:\SCALA\lib\scala-actors-2.11.0.jar;D:\SCALA\lib\scala-actors-migration_2.11-1.1.0.jar;D:\SCALA\lib\scala-library.jar;D:\SCALA\lib\scala-parser-combinators_2.11-1.0.4.jar;D:\SCALA\lib\scala-reflect.jar;D:\SCALA\lib\scala-swing_2.11-1.0.2.jar;D:\SCALA\lib\scala-xml_2.11-1.0.4.jar;E:\BIGDATATOOLS\maven\maven_repositories\org\apache\flink\flink-java\1.9.1\flink-java-1.9.1.jar;E:\BIGDATATOOLS\maven\maven_repositories\org\apache\flink\flink-core\1.9.1\flink-core-1.9.1.jar;E:\BIGDATATOOLS\maven\maven_repositories\org\apache\flink\flink-annotations\1.9.1\flink-annotations-1.9.1.jar;E:\BIGDATATOOLS\maven\maven_repositories\org\apache\flink\flink-metrics-core\1.9.1\flink-metrics-core-1.9.1.jar;E:\BIGDATATOOLS\maven\maven_repositories\com\esotericsoftware\kryo\kryo\2.24.0\kryo-2.24.0.jar;E:\BIGDATATOOLS\maven\maven_repositories\com\esotericsoftware\minlog\minlog\1.2\minlog-1.2.jar;E:\BIGDATATOOLS\maven\maven_repositories\org\objenesis\objenesis\2.1\objenesis-2.1.jar;E:\BIGDATATOOLS\maven\maven_repositories\commons-collections\commons-collections\3.2.2\commons-collections-3.2.2.jar;E:\BIGDATATOOLS\maven\maven_r
epositories\org\apache\commons\commons-compress\1.18\commons-compress-1.18.jar;E:\BIGDATATOOLS\maven\maven_repositories\org\apache\flink\flink-shaded-asm-6\6.2.1-7.0\flink-shaded-asm-6-6.2.1-7.0.jar;E:\BIGDATATOOLS\maven\maven_repositories\org\apache\commons\commons-lang3\3.3.2\commons-lang3-3.3.2.jar;E:\BIGDATATOOLS\maven\maven_repositories\org\apache\commons\commons-math3\3.5\commons-math3-3.5.jar;E:\BIGDATATOOLS\maven\maven_repositories\org\slf4j\slf4j-api\1.7.15\slf4j-api-1.7.15.jar;E:\BIGDATATOOLS\maven\maven_repositories\com\google\code\findbugs\jsr305\1.3.9\jsr305-1.3.9.jar;E:\BIGDATATOOLS\maven\maven_repositories\org\apache\flink\force-shading\1.9.1\force-shading-1.9.1.jar;E:\BIGDATATOOLS\maven\maven_repositories\org\apache\flink\flink-streaming-java_2.11\1.9.1\flink-streaming-java_2.11-1.9.1.jar;E:\BIGDATATOOLS\maven\maven_repositories\org\apache\flink\flink-runtime_2.11\1.9.1\flink-runtime_2.11-1.9.1.jar;E:\BIGDATATOOLS\maven\maven_repositories\org\apache\flink\flink-queryable-state-client-java\1.9.1\flink-queryable-state-client-java-1.9.1.jar;E:\BIGDATATOOLS\maven\maven_repositories\org\apache\flink\flink-hadoop-fs\1.9.1\flink-hadoop-fs-1.9.1.jar;E:\BIGDATATOOLS\maven\maven_repositories\commons-io\commons-io\2.4\commons-io-2.4.jar;E:\BIGDATATOOLS\maven\maven_repositories\org\apache\flink\flink-shaded-netty\4.1.32.Final-7.0\flink-shaded-netty-4.1.32.Final-7.0.jar;E:\BIGDATATOOLS\maven\maven_repositories\org\apache\flink\flink-shaded-jackson\2.9.8-7.0\flink-shaded-jackson-2.9.8-7.0.jar;E:\BIGDATATOOLS\maven\maven_repositories\commons-cli\commons-cli\1.3.1\commons-cli-1.3.1.jar;E:\BIGDATATOOLS\maven\maven_repositories\org\javassist\javassist\3.19.0-GA\javassist-3.19.0-GA.jar;E:\BIGDATATOOLS\maven\maven_repositories\com\typesafe\akka\akka-actor_2.11\2.5.21\akka-actor_2.11-2.5.21.jar;E:\BIGDATATOOLS\maven\maven_repositories\com\typesafe\config\1.3.3\config-1.3.3.jar;E:\BIGDATATOOLS\maven\maven_repositories\org\scala-lang\modules\scala-java8-compat_2.11\0.7.0\s
cala-java8-compat_2.11-0.7.0.jar;E:\BIGDATATOOLS\maven\maven_repositories\com\typesafe\akka\akka-stream_2.11\2.5.21\akka-stream_2.11-2.5.21.jar;E:\BIGDATATOOLS\maven\maven_repositories\org\reactivestreams\reactive-streams\1.0.2\reactive-streams-1.0.2.jar;E:\BIGDATATOOLS\maven\maven_repositories\com\typesafe\ssl-config-core_2.11\0.3.7\ssl-config-core_2.11-0.3.7.jar;E:\BIGDATATOOLS\maven\maven_repositories\com\typesafe\akka\akka-protobuf_2.11\2.5.21\akka-protobuf_2.11-2.5.21.jar;E:\BIGDATATOOLS\maven\maven_repositories\com\typesafe\akka\akka-slf4j_2.11\2.5.21\akka-slf4j_2.11-2.5.21.jar;E:\BIGDATATOOLS\maven\maven_repositories\org\clapper\grizzled-slf4j_2.11\1.3.2\grizzled-slf4j_2.11-1.3.2.jar;E:\BIGDATATOOLS\maven\maven_repositories\com\github\scopt\scopt_2.11\3.5.0\scopt_2.11-3.5.0.jar;E:\BIGDATATOOLS\maven\maven_repositories\org\xerial\snappy\snappy-java\1.1.4\snappy-java-1.1.4.jar;E:\BIGDATATOOLS\maven\maven_repositories\com\twitter\chill_2.11\0.7.6\chill_2.11-0.7.6.jar;E:\BIGDATATOOLS\maven\maven_repositories\com\twitter\chill-java\0.7.6\chill-java-0.7.6.jar;E:\BIGDATATOOLS\maven\maven_repositories\org\apache\flink\flink-clients_2.11\1.9.1\flink-clients_2.11-1.9.1.jar;E:\BIGDATATOOLS\maven\maven_repositories\org\apache\flink\flink-optimizer_2.11\1.9.1\flink-optimizer_2.11-1.9.1.jar;E:\BIGDATATOOLS\maven\maven_repositories\org\apache\flink\flink-shaded-guava\18.0-7.0\flink-shaded-guava-18.0-7.0.jar;E:\BIGDATATOOLS\maven\maven_repositories\org\apache\flink\flink-connector-kafka_2.11\1.9.1\flink-connector-kafka_2.11-1.9.1.jar;E:\BIGDATATOOLS\maven\maven_repositories\org\apache\flink\flink-connector-kafka-base_2.11\1.9.1\flink-connector-kafka-base_2.11-1.9.1.jar;E:\BIGDATATOOLS\maven\maven_repositories\org\apache\kafka\kafka-clients\2.2.0\kafka-clients-2.2.0.jar;E:\BIGDATATOOLS\maven\maven_repositories\com\github\luben\zstd-jni\1.3.8-1\zstd-jni-1.3.8-1.jar;E:\BIGDATATOOLS\maven\maven_repositories\org\lz4\lz4-java\1.5.0\lz4-java-1.5.0.jar;E:\BIGDATATOOLS\maven\maven_re
positories\org\slf4j\slf4j-log4j12\1.7.7\slf4j-log4j12-1.7.7.jar;E:\BIGDATATOOLS\maven\maven_repositories\log4j\log4j\1.2.17\log4j-1.2.17.jar;E:\BIGDATATOOLS\maven\maven_repositories\org\apache\flink\flink-scala_2.11\1.9.1\flink-scala_2.11-1.9.1.jar;E:\BIGDATATOOLS\maven\maven_repositories\org\scala-lang\scala-reflect\2.11.12\scala-reflect-2.11.12.jar;E:\BIGDATATOOLS\maven\maven_repositories\org\scala-lang\scala-compiler\2.11.12\scala-compiler-2.11.12.jar;E:\BIGDATATOOLS\maven\maven_repositories\org\scala-lang\modules\scala-xml_2.11\1.0.5\scala-xml_2.11-1.0.5.jar;E:\BIGDATATOOLS\maven\maven_repositories\org\scala-lang\modules\scala-parser-combinators_2.11\1.0.4\scala-parser-combinators_2.11-1.0.4.jar;E:\BIGDATATOOLS\maven\maven_repositories\org\apache\flink\flink-streaming-scala_2.11\1.9.1\flink-streaming-scala_2.11-1.9.1.jar;E:\BIGDATATOOLS\maven\maven_repositories\org\scala-lang\scala-library\2.11.12\scala-library-2.11.12.jar DistributedCaches.DistributedCacheApp
Exception in thread "main" java.lang.IllegalStateException: The runtime context has not been initialized.
at org.apache.flink.api.common.functions.AbstractRichFunction.getRuntimeContext(AbstractRichFunction.java:53)
at DistributedCaches.DistributedCacheApp$$anon$1.<init>(DistributedCacheApp.scala:22)
at DistributedCaches.DistributedCacheApp$.main(DistributedCacheApp.scala:20)
at DistributedCaches.DistributedCacheApp.main(DistributedCacheApp.scala)
Process finished with exit code 1
原因：getRuntimeContext 不能在函数对象的构造（类体初始化）阶段调用，此时运行时上下文尚未初始化；应在运行时上下文初始化之后调用，例如在 open() 方法中。下面是出错的代码：
package DistributedCaches
import java.io.File
import java.util
import org.apache.commons.io.FileUtils
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala._
/*
 * Distributed file cache example -- FAILING variant.
 *
 * This listing reproduces the error shown in the stack trace: the distributed
 * cache is read in the body (initializer) of the anonymous RichMapFunction, and
 * getRuntimeContext throws
 * "IllegalStateException: The runtime context has not been initialized."
 * from DistributedCacheApp$$anon$1.<init>.
 */
object DistributedCacheApp {
// Entry point: registers a cached file, builds a three-element DataSet and maps over it.
def main(args: Array[String]): Unit ={
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val filePath ="F:\\data\\wordCount1\\wordCount.txt"
val fileName ="catchFileName"
// Step 1: register a local/HDFS file under the name `fileName`.
env.registerCachedFile(filePath,fileName)
val data: DataSet[String]= env.fromElements("a","b","c")
// NOTE(review): `newRichMapFunction` looks like `new RichMapFunction` with the space lost
// when the listing was pasted -- confirm against the original source.
val result: DataSet[String]= data.map(newRichMapFunction[String, String]{
// Step 2: fetch the distributed cache file.
// This statement sits in the class body, so it runs while the function object is being
// constructed; this is the call that fails with the IllegalStateException.
val cacheFile: File = getRuntimeContext.getDistributedCache().getFile(fileName)
// `lines` is a Java collection here; converting it to a Scala collection needs
// `import scala.collection.JavaConversions._`.
// NOTE(review): that import appears only inside the original trailing comment, not as a
// real import statement -- confirm it was not lost when the listing was pasted.
val lines: util.List[String]= FileUtils.readLines(cacheFile)
private val list: List[String]= lines.toList
// Print every line of the cached file.
for(elem <- list){println("cacheFileContent ---->"+ elem)}
// Identity mapping: each element is passed through unchanged.
override def map(value: String)={
value
}})
// Sink
result.print()
// NOTE(review): in the DataSet API print() presumably triggers execution already, so this
// execute() call may fail because no new sink has been defined -- confirm.
env.execute()}}
解决方案：将 getRuntimeContext 的调用移到 open() 方法中。
package DistributedCaches
import java.io.File
import java.util
import org.apache.commons.io.FileUtils
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
/**
 * Distributed file cache example for the Flink DataSet API.
 *
 * Registers a file in the execution environment's distributed cache and reads it back
 * inside a `RichMapFunction`. The cache is accessed from `open`, because calling
 * `getRuntimeContext` while the function object is still being constructed throws
 * `IllegalStateException: The runtime context has not been initialized.`
 */
object DistributedCacheApp {

  import java.nio.charset.StandardCharsets

  // Explicit converters (`.asScala`) instead of the deprecated implicit JavaConversions.
  import scala.collection.JavaConverters._

  /**
   * Runs the example job.
   *
   * @param args optional; `args(0)` overrides the path of the file to register in the
   *             distributed cache. Without it the original sample path is used.
   */
  def main(args: Array[String]): Unit = {
    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment

    val filePath = args.headOption.getOrElse("F:\\data\\wordCount1\\wordCount.txt")
    val fileName = "catchFileName"

    // Step 1: register a local/HDFS file under the name `fileName`.
    env.registerCachedFile(filePath, fileName)

    val data: DataSet[String] = env.fromElements("a", "b", "c")

    val result: DataSet[String] = data.map(new RichMapFunction[String, String] {

      // Step 2: fetch the distributed cache file. This must happen in open() (not in the
      // class body), since the runtime context only exists once the function is initialized.
      override def open(parameters: Configuration): Unit = {
        val cacheFile: File = getRuntimeContext.getDistributedCache().getFile(fileName)
        // Decode with an explicit charset rather than the platform default.
        val lines: util.List[String] = FileUtils.readLines(cacheFile, StandardCharsets.UTF_8)
        // `lines` is a Java list; view it as a Scala collection to iterate over it.
        lines.asScala.foreach(line => println(s"cacheFileContent ---->$line"))
      }

      // Identity mapping: each element is passed through unchanged.
      override def map(value: String): String = value
    })

    // Sink
    result.print()

    // Note: env.execute() requires a sink to be defined, and a print()-style sink does not
    // count, so env.execute() is deliberately not called here.
    //
    // Sample output:
    //   cacheFileContent ---->java java java java java java
    //   cacheFileContent ---->scala scala scala scala scala
    //   a
    //   b
    //   c
  }
}