Testing the RabbitMQ Pika publishing examples
With asynchronous publishing in Pika, make sure you handle every failure condition, including returned messages, or you may lose data.

A lot has been written about synchronous vs asynchronous publishing with RabbitMQ (see the links below), so I am not going to repeat it here. As this is my first time using RabbitMQ, I wanted to try both the synchronous and the asynchronous publishing versions, using the examples included with Pika, the RabbitMQ (AMQP 0-9-1) client library for Python.
To my surprise, the Pika asynchronous example did not detect when the queue was removed; it kept sending messages. What is going on here?
How to test
We use the RabbitMQ Docker image that includes the management plugin. The management interface is at:
http://127.0.0.1:15672
I changed the credentials here to 'user'/'password'.
I created two Python scripts based on Pika examples:
- synchronous publishing, using BlockingConnection
- asynchronous publishing, using SelectConnection
The scripts keep publishing messages every 2 seconds.
When a script is running (publishing messages to a queue) I create the following conditions:
- Terminate RabbitMQ
- Delete the exchange
- Delete the queue
Terminating RabbitMQ is done by killing the Docker container. Deleting the exchange and deleting the queue are done from the RabbitMQ management interface. Let's see what happens.
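The two delete steps could also be scripted instead of clicked. The sketch below is an assumption, not something used in the tests in this post: it builds (but does not send) DELETE requests for the RabbitMQ management HTTP API, using the address and credentials mentioned above. The helper name build_delete_request is hypothetical.

```python
import base64
import urllib.parse
import urllib.request

MANAGEMENT_URL = 'http://127.0.0.1:15672'


def build_delete_request(kind, name, vhost='/', username='user', password='password'):
    """Build a DELETE request for a queue or exchange (hypothetical helper).

    kind is 'queues' or 'exchanges'. The request is only built, not sent.
    """
    if kind not in ('queues', 'exchanges'):
        raise ValueError('kind must be "queues" or "exchanges"')
    # The default vhost "/" must be percent-encoded as %2F in the path
    path = '/api/{}/{}/{}'.format(
        kind,
        urllib.parse.quote(vhost, safe=''),
        urllib.parse.quote(name, safe=''),
    )
    token = base64.b64encode('{}:{}'.format(username, password).encode()).decode()
    return urllib.request.Request(
        MANAGEMENT_URL + path,
        method='DELETE',
        headers={'Authorization': 'Basic ' + token},
    )


request = build_delete_request('queues', 'my_test_queue')
print(request.get_method(), request.full_url)
# To actually delete the queue: urllib.request.urlopen(request)
```

Killing the container is left out here on purpose; that step stays a manual Docker action.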
Synchronous publishing using BlockingConnection
The code:
# blockingconnection.py
# based on:
# Using Delivery Confirmations with the BlockingConnection
# https://pika.readthedocs.io/en/stable/examples/blocking_delivery_confirmations.html?highlight=blockingconnection
# changes made:
# - username/password = user/password
# - added to exchange_declare: durable=True, auto_delete=False
# - added to queue_declare: durable=True, exclusive=False, auto_delete=False
import pika
from pika.exchange_type import ExchangeType
import time

username = 'user'
password = 'password'
my_exchange = 'my_test_exchange'
my_exchange_type = ExchangeType.direct
my_queue = 'my_test_queue'
my_routing_key = my_queue

# Open a connection to RabbitMQ on localhost using default parameters,
# except for the credentials
connection = pika.BlockingConnection(
    pika.ConnectionParameters(
        credentials=pika.PlainCredentials(
            username,
            password
        ),
    ),
)
# get channel
channel = connection.channel()
# declare our exchange
channel.exchange_declare(
    exchange=my_exchange,
    exchange_type=my_exchange_type,
    durable=True,
    auto_delete=False,
)
# declare our queue
channel.queue_declare(
    queue=my_queue,
    durable=True,
    exclusive=False,
    auto_delete=False,
)
# bind queue to exchange
channel.queue_bind(
    my_queue,
    my_exchange,
    routing_key=my_routing_key,
)
# Turn on delivery confirmations
channel.confirm_delivery()
# Send messages
for i in range(20):
    try:
        # With confirms enabled, basic_publish blocks until the broker
        # responds and raises an exception if the message is not confirmed
        channel.basic_publish(
            exchange=my_exchange,
            routing_key=my_routing_key,
            body='Hello World!',
            properties=pika.BasicProperties(
                content_type='text/plain',
                delivery_mode=2,
            ),
            mandatory=True,
        )
        print('Message {} publish was confirmed'.format(i))
    except Exception as e:
        e_name = type(e).__name__
        print('Message {} could not be confirmed, exception = {}, e = {}'.format(i, e_name, e))
    time.sleep(2)
1. Synchronous: Terminate RabbitMQ
When RabbitMQ is terminated, the message cannot be confirmed. Good.
...
Message 1 publish was confirmed
Message 2 publish was confirmed
Message 3 publish was confirmed
Message 4 could not be confirmed, exception = ConnectionClosedByBroker, e = (320, "CONNECTION_FORCED - broker forced connection closure with reason 'shutdown'")
Message 5 could not be confirmed, exception = ChannelWrongStateError, e = Channel is closed.
Message 6 could not be confirmed, exception = ChannelWrongStateError, e = Channel is closed.
Message 7 could not be confirmed, exception = ChannelWrongStateError, e = Channel is closed.
...
2. Synchronous: Delete exchange
When the exchange is deleted, the message cannot be confirmed. Good.
...
Message 3 publish was confirmed
Message 4 publish was confirmed
Message 5 publish was confirmed
Message 6 could not be confirmed, exception = ChannelClosedByBroker, e = (404, "NOT_FOUND - no exchange 'my_test_exchange' in vhost '/'")
Message 7 could not be confirmed, exception = ChannelWrongStateError, e = Channel is closed.
Message 8 could not be confirmed, exception = ChannelWrongStateError, e = Channel is closed.
Message 9 could not be confirmed, exception = ChannelWrongStateError, e = Channel is closed.
...
3. Synchronous: Delete queue
When the queue is deleted, the message cannot be confirmed. Good.
...
Message 6 publish was confirmed
Message 7 publish was confirmed
Message 8 publish was confirmed
Message 9 could not be confirmed, exception = UnroutableError, e = 1 unroutable message(s) returned
Message 10 could not be confirmed, exception = UnroutableError, e = 1 unroutable message(s) returned
Message 11 could not be confirmed, exception = UnroutableError, e = 1 unroutable message(s) returned
...
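The three outcomes above call for different reactions: after an unroutable message the channel stays usable, while a closed channel or connection needs a reconnect. Below is a minimal sketch of that decision, keyed on the exception class names printed by the script. The function name and the action labels are hypothetical. NackError did not occur in these tests; it is the exception Pika raises when the broker nacks a message.

```python
RECONNECT = 'reconnect'   # open a new connection and channel
REDECLARE = 'redeclare'   # channel still open: restore queue and binding
RETRY = 'retry'           # broker refused the message: publish it again later

# Exception class names as printed by blockingconnection.py
_ACTIONS = {
    'ConnectionClosedByBroker': RECONNECT,
    'ChannelClosedByBroker': RECONNECT,
    'ChannelWrongStateError': RECONNECT,
    'UnroutableError': REDECLARE,
    'NackError': RETRY,
}


def action_for(exc):
    """Return a suggested reaction for a publish exception, or None.

    Walks the class hierarchy so subclasses of a known exception match too.
    """
    for cls in type(exc).__mro__:
        if cls.__name__ in _ACTIONS:
            return _ACTIONS[cls.__name__]
    return None


# Stand-in exception, only so this snippet runs without Pika installed
class UnroutableError(Exception):
    pass


print(action_for(UnroutableError('1 unroutable message(s) returned')))
print(action_for(ValueError('something else')))
```

Inside the except block of the script, action_for(e) could replace the plain print to decide what to do next.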
Asynchronous publishing using SelectConnection
The code:
# selectconnection.py
# based on:
# https://github.com/pika/pika/blob/master/examples/asynchronous_publisher_example.py
# changes made:
# - username/password = user/password
# - added username, password to connect string
# - added to exchange_declare: durable=True, auto_delete=False
# - added to queue_declare: durable=True, exclusive=False, auto_delete=False
# - added to basic_publish: mandatory=True
# - added to basic_publish properties: delivery_mode=2
# -*- coding: utf-8 -*-
# pylint: disable=C0111,C0103,R0205
import functools
import logging
import json
import pika
from pika.exchange_type import ExchangeType
LOG_FORMAT = ('%(asctime)s %(levelname) -8s [%(funcName) -35s %(lineno) 5d] %(message)s')
LOGGER = logging.getLogger(__name__)
logging_level = logging.INFO
username = 'user'
password = 'password'
my_exchange = 'my_test_exchange'
my_exchange_type = ExchangeType.direct
my_queue = 'my_test_queue'
my_routing_key = my_queue
class ExamplePublisher(object):
    """This is an example publisher that will handle unexpected interactions
    with RabbitMQ such as channel and connection closures.

    If RabbitMQ closes the connection, it will reopen it. You should
    look at the output, as there are limited reasons why the connection may
    be closed, which usually are tied to permission related issues or
    socket timeouts.

    It uses delivery confirmations and illustrates one way to keep track of
    messages that have been sent and if they've been confirmed by RabbitMQ.
    """
    EXCHANGE = my_exchange
    EXCHANGE_TYPE = my_exchange_type
    QUEUE = my_queue
    ROUTING_KEY = my_routing_key
    PUBLISH_INTERVAL = 2

    def __init__(self, amqp_url):
        """Setup the example publisher object, passing in the URL we will use
        to connect to RabbitMQ.

        :param str amqp_url: The URL for connecting to RabbitMQ
        """
        self._connection = None
        self._channel = None
        self._deliveries = None
        self._acked = None
        self._nacked = None
        self._message_number = None
        self._stopping = False
        self._url = amqp_url

    def connect(self):
        """This method connects to RabbitMQ, returning the connection handle.
        When the connection is established, the on_connection_open method
        will be invoked by pika.

        :rtype: pika.SelectConnection
        """
        LOGGER.info('Connecting to %s', self._url)
        return pika.SelectConnection(
            pika.URLParameters(self._url),
            on_open_callback=self.on_connection_open,
            on_open_error_callback=self.on_connection_open_error,
            on_close_callback=self.on_connection_closed)

    def on_connection_open(self, _unused_connection):
        """This method is called by pika once the connection to RabbitMQ has
        been established. It passes the handle to the connection object in
        case we need it, but in this case, we'll just mark it unused.

        :param pika.SelectConnection _unused_connection: The connection
        """
        LOGGER.info('Connection opened')
        self.open_channel()

    def on_connection_open_error(self, _unused_connection, err):
        """This method is called by pika if the connection to RabbitMQ
        can't be established.

        :param pika.SelectConnection _unused_connection: The connection
        :param Exception err: The error
        """
        LOGGER.error('Connection open failed, reopening in 5 seconds: %s', err)
        self._connection.ioloop.call_later(5, self._connection.ioloop.stop)

    def on_connection_closed(self, _unused_connection, reason):
        """This method is invoked by pika when the connection to RabbitMQ is
        closed unexpectedly. Since it is unexpected, we will reconnect to
        RabbitMQ if it disconnects.

        :param pika.connection.Connection connection: The closed connection obj
        :param Exception reason: exception representing reason for loss of
            connection.
        """
        self._channel = None
        if self._stopping:
            self._connection.ioloop.stop()
        else:
            LOGGER.warning('Connection closed, reopening in 5 seconds: %s',
                           reason)
            self._connection.ioloop.call_later(5, self._connection.ioloop.stop)

    def open_channel(self):
        """This method will open a new channel with RabbitMQ by issuing the
        Channel.Open RPC command. When RabbitMQ confirms the channel is open
        by sending the Channel.OpenOK RPC reply, the on_channel_open method
        will be invoked.
        """
        LOGGER.info('Creating a new channel')
        self._connection.channel(on_open_callback=self.on_channel_open)

    def on_channel_open(self, channel):
        """This method is invoked by pika when the channel has been opened.
        The channel object is passed in so we can make use of it.
        Since the channel is now open, we'll declare the exchange to use.

        :param pika.channel.Channel channel: The channel object
        """
        LOGGER.info('Channel opened')
        self._channel = channel
        self.add_on_channel_close_callback()
        # # (1 of 3) add to detect unroutable messages
        # # self.add_on_channel_return_callback()
        self.setup_exchange(self.EXCHANGE)

    def add_on_channel_close_callback(self):
        """This method tells pika to call the on_channel_closed method if
        RabbitMQ unexpectedly closes the channel.
        """
        LOGGER.info('Adding channel close callback')
        self._channel.add_on_close_callback(self.on_channel_closed)

    def on_channel_closed(self, channel, reason):
        """Invoked by pika when RabbitMQ unexpectedly closes the channel.
        Channels are usually closed if you attempt to do something that
        violates the protocol, such as re-declare an exchange or queue with
        different parameters. In this case, we'll close the connection
        to shutdown the object.

        :param pika.channel.Channel channel: The closed channel
        :param Exception reason: why the channel was closed
        """
        LOGGER.warning('Channel %i was closed: %s', channel, reason)
        self._channel = None
        if not self._stopping:
            self._connection.close()

    # # (2 of 3) add to detect unroutable messages
    def add_on_channel_return_callback(self):
        LOGGER.info('Adding channel return callback')
        self._channel.add_on_return_callback(self.on_channel_returned)

    # # (3 of 3) add to detect unroutable messages
    def on_channel_returned(self, channel, method, properties, body):
        """Invoked by pika when RabbitMQ returns an unroutable message
        (Basic.Return). We close the connection so that it is reopened.
        """
        LOGGER.warning('Channel = {} returned, method = {}, properties = {}, body = {}'.format(channel, method, properties, body))
        self._channel = None
        if not self._stopping:
            self._connection.close()

    def setup_exchange(self, exchange_name):
        """Setup the exchange on RabbitMQ by invoking the Exchange.Declare RPC
        command. When it is complete, the on_exchange_declareok method will
        be invoked by pika.

        :param str|unicode exchange_name: The name of the exchange to declare
        """
        LOGGER.info('Declaring exchange %s', exchange_name)
        # Note: using functools.partial is not required, it is demonstrating
        # how arbitrary data can be passed to the callback when it is called
        cb = functools.partial(
            self.on_exchange_declareok, userdata=exchange_name)
        self._channel.exchange_declare(
            exchange=exchange_name,
            exchange_type=self.EXCHANGE_TYPE,
            durable=True,
            auto_delete=False,
            callback=cb)

    def on_exchange_declareok(self, _unused_frame, userdata):
        """Invoked by pika when RabbitMQ has finished the Exchange.Declare RPC
        command.

        :param pika.Frame.Method unused_frame: Exchange.DeclareOk response frame
        :param str|unicode userdata: Extra user data (exchange name)
        """
        LOGGER.info('Exchange declared: %s', userdata)
        self.setup_queue(self.QUEUE)

    def setup_queue(self, queue_name):
        """Setup the queue on RabbitMQ by invoking the Queue.Declare RPC
        command. When it is complete, the on_queue_declareok method will
        be invoked by pika.

        :param str|unicode queue_name: The name of the queue to declare.
        """
        LOGGER.info('Declaring queue %s', queue_name)
        self._channel.queue_declare(
            queue=queue_name,
            durable=True,
            exclusive=False,
            auto_delete=False,
            callback=self.on_queue_declareok,
        )

    def on_queue_declareok(self, _unused_frame):
        """Method invoked by pika when the Queue.Declare RPC call made in
        setup_queue has completed. In this method we will bind the queue
        and exchange together with the routing key by issuing the Queue.Bind
        RPC command. When this command is complete, the on_bindok method will
        be invoked by pika.

        :param pika.frame.Method method_frame: The Queue.DeclareOk frame
        """
        LOGGER.info('Binding %s to %s with %s', self.EXCHANGE, self.QUEUE,
                    self.ROUTING_KEY)
        self._channel.queue_bind(
            self.QUEUE,
            self.EXCHANGE,
            routing_key=self.ROUTING_KEY,
            callback=self.on_bindok)

    def on_bindok(self, _unused_frame):
        """This method is invoked by pika when it receives the Queue.BindOk
        response from RabbitMQ. Since we know we're now setup and bound, it's
        time to start publishing."""
        LOGGER.info('Queue bound')
        self.start_publishing()

    def start_publishing(self):
        """This method will enable delivery confirmations and schedule the
        first message to be sent to RabbitMQ
        """
        LOGGER.info('Issuing consumer related RPC commands')
        self.enable_delivery_confirmations()
        self.schedule_next_message()

    def enable_delivery_confirmations(self):
        """Send the Confirm.Select RPC method to RabbitMQ to enable delivery
        confirmations on the channel. The only way to turn this off is to close
        the channel and create a new one.

        When the message is confirmed from RabbitMQ, the
        on_delivery_confirmation method will be invoked passing in a Basic.Ack
        or Basic.Nack method from RabbitMQ that will indicate which messages it
        is confirming or rejecting.
        """
        LOGGER.info('Issuing Confirm.Select RPC command')
        self._channel.confirm_delivery(self.on_delivery_confirmation)

    def on_delivery_confirmation(self, method_frame):
        """Invoked by pika when RabbitMQ responds to a Basic.Publish RPC
        command, passing in either a Basic.Ack or Basic.Nack frame with
        the delivery tag of the message that was published. The delivery tag
        is an integer counter indicating the message number that was sent
        on the channel via Basic.Publish. Here we're just doing house keeping
        to keep track of stats and remove message numbers that we expect
        a delivery confirmation of from the list used to keep track of messages
        that are pending confirmation.

        :param pika.frame.Method method_frame: Basic.Ack or Basic.Nack frame
        """
        LOGGER.info('Received method_frame: {}'.format(method_frame))
        confirmation_type = method_frame.method.NAME.split('.')[1].lower()
        LOGGER.info('Received %s for delivery tag: %i', confirmation_type,
                    method_frame.method.delivery_tag)
        if confirmation_type == 'ack':
            self._acked += 1
        elif confirmation_type == 'nack':
            self._nacked += 1
        self._deliveries.remove(method_frame.method.delivery_tag)
        LOGGER.info(
            'Published %i messages, %i have yet to be confirmed, '
            '%i were acked and %i were nacked', self._message_number,
            len(self._deliveries), self._acked, self._nacked)

    def schedule_next_message(self):
        """If we are not closing our connection to RabbitMQ, schedule another
        message to be delivered in PUBLISH_INTERVAL seconds.
        """
        LOGGER.info('Scheduling next message for %0.1f seconds',
                    self.PUBLISH_INTERVAL)
        self._connection.ioloop.call_later(self.PUBLISH_INTERVAL,
                                           self.publish_message)

    def publish_message(self):
        """If the class is not stopping, publish a message to RabbitMQ,
        appending a list of deliveries with the message number that was sent.
        This list will be used to check for delivery confirmations in the
        on_delivery_confirmations method.

        Once the message has been sent, schedule another message to be sent.
        The main reason I put scheduling in was just so you can get a good idea
        of how the process is flowing by slowing down and speeding up the
        delivery intervals by changing the PUBLISH_INTERVAL constant in the
        class.
        """
        if self._channel is None or not self._channel.is_open:
            return
        hdrs = {u'مفتاح': u' قيمة', u'键': u'值', u'キー': u'値'}
        properties = pika.BasicProperties(
            app_id='example-publisher',
            content_type='application/json',
            headers=hdrs,
            delivery_mode=2,
        )
        message = u'مفتاح قيمة 键 值 キー 値'
        self._channel.basic_publish(
            self.EXCHANGE,
            self.ROUTING_KEY,
            json.dumps(message, ensure_ascii=False),
            properties,
            mandatory=True,
        )
        self._message_number += 1
        self._deliveries.append(self._message_number)
        LOGGER.info('Published message # %i', self._message_number)
        self.schedule_next_message()

    def run(self):
        """Run the example code by connecting and then starting the IOLoop.
        """
        while not self._stopping:
            self._connection = None
            self._deliveries = []
            self._acked = 0
            self._nacked = 0
            self._message_number = 0
            try:
                self._connection = self.connect()
                self._connection.ioloop.start()
            except KeyboardInterrupt:
                self.stop()
                if (self._connection is not None and
                        not self._connection.is_closed):
                    # Finish closing
                    self._connection.ioloop.start()
        LOGGER.info('Stopped')

    def stop(self):
        """Stop the example by closing the channel and connection. We
        set a flag here so that we stop scheduling new messages to be
        published. The IOLoop is started because this method is
        invoked by the Try/Catch below when KeyboardInterrupt is caught.
        Starting the IOLoop again will allow the publisher to cleanly
        disconnect from RabbitMQ.
        """
        LOGGER.info('Stopping')
        self._stopping = True
        self.close_channel()
        self.close_connection()

    def close_channel(self):
        """Invoke this command to close the channel with RabbitMQ by sending
        the Channel.Close RPC command.
        """
        if self._channel is not None:
            LOGGER.info('Closing the channel')
            self._channel.close()

    def close_connection(self):
        """This method closes the connection to RabbitMQ."""
        if self._connection is not None:
            LOGGER.info('Closing connection')
            self._connection.close()


def main():
    #logging.basicConfig(level=logging.DEBUG, format=LOG_FORMAT)
    logging.basicConfig(level=logging_level, format=LOG_FORMAT)
    # Connect to localhost:5672 with the username and password set above
    # and virtual host "/" (%2F)
    example = ExamplePublisher(
        'amqp://' + username + ':' + password + '@localhost:5672/%2F?connection_attempts=3&heartbeat=3600'
    )
    example.run()


if __name__ == '__main__':
    main()
1. Asynchronous: Terminate RabbitMQ
When this happens, the program terminates (I removed the tracebacks). Good.
...
2022-03-18 13:44:49,693 INFO [on_delivery_confirmation 253] Published 6 messages, 0 have yet to be confirmed, 6 were acked and 0 were nacked
2022-03-18 13:44:50,680 INFO [publish_message 299] Published message # 7
2022-03-18 13:44:50,681 INFO [schedule_next_message 262] Scheduling next message for 1.0 seconds
2022-03-18 13:44:50,689 INFO [on_delivery_confirmation 246] Received ack for delivery tag: 7
2022-03-18 13:44:50,689 INFO [on_delivery_confirmation 253] Published 7 messages, 0 have yet to be confirmed, 7 were acked and 0 were nacked
2022-03-18 13:44:51,203 INFO [abort 731] Aborting transport connection: state=1; <socket.socket fd=6, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('127.0.0.1', 44170), raddr=('127.0.0.1', 5672)>
2022-03-18 13:44:51,204 INFO [_initiate_abort 904] _AsyncTransportBase._initate_abort(): Initiating abrupt asynchronous transport shutdown: state=1; error=None; <socket.socket fd=6, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('127.0.0.1', 44170), raddr=('127.0.0.1', 5672)>
2022-03-18 13:44:51,204 INFO [_deactivate 869] Deactivating transport: state=1; <socket.socket fd=6, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('127.0.0.1', 44170), raddr=('127.0.0.1', 5672)>
2022-03-18 13:44:51,204 INFO [_on_stream_terminated 1996] AMQP stack terminated, failed to connect, or aborted: opened=True, error-arg=None; pending-error=ConnectionClosedByBroker: (320) "CONNECTION_FORCED - broker forced connection closure with reason 'shutdown'"
2022-03-18 13:44:51,204 WARNING [on_channel_closed 146] Channel 1 was closed: (320, "CONNECTION_FORCED - broker forced connection closure with reason 'shutdown'")
2022-03-18 13:44:51,204 ERROR [close 1282] Illegal close(200, 'Normal shutdown') request on <SelectConnection CLOSED transport=None params=<URLParameters host=localhost port=5672 virtual_host=/ ssl=False>> because it was called while connection state=CLOSED.
2022-03-18 13:44:51,204 ERROR [process 235] Calling <bound method ExamplePublisher.on_channel_closed of <__main__.ExamplePublisher object at 0x7efed2c60a90>> for "1:_on_channel_close" failed
2022-03-18 13:44:51,205 INFO [_close_and_finalize 882] Closing transport socket and unlinking: state=3; <socket.socket fd=6, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('127.0.0.1', 44170), raddr=('127.0.0.1', 5672)>
2022-03-18 13:44:51,205 ERROR [log_exception_func_wrap 55] Wrapped func exited with exception. Caller's stack:
...
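The two ERROR lines in the log above show where the exception comes from: on_channel_closed() calls self._connection.close() although the connection is already in state CLOSED. One possible guard is sketched below. The helper name is hypothetical and the snippet is untested against a live broker; is_closing and is_closed are properties of Pika connection objects, and a small fake connection stands in here so the snippet runs on its own.

```python
def close_connection_if_open(connection):
    """Close the connection unless it is already closing or closed.

    Returns True if close() was called (hypothetical helper).
    """
    if connection.is_closing or connection.is_closed:
        return False
    connection.close()
    return True


class FakeConnection:
    """Stand-in for a Pika connection, only for this demonstration."""

    def __init__(self, closed):
        self.is_closed = closed
        self.is_closing = False
        self.close_calls = 0

    def close(self):
        self.close_calls += 1
        self.is_closed = True


already_closed = FakeConnection(closed=True)
still_open = FakeConnection(closed=False)
print(close_connection_if_open(already_closed))  # False
print(close_connection_if_open(still_open))      # True
```

In on_channel_closed() the call would become close_connection_if_open(self._connection). With such a guard the script would probably keep trying to reconnect every 5 seconds instead of terminating, so decide which behaviour you want.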
2. Asynchronous: Delete exchange
Here the broker closes the channel, the script then closes the connection and reconnects after 5 seconds. Good.
...
2022-03-18 13:43:29,755 INFO [on_delivery_confirmation 246] Received ack for delivery tag: 31
2022-03-18 13:43:29,755 INFO [on_delivery_confirmation 253] Published 31 messages, 0 have yet to be confirmed, 31 were acked and 0 were nacked
2022-03-18 13:43:30,753 INFO [publish_message 299] Published message # 32
2022-03-18 13:43:30,753 INFO [schedule_next_message 262] Scheduling next message for 1.0 seconds
2022-03-18 13:43:30,755 WARNING [_on_close_from_broker 1070] Received remote Channel.Close (404): "NOT_FOUND - no exchange 'my_test_exchange' in vhost '/'" on <Channel number=1 OPEN conn=<SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7f07a602bf40> params=<URLParameters host=localhost port=5672 virtual_host=/ ssl=False>>>
2022-03-18 13:43:30,755 WARNING [on_channel_closed 146] Channel 1 was closed: (404, "NOT_FOUND - no exchange 'my_test_exchange' in vhost '/'")
2022-03-18 13:43:30,755 INFO [close 1295] Closing connection (200): 'Normal shutdown'
2022-03-18 13:43:30,755 INFO [close 1322] Connection.close is waiting for 1 channels to close: <SelectConnection CLOSING transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7f07a602bf40> params=<URLParameters host=localhost port=5672 virtual_host=/ ssl=False>>
2022-03-18 13:43:30,757 INFO [abort 731] Aborting transport connection: state=1; <socket.socket fd=6, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('127.0.0.1', 44166), raddr=('127.0.0.1', 5672)>
2022-03-18 13:43:30,757 INFO [_initiate_abort 904] _AsyncTransportBase._initate_abort(): Initiating abrupt asynchronous transport shutdown: state=1; error=None; <socket.socket fd=6, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('127.0.0.1', 44166), raddr=('127.0.0.1', 5672)>
2022-03-18 13:43:30,757 INFO [_deactivate 869] Deactivating transport: state=1; <socket.socket fd=6, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('127.0.0.1', 44166), raddr=('127.0.0.1', 5672)>
2022-03-18 13:43:30,758 INFO [_on_stream_terminated 1996] AMQP stack terminated, failed to connect, or aborted: opened=True, error-arg=None; pending-error=ConnectionClosedByClient: (200) 'Normal shutdown'
2022-03-18 13:43:30,758 INFO [_on_stream_terminated 2065] Stack terminated due to ConnectionClosedByClient: (200) 'Normal shutdown'
2022-03-18 13:43:30,758 WARNING [on_connection_closed 106] Connection closed, reopening in 5 seconds: (200, 'Normal shutdown')
2022-03-18 13:43:30,758 INFO [_close_and_finalize 882] Closing transport socket and unlinking: state=3; <socket.socket fd=6, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('127.0.0.1', 44166), raddr=('127.0.0.1', 5672)>
2022-03-18 13:43:35,763 INFO [connect 69] Connecting to amqp://user:password@localhost:5672/%2F?connection_attempts=3&heartbeat=3600
2022-03-18 13:43:35,764 INFO [start 179] Pika version 1.2.0 connecting to ('127.0.0.1', 5672)
2022-03-18 13:43:35,764 INFO [_on_writable 345] Socket connected: <socket.socket fd=8, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('127.0.0.1', 44168), raddr=('127.0.0.1', 5672)>
2022-03-18 13:43:35,764 INFO [_on_transport_establishment_done 428] Streaming transport linked up: (<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7f07a6036f40>, _StreamingProtocolShim: <SelectConnection PROTOCOL transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7f07a6036f40> params=<URLParameters host=localhost port=5672 virtual_host=/ ssl=False>>).
2022-03-18 13:43:35,766 INFO [on_connection_open 82] Connection opened
2022-03-18 13:43:35,766 INFO [open_channel 116] Creating a new channel
2022-03-18 13:43:35,767 INFO [_report_completion_and_cleanup 293] AMQPConnector - reporting success: <SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7f07a6036f40> params=<URLParameters host=localhost port=5672 virtual_host=/ ssl=False>>
2022-03-18 13:43:35,767 INFO [_report_completion_and_cleanup 725] AMQPConnectionWorkflow - reporting success: <SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7f07a6036f40> params=<URLParameters host=localhost port=5672 virtual_host=/ ssl=False>>
2022-03-18 13:43:35,767 INFO [on_channel_open 125] Channel opened
2022-03-18 13:43:35,767 INFO [add_on_channel_close_callback 134] Adding channel close callback
2022-03-18 13:43:35,767 INFO [setup_exchange 157] Declaring exchange my_test_exchange
2022-03-18 13:43:35,768 INFO [on_exchange_declareok 173] Exchange declared: my_test_exchange
2022-03-18 13:43:35,768 INFO [setup_queue 182] Declaring queue my_test_queue
2022-03-18 13:43:35,769 INFO [on_queue_declareok 199] Binding my_test_exchange to my_test_queue with my_test_queue
2022-03-18 13:43:35,769 INFO [on_bindok 211] Queue bound
2022-03-18 13:43:35,769 INFO [start_publishing 218] Issuing consumer related RPC commands
2022-03-18 13:43:35,769 INFO [enable_delivery_confirmations 231] Issuing Confirm.Select RPC command
2022-03-18 13:43:35,769 INFO [schedule_next_message 262] Scheduling next message for 1.0 seconds
2022-03-18 13:43:36,771 INFO [publish_message 299] Published message # 1
2022-03-18 13:43:36,771 INFO [schedule_next_message 262] Scheduling next message for 1.0 seconds
2022-03-18 13:43:36,785 INFO [on_delivery_confirmation 246] Received ack for delivery tag: 1
2022-03-18 13:43:36,785 INFO [on_delivery_confirmation 253] Published 1 messages, 0 have yet to be confirmed, 1 were acked and 0 were nacked
2022-03-18 13:43:37,773 INFO [publish_message 299] Published message # 2
...
3. Asynchronous: Delete queue
Nothing happens when we delete the queue. Not good! Why is this not detected?
...
2022-03-18 13:46:42,361 INFO [publish_message 299] Published message # 16
2022-03-18 13:46:42,361 INFO [schedule_next_message 262] Scheduling next message for 1.0 seconds
2022-03-18 13:46:42,363 INFO [on_delivery_confirmation 246] Received ack for delivery tag: 16
2022-03-18 13:46:42,363 INFO [on_delivery_confirmation 253] Published 16 messages, 0 have yet to be confirmed, 16 were acked and 0 were nacked
2022-03-18 13:46:43,363 INFO [publish_message 299] Published message # 17
2022-03-18 13:46:43,363 INFO [schedule_next_message 262] Scheduling next message for 1.0 seconds
2022-03-18 13:46:43,364 INFO [on_delivery_confirmation 246] Received ack for delivery tag: 17
2022-03-18 13:46:43,364 INFO [on_delivery_confirmation 253] Published 17 messages, 0 have yet to be confirmed, 17 were acked and 0 were nacked
2022-03-18 13:46:44,365 INFO [publish_message 299] Published message # 18
2022-03-18 13:46:44,366 INFO [schedule_next_message 262] Scheduling next message for 1.0 seconds
2022-03-18 13:46:44,367 INFO [on_delivery_confirmation 246] Received ack for delivery tag: 18
2022-03-18 13:46:44,367 INFO [on_delivery_confirmation 253] Published 18 messages, 0 have yet to be confirmed, 18 were acked and 0 were nacked
2022-03-18 13:46:45,367 INFO [publish_message 299] Published message # 19
2022-03-18 13:46:45,367 INFO [schedule_next_message 262] Scheduling next message for 1.0 seconds
2022-03-18 13:46:45,368 INFO [on_delivery_confirmation 246] Received ack for delivery tag: 19
2022-03-18 13:46:45,368 INFO [on_delivery_confirmation 253] Published 19 messages, 0 have yet to be confirmed, 19 were acked and 0 were nacked
2022-03-18 13:46:46,368 INFO [publish_message 299] Published message # 20
...
Asynchronous publishing: basic.ack and basic.nack are not enough
From the documentation:
'When Will Published Messages Be Confirmed by the Broker?
For unroutable messages, the broker will issue a confirm once the exchange verifies a message won't route to any queue (returns an empty list of queues). If the message is also published as mandatory, the basic.return is sent to the client before basic.ack. The same is true for negative acknowledgements (basic.nack).'
This means we must handle the basic.return in our script. I already added the code for this to the listing above. To enable it, change this line:
# # self.add_on_channel_return_callback()
to:
self.add_on_channel_return_callback()
If you run the script again, you will notice that after removing the queue, the script closes the connection and reconnects. The on_channel_returned() method is called with the returned message before the basic.ack is received.
Because the basic.return is sent to the client before (!) the basic.ack, the script can react before the message is counted as confirmed. After reconnecting, it declares the queue again and publishing resumes.
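To make the ordering concrete, here is a small broker-free sketch of the bookkeeping. It is an assumption on top of the example, not part of it: basic.return carries no delivery tag, so the sketch correlates returns with publishes through a message_id that the publisher would have to set in the message properties. The class and method names are hypothetical.

```python
class ConfirmTracker:
    """Bookkeeping for publisher confirms plus returns (hypothetical helper)."""

    def __init__(self):
        self.pending = {}      # delivery tag -> message_id, awaiting confirm
        self.returned = set()  # message_ids that came back via basic.return
        self.delivered = []    # acked and not returned
        self.failed = []       # nacked, or acked but unroutable

    def on_publish(self, delivery_tag, message_id):
        self.pending[delivery_tag] = message_id

    def on_return(self, message_id):
        # Arrives before the basic.ack of the same message
        self.returned.add(message_id)

    def on_confirm(self, delivery_tag, acked, multiple=False):
        # With multiple=True the broker confirms every tag up to delivery_tag
        if multiple:
            tags = sorted(tag for tag in self.pending if tag <= delivery_tag)
        else:
            tags = [delivery_tag]
        for tag in tags:
            message_id = self.pending.pop(tag, None)
            if message_id is None:
                continue
            if acked and message_id not in self.returned:
                self.delivered.append(message_id)
            else:
                self.returned.discard(message_id)
                self.failed.append(message_id)


tracker = ConfirmTracker()
tracker.on_publish(1, 'msg-1')
tracker.on_publish(2, 'msg-2')
tracker.on_confirm(1, acked=True)  # queue still exists
tracker.on_return('msg-2')         # queue deleted: basic.return comes first
tracker.on_confirm(2, acked=True)  # then the basic.ack for the same message
print(tracker.delivered, tracker.failed)  # ['msg-1'] ['msg-2']
```

In the example class, on_channel_returned() would call on_return(properties.message_id), and on_delivery_confirmation() would call on_confirm() with the delivery tag and the multiple flag from the frame. Messages that end up in failed can then be published again once the queue is back.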
Summary
Always test all conditions before using example code. I ran only three tests, but you can do a lot more: for example, check whether the exchange and queue are still present after a restart of RabbitMQ, and whether the messages are still in the queue after that restart.
Besides that, asynchronous publishing without data loss (that is, with confirms) introduces a lot of complexity. The throughput can increase dramatically, but this also depends very much on the architecture of your application. Do you really need asynchronous publishing?
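The restart checks mentioned above can be scripted with a passive declare, which only checks for existence and never creates anything. A sketch, assuming a Pika BlockingChannel is passed in. The function name is hypothetical, and a fake channel stands in for the broker here so the snippet runs on its own.

```python
def queue_exists(channel, queue_name):
    """Return True if the queue exists, False if the broker answers 404.

    After a 404 the broker closes the channel, so open a new channel
    before doing anything else with the connection.
    """
    try:
        channel.queue_declare(queue=queue_name, passive=True)
        return True
    except Exception as exc:
        # Pika's ChannelClosedByBroker exposes the AMQP reply code
        if getattr(exc, 'reply_code', None) == 404:
            return False
        raise


class FakeChannelClosed(Exception):
    """Stand-in for pika.exceptions.ChannelClosedByBroker."""
    reply_code = 404


class FakeChannel:
    """Stand-in for a channel that knows a fixed set of queues."""

    def __init__(self, queues):
        self.queues = set(queues)

    def queue_declare(self, queue, passive=False):
        if queue not in self.queues:
            raise FakeChannelClosed('NOT_FOUND - no queue')


channel = FakeChannel(['my_test_queue'])
print(queue_exists(channel, 'my_test_queue'))  # True
print(queue_exists(channel, 'missing_queue'))  # False
```

With a real channel, the result of the passive declare also carries the number of messages in the queue (result.method.message_count), which covers the second check.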
Links / credits
Notify consumer when a queue is deleted on rabbitmq
https://stackoverflow.com/questions/15527226/notify-consumer-when-a-queue-is-deleted-on-rabbitmq
Part 1: RabbitMQ Best Practices
https://www.cloudamqp.com/blog/part1-rabbitmq-best-practice.html
Publishing Throughput - Asynchronous vs Synchronous
https://www.cloudamqp.com/blog/publishing-throughput-asynchronous-vs-synchronous.html
RabbitMQ - AMQP 0-9-1 Complete Reference Guide
https://www.rabbitmq.com/amqp-0-9-1-reference.html
RabbitMQ - Publisher Confirms
https://www.rabbitmq.com/confirms.html
RabbitMQ - Reliability Guide
https://www.rabbitmq.com/reliability.html