angle-uparrow-clockwisearrow-counterclockwisearrow-down-uparrow-leftatcalendarcard-listchatcheckenvelopefolderhouseinfo-circlepencilpeoplepersonperson-fillperson-plusphoneplusquestion-circlesearchtagtrashx

Empêcher l'envoi de messages en double à un système distant

La prise en compte de tous les états et conditions possibles n'est pas souhaitable mais obligatoire.

21 février 2023 Mise à jour 21 février 2023
Dans API, Internet, Autre
post main image
https://www.pexels.com/nl-nl/@monoar-rahman-22660

Les applications doivent souvent envoyer des messages à un système distant. Dans un monde parfait, nous n'avons à nous occuper que du flux heureux : il n'y a pas de mauvaises choses qui se produisent, comme des exceptions ou des erreurs.

Malheureusement, le monde n'est pas parfait. Outre les erreurs de programmation, les connexions peuvent échouer, les systèmes de base de données peuvent échouer, les systèmes distants peuvent échouer. Une simple question - pouvez-vous écrire du code pour envoyer des messages à un système distant - peut facilement devenir un projet très complexe.

Dans ce post, je montre une façon d'éviter d'envoyer des messages en double dans le cas où la mise à jour d'un enregistrement de message de la base de données échoue.

Compromis

Si vous développez un système qui effectue des transactions financières, les exigences sont totalement différentes de celles d'un système qui envoie des billets par e-mail pour un concert. Le premier ne doit jamais tomber en panne et doit être capable de redémarrer et de reprendre exactement là où il s'est arrêté. Le système qui envoie les billets peut avoir un problème quelques fois par mois, et les consommateurs peuvent alors ne pas recevoir leurs billets, ou les recevoir deux fois.

Ce que je veux dire par là, c'est qu'il y a toujours des compromis à faire. Cela vaut-il la peine de passer beaucoup de temps à développer une application à toute épreuve ou faut-il accepter des défaillances sporadiques ? Si une telle défaillance sporadique se produit, vous devez en connaître les conséquences. Une panne qui empêche l'envoi d'un courriel une fois de temps en temps n'est pas aussi grave qu'une panne qui continue à envoyer les mêmes courriels des centaines, voire des milliers de fois.

Notre service de livraison

Notre service de livraison se connecte à une base de données contenant des messages qui doivent être envoyés à un système distant.

  +----------+      +----------+      +----------+
  | database |      | deliver- |      | remote   |
  | message  |<-----| service  |----->| system   |
  | records  |      |          |      |          |
  +----------+      +----------+      +----------+

Le service de livraison s'exécute à chaque tic, par exemple toutes les secondes. L'enregistrement du message comporte un drapeau 'is_delivered'. Cet indicateur est initialement 'False' et mis à jour en 'True' après l'envoi d'un message. Cet indicateur est également utilisé lors de la sélection des enregistrements de messages dans la base de données.

Etapes :

  1. Sélectionnez les messages de la base de données qui n'ont pas encore été livrés.
  2. Délivrer un par un les messages au système distant.
  3. (post-update) Une fois qu'un message a été livré, nous mettons à jour le drapeau 'is_delivered'.

J'appelle l'action de l'étape 3. post-update parce que c'est une mise à jour après l'envoi du message.

Voici le code de Happy Flow sans aucune vérification et gestion des erreurs :

# happy flow code

class RemoteSystem:
    def receive_data(self, data):
        self.data = data
        print('remote system: received data = {}'.format(data))

# database message record
class Message:
    def __init__(self, data):
        self.data = data
        self.is_delivered = False
    def __str__(self):
        return '<Message: data = {}, is_delivered = {}>'.format(self.data, self.is_delivered)

class DeliverService:
    def __init__(self):
        # database
        self.messages = [Message('data1'), Message('data2')]
        # remote system
        self.rem_sys = RemoteSystem()

    def select_messages_to_deliver(self):
        return [message for message in self.messages if not message.is_delivered]

    def deliver_message(self, message):
        # send to remote system
        self.rem_sys.receive_data(message.data)

    def deliver_messages(self):
        delivered_count = 0
        for message in self.select_messages_to_deliver():
            # deliver message
            self.deliver_message(message)
            delivered_count += 1
            # update message record
            message.is_delivered = True
        return delivered_count

