angle-uparrow-clockwisearrow-counterclockwisearrow-down-uparrow-leftatcalendarcard-listchatcheckenvelopefolderhouseinfo-circlepencilpeoplepersonperson-fillperson-plusphoneplusquestion-circlesearchtagtrashx

Voorkomen dat dubbele berichten naar een extern systeem worden gestuurd

Beschouwing van alle mogelijke toestanden en voorwaarden is niet wenselijk maar een vereiste.

21 februari 2023 Bijgewerkt 21 februari 2023
post main image
https://www.pexels.com/nl-nl/@monoar-rahman-22660

Vaak moeten toepassingen berichten sturen naar een systeem op afstand. In een perfecte wereld hebben we alleen te maken met de Happy Flow: er gebeuren geen slechte dingen zoals uitzonderingen of fouten.

Helaas is de wereld niet perfect. Naast programmeerfouten kunnen verbindingen falen, databasesystemen kunnen falen, systemen op afstand kunnen falen. Een eenvoudige vraag - kun je wat code schrijven om berichten naar een systeem op afstand te sturen - kan gemakkelijk een zeer complex project worden.

In dit bericht laat ik een manier zien om te voorkomen dat dubbele berichten worden verstuurd als het bijwerken van een database berichtrecord mislukt.

Afwegingen

Als je een systeem ontwikkelt dat financiële transacties doet, zijn de eisen totaal verschillend van een systeem dat tickets per e-mail verstuurt voor een concert. Het eerste mag nooit falen en moet opnieuw kunnen opstarten en precies verder gaan waar het gebleven was. Het systeem dat tickets verstuurt kan een paar keer per maand een probleem hebben. De consument ontvangt zijn tickets dan niet, of twee keer.

Wat ik hiermee bedoel is dat er altijd afwegingen zijn. Is het de moeite waard om veel tijd te spenderen en een kogelvrije applicatie te ontwikkelen, of accepteer je sporadische storingen? Als zo'n sporadische storing zich voordoet, moet je weten wat de gevolgen zijn. Een storing waardoor een e-mail eens in de zoveel tijd niet kan worden verzonden, is niet zo erg als een storing waardoor dezelfde e-mails honderden of zelfs duizenden keren worden verzonden.

Onze bezorgdienst

Onze deliver-service maakt verbinding met een database met berichten die naar een extern systeem moeten worden gestuurd.

  +----------+      +----------+      +----------+
  | database |      | deliver- |      | remote   |
  | message  |<-----| service  |----->| system   |
  | records  |      |          |      |          |
  +----------+      +----------+      +----------+

De deliver-service, draait op elke tik, bijvoorbeeld elke seconde. Het berichtrecord heeft een 'is_delivered'-vlag. Deze vlag is aanvankelijk "False" en wordt bijgewerkt tot "True" nadat een bericht is verzonden. Deze vlag wordt ook gebruikt bij het selecteren van berichtrecords uit de database.

Stappen:

  1. Selecteer berichten uit de database die nog niet zijn afgeleverd.
  2. Eén voor één een bericht afleveren aan het externe systeem.
  3. (post-update) Zodra een bericht is afgeleverd, werken we de 'is_delivered'-vlag bij.

Ik noem de actie in stap 3. post-update omdat het een update is na het verzenden van het bericht.

Hier is de Happy Flow code zonder enige foutcontrole en foutafhandeling:

# happy flow code

class RemoteSystem:
    def receive_data(self, data):
        self.data = data
        print('remote system: received data = {}'.format(data))

# database message record
class Message:
    def __init__(self, data):
        self.data = data
        self.is_delivered = False
    def __str__(self):
        return '<Message: data = {}, is_delivered = {}>'.format(self.data, self.is_delivered)

class DeliverService:
    def __init__(self):
        # database
        self.messages = [Message('data1'), Message('data2')]
        # remote system
        self.rem_sys = RemoteSystem()

    def select_messages_to_deliver(self):
        return [message for message in self.messages if not message.is_delivered]

    def deliver_message(self, message):
        # send to remote system
        self.rem_sys.receive_data(message.data)

    def deliver_messages(self):
        delivered_count = 0
        for message in self.select_messages_to_deliver():
            # deliver message
            self.deliver_message(message)
            delivered_count += 1
            # update message record
            message.is_delivered = True
        return delivered_count

