
Using Locust to load test a FastAPI app with concurrent users

Write tests for users in regular Python code, then run the tests with concurrent users.

24 May 2021 Updated 24 May 2021
In API, Testing

I just completed my first FastAPI app. In this app every user has their own items, so the user data models all have a user_id field. Nothing special, but because FastAPI introduced some things that were new to me, like Dependency Injection, I was unsure whether my app would work the way I wanted.

My question was: if I load test the API with many concurrent users, does it still work?

Warning. Only run a load test on your computer if you are sure it can handle it.

Defining the test

I needed a tool that can:

  • Initialize multiple users
  • Concurrently run tests for every user
  • Do this as fast as possible

To initialize a user I must register the user with a unique email address. After that I get the access token for this user and put it in the Authorization header used for the GET and POST operations.

I want the following test for every user:

  • Add an item with a unique name (POST)
    This returns the item added, including its id.
  • Read back the item by id (GET)
    This returns the item.
  • Check if the name in the returned item is equal to the unique name
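The three steps above can be sketched in plain Python, independent of any load testing tool. This is only an illustration: InMemoryCityApi is a hypothetical stand-in for the real API, and the method names create_city and get_city are assumptions, not part of my app.

```python
import itertools


class InMemoryCityApi:
    """Hypothetical stand-in for the real API; keeps items in a dict."""

    def __init__(self):
        self._cities = {}
        self._ids = itertools.count(1)

    def create_city(self, name):
        # Plays the role of the POST: returns the item added, including its id.
        city_id = str(next(self._ids))
        self._cities[city_id] = {'id': city_id, 'name': name}
        return self._cities[city_id]

    def get_city(self, city_id):
        # Plays the role of the GET: returns the item by id.
        return self._cities[city_id]


def write_read_check(api, city_name):
    """Add an item, read it back by id, and compare the names."""
    created = api.create_city(city_name)
    fetched = api.get_city(created['id'])
    return fetched['name'] == city_name


if __name__ == '__main__':
    print(write_read_check(InMemoryCityApi(), 'c.example'))
```

With a single user this check always passes. The interesting question is whether it still passes when many users run it at the same time against the real API.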

Selecting Locust for the test

For trivial web applications I use Apache Bench (ab), a very easy-to-use stress testing tool. It is not really useful here, because we need to perform initialization and write a (simple) test.

I already wrote many tests for my FastAPI app with Pytest, and there were no problems. I read about pytest-xdist, but I got the impression that it has problems and/or is too slow for this.

I decided to use Locust in this case. It is easy to use and you can code your tests in Python. Wonderful.

The code

Below is the code for the test. Some notes:

import logging
I also log everything to a file.

def unique_str()
In the test I need unique email addresses and city names. We could also use a UUID4 here.

network_timeout, connection_timeout
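I do not want the test to fail on a slow response; the client should wait instead. Note that Locust documents these two attributes for FastHttpUser. With the requests-based HttpUser used here they may have no effect, in which case a timeout= argument on each request is the alternative.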

wait_time
I want the next test to start immediately when the current one finishes, so wait_time is commented out.

dummy task
I enable this one and disable all other tasks when developing the 'on_start' method.

(POST/GET) catch_response=True
Together with the 'with' construct this enables us to mark a request as failed by calling response.failure().

(GET) name=<URL>
Locust lists every URL separately, but here the GET has a different URL every time because of the id. With 'name=' all these requests are shown under this one name.

Status_code checks
Locust considers a request successful if the response code is <400, but I want to explicitly check the value.

RescheduleTask()
When an error occurs there is no need to continue the task, so we raise an exception. This could be made more sophisticated.
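The explicit status code check from the notes above appears several times in the test, so it could be factored into a small helper. The following is a sketch of that idea. The name status_error is hypothetical, and SimpleNamespace objects stand in for real responses so the example runs without Locust.

```python
from types import SimpleNamespace


def status_error(response, expected_status, context):
    """Return an error message if the status code is unexpected, else None."""
    if response.status_code == expected_status:
        return None
    return '{}: response.status_code = {}, expected {}'.format(
        context, response.status_code, expected_status)


if __name__ == '__main__':
    # Stand-in responses for illustration only; in a Locust task the
    # response comes from self.client.post(...) or self.client.get(...).
    created = SimpleNamespace(status_code=201)
    plain_ok = SimpleNamespace(status_code=200)

    print(status_error(created, 201, 'cities_create'))
    print(status_error(plain_ok, 201, 'cities_create'))
```

A 200 response is below 400, so Locust alone would count it as a success, while the helper reports it as unexpected. In a task the returned message, if any, would then be logged, passed to response.failure() and followed by raise RescheduleTask(), exactly as in the code of this post.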

# test_cities_wrc.py

import logging
import random
import time

from locust import HttpUser, task, between
from locust.exception import RescheduleTask


test_email_host = 'my-local-test.test'

def unique_str():
    return '{:.10f}.{}'.format(time.time(), random.randint(1000, 9999))

def unique_email():
    return 'e.' + unique_str() + '@' + test_email_host

def unique_city_name():
    return 'c.' + unique_str()


class WebsiteTestUser(HttpUser):
    network_timeout = 30.0
    connection_timeout = 30.0
    #wait_time = between(0.5, 3.0)

    def on_start(self):

        base_url = 'http://127.0.0.1:8020/api/v1'

        # set up urls
        register_url = base_url + '/auth/register'
        get_token_url = base_url + '/auth/token'
        # urls used in task
        self.cities_create_url = base_url + '/cities'
        self.cities_get_by_id_url = base_url + '/cities/'

        # get unique email
        email = unique_email()
        password = 'abcdefghi'

        # register
        response = self.client.post(
            register_url,
            json={'email': email, 'password': password},
        )
        if response.status_code != 201:
            error_msg = 'register: response.status_code = {}, expected 201'.format(response.status_code)
            logging.error(error_msg)

        # get_token
        # - username instead of email
        # - x-www-form-urlencoded (instead of json)
        response = self.client.post(
            get_token_url,
            data={'username': email, 'password': password},
        )
        access_token = response.json()['access_token']
        logging.debug('get_token: for email = {}, access_token = {}'.format(email, access_token))

        # set headers with access token
        self.headers = {'Authorization': 'Bearer ' + access_token}

    def on_stop(self):
        pass

    # enable this dummy task to develop 'on_start'
    #@task
    def dummy(self):
        pass

    @task
    def cities_write_read_check(self):
        # add city to api
        city_name = unique_city_name()
        logging.debug('cities_create: city_name = {}'.format(city_name))
        with self.client.post(
            self.cities_create_url,
            json={'name': city_name},
            headers=self.headers,
            catch_response=True,
        ) as response:

            if response.status_code != 201:
                error_msg = 'cities_create: response.status_code = {}, expected 201, city_name = {}'.format(response.status_code, city_name)
                logging.error(error_msg)
                response.failure(error_msg)
                raise RescheduleTask()

            response_dict = response.json()
            if 'data' not in response_dict:
                error_msg = 'cities_create: data not in response_dict, city_name = {}'.format(city_name)
                logging.error(error_msg)
                response.failure(error_msg)
                raise RescheduleTask()

            city_id = response_dict['data']['id']
            logging.debug('cities_create: for city_name = {}, city_id = {}'.format(city_name, city_id))

        # get city from api and check
        with self.client.get(
            self.cities_get_by_id_url + str(city_id),
            headers=self.headers,
            name=self.cities_get_by_id_url + 'uuid',
            catch_response=True,
        ) as response:

            if response.status_code != 200:
                error_msg = 'cities_get_by_id: response.status_code = {}, expected 200, city_name = {}'.format(response.status_code, city_name)
                logging.error(error_msg)
                response.failure(error_msg)
                raise RescheduleTask()

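            # parse the GET response; without this the checks below
            # would still look at the POST response
            response_dict = response.json()
            if 'data' not in response_dict: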
                error_msg = 'cities_get_by_id: data not in response_dict, city_name = {}'.format(city_name)
                logging.error(error_msg)
                response.failure(error_msg)
                raise RescheduleTask()

            city_name_returned = response_dict['data']['name']
            logging.debug('cities_get_by_id: for city_id = {}, city_name_returned = {}'.format(city_id, city_name_returned))
            
            if city_name_returned != city_name:
                error_msg = 'cities_get_by_id: city_name_returned = {} not equal city_name = {}'.format(city_name_returned, city_name)
                logging.error(error_msg)
                response.failure(error_msg)
                raise RescheduleTask()
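As mentioned in the notes, the unique strings could also be based on UUID4 instead of the time plus a random number. A minimal sketch of that variant, keeping the function names from the test above:

```python
import uuid

test_email_host = 'my-local-test.test'


def unique_str():
    # 32 hexadecimal characters taken from a random UUID4
    return uuid.uuid4().hex


def unique_email():
    return 'e.' + unique_str() + '@' + test_email_host


def unique_city_name():
    return 'c.' + unique_str()


if __name__ == '__main__':
    print(unique_email())
    print(unique_city_name())
```

The time-based version has the advantage that the values in the log file sort by creation time. The UUID4 version does not depend on the clock, which may matter when many users are started in the same instant.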

Running the test

WARNING: When you run a test like this you are stressing your CPU!

To run the test, and log to a file 'lc.log', open a terminal and enter the command:

locust -L DEBUG --logfile lc.log -f ./test_cities_wrc.py

And then in your browser type:

http://localhost:8089/

Then for example enter:

  • Number of total users to simulate: 20
  • Spawn rate (users spawned/second): 20
  • Host: http://127.0.0.1

and hit the 'Start swarming' button. Immediately you should hear the fans of your computer starting to run at maximum speed ...

What was the result?

I did indeed find a problem in my code, one that did not show up with Pytest. Somewhere in my AccessControlManager class, which is used with Dependency Injection, a variable could be overwritten by another request. As a consequence a wrong user_id was returned and the test failed; not every time, but when two or more users accessed the API at the same moment.
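This kind of problem can be reproduced in a few lines without FastAPI. The sketch below is not the actual AccessControlManager: the class names SharedAccessControl and PerRequestAccessControl are hypothetical, and asyncio.sleep(0) stands in for any await (for example a database call) during which another request can run.

```python
import asyncio


class SharedAccessControl:
    """One instance shared by all requests; request data is stored on it."""

    def __init__(self):
        self.user_id = None

    async def current_user(self, user_id):
        self.user_id = user_id   # request-specific state on a shared object
        await asyncio.sleep(0)   # another request may run here
        return self.user_id      # may now hold the other request's value


class PerRequestAccessControl:
    """A new instance per request; nothing is shared between requests."""

    def __init__(self, user_id):
        self.user_id = user_id

    async def current_user(self):
        await asyncio.sleep(0)
        return self.user_id


async def run_shared():
    shared = SharedAccessControl()
    return await asyncio.gather(
        shared.current_user(1), shared.current_user(2))


async def run_per_request():
    return await asyncio.gather(
        PerRequestAccessControl(1).current_user(),
        PerRequestAccessControl(2).current_user())


if __name__ == '__main__':
    print(asyncio.run(run_shared()))       # [2, 2]: user 1 gets user 2's id
    print(asyncio.run(run_per_request()))  # [1, 2]
```

With one request at a time, as in a typical Pytest run, the shared version returns the right user_id every time. That is why a bug like this only shows up under concurrent load.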

Summary

I was uncertain whether my first FastAPI app would work under load with many concurrent users. Locust let me simulate this situation in a very easy way, by coding the test in Python. Locust has many options and can also run headless, that is, without a browser.

This test not only helped me to detect the problem but also to isolate it. I fixed the problem, and it also told me that I should read more about Dependency Injection in FastAPI.

Links / credits

Locust
https://docs.locust.io/en/stable/


Comments (1)

Hello! This is a nice article, but I was wondering about one thing.
Imagine you have set up your engine like this:
engine = create_engine(SQLALCHEMY_DATABASE_URI, pool_size=5, max_overflow=5)
I have also set up the middleware just like you said. When you try to start 11 requests at the same time, it just hangs. There is not really a queue or anything. Is this because my routes are async?
Thanks!