
Testing the RabbitMQ Pika publish examples

With RabbitMQ Pika asynchronous publishing, do not forget to handle all the conditions needed to avoid data loss.

25 March 2022
Photo: https://www.pexels.com/nl-nl/@reezky11

Much has been written about synchronous versus asynchronous publishing with RabbitMQ (see the links below), so I will not repeat it here. As this is my first time using RabbitMQ, I wanted to try both the synchronous and the asynchronous way of publishing, using the examples included with Pika, the RabbitMQ (AMQP 0-9-1) client library for Python.

To my surprise, the Pika asynchronous example did not detect that the queue had been deleted; it simply kept sending messages. What is going on here?

How to test

We use the RabbitMQ Docker image that includes the management plugin. The management interface is at:

http://127.0.0.1:15672

Here I changed the credentials to 'user'/'password'.

I created two Python scripts based on the Pika examples:

  • synchronous publishing, using BlockingConnection
  • asynchronous publishing, using SelectConnection

The scripts keep publishing a message every 2 seconds.

While a script is running (publishing messages to a queue), I create the following conditions:

  1. Terminate RabbitMQ
  2. Delete the exchange
  3. Delete the queue

RabbitMQ is terminated by killing the Docker container. The exchange and the queue are deleted from the RabbitMQ management interface. Let's see what happens.
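In this post the exchange and the queue are deleted by hand in the management interface. For repeatable runs, the same deletions could probably be scripted against the management plugin's HTTP API. The sketch below only builds the DELETE requests and does not send them. The helper name build_delete_request is hypothetical, and the host, port and credentials are the ones assumed in this post.

```python
import base64
import urllib.parse
import urllib.request

MANAGEMENT_URL = 'http://127.0.0.1:15672'  # management interface used in this post


def build_delete_request(kind, name, vhost='/', username='user', password='password'):
    """Build (but do not send) a DELETE request for a queue or an exchange.

    kind is 'queues' or 'exchanges'. The default vhost '/' has to be
    URL-encoded as %2F in the path.
    """
    if kind not in ('queues', 'exchanges'):
        raise ValueError('kind must be "queues" or "exchanges"')
    url = '{}/api/{}/{}/{}'.format(
        MANAGEMENT_URL,
        kind,
        urllib.parse.quote(vhost, safe=''),
        urllib.parse.quote(name, safe=''),
    )
    # The management API accepts HTTP basic authentication.
    token = base64.b64encode('{}:{}'.format(username, password).encode()).decode()
    request = urllib.request.Request(url, method='DELETE')
    request.add_header('Authorization', 'Basic ' + token)
    return request


queue_request = build_delete_request('queues', 'my_test_queue')
exchange_request = build_delete_request('exchanges', 'my_test_exchange')
print(queue_request.get_method(), queue_request.full_url)
print(exchange_request.get_method(), exchange_request.full_url)
```

With a broker running, passing one of these requests to urllib.request.urlopen() should perform the deletion while a publisher script is active.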

Synchronous publishing using BlockingConnection

The code:

# blockingconnection.py
# based on:
# Using Delivery Confirmations with the BlockingConnection
# https://pika.readthedocs.io/en/stable/examples/blocking_delivery_confirmations.html?highlight=blockingconnection
# changes made:
# - username/password = user/password
# - added to exchange_declare: durable=True, auto_delete=False
# - added to queue_declare: durable=True, exclusive=False, auto_delete=False

import pika
from pika.exchange_type import ExchangeType
import time

username = 'user'
password = 'password'
my_exchange = 'my_test_exchange'
my_exchange_type = ExchangeType.direct
my_queue = 'my_test_queue'
my_routing_key = my_queue


# Open a connection to RabbitMQ on localhost (default host and port) with our credentials
connection = pika.BlockingConnection(
    pika.ConnectionParameters(
        credentials=pika.PlainCredentials(
            username,
            password
        ),
    ),
)

# get channel
channel = connection.channel()

# declare our exchange
channel.exchange_declare(
    exchange=my_exchange,
    exchange_type=my_exchange_type,
    durable=True,
    auto_delete=False,
)

# declare our queue
channel.queue_declare(
    queue=my_queue,
    durable=True,
    exclusive=False,
    auto_delete=False,
)

# bind queue to exchange
channel.queue_bind(
    my_queue,
    my_exchange,
    routing_key=my_routing_key,
)

# Turn on delivery confirmations
channel.confirm_delivery()

# Send messages
for i in range(20):
    try:
        channel.basic_publish(
            exchange=my_exchange,
            routing_key=my_routing_key,
            body='Hello World!',
            properties=pika.BasicProperties(
                content_type='text/plain',
                delivery_mode=2,
            ),
            mandatory=True,
        )
        print('Message {} publish was confirmed'.format(i))
    except Exception as e:
        e_name = type(e).__name__
        print('Message {} could not be confirmed, exception = {}, e = {}'.format(i, e_name, e))
    time.sleep(2)

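1. Synchronous: Terminate RabbitMQ

When RabbitMQ is terminated, the publish raises ConnectionClosedByBroker and every later attempt fails with ChannelWrongStateError, so no message is reported as confirmed. Good.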

...
Message 1 publish was confirmed
Message 2 publish was confirmed
Message 3 publish was confirmed
Message 4 could not be confirmed, exception = ConnectionClosedByBroker, e = (320, "CONNECTION_FORCED - broker forced connection closure with reason 'shutdown'")
Message 5 could not be confirmed, exception = ChannelWrongStateError, e = Channel is closed.
Message 6 could not be confirmed, exception = ChannelWrongStateError, e = Channel is closed.
Message 7 could not be confirmed, exception = ChannelWrongStateError, e = Channel is closed.
...

2. Synchronous: Delete the exchange

When the exchange is deleted, the broker closes the channel with a 404 NOT_FOUND and the message cannot be confirmed. Good.

...
Message 3 publish was confirmed
Message 4 publish was confirmed
Message 5 publish was confirmed
Message 6 could not be confirmed, exception = ChannelClosedByBroker, e = (404, "NOT_FOUND - no exchange 'my_test_exchange' in vhost '/'")
Message 7 could not be confirmed, exception = ChannelWrongStateError, e = Channel is closed.
Message 8 could not be confirmed, exception = ChannelWrongStateError, e = Channel is closed.
Message 9 could not be confirmed, exception = ChannelWrongStateError, e = Channel is closed.
...

3. Synchronous: Delete the queue

When the queue is deleted, each publish raises UnroutableError and the message cannot be confirmed. Good.

...
Message 6 publish was confirmed
Message 7 publish was confirmed
Message 8 publish was confirmed
Message 9 could not be confirmed, exception = UnroutableError, e = 1 unroutable message(s) returned
Message 10 could not be confirmed, exception = UnroutableError, e = 1 unroutable message(s) returned
Message 11 could not be confirmed, exception = UnroutableError, e = 1 unroutable message(s) returned
...
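The three runs above fail with different exceptions, and each one calls for a different reaction from the publisher. A minimal sketch of that mapping follows. It is keyed on the exception class name so that it runs without a broker, and the action labels are illustrative and not part of Pika. In a real script one would more likely catch the corresponding pika.exceptions classes directly.

```python
# Exception names as they appear in the three synchronous test runs,
# mapped to an illustrative follow-up action.
RECOVERY_ACTIONS = {
    'ConnectionClosedByBroker': 'reconnect',             # broker was terminated
    'ChannelClosedByBroker': 'reopen_channel',           # e.g. 404, exchange deleted
    'ChannelWrongStateError': 'check_connection_then_reopen_channel',
    'UnroutableError': 'message_unroutable',             # queue deleted, mandatory=True
}


def recovery_action(exc):
    """Return the suggested follow-up for an exception raised while publishing."""
    return RECOVERY_ACTIONS.get(type(exc).__name__, 'unknown')
```

Note that only UnroutableError leaves the channel usable: in the third run the script keeps publishing on the same channel and gets the same error every time.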

Asynchronous publishing using SelectConnection

The code:

# selectconnection.py
# based on:
# https://github.com/pika/pika/blob/master/examples/asynchronous_publisher_example.py
# changes made:
# - username/password = user/password
# - added username, password to connect string
# - added to exchange_declare: durable=True, auto_delete=False
# - added to queue_declare: durable=True, exclusive=False, auto_delete=False
# - added to basic_publish: mandatory=True
# - added to basic_publish properties: delivery_mode=2

# -*- coding: utf-8 -*-
# pylint: disable=C0111,C0103,R0205

import functools
import logging
import json
import pika
from pika.exchange_type import ExchangeType

LOG_FORMAT = ('%(asctime)s %(levelname) -8s [%(funcName) -35s %(lineno) 5d] %(message)s')
LOGGER = logging.getLogger(__name__)
logging_level = logging.INFO


username = 'user'
password = 'password'
my_exchange = 'my_test_exchange'
my_exchange_type = ExchangeType.direct
my_queue = 'my_test_queue'
my_routing_key = my_queue


class ExamplePublisher(object):
    """This is an example publisher that will handle unexpected interactions
    with RabbitMQ such as channel and connection closures.
    If RabbitMQ closes the connection, it will reopen it. You should
    look at the output, as there are limited reasons why the connection may
    be closed, which usually are tied to permission related issues or
    socket timeouts.
    It uses delivery confirmations and illustrates one way to keep track of
    messages that have been sent and if they've been confirmed by RabbitMQ.
    """
    EXCHANGE = my_exchange
    EXCHANGE_TYPE = my_exchange_type
    QUEUE = my_queue
    ROUTING_KEY = my_routing_key
    PUBLISH_INTERVAL = 2

    def __init__(self, amqp_url):
        """Setup the example publisher object, passing in the URL we will use
        to connect to RabbitMQ.
        :param str amqp_url: The URL for connecting to RabbitMQ
        """
        self._connection = None
        self._channel = None

        self._deliveries = None
        self._acked = None
        self._nacked = None
        self._message_number = None

        self._stopping = False
        self._url = amqp_url

    def connect(self):
        """This method connects to RabbitMQ, returning the connection handle.
        When the connection is established, the on_connection_open method
        will be invoked by pika.
        :rtype: pika.SelectConnection
        """
        LOGGER.info('Connecting to %s', self._url)
        return pika.SelectConnection(
            pika.URLParameters(self._url),
            on_open_callback=self.on_connection_open,
            on_open_error_callback=self.on_connection_open_error,
            on_close_callback=self.on_connection_closed)

    def on_connection_open(self, _unused_connection):
        """This method is called by pika once the connection to RabbitMQ has
        been established. It passes the handle to the connection object in
        case we need it, but in this case, we'll just mark it unused.
        :param pika.SelectConnection _unused_connection: The connection
        """
        LOGGER.info('Connection opened')
        self.open_channel()

    def on_connection_open_error(self, _unused_connection, err):
        """This method is called by pika if the connection to RabbitMQ
        can't be established.
        :param pika.SelectConnection _unused_connection: The connection
        :param Exception err: The error
        """
        LOGGER.error('Connection open failed, reopening in 5 seconds: %s', err)
        self._connection.ioloop.call_later(5, self._connection.ioloop.stop)

    def on_connection_closed(self, _unused_connection, reason):
        """This method is invoked by pika when the connection to RabbitMQ is
        closed unexpectedly. Since it is unexpected, we will reconnect to
        RabbitMQ if it disconnects.
        :param pika.connection.Connection connection: The closed connection obj
        :param Exception reason: exception representing reason for loss of
            connection.
        """
        self._channel = None
        if self._stopping:
            self._connection.ioloop.stop()
        else:
            LOGGER.warning('Connection closed, reopening in 5 seconds: %s',
                           reason)
            self._connection.ioloop.call_later(5, self._connection.ioloop.stop)

    def open_channel(self):
        """This method will open a new channel with RabbitMQ by issuing the
        Channel.Open RPC command. When RabbitMQ confirms the channel is open
        by sending the Channel.OpenOK RPC reply, the on_channel_open method
        will be invoked.
        """
        LOGGER.info('Creating a new channel')
        self._connection.channel(on_open_callback=self.on_channel_open)

    def on_channel_open(self, channel):
        """This method is invoked by pika when the channel has been opened.
        The channel object is passed in so we can make use of it.
        Since the channel is now open, we'll declare the exchange to use.
        :param pika.channel.Channel channel: The channel object
        """
        LOGGER.info('Channel opened')
        self._channel = channel
        self.add_on_channel_close_callback()
        # # (1 of 3) add to detect unroutable messages
        # # self.add_on_channel_return_callback()
        self.setup_exchange(self.EXCHANGE)

    def add_on_channel_close_callback(self):
        """This method tells pika to call the on_channel_closed method if
        RabbitMQ unexpectedly closes the channel.
        """
        LOGGER.info('Adding channel close callback')
        self._channel.add_on_close_callback(self.on_channel_closed)

    def on_channel_closed(self, channel, reason):
        """Invoked by pika when RabbitMQ unexpectedly closes the channel.
        Channels are usually closed if you attempt to do something that
        violates the protocol, such as re-declare an exchange or queue with
        different parameters. In this case, we'll close the connection
        to shutdown the object.
        :param pika.channel.Channel channel: The closed channel
        :param Exception reason: why the channel was closed
        """
        LOGGER.warning('Channel %i was closed: %s', channel, reason)
        self._channel = None
        if not self._stopping:
            self._connection.close()

    # # (2 of 3) add to detect unroutable messages
    def add_on_channel_return_callback(self):
        LOGGER.info('Adding channel return callback')
        self._channel.add_on_return_callback(self.on_channel_returned)

    # # (3 of 3) add to detect unroutable messages
    def on_channel_returned(self, channel, method, properties, body):
        """Invoked by pika when RabbitMQ returns an unroutable message
        (Basic.Return) that was published with mandatory=True.
        """
        LOGGER.warning('Channel = {} returned, method = {}, properties = {}, body = {}'.format(channel, method, properties, body))
        self._channel = None
        if not self._stopping:
            self._connection.close()

    def setup_exchange(self, exchange_name):
        """Setup the exchange on RabbitMQ by invoking the Exchange.Declare RPC
        command. When it is complete, the on_exchange_declareok method will
        be invoked by pika.
        :param str|unicode exchange_name: The name of the exchange to declare
        """
        LOGGER.info('Declaring exchange %s', exchange_name)
        # Note: using functools.partial is not required, it is demonstrating
        # how arbitrary data can be passed to the callback when it is called
        cb = functools.partial(
            self.on_exchange_declareok, userdata=exchange_name)
        self._channel.exchange_declare(
            exchange=exchange_name,
            exchange_type=self.EXCHANGE_TYPE,
            durable=True,
            auto_delete=False,
            callback=cb)

    def on_exchange_declareok(self, _unused_frame, userdata):
        """Invoked by pika when RabbitMQ has finished the Exchange.Declare RPC
        command.
        :param pika.Frame.Method unused_frame: Exchange.DeclareOk response frame
        :param str|unicode userdata: Extra user data (exchange name)
        """
        LOGGER.info('Exchange declared: %s', userdata)
        self.setup_queue(self.QUEUE)

    def setup_queue(self, queue_name):
        """Setup the queue on RabbitMQ by invoking the Queue.Declare RPC
        command. When it is complete, the on_queue_declareok method will
        be invoked by pika.
        :param str|unicode queue_name: The name of the queue to declare.
        """
        LOGGER.info('Declaring queue %s', queue_name)
        self._channel.queue_declare(
            queue=queue_name,
            durable=True,
            exclusive=False,
            auto_delete=False,
            callback=self.on_queue_declareok,
        )

    def on_queue_declareok(self, _unused_frame):
        """Method invoked by pika when the Queue.Declare RPC call made in
        setup_queue has completed. In this method we will bind the queue
        and exchange together with the routing key by issuing the Queue.Bind
        RPC command. When this command is complete, the on_bindok method will
        be invoked by pika.
        :param pika.frame.Method method_frame: The Queue.DeclareOk frame
        """
        LOGGER.info('Binding %s to %s with %s', self.EXCHANGE, self.QUEUE,
                    self.ROUTING_KEY)
        self._channel.queue_bind(
            self.QUEUE,
            self.EXCHANGE,
            routing_key=self.ROUTING_KEY,
            callback=self.on_bindok)

    def on_bindok(self, _unused_frame):
        """This method is invoked by pika when it receives the Queue.BindOk
        response from RabbitMQ. Since we know we're now setup and bound, it's
        time to start publishing."""
        LOGGER.info('Queue bound')
        self.start_publishing()

    def start_publishing(self):
        """This method will enable delivery confirmations and schedule the
        first message to be sent to RabbitMQ
        """
        LOGGER.info('Issuing consumer related RPC commands')
        self.enable_delivery_confirmations()
        self.schedule_next_message()

    def enable_delivery_confirmations(self):
        """Send the Confirm.Select RPC method to RabbitMQ to enable delivery
        confirmations on the channel. The only way to turn this off is to close
        the channel and create a new one.
        When the message is confirmed from RabbitMQ, the
        on_delivery_confirmation method will be invoked passing in a Basic.Ack
        or Basic.Nack method from RabbitMQ that will indicate which messages it
        is confirming or rejecting.
        """
        LOGGER.info('Issuing Confirm.Select RPC command')
        self._channel.confirm_delivery(self.on_delivery_confirmation)

    def on_delivery_confirmation(self, method_frame):
        """Invoked by pika when RabbitMQ responds to a Basic.Publish RPC
        command, passing in either a Basic.Ack or Basic.Nack frame with
        the delivery tag of the message that was published. The delivery tag
        is an integer counter indicating the message number that was sent
        on the channel via Basic.Publish. Here we're just doing house keeping
        to keep track of stats and remove message numbers that we expect
        a delivery confirmation of from the list used to keep track of messages
        that are pending confirmation.
        :param pika.frame.Method method_frame: Basic.Ack or Basic.Nack frame
        """
        LOGGER.info('Received method_frame: {}'.format(method_frame))
        confirmation_type = method_frame.method.NAME.split('.')[1].lower()
        LOGGER.info('Received %s for delivery tag: %i', confirmation_type,
                    method_frame.method.delivery_tag)
        if confirmation_type == 'ack':
            self._acked += 1
        elif confirmation_type == 'nack':
            self._nacked += 1
        self._deliveries.remove(method_frame.method.delivery_tag)
        LOGGER.info(
            'Published %i messages, %i have yet to be confirmed, '
            '%i were acked and %i were nacked', self._message_number,
            len(self._deliveries), self._acked, self._nacked)

    def schedule_next_message(self):
        """If we are not closing our connection to RabbitMQ, schedule another
        message to be delivered in PUBLISH_INTERVAL seconds.
        """
        LOGGER.info('Scheduling next message for %0.1f seconds',
                    self.PUBLISH_INTERVAL)
        self._connection.ioloop.call_later(self.PUBLISH_INTERVAL,
                                           self.publish_message)

    def publish_message(self):
        """If the class is not stopping, publish a message to RabbitMQ,
        appending a list of deliveries with the message number that was sent.
        This list will be used to check for delivery confirmations in the
        on_delivery_confirmations method.
        Once the message has been sent, schedule another message to be sent.
        The main reason I put scheduling in was just so you can get a good idea
        of how the process is flowing by slowing down and speeding up the
        delivery intervals by changing the PUBLISH_INTERVAL constant in the
        class.
        """
        if self._channel is None or not self._channel.is_open:
            return

        hdrs = {u'مفتاح': u' قيمة', u'键': u'值', u'キー': u'値'}
        properties = pika.BasicProperties(
            app_id='example-publisher',
            content_type='application/json',
            headers=hdrs,
            delivery_mode=2,
        )

        message = u'مفتاح قيمة 键 值 キー 値'
        self._channel.basic_publish(
            self.EXCHANGE,
            self.ROUTING_KEY,
            json.dumps(message, ensure_ascii=False),
            properties,
            mandatory=True,
        )

        self._message_number += 1
        self._deliveries.append(self._message_number)
        LOGGER.info('Published message # %i', self._message_number)
        self.schedule_next_message()

    def run(self):
        """Run the example code by connecting and then starting the IOLoop.
        """
        while not self._stopping:
            self._connection = None
            self._deliveries = []
            self._acked = 0
            self._nacked = 0
            self._message_number = 0

            try:
                self._connection = self.connect()
                self._connection.ioloop.start()
            except KeyboardInterrupt:
                self.stop()
                if (self._connection is not None and
                        not self._connection.is_closed):
                    # Finish closing
                    self._connection.ioloop.start()

        LOGGER.info('Stopped')

    def stop(self):
        """Stop the example by closing the channel and connection. We
        set a flag here so that we stop scheduling new messages to be
        published. The IOLoop is started because this method is
        invoked by the Try/Catch below when KeyboardInterrupt is caught.
        Starting the IOLoop again will allow the publisher to cleanly
        disconnect from RabbitMQ.
        """
        LOGGER.info('Stopping')
        self._stopping = True
        self.close_channel()
        self.close_connection()

    def close_channel(self):
        """Invoke this command to close the channel with RabbitMQ by sending
        the Channel.Close RPC command.
        """
        if self._channel is not None:
            LOGGER.info('Closing the channel')
            self._channel.close()

    def close_connection(self):
        """This method closes the connection to RabbitMQ."""
        if self._connection is not None:
            LOGGER.info('Closing connection')
            self._connection.close()

def main():
    #logging.basicConfig(level=logging.DEBUG, format=LOG_FORMAT)
    logging.basicConfig(level=logging_level, format=LOG_FORMAT)

    # Connect to localhost:5672 as 'user' with the password 'password' and virtual host "/" (%2F)
    example = ExamplePublisher(
        'amqp://' + username + ':' + password + '@localhost:5672/%2F?connection_attempts=3&heartbeat=3600'
    )
    example.run()

if __name__ == '__main__':
    main()

1. Asynchronous: Terminate RabbitMQ

When this happens, the program terminates (I have removed the tracebacks). Good.

...
2022-03-18 13:44:49,693 INFO     [on_delivery_confirmation              253] Published 6 messages, 0 have yet to be confirmed, 6 were acked and 0 were nacked
2022-03-18 13:44:50,680 INFO     [publish_message                       299] Published message # 7
2022-03-18 13:44:50,681 INFO     [schedule_next_message                 262] Scheduling next message for 1.0 seconds
2022-03-18 13:44:50,689 INFO     [on_delivery_confirmation              246] Received ack for delivery tag: 7
2022-03-18 13:44:50,689 INFO     [on_delivery_confirmation              253] Published 7 messages, 0 have yet to be confirmed, 7 were acked and 0 were nacked
2022-03-18 13:44:51,203 INFO     [abort                                 731] Aborting transport connection: state=1; <socket.socket fd=6, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('127.0.0.1', 44170), raddr=('127.0.0.1', 5672)>
2022-03-18 13:44:51,204 INFO     [_initiate_abort                       904] _AsyncTransportBase._initate_abort(): Initiating abrupt asynchronous transport shutdown: state=1; error=None; <socket.socket fd=6, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('127.0.0.1', 44170), raddr=('127.0.0.1', 5672)>
2022-03-18 13:44:51,204 INFO     [_deactivate                           869] Deactivating transport: state=1; <socket.socket fd=6, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('127.0.0.1', 44170), raddr=('127.0.0.1', 5672)>
2022-03-18 13:44:51,204 INFO     [_on_stream_terminated                1996] AMQP stack terminated, failed to connect, or aborted: opened=True, error-arg=None; pending-error=ConnectionClosedByBroker: (320) "CONNECTION_FORCED - broker forced connection closure with reason 'shutdown'"
2022-03-18 13:44:51,204 WARNING  [on_channel_closed                     146] Channel 1 was closed: (320, "CONNECTION_FORCED - broker forced connection closure with reason 'shutdown'")
2022-03-18 13:44:51,204 ERROR    [close                                1282] Illegal close(200, 'Normal shutdown') request on <SelectConnection CLOSED transport=None params=<URLParameters host=localhost port=5672 virtual_host=/ ssl=False>> because it was called while connection state=CLOSED.
2022-03-18 13:44:51,204 ERROR    [process                               235] Calling <bound method ExamplePublisher.on_channel_closed of <__main__.ExamplePublisher object at 0x7efed2c60a90>> for "1:_on_channel_close" failed
2022-03-18 13:44:51,205 INFO     [_close_and_finalize                   882] Closing transport socket and unlinking: state=3; <socket.socket fd=6, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('127.0.0.1', 44170), raddr=('127.0.0.1', 5672)>
2022-03-18 13:44:51,205 ERROR    [log_exception_func_wrap                55] Wrapped func exited with exception. Caller's stack:
...

2. Asynchronous: Delete the exchange

What we see here is that the broker closes the channel with a 404 NOT_FOUND, the script closes the connection, and it reconnects after 5 seconds. Good.

...
2022-03-18 13:43:29,755 INFO     [on_delivery_confirmation              246] Received ack for delivery tag: 31
2022-03-18 13:43:29,755 INFO     [on_delivery_confirmation              253] Published 31 messages, 0 have yet to be confirmed, 31 were acked and 0 were nacked
2022-03-18 13:43:30,753 INFO     [publish_message                       299] Published message # 32
2022-03-18 13:43:30,753 INFO     [schedule_next_message                 262] Scheduling next message for 1.0 seconds
2022-03-18 13:43:30,755 WARNING  [_on_close_from_broker                1070] Received remote Channel.Close (404): "NOT_FOUND - no exchange 'my_test_exchange' in vhost '/'" on <Channel number=1 OPEN conn=<SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7f07a602bf40> params=<URLParameters host=localhost port=5672 virtual_host=/ ssl=False>>>
2022-03-18 13:43:30,755 WARNING  [on_channel_closed                     146] Channel 1 was closed: (404, "NOT_FOUND - no exchange 'my_test_exchange' in vhost '/'")
2022-03-18 13:43:30,755 INFO     [close                                1295] Closing connection (200): 'Normal shutdown'
2022-03-18 13:43:30,755 INFO     [close                                1322] Connection.close is waiting for 1 channels to close: <SelectConnection CLOSING transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7f07a602bf40> params=<URLParameters host=localhost port=5672 virtual_host=/ ssl=False>>
2022-03-18 13:43:30,757 INFO     [abort                                 731] Aborting transport connection: state=1; <socket.socket fd=6, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('127.0.0.1', 44166), raddr=('127.0.0.1', 5672)>
2022-03-18 13:43:30,757 INFO     [_initiate_abort                       904] _AsyncTransportBase._initate_abort(): Initiating abrupt asynchronous transport shutdown: state=1; error=None; <socket.socket fd=6, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('127.0.0.1', 44166), raddr=('127.0.0.1', 5672)>
2022-03-18 13:43:30,757 INFO     [_deactivate                           869] Deactivating transport: state=1; <socket.socket fd=6, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('127.0.0.1', 44166), raddr=('127.0.0.1', 5672)>
2022-03-18 13:43:30,758 INFO     [_on_stream_terminated                1996] AMQP stack terminated, failed to connect, or aborted: opened=True, error-arg=None; pending-error=ConnectionClosedByClient: (200) 'Normal shutdown'
2022-03-18 13:43:30,758 INFO     [_on_stream_terminated                2065] Stack terminated due to ConnectionClosedByClient: (200) 'Normal shutdown'
2022-03-18 13:43:30,758 WARNING  [on_connection_closed                  106] Connection closed, reopening in 5 seconds: (200, 'Normal shutdown')
2022-03-18 13:43:30,758 INFO     [_close_and_finalize                   882] Closing transport socket and unlinking: state=3; <socket.socket fd=6, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('127.0.0.1', 44166), raddr=('127.0.0.1', 5672)>
2022-03-18 13:43:35,763 INFO     [connect                                69] Connecting to amqp://user:password@localhost:5672/%2F?connection_attempts=3&heartbeat=3600
2022-03-18 13:43:35,764 INFO     [start                                 179] Pika version 1.2.0 connecting to ('127.0.0.1', 5672)
2022-03-18 13:43:35,764 INFO     [_on_writable                          345] Socket connected: <socket.socket fd=8, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('127.0.0.1', 44168), raddr=('127.0.0.1', 5672)>
2022-03-18 13:43:35,764 INFO     [_on_transport_establishment_done      428] Streaming transport linked up: (<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7f07a6036f40>, _StreamingProtocolShim: <SelectConnection PROTOCOL transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7f07a6036f40> params=<URLParameters host=localhost port=5672 virtual_host=/ ssl=False>>).
2022-03-18 13:43:35,766 INFO     [on_connection_open                     82] Connection opened
2022-03-18 13:43:35,766 INFO     [open_channel                          116] Creating a new channel
2022-03-18 13:43:35,767 INFO     [_report_completion_and_cleanup        293] AMQPConnector - reporting success: <SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7f07a6036f40> params=<URLParameters host=localhost port=5672 virtual_host=/ ssl=False>>
2022-03-18 13:43:35,767 INFO     [_report_completion_and_cleanup        725] AMQPConnectionWorkflow - reporting success: <SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7f07a6036f40> params=<URLParameters host=localhost port=5672 virtual_host=/ ssl=False>>
2022-03-18 13:43:35,767 INFO     [on_channel_open                       125] Channel opened
2022-03-18 13:43:35,767 INFO     [add_on_channel_close_callback         134] Adding channel close callback
2022-03-18 13:43:35,767 INFO     [setup_exchange                        157] Declaring exchange my_test_exchange
2022-03-18 13:43:35,768 INFO     [on_exchange_declareok                 173] Exchange declared: my_test_exchange
2022-03-18 13:43:35,768 INFO     [setup_queue                           182] Declaring queue my_test_queue
2022-03-18 13:43:35,769 INFO     [on_queue_declareok                    199] Binding my_test_exchange to my_test_queue with my_test_queue
2022-03-18 13:43:35,769 INFO     [on_bindok                             211] Queue bound
2022-03-18 13:43:35,769 INFO     [start_publishing                      218] Issuing consumer related RPC commands
2022-03-18 13:43:35,769 INFO     [enable_delivery_confirmations         231] Issuing Confirm.Select RPC command
2022-03-18 13:43:35,769 INFO     [schedule_next_message                 262] Scheduling next message for 1.0 seconds
2022-03-18 13:43:36,771 INFO     [publish_message                       299] Published message # 1
2022-03-18 13:43:36,771 INFO     [schedule_next_message                 262] Scheduling next message for 1.0 seconds
2022-03-18 13:43:36,785 INFO     [on_delivery_confirmation              246] Received ack for delivery tag: 1
2022-03-18 13:43:36,785 INFO     [on_delivery_confirmation              253] Published 1 messages, 0 have yet to be confirmed, 1 were acked and 0 were nacked
2022-03-18 13:43:37,773 INFO     [publish_message                       299] Published message # 2
...

3. Asynchronous: Delete the queue

Nothing happens when we delete the queue: every message is still acked. Not good. Why is this not detected?

...
2022-03-18 13:46:42,361 INFO     [publish_message                       299] Published message # 16
2022-03-18 13:46:42,361 INFO     [schedule_next_message                 262] Scheduling next message for 1.0 seconds
2022-03-18 13:46:42,363 INFO     [on_delivery_confirmation              246] Received ack for delivery tag: 16
2022-03-18 13:46:42,363 INFO     [on_delivery_confirmation              253] Published 16 messages, 0 have yet to be confirmed, 16 were acked and 0 were nacked
2022-03-18 13:46:43,363 INFO     [publish_message                       299] Published message # 17
2022-03-18 13:46:43,363 INFO     [schedule_next_message                 262] Scheduling next message for 1.0 seconds
2022-03-18 13:46:43,364 INFO     [on_delivery_confirmation              246] Received ack for delivery tag: 17
2022-03-18 13:46:43,364 INFO     [on_delivery_confirmation              253] Published 17 messages, 0 have yet to be confirmed, 17 were acked and 0 were nacked
2022-03-18 13:46:44,365 INFO     [publish_message                       299] Published message # 18
2022-03-18 13:46:44,366 INFO     [schedule_next_message                 262] Scheduling next message for 1.0 seconds
2022-03-18 13:46:44,367 INFO     [on_delivery_confirmation              246] Received ack for delivery tag: 18
2022-03-18 13:46:44,367 INFO     [on_delivery_confirmation              253] Published 18 messages, 0 have yet to be confirmed, 18 were acked and 0 were nacked
2022-03-18 13:46:45,367 INFO     [publish_message                       299] Published message # 19
2022-03-18 13:46:45,367 INFO     [schedule_next_message                 262] Scheduling next message for 1.0 seconds
2022-03-18 13:46:45,368 INFO     [on_delivery_confirmation              246] Received ack for delivery tag: 19
2022-03-18 13:46:45,368 INFO     [on_delivery_confirmation              253] Published 19 messages, 0 have yet to be confirmed, 19 were acked and 0 were nacked
2022-03-18 13:46:46,368 INFO     [publish_message                       299] Published message # 20
...

Asynchronous publishing: basic.ack and basic.nack are not enough

From the documentation:

"When Will Published Messages Be Confirmed by the Broker?

For unroutable messages, the broker will issue a confirm once the exchange verifies a message won't route to any queue (returns an empty list of queues). If the message is also published as mandatory, the basic.return is sent to the client before basic.ack. The same is true for negative acknowledgements (basic.nack)."

In our test the exchange still exists after the queue is deleted, so every message is accepted, found to be unroutable, and acked anyway. This means we must capture the basic.return in our script (the broker only sends it for messages published as mandatory). I have already added the lines that take care of this to the code above. To enable them, change this line:

# # self.add_on_channel_return_callback()

to:

self.add_on_channel_return_callback()
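For readers without the full script at hand, here is a minimal, generic sketch of what registering a return callback and publishing with the mandatory flag can look like in Pika. It is not the author's method from the listing above: the helper names register_return_callback and publish_mandatory are hypothetical, and channel is assumed to be an already open Pika channel.

```python
import logging

LOGGER = logging.getLogger(__name__)


def on_message_returned(channel, method, properties, body):
    """Invoked by Pika when the broker sends basic.return for a message."""
    LOGGER.warning(
        "Message returned: reply_code=%s reply_text=%s routing_key=%s",
        method.reply_code, method.reply_text, method.routing_key,
    )


def register_return_callback(channel, callback=on_message_returned):
    # Hypothetical helper: Pika channels expose add_on_return_callback()
    # to receive basic.return frames.
    channel.add_on_return_callback(callback)


def publish_mandatory(channel, exchange, routing_key, body, properties=None):
    # Hypothetical helper: mandatory=True asks the broker to return the
    # message when it cannot be routed to any queue.
    channel.basic_publish(
        exchange=exchange,
        routing_key=routing_key,
        body=body,
        properties=properties,
        mandatory=True,
    )
```

Without mandatory=True the broker does not send a basic.return, so the callback alone would never fire for an unroutable message.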

If you run the script again, you will notice that after deleting the queue the script tries to restart the connection. The on_channel_returned() method receives the returned message before the basic.ack arrives.

Because the basic.return is sent to the client before (!) the basic.ack, the message will be retried.
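The ordering matters for bookkeeping: when the ack arrives, the publisher already knows whether that message was returned. The sketch below illustrates this idea in plain Python, independent of Pika. ConfirmTracker and its method names are hypothetical, and it assumes every message carries a unique message_id so that a basic.return (which has no delivery tag) can be matched to the later ack. It is one possible approach, not what the script above does.

```python
class ConfirmTracker:
    """Hypothetical bookkeeping for mandatory publishes with confirms."""

    def __init__(self):
        self.pending = {}      # delivery_tag -> message_id, awaiting ack/nack
        self.returned = set()  # message_ids the broker returned as unroutable
        self.delivered = []    # message_ids acked and never returned
        self.retry = []        # message_ids that should be published again
        self._next_tag = 0

    def on_publish(self, message_id):
        # With confirms enabled, delivery tags count up from 1 per channel.
        self._next_tag += 1
        self.pending[self._next_tag] = message_id
        return self._next_tag

    def on_return(self, message_id):
        # Call this from the basic.return callback; it runs before the ack.
        self.returned.add(message_id)

    def on_confirm(self, delivery_tag, acked, multiple=False):
        # multiple=True means the confirm covers all tags up to delivery_tag.
        if multiple:
            tags = [t for t in sorted(self.pending) if t <= delivery_tag]
        else:
            tags = [delivery_tag]
        for tag in tags:
            message_id = self.pending.pop(tag, None)
            if message_id is None:
                continue
            if acked and message_id not in self.returned:
                self.delivered.append(message_id)
            else:
                # Nacked, or acked but returned as unroutable: retry later.
                self.returned.discard(message_id)
                self.retry.append(message_id)


tracker = ConfirmTracker()
tracker.on_publish("m1")
tracker.on_confirm(1, acked=True)   # routed and acked
tracker.on_publish("m2")
tracker.on_return("m2")             # queue deleted: basic.return comes first
tracker.on_confirm(2, acked=True)   # the ack still follows
print(tracker.delivered, tracker.retry)
```

In a real publisher, on_return would be called from the channel's return callback and on_confirm from the delivery confirmation callback.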

Summary

Always test all conditions before using example code. I only ran three tests, but you can run many more, for example checking whether the exchange and the queue are still present after a RabbitMQ restart, and whether the messages are still in the queue after that restart.

Also, asynchronous publishing without data loss, meaning with confirmations, introduces a lot of complexity. Throughput can increase dramatically, but this also depends heavily on the architecture of your application. Do you really need asynchronous publishing?

Links / credits

Notify consumer when a queue is deleted on rabbitmq
https://stackoverflow.com/questions/15527226/notify-consumer-when-a-queue-is-deleted-on-rabbitmq

Part 1: RabbitMQ Best Practices
https://www.cloudamqp.com/blog/part1-rabbitmq-best-practice.html

Publishing Throughput - Asynchronous vs Synchronous
https://www.cloudamqp.com/blog/publishing-throughput-asynchronous-vs-synchronous.html

RabbitMQ - AMQP 0-9-1 Complete Reference Guide
https://www.rabbitmq.com/amqp-0-9-1-reference.html

RabbitMQ - Publisher Confirms
https://www.rabbitmq.com/confirms.html

RabbitMQ - Reliability Guide
https://www.rabbitmq.com/reliability.html

Comments (1)

Exceptional blog - TYVM. I'm new to the pika library and set up a work queue and multiple producers and consumers. It worked just as expected right until the connection to my RabbitMQ server dropped out. All the consumers threw exceptions, but the producers? They happily posted their messages into the ether, never to be seen again.
Which is how I ended up here, in your comments.
The async test code - the default messages and headers are intriguing