def main():
    print('\nHAPPY FLOW\n{}'.format('-'*40))
    ds = DeliverService()
    for tick in range(2):
        # debug
        print('on tick-{} messages:'.format(tick))
        for message in ds.messages:
            print('- {}'.format(message))
        # deliver_messages
        delivered_count = ds.deliver_messages()
        # debug
        print('on tick-{} messages delivered to remote system: {}'.format(tick, delivered_count))

if __name__ == '__main__':
    main()

Et voici le résultat après exécution :

HAPPY FLOW
----------------------------------------
on tick-0 messages:
- <Message: data = data1, is_delivered = False>
- <Message: data = data2, is_delivered = False>
remote system: received data = data1
remote system: received data = data2
on tick-0 messages delivered to remote system: 2
on tick-1 messages:
- <Message: data = data1, is_delivered = True>
- <Message: data = data2, is_delivered = True>
on tick-1 messages delivered to remote system: 0

Les messages ont été délivrés au premier tick, au second tick, aucun message n'a été délivré.

C'est fait, c'est prêt ! C'était facile ... dans un monde parfait ...

Faites une liste de toutes les choses qui peuvent mal tourner, regardez les implications et comment les résoudre.

Parce que notre monde n'est pas parfait, nous devons prendre en compte toutes les erreurs possibles qui peuvent se produire. Pour chacune de ces erreurs, nous devons examiner les conséquences que cette erreur a sur le fonctionnement de notre service.

Considérons les erreurs temporaires, ou transitoires, et les erreurs permanentes. Une erreur transitoire est généralement ponctuelle ou de courte durée. Si un système distant n'est pas disponible pendant une courte période, nous recevons un timeout. Un exemple d'erreur permanente est un serveur de base de données qui tombe en panne.

Pour notre service de livraison, voici une liste (limitée) de choses qui peuvent mal tourner, leurs implications et comment les résoudre :

  • Erreur de sélection de la base de données : échec de la sélection des messages dans la base de données.
    Nos messages ne seront pas livrés. Ce n'est pas bon, mais c'est tout. Peut-être que notre serveur de base de données est temporairement hors service, nous pouvons examiner cela plus en détail.
    Comment résoudre ce problème ? Nous continuons simplement à essayer jusqu'à ce que la base de données réponde à nouveau.
  • Système distant (temporaire) non disponible.
    Une fois de plus, nos messages ne seront pas transmis. Ce n'est pas bon, mais c'est comme ça.
    Comment résoudre ce problème ? Nous avons déjà le drapeau 'is_delivered'. Nous ne mettons à jour ce drapeau à 'True' que si le système distant accuse réception de notre message.
  • Erreur de post-mise à jour de la base de données : échec de la mise à jour d'un enregistrement de message après ( !) la livraison.
    Dans ce cas, le drapeau "is_delivered" n'est pas mis à jour. Notre message a été ou n'a pas été remis au système distant, mais cette condition n'a pas été signalée dans l'enregistrement du message. La conséquence est que le message sera sélectionné et remis sur le tic-tac, etc. Le système distant recevra notre message plusieurs fois. C'est terrible !
    Comment résoudre ce problème ? Voir la section suivante.

Empêcher l'envoi multiple d'un message en cas d'erreur de post-mise à jour

Le moyen le plus simple est d'arrêter complètement (interrompre) notre service de livraison. Ensuite, nous vérifions ce qui ne va pas et nous redémarrons après avoir corrigé l'erreur post-update. Nous devons définir manuellement le drapeau 'is_delivered' du dernier message.

Pour avoir plus de contrôle, nous ajoutons un indicateur "delivery_in_progress" à l'enregistrement du message. Nous mettons cet indicateur à 'True' avant ( !) d'envoyer le message au système distant. Nous remettons cet indicateur à 'False' lorsque nous mettons à jour l'enregistrement du message pour définir l'indicateur 'is_delivered'. Lors de la sélection des messages à livrer, nous incluons également l'indicateur 'delivery_in_progress'.

Les nouvelles étapes :

  1. Sélectionner dans la base de données les messages qui n'ont pas été livrés et dont la livraison n'est pas en cours.
  2. (pré-mise à jour) Définir l'enregistrement de message 'delivery_in_progress'-flag à 'True'.
  3. Délivrer un par un les messages au système distant.
  4. (post-update) Une fois qu'un message a été livré, nous mettons à jour l'indicateur "is_delivered" à "True" et l'indicateur "delivery_in_progress" à "False".

Maintenant, si l'action post-mise à jour échoue, le message n'est pas envoyé, mais l'opération n'est pas bloquée. Les nouveaux messages sont toujours envoyés.

Sélection des messages, avant :

