This article implements communication between Kafka and Spark Streaming: the Kafka-side producer is written in Java, and the Spark Streaming consumer is written in Python.
First, install the Kafka and Spark Streaming environments; for the Kafka connectivity test, see the previous post. All experiments in this article run on a single local machine.
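Before wiring the two sides together, it helps to confirm the broker is reachable. A minimal sanity check, assuming the third-party kafka-python package (pip install kafka-python), which is not otherwise used in this article:

# connectivity_check.py: list the topics the local broker knows about.
# "data-message" should appear here once the topic has been created.
from kafka import KafkaConsumer

consumer = KafkaConsumer(bootstrap_servers="127.0.0.1:9092")
print(consumer.topics())
consumer.close()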
Kafka
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class producer {
    private final static String TOPIC = "data-message";
    private final static String BOOTSTRAP_SERVER = "127.0.0.1:9092";

    public static Producer<String, String> createProducer() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        return new KafkaProducer<>(props);
    }

    // Custom partition routing: even timestamps go to partition 0, odd ones to
    // partition 1, so the topic must be created with two partitions.
    public static int partition(long time) {
        if (time % 2 == 0)
            return 0;
        else
            return 1;
    }

    public static void runProducer() throws Exception {
        final Producer<String, String> producer = createProducer();
        long time = System.currentTimeMillis();
        long curTime = time;
        try {
            while (true) {
                curTime = System.currentTimeMillis();
                // Use >= rather than ==: the loop is not guaranteed to observe
                // the exact millisecond at which 10 seconds have elapsed.
                if (curTime - time >= 10000) {
                    final ProducerRecord<String, String> record =
                            new ProducerRecord<>(TOPIC, partition(curTime), "JP_" + curTime, "AUX|989|bid|276|" + curTime);
                    RecordMetadata metadata = producer.send(record).get();
                    long elapsedTime = System.currentTimeMillis() - time;
                    System.out.printf("sent record(key=%s value=%s) " +
                                    "meta(partition=%d, offset=%d) time=%d\n",
                            record.key(), record.value(), metadata.partition(),
                            metadata.offset(), elapsedTime);
                    curTime = time = System.currentTimeMillis();
                }
                Thread.sleep(100); // avoid busy-waiting on a full CPU core
            }
        } finally {
            producer.flush();
            producer.close();
        }
    }

    public static void main(String[] args) throws Exception {
        runProducer();
    }
}
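The producer above sends one pipe-delimited message roughly every 10 seconds and routes it by timestamp parity. For comparison with the consumer side, here is a rough Python equivalent; this is a sketch assuming the third-party kafka-python package, which the article itself does not use:

# python_producer.py: rough Python equivalent of the Java producer above.
# Assumes `pip install kafka-python`; not part of the original setup.
import time
from kafka import KafkaProducer

TOPIC = "data-message"

producer = KafkaProducer(
    bootstrap_servers="127.0.0.1:9092",
    key_serializer=lambda k: k.encode("utf-8"),
    value_serializer=lambda v: v.encode("utf-8"),
)

while True:
    cur_time = int(time.time() * 1000)
    # same parity-based routing as the Java partition() method
    part = 0 if cur_time % 2 == 0 else 1
    future = producer.send(TOPIC,
                           key="JP_%d" % cur_time,
                           value="AUX|989|bid|276|%d" % cur_time,
                           partition=part)
    metadata = future.get(timeout=10)  # block until the broker acknowledges
    print("sent to partition=%d offset=%d" % (metadata.partition, metadata.offset))
    time.sleep(10)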
Spark Streaming
The consumer side implements both of Spark Streaming's Kafka integration modes: the receiver-based createStream, which consumes through ZooKeeper, and the receiver-less createDirectStream, which queries the brokers directly and tracks offsets itself.
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import configparser


# Receiver-based approach: consume through ZooKeeper with several parallel receivers.
def startReceiver(config, topics, ssc):
    # connect to Kafka; each createStream call creates one receiver
    kafkaStreams = [KafkaUtils.createStream(ssc, config.get('oppo', 'zookeeper'),
                    config.get('oppo', 'consumer'), topics)
                    for _ in range(int(config.get('oppo', 'numStreams')))]
    uniStream = ssc.union(*kafkaStreams)
    # each element is a (key, value) pair; keep the message value
    stream = uniStream.map(lambda x: x[1])
    stream.pprint()
    ssc.start()
    ssc.awaitTermination()


# Direct approach: read from the brokers without a receiver.
def startDirect(config, topic, ssc):
    brokerList = config.get('oppo', 'brokerList')
    # connect to Kafka
    kafkaStreams = KafkaUtils.createDirectStream(ssc, [topic],
                    {"metadata.broker.list": brokerList})
    # each element is a (key, value) pair; keep the message value
    stream = kafkaStreams.map(lambda x: x[1])
    stream.pprint()
    ssc.start()
    ssc.awaitTermination()


if __name__ == '__main__':
    config = configparser.ConfigParser()  # SafeConfigParser is a deprecated alias
    config.read("properties.conf")
    sc = SparkContext(appName=config.get('oppo', 'appName'))
    sc.setLogLevel(config.get('oppo', 'logLevel'))
    # create the Streaming Context with a 10-second batch interval
    ssc = StreamingContext(sc, 10)
    topic = config.get('oppo', 'topic')
    # createStream expects {topic: number of partitions to consume}; a dict
    # cannot hold the same topic key twice, so {topic: 0, topic: 1} was a bug
    topics = {topic: int(config.get('oppo', 'partitions'))}
    # startReceiver(config, topics, ssc)
    startDirect(config, topic, ssc)
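Both variants only print each batch with pprint. The pipe-delimited value can instead be split into fields inside the DStream pipeline; a minimal sketch, assuming the value format produced above (AUX|989|bid|276|<timestamp>), where the field names are illustrative guesses rather than anything from the original code:

# Hypothetical extension of startDirect: parse the pipe-delimited value
# before printing. Field names are assumptions for illustration.
def parse_message(value):
    market, code, side, price, ts = value.split('|')
    return {'market': market, 'code': code, 'side': side,
            'price': int(price), 'timestamp': int(ts)}

# inside startDirect, instead of printing the raw value:
# parsed = kafkaStreams.map(lambda x: parse_message(x[1]))
# parsed.pprint()

When submitting the job, the Kafka integration classes must be on the classpath, for example via spark-submit --packages with the spark-streaming-kafka-0-8 artifact matching your Spark and Scala versions.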
The properties.conf configuration file:
[oppo]
appName = SparkStreamingKafka
logLevel = WARN
topic = data-message
partitions = 2
zookeeper = 127.0.0.1:2181
numStreams = 2
consumer = spark-streaming
brokerList = 127.0.0.1:9092
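Note that partitions = 2 must match the topic's actual partition count, since the producer writes to partitions 0 and 1 explicitly. Create the topic accordingly with the standard kafka-topics.sh tool, or, as a sketch assuming the kafka-python package (not part of the original setup):

# create_topic.py: create "data-message" with 2 partitions to match the
# parity-based routing in the producer. Assumes `pip install kafka-python`.
from kafka.admin import KafkaAdminClient, NewTopic

admin = KafkaAdminClient(bootstrap_servers="127.0.0.1:9092")
admin.create_topics([NewTopic(name="data-message",
                              num_partitions=2,
                              replication_factor=1)])
admin.close()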