多线程:
import threading
import time
#进程是多个资源的集合。
#线程是就是进程里面具体干活的。
#线程和线程之间是互相独立的。
def down_load():
time.sleep(5)
print("运行完了")
def movie():
time.sleep(1)
print('movie')
# threading.Thread(target=down_load,args=('name','abfd'))
start_time = time.time()
# for i in range(10):
# t = threading.Thread(target=down_load)
# t.start()
# t.join()
# thread_list = []
# for i in range(5):
# t = threading.Thread(target=movie)
# t.start()
# thread_list.append(t)
#
# print('thread_list',thread_list)
#
# for thread in thread_list:
# thread.join() #主线程等待子线程结束
for i in range(5):
t = threading.Thread(target=movie)
t.start()
while threading.activeCount()!=1:
pass
print(threading.activeCount()) #查看当前线程数
print(threading.current_thread())#查看当前线程
end_time = time.time()
print(end_time - start_time)
多进程
import multiprocessing,time
def down_load():
time.sleep(5)
print("运行完了")
if __name__ == '__main__':
for i in range(5):
p = multiprocessing.Process(target=down_load)
p.start()
while len(multiprocessing.active_children())!=0:#等待子进程结束
pass
print(multiprocessing.current_process())
print('end')
线程池:
import threadpool
import requests,time,threading
from hashlib import md5
def down_load_pic(url):
print(threading.current_thread())
req = requests.get(url)
m = md5(url.encode())
with open( m.hexdigest()+'.png','wb') as fw:
fw.write(req.content)
url_list = ['http://www.nnzhp.cn/wp-content/uploads/2019/10/f410afea8b23fa401505a1449a41a133.png',
'http://www.nnzhp.cn/wp-content/uploads/2019/11/481b5135e75c764b32b224c5650a8df5.png',
'http://www.nnzhp.cn/wp-content/uploads/2019/11/b23755cdea210cfec903333c5cce6895.png',
'http://www.nnzhp.cn/wp-content/uploads/2019/11/542824dde1dbd29ec61ad5ea867ef245.png']
pool = threadpool.ThreadPool(20)#实例化一个线程池
reqs = threadpool.makeRequests(down_load_pic,url_list)#分配数据
[pool.putRequest(req) for req in reqs]
# for req in reqs:
# pool.putRequest(req)
print(threading.activeCount())
pool.wait() #等待
print('end')
守护线程:
#主线程结束,守护线程立马死掉。
import threading,time
def down_load():
time.sleep(5)
print("运行完了")
for i in range(10):
t = threading.Thread(target=down_load)
t.setDaemon(True) #设置子线程为守护线程
t.start()
print('over')
锁:
#多个线程操作同一个数据的时候,就得加锁
import threading
num = 0
lock = threading.Lock() #申请一把锁
def add():
global num
# lock.acquire()#加锁
# num+=1
# lock.release()#解锁 #死锁
with lock:#简写,用with也会帮你加锁,解锁
num+=1
for i in range(20):
t = threading.Thread(target=add,)
t.start()
while threading.activeCount() !=1:
pass
print(num)
异步任务:
import yagmail,threading
def send_mail():
smtp = yagmail.SMTP(host='smtp.163.com',
user='[email protected]',
password='houyafan123'
)
smtp.send(to='[email protected]',cc=['[email protected]','[email protected]'],subject='标题',
contents='正文',attachments=[r'/Users/nhy/PycharmProjects/mjz/day6/jsonpath模块.py']
)
def async_send_mail():
t = threading.Thread(target=send_mail)
t.start()
虚拟环境:
pip install virtualenv
e://virtual_envs #建个文件夹专门放虚拟环境的
cd e://virtual_envs
virtualenv py3 #干净的环境,没有第三方模块,只有pip
cd /User/virtual_envs/py3/bin #进入虚拟环境目录,linux/mac
cd e://virtual_envs//py3/Scripts #进入虚拟环境目录,windows
activate #Windows
source ./activate #linux/mac
deactivate 退出虚拟环境
搭建环境:
1、搭建测试环境
1、申请服务器
2、安装依赖的软件 jdk1.8、mysql、redis、tomcat等等
3、获取代码,修改配置文件,(编译、打包)
4、导入基础数据(建表、导入数据)
5、代码放到服务器上,启动
日常部署:
2、日常部署
1、拉取最新代码,修改配置文件,(编译、打包)
2、如果有变动的sql,执行
3、服务器上代码替换成最新的,重启
单元测试:
import unittest
def add(a,b):
return a+b
#python unittest
#java junit
#php phpunit
class AddTest(unittest.TestCase):
def test_normal(self):
result = add(1,1)
self.assertEqual(2,result)
def test_error(self):
result = add(1,1)
self.assertEqual(1,result,'结果计算错误')
unittest.main()
单元测试,生成报告:
import unittest
def add(a,b):
return a+b
import HTMLTestRunner
import BeautifulReport as bfr
class AddTest(unittest.TestCase):
@classmethod
def setUpClass(cls):#所有用例执行之前执行它
print('setUpClass')
@classmethod
def tearDownClass(cls):#所有用例执行之后执行它
print('tearDownClass')
def setUp(self):
print('setUp')
#每条用例执行之前都会执行它
def tearDown(self):
print('tearDown')
#每条用例执行之后都会执行它
def test_normal(self):
result = add(1,1)
self.assertEqual(2,result)
print('test_normal')
def test_error(self):
print('test_error')
result = add(1,1)
self.assertEqual(1,result,'结果计算错误')
# unittest.main() #这个是
#testcase
#testsuite #用例集合
#testrunner #运行测试用例
#testloader #查找测试用例
test_suite = unittest.makeSuite(AddTest)
report = bfr.BeautifulReport(test_suite)
report.report(filename='bf_report.html',description='bf测试报告',log_path='/Users/nhy/Desktop')
print(report.failure_count) #失败次数
print(report.success_count) #通过次数
#htmltestruner产生测试报告
# file = open('report.html','wb')
# runner = HTMLTestRunner.HTMLTestRunner(file,title='测试报告')
# runner.run(test_suite)
# file.close()
参数化:
import parameterized
import unittest,BeautifulReport
#数据驱动
#代码驱动
#关键字驱动
data = [
['admin','123456',True,'正常登录'],
['admin','1122',False,'冻结用户登录'],
['sdfsdf','1111',False,'黑名单用户登录']
]
data2 = [
['admin','123456',True],
['admin','1122',False],
['sdfsdf','1111',False]
]
def login(user,password):
if user=='admin' and password=='123456':
return True
return False
class LoginTest(unittest.TestCase):
@parameterized.parameterized.expand(data)
def test_login(self,user,password,expect,desc):
self._testMethodDoc = desc #自己指定
result = login(user,password)
self.assertEqual(expect,result)
@parameterized.parameterized.expand(data2)
def test_login2(self,user,password,expect):
'''登录'''
result = login(user,password)
self.assertEqual(expect,result)
bf = BeautifulReport.BeautifulReport(unittest.makeSuite(LoginTest))
bf.report(filename='11-17测试报告',description='接口测试报告')