def select_messages(self):
    return [message for message in self.messages if not message.is_delivered]

Devient :

def select_messages(self):
    return [message for message in self.messages if not message.is_delivered and not message.delivery_in_progress]

Cette méthode n'est pas gratuite car elle nécessite une commande de mise à jour supplémentaire.

Au cas où vous voudriez essayer, voici le code. Pour tester le code, j'ai ajouté les interrupteurs suivants au service de livraison :

  • 'inject_post_update_error' : pour simuler une erreur post-mise à jour.
  • 'use_delivery_in_progress' : pour utiliser le drapeau 'delivery_in_progress' dans l'enregistrement du message.

Nous effectuons deux "ticks" (passages) pour chacune des conditions suivantes :

  • Happy Flow
  • Échec post-mise à jour => faux
  • Echec post-mise à jour + 'delivery_in_progress'-flag => correct
# prevent duplicate messages caused by post-update errors
class SendError(Exception):
    pass

class PostUpdateError(Exception):
    pass

class RemoteSystem:
    def send_data(self, data):
        print('remote system: received data = {}'.format(data))

# database message record
class Message:
    def __init__(self, data):
        self.data = data
        self.is_delivered = False
        self.delivery_in_progress = False
    def __str__(self):
        return '<Message: data = {}, is_delivered = {}, delivery_in_progress = {}>'.format(self.data, self.is_delivered, self.delivery_in_progress)

class DeliverService:
    def __init__(self, inject_post_update_error, use_delivery_in_progress):
        # switch to inject update_failure
        self.inject_post_update_error = inject_post_update_error
        # switch to prevent sending duplicates
        self.use_delivery_in_progress = use_delivery_in_progress
        # database
        self.messages = [Message('data1'), Message('data2')]
        # remote system
        self.rem_sys = RemoteSystem()

    def select_messages(self):
        if self.use_delivery_in_progress:
            return [message for message in self.messages if not message.is_delivered and not message.delivery_in_progress]
        else:
            return [message for message in self.messages if not message.is_delivered]

    def deliver_message(self, message):
        try:
            self.rem_sys.send_data(message.data)
            return True
        except Exception as e:
            raise SendError()

    def deliver_messages(self):
        tick_delivered_count = 0
        tick_send_error_count = 0
        tick_post_update_error_count = 0
        for message in self.select_messages():
            # pre-update message record
            if self.use_delivery_in_progress:
                message.delivery_in_progress = True
            # deliver message
            try:
                self.deliver_message(message)
                tick_delivered_count += 1
            except SendError:
                tick_send_error_count += 1
            # post-update message record
            try:
                if self.inject_post_update_error:
                    raise PostUpdateError()
                message.is_delivered = True
                if self.use_delivery_in_progress:
                    message.delivery_in_progress = False
            except PostUpdateError:
                tick_post_update_error_count += 1
        return (tick_delivered_count, tick_send_error_count, tick_post_update_error_count)

def main():

    tests = [
        {
            'title': 'HAPPY FLOW => CORRECT',
            'inject_post_update_error': False,
            'use_delivery_in_progress': False,
        },
        {
            'title': 'POST-UPDATE FAILURE => WRONG',
            'inject_post_update_error': True,
            'use_delivery_in_progress': False,
        },
        {
            'title': 'POST-UPDATE FAILURE + DELIVERY_IN_PROGRESS => CORRECT',
            'inject_post_update_error': True,
            'use_delivery_in_progress': True,
        },
    ]

    for test in tests:    
        print('\n{}\n{}'.format(test['title'], '-'*40))
        ds = DeliverService(
            inject_post_update_error=test['inject_post_update_error'],
            use_delivery_in_progress=test['use_delivery_in_progress'],
        )
        for tick in range(2):
            # debug
            print('on tick-{}, messages:'.format(tick))
            for message in ds.messages:
                print('- {}'.format(message))
            # deliver messages
            counts = ds.deliver_messages()
            # debug
            print('on tick-{} messages delivered to remote system: {}'.format(tick, counts[0]))
            if test['inject_post_update_error']:
                print('on tick-{} post-update error count: {}'.format(tick, counts[2]))

if __name__ == '__main__':
    main()

Et le résultat après exécution :

HAPPY FLOW => CORRECT
----------------------------------------
on tick-0, messages:
- <Message: data = data1, is_delivered = False, delivery_in_progress = False>
- <Message: data = data2, is_delivered = False, delivery_in_progress = False>
remote system: received data = data1
remote system: received data = data2
on tick-0 messages delivered to remote system: 2
on tick-1, messages:
- <Message: data = data1, is_delivered = True, delivery_in_progress = False>
- <Message: data = data2, is_delivered = True, delivery_in_progress = False>
on tick-1 messages delivered to remote system: 0

