Using Locust to load test a FastAPI app with concurrent users
Write tests for users in regular Python code, then run the tests with concurrent users.
I just completed my first FastAPI app. This app allows users to have their own items, meaning that the user data models all have a user_id field. Nothing special, but because FastAPI introduces some things that were new to me, like Dependency Injection, I was unsure whether my app would work the way I wanted.
My question was: if I load test the API with many concurrent users, does it still work?
Warning. Only run a load test on your computer if you are sure it can handle it.
Defining the test
I needed a tool that can:
- Initialize multiple users
- Concurrently run tests for every user
- Do this as fast as possible
To initialize a user I must register the user with a unique email address. After that I get the access token for this user, and set the header for the GET and POST operations.
I want the following test for every user:
- Add an item with a unique name (POST). This returns the item added, including its id.
- Read back the item by id (GET). This returns the item.
- Check that the name in the returned item is equal to the unique name.
Selecting Locust for the test
For trivial web applications I use Apache Bench (ab), a very easy-to-use stress-testing tool. But it is not really useful here, because we need to perform initialization and write a (simple) test.
I had already written many tests for my FastAPI app with Pytest, and there were no problems. I read about pytest-xdist, but it appears to have problems and/or to be too slow.
I decided to use Locust in this case. It is easy to use and you can code your tests in Python. Wonderful.
The code
Below is the code for the test. Some notes:
import logging
I log everything also to a file.
def unique_str()
In the test I need unique email addresses and city names. We could also use a UUID4 here.
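As a sketch, both variants side by side: unique_str as used in the test below, plus the UUID4 alternative mentioned above.

```python
import random
import time
import uuid

def unique_str():
    # timestamp with 10 decimals plus a random 4-digit suffix
    return '{:.10f}.{}'.format(time.time(), random.randint(1000, 9999))

def unique_str_uuid():
    # the UUID4 alternative: random, but not sortable by creation time
    return str(uuid.uuid4())

print(unique_str())       # a timestamp-based unique string
print(unique_str_uuid())  # a random 36-character UUID string
```

A side effect of the timestamp-based variant is that names sort roughly by creation time, which can make log files easier to read.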
network_timeout, connection_timeout
I do not want the test to fail on a slow response; I want the client to wait.
wait_time
I want the next test to start immediately when the current one finishes.
dummy task
I enable this one and disable all other tasks when developing the 'on_start' method.
(POST/GET) catch_response=True
Together with the 'with' construct this enables us to mark a request as failed by calling response.failure().
(GET) name=<URL>
Locust shows all URLs, but here the GET URL is different every time because of the id. Using 'name=', Locust groups these requests and shows only this one URL.
Status_code checks
Locust considers a request successful if the response code is < 400, but I want to check the value explicitly.
RescheduleTask()
When an error occurs there is no need to continue the task, so we raise an exception. This could be done in a more advanced way.
# test_cities_wrc.py
import logging
import random
import time

from locust import HttpUser, task, between
from locust.exception import RescheduleTask

test_email_host = 'my-local-test.test'

def unique_str():
    return '{:.10f}.{}'.format(time.time(), random.randint(1000, 9999))

def unique_email():
    return 'e.' + unique_str() + '@' + test_email_host

def unique_city_name():
    return 'c.' + unique_str()

class WebsiteTestUser(HttpUser):
    network_timeout = 30.0
    connection_timeout = 30.0
    #wait_time = between(0.5, 3.0)

    def on_start(self):
        base_url = 'http://127.0.0.1:8020/api/v1'
        # set up urls
        register_url = base_url + '/auth/register'
        get_token_url = base_url + '/auth/token'
        # urls used in task
        self.cities_create_url = base_url + '/cities'
        self.cities_get_by_id_url = base_url + '/cities/'
        # get unique email
        email = unique_email()
        password = 'abcdefghi'
        # register
        response = self.client.post(
            register_url,
            json={'email': email, 'password': password},
        )
        if response.status_code != 201:
            error_msg = 'register: response.status_code = {}, expected 201'.format(response.status_code)
            logging.error(error_msg)
        # get_token
        # - username instead of email
        # - x-www-form-urlencoded (instead of json)
        response = self.client.post(
            get_token_url,
            data={'username': email, 'password': password},
        )
        access_token = response.json()['access_token']
        logging.debug('get_token: for email = {}, access_token = {}'.format(email, access_token))
        # set headers with access token
        self.headers = {'Authorization': 'Bearer ' + access_token}

    def on_stop(self):
        pass

    # enable this dummy task to develop 'on_start'
    #@task
    def dummy(self):
        pass

    @task
    def cities_write_read_check(self):
        # add city to api
        city_name = unique_city_name()
        logging.debug('cities_create: city_name = {}'.format(city_name))
        with self.client.post(
            self.cities_create_url,
            json={'name': city_name},
            headers=self.headers,
            catch_response=True,
        ) as response:
            if response.status_code != 201:
                error_msg = 'cities_create: response.status_code = {}, expected 201, city_name = {}'.format(response.status_code, city_name)
                logging.error(error_msg)
                response.failure(error_msg)
                raise RescheduleTask()
            response_dict = response.json()
            if 'data' not in response_dict:
                error_msg = 'cities_create: data not in response_dict, city_name = {}'.format(city_name)
                logging.error(error_msg)
                response.failure(error_msg)
                raise RescheduleTask()
            city_id = response_dict['data']['id']
            logging.debug('cities_create: for city_name = {}, city_id = {}'.format(city_name, city_id))

        # get city from api and check
        with self.client.get(
            self.cities_get_by_id_url + city_id,
            headers=self.headers,
            name=self.cities_get_by_id_url + 'uuid',
            catch_response=True,
        ) as response:
            if response.status_code != 200:
                error_msg = 'cities_get_by_id: response.status_code = {}, expected 200, city_name = {}'.format(response.status_code, city_name)
                logging.error(error_msg)
                response.failure(error_msg)
                raise RescheduleTask()
            # parse the GET response (the previous response_dict came from the POST)
            response_dict = response.json()
            if 'data' not in response_dict:
                error_msg = 'cities_get_by_id: data not in response_dict, city_name = {}'.format(city_name)
                logging.error(error_msg)
                response.failure(error_msg)
                raise RescheduleTask()
            city_name_returned = response_dict['data']['name']
            logging.debug('cities_get_by_id: for city_id = {}, city_name_returned = {}'.format(city_id, city_name_returned))
            if city_name_returned != city_name:
                error_msg = 'cities_get_by_id: city_name_returned = {} not equal city_name = {}'.format(city_name_returned, city_name)
                logging.error(error_msg)
                response.failure(error_msg)
                raise RescheduleTask()
Running the test
WARNING: When you run a test like this you are stressing your CPU!
To run the test, and log to a file 'lc.log', open a terminal and enter the command:
locust -L DEBUG --logfile lc.log -f ./test_cities_wrc.py
And then in your browser type:
http://localhost:8089/
Then for example enter:
- Number of total users to simulate: 20
- Spawn rate (users spawned/second): 20
- Host: http://127.0.0.1
and hit the 'Start swarming' button. Immediately you should hear the fans of your computer starting to run at maximum speed ...
What was the result?
I did indeed find a problem with my code, one that had not shown up with Pytest. It appeared that somewhere in my AccessControlManager class, used with Dependency Injection, a variable could be overwritten by another request. As a consequence a wrong user_id was returned and the test failed, not all the time, but when two or more users accessed the API at the same moment.
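A minimal sketch of this kind of bug, with assumed names (this is not the actual app code): a dependency object shared between requests stores per-request state in an instance attribute, so one request can overwrite the value another request is still using. The threads and events below just force, deterministically, the interleaving that Locust's concurrent users triggered by chance.

```python
import threading

class AccessControlManager:
    # hypothetical sketch: one shared instance serves all requests
    def __init__(self):
        self.user_id = None  # per-request state on a shared object: the bug

    def authenticate(self, user_id, after_write=None, before_read=None):
        self.user_id = user_id        # this request stores "its" user_id
        if after_write:
            after_write.set()         # signal: the write has happened
        if before_read:
            before_read.wait()        # simulate work before the value is used
        return self.user_id           # may now hold another request's user_id

shared = AccessControlManager()       # single instance reused for every request
wrote = threading.Event()
resume = threading.Event()
result = {}

def request_a():
    result['a'] = shared.authenticate('user-a', after_write=wrote, before_read=resume)

t = threading.Thread(target=request_a)
t.start()
wrote.wait()                          # request A has stored 'user-a' and is "working"
result['b'] = shared.authenticate('user-b')  # request B overwrites the shared attribute
resume.set()
t.join()
print(result['a'])  # prints 'user-b': request A got the wrong user_id
```

The fix is to keep per-request state local (return it directly, or create the dependency object per request) instead of storing it on a shared instance.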
Summary
I was uncertain whether my first FastAPI app would work under load with many concurrent users. Locust let me simulate this situation in a very easy way, by coding the test in Python. Locust has many options and can also run headless, that is, without a browser.
This test not only helped me to detect the problem but also to isolate it. I fixed the problem, but it also told me to read more about Dependency Injection in FastAPI.
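For the headless mode mentioned above, a sketch of the command line, assuming the same file name and the limits used in the browser example (the 60-second run time is an arbitrary choice):

```shell
# run without the web UI: 20 users, spawn rate 20/second, stop after 60 seconds
locust -f ./test_cities_wrc.py --headless -u 20 -r 20 -t 60s -L DEBUG --logfile lc.log
```

This is convenient for repeating the same run, for example from a script or CI job.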
Links / credits
Locust
https://docs.locust.io/en/stable/
Comments (1)
Hello! This is a nice article. But I was wondering one thing:
Imagine you have set up your engine like this:
engine = create_engine(SQLALCHEMY_DATABASE_URI, pool_size=5, max_overflow=5)
I have set up the middleware just like you said, and when I try to start 11 requests at the same time, it just hangs. There's not really a queue or anything. Is this because my routes are async?
Thanks!