angle-uparrow-clockwisearrow-counterclockwisearrow-down-uparrow-leftatcalendarcard-listchatcheckenvelopefolderhouseinfo-circlepencilpeoplepersonperson-fillperson-plusphoneplusquestion-circlesearchtagtrashx

Evitar el envío de mensajes duplicados a un sistema remoto

La consideración de todos los estados y condiciones posibles no es deseable, sino un requisito.

21 febrero 2023 Actualizado 21 febrero 2023
post main image
https://www.pexels.com/nl-nl/@monoar-rahman-22660

Muchas veces las aplicaciones deben enviar mensajes a un sistema remoto. En un mundo perfecto sólo tenemos que ocuparnos del Flujo Feliz: no ocurren cosas malas como excepciones, errores.

Por desgracia, el mundo no es perfecto. Además de los errores de programación, las conexiones pueden fallar, los sistemas de bases de datos pueden fallar, los sistemas remotos pueden fallar. Una simple pregunta - ¿puedes escribir algo de código para enviar mensajes a un sistema remoto - fácilmente puede convertirse en un proyecto muy complejo.

En este post muestro una forma de evitar el envío de mensajes duplicados en caso de que falle la actualización de un registro de mensajes de la base de datos.

Compromisos

Si estás desarrollando un sistema que realiza transacciones financieras, los requisitos son totalmente diferentes a los de un sistema que envía entradas por correo electrónico para un concierto. El primero no debe fallar nunca y debe poder reiniciarse y continuar exactamente desde donde lo dejó. El sistema que envía las entradas puede tener un problema unas cuantas veces al mes, con lo que los consumidores pueden no recibirlas o recibirlas dos veces.

Lo que quiero decir con esto es que siempre hay compensaciones. ¿Merece la pena dedicar mucho tiempo y desarrollar una aplicación a prueba de balas o aceptar fallos esporádicos? Si se produce un fallo esporádico, hay que conocer las consecuencias. Un fallo que impide enviar un correo electrónico de vez en cuando no es tan malo como un fallo que sigue enviando los mismos correos electrónicos cientos o incluso miles de veces.

Nuestro servicio de entrega

Nuestro servicio deliver se conecta a una base de datos con mensajes que deben ser enviados a un sistema remoto.

  +----------+      +----------+      +----------+
  | database |      | deliver- |      | remote   |
  | message  |<-----| service  |----->| system   |
  | records  |      |          |      |          |
  +----------+      +----------+      +----------+

El servicio deliver, se ejecuta en cada tick, por ejemplo, se ejecuta cada segundo. El registro de mensajes tiene una bandera 'is_delivered'. Esta bandera es inicialmente 'False' y se actualiza a 'True' después de que un mensaje ha sido enviado. Este indicador también se utiliza cuando se seleccionan registros de mensajes de la base de datos.

Pasos:

  1. Seleccionar mensajes de la base de datos que aún no han sido entregados.
  2. Entregar uno a uno un mensaje al sistema remoto.
  3. (post-update) Una vez que un mensaje ha sido entregado, actualizamos la bandera 'is_delivered'.

Llamo a la acción del paso 3. post-update porque es una actualización después de enviar el mensaje.

Aquí está el código de Happy Flow sin ninguna comprobación de errores y manejo de errores:

# happy flow code

class RemoteSystem:
    def receive_data(self, data):
        self.data = data
        print('remote system: received data = {}'.format(data))

# database message record
class Message:
    def __init__(self, data):
        self.data = data
        self.is_delivered = False
    def __str__(self):
        return '<Message: data = {}, is_delivered = {}>'.format(self.data, self.is_delivered)

class DeliverService:
    def __init__(self):
        # database
        self.messages = [Message('data1'), Message('data2')]
        # remote system
        self.rem_sys = RemoteSystem()

    def select_messages_to_deliver(self):
        return [message for message in self.messages if not message.is_delivered]

    def deliver_message(self, message):
        # send to remote system
        self.rem_sys.receive_data(message.data)

    def deliver_messages(self):
        delivered_count = 0
        for message in self.select_messages_to_deliver():
            # deliver message
            self.deliver_message(message)
            delivered_count += 1
            # update message record
            message.is_delivered = True
        return delivered_count

