angle-uparrow-clockwisearrow-counterclockwisearrow-down-uparrow-leftatcalendarcard-listchatcheckenvelopefolderhouseinfo-circlepencilpeoplepersonperson-fillperson-plusphoneplusquestion-circlesearchtagtrashx

Verhinderung des Versands doppelter Nachrichten an ein entferntes System

Die Berücksichtigung aller möglichen Zustände und Bedingungen ist nicht wünschenswert, sondern eine Voraussetzung.

21 Februar 2023
post main image
https://www.pexels.com/nl-nl/@monoar-rahman-22660

Oftmals müssen Anwendungen Nachrichten an ein entferntes System senden. In einer perfekten Welt hätten wir es nur mit dem Happy Flow zu tun: Es passieren keine schlimmen Dinge wie Ausnahmen oder Fehler.

Leider ist die Welt nicht perfekt. Neben Programmierfehlern können auch Verbindungen ausfallen, Datenbanksysteme können ausfallen, entfernte Systeme können ausfallen. Eine einfache Frage - können Sie etwas Code schreiben, um Nachrichten an ein entferntes System zu senden - kann leicht zu einem sehr komplexen Projekt werden.

In diesem Beitrag zeige ich einen Weg, um zu verhindern, dass doppelte Nachrichten gesendet werden, wenn die Aktualisierung eines Datenbanknachrichtendatensatzes fehlschlägt.

Kompromisse

Wenn Sie ein System entwickeln, das Finanztransaktionen durchführt, dann sind die Anforderungen völlig anders als bei einem System, das Eintrittskarten für ein Konzert per E-Mail verschickt. Ersteres darf niemals ausfallen und muss in der Lage sein, neu zu starten und genau dort fortzufahren, wo es aufgehört hat. Das System, das die Eintrittskarten verschickt, kann ein paar Mal im Monat ein Problem haben, so dass die Verbraucher ihre Eintrittskarten nicht oder zweimal erhalten.

Was ich damit sagen will, ist, dass es immer Abwägungen gibt. Lohnt es sich, viel Zeit in die Entwicklung einer kugelsicheren Anwendung zu investieren, oder nimmt man sporadische Ausfälle in Kauf? Wenn ein solcher sporadischer Fehler auftritt, müssen Sie die Konsequenzen kennen. Ein Fehler, der den Versand einer E-Mail verhindert, ist nicht so schlimm wie ein Fehler, der dazu führt, dass dieselben E-Mails Hunderte oder gar Tausende von Malen verschickt werden.

Unser Zustelldienst

Unser Zustelldienst stellt eine Verbindung zu einer Datenbank mit Nachrichten her, die an ein entferntes System gesendet werden müssen.

  +----------+      +----------+      +----------+
  | database |      | deliver- |      | remote   |
  | message  |<-----| service  |----->| system   |
  | records  |      |          |      |          |
  +----------+      +----------+      +----------+

Der Zustelldienst wird bei jedem Tick ausgeführt, zum Beispiel jede Sekunde. Der Nachrichtendatensatz hat ein 'is_delivered'-Flag. Dieses Flag ist zunächst "False" und wird nach dem Versand einer Nachricht auf "True" aktualisiert. Dieses Flag wird auch bei der Auswahl von Nachrichtensätzen aus der Datenbank verwendet.

Schritte:

  1. Auswahl von Nachrichten aus der Datenbank, die noch nicht zugestellt worden sind.
  2. Eine Nachricht nach der anderen an das entfernte System zustellen.
  3. (post-update) Sobald eine Nachricht zugestellt wurde, aktualisieren wir das 'is_delivered'-Flag.

Ich nenne die Aktion in Schritt 3. post-update, weil es sich um eine Aktualisierung nach dem Senden der Nachricht handelt.

Hier ist der Happy Flow Code ohne jegliche Fehlerprüfung und Fehlerbehandlung:

# happy flow code

class RemoteSystem:
    def receive_data(self, data):
        self.data = data
        print('remote system: received data = {}'.format(data))

