angle-uparrow-clockwisearrow-counterclockwisearrow-down-uparrow-leftatcalendarcard-listchatcheckenvelopefolderhouseinfo-circlepencilpeoplepersonperson-fillperson-plusphoneplusquestion-circlesearchtagtrashx

Uso de Locust para probar la carga de una aplicación FastAPI con users concurrentes

Escriba las pruebas para users en el código regular de Python , luego ejecute las pruebas con users concurrentes.

24 mayo 2021
En API, Testing
post main image
https://unsplash.com/@marsel_shots

Acabo de completar mi primera aplicación FastAPI . Esta app permite que los user tengan sus propios elementos, lo que significa que los modelos de datos user tienen todos un campo user_id. Nada especial, pero como FastAPI introduce algunas cosas nuevas para mí, como Dependency Injection, no estaba seguro de si mi aplicación funcionaría como yo quería.

Mi pregunta era: si pruebo la carga del API con muchos users concurrentes, ¿sigue funcionando?

Advertencia. Sólo ejecute una prueba de carga en su ordenador si está seguro de que puede soportarla.

Definición de la prueba

Necesitaba una herramienta que pudiera

  • Inicializar múltiples users
  • Ejecutar simultáneamente pruebas para cada user
  • Hacer esto lo más rápido posible

Para inicializar un user debo registrar el user con una dirección de correo electrónico única. Después obtengo el token de acceso para este user, y establezco la cabecera para las operaciones GET y POST .

Quiero la siguiente prueba para cada user:

  • Añadir un elemento con un nombre único (POST)
    Esto devuelve el elemento añadido, incluyendo su id.
  • Leer de nuevo el elemento por id (GET)
    Esto devuelve el elemento.
  • Comprobar si el nombre del elemento devuelto es igual al nombre único

Seleccionar Locust para la prueba

Para aplicaciones web triviales utilizo Apache Bench (ab), una herramienta muy fácil de usar para hacer pruebas de estrés. Pero no es realmente útil aquí porque necesitamos realizar la inicialización y escribir una prueba (simple).

Ya escribí muchas pruebas para mi aplicación FastAPI con Pytest, y no hubo problemas. He leído sobre pytest-xdist pero parece que tiene problemas y/o es demasiado lento(?).

Decidí utilizar Locust en este caso. Es fácil de usar y puedes codificar tus pruebas en Python. Es una maravilla.

El código

A continuación se muestra el código de la prueba. Algunas notas:

import logging
Registro todo también en un archivo.

def unique_str()
En la prueba necesito direcciones de correo electrónico únicas y nombres de ciudades. También podríamos un UUID4 aquí.

network_timeout, connection_timeout
No quiero que la prueba falle sino que el cliente espere.

wait_time
Quiero que la siguiente prueba comience inmediatamente cuando termine la actual.

tarea ficticia
Habilito ésta y deshabilito todas las demás tareas al desarrollar el método 'on_start'.

(POST/GET) catch_response=True
Junto con la construcción 'with' esto nos permite marcar una petición como fallida llamando a response.failure().

(GET) name=<URL>
Locust muestra todas las URLs pero aquí GET tiene una URL diferente cada vez debido al ID. Con el 'name=' sólo se muestra esta URL.

Comprobación del código de estado
Locust considera que una petición es correcta si el código de respuesta es <400, pero quiero comprobar explícitamente el valor.

RescheduleTask()
Cuando se produce un error no es necesario continuar una tarea por lo que lanzamos una excepción. Se podría hacer de forma más avanzada.

# test_cities_wrc.py

import logging
import random
import time

from locust import HttpUser, task, between
from locust.exception import RescheduleTask


test_email_host = 'my-local-test.test'

def unique_str():
    return '{:.10f}.{}'.format(time.time(), random.randint(1000, 9999))

def unique_email():
    return 'e.'  +  unique_str()  +  '@'  +  test_email_host

def unique_city_name():
    return 'c.'  +  unique_str()