def main():
    print('\nHAPPY FLOW\n{}'.format('-'*40))
    ds = DeliverService()
    for tick in range(2):
        # debug
        print('on tick-{} messages:'.format(tick))
        for message in ds.messages:
            print('- {}'.format(message))
        # deliver_messages
        delivered_count = ds.deliver_messages()
        # debug
        print('on tick-{} messages delivered to remote system: {}'.format(tick, delivered_count))

if __name__ == '__main__':
    main()

Y aquí está el resultado después de la ejecución:

HAPPY FLOW
----------------------------------------
on tick-0 messages:
- <Message: data = data1, is_delivered = False>
- <Message: data = data2, is_delivered = False>
remote system: received data = data1
remote system: received data = data2
on tick-0 messages delivered to remote system: 2
on tick-1 messages:
- <Message: data = data1, is_delivered = True>
- <Message: data = data2, is_delivered = True>
on tick-1 messages delivered to remote system: 0

Los mensajes fueron entregados en el primer tick, en el segundo tick, ningún mensaje fue entregado.

Hecho, ¡listo! Eso fue fácil ... en el mundo perfecto ...

Haz una lista de todas las cosas que pueden ir mal, mira las implicaciones y cómo resolverlas

Como nuestro mundo no es perfecto, debemos tener en cuenta todos los errores posibles que pueden ocurrir. De cada uno de estos errores, debemos analizar las consecuencias que tiene en el funcionamiento de nuestro servicio.

Consideremos los errores temporales, o transitorios, y los errores permanentes. Un error transitorio suele ser puntual o de corta duración. Si un sistema remoto no está disponible durante un breve periodo de tiempo, recibimos un timeout. Un ejemplo de error permanente es la caída de un servidor de base de datos.

Para nuestro servicio de entrega, he aquí una lista (limitada) de cosas que pueden fallar, sus implicaciones y cómo solucionarlas:

  • Error de selección de base de datos: no se pueden seleccionar mensajes de la base de datos.
    Nuestros mensajes no serán entregados. Eso no es bueno, pero ya está. Puede que nuestro servidor de base de datos esté temporalmente caído, podemos investigarlo más a fondo.
    Como solucionarlo: Seguimos intentando hasta que la base de datos responda de nuevo.
  • Sistema remoto (temporal) no disponible.
    De nuevo nuestros mensajes no serán entregados. Esto no es bueno, pero ya está.
    Cómo solucionarlo: Ya tenemos la bandera 'is_delivered'. Sólo actualizamos esta bandera a 'True' si el sistema remoto acusa recibo de nuestro mensaje.
  • Error de actualización posterior de la base de datos: no se actualiza un registro de mensaje después (!) de la entrega.
    En este caso, el indicador "is_delivered" no se actualiza. Nuestro mensaje fue o no entregado al sistema remoto, pero esta condición no se marcó en el registro del mensaje. La consecuencia es que el mensaje será seleccionado y entregado en el tick, etc. El sistema remoto recibirá nuestro mensaje varias veces. ¡Terrible!
    Cómo solucionarlo: Ver apartado siguiente.

Impedir el envío de un mensaje varias veces en un error de postactualización

La forma más fácil es detener completamente (abortar) nuestro servicio de entrega. Entonces comprobamos qué está mal y reiniciamos después de haber solucionado el error post-update. Debemos establecer manualmente el indicador 'is_delivered' del último mensaje.

Para tener más control, añadimos un indicador 'delivery_in_progress' al registro del mensaje. Ponemos este indicador a "True" antes de enviar el mensaje al sistema remoto. Restablecemos este indicador a "False" cuando actualizamos el registro del mensaje para establecer el indicador "is_delivered". Al seleccionar mensajes para su entrega, ahora también incluimos el indicador "delivery_in_progress".

Los nuevos pasos:

  1. Seleccionar los mensajes de la base de datos que no han sido entregados y cuya entrega no está en curso.
  2. (pre-update) Poner el indicador 'delivery_in_progress' en 'True'.
  3. Entregue uno a uno los mensajes al sistema remoto.
  4. (post-update) Una vez que se ha entregado un mensaje, actualizamos el indicador "is_delivered" a "True" y el indicador "delivery_in_progress" a "False".

Ahora, si la acción posterior a la actualización falla, el mensaje no se envía, pero la operación no se bloquea. Los mensajes nuevos se siguen enviando.

Seleccionar mensajes, antes:

def select_messages(self):
    return [message for message in self.messages if not message.is_delivered]

Se convierte:

