这次的例子是统计各航空公司的总延误时间(每个航班的起飞延误与到达延误之和,按航空公司累加),并画图。
直接上代码:
import csv
from collections import namedtuple
from datetime import datetime
from operator import add, itemgetter

try:
    from StringIO import StringIO  # Python 2
except ImportError:  # Python 3: the StringIO module was folded into io
    from io import StringIO

import matplotlib as mpl
# Select the Tk backend so figures open in a window. This must run BEFORE
# matplotlib.pyplot is imported: on older matplotlib releases (including the
# last ones that support Python 2) an mpl.use() call made after pyplot has
# been imported is ignored with a warning.
mpl.use("TkAgg")
import matplotlib.pyplot as plt  # noqa: E402  (must follow mpl.use)
from pyspark import SparkConf, SparkContext
# Application name handed to SparkConf.setAppName().
APP_NAME = "Flight Delay Analysis"

# strptime() formats: year-month-day dates and 4-digit HHMM clock times.
DATE_FMT = "%Y-%m-%d"
TIME_FMT = "%H%M"

# Column names for flights.csv records. parse() maps the first len(fields)
# cells of each split row onto these names, in this order.
fields = (
    'date',         # flight date, parsed with DATE_FMT
    'airline',      # key into the airlines.csv lookup built in main()
    'flightnum',
    'origin',
    'dest',
    'dep',          # departure clock time, parsed with TIME_FMT
    'dep_delay',    # float; reported as minutes by main()/plot()
    'arv',          # arrival clock time, parsed with TIME_FMT
    'arv_delay',    # float; reported as minutes by main()/plot()
    'airtime',
    'distance',
)

# Immutable record type for one parsed flight row.
Flight = namedtuple('Flight', fields)
def parse(row):
    """Convert one split row of flights.csv into a ``Flight`` namedtuple.

    :param row: sequence of cell strings as returned by ``split``; only the
        first 11 cells are used, in the order of ``fields``. The sequence is
        not modified.
    :returns: a ``Flight`` whose ``date`` is a ``datetime.date``, whose
        ``dep``/``arv`` are ``datetime.time`` values, whose ``dep_delay``,
        ``arv_delay``, ``airtime`` and ``distance`` are floats, and whose
        remaining fields are passed through unchanged.
    :raises ValueError: if the row has fewer than 11 cells, or a cell does not
        match ``DATE_FMT`` / ``TIME_FMT`` / float syntax.
    """
    # Unpack by name instead of overwriting row[i] in place, so the caller's
    # list is left untouched and each conversion is self-describing.
    (date, airline, flightnum, origin, dest, dep, dep_delay,
     arv, arv_delay, airtime, distance) = row[:11]
    # NOTE(review): strptime's %H only accepts 00-23, so a "2400" midnight
    # encoding would raise here -- confirm against the dataset.
    return Flight(
        date=datetime.strptime(date, DATE_FMT).date(),
        airline=airline,
        flightnum=flightnum,
        origin=origin,
        dest=dest,
        dep=datetime.strptime(dep, TIME_FMT).time(),
        dep_delay=float(dep_delay),
        arv=datetime.strptime(arv, TIME_FMT).time(),
        arv_delay=float(arv_delay),
        airtime=float(airtime),
        distance=float(distance),
    )
def split(line):
    """Split one line of CSV text into a list of cell strings.

    Uses the ``csv`` module so quoted cells containing commas are kept
    intact, e.g. ``'AA,"Foo, Inc."'`` -> ``['AA', 'Foo, Inc.']``.

    An empty line yields ``[]`` rather than raising ``StopIteration``. This
    function is passed to ``RDD.map``; on older PySpark releases a
    StopIteration escaping it is mistaken for the end of the partition and
    silently drops the remaining records.
    """
    # csv.reader accepts any iterable of lines, so a one-element list is
    # enough -- no file-like wrapper needed. next(...) (rather than
    # reader.next()) works on both Python 2 and Python 3.
    return next(csv.reader([line]), [])
def plot(delays):
    """Display a horizontal bar chart of delay minutes per airline.

    :param delays: sequence of ``(airline_name, minutes)`` pairs; bars are
        drawn bottom-to-top in the given order.

    Bars with a positive total are drawn red, all others green, and every
    bar is labelled with its value. The figure is shown with ``plt.show()``.
    """
    airlines = [d[0] for d in delays]
    minutes = [d[1] for d in delays]
    index = list(range(len(airlines)))

    _, axe = plt.subplots()
    # Center each bar on its integer row explicitly. matplotlib changed the
    # default barh alignment from 'edge' to 'center' in 2.0, so the old
    # "+0.5" tick/label offsets put them between bars on newer versions.
    bars = axe.barh(index, minutes, align='center')
    for idx, value in zip(index, minutes):
        bars[idx].set_color('#d9230f' if value > 0 else '#469408')
        # Place the label a few points right of the bar tip; a fixed +1
        # data-unit offset is negligible once totals are large.
        axe.annotate("%0.0f min" % value, xy=(value, idx),
                     xytext=(3, 0), textcoords='offset points', va='center')

    axe.set_yticks(index)
    axe.set_yticklabels(airlines)
    # Keep the x tick positions (the grid lines are drawn at them) but blank
    # out their labels.
    xt = axe.get_xticks()
    axe.set_xticks(xt)
    axe.set_xticklabels([' '] * len(xt))
    axe.grid(axis='x', color='white', linestyle='-')
    axe.set_title("Total Minutes Delayed per Airline")
    plt.show()
def main(sc):
    """Total the delay minutes per airline, print them, and plot them.

    :param sc: an active ``SparkContext``.

    Reads ``flight/airlines.csv`` (each row must split into exactly two
    cells, airline code and full name, because the rows are fed to
    ``dict()``) and ``flight/flights.csv`` (parsed by ``parse``). A flight's
    delay is ``dep_delay + arv_delay``; delays are summed per airline and
    reported in ascending order.
    """
    # Build a code -> full-name lookup on the driver from airlines.csv.
    airlines = dict(sc.textFile("flight/airlines.csv").map(split).collect())
    # Broadcast the lookup to all worker nodes.
    airline_lookup = sc.broadcast(airlines)

    # Load and parse the flight records.
    flights = sc.textFile("flight/flights.csv").map(split).map(parse)

    def airline_delay(flight):
        # Fall back to the raw airline code when it is missing from the
        # lookup, instead of failing the whole job with a KeyError.
        name = airline_lookup.value.get(flight.airline, flight.airline)
        return name, flight.dep_delay + flight.arv_delay

    # Sum the per-flight delays for each airline.
    delays = flights.map(airline_delay).reduceByKey(add).collect()
    # Sort ascending by total delay.
    delays = sorted(delays, key=itemgetter(1))

    for name, minutes in delays:
        # print(...) with a single argument behaves the same on Python 2 and 3.
        print("%0.0f minutes delayed\t%s" % (minutes, name))

    plot(delays)
if __name__ == "__main__":
    # Spark configuration: default to a local master using all cores, but let
    # an explicitly configured master (e.g. spark-submit --master) win instead
    # of being overridden by a hard-coded setMaster().
    conf = SparkConf().setAppName(APP_NAME)
    conf = conf.setIfMissing("spark.master", "local[*]")
    sc = SparkContext(conf=conf)
    try:
        main(sc)
    finally:
        # Release the SparkContext even if the job or the plot fails.
        sc.stop()
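将上面的代码保存为 app.py。
然后确保要用到的两个 CSV 文件已经放到 HDFS 上对应的目录:flight/airlines.csv(每行两列:航空公司代码、航空公司全称)和 flight/flights.csv。代码中使用的是相对路径,在 HDFS 上通常对应当前用户主目录下的 flight/ 目录(如 /user/<用户名>/flight/)。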
$ spark-submit app.py
就可以运行了