# database message record
class Message:
    def __init__(self, data):
        self.data = data
        self.is_delivered = False
    def __str__(self):
        return '<Message: data = {}, is_delivered = {}>'.format(self.data, self.is_delivered)

class DeliverService:
    def __init__(self):
        # database
        self.messages = [Message('data1'), Message('data2')]
        # remote system
        self.rem_sys = RemoteSystem()

    def select_messages_to_deliver(self):
        return [message for message in self.messages if not message.is_delivered]

    def deliver_message(self, message):
        # send to remote system
        self.rem_sys.receive_data(message.data)

    def deliver_messages(self):
        delivered_count = 0
        for message in self.select_messages_to_deliver():
            # deliver message
            self.deliver_message(message)
            delivered_count += 1
            # update message record
            message.is_delivered = True
        return delivered_count

def main():
    print('\nHAPPY FLOW\n{}'.format('-'*40))
    ds = DeliverService()
    for tick in range(2):
        # debug
        print('on tick-{} messages:'.format(tick))
        for message in ds.messages:
            print('- {}'.format(message))
        # deliver_messages
        delivered_count = ds.deliver_messages()
        # debug
        print('on tick-{} messages delivered to remote system: {}'.format(tick, delivered_count))

if __name__ == '__main__':
    main()

Und hier ist das Ergebnis nach der Ausführung:

HAPPY FLOW
----------------------------------------
on tick-0 messages:
- <Message: data = data1, is_delivered = False>
- <Message: data = data2, is_delivered = False>
remote system: received data = data1
remote system: received data = data2
on tick-0 messages delivered to remote system: 2
on tick-1 messages:
- <Message: data = data1, is_delivered = True>
- <Message: data = data2, is_delivered = True>
on tick-1 messages delivered to remote system: 0

Die Nachrichten wurden beim ersten Tick zugestellt, beim zweiten Tick wurden keine Nachrichten zugestellt.

Geschafft, fertig! Das war einfach ... in der perfekten Welt ...

Machen Sie eine Liste aller Dinge, die schief gehen können, schauen Sie sich die Auswirkungen an und wie man sie lösen kann

Da unsere Welt nicht perfekt ist, müssen wir alle möglichen Fehler, die auftreten können, in Betracht ziehen. Bei jedem dieser Fehler müssen wir die Folgen betrachten, die dieser Fehler für den Betrieb unseres Dienstes hat.

Es gibt temporäre oder vorübergehende Fehler und permanente Fehler. Ein vorübergehender Fehler ist in der Regel einmalig oder von kurzer Dauer. Wenn ein entferntes System für eine kurze Zeit nicht verfügbar ist, erhalten wir eine Zeitüberschreitung. Ein Beispiel für einen permanenten Fehler ist ein ausgefallener Datenbankserver.

Für unseren Lieferdienst finden Sie hier eine (begrenzte) Liste von Fehlern, die auftreten können, ihre Auswirkungen und wie sie zu beheben sind:

  • Datenbank-Selektionsfehler: Nachrichten können nicht aus der Datenbank ausgewählt werden.
    Unsere Nachrichten werden nicht zugestellt. Das ist nicht gut, aber das war's. Vielleicht ist unser Datenbankserver vorübergehend ausgefallen, wir können das weiter untersuchen.
    Wie man das Problem löst: Wir versuchen es einfach weiter, bis die Datenbank wieder antwortet.
  • Entferntes System (vorübergehend) nicht verfügbar.
    Wieder werden unsere Nachrichten nicht zugestellt. Das ist nicht gut, aber das war's.
    Wie man das löst: Wir haben bereits das 'is_delivered'-Flag. Wir aktualisieren dieses Flag nur dann auf 'True', wenn das entfernte System den Empfang unserer Nachricht bestätigt.
  • Datenbank-Post-Update-Fehler: Ein Nachrichtendatensatz konnte nach (!) der Zustellung nicht aktualisiert werden.
    In diesem Fall wird das 'is_delivered'-Flag nicht aktualisiert. Unsere Nachricht wurde an das entfernte System zugestellt oder nicht, aber dieser Zustand wurde im Nachrichtendatensatz nicht vermerkt. Die Folge ist, dass die Nachricht ausgewählt und zugestellt wird, wenn sie angekreuzt wird usw. Das entfernte System wird unsere Nachricht mehrfach erhalten. Schrecklich!
    Die Lösung: Siehe nächster Abschnitt.

