
Prevent sending duplicate messages to a remote system

Considering all possible states and conditions is not just desirable, it is a requirement.

21 February 2023 Updated 21 February 2023
Image: https://www.pexels.com/nl-nl/@monoar-rahman-22660

Applications often have to send messages to a remote system. In a perfect world we only have to deal with the Happy Flow: nothing bad happens, no exceptions, no errors.

Unfortunately, the world is not perfect. Besides programming errors, connections can fail, database systems can fail, remote systems can fail. A simple question - can you write some code to send messages to a remote system? - can easily become a very complex project.

In this post I show a way to prevent sending duplicate messages in case the update of a database message record fails.

Trade-offs

If you're developing a system that does financial transactions, then the requirements are totally different from those of a system that sends out concert tickets by email. The former must never fail and must be able to restart and continue exactly from where it left off. The system that sends tickets may have a problem a few times a month. Consumers may then not receive their tickets, or receive their tickets twice.

What I mean by this is that there are always trade-offs. Is it worth spending a lot of time developing a bulletproof application, or do you accept sporadic failures? If you accept sporadic failures, you must know their consequences. A failure that prevents an email from being sent once in a while is not as bad as a failure that keeps sending out the same emails hundreds or even thousands of times.

Our deliver service

Our deliver service connects to a database with messages that must be sent to a remote system.

  +----------+      +----------+      +----------+
  | database |      | deliver- |      | remote   |
  | message  |<-----| service  |----->| system   |
  | records  |      |          |      |          |
  +----------+      +----------+      +----------+

The deliver service runs on every tick, for example every second. The message record has an 'is_delivered'-flag. This flag is initially 'False' and is updated to 'True' after the message has been sent. The flag is also used when selecting message records from the database.

Steps:

  1. Select messages from the database that have not been delivered yet.
  2. Deliver the messages one by one to the remote system.
  3. (post-update) Once a message has been delivered, we update the 'is_delivered'-flag.

I call the action in step 3 a post-update because it is an update after sending the message.

Here is the Happy Flow code without any error checking and error handling:

# happy flow code

class RemoteSystem:
    def receive_data(self, data):
        self.data = data
        print('remote system: received data = {}'.format(data))

# database message record
class Message:
    def __init__(self, data):
        self.data = data
        self.is_delivered = False
    def __str__(self):
        return '<Message: data = {}, is_delivered = {}>'.format(self.data, self.is_delivered)

class DeliverService:
    def __init__(self):
        # database
        self.messages = [Message('data1'), Message('data2')]
        # remote system
        self.rem_sys = RemoteSystem()

    def select_messages_to_deliver(self):
        return [message for message in self.messages if not message.is_delivered]

    def deliver_message(self, message):
        # send to remote system
        self.rem_sys.receive_data(message.data)

    def deliver_messages(self):
        delivered_count = 0
        for message in self.select_messages_to_deliver():
            # deliver message
            self.deliver_message(message)
            delivered_count += 1
            # update message record
            message.is_delivered = True
        return delivered_count

def main():
    print('\nHAPPY FLOW\n{}'.format('-'*40))
    ds = DeliverService()
    for tick in range(2):
        # debug
        print('on tick-{} messages:'.format(tick))
        for message in ds.messages:
            print('- {}'.format(message))
        # deliver_messages
        delivered_count = ds.deliver_messages()
        # debug
        print('on tick-{} messages delivered to remote system: {}'.format(tick, delivered_count))

if __name__ == '__main__':
    main()

And here is the result after running:

HAPPY FLOW
----------------------------------------
on tick-0 messages:
- <Message: data = data1, is_delivered = False>
- <Message: data = data2, is_delivered = False>
remote system: received data = data1
remote system: received data = data2
on tick-0 messages delivered to remote system: 2
on tick-1 messages:
- <Message: data = data1, is_delivered = True>
- <Message: data = data2, is_delivered = True>
on tick-1 messages delivered to remote system: 0

The messages were delivered on the first tick. On the second tick, no messages were delivered.

Done, ready! That was easy ... in the perfect world ...

Make a list of all things that can go wrong, look at the implications and how to solve them

Because our world is not perfect, we must take into account all the possible errors that can occur. For each of these errors, we must look at the consequences it has for the operation of our service.

Consider temporary, or transient, errors and permanent errors. A transient error typically occurs once or lasts only a short time. For example, if a remote system is unavailable for a short time, we receive a timeout. An example of a permanent error is a database server that goes down.

For our deliver service, here is a (limited) list of things that can go wrong, their implications and how to solve them:

  • Database-select error: failure to select messages from the database.
    Our messages will not be delivered. That's not good, but that's it. Maybe our database server is temporarily down; we can investigate this further.
    How to solve: We just keep trying until the database responds again.
  • Remote system (temporarily) not available.
    Again our messages will not be delivered. That's not good, but that's it.
    How to solve: We already have the 'is_delivered'-flag. We only update this flag to 'True' if the remote system acknowledges reception of our message.
  • Database-post-update error: failure to update a message record after (!) delivery.
    In this case the 'is_delivered'-flag is not updated. Our message may or may not have been delivered to the remote system, but the outcome was not recorded in the message record. The consequence is that the message will be selected and delivered again on the next tick, and on the tick after that, and so on. The remote system will receive our message multiple times. Terrible!
    How to solve: See next section.
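
The 'keep trying' approach for a database-select error can be sketched as follows. This is a minimal illustration, not part of the deliver service above: 'DatabaseSelectError', 'FlakyDatabase' and 'run_tick' are hypothetical names, and the database is a stand-in that fails a fixed number of times. The idea is simply that a failed select ends the current tick, and the next tick tries again.

```python
class DatabaseSelectError(Exception):
    """Hypothetical error: selecting messages from the database failed."""


class FlakyDatabase:
    """Stand-in database that fails a given number of selects before recovering."""

    def __init__(self, failures):
        self.failures = failures
        self.rows = ['data1', 'data2']

    def select_undelivered(self):
        if self.failures > 0:
            self.failures -= 1
            raise DatabaseSelectError()
        # hand out the pending rows once, then report nothing left to deliver
        rows, self.rows = self.rows, []
        return rows


def run_tick(database, delivered):
    """Run one tick; on a select error we give up for now and retry on the next tick."""
    try:
        rows = database.select_undelivered()
    except DatabaseSelectError:
        return False
    delivered.extend(rows)
    return True


database = FlakyDatabase(failures=2)
delivered = []
results = [run_tick(database, delivered) for tick in range(4)]
print(results)    # [False, False, True, True]
print(delivered)  # ['data1', 'data2']
```

Nothing is lost by the failed ticks: the messages stay in the database and are picked up once the select works again.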

Prevent sending a message multiple times on a post-update error

The easiest way is to completely stop (abort) our deliver service. Then we check what is wrong and restart after having fixed the post-update error. Before restarting, we must manually set the 'is_delivered'-flag of the last message.

To get more control, we add a 'delivery_in_progress'-flag to the message record. We set this flag to 'True' before (!) sending the message to the remote system. We reset this flag to 'False' when updating the message record to set the 'is_delivered'-flag. When selecting messages for delivery we now also include the 'delivery_in_progress'-flag.

The new steps:

  1. Select messages from the database that have not been delivered and for which delivery is not in progress.
  2. (pre-update) Set the message record 'delivery_in_progress'-flag to 'True'.
  3. Deliver the message to the remote system.
  4. (post-update) Once a message has been delivered, we update the 'is_delivered'-flag to 'True' and the 'delivery_in_progress'-flag to 'False'.

Now if the post-update action fails, the message is not sent again, and the operation is not blocked: new messages are still sent.

Selecting messages, before:

def select_messages(self):
    return [message for message in self.messages if not message.is_delivered]

Becomes:

def select_messages(self):
    return [message for message in self.messages if not message.is_delivered and not message.delivery_in_progress]

This method is not free: it requires one extra update command per message.
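
To make the extra update command visible, here is a minimal sketch of the same steps against a real database, using Python's built-in sqlite3 module with an in-memory database. The table layout, the 'fail_post_update' switch and the 'received' list (standing in for the remote system) are assumptions made for this illustration only.

```python
import sqlite3

conn = sqlite3.connect(':memory:')
conn.execute(
    'CREATE TABLE message ('
    ' id INTEGER PRIMARY KEY,'
    ' data TEXT NOT NULL,'
    ' is_delivered INTEGER NOT NULL DEFAULT 0,'
    ' delivery_in_progress INTEGER NOT NULL DEFAULT 0)'
)
conn.executemany('INSERT INTO message (data) VALUES (?)', [('data1',), ('data2',)])
conn.commit()

received = []  # stands in for the remote system


def deliver_messages(conn, fail_post_update=False):
    # step 1: select messages that are not delivered and not in progress
    rows = conn.execute(
        'SELECT id, data FROM message'
        ' WHERE is_delivered = 0 AND delivery_in_progress = 0'
        ' ORDER BY id'
    ).fetchall()
    for message_id, data in rows:
        # step 2 (pre-update): the extra UPDATE, committed before sending
        conn.execute(
            'UPDATE message SET delivery_in_progress = 1 WHERE id = ?',
            (message_id,),
        )
        conn.commit()
        # step 3: deliver
        received.append(data)
        # step 4 (post-update), skipped here to simulate a failure
        if fail_post_update:
            continue
        conn.execute(
            'UPDATE message SET is_delivered = 1, delivery_in_progress = 0 WHERE id = ?',
            (message_id,),
        )
        conn.commit()
    return len(rows)


first = deliver_messages(conn, fail_post_update=True)
second = deliver_messages(conn)
print(first, second, received)  # 2 0 ['data1', 'data2']
```

The pre-update is committed on its own, before the message is sent. If it were part of the same transaction as the post-update, a failed post-update would roll it back and the message would be selected again.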

In case you want to try it, here is the code. To test the code I added the following switches to the deliver service:

  • 'inject_post_update_error': to simulate a post-update error
  • 'use_delivery_in_progress': to use the 'delivery_in_progress'-flag in the message record

We do two 'ticks' (runs) for each of the following conditions:

  • Happy Flow
  • Post-update failure => wrong
  • Post-update failure + 'delivery_in_progress'-flag => correct

# prevent duplicate messages caused by post-update errors
class SendError(Exception):
    pass

class PostUpdateError(Exception):
    pass

class RemoteSystem:
    def send_data(self, data):
        print('remote system: received data = {}'.format(data))

# database message record
class Message:
    def __init__(self, data):
        self.data = data
        self.is_delivered = False
        self.delivery_in_progress = False
    def __str__(self):
        return '<Message: data = {}, is_delivered = {}, delivery_in_progress = {}>'.format(self.data, self.is_delivered, self.delivery_in_progress)

class DeliverService:
    def __init__(self, inject_post_update_error, use_delivery_in_progress):
        # switch to inject update_failure
        self.inject_post_update_error = inject_post_update_error
        # switch to prevent sending duplicates
        self.use_delivery_in_progress = use_delivery_in_progress
        # database
        self.messages = [Message('data1'), Message('data2')]
        # remote system
        self.rem_sys = RemoteSystem()

    def select_messages(self):
        if self.use_delivery_in_progress:
            return [message for message in self.messages if not message.is_delivered and not message.delivery_in_progress]
        else:
            return [message for message in self.messages if not message.is_delivered]

    def deliver_message(self, message):
        try:
            self.rem_sys.send_data(message.data)
            return True
        except Exception as e:
            # keep the original exception as the cause for easier debugging
            raise SendError() from e

    def deliver_messages(self):
        tick_delivered_count = 0
        tick_send_error_count = 0
        tick_post_update_error_count = 0
        for message in self.select_messages():
            # pre-update message record
            if self.use_delivery_in_progress:
                message.delivery_in_progress = True
            # deliver message
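            try:
                self.deliver_message(message)
                tick_delivered_count += 1
            except SendError:
                tick_send_error_count += 1
                # not delivered: release the message so it is retried on a later tick
                # and skip the post-update, otherwise it would be flagged as delivered
                message.delivery_in_progress = False
                continue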
            # post-update message record
            try:
                if self.inject_post_update_error:
                    raise PostUpdateError()
                message.is_delivered = True
                if self.use_delivery_in_progress:
                    message.delivery_in_progress = False
            except PostUpdateError:
                tick_post_update_error_count += 1
        return (tick_delivered_count, tick_send_error_count, tick_post_update_error_count)

def main():

    tests = [
        {
            'title': 'HAPPY FLOW => CORRECT',
            'inject_post_update_error': False,
            'use_delivery_in_progress': False,
        },
        {
            'title': 'POST-UPDATE FAILURE => WRONG',
            'inject_post_update_error': True,
            'use_delivery_in_progress': False,
        },
        {
            'title': 'POST-UPDATE FAILURE + DELIVERY_IN_PROGRESS => CORRECT',
            'inject_post_update_error': True,
            'use_delivery_in_progress': True,
        },
    ]

    for test in tests:
        print('\n{}\n{}'.format(test['title'], '-'*40))
        ds = DeliverService(
            inject_post_update_error=test['inject_post_update_error'],
            use_delivery_in_progress=test['use_delivery_in_progress'],
        )
        for tick in range(2):
            # debug
            print('on tick-{}, messages:'.format(tick))
            for message in ds.messages:
                print('- {}'.format(message))
            # deliver messages
            counts = ds.deliver_messages()
            # debug
            print('on tick-{} messages delivered to remote system: {}'.format(tick, counts[0]))
            if test['inject_post_update_error']:
                print('on tick-{} post-update error count: {}'.format(tick, counts[2]))

if __name__ == '__main__':
    main()

And the result after running:

HAPPY FLOW => CORRECT
----------------------------------------
on tick-0, messages:
- <Message: data = data1, is_delivered = False, delivery_in_progress = False>
- <Message: data = data2, is_delivered = False, delivery_in_progress = False>
remote system: received data = data1
remote system: received data = data2
on tick-0 messages delivered to remote system: 2
on tick-1, messages:
- <Message: data = data1, is_delivered = True, delivery_in_progress = False>
- <Message: data = data2, is_delivered = True, delivery_in_progress = False>
on tick-1 messages delivered to remote system: 0

POST-UPDATE FAILURE => WRONG
----------------------------------------
on tick-0, messages:
- <Message: data = data1, is_delivered = False, delivery_in_progress = False>
- <Message: data = data2, is_delivered = False, delivery_in_progress = False>
remote system: received data = data1
remote system: received data = data2
on tick-0 messages delivered to remote system: 2
on tick-0 post-update error count: 2
on tick-1, messages:
- <Message: data = data1, is_delivered = False, delivery_in_progress = False>
- <Message: data = data2, is_delivered = False, delivery_in_progress = False>
remote system: received data = data1
remote system: received data = data2
on tick-1 messages delivered to remote system: 2
on tick-1 post-update error count: 2

POST-UPDATE FAILURE + DELIVERY_IN_PROGRESS => CORRECT
----------------------------------------
on tick-0, messages:
- <Message: data = data1, is_delivered = False, delivery_in_progress = False>
- <Message: data = data2, is_delivered = False, delivery_in_progress = False>
remote system: received data = data1
remote system: received data = data2
on tick-0 messages delivered to remote system: 2
on tick-0 post-update error count: 2
on tick-1, messages:
- <Message: data = data1, is_delivered = False, delivery_in_progress = True>
- <Message: data = data2, is_delivered = False, delivery_in_progress = True>
on tick-1 messages delivered to remote system: 0
on tick-1 post-update error count: 0

What exactly did we solve?

What we solved is sending the same message multiple times in case of a post-update failure. Our service keeps running instead of being aborted, and that is good. If a post-update action fails once or twice, or for a short time, the affected messages will not be sent multiple times and new messages are still delivered. That is a big relief.

The bad thing is that we still do not know whether messages were actually sent when the post-update failure occurred. One way around this is to store the response of the remote system, together with the message record id, in another data store (with minimal dependencies).
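
As a rough sketch of such a second data store, the code below appends the remote system's response to a plain file, one JSON line per message. The 'DeliveryLog' class, the file name and the 'ok' response are assumptions for this illustration. A file is used only because it has few dependencies; writing to it can of course fail too.

```python
import json
import os
import tempfile


class DeliveryLog:
    """Hypothetical append-only log, kept separate from the message database."""

    def __init__(self, path):
        self.path = path

    def record(self, message_id, response):
        # one JSON object per line, appended right after the remote system responds
        with open(self.path, 'a', encoding='utf-8') as f:
            f.write(json.dumps({'message_id': message_id, 'response': response}) + '\n')

    def responses(self):
        if not os.path.exists(self.path):
            return {}
        with open(self.path, encoding='utf-8') as f:
            return {entry['message_id']: entry['response'] for entry in map(json.loads, f)}


log = DeliveryLog(os.path.join(tempfile.mkdtemp(), 'delivery.log'))

# message 1 was acknowledged by the remote system, then the post-update failed
log.record(1, 'ok')

# messages 1 and 2 both still have delivery_in_progress = True in the database
stuck_ids = [1, 2]
known = log.responses()
was_sent = {message_id: message_id in known for message_id in stuck_ids}
print(was_sent)  # {1: True, 2: False}
```

With this log we can tell, for every message that is stuck in progress, whether the remote system answered before the post-update failed.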

Idempotent operations

Wikipedia: 'The property of certain operations in mathematics and computer science whereby they can be applied multiple times without changing the result beyond the initial application.'

A good approach is to have a remote system (or service) that can detect duplicate messages by checking a specified field in the message. We then simply add a unique id to every message we create, and the remote system discards any duplicate it receives.
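
A minimal sketch of such a remote system is shown below. It assumes we control the receiving side and that the unique id travels with the message; the 'IdempotentRemoteSystem' class and the use of a UUID as id are choices made for this example. A real system would keep the seen ids in persistent storage, not in memory.

```python
import uuid


class IdempotentRemoteSystem:
    """Hypothetical receiver that discards messages whose id it has already seen."""

    def __init__(self):
        self.seen_ids = set()
        self.accepted = []

    def receive_data(self, message_id, data):
        if message_id in self.seen_ids:
            # duplicate: acknowledge it, but do not process it again
            return False
        self.seen_ids.add(message_id)
        self.accepted.append(data)
        return True


remote = IdempotentRemoteSystem()
message_id = str(uuid.uuid4())  # unique id added when the message is created

first = remote.receive_data(message_id, 'data1')
second = remote.receive_data(message_id, 'data1')  # e.g. a resend after a post-update error
print(first, second, remote.accepted)  # True False ['data1']
```

With a receiver like this, resending after a post-update error is harmless, because the second delivery changes nothing.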

Summary

Taking all possible errors into account is one of the most important tasks when designing a software application. It completely conflicts with the pressure on designers and programmers to deliver a working application as quickly as possible. Many programmers know that their code has some vulnerabilities, but time constraints often make it impossible to fix all of them before an application goes into production.

In the code above, only the most serious consequence - sending the same message repeatedly - has been considered. This was solved by adding a 'delivery_in_progress'-flag. Many other problems must also be solved, such as:

  • How many retries before giving up sending messages to the remote system
  • How to monitor select failures, delivery failures and update failures, etc.
  • How to recover from failures
  • Is the order of the messages allowed to change when sending messages
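
As a small starting point for monitoring and recovery, the sketch below lists the messages that are left with 'delivery_in_progress' set to 'True' and 'is_delivered' set to 'False'. These are exactly the messages for which we do not know whether they were sent. The helper name 'find_unconfirmed_messages' is made up for this example, and the check is assumed to run between ticks so that it does not catch a message that is being sent at that moment.

```python
class Message:
    def __init__(self, data, is_delivered=False, delivery_in_progress=False):
        self.data = data
        self.is_delivered = is_delivered
        self.delivery_in_progress = delivery_in_progress


def find_unconfirmed_messages(messages):
    """Return messages that were (possibly) sent but never flagged as delivered."""
    return [m for m in messages if m.delivery_in_progress and not m.is_delivered]


messages = [
    Message('data1', is_delivered=True),          # delivered and flagged
    Message('data2', delivery_in_progress=True),  # post-update failed
    Message('data3'),                             # not sent yet
]

unconfirmed = [m.data for m in find_unconfirmed_messages(messages)]
print(unconfirmed)  # ['data2']
```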

Finally, most software projects are never finished; they are released.

Links / credits

[rabbitmq-discuss] Exactly Once Delivery
https://groups.google.com/g/rabbitmq-discuss/c/eniPTe1aKvk

Handling Duplicate Messages (Idempotent Consumers)
https://codeopinion.com/handling-duplicate-messages-idempotent-consumers
