Background
The project needed Spark Streaming to consume from Kafka. I expected this to be straightforward, but ran into quite a few problems.
Versions
Scala 2.10, Kafka 2.11-0.11.0.0 (the Scala 2.11 build of Kafka 0.11.0.0), JDK 1.8
POM dependencies
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    ..............
    <repositories>
        <repository>
            <id>Hortonworks</id>
            <url>http://repo.hortonworks.com/content/repositories/releases/</url>
        </repository>
    </repositories>
    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.11.0.0</version>
            <exclusions>
                <exclusion>
                    <groupId>com.fasterxml.jackson.core</groupId>
                    <artifactId>*</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.51</version>
        </dependency>
        .............
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.10</artifactId>
            <version>2.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.10</artifactId>
            <version>2.1.0</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.kafka</groupId>
                    <artifactId>kafka-clients</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.0.50.Final</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.6.6</version>
        </dependency>
    </dependencies>
</project>
Basic skeleton of the Spark program
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.VoidFunction2;
import org.apache.spark.serializer.KryoSerializer;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.Time;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.*;

import java.util.*;

SparkConf conf = new SparkConf()
        .setMaster("local[2]")
        .setAppName("ActionConsumer")
        .set("spark.serializer", KryoSerializer.class.getCanonicalName())
        .registerKryoClasses(new Class[]{ConsumerRecord.class})
        .set("spark.kryoserializer.buffer.max", "512m");
// Graceful shutdown, so a yarn kill while a batch is being processed
// does not cause duplicate consumption or data loss
conf.set("spark.streaming.stopGracefullyOnShutdown", "true");

JavaSparkContext sc = new JavaSparkContext(conf);
JavaStreamingContext ssc = new JavaStreamingContext(sc, Durations.seconds(1));

Set<String> topicsSet = Collections.singleton("test0");
// Kafka parameters -- required; missing entries cause errors
Map<String, Object> kafkaParams = new HashMap<String, Object>();
kafkaParams.put("bootstrap.servers", KafkaConfig.sBOOTSTRAP_SERVER);
kafkaParams.put("group.id", "group2");
kafkaParams.put("key.deserializer", StringDeserializer.class.getCanonicalName());
kafkaParams.put("value.deserializer", StringDeserializer.class.getCanonicalName());
kafkaParams.put("enable.auto.commit", true);

final JavaInputDStream<ConsumerRecord<Object, Object>> kafkaSteam =
        KafkaUtils.createDirectStream(
                ssc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.Subscribe(topicsSet, kafkaParams)
        );

kafkaSteam.foreachRDD(new VoidFunction2<JavaRDD<ConsumerRecord<Object, Object>>, Time>() {
    public void call(JavaRDD<ConsumerRecord<Object, Object>> consumerRecordJavaRDD,
                     Time time) throws Exception {
        if (consumerRecordJavaRDD.rdd().count() > 0) {
            OffsetRange[] offsetRanges = ((HasOffsetRanges) consumerRecordJavaRDD
                    .rdd()).offsetRanges();
            final long recordCount = consumerRecordJavaRDD.count();
            List<ConsumerRecord<Object, Object>> records =
                    consumerRecordJavaRDD.take((int) recordCount);
            for (ConsumerRecord<Object, Object> record : records) {
                // Fetch the Kafka message and handle the business logic
                JSONObject obj = JSON.parseObject(record.value().toString());
                System.out.println(obj);
            }
            ((CanCommitOffsets) kafkaSteam.inputDStream()).commitAsync(offsetRanges);
        }
    }
});

ssc.start();
ssc.awaitTermination();
ssc.close();
Problems encountered
1) Class not found: org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages$RetrieveSparkAppConfig$
Fix: change the Spark dependencies in the POM to the _2.10 artifacts.
2) Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
Fix: previously I was connecting to a remote Spark cluster from inside the program, i.e. passing the remote master address to setMaster(). I changed it to local[2]; the jar will end up running on the server anyway, so this is no different from running it locally.
3) Method not found: java.lang.NoSuchMethodError: javax.servlet.http.HttpServletResponse.getStatus()I
Fix: after changing the POM dependencies as shown above, this error quietly went away; there was no need to pull in an extra javax.servlet dependency.
4) requirement failed: No output operations registered, so nothing to execute
Fix: the stream must have an output (action) operation registered, e.g. print() or the foreachRDD shown above.
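Transformations on a stream are lazy: they describe work but run nothing until an output operation triggers execution. The same behavior is easy to observe with a plain java.util.stream pipeline; the sketch below is an analogy only (not Spark, and the class and method names are invented for illustration):

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class LazyPipelineDemo {
    // Counts how many elements the map stage actually processed.
    static final AtomicInteger touched = new AtomicInteger();

    static void buildAndRun(boolean addTerminalOp) {
        touched.set(0);
        // Analogy for a DStream transformation: map() alone executes nothing.
        Stream<Integer> pipeline = Stream.of(1, 2, 3)
                .map(x -> { touched.incrementAndGet(); return x * 2; });
        // Without a terminal operation the map callback is never invoked,
        // just like a DStream with no registered output operation.
        if (addTerminalOp) {
            List<Integer> out = pipeline.collect(Collectors.toList());
        }
    }

    public static void main(String[] args) {
        buildAndRun(false);
        System.out.println("no terminal op: map ran " + touched.get() + " times");
        buildAndRun(true);
        System.out.println("with terminal op: map ran " + touched.get() + " times");
    }
}
```

Running this shows the map stage executes zero times until the terminal operation is added, which is exactly why Spark refuses to start a streaming job with no output operation.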
5) JsonMappingException: Incompatible Jackson version: 2.7.8
Fix: a Jackson dependency conflict; copy the relevant parts of the POM above (the Jackson exclusion on kafka-clients and the pinned jackson-databind 2.6.6).
6) Error sending RPC: java.lang.AbstractMethodError: org.apache.spark.network.protocol.MessageWithHeader.touc
Fix: configure things the way I did above: the POM and the master setting.
7) NoSuchMethodError: io.netty.buffer.PooledByteBufAllocator.metric()Lio/netty/buffer/PooledByteBufAllo
Fix: a Netty dependency conflict; copy the relevant netty-all entry from the POM above.
8) ClassNotFoundException: org.apache.spark.streaming.kafka010.KafkaRDDPartition
Fix: downgrade Scala to 2.10 (I had been using 2.11).
9) object not serializable (class: org.apache.kafka.clients.consumer.ConsumerRecord)
This error appeared when outputting RDD contents.
Fix: set the Kryo serializer on the SparkConf, as at the top of the skeleton above:
SparkConf conf = new SparkConf()
        ...
        .set("spark.serializer", KryoSerializer.class.getCanonicalName())
        .registerKryoClasses(new Class[]{ConsumerRecord.class})
        ...
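For context on error 9: ConsumerRecord does not implement java.io.Serializable, so Spark's default Java serialization cannot ship it between tasks, while Kryo does not require Serializable. A stdlib-only sketch of the failure mode (FakeRecord is a made-up stand-in for ConsumerRecord, not the real class):

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;

public class SerializationDemo {
    // Stand-in for ConsumerRecord: a plain class that does NOT implement Serializable.
    static class FakeRecord {
        final String value;
        FakeRecord(String value) { this.value = value; }
    }

    // Tries to Java-serialize the object; returns the exception's simple name
    // on failure, or null on success.
    static String trySerialize(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return null;
        } catch (NotSerializableException e) {
            return e.getClass().getSimpleName();
        } catch (IOException e) {
            return e.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        // Fails, just like shipping a ConsumerRecord with Java serialization
        System.out.println(trySerialize(new FakeRecord("payload")));
        // Succeeds: String is Serializable
        System.out.println(trySerialize("a plain String"));
    }
}
```

Registering the class with Kryo, as in the fix above, sidesteps this check entirely because Kryo serializes fields directly.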
10) Error: ...To avoid this, increase spark.kryoserializer.buffer.max value...
Fix: the Kryo buffer is too small; raise its maximum on the SparkConf, as at the top of the skeleton above:
SparkConf conf = new SparkConf()
        ......
        .set("spark.kryoserializer.buffer.max", "512m");
11) NoSuchMethodError: scala.collection.immutable.HashSet$.empty()Lscala/collection/immutable/HashSet;
Fix: downgrade Scala to 2.10 (I had been using 2.11).
12) Spark Streaming's map function never executes; a breakpoint inside it is never hit, even when data arrives
Fix: switch to foreachRDD as in the skeleton above: get the batch size from the RDD's count() and then take() that many records.
13) Spark Streaming loses data
Fix: offsets were not being committed promptly; commit them as in this part of the code above:
kafkaSteam.foreachRDD(new VoidFunction2<JavaRDD<ConsumerRecord<Object, Object>>, Time>() {
    public void call(JavaRDD<ConsumerRecord<Object, Object>> consumerRecordJavaRDD,
                     Time time) throws Exception {
        if (consumerRecordJavaRDD.rdd().count() > 0) {
            OffsetRange[] offsetRanges = ((HasOffsetRanges) consumerRecordJavaRDD
                    .rdd()).offsetRanges();
            ......
            ((CanCommitOffsets) kafkaSteam.inputDStream()).commitAsync(offsetRanges);
        }
    }
});
At the same time, enable auto commit:
kafkaParams.put("enable.auto.commit", true);
After repeated testing: if Kafka receives data while the streaming job is stopped, the restarted job picks up all the new data produced since it last stopped, i.e. the two settings above do not conflict.
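To make the resume behavior in 13) concrete, here is a toy model (all names invented; real committed offsets live in Kafka's __consumer_offsets topic, not in a HashMap) of why committing each batch's untilOffset lets a restarted job pick up exactly where it stopped:

```java
import java.util.HashMap;
import java.util.Map;

public class OffsetTrackerDemo {
    // Committed offset per partition; a toy stand-in for Kafka's offset storage.
    private final Map<Integer, Long> committed = new HashMap<>();

    // OffsetRange semantics: fromOffset is inclusive, untilOffset is exclusive,
    // so committing a batch means recording untilOffset as the next read position.
    void commitBatch(int partition, long fromOffset, long untilOffset) {
        committed.put(partition, untilOffset);
    }

    // Where a restarted consumer in the same group.id resumes: the committed
    // offset, or the given default (think auto.offset.reset) if none exists.
    long resumePosition(int partition, long defaultOffset) {
        return committed.getOrDefault(partition, defaultOffset);
    }

    public static void main(String[] args) {
        OffsetTrackerDemo tracker = new OffsetTrackerDemo();
        tracker.commitBatch(0, 0L, 42L);                   // processed offsets 0..41
        System.out.println(tracker.resumePosition(0, 0L)); // resumes at 42
    }
}
```

If the job dies before commitAsync runs, the committed offset still points at the start of the unfinished batch, so those records are re-read on restart rather than lost; that is the at-least-once guarantee the manual commit above provides.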
Closing notes
Note from 2020-07-22: after everything compiled and ran fine with Scala 2.10, I switched the Scala version back to 2.11 and rebuilt, and to my surprise it now ran without errors. Strange but true.