互站网销量数据的获取,有需要的老铁可以学习。废话不多说,直接上代码!
特别声明:以下代码仅供学习参考,不可用于商业用途!转载请注明出处,请尊重原创!
sql语句
-- Recreate the table that receives one row per scraped huzhan.com listing.
-- The scraper further down fills `title`, `num`, `price` and `url`; the
-- column comments are aligned with that usage (they previously described an
-- unrelated table).
DROP TABLE IF EXISTS `cmf_huzhan`;
CREATE TABLE `cmf_huzhan` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `price` varchar(255) NOT NULL DEFAULT '1' COMMENT '价格',
  `title` varchar(255) NOT NULL COMMENT '商品标题',
  `num` varchar(255) NOT NULL DEFAULT '0' COMMENT '销量',
  `url` varchar(255) NOT NULL DEFAULT '' COMMENT '链接地址',
  -- NOTE(review): the scraper never writes `status`; its comment text is kept
  -- as found and should be confirmed against the intended meaning.
  `status` varchar(30) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL DEFAULT '' COMMENT '互站名称',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=56 DEFAULT CHARSET=utf8mb4 COMMENT='互站表';
import json
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
import logging
import time
import requests
import pymysql
from functools import reduce
from lxml import etree
import urllib3
# Silence urllib3 warnings for the whole process; the second call names
# InsecureRequestWarning explicitly, which is what unverified HTTPS requests
# (verify=False, used by the POST in total_s) would otherwise emit.
urllib3.disable_warnings()
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
class ORM():
    """Tiny record helper that turns public instance attributes into an INSERT.

    Every attribute assigned on the instance whose name does not start with
    an underscore is treated as a column of the table given at construction.
    """

    def __init__(self, table):
        """Remember the name of the table that generated statements target."""
        self.__table__ = table

    def insert_string(self):
        """Build an ``insert into ... values ...`` statement for this record.

        Attributes whose name starts with ``_`` (including ``__table__``) are
        skipped. String values are escaped and wrapped in single quotes,
        ``None`` is rendered as SQL ``NULL``, and every other value is
        rendered with ``str.format``.

        Returns:
            str: the complete INSERT statement.
        """
        fields = []
        params = []
        for k, v in self.__dict__.items():
            if k.startswith('_'):
                continue
            if isinstance(v, str):
                # pymysql.escape_string was removed from the top-level package
                # in PyMySQL 1.0; the converters module has it in old and new
                # releases alike.
                v = "'" + pymysql.converters.escape_string(v) + "'"
            elif v is None:
                v = 'NULL'
            fields.append(k)
            params.append(v)
        return 'insert into {} ({}) values ({})'.format(
            self.__table__, self.join(fields), self.join(params))

    def join(self, attrs, pattern=','):
        """Join *attrs* with *pattern*, formatting non-string items on the way.

        Unlike ``str.join`` this accepts numbers and other objects; an empty
        sequence yields an empty string.
        """
        return pattern.join('{}'.format(a) for a in attrs)
