最近公司在安排做性能测试,我从来没做过完整的性能测试,只是用jmeter做过一些接口并发,这次学习了一下locust,尝试用locust实现单接口的性能测试。ps:公司也没有说有过性能测试经验,所以就是自己摸着石头过河,写下来我做性能测试的过程,欢迎大家批评指正,让我能够进步。
一、分析性能测试的场景以及具体需求
1、跟领导沟通,得到某AA接口需要400的并发量
2、系统在线用户数要达到10W
3、没有要求做全链路压测
二、任务分解
目前看来,只有1,2是领导要求我们做的。
那么先针对任务1进行分解。AA接口需要大量基础数据的准备,因此,首先要做好基础数据的准备。方法:通过调用接口,进行接口跑业务流,到AA接口为止。然后对AA接口单独进行性能高并发的测试。
那么针对任务2进行分解。首先要确定一下哪些接口高频使用,对接口进行按权重测试。
本文主要针对任务1来讲解。对于任务分解,欢迎大家指点。
三、实操
直接上代码:
"""
@Desc: AA接口50并发
@Time: 2021/8/10 10:00
@Author: kally
@Version: 1.0
@FileName: locustfile.py
"""
# coding=utf-8
import logging
import os
import queue
import sys
import time
from locust.clients import ResponseContextManager
from models.dev_weigh import CoTpSiteProgress
from locust import between, task, HttpUser, TaskSet, User
import requests
import json
from gevent.lock import Semaphore
sem = Semaphore()
class Rendezvous:
target_user_count = 0
_current_user_count = 0
_sem = None
def __init__(self, user_count=1):
self.target_user_count = user_count
def _get_lock(self, name):
if self._sem is None:
self._sem = Semaphore()
self._sem.acquire()
logging.info(f"Rendezvous: acquire by {name} !!!!!!")
return self._sem
def wait(self, name=""):
self._current_user_count += 1
_sem = self._get_lock(name)
if self._current_user_count >= self.target_user_count:
self._current_user_count = 0
self._sem = None
time.sleep(5)
logging.info(f"Rendezvous: release by {name}!!!!!!")
_sem.release()
else:
logging.info(f"Rendezvous: wait by {name}")
_sem.wait()
class QuickstartUser(TaskSet):
id = 0
count = 0
Rendezvous = Rendezvous(400)
user_data_queue = queue.Queue()
driver = CoTpSiteProgress.select().where(
(CoTpSiteProgress.status == 1) & (CoTpSiteProgress.node_now == "020"))
# 遍历数据
for p in driver:
rid.append(p.rid)
user_data_queue.put_nowait((p.co_code, p.driver_id_card, p.rid))
print(len(user_data_queue.queue))
# print(rid)
def __init__(self, parent):
super().__init__(parent)
self.__class__.count += 1
self.id = self.__class__.count
@task
def task_2(self):
'''AAA'''
self.Rendezvous.wait(str(self.id))
try:
coCode, driver_id_card, rid = self.user_data_queue.get()
print(coCode, driver_id_card, rid)
except queue.Empty:
print('account data run out, test ended.')
sys.exit(0)
headers = {
"lhUserName": "admin",
"lhToken": "token",
"lhCoCode": coCode,
"lhSysCode": "******",
"lhWxToken": "1"
}
data = {
"coCode": coCode,
"driverIdCard": driver_id_card
}
logging.info(f"发送请求头 {headers} !!!!!!")
logging.info(f"发送body数据 {data} !!!!!!")
resp = self.client.request(
catch_response=True,
method="post",
url="/AAAAAAAAAA/AAAAAA/AA",
headers=headers,
json=data
)
logging.info(f"返回数据 {json.loads(resp.text)} !!!!!!")
with resp as r:
r: ResponseContextManager
if json.loads(r.text)['status'] == 0:
r.success()
print(r.text)
else:
r.failure(f"AA失败 :{r.status_code}")
class User(HttpUser):
tasks = [QuickstartUser]
wait_time = between(1, 2)
host = "http://****"
if __name__ == '__main__':
os.system(f'locust -f locustfile.py --host=http://*** -u 400 -r 100')
1、做了基础数据后,从数据库取得接口AA需要的入参数据,放入队列中,需要跑AA接口的时候,直接从队列获取。这里token没有写,是因为公司内网部署,不需要,帮我们节省了这部分的工作。
2、写了集合点的类,需要并发多少,直接输入相应的数值即可。
3、做好日志记录。在项目下,建立locust.conf文件,录入
host = ****
users = 5
spawn-rate = 1
logfile = locust.log
loglevel = INFO
跑完并发,可以直接从日志查看相应记录。
4、最后,在本机电脑录入localhost:8089
开始搞起来,图表就如同我封面所示,可以查看相应的数据
5、对于服务器监控,我公司暂时用K8S上面自有的插件进行资源的监控
四、写在最后
第一次搞性能测试,诸多不足,希望大家多提意见,一起进步。
有好的学习途径,也可以跟我说说。龇牙笑