3.3.4.1 Locust中的关联
方法一:在任务(Action)内部调用登录方法,并通过方法返回值把关联数据传递给下一个请求,类似 LoadRunner 中在 Action 里完成关联。
Init: 无脚本
Action: 取出第1个请求的值
传递给第2个请求
End :无脚本
from locust import HttpLocust, TaskSet, task
def md5(str):
    """Return the hexadecimal MD5 digest of the given value.

    Accepts either a byte string or a text string. Text is encoded as UTF-8
    before hashing, because ``hashlib`` only accepts bytes on Python 3 and
    cannot implicitly encode non-ASCII unicode on Python 2.

    The parameter keeps its original name ``str`` (which shadows the builtin)
    so that existing keyword callers keep working.
    """
    import hashlib

    data = str
    if not isinstance(data, bytes):
        # Text input: hash its UTF-8 representation.
        data = data.encode('utf-8')
    m = hashlib.md5()
    m.update(data)
    return m.hexdigest()
class UserBehavior(TaskSet):
    """Task set that logs in inside the task and passes the result along.

    ``searchDevice`` calls ``login`` on every run and feeds the returned
    ``openId`` / ``accessToken`` into the follow-up request.
    """

    def on_start(self):
        """on_start is called when a locust starts, before any task is scheduled.

        Intentionally empty: this variant performs the login inside the task.
        """

    def login(self):
        """POST the login request and return the ``data`` entry of the JSON reply.

        The returned mapping is indexed with ``'openId'`` and
        ``'accessToken'`` by ``searchDevice``.
        """
        loginPath = '/oauth?service=authorize&version='
        postLoginBody = {
            "clientId": "xxxx",
            "consideringTheReusability": "true",
            "password": md5('q123456'),
            "grantType": "password",
            "service": "authorize",
            "version": "1.0.0.150109-PRD",
            "clientSecret": "xx9D24A874B",
            # NOTE(review): this value looks like a redaction placeholder --
            # confirm and supply a real test account before running.
            "username": '[email protected]',
        }
        response = self.client.post(loginPath, postLoginBody)
        rc = response.json()['data']
        # Use the % operator so the placeholders are actually substituted;
        # the single parenthesised argument works on both Python 2 and 3.
        print('request = %s, request body = %s, response =%s'
              % (loginPath, postLoginBody, rc))
        return rc

    @task(1)
    def searchDevice(self):
        """Log in, then query the device list with the credentials just obtained."""
        searchPath = '/gateway?service=user_ipc_setting_v2_0.list&version='
        loginTag = self.login()
        searchBody = {
            "clientId": "foscloud",
            'openId': loginTag['openId'],
            'accessToken': loginTag['accessToken'],
        }
        response = self.client.get(searchPath, params=searchBody)
        print('request = %s, request body = %s, response =%s'
              % (searchPath, searchBody, response.content))
class ApiFosUser(HttpLocust):
    """Locust user definition that runs ``UserBehavior`` against the test host.

    Both wait bounds are set to 0.
    """

    host = 'http://10.16.40.11:5902'
    task_set = UserBehavior
    max_wait = 0
    min_wait = 0
方法二:在 on_start(相当于 LoadRunner 的 Init)中调用登录方法,把返回值保存为实例属性,后续任务直接读取该属性,类似 LoadRunner 中在 Init 里完成关联。
Init: 取出第1个请求的值
Action: 传递给第2个请求
End :无脚本
from locust import HttpLocust, TaskSet, task
def md5(str):
    """Return the hexadecimal MD5 digest of the given value.

    Accepts either a byte string or a text string. Text is encoded as UTF-8
    before hashing, because ``hashlib`` only accepts bytes on Python 3 and
    cannot implicitly encode non-ASCII unicode on Python 2.

    The parameter keeps its original name ``str`` (which shadows the builtin)
    so that existing keyword callers keep working.
    """
    import hashlib

    data = str
    if not isinstance(data, bytes):
        # Text input: hash its UTF-8 representation.
        data = data.encode('utf-8')
    m = hashlib.md5()
    m.update(data)
    return m.hexdigest()
class UserBehavior(TaskSet):
    """Task set that logs in once in ``on_start`` and reuses the result.

    ``on_start`` stores the value returned by ``login`` in ``self.loginTag``;
    ``searchDevice`` reads ``openId`` / ``accessToken`` from that attribute
    instead of logging in again.
    """

    # Class-level default; replaced per instance by on_start().
    loginTag = {}

    def on_start(self):
        """on_start is called when a locust starts, before any task is scheduled.

        Performs the login and keeps the result for the tasks to use.
        """
        self.loginTag = self.login()

    def login(self):
        """POST the login request and return the ``data`` entry of the JSON reply.

        The returned mapping is indexed with ``'openId'`` and
        ``'accessToken'`` by ``searchDevice``.
        """
        loginPath = '/oauth?service=authorize&version='
        postLoginBody = {
            "clientId": "xxx",
            "consideringTheReusability": "true",
            "password": md5('q123456'),
            "grantType": "password",
            "service": "authorize",
            "version": "1.0.0.150109-PRD",
            "clientSecret": "xx82A99D24A874B",
            # NOTE(review): this value looks like a redaction placeholder --
            # confirm and supply a real test account before running.
            "username": '[email protected]',
        }
        response = self.client.post(loginPath, postLoginBody)
        rc = response.json()['data']
        # Use the % operator so the placeholders are actually substituted;
        # the single parenthesised argument works on both Python 2 and 3.
        print('request = %s, request body = %s, response =%s'
              % (loginPath, postLoginBody, rc))
        return rc

    @task(1)
    def searchDevice(self):
        """Query the device list with the credentials stored by ``on_start``."""
        searchPath = '/gateway?service=user_ipc_setting_v2_0.list&version='
        # No login here: the credentials come from self.loginTag.
        searchBody = {
            "clientId": "foscloud",
            'openId': self.loginTag['openId'],
            'accessToken': self.loginTag['accessToken'],
        }
        response = self.client.get(searchPath, params=searchBody)
        print('request = %s, request body = %s, response =%s'
              % (searchPath, searchBody, response.content))
class ApiFosUser(HttpLocust):
    """Locust user definition that runs ``UserBehavior`` against the test host.

    Both wait bounds are set to 0.
    """

    host = 'http://10.16.40.11:5902'
    task_set = UserBehavior
    max_wait = 0
    min_wait = 0
方法一和方法二的执行效果和结果不同:方法一每次执行任务都会重新登录,方法二每个模拟用户只在启动时登录一次。这一差异是由登录调用所在的位置(Action 还是 Init)不同造成的,与 LoadRunner 中的效果一致。