class WebsiteTestUser(HttpUser):
     network_timeout  = 30.0
     connection_timeout  = 30.0
    #wait_time  = between(0.5, 3.0)

    def on_start(self):

        base_url = 'http://127.0.0.1:8020/api/v1'

        # set up urls
        register_url = base_url  +  '/auth/register'
        get_token_url = base_url  +  '/auth/token'
        # urls used in task
        self.cities_create_url = base_url  +  '/cities'
        self.cities_get_by_id_url = base_url  +  '/cities/'

        # get unique email
        email = unique_email()
        password = 'abcdefghi'

        # register
        response = self.client.post(
            register_url,
            json={'email': email, 'password': password},
        )
        if response.status_code != 201:
            error_msg = 'register: response.status_code = {}, expected 201'.format(response.status_code)
            logging.error(error_msg)

        # get_token
        # -  username instead of email
        # - x-www-form-urlencoded (instead of json)
        response = self.client.post(
            get_token_url,
            data={'username': email, 'password': password},
        )
        access_token = response.json()['access_token']
        logging.debug('get_token: for email = {}, access_token = {}'.format(email, access_token))

        # set headers with access token
        self.headers = {'Authorization': 'Bearer '  +  access_token}

    def on_stop(self):
        pass

    # enable this dummy task to develop 'on_start'
    #@task
    def dummy(self):
        pass

    @task
    def cities_write_read_check(self):
        # add city to api
        city_name = unique_city_name()
        logging.debug('cities_create: city_name = {}'.format(city_name))
        with self.client.post(
            self.cities_create_url,
            json={'name': city_name},
            headers=self.headers,
             catch_response=True,
        ) as response:

            if response.status_code != 201:
                error_msg = 'cities_create: response.status_code = {}, expected 201, city_name = {}'.format(response.status_code, city_name)
                logging.error(error_msg)
                response.failure(error_msg)
                raise  RescheduleTask()

            response_dict = response.json()
            if 'data' not in response_dict:
                error_msg = 'cities_create: data not in response_dict, city_name = {}'.format(city_name)
                logging.error(error_msg)
                response.failure(error_msg)
                raise  RescheduleTask()

            city_id = response_dict['data']['id']
            logging.debug('cities_create: for city_name = {}, city_id = {}'.format(city_name, city_id))

        # get city from api and check
        with self.client.get(
            self.cities_get_by_id_url  +  city_id,
            headers=self.headers,
             name=self.cities_get_by_id_url  +  'uuid', 
             catch_response=True,
        ) as response:

            if response.status_code != 200:
                error_msg = 'cities_get_by_id: response.status_code = {}, expected 200, city_name = {}'.format(response.status_code, city_name)
                logging.error(error_msg)
                response.failure(error_msg)
                raise  RescheduleTask()

            if 'data' not in response_dict:
                error_msg = 'cities_get_by_id: data not in response_dict, city_name = {}'.format(city_name)
                logging.error(error_msg)
                response.failure(error_msg)
                raise  RescheduleTask()

            city_name_returned = response_dict['data']['name']
            logging.debug('cities_get_by_id: for city_id = {}, city_name_returned = {}'.format(city_id, city_name_returned))
            
            if city_name_returned != city_name:
                error_msg = 'cities_get_by_id: city_name_returned = {} not equal city_name = {}'.format(city_name_returned, city_name)
                logging.error(error_msg)
                response.failure(error_msg)
                raise  RescheduleTask()

Ejecutando el test

ADVERTENCIA: ¡Cuando ejecutas una prueba como ésta estás estresando tu CPU!

Para ejecutar la prueba, y el registro en un archivo 'lc.log', abrir un terminal y entrar en el comando:

locust  -L  DEBUG  --logfile  lc.log  -f ./test_cities_wrc.py

Y luego en su navegador escriba:

http://localhost:8089/

Luego, por ejemplo, introduzca:

  • Número de user totales a simular: 20
  • Tasa de desove (users desovados/segundo): 20
  • Anfitrión: http:127.0.0.1

y pulsa el botón 'Start swarming'. Inmediatamente deberías oír cómo los ventiladores de tu ordenador empiezan a funcionar a máxima velocidad...

¿Cuál fue el resultado?

Efectivamente, encontré un problema en mi código. Este problema no apareció con Pytest. Parecía que en algún lugar de mi AccessControlManager Class utilizado con Dependency Injection, una variable podría ser sobrescrita por otra solicitud. Como consecuencia, se devolvía un user_id erróneo y la prueba fallaba, no siempre, sino cuando dos o más user accedían al API en el mismo momento.

Resumen

No sabía si mi primera aplicación FastAPI funcionaría bajo carga con muchos users concurrentes. Locust me permitió simular esta situación de forma muy sencilla, codificando la prueba en Python. Locust tiene muchas opciones y también puede ejecutarse sin cabeza, es decir, sin navegador.

Esta prueba no sólo me ayudó a detectar el problema sino también a aislarlo. Arreglé el problema pero esto también me indicó que debía leer más sobre Dependency Injection en FastAPI.

Enlaces / créditos

Locust
https://docs.locust.io/en/stable/

Deje un comentario

Comente de forma anónima o inicie sesión para comentar.

Comentarios (1)

Deje una respuesta.

Responda de forma anónima o inicie sesión para responder.

avatar

Hello ! This is a nice article. But I was wondering one thing :
Imagine you have setup your engine like this :
engine = create_engine(SQLALCHEMY_DATABASE_URI, pool_size=5, max_overflow=5)
And I have setup the middleware just like you said, when you try to start 11 requests at the same time, it just hangs. There's not really a queue or nothing. Is this because my routes are async ?
Thanks !