说明
目标是每日爬取 B 站当日 Top 100 的视频并下载到本地。目标网址:传送门
实现
核心代码如下:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import requests
import random
import time
import os
import sys
from ..utils import Mp4info
class DownloadVideo:
    """Download the videos listed by the bilibili "今日热门" ranking API.

    ``run()`` requests ten pages of ten entries each, skips entries whose
    reported duration exceeds ``_MAX_DURATION``, and saves the remaining
    ones as ``<title>.mp4`` in a ``bilibili_video`` directory located
    under ``sys.path[0]``.
    """

    # Browser-like User-Agent sent with every request.
    _HEADERS = {
        'User-Agent':
            'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/69.0.3497.100 Safari/537.36'
    }
    _PAGE_SIZE = 10    # entries requested per page
    _PAGE_COUNT = 10   # pages walked per run
    # Upper bound compared against Mp4info.get_duration();
    # presumably seconds (i.e. one hour) -- TODO confirm against Mp4info.
    _MAX_DURATION = 60 * 60
    # Keeps "<title>.mp4" below common 255-byte file-name limits even when
    # every character encodes to three UTF-8 bytes.
    _MAX_TITLE_CHARS = 80
    _INVALID_FILENAME_CHARS = frozenset('\\/:*?"<>|')

    def __init__(self):
        self.api_url = 'http://api.vc.bilibili.com/board/v1/ranking/top?'

    def _getJson(self, url, num):
        """Fetch one ranking page and return its decoded JSON body.

        Args:
            url: Ranking API endpoint.
            num: Value sent as the ``next_offset`` query parameter.

        Returns:
            The decoded JSON document, or ``None`` when the request fails
            or the body is not valid JSON.
        """
        params = {
            'page_size': self._PAGE_SIZE,
            'next_offset': str(num),
            'tag': '今日热门',
            'platform': 'pc',
        }
        try:
            # NOTE(review): verify=False turns off TLS certificate checks;
            # kept from the original code -- confirm this is intended.
            response = requests.get(url, params=params, headers=self._HEADERS,
                                    verify=False, timeout=2)
            return response.json()
        except (requests.RequestException, ValueError):
            # Network failures and undecodable bodies are reported, not
            # raised; the caller treats None as "no entries on this page".
            print('request error')
            return None

    def _download(self, url, path):
        """Stream the resource at ``url`` into the file ``path``.

        Args:
            url: Direct download link of the video.
            path: Destination file path; overwritten if it exists.

        Returns:
            ``True`` when the server answered 200 and the body was written,
            ``False`` when the server answered with any other status (no
            file is created in that case).

        Raises:
            requests.RequestException: The request or the transfer failed.
            OSError: The destination file could not be written.
        """
        chunk_size = 1024  # bytes read from the connection per iteration
        # stream=True fetches the body chunk by chunk instead of loading
        # the whole video into memory; the with-block releases the
        # connection on every exit path.
        # NOTE(review): verify=False turns off TLS certificate checks;
        # kept from the original code -- confirm this is intended.
        with requests.get(url, headers=self._HEADERS, stream=True,
                          verify=False, timeout=2) as response:
            if response.status_code != 200:
                return False

            # The header is optional (e.g. chunked transfer), so its
            # absence must not abort the download.
            content_length = response.headers.get('content-length')
            if content_length and content_length.isdigit():
                print('[文件大小]:%0.2f MB' % (int(content_length) / (1024 * 1024)))

            try:
                with open(path, 'wb') as file:
                    for data in response.iter_content(chunk_size=chunk_size):
                        file.write(data)
            except BaseException:
                # Do not leave a truncated file that looks like a finished
                # download; the original error is re-raised unchanged.
                try:
                    os.remove(path)
                except OSError:
                    pass
                raise
        return True

    @staticmethod
    def _safe_filename(title):
        """Turn a video title into a string usable as a single file name.

        Path separators, characters that are reserved on common file
        systems and control characters are replaced with ``_``; leading and
        trailing spaces/dots are dropped; the result is limited to
        ``_MAX_TITLE_CHARS`` characters. Falls back to ``'untitled'`` when
        nothing is left.
        """
        invalid = DownloadVideo._INVALID_FILENAME_CHARS
        cleaned = ''.join(
            '_' if (ch in invalid or ord(ch) < 32) else ch
            for ch in str(title)
        ).strip(' .')
        cleaned = cleaned[:DownloadVideo._MAX_TITLE_CHARS].rstrip(' .')
        return cleaned or 'untitled'

    def _process_item(self, info, video_dir):
        """Validate one ranking entry and download it when eligible."""
        item = info.get('item') if isinstance(info, dict) else None
        if not isinstance(item, dict):
            return

        title = item.get('description') or ''  # title of the short video
        print(title)
        video_url = item.get('video_playurl')  # download link of the short video
        if not video_url:
            # Some entries do not provide a download link.
            return

        try:
            # Read the video duration and skip over-long videos.
            duration = Mp4info(video_url).get_duration()
            print('duration', duration)
            if duration > self._MAX_DURATION:
                return
        except Exception:
            # Duration could not be determined: skip the entry.
            return

        try:
            os.makedirs(video_dir, exist_ok=True)
            target = os.path.join(video_dir, self._safe_filename(title) + '.mp4')
            saved = self._download(video_url, path=target)
        except Exception:
            saved = False
        print('成功下载一个!' if saved else '凉凉,下载失败')

    def _dispatcher(self):
        """Walk the ranking pages and download every eligible entry."""
        # sys.path[0] is the directory of the launching script (or '' in an
        # interactive session), which is not necessarily the working
        # directory.
        video_dir = os.path.join(sys.path[0], 'bilibili_video')

        for page_index in range(self._PAGE_COUNT):
            offset = page_index * self._PAGE_SIZE + 1
            page = self._getJson(self.api_url, offset)
            try:
                infos = page['data']['items'] or []
            except (KeyError, TypeError):
                # Failed request or unexpected payload: nothing to do for
                # this page, carry on with the next one.
                infos = []

            for info in infos:
                self._process_item(info, video_dir)

            # Random pause between page requests.
            time.sleep(random.randint(2, 8))

    def run(self):
        """Entry point: run one full crawl."""
        self._dispatcher()
运行:
curl -d "task_id=12345" http://127.0.0.1:5000/spider/bilibili/addjob
结果: