kafka+zookeeper
Spark installation is not covered here; we start from the Kafka setup.
First, install Kafka and Zookeeper:
brew install kafka zookeeper
Once the install finishes, go into the Zookeeper directory (usually under /usr/local/Cellar) and start Zookeeper:
cd /usr/local/Cellar/zookeeper/3.4.6_1/bin
zkServer start
Starting the Zookeeper bundled inside Kafka fails on my machine, which is why I use the Homebrew one. Next, go into the Kafka directory and start Kafka:
bin/kafka-server-start.sh config/server.properties
After starting, the log will sit on its last INFO line and appear to hang; this actually means the broker started successfully. Open another terminal, cd into the same Kafka directory, and create a topic:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
List the topics to verify it was created:
bin/kafka-topics.sh --list --zookeeper localhost:2181
You should see test in the list. Now test Kafka with a console producer:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
Type a few characters, for example:
hello world!
Open a third terminal (two are already in use), cd into the same directory, and start a console consumer:
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
You should now see the hello world! typed earlier.
The next section covers using Kafka with Spark from Python.
python+kafka+spark
Example Python code:
# -*- coding: utf-8 -*-
from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

sconf = SparkConf()
sconf.set('spark.cores.max', 2)
sc = SparkContext(appName="KafkaWordCount", conf=sconf)
ssc = StreamingContext(sc, 3)  # 3-second batch interval

zookeeper = "localhost:2181"
topic = {"test": 1}  # topic name -> number of consumer threads
groupid = "test-consumer-group"

lines = KafkaUtils.createStream(ssc, zookeeper, groupid, topic)
lines1 = lines.map(lambda x: x[1])  # each record is (key, value); keep the value
words = lines1.flatMap(lambda line: line.split(" "))
pairs = words.map(lambda word: (word, 1))
wordcounts = pairs.reduceByKey(lambda x, y: x + y)
# wordcounts.saveAsTextFiles("/kafka")
wordcounts.pprint()
ssc.start()
ssc.awaitTermination()
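Before wiring the pipeline to a live stream, the map/flatMap/reduceByKey chain above can be sanity-checked on a plain Python list. A minimal sketch of the same wordcount logic (the sample lines are made up):

```python
# stand-in for the values of one micro-batch pulled from Kafka
lines1 = ["hello world", "hello kafka"]

# flatMap: split each line into words
words = [w for line in lines1 for w in line.split(" ")]
# map: turn each word into a (word, 1) pair
pairs = [(w, 1) for w in words]
# reduceByKey: sum the counts per word
wordcounts = {}
for w, n in pairs:
    wordcounts[w] = wordcounts.get(w, 0) + n

print(wordcounts)  # {'hello': 2, 'world': 1, 'kafka': 1}
```

The same lambdas from the Spark code apply unchanged once the input is a DStream instead of a list.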
Submit the Python code:
spark-submit --jars /usr/local/Cellar/kafka/0.8.2.1/libexec/spark-streaming-kafka-assembly_2.11-2.2.0.jar new.py 2>
If you want to write the code in your own IDE and use Python 3, change Spark's default Python to Python 3, so that spark-submit picks Python 3 instead of the default Python 2. Adding the following line to your environment variables is enough:
export PYSPARK_PYTHON=python3
You will also need to pip install a few packages, such as pyspark and kafka-python.
Below is the producer code, using the Sogou query-log dataset as test data.
Note that the producer can be run directly from your IDE; it does not need to be submitted to Spark:
from kafka import KafkaProducer
from kafka.errors import KafkaError
import time

def main():
    ## producer test module
    producer = KafkaProducer(bootstrap_servers='127.0.0.1:9092')
    fileName1 = '/Volumes/DATA/BigData/bigdata_homework/sogou.500w.utf8'
    fileName2 = '/Users/tcd-pc/Desktop/sogou.10k.utf8'  # smaller test file
    with open(fileName1, encoding='utf-8') as file:
        a = 20111230000000
        for fileline in file:
            # print(int(fileline.split('\t')[0]) // 10000 - a // 10000)
            # throttle: sleep 10 s when the timestamp's hour field is even
            if int(fileline.split('\t')[0]) // 10000 % 2 == 0:
                a = int(fileline.split('\t')[0])
                time.sleep(10)
            try:
                producer.send('test1', fileline.encode('utf-8'))
                producer.flush()
            except KafkaError as e:
                print(e)

if __name__ == '__main__':
    main()
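The throttling condition in the loop above can be checked offline. A small sketch with a fabricated Sogou-style log line (tab-separated: timestamp, user ID, query, ranks, URL; the sample values are invented):

```python
# a made-up record in the Sogou log layout: ts \t uid \t query \t ranks \t url
fileline = "20111230000157\tu123\tsome query\t1 1\thttp://example.com/page"

ts = int(fileline.split('\t')[0])  # 20111230000157 (yyyymmddHHMMSS)
# // 10000 drops the MMSS digits, leaving yyyymmddHH;
# % 2 then tests the parity of the hour field
hour_field = ts // 10000           # 2011123000
print(hour_field % 2 == 0)         # True -> the producer would sleep 10 s here
```

So records from even hours are rate-limited, which simulates a slower, stream-like feed from the static file.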
Below is the skeleton of the consumer code; the actual analysis code is omitted:
# -*- coding: utf-8 -*-
from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from jieba import analyse
import os

# clear the old checkpoint before restarting
os.system('hdfs dfs -rm -r /tmp1/checkpoint/')

zookeeper = "localhost:2181"
kafkatopic = "test1"
groupid = 'test-consumer-group'

# create the streaming context
def createContext():
    sconf = SparkConf()
    sconf.set('spark.cores.max', 8)
    sc = SparkContext(appName="KafkaWordCount", conf=sconf)
    ssc2 = StreamingContext(sc, 60)
    ssc2.checkpoint("/tmp1/checkpoint/")
    return ssc2

def main_main(ssc):
    consumer = KafkaUtils.createStream(ssc, zookeeper, groupid, {kafkatopic: 1})
    lines1 = consumer.map(lambda x: x[1])
    words = lines1.map(lambda line: line.split("\t"))
    # fields: timestamp, user ID, query, rank1, rank2, clicked URL
    seqs = words.map(lambda word: [word[0], word[1], word[2], word[3], word[4],
                                   word[5].split("//")[1].split("/")[0] if len(word[5].split("//")) > 1 else 'nokey',
                                   analyse.extract_tags(word[2] if len(word[2]) > 0 else 'nokey', topK=1, withWeight=False)
                                   ])
    # further analysis on seqs goes here

def main():
    ## consumer test module
    ssc = StreamingContext.getOrCreate("/tmp1/checkpoint/", createContext)
    main_main(ssc)
    ssc.start()
    ssc.awaitTermination()

if __name__ == '__main__':
    main()
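The host-extraction expression inside main_main can be tried on its own. A small sketch with invented URLs, wrapping the same conditional in a helper function for readability:

```python
def extract_host(url):
    # same logic as in main_main: take what follows "//" up to the next "/",
    # falling back to 'nokey' when the URL has no scheme separator
    parts = url.split("//")
    return parts[1].split("/")[0] if len(parts) > 1 else 'nokey'

print(extract_host("http://www.sogou.com/web?query=x"))  # www.sogou.com
print(extract_host("no-scheme-url"))                     # nokey
```

This reduces each clicked URL to its host, which is what the later per-site counting keys on.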
When analyzing streaming data with Spark Streaming you sometimes need a sort, and Spark has no method you can apply directly to a DStream for this. If you need one, here is what I came up with:
webs_time = seqs.map(lambda item: (item[5], 1)) \
.reduceByKey(lambda x, y: x + y) \
.map(lambda x: (1, [x[0], x[1]])).groupByKey() \
.mapValues(lambda x: sorted(x, reverse=True, key=lambda y: y[1])[:10])
groupByKey gathers every value belonging to a key, and mapValues then sorts all of the data under each key. Before the groupByKey, a map pulls out the desired key and packs the rest of the record into a list.
In practice this turns out to be very slow, but it does produce a sorted result.
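The trick of packing everything under one dummy key and sorting inside mapValues can be illustrated without Spark. A plain-Python sketch of the same top-10 pattern (the host counts are made up):

```python
# counts as produced by reduceByKey: (host, count) pairs (invented values)
counts = [("sogou.com", 40), ("baidu.com", 95), ("qq.com", 70), ("163.com", 12)]

# map + groupByKey: everything shares the dummy key 1,
# so all pairs end up in a single group
grouped = {}
for key, pair in ((1, [h, c]) for h, c in counts):
    grouped.setdefault(key, []).append(pair)

# mapValues: sort each key's list by count, descending, keep the top 10
top10 = {k: sorted(v, reverse=True, key=lambda y: y[1])[:10] for k, v in grouped.items()}

print(top10[1][0])  # ['baidu.com', 95]
```

Because all values funnel through one key, this loses parallelism, which is consistent with the slowness observed above.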