Verhindern, dass eine Nachricht bei einem Post-Update-Fehler mehrfach gesendet wird

Der einfachste Weg ist, unseren Zustelldienst komplett anzuhalten (abzubrechen). Dann prüfen wir, was falsch ist und starten neu, nachdem wir den Post-Update-Fehler behoben haben. Wir müssen das 'is_delivered'-Flag der letzten Nachricht manuell setzen.

Um mehr Kontrolle zu erhalten, fügen wir ein 'delivery_in_progress'-Flag zum Nachrichtendatensatz hinzu. Wir setzen dieses Kennzeichen auf 'True', bevor (!) wir die Nachricht an das entfernte System senden. Wir setzen dieses Kennzeichen auf 'False' zurück, wenn wir den Nachrichtendatensatz aktualisieren, um das 'is_delivered'-Kennzeichen zu setzen. Bei der Auswahl von Nachrichten für die Zustellung wird nun auch das 'delivery_in_progress'-Flag gesetzt.

Die neuen Schritte:

  1. Auswahl von Nachrichten aus der Datenbank, die noch nicht zugestellt wurden und deren Zustellung noch nicht begonnen hat.
  2. (vor dem Update) Setzen Sie den Nachrichtendatensatz 'delivery_in_progress'-Flag auf 'True'.
  3. Stellen Sie eine Nachricht nach der anderen an das entfernte System zu.
  4. (post-update) Sobald eine Nachricht zugestellt wurde, aktualisieren wir das 'is_delivered'-Flag auf 'True' und das 'delivery_in_progress'-Flag auf 'False'.

Wenn nun die Post-Update-Aktion fehlschlägt, wird die Nachricht nicht gesendet, aber der Vorgang wird nicht blockiert. Neue Nachrichten werden weiterhin versendet.

Auswählen von Nachrichten, vorher:

def select_messages(self):
    return [message for message in self.messages if not message.is_delivered]

Wird:

def select_messages(self):
    return [message for message in self.messages if not message.is_delivered and not message.delivery_in_progress]

Diese Methode ist nicht kostenlos, da sie einen zusätzlichen Aktualisierungsbefehl erfordert.

Falls Sie es ausprobieren wollen, hier ist der Code. Um den Code zu testen, fügte ich dem Zustelldienst die folgenden 'Schalter hinzu:

  • inject_post_update_error': um einen Post-Update-Fehler zu simulieren
  • use_delivery_in_progress': um das 'delivery_in_progress'-Flag im Nachrichtensatz zu verwenden

Wir führen zwei 'Ticks' (Durchläufe) für jede der folgenden Bedingungen durch:

  • Glücklicher Fluss
  • Fehler bei der Nachverbuchung => falsch
  • Nachverbuchungsfehler + 'delivery_in_progress'-Kennzeichen => richtig
# prevent duplicate messages caused by post-update errors
class SendError(Exception):
    pass

class PostUpdateError(Exception):
    pass

class RemoteSystem:
    def send_data(self, data):
        print('remote system: received data = {}'.format(data))

# database message record
class Message:
    def __init__(self, data):
        self.data = data
        self.is_delivered = False
        self.delivery_in_progress = False
    def __str__(self):
        return '<Message: data = {}, is_delivered = {}, delivery_in_progress = {}>'.format(self.data, self.is_delivered, self.delivery_in_progress)