def main():
    print('\nHAPPY FLOW\n{}'.format('-'*40))
    ds = DeliverService()
    for tick in range(2):
        # debug
        print('on tick-{} messages:'.format(tick))
        for message in ds.messages:
            print('- {}'.format(message))
        # deliver_messages
        delivered_count = ds.deliver_messages()
        # debug
        print('on tick-{} messages delivered to remote system: {}'.format(tick, delivered_count))

if __name__ == '__main__':
    main()

En hier is het resultaat na het uitvoeren:

HAPPY FLOW
----------------------------------------
on tick-0 messages:
- <Message: data = data1, is_delivered = False>
- <Message: data = data2, is_delivered = False>
remote system: received data = data1
remote system: received data = data2
on tick-0 messages delivered to remote system: 2
on tick-1 messages:
- <Message: data = data1, is_delivered = True>
- <Message: data = data2, is_delivered = True>
on tick-1 messages delivered to remote system: 0

De berichten werden op de eerste tick afgeleverd, op de tweede tick werden geen berichten afgeleverd.

Klaar, klaar! Dat was makkelijk ... in de perfecte wereld ...

Maak een lijst van alle dingen die fout kunnen gaan, kijk naar de gevolgen en hoe ze op te lossen

Omdat onze wereld niet perfect is, moeten we rekening houden met alle mogelijke fouten die kunnen optreden. Van elk van deze fouten moeten we kijken naar de gevolgen die deze fout heeft voor de werking van onze dienst.

Denk aan tijdelijke, of voorbijgaande, fouten en permanente fouten. Een tijdelijke fout is meestal eenmalig, of van korte duur. Als een systeem op afstand korte tijd niet beschikbaar is, krijgen we een time-out. Een voorbeeld van een permanente fout is een databaseserver die uitvalt.

Voor onze leveringsdienst is hier een (beperkte) lijst van dingen die fout kunnen gaan, hun gevolgen en hoe ze op te lossen:

  • Database-select fout: mislukte selectie van berichten uit de database.
    Onze berichten worden niet afgeleverd. Dat is niet goed, maar dat is het wel. Misschien is onze databaseserver tijdelijk down, we kunnen dit verder onderzoeken.
    Hoe op te lossen: We blijven gewoon proberen tot de database weer reageert.
  • Remote systeem (tijdelijk) niet beschikbaar.
    Opnieuw worden onze berichten niet afgeleverd. Dat is niet goed, maar dat is het wel.
    Hoe op te lossen: We hebben al de 'is_delivered'-vlag. We updaten deze vlag alleen naar 'True' als het externe systeem de ontvangst van ons bericht bevestigt.
  • Database-post-update fout: falen om een berichtrecord bij te werken na (!) levering.
    In dit geval wordt de "is_delivered"-vlag niet bijgewerkt. Ons bericht werd wel of niet afgeleverd bij het externe systeem, maar deze voorwaarde werd niet gemarkeerd in het berichtrecord. Het gevolg is dat het bericht wordt geselecteerd en afgeleverd op het vinkje, enz. Het systeem op afstand zal ons bericht meerdere keren ontvangen. Verschrikkelijk!
    Hoe op te lossen: Zie volgende sectie.

Voorkomen dat een bericht meerdere keren wordt verzonden bij een post-update fout

De meest eenvoudige manier is om onze bezorgservice volledig te stoppen (af te breken). Dan controleren we wat er mis is en starten we opnieuw op nadat we de post-update fout hebben opgelost. We moeten handmatig de 'is_delivered'-flag van het laatste bericht instellen.

Om meer controle te krijgen, voegen we een 'delivery_in_progress'-vlag toe aan het berichtrecord. We zetten deze vlag op 'True' voordat (!) we het bericht naar het externe systeem sturen. We zetten deze vlag op 'False' wanneer we het berichtrecord bijwerken om de 'is_delivered'-vlag in te stellen. Bij het selecteren van berichten voor aflevering nemen we nu ook de 'delivery_in_progress'-vlag op.

De nieuwe stappen:

  1. Selecteer berichten uit de database die niet zijn afgeleverd en waarvan de aflevering niet bezig is.
  2. (pre-update) Zet de berichtrecord 'delivery_in_progress'-flag op 'True'.
  3. Eén voor één een bericht afleveren bij het externe systeem.
  4. (post-update) Zodra een bericht is afgeleverd, werken we de "is_delivered"-vlag bij naar "True" en de "delivery_in_progress"-vlag naar "False".

Als nu de post-update actie mislukt, wordt het bericht niet verzonden, maar de operatie wordt niet geblokkeerd. Nieuwe berichten worden nog steeds verzonden.

Selecteren van berichten, voorheen:

def select_messages(self):
    return [message for message in self.messages if not message.is_delivered]

Wordt:

def select_messages(self):
    return [message for message in self.messages if not message.is_delivered and not message.delivery_in_progress]

Deze methode is niet gratis, omdat er een extra update-commando voor nodig is.

Voor het geval je het wilt proberen, hier is de code. Om de code te testen heb ik de volgende 'switches' toegevoegd aan de deliver service:

  • 'inject_post_update_error': om een post-update fout te simuleren
  • use_delivery_in_progress': om de 'delivery_in_progress'-flag te gebruiken in het berichtrecord.

We doen twee 'ticks' (runs) voor elk van de volgende voorwaarden:

  • Happy Flow
  • Post-update mislukking => fout
  • Post-update mislukking + 'delivery_in_progress'-flag => correct
# prevent duplicate messages caused by post-update errors
class SendError(Exception):
    pass

class PostUpdateError(Exception):
    pass

class RemoteSystem:
    def send_data(self, data):
        print('remote system: received data = {}'.format(data))

# database message record
class Message:
    def __init__(self, data):
        self.data = data
        self.is_delivered = False
        self.delivery_in_progress = False
    def __str__(self):
        return '<Message: data = {}, is_delivered = {}, delivery_in_progress = {}>'.format(self.data, self.is_delivered, self.delivery_in_progress)

class DeliverService:
    def __init__(self, inject_post_update_error, use_delivery_in_progress):
        # switch to inject update_failure
        self.inject_post_update_error = inject_post_update_error
        # switch to prevent sending duplicates
        self.use_delivery_in_progress = use_delivery_in_progress
        # database
        self.messages = [Message('data1'), Message('data2')]
        # remote system
        self.rem_sys = RemoteSystem()

    def select_messages(self):
        if self.use_delivery_in_progress:
            return [message for message in self.messages if not message.is_delivered and not message.delivery_in_progress]
        else:
            return [message for message in self.messages if not message.is_delivered]

    def deliver_message(self, message):
        try:
            self.rem_sys.send_data(message.data)
            return True
        except Exception as e:
            raise SendError()

    def deliver_messages(self):
        tick_delivered_count = 0
        tick_send_error_count = 0
        tick_post_update_error_count = 0
        for message in self.select_messages():
            # pre-update message record
            if self.use_delivery_in_progress:
                message.delivery_in_progress = True
            # deliver message
            try:
                self.deliver_message(message)
                tick_delivered_count += 1
            except SendError:
                tick_send_error_count += 1
            # post-update message record
            try:
                if self.inject_post_update_error:
                    raise PostUpdateError()
                message.is_delivered = True
                if self.use_delivery_in_progress:
                    message.delivery_in_progress = False
            except PostUpdateError:
                tick_post_update_error_count += 1
        return (tick_delivered_count, tick_send_error_count, tick_post_update_error_count)

def main():

    tests = [
        {
            'title': 'HAPPY FLOW => CORRECT',
            'inject_post_update_error': False,
            'use_delivery_in_progress': False,
        },
        {
            'title': 'POST-UPDATE FAILURE => WRONG',
            'inject_post_update_error': True,
            'use_delivery_in_progress': False,
        },
        {
            'title': 'POST-UPDATE FAILURE + DELIVERY_IN_PROGRESS => CORRECT',
            'inject_post_update_error': True,
            'use_delivery_in_progress': True,
        },
    ]

    for test in tests:    
        print('\n{}\n{}'.format(test['title'], '-'*40))
        ds = DeliverService(
            inject_post_update_error=test['inject_post_update_error'],
            use_delivery_in_progress=test['use_delivery_in_progress'],
        )
        for tick in range(2):
            # debug
            print('on tick-{}, messages:'.format(tick))
            for message in ds.messages:
                print('- {}'.format(message))
            # deliver messages
            counts = ds.deliver_messages()
            # debug
            print('on tick-{} messages delivered to remote system: {}'.format(tick, counts[0]))
            if test['inject_post_update_error']:
                print('on tick-{} post-update error count: {}'.format(tick, counts[2]))

if __name__ == '__main__':
    main()

En het resultaat na het uitvoeren:

HAPPY FLOW => CORRECT
----------------------------------------
on tick-0, messages:
- <Message: data = data1, is_delivered = False, delivery_in_progress = False>
- <Message: data = data2, is_delivered = False, delivery_in_progress = False>
remote system: received data = data1
remote system: received data = data2
on tick-0 messages delivered to remote system: 2
on tick-1, messages:
- <Message: data = data1, is_delivered = True, delivery_in_progress = False>
- <Message: data = data2, is_delivered = True, delivery_in_progress = False>
on tick-1 messages delivered to remote system: 0

