前言
在進(jìn)行性能測(cè)試時(shí),我們需要對(duì)測(cè)試結(jié)果進(jìn)行監(jiān)控和分析,以便于及時(shí)發(fā)現(xiàn)問(wèn)題并進(jìn)行優(yōu)化。
Locust在內(nèi)存中維護(hù)了一個(gè)時(shí)間序列數(shù)據(jù)結(jié)構(gòu),用于存儲(chǔ)每個(gè)事件的統(tǒng)計(jì)信息。 這個(gè)數(shù)據(jù)結(jié)構(gòu)允許我們?cè)贑harts標(biāo)簽頁(yè)中查看不同時(shí)間點(diǎn)的性能指標(biāo),但是正因?yàn)長(zhǎng)ocust WebUI上展示的數(shù)據(jù)實(shí)際上是存儲(chǔ)在內(nèi)存中的。所以在Locust測(cè)試結(jié)束后,這些數(shù)據(jù)將不再可用。 如果我們需要長(zhǎng)期保存以便后續(xù)分析測(cè)試數(shù)據(jù),可以考慮將Locust的測(cè)試數(shù)據(jù)上報(bào)到外部的數(shù)據(jù)存儲(chǔ)系統(tǒng),如InfluxDB,并使用Grafana等可視化工具進(jìn)行展示和分析。
本文將介紹如何使用Locust進(jìn)行負(fù)載測(cè)試,并將測(cè)試數(shù)據(jù)上報(bào)到InfluxDB。同時(shí),我們將使用Grafana對(duì)測(cè)試數(shù)據(jù)進(jìn)行展示和分析。
最終效果:
influxDB
InfluxDB是一款開(kāi)源的時(shí)間序列數(shù)據(jù)庫(kù),專為處理大量的時(shí)間序列數(shù)據(jù)而設(shè)計(jì)。時(shí)間序列數(shù)據(jù)通常是按照時(shí)間順序存儲(chǔ)的數(shù)據(jù)點(diǎn),每個(gè)數(shù)據(jù)點(diǎn)都包含一個(gè)時(shí)間戳和一個(gè)或多個(gè)與之相關(guān)的值。這種數(shù)據(jù)類型在許多場(chǎng)景下都非常常見(jiàn),如監(jiān)控系統(tǒng)、物聯(lián)網(wǎng)設(shè)備、金融市場(chǎng)數(shù)據(jù)等。在這些場(chǎng)景下,數(shù)據(jù)上報(bào)是一種關(guān)鍵的需求,因?yàn)樗梢詭椭覀儗?shí)時(shí)了解系統(tǒng)的狀態(tài)和性能。
注: InfluxDB 開(kāi)源的時(shí)單機(jī)版本,集群版本并未開(kāi)元,但是對(duì)于普通用戶的日常場(chǎng)景已經(jīng)完全夠用。
以下是關(guān)于InfluxDB的關(guān)鍵特性和優(yōu)勢(shì)的表格:
安裝運(yùn)行InfluxDB
如果你已經(jīng)安裝了Docker,可以直接使用官方的InfluxDB鏡像來(lái)運(yùn)行InfluxDB:
docker run -p 8086:8086 -v $PWD:/var/lib/influxdb influxdb:1.8
使用Python 上報(bào)數(shù)據(jù)到influxdb
首先,確保已經(jīng)安裝了influxdb
庫(kù):
pip install influxdb
然后,使用以下代碼上報(bào)數(shù)據(jù)到InfluxDB:
以下是一個(gè)使用Python操作InfluxDB上報(bào)數(shù)據(jù)的示例,對(duì)照MySQL進(jìn)行注釋:
import time
from influxdb import InfluxDBClient
# 連接到InfluxDB(類似于連接到MySQL數(shù)據(jù)庫(kù))
client = InfluxDBClient(host='localhost', port=8086)
# 創(chuàng)建數(shù)據(jù)庫(kù)(類似于在MySQL中創(chuàng)建一個(gè)新的數(shù)據(jù)庫(kù))
client.create_database('mydb')
# 切換到創(chuàng)建的數(shù)據(jù)庫(kù)(類似于在MySQL中選擇一個(gè)數(shù)據(jù)庫(kù))
client.switch_database('mydb')
# 上報(bào)數(shù)據(jù)(類似于在MySQL中插入一條記錄)
data = [
{
# 在InfluxDB中,measurement相當(dāng)于MySQL中的表名
"measurement": "cpu_load",
# tags相當(dāng)于MySQL中的索引列,用于快速查詢
"tags": {
"host": "server01",
"region": "us-west"
},
# time為時(shí)間戳,是InfluxDB中的關(guān)鍵字段
"time": int(time.time_ns()),
# fields相當(dāng)于MySQL中的數(shù)據(jù)列,用于存儲(chǔ)實(shí)際的數(shù)據(jù)值
"fields": {
"value": 0.64
}
}
]
# 寫入數(shù)據(jù)(類似于在MySQL中執(zhí)行INSERT語(yǔ)句)
client.write_points(data)
在這個(gè)示例中,我們首先連接到InfluxDB(類似于連接到MySQL數(shù)據(jù)庫(kù)),然后創(chuàng)建一個(gè)名為mydb
的數(shù)據(jù)庫(kù)(類似于在MySQL中創(chuàng)建一個(gè)新的數(shù)據(jù)庫(kù)),并切換到創(chuàng)建的數(shù)據(jù)庫(kù)(類似于在MySQL中選擇一個(gè)數(shù)據(jù)庫(kù))。接著,我們準(zhǔn)備了一條名為cpu_load
的數(shù)據(jù)(在InfluxDB中,measurement相當(dāng)于MySQL中的表名),并為數(shù)據(jù)添加了host
和region
標(biāo)簽(類似于MySQL中的索引列)。最后,我們將數(shù)據(jù)寫入到InfluxDB中(類似于在MySQL中執(zhí)行INSERT語(yǔ)句)。
執(zhí)行上面的代碼后我們可以看到我們的操作成功了:
如果我們安裝了influx-cli就可以在命令行中直接查詢剛才寫入的數(shù)據(jù):
bingohe@MacBook-Pro ~ $ /usr/local/Cellar/influxdb@1/1.11.1/bin/influx
Connected to http://localhost:8086 version 1.8.10
InfluxDB shell version: 1.11.1
> show databases;
name: databases
name
----
_internal
mydb
> use mydb
Using database mydb
> show measurements;
name: measurements
name
----
cpu_load
> select * from cpu_load;
name: cpu_load
time host region value
---- ---- ------ -----
1688874870046897000 server01 us-west 0.64
Locust 數(shù)據(jù)寫入到 influx
在 【Python】萬(wàn)字長(zhǎng)文,Locust 性能測(cè)試指北(上) 中我們提到過(guò)Locust的生命周期,我們也通過(guò)Locust生命周期實(shí)現(xiàn)了集合點(diǎn)的功能。現(xiàn)在我們一起來(lái)通過(guò)它實(shí)現(xiàn)測(cè)試數(shù)據(jù)的實(shí)時(shí)展示。
Locust的生命周期
test_start
:測(cè)試開(kāi)始時(shí)觸發(fā)。spawning_start
:生成用戶時(shí)觸發(fā)。user_add
:每個(gè)用戶被添加時(shí)觸發(fā)。spawning_complete
:所有用戶生成完成時(shí)觸發(fā)。request
:每個(gè)請(qǐng)求發(fā)生時(shí)觸發(fā)。test_stop
:測(cè)試停止時(shí)觸發(fā)。
上報(bào)數(shù)據(jù)
我們先來(lái)看看常用的事件里面可以獲取到的數(shù)據(jù):
import time
from locust import HttpUser, task, between, events
@events.request.add_listener
def request_handler(*args, **kwargs):
print(f"request args: {args}")
print(f"request kwargs: {kwargs}")
@events.worker_report.add_listener
def worker_report_handlers(*args, **kwargs):
print(f"worker_report args: {args}")
print(f"worker_report kwargs: {kwargs}")
@events.test_start.add_listener
def test_start_handlers(*args, **kwargs):
print(f"test_start args: {args}")
print(f"test_start kwargs: {kwargs}")
@events.test_stop.add_listener
def test_stop_handlers(*args, **kwargs):
print(f"test_stop args: {args}")
print(f"test_stop kwargs: {kwargs}")
class QuickstartUser(HttpUser):
wait_time = between(1, 2)
@task
def root(self):
with self.client.get("/", json={"time": time.time()}, catch_response=True) as rsp:
rsp_json = rsp.json()
if rsp_json["id"] != 5:
# 失敗時(shí)上報(bào)返回的數(shù)據(jù)
rsp.failure(f"{rsp_json}")
運(yùn)行一次測(cè)試時(shí)能看到這些生命周期內(nèi)的Locust 對(duì)外暴露的數(shù)據(jù):
test_start args: ()
test_start kwargs: {'environment': <locust.env.Environment object at 0x10c426c70>}
request args: ()
request kwargs: {'request_type': 'GET', 'response_time': 2.6886250000011103, 'name': '/', 'context': {}, 'response': <Response [200]>, 'exception': None, 'start_time': 1688888321.896039, 'url': 'http://0.0.0.0:10000/', 'response_length': 8}
request args: ()
request kwargs: {'request_type': 'GET', 'response_time': 2.735957999998817, 'name': '/', 'context': {}, 'response': <Response [200]>, 'exception': CatchResponseError("{'id': 6}"), 'start_time': 1688888323.421389, 'url': 'http://0.0.0.0:10000/', 'response_length': 8}
test_stopping args: ()
test_stopping kwargs: {'environment': <locust.env.Environment object at 0x10c426c70>}
test_stop args: ()
test_stop kwargs: {'environment': <locust.env.Environment object at 0x10c426c70>}
從上面的監(jiān)控我們可以看到,每次任務(wù)啟動(dòng)和停止的時(shí)候會(huì)分別調(diào)用@events.test_start.add_listener
和@events.test_stop.add_listener
裝飾的函數(shù),每次請(qǐng)求發(fā)生的的時(shí)候都會(huì)調(diào)用@events.request.add_listener
監(jiān)聽(tīng)器裝飾的函數(shù),我們就是要利用這一點(diǎn)來(lái)進(jìn)行數(shù)據(jù)的上報(bào)。
通過(guò)查看 Locust 的 EventHook
源碼注釋我們可以看到標(biāo)準(zhǔn)的使用方法:
#.../site-packages/locust/event.py
...
class EventHook:
"""
Simple event class used to provide hooks for different types of events in Locust.
Here's how to use the EventHook class::
my_event = EventHook()
def on_my_event(a, b, **kw):
print("Event was fired with arguments: %s, %s" % (a, b))
my_event.add_listener(on_my_event)
my_event.fire(a="foo", b="bar")
If reverse is True, then the handlers will run in the reverse order
that they were inserted
"""
...
結(jié)合前面的寫數(shù)據(jù)到 influxDB
的實(shí)現(xiàn),上報(bào)數(shù)據(jù)這一項(xiàng)一下子就變簡(jiǎn)單了:
簡(jiǎn)單實(shí)現(xiàn)每次請(qǐng)求數(shù)據(jù)上報(bào) 到 influxDB
下面的代碼運(yùn)行Locust測(cè)試后會(huì)自動(dòng)創(chuàng)建一個(gè)locust_requests
的 measurement
,然后將每次請(qǐng)求的數(shù)據(jù)上報(bào)。
import time
from datetime import datetime
from influxdb import InfluxDBClient
from locust import HttpUser, task, between, events
client = InfluxDBClient(host='localhost', port=8086, database="mydb")
def request(request_type, name, response_time, response_length, response, context, exception, url, start_time):
_time = datetime.utcnow()
was_successful = True
if response:
was_successful = 199 < response.status_code < 400
tags = {
'request_type': request_type,
'name': name,
'success': was_successful,
'exception': str(exception),
}
fields = {
'response_time': response_time,
'response_length': response_length,
}
data = {"measurement": 'locust_requests', "tags": tags, "time": _time, "fields": fields}
client.write_points([data])
# 在每次請(qǐng)求的時(shí)候通過(guò)前面定義的request函數(shù)寫數(shù)據(jù)到 DB
events.request.add_listener(request)
class QuickstartUser(HttpUser):
wait_time = between(1, 2)
@task
def root(self):
with self.client.get("/", json={"time": time.time()}, catch_response=True) as rsp:
rsp_json = rsp.json()
if rsp_json["id"] != 5:
rsp.failure(f"{rsp_json}")
優(yōu)化升級(jí)
上面的這個(gè)上報(bào)很粗糙,每次請(qǐng)求會(huì)上報(bào)一次數(shù)據(jù),會(huì)影響實(shí)際的壓測(cè),如果我們將要上報(bào)的數(shù)據(jù)放在一個(gè)數(shù)據(jù)結(jié)構(gòu)中中,異步的上報(bào)這個(gè)數(shù)據(jù)將極大的提升性能
# 將 __flush_points 方法中的寫入操作放到一個(gè)單獨(dú)的線程中,避免阻塞主線程,提高性能。
self.write_thread = threading.Thread(target=self.__write_points_worker)
# 批量寫入
if len(self.write_batch) >= self.batch_size or time.time() - self.last_flush_time >= self.interval_ms / 1000:
# 使用 gzip 壓縮上報(bào)的數(shù)據(jù)
influxdb_writer = InfluxDBWriter('localhost', 8086, 'mydb', batch_size=1000, gzip_enabled=True)
...
配置Grafana
在測(cè)試數(shù)據(jù)被上報(bào)到InfluxDB之后,可以通過(guò)Grafana進(jìn)行數(shù)據(jù)展示和分析。需要先在Grafana中配置InfluxDB數(shù)據(jù)源,然后創(chuàng)建相應(yīng)的圖表和儀表盤。
在創(chuàng)建圖表和儀表盤時(shí),可以選擇InfluxDB作為數(shù)據(jù)源,并使用InfluxQL查詢語(yǔ)言進(jìn)行數(shù)據(jù)查詢和過(guò)濾。可以根據(jù)需要選擇不同的圖表類型和顯示方式,以展示測(cè)試結(jié)果數(shù)據(jù)的趨勢(shì)和變化。
總結(jié)
本文介紹了如何將Locust測(cè)試數(shù)據(jù)上報(bào)到InfluxDB,并通過(guò)Grafana進(jìn)行展示和分析。通過(guò)將測(cè)試數(shù)據(jù)與監(jiān)控工具相結(jié)合,可以更好地了解系統(tǒng)的性能和穩(wěn)定性,及時(shí)發(fā)現(xiàn)問(wèn)題并進(jìn)行優(yōu)化,也可以方便后續(xù)進(jìn)行測(cè)試數(shù)據(jù)分析。希望本文能對(duì)大家有所幫助。