from urllib.request import Request,urlopen
from fake_useragent import UserAgent
import sqlite3
import requests
import re
import json
import pymongo
class DbUtils(object):
    """Class-level helpers around one shared SQLite connection/cursor.

    All state is stored on the class itself, so every caller shares the
    same connection; create_db_cursor() must run before insert()/close.
    """
    conn = None    # shared sqlite3.Connection, set by create_db_cursor()
    cursor = None  # shared sqlite3.Cursor, set by create_db_cursor()

    @classmethod
    def create_db_cursor(cls):
        # Open (or create) the local file 'db' and cache a shared cursor.
        cls.conn = sqlite3.connect('db')
        cls.cursor = cls.conn.cursor()

    @classmethod
    def close_db_cursor(cls):
        # Close the cursor before the connection that owns it.
        cls.cursor.close()
        cls.conn.close()

    @classmethod
    def insert(cls, ob):
        """Bulk-insert (id, name) rows; return True iff rows were inserted.

        ob -- iterable of (id, name) tuples.
        Best-effort like the original: failures are printed and reported
        as False rather than raised to the caller.
        """
        try:
            # Bugfixes vs. original: 'insert table(...)' is invalid SQL
            # (missing INTO; "table" must be quoted since it is a keyword),
            # and cursor.excutemany was a typo for executemany.
            sql = 'insert into "table"(id,name) values(?,?)'
            cls.cursor.executemany(sql, ob)
            # rowcount counts rows affected by THIS statement;
            # conn.total_changes (used originally) is cumulative for the
            # whole connection and would stay truthy after any prior write.
            inserted = cls.cursor.rowcount
            cls.conn.commit()
        except Exception as e:
            print(e)
            return False  # original left 'i' unbound and fell through to None
        else:
            return bool(inserted)
class Spider(object):
    """Scraper for a qiushibaike 'hot' page.

    Results go either to SQLite (via DbUtils) or to MongoDB, depending on
    which get_* method is used. The regex patterns below are empty
    placeholders in the original and must be filled in before real use.
    """

    def __init__(self):
        self.client = pymongo.MongoClient('localhost')
        # Bugfix: the original read the bare name 'client' (NameError);
        # the database handle must come from self.client.
        self.db = self.client['db']
        self.ua = UserAgent()
        self.base_url = 'https://www.qiushibaike.com/hot/page/1'
        self.head = {
            'User-Agent': self.ua.random  # randomized UA to look like a browser
        }

    def get_html(self):
        """Fetch the page with urllib, regex-scrape it, store rows via DbUtils."""
        request = Request(self.base_url, headers=self.head)
        response = urlopen(request)  # http.client.HTTPResponse
        # read() yields bytes; decode once to str.
        html = response.read().decode('utf-8')
        # NOTE(review): empty placeholder pattern — matches nothing useful;
        # fill in the real capture groups before relying on res_list.
        pattern = re.compile(r'', re.S)
        res_list = re.findall(pattern, html)
        print(type(res_list))     # list (empty list when nothing matched)
        print(type(res_list[0]))  # tuple when the pattern has multiple groups
        for item in res_list:
            ob = [item]
            DbUtils.insert(ob)

    def get_html_json_one(self):
        """Fetch a JSON body with requests and store the parsed dict in Mongo."""
        # Bugfix: the second positional argument of requests.get is
        # 'params', not headers — the original never sent the UA header.
        response = requests.get(self.base_url, headers=self.head)
        print(type(response))       # requests.models.Response
        print(type(response.text))  # str holding a JSON document
        dict_data = json.loads(response.text)  # JSON string -> dict
        self.db['table'].insert_one(dict_data)

    def get_html_json_two(self):
        """Fetch with requests, regex-scrape the decoded body, then close DbUtils."""
        # Bugfix: headers must be passed by keyword (see get_html_json_one).
        response = requests.get(self.base_url, headers=self.head)
        print(type(response))  # requests.models.Response
        # Decode once and reuse (original decoded the payload twice).
        html = response.content.decode('utf-8')
        print(type(html))  # str
        # NOTE(review): empty placeholder pattern — see get_html above.
        pattern = re.compile(r'', re.S)
        res_list = re.findall(pattern, html)
        print(type(res_list))     # list (empty list when nothing matched)
        print(type(res_list[0]))  # tuple when the pattern has multiple groups
        DbUtils.close_db_cursor()
if __name__ == '__main__':
    # The shared SQLite handles must exist before anything closes them;
    # the original never called create_db_cursor(), so close_db_cursor()
    # crashed with AttributeError on None.
    DbUtils.create_db_cursor()
    spider = Spider()
    # spider.get_html()
    # Bugfix: the method is get_html_json_two — the original invoked the
    # nonexistent spider.get_html_byjson_two() (AttributeError).
    spider.get_html_json_two()
    # NOTE(review): get_html_json_two already closes DbUtils; kept for
    # parity with the original flow in case that method changes.
    DbUtils.close_db_cursor()
import requests
from urllib.request import urlretrieve
from hashlib import md5
import os
class Downloads(object):
    """Download a single image, naming the saved file by the URL's MD5 hash."""

    def __init__(self, url='http://img.zcool.cn/community/01b2295568a994000001271604da4f.jpg@1280w_1l_2o_100sh.jpg'):
        # Generalized: the URL is now a parameter; the default preserves
        # the original hard-coded image, so existing callers are unchanged.
        self.url = url
        # MD5 of the URL gives a stable, filesystem-safe file name.
        self.img_name = md5(self.url.encode('utf-8')).hexdigest()
        # Raw string so the Windows backslashes are explicit (the original
        # only worked because '\i' and '\{' are not escape sequences).
        self.save_path = r'd:\imgs\{}.jpg'.format(self.img_name)

    def method_one(self):
        """Save via urllib's urlretrieve (streams straight to disk)."""
        urlretrieve(self.url, self.save_path)

    def method_two(self):
        """Save via requests: fetch the payload, then write the bytes out."""
        # Fetch once (the original hit the URL twice: once just to print
        # the payload type, then again for the actual content).
        content = requests.get(self.url).content
        print(type(content))  # <class 'bytes'>
        # Bugfix: 'with' guarantees the handle is flushed and closed even
        # if write() raises; the original leaked the file on error.
        with open(self.save_path, 'wb') as f:
            f.write(content)
# Guard the demo run so importing this module does not trigger a network
# download as a side effect (consistent with the spider section above).
if __name__ == '__main__':
    downloads = Downloads()
    # downloads.method_one()
    downloads.method_two()