class DeliverService:
    def __init__(self, inject_post_update_error, use_delivery_in_progress):
        # switch to inject update_failure
        self.inject_post_update_error = inject_post_update_error
        # switch to prevent sending duplicates
        self.use_delivery_in_progress = use_delivery_in_progress
        # database
        self.messages = [Message('data1'), Message('data2')]
        # remote system
        self.rem_sys = RemoteSystem()

    def select_messages(self):
        if self.use_delivery_in_progress:
            return [message for message in self.messages if not message.is_delivered and not message.delivery_in_progress]
        else:
            return [message for message in self.messages if not message.is_delivered]

    def deliver_message(self, message):
        try:
            self.rem_sys.send_data(message.data)
            return True
        except Exception as e:
            raise SendError()

    def deliver_messages(self):
        tick_delivered_count = 0
        tick_send_error_count = 0
        tick_post_update_error_count = 0
        for message in self.select_messages():
            # pre-update message record
            if self.use_delivery_in_progress:
                message.delivery_in_progress = True
            # deliver message
            try:
                self.deliver_message(message)
                tick_delivered_count += 1
            except SendError:
                tick_send_error_count += 1
            # post-update message record
            try:
                if self.inject_post_update_error:
                    raise PostUpdateError()
                message.is_delivered = True
                if self.use_delivery_in_progress:
                    message.delivery_in_progress = False
            except PostUpdateError:
                tick_post_update_error_count += 1
        return (tick_delivered_count, tick_send_error_count, tick_post_update_error_count)

def main():

    tests = [
        {
            'title': 'HAPPY FLOW => CORRECT',
            'inject_post_update_error': False,
            'use_delivery_in_progress': False,
        },
        {
            'title': 'POST-UPDATE FAILURE => WRONG',
            'inject_post_update_error': True,
            'use_delivery_in_progress': False,
        },
        {
            'title': 'POST-UPDATE FAILURE + DELIVERY_IN_PROGRESS => CORRECT',
            'inject_post_update_error': True,
            'use_delivery_in_progress': True,
        },
    ]

    for test in tests:    
        print('\n{}\n{}'.format(test['title'], '-'*40))
        ds = DeliverService(
            inject_post_update_error=test['inject_post_update_error'],
            use_delivery_in_progress=test['use_delivery_in_progress'],
        )
        for tick in range(2):
            # debug
            print('on tick-{}, messages:'.format(tick))
            for message in ds.messages:
                print('- {}'.format(message))
            # deliver messages
            counts = ds.deliver_messages()
            # debug
            print('on tick-{} messages delivered to remote system: {}'.format(tick, counts[0]))
            if test['inject_post_update_error']:
                print('on tick-{} post-update error count: {}'.format(tick, counts[2]))

if __name__ == '__main__':
    main()

Und das Ergebnis nach der Ausführung:

HAPPY FLOW => CORRECT
----------------------------------------
on tick-0, messages:
- <Message: data = data1, is_delivered = False, delivery_in_progress = False>
- <Message: data = data2, is_delivered = False, delivery_in_progress = False>
remote system: received data = data1
remote system: received data = data2
on tick-0 messages delivered to remote system: 2
on tick-1, messages:
- <Message: data = data1, is_delivered = True, delivery_in_progress = False>
- <Message: data = data2, is_delivered = True, delivery_in_progress = False>
on tick-1 messages delivered to remote system: 0

POST-UPDATE FAILURE => WRONG
----------------------------------------
on tick-0, messages:
- <Message: data = data1, is_delivered = False, delivery_in_progress = False>
- <Message: data = data2, is_delivered = False, delivery_in_progress = False>
remote system: received data = data1
remote system: received data = data2
on tick-0 messages delivered to remote system: 2
on tick-0 post-update error count: 2
on tick-1, messages:
- <Message: data = data1, is_delivered = False, delivery_in_progress = False>
- <Message: data = data2, is_delivered = False, delivery_in_progress = False>
remote system: received data = data1
remote system: received data = data2
on tick-1 messages delivered to remote system: 2
on tick-1 post-update error count: 2