class Model(ORM):
    """ORM record bound to a MySQL connection opened at construction time.

    The connection is stored in ``_db`` and a long-lived cursor in ``_cur``;
    both names start with an underscore so ``insert_string`` ignores them.

    Note that ``select``, ``select_list``, ``select_jd`` and ``select_order``
    close the connection once they have run, so an instance cannot be used
    for further queries after calling one of them.
    """

    def __init__(self, table):
        """Connect to the database and print the server version.

        A connection failure is reported on stdout and not re-raised; in that
        case ``_db`` / ``_cur`` are never set and later method calls fail with
        ``AttributeError``.
        """
        super().__init__(table)
        try:
            # NOTE(review): host and credentials are hard-coded; consider
            # loading them from configuration. The DDL for cmf_huzhan uses
            # utf8mb4 while this connection asks for UTF8 - confirm that is
            # intended.
            self._db = pymysql.connect(host='localhost', port=3306, user='root',
                                       passwd='root',
                                       db='py',
                                       charset='UTF8')
            # Cursor reused by execute().
            self._cur = self._db.cursor()
            self._cur.execute('SELECT VERSION()')
            data = self._cur.fetchone()
            print('database version: %s' % data[0])
        except Exception as exc:
            # Keep the original best-effort behaviour, but show the cause.
            print("发生异常", exc)

    def _fetch_all(self, sql, args=None, close=False):
        """Run *sql* with optional bound *args* and return all rows.

        The cursor is always released; when *close* is true the connection is
        closed as well, whether or not the query succeeded.
        """
        try:
            with self._db.cursor() as cursor:
                cursor.execute(sql, args)
                return cursor.fetchall()
        finally:
            if close:
                self._db.close()

    def select(self, limit=2000000):
        """Return up to *limit* rows of the table as a list, then close the connection."""
        sql = "SELECT * FROM %s WHERE 1 limit %s" % (self.__table__, limit)
        return list(self._fetch_all(sql, close=True))

    def select_list(self):
        """Return every row of the table as a list, then close the connection."""
        sql = "SELECT * FROM %s WHERE 1 " % (self.__table__)
        return list(self._fetch_all(sql, close=True))

    def select_sale(self, store_name, days):
        """Return sale rows of one store whose ``update_time`` contains *days*.

        Args:
            store_name: value compared against the ``store_name`` column.
            days: string matched anywhere inside ``update_time`` (LIKE %days%).

        Returns:
            The matching ``(good_id, sale_num, update_time)`` rows, or ``0``
            when nothing matches. The connection is left open.
        """
        like_pattern = '%' + days + '%'
        # Values are bound as parameters so quotes in them cannot alter the query.
        sql = ("SELECT good_id,sale_num,update_time FROM {} "
               "WHERE 1 and good_id<>1 and sale_num<>0 "
               "and store_name=%s and update_time like %s").format(self.__table__)
        rows = self._fetch_all(sql, (store_name, like_pattern))
        return rows if rows else 0

    def select_jd(self):
        """Return all rows with ``status = 1`` as a list, then close the connection."""
        sql = "SELECT * FROM %s WHERE 1 and status =1 " % (self.__table__)
        return list(self._fetch_all(sql, close=True))

    def tb_shop(self, goodid):
        """Return the ``company`` value for *goodid*, or ``0`` if there is no such row.

        The connection is left open.
        """
        sql = "SELECT company FROM {} WHERE good_id=%s".format(self.__table__)
        with self._db.cursor() as cursor:
            cursor.execute(sql, (goodid,))
            row = cursor.fetchone()
        return 0 if row is None else row[0]

    def select_order(self):
        """Return the rows whose ``orderStatus`` is '暂停', then close the connection."""
        sql = "SELECT * FROM `%s` where orderStatus='暂停' " % (self.__table__)
        return self._fetch_all(sql, close=True)

    def execute(self, sql):
        """Run *sql* on the long-lived cursor and return the first row, if any."""
        self._cur.execute(sql)
        return self._cur.fetchone()

    def insert(self):
        """Insert this record and commit.

        Returns:
            The auto-increment id of the new row, or ``None`` when the insert
            failed (the failure is printed and the transaction rolled back).
        """
        sql = self.insert_string()
        try:
            with self._db.cursor() as cursor:
                cursor.execute(sql)
                insert_id = cursor.lastrowid
            self._db.commit()
            return insert_id
        except Exception as exc:
            print("insert发生异常" + sql, exc)
            # Undo the partial transaction on error.
            self._db.rollback()
            return None
def url_list(url='https://www.huzhan.com/code/key/%E5%B0%8F%E8%AF%B4/order/sales/page/2',
             max_items=24):
    """Scrape one huzhan.com listing page and hand each item to ``total_s``.

    Args:
        url: listing page to fetch; the default is the page that was
            previously hard-coded.
        max_items: upper bound on the number of items processed (the
            original always indexed exactly 24 and crashed on shorter pages).
    """
    headers = {
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3',
        'Host': 'www.huzhan.com',
        'Referer': 'https://www.huzhan.com/code/goods185419.html',
        'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.108 Safari/537.36'
    }
    # A timeout keeps a stalled server from hanging the script forever.
    data = requests.get(url, headers=headers, timeout=30).text
    html = etree.HTML(data)
    # Item anchors live under dd/p[2]/a; the price sits in dd/p[1]/em/strong.
    title = html.xpath('//*/dd/p[2]/a/@title')
    price = html.xpath('//*/dd/p[1]/em/strong/text()')
    href = html.xpath('//*/dd/p[2]/a/@href')
    http = 'https://www.huzhan.com'
    # zip stops at the shortest list, so a short page no longer raises IndexError.
    items = list(zip(href, title, price))[:max_items]
    for item_href, item_title, item_price in items:
        hrefs = http + item_href
        # Pause between items to avoid hammering the site.
        time.sleep(2)
        # The detail URL doubles as the Referer header and the stored link.
        total_s(hrefs, item_title, item_price, hrefs)
