一、背景

这仍是2个月前做的一次接口功能测验,关于locust脚本的单机多核运转,以及主从节点之间的数据通信。

先简略告知下背景,在APP上线之前,需求对登录接口进行功能测验。经过评估,我仍是优先选择了locust来进行脚本开发,本次用到了locust的单机多核运转才能,只不过这儿还涉及到主从节点之间数据通信。现成的可参考的有效文档甚少,所以仍是自己摸着官方文档过河比较靠谱。

顺带提一下,学习结构这种东西最好的教程其实还得是官方文档以及结构源码了,这儿贴上locust官方文档链接,需求的能够自行学习:docs.locust.io/en/stable/w…

二、代码编写

其实脚本代码的编写一大重点便是如何处理测验数据,不同的测验需求对于测验数据的处理是不同的。比方这次的需求,手机号不能重复。别的考虑到长期的负载压力,数据量还得满足。

最后测验数据还需求处理,那么我运用的测验号段是非真实号码段,测验完毕后能够查询对应号段内的手机号,进行相关事务数据的清理。

1. 代码概览

仍是老样子,先附上全部代码,然后对其结构进行拆分讲解。

import random
import time
from collections import deque
from locust import HttpUser, task, run_single_user, TaskSet, events
from locust.runners import WorkerRunner, MasterRunner
CURRENT_TIMESTAMP = str(round(time.time() * 1000))
RANDOM = str(random.randint(10000000, 99999999))
MOBILE_HEADER = {
    "skip-request-expired": "true",
    "skip-auth": "true",
    "skip-sign": "true",
    "os": "IOS",
    "device-id": "198EA6A4677649018708B400F3DF69FB",
    "nonce": RANDOM,
    "sign": "12333",
    "version": "1.2.0",
    "timestamp": CURRENT_TIMESTAMP,
    "Content-Type": "application/json"
}
last_mobile = ""
worker_mobile_deque = deque()
# 13300120000, 13300160000 新用户注册号段
@events.test_start.add_listener
def on_test_start(environment, **_kwargs):
    if not isinstance(environment.runner, WorkerRunner):
        mobile_list = []
        for i in range(13300120000, 13300160000):
            mobile_list.append(i)
        mobile_list_length = len(mobile_list)
        print("列表已生成,总计数量:", mobile_list_length)
        worker_count = environment.runner.worker_count
        chunk_size = int(mobile_list_length / worker_count)
        print(f"均匀每个worker分得的手机号数量:{chunk_size}")
        for i, worker in enumerate(environment.runner.clients):
            start_index = i * chunk_size
            if i   1 < worker_count:
                end_index = start_index   chunk_size
            else:
                end_index = len(mobile_list)
            data = mobile_list[start_index:end_index]
            environment.runner.send_message("mobile_list", data, worker)
def setup_mobile_list(environment, msg, **kwargs):
    len_msg_data = len(msg.data)
    print(f"worker收到的master传来的数据号段:{msg.data[0]} ~ {msg.data[len_msg_data-1]}")
    global worker_mobile_deque
    worker_mobile_deque = deque(msg.data)
@events.init.add_listener
def on_locust_init(environment, **_kwargs):
    if not isinstance(environment.runner, MasterRunner):
        environment.runner.register_message('mobile_list', setup_mobile_list)