POST-UPDATE FAILURE => WRONG
----------------------------------------
on tick-0, messages:
- <Message: data = data1, is_delivered = False, delivery_in_progress = False>
- <Message: data = data2, is_delivered = False, delivery_in_progress = False>
remote system: received data = data1
remote system: received data = data2
on tick-0 messages delivered to remote system: 2
on tick-0 post-update error count: 2
on tick-1, messages:
- <Message: data = data1, is_delivered = False, delivery_in_progress = False>
- <Message: data = data2, is_delivered = False, delivery_in_progress = False>
remote system: received data = data1
remote system: received data = data2
on tick-1 messages delivered to remote system: 2
on tick-1 post-update error count: 2

POST-UPDATE FAILURE + DELIVERY_IN_PROGRESS => CORRECT
----------------------------------------
on tick-0, messages:
- <Message: data = data1, is_delivered = False, delivery_in_progress = False>
- <Message: data = data2, is_delivered = False, delivery_in_progress = False>
remote system: received data = data1
remote system: received data = data2
on tick-0 messages delivered to remote system: 2
on tick-0 post-update error count: 2
on tick-1, messages:
- <Message: data = data1, is_delivered = False, delivery_in_progress = True>
- <Message: data = data2, is_delivered = False, delivery_in_progress = True>
on tick-1 messages delivered to remote system: 0
on tick-1 post-update error count: 0

Qu'avons-nous résolu exactement ?

Ce que nous avons résolu est l'envoi multiple du même message en cas d'échec de la post-mise à jour. Notre service continue d'essayer, c'est bien. Si une action de post-mise à jour échoue une ou deux fois, ou pendant une courte période, les messages ne seront pas envoyés plusieurs fois, et notre service continuera à fonctionner, il ne sera pas interrompu. C'est un grand soulagement.

Le problème, c'est que nous ne savons toujours pas si les messages ont effectivement été envoyés lorsque l'échec de la post-mise à jour s'est produit. Une façon de contourner ce problème est de stocker la réponse du système distant, ainsi que l'identifiant d'enregistrement du message, dans un autre magasin de données (avec des dépendances minimales).

Opérations idempotentes

Wikipédia : La propriété de certaines opérations en mathématiques et en informatique selon laquelle elles peuvent être appliquées plusieurs fois sans que le résultat ne soit modifié au-delà de l'application initiale".

Un bon moyen est d'avoir un système (ou un service) distant qui peut vérifier les messages en double en vérifiant un champ spécifique dans le message. Il suffit ensuite d'ajouter un identifiant unique à chaque message que nous créons. Le système distant éliminera automatiquement les messages en double qu'il reçoit.

Résumé

La prise en compte de toutes les erreurs possibles est l'une des tâches les plus importantes lors de la conception d'une application logicielle. Elle est en contradiction totale avec la pression exercée sur les concepteurs et les programmeurs pour qu'ils livrent une application fonctionnelle aussi rapidement que possible. De nombreux programmeurs savent que leur code présente certaines vulnérabilités, mais les contraintes de temps font qu'il est souvent impossible de toutes les corriger avant la mise en production de l'application.

Dans le code ci-dessus, seule la conséquence la plus grave - l'envoi répété du même message - a été prise en compte. Ce problème a été résolu en ajoutant un drapeau "delivery_in_progress". De nombreux autres problèmes doivent également être résolus, tels que :

  • Combien de tentatives avant d'abandonner l'envoi de messages au système distant.
  • Comment surveiller les échecs de sélection, les échecs de livraison et les échecs de mise à jour, etc.
  • Comment récupérer les échecs
  • L'ordre des messages peut-il être modifié lors de l'envoi des messages ?

Enfin, la plupart des projets logiciels ne sont jamais terminés, ils sont lancés.

Liens / crédits

[rabbitmq-discuss] Exactly Once Delivery
https://groups.google.com/g/rabbitmq-discuss/c/eniPTe1aKvk

Handling Duplicate Messages (Idempotent Consumers)
https://codeopinion.com/handling-duplicate-messages-idempotent-consumers

Laissez un commentaire

Commentez anonymement ou connectez-vous pour commenter.

Commentaires

Laissez une réponse

Répondez de manière anonyme ou connectez-vous pour répondre.