def select_messages(self):
    return [message for message in self.messages if not message.is_delivered and not message.delivery_in_progress]

Este método no es gratuito ya que requiere un comando de actualización extra.

Por si quieres probarlo, aquí tienes el código. Para probar el código he añadido los siguientes 'interruptores para el servicio de entrega:

  • 'inject_post_update_error': para simular un error post-update
  • use_delivery_in_progress': para usar la bandera 'delivery_in_progress' en el registro del mensaje.

Hacemos dos 'ticks' (ejecuciones) para cada una de las siguientes condiciones:

  • Flujo feliz
  • Fallo posterior a la actualización => incorrecto
  • Fallo post-actualización + 'delivery_in_progress'-flag => correcto
# prevent duplicate messages caused by post-update errors
class SendError(Exception):
    pass

class PostUpdateError(Exception):
    pass

class RemoteSystem:
    def send_data(self, data):
        print('remote system: received data = {}'.format(data))

# database message record
class Message:
    def __init__(self, data):
        self.data = data
        self.is_delivered = False
        self.delivery_in_progress = False
    def __str__(self):
        return '<Message: data = {}, is_delivered = {}, delivery_in_progress = {}>'.format(self.data, self.is_delivered, self.delivery_in_progress)

class DeliverService:
    def __init__(self, inject_post_update_error, use_delivery_in_progress):
        # switch to inject update_failure
        self.inject_post_update_error = inject_post_update_error
        # switch to prevent sending duplicates
        self.use_delivery_in_progress = use_delivery_in_progress
        # database
        self.messages = [Message('data1'), Message('data2')]
        # remote system
        self.rem_sys = RemoteSystem()

    def select_messages(self):
        if self.use_delivery_in_progress:
            return [message for message in self.messages if not message.is_delivered and not message.delivery_in_progress]
        else:
            return [message for message in self.messages if not message.is_delivered]

    def deliver_message(self, message):
        try:
            self.rem_sys.send_data(message.data)
            return True
        except Exception as e:
            raise SendError()

    def deliver_messages(self):
        tick_delivered_count = 0
        tick_send_error_count = 0
        tick_post_update_error_count = 0
        for message in self.select_messages():
            # pre-update message record
            if self.use_delivery_in_progress:
                message.delivery_in_progress = True
            # deliver message
            try:
                self.deliver_message(message)
                tick_delivered_count += 1
            except SendError:
                tick_send_error_count += 1
            # post-update message record
            try:
                if self.inject_post_update_error:
                    raise PostUpdateError()
                message.is_delivered = True
                if self.use_delivery_in_progress:
                    message.delivery_in_progress = False
            except PostUpdateError:
                tick_post_update_error_count += 1
        return (tick_delivered_count, tick_send_error_count, tick_post_update_error_count)

def main():

    tests = [
        {
            'title': 'HAPPY FLOW => CORRECT',
            'inject_post_update_error': False,
            'use_delivery_in_progress': False,
        },
        {
            'title': 'POST-UPDATE FAILURE => WRONG',
            'inject_post_update_error': True,
            'use_delivery_in_progress': False,
        },
        {
            'title': 'POST-UPDATE FAILURE + DELIVERY_IN_PROGRESS => CORRECT',
            'inject_post_update_error': True,
            'use_delivery_in_progress': True,
        },
    ]

    for test in tests:    
        print('\n{}\n{}'.format(test['title'], '-'*40))
        ds = DeliverService(
            inject_post_update_error=test['inject_post_update_error'],
            use_delivery_in_progress=test['use_delivery_in_progress'],
        )
        for tick in range(2):
            # debug
            print('on tick-{}, messages:'.format(tick))
            for message in ds.messages:
                print('- {}'.format(message))
            # deliver messages
            counts = ds.deliver_messages()
            # debug
            print('on tick-{} messages delivered to remote system: {}'.format(tick, counts[0]))
            if test['inject_post_update_error']:
                print('on tick-{} post-update error count: {}'.format(tick, counts[2]))

if __name__ == '__main__':
    main()

Y el resultado después de la ejecución:

HAPPY FLOW => CORRECT
----------------------------------------
on tick-0, messages:
- <Message: data = data1, is_delivered = False, delivery_in_progress = False>
- <Message: data = data2, is_delivered = False, delivery_in_progress = False>
remote system: received data = data1
remote system: received data = data2
on tick-0 messages delivered to remote system: 2
on tick-1, messages:
- <Message: data = data1, is_delivered = True, delivery_in_progress = False>
- <Message: data = data2, is_delivered = True, delivery_in_progress = False>
on tick-1 messages delivered to remote system: 0