class VcodeLoginUser(TaskSet):
    # wait_time = between(5, 5)
    @task
    def vcode_login(self):
        test_mobile = worker_mobile_deque.popleft()
        print("当时获取的手机号:", test_mobile)
        # print("当时行列巨细:", len(worker_mobile_deque))
        global last_mobile
        last_mobile = test_mobile
        with self.client.post("/g/sendMobileVcode",
                              headers=MOBILE_HEADER,
                              json={"busiType": "login", "mobile": str(test_mobile)}) as send_response:
            try:
                send_response_json = send_response.json()
                if send_response_json["message"] == "success":
                    params = {"mobile": str(test_mobile), "vcode": "111111"}
                    # print(test_mobile, "登录恳求参数:", params)
                    with self.client.post("/g/vcodeLogin",
                                          json=params,
                                          headers=MOBILE_HEADER,
                                          catch_response=True) as login_response:
                        # print(login_response.json)
                        login_response_json = login_response.json()
                        if login_response_json["message"] != "success":
                            login_response.failure("message not equal success")
                        elif login_response_json["code"] != 0:
                            login_response.failure("code not equal 0")
                        elif login_response_json["data"]["rId"] == "":
                            login_response.failure("rid is null")
                        elif login_response_json["data"]["mobile"] != str(test_mobile):
                            login_response.failure("mobile is error,入参手机号{},回来的手机号{}"
                                                   .format(test_mobile, login_response.json()["data"]["mobile"]))
                        # print(test_mobile, "恳求成果:", login_response.json())
                else:
                    send_response.failure("{} send code fail".format(test_mobile))
            except Exception as e:
                send_response.failure("send code fail {}".format(e))
    @events.test_stop.add_listener
    def on_test_stop(environment, **kwargs):
        print("脚本完毕")
        print("当时行列巨细:", len(worker_mobile_deque))
        print("最后的手机号:", last_mobile)
class LocustLogin(HttpUser):
    tasks = [VcodeLoginUser]
    host = "https://qa.test.com"
if __name__ == '__main__':
    run_single_user(LocustLogin)

2. 代码拆解-要加必要的断语

首先是根据locust开发的http恳求的脚本大结构是不变的,依旧是两大块:HttpUserTaskSet,这儿不再对其讲解了,大伙看下官方文档就明白了。

接下来便是类VcodeLoginUser,能够看到在这儿面是界说了单个用户的详细动作。留意这儿要加上必要的断语。否则仅靠结构的非200外的过错断语仍是不够的。

比方我这儿关注登录成功后的几个必要字段:coderIdmobile,这些一定是要符合断语的才能够。

果不其然,压测进程中就发现了并发状况下会呈现的问题:入参手机号是a,接口回来的手机号是b。并发量越大过错越多。如果我只断语code=0,那么这个问题就不简略发现了,虽然接口回来的code都是成功的,但是事务上现已存在过错了。

...
        with self.client.post("/g/sendMobileVcode",
                              headers=MOBILE_HEADER,
                              json={"busiType": "login", "mobile": str(test_mobile)}) as send_response:
            try:
                send_response_json = send_response.json()
                if send_response_json["message"] == "success":
                    params = {"mobile": str(test_mobile), "vcode": "111111"}
                    # print(test_mobile, "登录恳求参数:", params)
                    with self.client.post("/g/vcodeLogin",
                                          json=params,
                                          headers=MOBILE_HEADER,
                                          catch_response=True) as login_response:
                        # print(login_response.json)
                        login_response_json = login_response.json()
                        if login_response_json["message"] != "success":
                            login_response.failure("message not equal success")
                        elif login_response_json["code"] != 0:
                            login_response.failure("code not equal 0")
                        elif login_response_json["data"]["rId"] == "":
                            login_response.failure("rid is null")
                        elif login_response_json["data"]["mobile"] != str(test_mobile):
                            login_response.failure("mobile is error,入参手机号{},回来的手机号{}"
                                                   .format(test_mobile, login_response.json()["data"]["mobile"]))
                        # print(test_mobile, "恳求成果:", login_response.json())
                else:
                    send_response.failure("{} send code fail".format(test_mobile))
            except Exception as e:
                send_response.failure("send code fail {}".format(e))
...

3. 代码拆解-单机多核处理

接下来便是重点了,如何在单台机器上用到多cpu。最开端的时分我忽略了这点,后来发现负载上不去,一打开资源监视器才发现只有1个cpu在满负载运转。

这儿示意图仅供参考,我的win笔记本是12c的。

Locust单机多核压测,以及主从节点的数据通信处理

因为Locust是单进程的,不能充分利用多核CPU,所以需求我们压力机上敞开一个master进程,然后再敞开多个slave进程,组成一个单机分布式体系即可。

敞开的方法也很简略:

# 敞开 master 
locust -f locustfile.py --master
# 敞开 slave
locust -f locustfile.py --slave

这儿我们敞开 slave 节点的时分能够敞开对应多个命令行窗口,当时没截图,借用网上的图片示意一下:

Locust单机多核压测,以及主从节点的数据通信处理

敞开后,你的web界面就能够实时看到当时启动的节点数了。

Locust单机多核压测,以及主从节点的数据通信处理

4. 代码拆解-处理主从节点数据通信

敞开主从节点却是很简略,测验数据就需求针对性进行处理了。

因为我的测验登选用的手机号不能够重复,所以要保证不同 slave 节点上同时运转的代码产生的手机号都不能够重复。

继续扒了下官方文档,发现能够通过增加事件监听器来完成我的需求。

这儿我加了三个监听器分别来处理不同的作业:

  • @events.init.add_listener:在locust运转初始化的时分履行
  • @events.test_start.add_listener: 在测验代码开端运转的时分履行
  • @events.test_stop.add_listener: 在测验代码完毕运转的时分履行

@events.test_start.add_listener 首先,在@events.test_start.add_listener里,我主要处理全量数据的生成,以及把这些手机号均匀分配给生成的 slave 节点。

@events.test_start.add_listener
def on_test_start(environment, **_kwargs):
    if not isinstance(environment.runner, WorkerRunner):
        mobile_list = []
        for i in range(13300120000, 13300160000):
            mobile_list.append(i)
        mobile_list_length = len(mobile_list)
        print("列表已生成,总计数量:", mobile_list_length)
        worker_count = environment.runner.worker_count
        chunk_size = int(mobile_list_length / worker_count)
        print(f"均匀每个worker分得的手机号数量:{chunk_size}")
        for i, worker in enumerate(environment.runner.clients):
            start_index = i * chunk_size
            if i   1 < worker_count:
                end_index = start_index   chunk_size
            else:
                end_index = len(mobile_list)
            data = mobile_list[start_index:end_index]
            environment.runner.send_message("mobile_list", data, worker)

留意这儿最后一行中界说的mobile_list,需求界说一个对应函数来接纳这个数据。

def setup_mobile_list(environment, msg, **kwargs):
    len_msg_data = len(msg.data)
    print(f"worker收到的master传来的数据号段:{msg.data[0]} ~ {msg.data[len_msg_data-1]}")
    global worker_mobile_deque
    worker_mobile_deque = deque(msg.data)

这样,不同的 slave 节点脚步分配到的手机号段便是不同的了,解决测验数据重复的问题。

别的,我界说另一个全局变量worker_mobile_deque,这样不同的 slave 节点接纳的数据就能够放到行列里,运转的时分从行列里边取,用一个少一个,直到行列里的数据用完。

@events.init.add_listener 接着便是在@events.init.add_listener里要注册上面界说的数据字段和处理函数。

@events.init.add_listener
def on_locust_init(environment, **_kwargs):
    if not isinstance(environment.runner, MasterRunner):
        environment.runner.register_message('mobile_list', setup_mobile_list)

@events.test_stop.add_listener 最后,在@events.test_stop.add_listener这儿能够做一些后置处理,我是简略起见,只是记载输出了本次测验用到了哪个号码段,这样我下次运转脚本的时分能够从后面的数据开端,最大化测验数据的运用,不浪费。

    @events.test_stop.add_listener
    def on_test_stop(environment, **kwargs):
        print("脚本完毕")
        print("当时行列巨细:", len(worker_mobile_deque))
        print("最后的手机号:", last_mobile)

三、小结

脚本调试完后能够稳定运转,接下来便是测验的进程了,进行了服务器单节点、多节点负载才能的测验,水平拓展才能的测验,以及服务动态扩容、长期高负载测验。测验的角度调查测验报告,服务各项目标的状况。只不过涉及到开发端,调优分析的作业并未能参加许多。不过大约仍是那些常见问题,后续有机会能够再单独分享了。

从运用角度来看,locust深得我爱,比起 jemter真的太轻便了,代码灵敏度也十分高,单机负载才能也是响当当的,这点比jemeter强太多了。我这个项目不需求十分高的量,所以单机只用了8c就够了。如果有小伙伴需求十分高的并发,locust 也支持多机器分布式,进一步扩展并发才能。