版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/ygc123189/article/details/79088864
项目github地址:https://github.com/kocor01/spider_cloub/
Python版本为3.6
最近突然想玩玩云图,动手写了个简单的爬虫,搭建了简单的爬虫架构
爬虫爬取最近比较火的电影《芳华》分词后生成云图
使用了 jieba分词,云图用wordcloud生成
用了朋友的2B姿势的自拍照片简单的P了下(为了不暴露,P成全黑的),作为生成云图的底图模板
云图底图模板:
生成的云图效果:
爬虫基础框架
spider_main.py 爬虫入口
url_manager.py URL管理器
html_downloader.py 网页下载器
html_parser.py 数据提取器
html_outputer.py 数据处理器
word_cloud.py 云图生成器
extra_dict文件夹文件如下:
li.png 云图底图模板
simhei.ttf 生成云图的字体文件
str.txt 爬取的电影短评
stop_words.txt 分词排除的词
cut_str.txt jieba分词后文件
yun.png 最后生成的云图
代码如下:
spider_main.py 爬虫入口
#coding:utf-8
import url_manager,html_parser,html_outputer,html_downloader,word_cloud
class SpiderMain(object):
def __init__(self):
# URL管理器
self.urls = url_manager.UrlManager()
# 网页下载器
self.downloader = html_downloader.HtmlDownloader()
# 数据提取器
self.parser = html_parser.HtmlParser()
# 数据处理器
self.outputer = html_outputer.HtmlOutputer()
# 云图生成器
self.cloud = word_cloud.Wordcloud()
def craw(self, root_url):
count =1
# 爬虫入口URL
self.urls.add_new_url(root_url)
# 待爬取URL
wait_url = self.urls.has_new_url()
if wait_url is not None:
while wait_url:
try:
# 获取一个待爬取URL
new_url = self.urls.get_new_url()
print("carw %d : %s" % (count, new_url))
# 爬取页面
html_cont = self.downloader.download(new_url)
# 数据提取
new_url, new_datas = self.parser.parser(new_url, html_cont)
# 添加新待爬取URL
self.urls.add_new_url(new_url)
# 数据加工处理
self.outputer.collect_data(new_datas)
# 爬虫循环控制
if count == 10:
break
count = count + 1
except:
print("craw failed")
# 数据加工输出
self.outputer.process_data()
#print("finish")
# 分词
self.outputer.cut_str()
# 生成云图
self.cloud.make()
print("finish")
if __name__ == "__main__":
# 爬虫入口URL
root_url = "https://movie.douban.com/subject/26862829/comments?status=P"
obj_spider = SpiderMain()
# 启动爬虫
obj_spider.craw(root_url)
url_manager.py URL管理器
#coding:utf-8
class UrlManager(object):
def __init__(self):
self.new_urls = set()
self.old_urls = set()
def add_new_url(self, url):
if url is None:
return
if url not in self.new_urls and url not in self.old_urls:
self.new_urls.add(url)
def add_new_urls(self, urls):
if urls is None or len(urls) == 0:
return
for url in urls:
self.add_new_url(url)
def has_new_url(self):
return len(self.new_urls) != 0
def get_new_url(self):
new_url = self.new_urls.pop()
self.old_urls.add(new_url)
return new_url
html_downloader.py 网页下载器
#coding:utf-8
import urllib.request
class HtmlDownloader(object):
def download(self, url):
if url is None:
return None
request = urllib.request.Request(url)
request.add_header("user-agent", "Mozilla/5.0")
response = urllib.request.urlopen(url)
if response.getcode() != 200:
return None
return response.read()
html_parser.py 数据提取器
#coding:utf-8
import http.cookiejar
from bs4 import BeautifulSoup
import re
import urllib.parse
class HtmlParser(object):
def parser(self, page_url, content):
if page_url is None or content is None:
return
soup = BeautifulSoup(content, "html.parser", from_encoding='utf-8')
new_url = self._get_new_url(page_url, soup)
new_datas = self._get_new_datas(page_url, soup)
return new_url, new_datas
def _get_new_url(self, page_url, soup):
new_url = soup.find('div', id="paginator").find('a', class_="next").get('href')
new_full_url = urllib.parse.urljoin(page_url, new_url)
return new_full_url
def _get_new_datas(self, page_url, soup):
res_datas = set()
contents = soup.find_all('div', class_="comment-item")
for content in contents:
res_datas.add(content.find('div', class_="comment").find('p').get_text())
return res_datas
html_outputer.py 数据处理器
#coding:utf-8
import pymysql
import jieba.analyse
class HtmlOutputer(object):
def __init__(self):
self.datas = []
def collect_data(self, data):
res_datas = set()
if data is None:
return
for d in data:
self.datas.append(d)
def process_data(self):
#print(len(self.datas))
file_object = open('./extra_dict/str.txt', 'w',encoding='utf-8',errors='ignore')
data_str = ''
for data in self.datas:
#data_str += data
file_object.write(data)
#print(data_str)
file_object.close()
def cut_str(self):
content = open('./extra_dict/str.txt',encoding='utf-8',errors='ignore').read()
jieba.analyse.set_stop_words("./extra_dict/stop_words.txt")
tags = jieba.analyse.extract_tags(content, topK=1000,withWeight=True)
file_object = open('./extra_dict/cut_str.txt', 'w')
for v, n in tags:
#权重是小数,为了凑整,乘了一万
#print(v + '\t' + str(int(n * 10000)))
data_str = v + '\t' + str(int(n * 10000)) + '\n'
file_object.write(data_str)
file_object.close()
word_cloud.py 云图生成器
from os import path
from PIL import Image
import numpy as np
import matplotlib.pyplot as plt
from wordcloud import WordCloud, STOPWORDS, ImageColorGenerator
class Wordcloud(object):
def make(self):
d = path.dirname(__file__)
# Read the whole text.
text = open(path.join(d, './extra_dict/cut_str.txt')).read()
# read the mask / color image taken from
alice_coloring = np.array(Image.open(path.join(d, "./extra_dict/li.png")))
stopwords = set(STOPWORDS)
stopwords.add("said")
wc = WordCloud(font_path="./extra_dict/simhei.ttf",background_color="white", max_words=2000, mask=alice_coloring,
stopwords=stopwords, max_font_size=40, random_state=42)
# generate word cloud
wc.generate(text)
# create coloring from image
image_colors = ImageColorGenerator(alice_coloring)
# show
plt.imshow(wc, interpolation="bilinear")
plt.axis("off")
plt.figure()
# recolor wordcloud and show
# we could also give color_func=image_colors directly in the constructor
plt.imshow(wc.recolor(color_func=image_colors), interpolation="bilinear")
plt.axis("off")
plt.figure()
plt.imshow(alice_coloring, cmap=plt.cm.gray, interpolation="bilinear")
plt.axis("off")
wc.to_file(path.join(d, "./extra_dict/yun.png"))
plt.show()