POST-UPDATE FAILURE => WRONG
----------------------------------------
on tick-0, messages:
- <Message: data = data1, is_delivered = False, delivery_in_progress = False>
- <Message: data = data2, is_delivered = False, delivery_in_progress = False>
remote system: received data = data1
remote system: received data = data2
on tick-0 messages delivered to remote system: 2
on tick-0 post-update error count: 2
on tick-1, messages:
- <Message: data = data1, is_delivered = False, delivery_in_progress = False>
- <Message: data = data2, is_delivered = False, delivery_in_progress = False>
remote system: received data = data1
remote system: received data = data2
on tick-1 messages delivered to remote system: 2
on tick-1 post-update error count: 2

POST-UPDATE FAILURE + DELIVERY_IN_PROGRESS => CORRECT
----------------------------------------
on tick-0, messages:
- <Message: data = data1, is_delivered = False, delivery_in_progress = False>
- <Message: data = data2, is_delivered = False, delivery_in_progress = False>
remote system: received data = data1
remote system: received data = data2
on tick-0 messages delivered to remote system: 2
on tick-0 post-update error count: 2
on tick-1, messages:
- <Message: data = data1, is_delivered = False, delivery_in_progress = True>
- <Message: data = data2, is_delivered = False, delivery_in_progress = True>
on tick-1 messages delivered to remote system: 0
on tick-1 post-update error count: 0

¿Qué hemos solucionado exactamente?

Lo que hemos solucionado es enviar el mismo mensaje varias veces en caso de fallo de post-update. Nuestro servicio sigue intentándolo, eso es bueno. Si una acción posterior a la actualización falla una o dos veces, o durante un breve periodo de tiempo, los mensajes no se enviarán varias veces, y nuestro servicio seguirá funcionando, no se abortará. Es un gran alivio.

Lo malo es que seguimos sin saber si realmente se han enviado mensajes cuando se ha producido el fallo post-update. Una forma de evitarlo es almacenar la respuesta del sistema remoto, junto con el identificador de registro del mensaje, en otro almacén de datos (con dependencias mínimas).

Operaciones idempotentes

Wikipedia: La propiedad de ciertas operaciones en matemáticas y ciencias de la computación por la que se pueden aplicar varias veces sin cambiar el resultado más allá de la aplicación inicial.

Una buena forma es tener un sistema (o servicio) remoto que pueda comprobar si hay mensajes duplicados comprobando un campo específico del mensaje. Entonces simplemente añadimos un identificador único a cada mensaje que creamos. El sistema remoto descartará automáticamente los mensajes duplicados que reciba.

Resumen

Tener en cuenta todos los errores posibles es una de las tareas más importantes a la hora de diseñar una aplicación de software. Entra completamente en conflicto con la presión que sufren los diseñadores y programadores para entregar una aplicación que funcione lo antes posible. Muchos programadores saben que su código tiene algunas vulnerabilidades, pero las limitaciones de tiempo a menudo hacen imposible corregirlas todas antes de que una aplicación entre en producción.

En el código anterior, sólo se ha tenido en cuenta la consecuencia más grave: el envío repetido del mismo mensaje. Esto se solucionó añadiendo una bandera 'delivery_in_progress'. Muchos otros problemas también deben ser resueltos, tales como:

  • Cuántos reintentos antes de abandonar el envío de mensajes al sistema remoto.
  • Cómo controlar los fallos de selección, los fallos de entrega y los fallos de actualización, etc.
  • Cómo recuperarse de los fallos
  • ¿Se permite cambiar el orden de los mensajes al enviarlos?

Por último, la mayoría de los proyectos de software nunca se terminan, se liberan.

Enlaces / créditos

[rabbitmq-discuss] Exactly Once Delivery
https://groups.google.com/g/rabbitmq-discuss/c/eniPTe1aKvk

Handling Duplicate Messages (Idempotent Consumers)
https://codeopinion.com/handling-duplicate-messages-idempotent-consumers

Deje un comentario

Comente de forma anónima o inicie sesión para comentar.

Comentarios

Deje una respuesta.

Responda de forma anónima o inicie sesión para responder.