POST-UPDATE FAILURE + DELIVERY_IN_PROGRESS => CORRECT
----------------------------------------
on tick-0, messages:
- <Message: data = data1, is_delivered = False, delivery_in_progress = False>
- <Message: data = data2, is_delivered = False, delivery_in_progress = False>
remote system: received data = data1
remote system: received data = data2
on tick-0 messages delivered to remote system: 2
on tick-0 post-update error count: 2
on tick-1, messages:
- <Message: data = data1, is_delivered = False, delivery_in_progress = True>
- <Message: data = data2, is_delivered = False, delivery_in_progress = True>
on tick-1 messages delivered to remote system: 0
on tick-1 post-update error count: 0

Was genau haben wir gelöst?

Was wir gelöst haben, ist das mehrmalige Senden der gleichen Nachricht im Falle eines Post-Update-Fehlers. Unser Dienst versucht es weiter, das ist gut. Wenn eine Post-Update-Aktion ein- oder zweimal oder für eine kurze Zeit fehlschlägt, werden die Nachrichten nicht mehrfach gesendet, und unser Dienst läuft weiter, er wird nicht abgebrochen. Das ist eine große Erleichterung.

Der Nachteil ist, dass wir immer noch nicht wissen, ob die Nachrichten tatsächlich gesendet wurden, als der Post-Update-Fehler auftrat. Eine Möglichkeit, dies zu umgehen, besteht darin, die Antwort des entfernten Systems zusammen mit der Record-ID der Nachricht in einem anderen Datenspeicher (mit minimalen Abhängigkeiten) zu speichern.

Idempotente Operationen

Wikipedia: Die Eigenschaft bestimmter Operationen in der Mathematik und Informatik, dass sie mehrfach angewendet werden können, ohne dass sich das Ergebnis über die erste Anwendung hinaus ändert.

Eine gute Möglichkeit ist, ein entferntes System (oder einen Dienst) zu haben, das auf doppelte Nachrichten prüfen kann, indem es ein bestimmtes Feld in der Nachricht überprüft. Dann fügen wir einfach eine eindeutige ID zu jeder Nachricht hinzu, die wir erstellen. Das entfernte System verwirft automatisch doppelte Nachrichten, die es erhält.

Zusammenfassung

Die Berücksichtigung aller möglichen Fehler ist eine der wichtigsten Aufgaben beim Entwurf einer Softwareanwendung. Sie steht in völligem Widerspruch zu dem Druck auf Designer und Programmierer, eine funktionierende Anwendung so schnell wie möglich zu liefern. Viele Programmierer wissen, dass ihr Code einige Schwachstellen aufweist, aber aus Zeitgründen ist es oft unmöglich, alle zu beheben, bevor eine Anwendung in Produktion geht.

Im obigen Code wurde nur die schwerwiegendste Folge - das wiederholte Senden derselben Nachricht - berücksichtigt. Dies wurde durch Hinzufügen eines 'delivery_in_progress'-Flags gelöst. Viele andere Probleme müssen ebenfalls gelöst werden, wie z. B.:

  • Wie viele Wiederholungsversuche, bevor das Senden von Nachrichten an das entfernte System aufgegeben wird
  • Wie können Auswahlfehler, Zustellungsfehler, Aktualisierungsfehler usw. überwacht werden?
  • Wie kann man sich von Fehlern erholen?
  • Darf sich die Reihenfolge der Nachrichten beim Versenden ändern?

Schließlich werden die meisten Softwareprojekte nie abgeschlossen, sondern nur freigegeben.

Links / Impressum

[rabbitmq-discuss] Exactly Once Delivery
https://groups.google.com/g/rabbitmq-discuss/c/eniPTe1aKvk

Handling Duplicate Messages (Idempotent Consumers)
https://codeopinion.com/handling-duplicate-messages-idempotent-consumers

Einen Kommentar hinterlassen

Kommentieren Sie anonym oder melden Sie sich zum Kommentieren an.

Kommentare

Eine Antwort hinterlassen

Antworten Sie anonym oder melden Sie sich an, um zu antworten.