压力测试:必须了解的限流策略

1、配置限流策略

http {
    
    
      #Nginx限流。语法:limit_req_zone  key  zone  rate      
      #参数说明 key: 定义需要限流的对象($binary_remote_addr表示基于客户端ip(remote_addr)进行限流,binary_表示压缩内存占用量)  
      #参数说明 zone: 定义共享内存区来存储访问信息(定义了一个大小为10M的内存区,用于存储IP地址访问信息。)
      #参数说明 rate: 用于设置每个IP的最大访问速率(rate=5r/s表示每秒处理5个请求,rate=30r/m表示每分钟处理30个,即每21个。)

      limit_req_zone $binary_remote_addr zone=myLimitS2:10m rate=2r/s; #内存区名为myLimitS2,每秒处理2个请求
      limit_req_zone $binary_remote_addr zone=myLimitS10:10m rate=10r/s; #内存区名为myLimitS10,每秒处理10个请求
      limit_req_zone $binary_remote_addr zone=myLimitS20:10m rate=20r/s; #内存区名为myLimitS20,每秒处理20个请求
      limit_req_zone $binary_remote_addr zone=myLimitS30:10m rate=30r/s; #内存区名为myLimitS30,每秒处理30个请求
      limit_req_zone $binary_remote_addr zone=myLimitS40:10m rate=40r/s; #内存区名为myLimitS40,每秒处理40个请求
      limit_req_zone $binary_remote_addr zone=myLimitS50:10m rate=50r/s; #内存区名为myLimitS50,每秒处理50个请求
  }

2、应用限流策略

2.1、延时

  server {
    
    
      location / {
    
    
          limit_req zone=myLimitS2 burst=25; 
          #设置漏桶并发容量burst=25,瞬间处理能力qps=rate=2,并发请求数<=25时都会按rate被排队处理;漏桶容量会以rate设置的速度释放(需要burst/qps=25/2=12.5);新请求会依次进入漏桶占用释放的容量并排队;超过漏桶容量的会直接返回limit_req_status
       #limit_req_status 503; #自定义返回状态 
    } 
  }

2.2、不延时

  server{
    
    
      location / {
    
    
          limit_req zone=myLimitS2 burst=40 nodelay;        
          #设置漏桶并发容量burst=25,瞬间处理能力qps=rate=2,并发请求数<=25时都会按rate被排队处理;漏桶容量会以rate设置的速度释放(需要burst/qps=25/2=12.5);新请求会依次进入漏桶占用释放的容量并排队;超过漏桶容量的会直接返回limit_req_status
          #limit_req_status 503; #自定义返回状态 
      }    
  }

注意:设定的burst与实际测量出来的burst可能有±5左右的偏差,可以忽略。

3、Python压力测试代码(本人python菜鸡,借鉴的忘了哪位大神的代码,感谢)

import datetime

  import json

  import requests

  import logging

  import threading

  import time

  import sys

  from time import sleep, ctime

  logging.basicConfig(

      level=logging.INFO,

      format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

  logger = logging.getLogger(__name__)

  reponse_time = []

  OK = []

  errorCount = []

  class runScript():

      def API(self, url, params):

          try:

              r = requests.get(url, params=params, timeout=10)

              #print(r.status_code)

              code = r.status_code

              if code != 200:

                  print(code)

                  errorCount.append[1]

              else:

                  js = json.dumps(r.json())

                  #print(r.json()) #json格式的响应数据

                  # print(r.elapsed.total_seconds()) 响应时间

                  #print("ooo" + js) #没有解码的响应数据

                  return [r.json(), r.elapsed.total_seconds(), js]

              #r.raise_for_status()  # 如果响应状态码不是 200,就主动抛出异常

          except requests.RequestException as e:

              print('failed.' + e)

              errorCount.append[1]

      def circulation(self, url, params):

          status = 0

          datas = "none."

          try:

              obj = self.API(url, params)

              if obj != None:

                  #print(obj)

                  reponse_time.append(obj[1])

                  datas = json.loads(obj[2])["Msg"]

                  status = json.loads(obj[2])["Code"]

                  OK.append(datas)

          except Exception as e:

              return

  def test(url, params):

      Restime = runScript()

      Restime.circulation(url, params)

  def main(num, url, params):

      print("↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓")

      start_time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S %f')

      threads = []

      for i in range(num):

          t = threading.Thread(target=test, args=(url, params))

          threads.append(t)

      for t in range(num):

          threads[t].start()

          #time.sleep(0.0001)

      for j in range(num):

          threads[j].join()

      print("↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑")

      print("Starting at:", start_time)

      print("All done at:", datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S %f'))

      # print(OK)

      print('线程数:', len(threads))

      print('响应次数:', len(reponse_time))

      print('正常响应次数:', len(OK))

      print('错误次数:', len(errorCount))

      print('总响应最大时长:', (0 if len(reponse_time)==0 else max(reponse_time)))

      print('总响应最小时长:', (0 if len(reponse_time)==0 else min(reponse_time)))

      print('总响应时长:', (0 if len(reponse_time)==0 else sum(reponse_time)))

      print('平均响应时长:', (0 if len(reponse_time)==0 else (sum(reponse_time) / len(reponse_time))))

      # print('QPS(TPS)= 并发数/平均响应时间:',num  / (sum(reponse_time) / len(reponse_time)))

  if __name__ == '__main__':

      num = input('输入需要开启的线程数量:')

      url = 'http://192.168.11.35:8598/test.html'  # 地址

      #params = {'UserName': 'admin', 'UserPwd': '123456'}  # 参数

      main(int(num), url, params)

python测试结果预览:

 输入需要开启的线程数量:50
  Starting at: 2020-09-17 11:18:33 909979
  All done at: 2020-09-17 11:18:43 978149
  线程数: 50
  响应次数: 21
  正常响应次数: 21
  错误次数: 0
  总响应最大时长: 9.978622
  总响应最小时长: 0.946371
  总响应时长: 106.58336500000001
  平均响应时长: 5.075398333333334

我是个自动化测试人员,有自己的专业讨论交流群整理过许多自动化测试视频资源,在这个过程中帮到了我很多。如果你不想再体验一次自学时找不到资料,没人解答问题,坚持几天便放弃的感受的话,可以加入我们扣扣群【313782132 】,里面有各种软件测试资源和技术讨论。

确实软件测试是IT相关行业中最容易入门的学科~不需要开发人员烧脑的逻辑思维、不需要运维人员24小时的随时待命,需要的是细心认真的态度和IT相关知识点广度的了解,每个测试人员从入行到成为专业大牛的成长路线可划分为:软件测试、自动化测试、测试开发工程师 3个阶段。

等你来加入我们的软件测试交流群,里面有各种软件测试资料和技术交流。

猜你喜欢

转载自blog.csdn.net/weixin_50271247/article/details/109242117