这里写目录标题
手动反爬虫,禁止转载: 原博地址 https://blog.csdn.net/lys_828/article/details/121304149(CSDN博主:Be_melting)
知识梳理不易,请尊重劳动成果,文章仅发布在CSDN网站上,在其他网站看到该博文均属于未经作者授权的恶意爬取信息
8 数据探索
第6部分简单进行了数据金字塔的倒数第二部分的内容的涉及,接下来进行倒数第三部分数据的探究。
8.1 数据初探
首先进行准备工作,创建step3文件夹,并添加example01.py文件,执行如下代码,进行航班和对应编号的查询,取出前三行数据进行查看,代码及输出结果如下。
8.2 数据分组汇总
上述的输出结果中左侧一列已经获取到了所有的唯一航班公司,右侧一列是对应的航班号,因此可以进行分组汇总,获取航班公司下所有的航班编号和航班总和。先进行一个测试代码输出,将获取到的数据转化为rdd的数据类型,然后进行拆分,输出结果如下。
因此就可以进一步进行分组的汇总计算出每个航空公司的航班总和和具体有哪里航班号,其中有一个注意事项,就是要统计同一公司下面的航班号,需要有一个容器进行放置,所以需要对于第二个元素添加列表操作,完整代码如下。
rdd1 = total_flights_by_month.rdd\
.map (lambda x : (x[0],[x[1]])) \
.reduceByKey(lambda a ,b : a+ b) \
.map(lambda tuple:
{
'Carrier': tuple[0],
'FleetCount': len(tuple[1]),
'TailNumbers': sorted(
filter(
lambda x: x is not None and x != '', tuple[1] # empty string tail numbers were getting through
)
)
}
)
print(rdd1.take(3))
程序运行结果如下。(比如UA美国航空公司,下面一共有722艘航班,具体的航班编号都在列表中进行存放)
将获得的数据存放到Mongo DB中,执行代码如下。
import pymongo_spark
pymongo_spark.activate()
rdd1.saveToMongoDB(
'mongodb://localhost:27017/example.airplanes_per_carrier'
)
程序正常运行后,在Compass界面刷新后就可以看到保存的数据。
8.3 利用Flask进行数据展示
将step2中的web文件夹下的所有文件(包含文件夹)全部copy到新建的step3文件夹下面,然后创建8.0版本on_time08.py,在最后面添加如下代码。
@app.route("/airlines/<carrier_code>")
def airline(carrier_code):
airline_airplanes = client.example.airplanes_per_carrier.find_one(
{
'Carrier' : carrier_code}
)
# print (airline_airplanes)
return render_template('airlines.html',
airline_airplanes=airline_airplanes,
carrier_code=carrier_code)
最后的return是airlines.html渲染页面,就需要继续配置该文件,详细的代码内容如下。
{% extends "index.html" %}
{% block body2 %}
<!-- Navigation guide -->
/ <a href="/airlines">航空公司</a> / <a href="/airlines/{
{carrier_code}}">{
{carrier_code}}</a>
<!-- Summary -->
<p class="lead">航空公司 {
{ carrier_code }}</p>
<h4>搜索到: {
{airline_airplanes.FleetCount}} 架飞机</h4>
<ul class="nav nav-pills">
{% for tail_number in airline_airplanes.TailNumbers -%}
<li class="button">
<a href="/airplane/flights/{
{tail_number}}">{
{tail_number}}</a>
</li>
{% endfor -%}
</ul>
{% endblock %}
文件修改完毕后,运行on_time08.py,在浏览器的网路地址栏输入:http://localhost:5000/airlines/VX,回车后输出的内容如下。
点击飞机的机尾编号,会出现对应的航班信息,比如点击第一个N281VA编号,输出的结果如下。
自此,飞机与航班的信息就完成了匹配,但是还缺少一环,就是航空公司与飞机之间的匹配。可以修改主页的信息,让其显示美国航空公司的情况,通过点击航空公司,进一步到达飞机信息的页面,从而完成数据的衔接。创建一个9.0版本on_time09.py,具体的代码如下。(需要把原来主页的设置给注释掉,然后重新进行编写代码)
# @app.route('/')
# def hello_world():
# res = db['on_time']
# print(list(res.find({})))
# return bson.json_util.dumps(list(res.find({})))
@app.route("/")
@app.route("/airlines")
@app.route("/airlines/")
def airlines():
airlines = client.example.airplanes_per_carrier.find()
return render_template('all_airlines.html',airlines=airlines)
@app.route("/airlines/<carrier_code>")
def airline(carrier_code):
airline_summary = client.example.airlines.find_one(
{
'CarrierCode': carrier_code}
)
airline_airplanes = client.example.airplanes_per_carrier.find_one(
{
'Carrier' : carrier_code}
)
# print (airline_airplanes)
print (airline_summary)
return render_template('airlines2.html',
airline_summary=airline_summary,
airline_airplanes=airline_airplanes,
carrier_code=carrier_code)
airlines函数中返回了all_airlines.html渲染页面,故需要配置该文件,其中的详细代码如下。
{% extends "index.html" %}
{% block body2 %}
<!-- Navigation guide -->
/ <a href="/airplanes">飞机</a>
<p class="lead" style="margin: 10px; margin-left: 0px;">
<!-- Airline Name and website-->
商用飞机
</p>
<!-- Chart of fleet manufacturers -->
<div>
<p style="margin: 0px;">制造商所制造的飞机</p>
<div id="chart"><svg class="chart"></svg></div>
</div>
<script src="/static/airplanes.js"></script>
<!-- Generate form from search_config and request args -->
<form action="/airplanes" method="get">
{% for item in search_config %}
{% if 'label' in item %}
<label for="{
{item['field']}}">{
{item['colname']}}</label>
{% else %}
<label for="{
{item['field']}}">{
{item['colname']}}</label>
{% endif %}
<input name="{
{item['field']}}" style="width: 36px; margin-right: 10px;" value="{
{args[item['field']] if args[item['field']] else ''}}"></input>
{% endfor %}
<button type="submit" class="btn btn-xs btn-default" style="height: 25px">提交查询</button>
</form>
<table class="table table-condensed table-striped">
<!-- Create table header, based on search_config -->
<thead>
{% for item in search_config %}
{% if 'label' in item %}
<th>{
{item['colname']}}</th>
{% else %}
<th>{
{item['colname']}}</th>
{% endif %}
{% endfor %}
</thead>
<!-- Create table content, based on airplanes for each <tr> and search_config for each <td> -->
<tbody>
{% for airplane in airplanes %}
<tr>
{% for item in search_config %}
<td>{
{airplane[item['field']]}}</td>
{% endfor %}
</tr>
{% endfor %}
</tbody>
</table>
<!-- css for x axis in chart -->
<style>
.axis text {
font: 8px sans-serif;
}
.axis path,
.axis line {
fill: none;
stroke: #000;
shape-rendering: crispEdges;
}
.bar {
fill: #ff6600;
}
</style>
{% import "macros.jnj" as common %}
{% if nav_offsets and nav_path -%}
{
{ common.display_nav(nav_offsets, nav_path, airplane_count)|safe }}
{% endif -%}
{% endblock %}
最终两个文件配置完毕后,运行on_time09.py文件,点击输出的连接,会跳出浏览器页面,输出结果如下。
任意点其中的一个航空公司,比如AA,就会跳转到AA下面的飞机界面,如下。(实现了航空公司与飞机之间的连接,前面已经完成了飞机与航班的之间的连接)
8.4 利用爬虫获取补充数据
7.4部分已经通过爬虫获取到 飞机对应的信息,这里是对航空公司的信息进行爬虫获取额外的补充信息。具体的思路就是首先通过航空公司的简写符号去匹配官网下载的airlines.csv中的航空公司的全称,比如AA匹配的就是American Airlines,进一步利用维基百科获取航空公司的详细介绍。
(1)通过航空公司简写符号匹配公司全称。新建example02.py,创建启动环境后从航班数据中获取航空公司的简写字段的信息,代码如下,输出结果就是上面首页中显示的14个航空公司的简写符号。。
from SparkReady import start_spark
import time
import pymongo_spark
spark = start_spark('C05S02',12,'12g')
pymongo_spark.activate()
spark.conf.set("spark.sql.shuffle.partitions",10)
on_time_dataframe = spark.read.parquet('../data/on_time.parquet')
on_time_dataframe.registerTempTable("on_time_performance")
carrier_codes = spark.sql(
"SELECT DISTINCT Carrier FROM on_time_performance"
)
carrier_codes.show()
(2)获取airlines.csv文件中的数据。首先需要根据网站上字段的介绍,进行各字段数据类型的设置,才能再读取文件中的内容,官网上的数据类型介绍如下。
接着就是根据提示的数据类型进行字段结构的设置,设置完毕后进行csv文件数据的读取,代码如下。
from pyspark.sql.types import StringType, IntegerType
from pyspark.sql.types import StructType, StructField
schema = StructType([
StructField("ID", IntegerType(), True), # "ArrDelay":5.0
StructField("Name", StringType(), True), # "CRSArrTime":"2015-12-31T03:20:00.000-08:00"
StructField("Alias", StringType(), True), # "CRSDepTime":"2015-12-31T03:05:00.000-08:00"
StructField("IATA", StringType(), True), # "Carrier":"WN"
StructField("ICAO", StringType(), True), # "DayOfMonth":31
StructField("CallSign", StringType(), True), # "DayOfWeek":4
StructField("Country", StringType(), True), # "DayOfYear":365
StructField("Active", StringType(), True), # "DepDelay":14.0
])
airlines = spark.read.format('com.databricks.spark.csv')\
.options(header='false', nullValue='\\N')\
.schema(schema)\
.load('../data/airlines.csv')
airlines.show(5)
输出结果如下。(第一行中数据可以认为是噪音数据直接删除,也可以默认直接加载,后续会自动被剔除)
(3)根据航空公司的简写字段进行数据合并。CSV数据读入后需要先注册为表,然后才能进行SQL操作,进一步的就是两个表之间的连接操作,详细的代码如下。
airlines.registerTempTable("airlines")
airlines = spark.sql("SELECT Name, IATA AS CarrierCode from airlines")
our_airlines = carrier_codes.join(airlines, carrier_codes.Carrier == airlines.CarrierCode)
our_airlines = our_airlines.select('Name', 'CarrierCode')
our_airlines.show()
输出结果中就会输出航空公司的全称和对应的简写字符,输出结果如下。
最后就是将获得的数据保存到本地,直接执行如下代码。
our_airlines.repartition(1).write.mode("overwrite").json("../data/our_airlines.json")
(4)进行维基百科爬虫。获取到航空公司的全程后,可以借助维基百科的搜索引擎获取公司的详细介绍。需要先安装模块pip install wikipedia
import sys, os, re
from lib import utils
import wikipedia
from bs4 import BeautifulSoup
import tldextract
# Load our airlines...
our_airlines = utils.read_json_lines_file('../data/our_airlines.jsonl')
# Build a new list that includes wikipedia data
with_url = []
for airline in our_airlines:
print (airline['Name'])
# Get the wikipedia page for the airline name
wikipage = wikipedia.page(airline['Name'])
# Get the summary
summary = wikipage.summary
airline['summary'] = summary
# Get the HTML of the page
page = BeautifulSoup(wikipage.html())
# Task: get the logo from the right 'vcard' column
# 1) Get the vcard table
vcard_table = page.find_all('table', class_='vcard')[0]
# 2) The logo is always the first image inside this table
first_image = vcard_table.find_all('img')[0]
# 3) Set the url to the image
logo_url = 'http:' + first_image.get('src')
airline['logo_url'] = logo_url
# Task: Get the company website
# 1) Find the 'Website' table header
th = page.find_all('th', text='Website')[0]
# 2) find the parent tr element
tr = th.parent
# 3) find the a (link) tag within the tr
a = tr.find_all('a')[0]
# 4) finally get the href of the a tag
url = a.get('href')
airline['url'] = url
# Get the domain to display with the url
url_parts = tldextract.extract(url)
airline['domain'] = url_parts.domain + '.' + url_parts.suffix
with_url.append(airline)
utils.write_json_lines_file(with_url, '../data/our_airlines_with_wiki2.jsonl')
程序运行后,最终获取到的14个航空公司的详细信息如下。(summary后的内容就是航空公司的简介)
(5)进行飞机数据的组合并存盘。7.4部分还有一个遗留问题就是进行保存,这里有一个隐藏的问题点就是字段中有两个TailNum,在进行最终的导出时候需要去掉一个字段,详细的代码如下。
tail_num_plus_inquiry=tail_num_plus_inquiry.drop(faa_tail_number_inquiry["TailNum"])
tail_num_plus_inquiry.show(3)
tail_num_plus_inquiry.registerTempTable("tail_num_plus_inquiry")
airplanes = spark.sql("""SELECT
TailNum AS TailNum,
engine_manufacturer AS EngineManufacturer,
engine_model AS EngineModel,
manufacturer AS Manufacturer,
mfr_year AS ManufacturerYear,
model AS Model,
owner AS Owner,
owner_state AS OwnerState,
serial_number AS SerialNumber
FROM
tail_num_plus_inquiry""")
airplanes.repartition(1).write.mode("overwrite").json('../data/airplanes.json')
本可以直接将数据存入Mongo DB中,这里介绍通过手动直接上传Json文件到数据库中,解压保存在本地的文件,然后取出里面的数据文件另存为airplanes.jsonl,接着打开Compass界面,新建一个数据表,选择刚刚另存为的文件后默认为Json格式,直接进行IMPORT。
大概5s钟左右,数据就全部导入到airplanes数据表中,详细内容如下。
8.5 丰富网址主页信息
有了爬虫数据的补充,在进行数据展示的界面上既可以进一步丰富内容,比如增加航空公司的简介,飞机的详细信息的展示。
8.5.1 航空公司信息的补充展示
新建9.0版本的on_time09.py,由于航空公司的展示是在首页点击之后的,因此在最后面添加的内容也是基于主页之后,详细的代码如下。
@app.route("/")
@app.route("/airlines")
@app.route("/airlines/")
def airlines():
airlines = client.example.airplanes_per_carrier.find()
return render_template('all_airlines.html',airlines=airlines)
@app.route("/airlines/<carrier_code>")
def airline(carrier_code):
airline_summary = client.example.airlines.find_one(
{
'CarrierCode': carrier_code}
)
airline_airplanes = client.example.airplanes_per_carrier.find_one(
{
'Carrier' : carrier_code}
)
# print (airline_airplanes)
print (airline_summary)
return render_template('airlines2.html',
airline_summary=airline_summary,
airline_airplanes=airline_airplanes,
carrier_code=carrier_code)
返回指定了airlines2.html需要进行文件代码的设计,该文件中的全部代码内容如下。
{% extends "index.html" %}
{% block body2 %}
<!-- Navigation guide -->
/ <a href="/airlines">航空公司</a> / <a href="/airline/{
{carrier_code}}">{
{carrier_code}}</a>
<!-- Logo -->
<img src="{
{airline_summary.logo_url}}" style="float: right;"/>
<p class="lead">
<!-- Airline Name and website-->
{
{airline_summary.Name}}
/ <a href="{
{airline_summary.url}}">{
{airline_summary.domain}}</a>
</p>
<!-- Summary -->
<p style="text-align: justify;">{
{airline_summary.summary}}</p>
<h4>本公司一共有: {
{airline_airplanes.FleetCount}} 架飞机</h4>
<ul class="nav nav-pills">
{% for tail_number in airline_airplanes.TailNumbers -%}
<li class="button">
<a href="/airplane/flights/{
{tail_number}}">{
{tail_number}}</a>
</li>
{% endfor -%}
</ul>
{% endblock %}
然后运行on_time09.py文件后点击主页的任意一家航空公司,发现跳转页面后并没有出现想象中的公司简介,因为数据库中并没有完善后的airlines数据,因此需要新建一个airlines数据表,将扩充完数据的our_airlines_with_wiki2.jsonl文件进行导入,然后再次重启on_time09.py文件,重新点击任意一个公司的名称进入后就可以看到详细的简介了。(图片因为需要翻墙,国内可能无法加载)
8.5.2 飞机详细信息的补充展示
创建新文件example05.py,获取飞机制造商前十名的数据,具体代码如下。
from SparkReady import start_spark
import time
spark = start_spark('C05S05',12,'12g')
airplanes = spark.read.json('../data/airplanes.jsonl')
airplanes.registerTempTable("airplanes")
manufacturer_counts = spark.sql("""SELECT
Manufacturer,
COUNT(*) AS Total
FROM
airplanes
GROUP BY
Manufacturer
ORDER BY
Total DESC"""
)
manufacturer_counts.show(10)
输出结果如下。
进一步可以通过计算全部的数据量,从而得到各大生产商的飞机数量占比。第一步就是获取所有的飞机数据,另外这里可以扩展一下在Spark中如何将一个常量添加到一个字段中,涉及到crossJoin()方法的使用,具体代码如下。
total_airplanes = spark.sql(
"""SELECT
COUNT(*) AS OverallTotal
FROM airplanes"""
)
print("Total airplanes: {}".format(total_airplanes.collect()[0].OverallTotal))
mfr_with_totals = manufacturer_counts.crossJoin(total_airplanes)
mfr_with_totals.show(5)
输出结果如下。
下面通过两种方式计算占比,一种是rdd的处理方式,一种是SQL的处理方式,详细代码如下。
#rdd
mfr_with_totals = mfr_with_totals.rdd.map(
lambda x: {
'Manufacturer': x.Manufacturer,
'Total': x.Total,
'Percentage': round(
(
float(x.Total)/float(x.OverallTotal)
) * 100,
2
)
}
)
mfr_with_totals.toDF().show()
#SQL
relative_manufacturer_counts = spark.sql("""SELECT
Manufacturer,
COUNT(*) AS Total,
ROUND(
100 * (
COUNT(*)/(SELECT COUNT(*) FROM airplanes)
),
2
) AS PercentageTotal
FROM
airplanes
GROUP BY
Manufacturer
ORDER BY
Total DESC, Manufacturer
LIMIT 20"""
)
relative_manufacturer_counts.show()
输出结果如下。(左侧是通过rdd进行输出,右侧是通过SQL进行输出结果,两者结果是一致的)
最后将数据保存到Mongo DB中,需要先将数据转化为字典,然后再进行保存,详细代码如下。
relative_manufacturer_counts = relative_manufacturer_counts.rdd.map(lambda row: row.asDict())
grouped_manufacturer_counts = relative_manufacturer_counts.groupBy(lambda x: 1
import pymongo_spark
pymongo_spark.activate()
grouped_manufacturer_counts.saveToMongoDB(
'mongodb://localhost:27017/example.airplane_manufacturer_totals'
)
运行完成后,刷新Compass软件界面,然后就可以发现airplane_manufacturer_totals表已经存在数据库中了,输出如下。
数据保存完毕后,加上补充的数据,字段的数量又进行了增多,因此想要进行查询的搜索,可以将待查询的数据存放到ES数据库,新建一个文件example06.py,完成本地文件向ES数据库的导入,代码如下。
from SparkReady import start_spark
import time
spark = start_spark('C05S06',24,'8g')
airplanes = spark.read.json("../data/airplanes.jsonl")
airplanes.show()
airplanes.write.format("org.elasticsearch.spark.sql")\
.option("es.resource","example2/airplanes")\
.option("es.batch.size.entries","100")\
.mode("overwrite")\
.save()
程序运行后,打开ES数据库的可视化界面,可以发现数据已经成功导入。
新建10.0版本的on_time10.py,在最后添加搜索条件的设置,具体函数如下。(config.AIRPLANE_RECORDS_PER_PAGE = 10)
@app.route("/airplanes/")
def search_airplanes():
search_config = [
{
'field': 'TailNum', 'label': 'Tail Number','colname':'飞机尾号'},
{
'field': 'Owner', 'sort_order': 0,'colname':'所属公司'},
{
'field': 'OwnerState', 'label': 'Owner State','colname':'公司所属州'},
{
'field': 'Manufacturer', 'sort_order': 1,'colname':'生产商'},
{
'field': 'Model', 'sort_order': 2,'colname':'型号'},
{
'field': 'ManufacturerYear', 'label': 'MFR Year','colname':'生产年份'},
{
'field': 'SerialNumber', 'label': 'Serial Number','colname':'序列号'},
{
'field': 'EngineManufacturer', 'label': 'Engine MFR', 'sort_order': 3,'colname':'发动机制造商'},
{
'field': 'EngineModel', 'label': 'Engine Model', 'sort_order': 4,'colname':'发动机型号'}
]
# Pagination parameters
start = request.args.get('start') or 0
start = int(start)
end = request.args.get('end') or config.AIRPLANE_RECORDS_PER_PAGE
end = int(end)
# Navigation path and offset setup
nav_path = search_helpers.strip_place(request.url)
nav_offsets = search_helpers.get_navigation_offsets(
start,
end,
config.AIRPLANE_RECORDS_PER_PAGE
)
# Build the base of our elasticsearch query
query = {
'query': {
'bool': {
'must': []}
},
'from': start,
'size': config.AIRPLANE_RECORDS_PER_PAGE
}
arg_dict = {
}
for item in search_config:
field = item['field']
value = request.args.get(field)
arg_dict[field] = value
if value:
query['query']['bool']['must'].append({
'match': {
field: value}})
# Query elasticsearch, process to get records and count
results = elastic.search(query, index='example2')
print ('in es part')
print (results)
airplanes, airplane_count = search_helpers.process_search(results)
# Persist search parameters in the form template
return render_template(
'all_airplanes.html',
search_config=search_config,
args=arg_dict,
airplanes=airplanes,
airplane_count=airplane_count['value'],
nav_path=nav_path,
nav_offsets=nav_offsets,
)
import json
@app.route("/airplanes/chart/manufacturers.json")
def airplane_manufacturers_chart():
mfr_chart = client.example.airplane_manufacturer_totals.find_one()
return json.dumps(mfr_chart)
返回的是all_airplanes.html文件,该文件中配置的代码如下。
{% extends "index.html" %}
{% block body2 %}
<!-- Navigation guide -->
/ <a href="/airplanes">飞机</a>
<p class="lead" style="margin: 10px; margin-left: 0px;">
<!-- Airline Name and website-->
商用飞机
</p>
<!-- Chart of fleet manufacturers -->
<div>
<p style="margin: 0px;">制造商所制造的飞机</p>
<div id="chart"><svg class="chart"></svg></div>
</div>
<script src="/static/airplanes.js"></script>
<!-- Generate form from search_config and request args -->
<form action="/airplanes" method="get">
{% for item in search_config %}
{% if 'label' in item %}
<label for="{
{item['field']}}">{
{item['colname']}}</label>
{% else %}
<label for="{
{item['field']}}">{
{item['colname']}}</label>
{% endif %}
<input name="{
{item['field']}}" style="width: 36px; margin-right: 10px;" value="{
{args[item['field']] if args[item['field']] else ''}}"></input>
{% endfor %}
<button type="submit" class="btn btn-xs btn-default" style="height: 25px">提交查询</button>
</form>
<table class="table table-condensed table-striped">
<!-- Create table header, based on search_config -->
<thead>
{% for item in search_config %}
{% if 'label' in item %}
<th>{
{item['colname']}}</th>
{% else %}
<th>{
{item['colname']}}</th>
{% endif %}
{% endfor %}
</thead>
<!-- Create table content, based on airplanes for each <tr> and search_config for each <td> -->
<tbody>
{% for airplane in airplanes %}
<tr>
{% for item in search_config %}
<td>{
{airplane[item['field']]}}</td>
{% endfor %}
</tr>
{% endfor %}
</tbody>
</table>
<!-- css for x axis in chart -->
<style>
.axis text {
font: 8px sans-serif;
}
.axis path,
.axis line {
fill: none;
stroke: #000;
shape-rendering: crispEdges;
}
.bar {
fill: #ff6600;
}
</style>
{% import "macros.jnj" as common %}
{% if nav_offsets and nav_path -%}
{
{ common.display_nav(nav_offsets, nav_path, airplane_count)|safe }}
{% endif -%}
{% endblock %}
上面代码中又使用了airplanes.js文件,也需要进行配置,进行绘制图形,代码如下。
var margin = {
top: 20, right: 30, bottom: 30, left: 40},
width = 900 - margin.left - margin.right,
height = 300 - margin.top - margin.bottom;
var x = d3.scale.ordinal()
.rangeRoundBands([0, width], .1);
var y = d3.scale.linear()
.range([height, 0]);
var xAxis = d3.svg.axis()
.scale(x)
.orient("bottom")
.tickFormat(function(d) {
return truncate(d, 14);
});
var yAxis = d3.svg.axis()
.scale(y)
.orient("left");
var chart = d3.select(".chart")
.attr("width", width + margin.left + margin.right)
.attr("height", height + margin.top + margin.bottom)
.append("g")
.attr("transform", "translate(" + margin.left + "," + margin.top + ")");
d3.json("/airplanes/chart/manufacturers.json", function(error, data) {
var data = data.data;
x.domain(data.map(function(d) {
return d.Manufacturer; }));
y.domain([0, d3.max(data, function(d) {
return d.Total; })]);
chart.append("g")
.attr("class", "x axis")
.attr("transform", "translate(0," + height + ")")
.call(xAxis);
chart.append("g")
.attr("class", "y axis")
.call(yAxis);
chart.selectAll(".bar")
.data(data)
.enter().append("rect")
.attr("class", "bar")
.attr("x", function(d) {
return x(d.Manufacturer); })
.attr("y", function(d) {
return y(d.Total); })
.attr("height", function(d) {
return height - y(d.Total); })
.attr("width", x.rangeBand());
});
function truncate(d, l) {
if(d.length > l)
return d.substring(0,l)+'...';
else
return d;
}
三个文件修改完毕,保存运行后,在浏览器网址栏输入:http://localhost:5000/airplanes,回车后输出如下内容。(很完美输出了想要的效果)
最后一个问题,就是要解决航空公司多表达方式的问题,比如柱状图中第5/7/8均是同一家公司,而采用不同的称号,所以在进行可视化之前应该将此类数据进行合并处理。
新建一个文件example07.py文件,进行生厂商的全部信息的输出,代码如下。
from SparkReady import start_spark
spark = start_spark('C05S07',24,'8g')
airplanes = spark.read.json('../data/airplanes.jsonl')
airplanes.registerTempTable("airplanes")
manufacturer_variety = spark.sql(
"""SELECT
DISTINCT(Manufacturer) AS Manufacturer
FROM
airplanes
ORDER BY
Manufacturer"""
)
manufacturer_variety_local = manufacturer_variety.collect()
for mfr in manufacturer_variety_local:
print(mfr.Manufacturer)
输出结果如下。(只进行部分输出结果的截图,可以发现并不只有前面提到的5/7/8公司有此类现象,其余的公司也有)
解决问题的方式是通过两个函数,第一个函数是进行长度的计算比较,第二个函数就是进行单词的切分获得比对数据的依据
def longest_common_beginning(s1, s2):
if s1 == s2:
return s1
min_length = min(len(s1), len(s2))
i = 0
while i < min_length:
if s1[i] == s2[i]:
i += 1
else:
break
return s1[0:i]
# Compare two manufacturers, returning a tuple describing the result
def compare_manufacturers(mfrs):
mfr1 = mfrs[0]
mfr2 = mfrs[1]
lcb = longest_common_beginning(mfr1, mfr2)
lcb = lcb.strip() # remove extra spaces
len_lcb = len(lcb)
record = {
'mfr1': mfr1,
'mfr2': mfr2,
'lcb': lcb,
'len_lcb': len_lcb,
'eq': mfr1 == mfr2
}
return record
然后就是获得两列的数据,调用上面的两个函数,假定前5个单词重复就认为是同一个公司,否则就是不同的公司,最后就是把数据进行合并去除重复后转化为DF数据类型。
comparison_pairs = manufacturer_variety.crossJoin(manufacturer_variety)
comparisons = comparison_pairs.rdd.map(compare_manufacturers)
matches = comparisons.filter(lambda f: f['eq'] == False and f['len_lcb'] > 5)
common_lcbs = matches.groupBy(lambda x: x['lcb'])
mfr1_map = common_lcbs.map(lambda x: [(y['mfr1'], x[0]) for y in x[1]]).flatMap(lambda x: x)
mfr2_map = common_lcbs.map(lambda x: [(y['mfr2'], x[0]) for y in x[1]]).flatMap(lambda x: x)
map_with_dupes = mfr1_map.union(mfr2_map)
mfr_dedupe_mapping = map_with_dupes.distinct()
mapping_dataframe = mfr_dedupe_mapping.toDF()
数据清洗完毕后注册为表,方便后续的SQL操作,然后再把数据拼接到原来的大数据中,最后保存到本地。
mapping_dataframe.registerTempTable("mapping_dataframe")
mapping_dataframe = spark.sql(
"SELECT _1 AS Raw, _2 AS NewManufacturer FROM mapping_dataframe"
)
airplanes_w_mapping = airplanes.join(
mapping_dataframe,
on=airplanes.Manufacturer == mapping_dataframe.Raw,
how='left_outer'
)
airplanes_w_mapping.registerTempTable("airplanes_w_mapping")
resolved_airplanes = spark.sql("""SELECT
TailNum,
SerialNumber,
Owner,
OwnerState,
IF(NewManufacturer IS NOT null,NewManufacturer,Manufacturer) AS Manufacturer,
Model,
ManufacturerYear,
EngineManufacturer,
EngineModel
FROM
airplanes_w_mapping""")
resolved_airplanes.repartition(1).write.mode("overwrite").json("../data/resolved_airplanes.json")
数据生成好后,可以将example05.py文件整体进行复制,修改为文件名为example08.py,然后文件中只需要修改两处,第一个就是文件读取的是清洗过后的数据,最后存放在数据库里面的表名称可以修改以表示区别,代码如下。
airplanes = spark.read.json('../data/resolved_airplanes.jsonl')
grouped_manufacturer_counts.saveToMongoDB(
'mongodb://localhost:27017/example.airplane_manufacturer_totals2'
)
example08.py文件运行成功后,可以在Compass软件上查看到生产的表数据,结果如下。(注意此次的第5条记录中,公司计数的总值为233,即为之前三个公司合并后的总和)
数据成功清洗完毕入库后,就可以更新最后一版11.0的on_time11.py文件,只需要讲10.0版本的数据来源的地址更改为现在的地址:client.example.airplane_manufacturer_totals2。运行on_time11.py文件,刷新网址,输出的页面内容如下。