def total_s(Referer, title, price, hrefs):
    """Fetch one item's ``total`` from the site and store the item in ``cmf_huzhan``.

    The item page at *Referer* is downloaded to read the ``pro`` id, which is
    then posted to ``/apage/``; the ``total`` field of the JSON answer is
    saved together with *title*, *price* and *hrefs*.

    Args:
        Referer: URL of the item page; also sent as the Referer header.
        title: item title to store.
        price: item price to store.
        hrefs: item URL to store in the ``url`` column.

    Any failure is logged with its traceback and swallowed, so one bad item
    does not stop the caller's loop.
    """
    try:
        print(Referer)
        logging.captureWarnings(True)  # ssl
        urllib3.disable_warnings()  # ssl
        # Headers for the AJAX POST. Content-Length is intentionally not set:
        # requests computes it from the encoded form body.
        headers = {
            'Accept': 'application/json, text/javascript, */*; q=0.01',
            'Accept-Encoding': 'gzip, deflate, br',
            'Accept-Language': 'zh-CN,zh;q=0.9',
            'Connection': 'keep-alive',
            'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8',
            'Host': 'www.huzhan.com',
            'Origin': 'https://www.huzhan.com',
            'Referer': Referer,
            'Sec-Fetch-Mode': 'cors',
            'Sec-Fetch-Site': 'same-origin',
            'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.108 Safari/537.36',
            'X-Requested-With': 'XMLHttpRequest',
        }
        # Headers for the plain page download.
        headerss = {
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3',
            'Host': 'www.huzhan.com',
            'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/76.0.3809.12 Safari/537.36'
        }
        # The context manager releases the session's pooled connections.
        with requests.session() as s:
            retry = Retry(connect=3, backoff_factor=0.5)
            adapter = HTTPAdapter(max_retries=retry)
            s.mount('http://', adapter)
            s.mount('https://', adapter)
            # NOTE(review): this attribute does not appear to be read by
            # requests; kept as found.
            s.keep_alive = False
            time.sleep(1)
            # NOTE(review): the page is fetched outside the session, so the
            # retry adapter above only applies to the POST - confirm intent.
            da = requests.get(Referer, headers=headerss, timeout=30).text
            htmls = etree.HTML(da)
            pro = htmls.xpath('//*/div[1]/span[1]/a/@id')[0]
            params = {
                "list": "geva",
                "pro": pro,
                "good": "code",
                "page": 0
            }
            start = time.time()
            # NOTE(review): verify=False disables TLS certificate checks.
            r = s.post("https://www.huzhan.com/apage/", data=params, headers=headers,
                       verify=False, timeout=30).text
        # json.loads is preferred over eval for parsing the response.
        html = json.loads(r)
        print(str(title))
        cmf_huzhan = Model('cmf_huzhan')
        cmf_huzhan.title = str(title)
        cmf_huzhan.num = html['total']
        cmf_huzhan.price = price
        cmf_huzhan.url = hrefs
        cmf_huzhan.insert()
        # NOTE(review): the Model's database connection is not closed here.
        end = time.time() - start
        print(end)
    except Exception:
        # Still best-effort, but no longer silent: record what went wrong.
        logging.exception("total_s failed for %s", Referer)
# Script entry point: the crawl starts as soon as this file is executed
# (being a bare top-level call, it also runs if the file is imported).
url_list()
效果如下
总结
需要特别注意:遇到的问题主要出在 POST 数据请求上,关键代码如下:
r = s.post("https://www.huzhan.com/apage/", data=params, headers=headers, verify=False).text