使用multiprocessing包
面向过程的代码如下
# 导入多进程开发模块
import multiprocessing,time
def main():
print("主进程程序,进程id:{},进程名字:{};".format(multiprocessing.current_process().pid,
multiprocessing.current_process().name));
if __name__ == '__main__':
main();
# 导入多进程开发模块
import multiprocessing,time
# 多进程处理函数
def worker(delay,count):
for num in range(count):
# 打印多进程的执行id和名字
print("第:{}次,进程id:{},进程名字:{};".format(num,multiprocessing.current_process().pid,multiprocessing.current_process().name));
time.sleep(delay);
def main():
for item in range(3):
# 循环3次,每次循环创建一个进程
# target = worker执行worker函数
# args=(0,10) 线程休眠0,执行10次
# name = "test进程:{}".format(item) 进程名字
process = multiprocessing.Process(target=worker,args=(0,10),name="test进程:{}".format(item));
# 执行进程启动
process.start();
if __name__ == '__main__':
main();
面向对象,需要继承类,重写run方法
import multiprocessing,time
class MyProcess(multiprocessing.Process):
def __init__(self,name,delay,count):
super().__init__(name = name);
self._delay = delay;
self._count = count;
def run(self):
for item in range(self._count):
print("第:{}次,进程id:{},进程名字:{};".format(item, multiprocessing.current_process().pid,
multiprocessing.current_process().name));
time.sleep(self._delay);
def main():
for item in range(3):
process = MyProcess("测试多进程",1,10);
process.start();
if __name__ == '__main__':
main();
测试进程的子进程强制执行
# 导入多进程开发模块
import multiprocessing,time
# 多进程处理函数
def send(msg):
time.sleep(5);
# 打印多进程的执行id和名字
print("进程id:{},进程名字:{};".format(multiprocessing.current_process().pid,multiprocessing.current_process().name));
def main():
process = multiprocessing.Process(target=send,args=("奥利给",),name="测试发送进程");
# 启动子进程
process.start();
# process.join(),代表子进程强制执行,不加的话会导致主进程先执行之后才会执行子进程
process.join();
print("进程id:{},进程名字:{};信息发送完毕".format(multiprocessing.current_process().pid,multiprocessing.current_process().name));
if __name__ == '__main__':
main();
强制进程中断
# 导入多进程开发模块
import multiprocessing,time
# 多进程处理函数
def send(msg):
time.sleep(10);
# 打印多进程的执行id和名字
print("进程id:{},进程名字:{};".format(multiprocessing.current_process().pid,multiprocessing.current_process().name));
def main():
process = multiprocessing.Process(target=send,args=("奥利给",),name="测试发送进程");
# 启动子进程
process.start();
time.sleep(2);
# 如果子线程还存在,进程中断
if process.is_alive():
process.terminate();
print("进程执行被中断 {}".format(process.name));
print("进程id:{},进程名字:{};信息发送完毕".format(multiprocessing.current_process().pid,multiprocessing.current_process().name));
if __name__ == '__main__':
main();
守护进程 随着其中一个进程的存在而存在
# 导入多进程开发模块
import multiprocessing,time
# 守护进程
def status():
item = 1;
while True:
print("进程id:{},进程名字:{},item = {};".format(
multiprocessing.current_process().pid,
multiprocessing.current_process().name,
item
));
item+=1;
time.sleep(1);
# 多进程处理函数
def work():
process = multiprocessing.Process(target=status, name="守护进程", daemon=True);
# 启动守护进程;
process.start();
# 进程工作10次
for item in range(10):
# 打印多进程的执行id和名字
print("进程id:{},进程名字:{},item = {};".format(
multiprocessing.current_process().pid,
multiprocessing.current_process().name,
item
));
time.sleep(2);
def main():
process = multiprocessing.Process(target=work,name="测试工作进程");
# 启动子进程
process.start();
if __name__ == '__main__':
main();
使用fork和os来进行子进程的创建,fork是Linux环境下子进程创建的一种方式,Windows里面可能无法创建完成
使用下面代码会报错
# 导入多进程开发模块
import multiprocessing,time,os
def chile():
print("chile 父进程id :{},子进程id:{}".format(os.getppid(),os.getpid()))
def main():
print("main进程id:{},进程名字:{};".format(
multiprocessing.current_process().pid,
multiprocessing.current_process().name
));
newpid = os.fork();
print("fock新的子进程id".format(newpid));
if newpid == 0:
chile();
else:
print("父进程执行,父进程id:{}".format(os.getpid()));
if __name__ == '__main__':
main();
psutil
获取全部进程信息
import psutil
def main():
# psutil.process_iter() 获取全部进程
for process in psutil.process_iter():
print("进程编号:{},进程名称:{},创建时间:{}".format(
process.pid,
process.name(),
process.create_time()
));
if __name__ == '__main__':
main();
关闭操作系统的exe程序
慎用,慎用
import psutil
def main():
# psutil.process_iter() 获取全部进程
for process in psutil.process_iter():
if process.name == 'notepad.exe':
# 进程强制中断
process.terminate();
print("目前有文档程序在运行,强制关闭");
if __name__ == '__main__':
main();
获取CPU使用情况
import psutil
def main():
print("物理CPU数量:{}".format(psutil.cpu_count(logical=False)))
print("逻辑CPU数量:{}".format(psutil.cpu_count(logical=True)))
print("用户CPU使用时间:{},系统CPU使用时间:{},CPU空闲时间:{}".format(
psutil.cpu_times().user,
psutil.cpu_times().system,
psutil.cpu_times().idle
));
for item in range(10):
print("CPU使用率监控:{}".format(psutil.cpu_percent(interval=1,percpu=True)))
if __name__ == '__main__':
main();
查看系统磁盘
import psutil
def main():
print("磁盘分区 获取全部磁盘信息: {}".format(psutil.disk_partitions()));
print("磁盘使用率 获取E磁盘使用率: {}".format(str(psutil.disk_usage("e:"))));
print("磁盘IO 获取磁盘IO使用率: {}".format(str(psutil.disk_io_counters())));
if __name__ == '__main__':
main();
查看本机网络连接,直接输出当前硬件中所有支持的网络设备以及各自的网络状态的信息获取;
扫描二维码关注公众号,回复:
13034550 查看本文章
import psutil
def main():
print("数据统计 网络数据交互信息:{}".format(str(psutil.net_io_counters())));
print("接口统计 网络接口信息:{}".format(psutil.net_if_addrs()));
print("接口状态 网络接口状态:{}".format(psutil.net_if_stats()));
if __name__ == '__main__':
main();
看到这里,可以理解为什么有人说
python是这个世界上最好的语言 和
人生苦短,我学python了
相比较其他语言需要实现一些功能的大量代码,python只需要短短一行,而且和系统的硬件之间进行交互也非常的容易,不得不感叹python对于运维和自动化的支持真的是太好了;