Предотвращение отправки дубликатов сообщений в удаленную систему
Рассмотрение всех возможных состояний и условий является не желательным, а обязательным условием.
Много раз приложения должны отправлять сообщения в удаленную систему. В идеальном мире нам приходится иметь дело только со счастливым потоком: не происходит никаких плохих вещей, таких как исключения, ошибки.
К сожалению, мир не идеален. Помимо ошибок программирования, соединения могут не работать, системы баз данных могут не работать, удаленные системы могут не работать. Простой вопрос - не могли бы вы написать код для отправки сообщений удаленной системе - легко может превратиться в очень сложный проект.
В этой заметке я покажу способ предотвращения отправки дубликатов сообщений в случае сбоя обновления записи сообщения в базе данных.
Компромиссы
Если вы разрабатываете систему, выполняющую финансовые операции, то требования к ней совершенно иные, чем к системе, рассылающей по электронной почте билеты на концерт. Первая никогда не должна давать сбоев и должна быть способна перезапуститься и продолжить работу с того места, на котором остановилась. Система, рассылающая билеты, может иметь проблемы несколько раз в месяц, и тогда потребители могут не получить свои билеты или получить их дважды.
Я хочу сказать, что всегда есть компромиссы. Стоит ли тратить много времени и разрабатывать пуленепробиваемое приложение, или лучше смириться со спорадическими сбоями? Если произойдет такой спорадический сбой, вы должны знать его последствия. Сбой, который не позволяет отправить электронное письмо время от времени, не так плох, как сбой, который продолжает отправлять одни и те же письма сотни или даже тысячи раз.
Наша служба доставки
Наша служба deliver подключается к базе данных с сообщениями, которые должны быть отправлены в удаленную систему.
+----------+ +----------+ +----------+
| database | | deliver- | | remote |
| message |<-----| service |----->| system |
| records | | | | |
+----------+ +----------+ +----------+
Служба доставки запускается каждый такт, например, каждую секунду. Запись сообщения имеет флаг 'is_delivered'-. Этот флаг изначально имеет значение 'False' и обновляется до 'True' после отправки сообщения. Этот флаг также используется при выборе записей сообщений из базы данных.
Шаги:
- Выберите из базы данных сообщения, которые еще не были доставлены.
- По очереди доставить сообщение в удаленную систему.
- (post-update) Как только сообщение доставлено, обновляем флаг 'is_delivered'-.
Я называю действие в шаге 3 post-update, потому что это обновление после отправки сообщения.
Вот код Happy Flow без проверки и обработки ошибок:
# happy flow code
class RemoteSystem:
def receive_data(self, data):
self.data = data
print('remote system: received data = {}'.format(data))
# database message record
class Message:
def __init__(self, data):
self.data = data
self.is_delivered = False
def __str__(self):
return '<Message: data = {}, is_delivered = {}>'.format(self.data, self.is_delivered)
class DeliverService:
def __init__(self):
# database
self.messages = [Message('data1'), Message('data2')]
# remote system
self.rem_sys = RemoteSystem()
def select_messages_to_deliver(self):
return [message for message in self.messages if not message.is_delivered]
def deliver_message(self, message):
# send to remote system
self.rem_sys.receive_data(message.data)
def deliver_messages(self):
delivered_count = 0
for message in self.select_messages_to_deliver():
# deliver message
self.deliver_message(message)
delivered_count += 1
# update message record
message.is_delivered = True
return delivered_count
def main():
print('\nHAPPY FLOW\n{}'.format('-'*40))
ds = DeliverService()
for tick in range(2):
# debug
print('on tick-{} messages:'.format(tick))
for message in ds.messages:
print('- {}'.format(message))
# deliver_messages
delivered_count = ds.deliver_messages()
# debug
print('on tick-{} messages delivered to remote system: {}'.format(tick, delivered_count))
if __name__ == '__main__':
main()
А вот результат после выполнения:
HAPPY FLOW
----------------------------------------
on tick-0 messages:
- <Message: data = data1, is_delivered = False>
- <Message: data = data2, is_delivered = False>
remote system: received data = data1
remote system: received data = data2
on tick-0 messages delivered to remote system: 2
on tick-1 messages:
- <Message: data = data1, is_delivered = True>
- <Message: data = data2, is_delivered = True>
on tick-1 messages delivered to remote system: 0
На первом тике сообщения были доставлены, на втором тике сообщения не были доставлены.
Готово, готово! Это было просто... в идеальном мире...
Составьте список всех вещей, которые могут пойти не так, посмотрите на последствия и способы их решения
Поскольку наш мир не идеален, мы должны учесть все возможные ошибки, которые могут произойти. Для каждой из этих ошибок мы должны рассмотреть последствия, которые эта ошибка имеет для работы нашего сервиса.
Рассмотрим временные, или переходные, ошибки и постоянные ошибки. Переходная ошибка обычно бывает однократной или кратковременной. Если удаленная система недоступна в течение короткого времени, мы получаем тайм-аут. Примером постоянной ошибки является выход из строя сервера базы данных.
Для нашей службы доставки вот (ограниченный) список вещей, которые могут пойти не так, их последствия и способы их решения:
- Ошибка выбора базы данных: невозможно выбрать сообщения из базы данных.
Наши сообщения не будут доставлены. Это нехорошо, но это все. Возможно, наш сервер базы данных временно не работает, мы можем выяснить это дополнительно.
Как решить: Просто продолжаем попытки, пока база данных снова не ответит. - Удаленная система (временно) недоступна.
И снова наши сообщения не будут доставлены. Это не очень хорошо, но это все.
Как решить: У нас уже есть флаг 'is_delivered'-. Мы обновляем этот флаг до 'True', только если удаленная система подтверждает получение нашего сообщения. - Ошибка обновления базы данных: невозможность обновления записи сообщения после (!) доставки.
В этом случае флаг 'is_delivered'- не обновляется. Наше сообщение было или не было доставлено в удаленную систему, но это условие не было отмечено в записи сообщения. Следствием этого является то, что сообщение будет выбрано и доставлено по тику и т.д. Удаленная система получит наше сообщение несколько раз. Ужасно!
Как решить: См. следующий раздел.
Предотвращение многократной отправки сообщения при ошибке после обновления
Самый простой способ - полностью остановить (прервать) нашу службу доставки. Затем мы проверяем, что не так, и перезапускаем после устранения ошибки обновления. Мы должны вручную установить 'is_delivered'-флаг последнего сообщения.
Чтобы получить больше контроля, мы добавляем флаг 'delivery_in_progress'- к записи сообщения. Мы устанавливаем этот флаг в значение 'True' перед (!) отправкой сообщения в удаленную систему. Мы сбрасываем этот флаг на 'False' при обновлении записи сообщения для установки флага 'is_delivered'-. При выборе сообщений для доставки мы теперь также включаем флаг 'delivery_in_progress'-.
Новые шаги:
- Выбрать из базы данных сообщения, которые не были доставлены, и доставка которых не выполняется.
- (предварительное обновление) Установите флаг 'delivery_in_progress'-записи сообщения в 'True'.
- Поочередно доставить сообщение в удаленную систему.
- (post-update) Как только сообщение доставлено, мы обновляем флаг 'is_delivered'- на 'True' и флаг 'delivery_in_progress'- на 'False'.
Теперь, если действие post-update завершится неудачей, сообщение не будет отправлено, но операция не будет заблокирована. Новые сообщения по-прежнему отправляются.
Выбор сообщений, раньше:
def select_messages(self):
return [message for message in self.messages if not message.is_delivered]
Становится:
def select_messages(self):
return [message for message in self.messages if not message.is_delivered and not message.delivery_in_progress]
Этот метод не бесплатный, так как требует одной дополнительной команды обновления.
Если вы хотите попробовать, вот код. Для проверки кода я добавил следующие 'переключатели в службу доставки:
- 'inject_post_update_error': для имитации ошибки после обновления
- 'use_delivery_in_progress': для использования флага 'delivery_in_progress'- в записи сообщения.
Мы делаем два "тика" (прогона) для каждого из следующих условий:
- Счастливый поток
- Сбой после обновления => неправильно
- Сбой после обновления + 'delivery_in_progress'-flag => правильно
# prevent duplicate messages caused by post-update errors
class SendError(Exception):
pass
class PostUpdateError(Exception):
pass
class RemoteSystem:
def send_data(self, data):
print('remote system: received data = {}'.format(data))
# database message record
class Message:
def __init__(self, data):
self.data = data
self.is_delivered = False
self.delivery_in_progress = False
def __str__(self):
return '<Message: data = {}, is_delivered = {}, delivery_in_progress = {}>'.format(self.data, self.is_delivered, self.delivery_in_progress)
class DeliverService:
def __init__(self, inject_post_update_error, use_delivery_in_progress):
# switch to inject update_failure
self.inject_post_update_error = inject_post_update_error
# switch to prevent sending duplicates
self.use_delivery_in_progress = use_delivery_in_progress
# database
self.messages = [Message('data1'), Message('data2')]
# remote system
self.rem_sys = RemoteSystem()
def select_messages(self):
if self.use_delivery_in_progress:
return [message for message in self.messages if not message.is_delivered and not message.delivery_in_progress]
else:
return [message for message in self.messages if not message.is_delivered]
def deliver_message(self, message):
try:
self.rem_sys.send_data(message.data)
return True
except Exception as e:
raise SendError()
def deliver_messages(self):
tick_delivered_count = 0
tick_send_error_count = 0
tick_post_update_error_count = 0
for message in self.select_messages():
# pre-update message record
if self.use_delivery_in_progress:
message.delivery_in_progress = True
# deliver message
try:
self.deliver_message(message)
tick_delivered_count += 1
except SendError:
tick_send_error_count += 1
# post-update message record
try:
if self.inject_post_update_error:
raise PostUpdateError()
message.is_delivered = True
if self.use_delivery_in_progress:
message.delivery_in_progress = False
except PostUpdateError:
tick_post_update_error_count += 1
return (tick_delivered_count, tick_send_error_count, tick_post_update_error_count)
def main():
tests = [
{
'title': 'HAPPY FLOW => CORRECT',
'inject_post_update_error': False,
'use_delivery_in_progress': False,
},
{
'title': 'POST-UPDATE FAILURE => WRONG',
'inject_post_update_error': True,
'use_delivery_in_progress': False,
},
{
'title': 'POST-UPDATE FAILURE + DELIVERY_IN_PROGRESS => CORRECT',
'inject_post_update_error': True,
'use_delivery_in_progress': True,
},
]
for test in tests:
print('\n{}\n{}'.format(test['title'], '-'*40))
ds = DeliverService(
inject_post_update_error=test['inject_post_update_error'],
use_delivery_in_progress=test['use_delivery_in_progress'],
)
for tick in range(2):
# debug
print('on tick-{}, messages:'.format(tick))
for message in ds.messages:
print('- {}'.format(message))
# deliver messages
counts = ds.deliver_messages()
# debug
print('on tick-{} messages delivered to remote system: {}'.format(tick, counts[0]))
if test['inject_post_update_error']:
print('on tick-{} post-update error count: {}'.format(tick, counts[2]))
if __name__ == '__main__':
main()
И результат после выполнения:
HAPPY FLOW => CORRECT
----------------------------------------
on tick-0, messages:
- <Message: data = data1, is_delivered = False, delivery_in_progress = False>
- <Message: data = data2, is_delivered = False, delivery_in_progress = False>
remote system: received data = data1
remote system: received data = data2
on tick-0 messages delivered to remote system: 2
on tick-1, messages:
- <Message: data = data1, is_delivered = True, delivery_in_progress = False>
- <Message: data = data2, is_delivered = True, delivery_in_progress = False>
on tick-1 messages delivered to remote system: 0
POST-UPDATE FAILURE => WRONG
----------------------------------------
on tick-0, messages:
- <Message: data = data1, is_delivered = False, delivery_in_progress = False>
- <Message: data = data2, is_delivered = False, delivery_in_progress = False>
remote system: received data = data1
remote system: received data = data2
on tick-0 messages delivered to remote system: 2
on tick-0 post-update error count: 2
on tick-1, messages:
- <Message: data = data1, is_delivered = False, delivery_in_progress = False>
- <Message: data = data2, is_delivered = False, delivery_in_progress = False>
remote system: received data = data1
remote system: received data = data2
on tick-1 messages delivered to remote system: 2
on tick-1 post-update error count: 2
POST-UPDATE FAILURE + DELIVERY_IN_PROGRESS => CORRECT
----------------------------------------
on tick-0, messages:
- <Message: data = data1, is_delivered = False, delivery_in_progress = False>
- <Message: data = data2, is_delivered = False, delivery_in_progress = False>
remote system: received data = data1
remote system: received data = data2
on tick-0 messages delivered to remote system: 2
on tick-0 post-update error count: 2
on tick-1, messages:
- <Message: data = data1, is_delivered = False, delivery_in_progress = True>
- <Message: data = data2, is_delivered = False, delivery_in_progress = True>
on tick-1 messages delivered to remote system: 0
on tick-1 post-update error count: 0
Что именно мы решили?
Мы решили проблему отправки одного и того же сообщения несколько раз в случае сбоя обновления. Наш сервис продолжает попытки, это хорошо. Если действие после обновления не удается один или два раза, или в течение короткого времени, сообщения не будут отправляться несколько раз, и наш сервис будет продолжать работать, он не будет прерван. Это большое облегчение.
Плохо то, что мы все еще не знаем, были ли сообщения на самом деле отправлены, когда произошел сбой после обновления. Один из способов обойти это - хранить ответ удаленной системы вместе с идентификатором записи сообщения в другом хранилище данных (с минимальными зависимостями).
Идемпотентные операции
Википедия: "Свойство некоторых операций в математике и информатике, при котором они могут применяться многократно без изменения результата после первоначального применения".
Хороший способ - иметь удаленную систему (или службу), которая может проверять дубликаты сообщений путем проверки определенного поля в сообщении. Тогда мы просто добавляем уникальный идентификатор к каждому создаваемому нами сообщению. Удаленная система будет автоматически отбрасывать дубликаты сообщений, которые она получает.
Резюме
Учет всех возможных ошибок - одна из самых важных задач при разработке программного приложения. Она полностью противоречит давлению на дизайнеров и программистов, требующих как можно быстрее предоставить работающее приложение. Многие программисты знают, что в их коде есть уязвимости, но нехватка времени часто не позволяет устранить их все до того, как приложение будет запущено в производство.
В приведенном выше коде было учтено только самое серьезное последствие - многократная отправка одного и того же сообщения. Это было решено путем добавления флага 'delivery_in_progress'-. Необходимо также решить множество других проблем, таких как:
- Сколько повторных попыток нужно сделать, прежде чем отказаться от отправки сообщений в удаленную систему.
- Как отслеживать сбои выбора, сбои доставки, сбои обновления и т.д.
- Как восстанавливаться после сбоев
- Можно ли менять порядок сообщений при их отправке.
Наконец, большинство программных проектов никогда не завершаются, они выпускаются.
Ссылки / кредиты
[rabbitmq-discuss] Exactly Once Delivery
https://groups.google.com/g/rabbitmq-discuss/c/eniPTe1aKvk
Handling Duplicate Messages (Idempotent Consumers)
https://codeopinion.com/handling-duplicate-messages-idempotent-consumers
Подробнее
API Exceptions Internet
Недавний
Большинство просмотренных
- Используя Python pyOpenSSL для проверки SSL-сертификатов, загруженных с хоста
- Использование PyInstaller и Cython для создания исполняемого файла Python
- Уменьшение времени отклика на запросы на странице Flask SQLAlchemy веб-сайта
- Подключение к службе на хосте Docker из контейнера Docker
- SQLAlchemy: Использование Cascade Deletes для удаления связанных объектов
- Использование UUID вместо Integer Autoincrement Primary Keys с SQLAlchemy и MariaDb