POST-UPDATE FAILURE => WRONG
----------------------------------------
on tick-0, messages:
- <Message: data = data1, is_delivered = False, delivery_in_progress = False>
- <Message: data = data2, is_delivered = False, delivery_in_progress = False>
remote system: received data = data1
remote system: received data = data2
on tick-0 messages delivered to remote system: 2
on tick-0 post-update error count: 2
on tick-1, messages:
- <Message: data = data1, is_delivered = False, delivery_in_progress = False>
- <Message: data = data2, is_delivered = False, delivery_in_progress = False>
remote system: received data = data1
remote system: received data = data2
on tick-1 messages delivered to remote system: 2
on tick-1 post-update error count: 2

POST-UPDATE FAILURE + DELIVERY_IN_PROGRESS => CORRECT
----------------------------------------
on tick-0, messages:
- <Message: data = data1, is_delivered = False, delivery_in_progress = False>
- <Message: data = data2, is_delivered = False, delivery_in_progress = False>
remote system: received data = data1
remote system: received data = data2
on tick-0 messages delivered to remote system: 2
on tick-0 post-update error count: 2
on tick-1, messages:
- <Message: data = data1, is_delivered = False, delivery_in_progress = True>
- <Message: data = data2, is_delivered = False, delivery_in_progress = True>
on tick-1 messages delivered to remote system: 0
on tick-1 post-update error count: 0

Wat hebben we precies opgelost?

Wat we hebben opgelost is het meerdere keren versturen van hetzelfde bericht bij een post-update mislukking. Onze service blijft het proberen, dat is goed. Als een post-update actie één of twee keer mislukt, of voor een korte tijd, worden berichten niet meerdere keren verzonden, en onze dienst blijft draaien, hij wordt niet afgebroken. Dat is een grote opluchting.

Het vervelende is dat we nog steeds niet weten of de berichten daadwerkelijk zijn verzonden toen de post-update mislukte. Een manier om dit te omzeilen is het antwoord van het systeem op afstand, samen met het record-id van het bericht, op te slaan in een andere data store (met minimale afhankelijkheden).

Idempotente operaties

Wikipedia: "De eigenschap van bepaalde bewerkingen in de wiskunde en informatica waarbij ze meerdere keren kunnen worden toegepast zonder dat het resultaat verandert na de eerste toepassing.

Een goede manier is een systeem (of dienst) op afstand dat kan controleren op dubbele berichten door een bepaald veld in het bericht te controleren. Dan voegen we gewoon een unique-id toe aan elk bericht dat we maken. Het externe systeem zal automatisch dubbele berichten die het ontvangt, verwijderen.

Samenvatting

Rekening houden met alle mogelijke fouten is een van de belangrijkste taken bij het ontwerpen van een softwaretoepassing. Het botst volledig met de druk op ontwerpers en programmeurs om zo snel mogelijk een werkende applicatie af te leveren. Veel programmeurs weten dat hun code een aantal kwetsbaarheden bevat, maar tijdsdruk maakt het vaak onmogelijk om ze allemaal op te lossen voordat een applicatie in productie gaat.

In de bovenstaande code is alleen rekening gehouden met het ernstigste gevolg - het herhaaldelijk verzenden van hetzelfde bericht. Dit werd opgelost door een 'delivery_in_progress'-flag toe te voegen. Veel andere problemen moeten ook worden opgelost, zoals:

  • Hoeveel nieuwe pogingen voordat het verzenden van berichten naar het systeem op afstand wordt opgegeven?
  • Hoe controleer je selectiefouten, afleveringsfouten en updatefouten, enz.
  • Hoe te herstellen van mislukkingen
  • Mag de volgorde van de berichten veranderen bij het verzenden van berichten

Tenslotte zijn de meeste softwareprojecten nooit af, ze worden vrijgegeven.

Links / credits

[rabbitmq-discuss] Exactly Once Delivery
https://groups.google.com/g/rabbitmq-discuss/c/eniPTe1aKvk

Handling Duplicate Messages (Idempotent Consumers)
https://codeopinion.com/handling-duplicate-messages-idempotent-consumers

Laat een reactie achter

Reageer anoniem of log in om commentaar te geven.

Opmerkingen

Laat een antwoord achter

Antwoord anoniem of log in om te antwoorden.