angle-uparrow-clockwisearrow-counterclockwisearrow-down-uparrow-leftatcalendarcard-listchatcheckenvelopefolderhouseinfo-circlepencilpeoplepersonperson-fillperson-plusphoneplusquestion-circlesearchtagtrashx

Test des exemples de publication RabbitMQ Pika

Avec la publication asynchrone RabbitMQ Pika , n'oubliez pas d'inclure toutes les conditions pour éviter la perte de données.

25 mars 2022
Dans RabbitMQ
post main image
https://www.pexels.com/nl-nl/@reezky11

Beaucoup de choses ont été écrites sur la publication synchrone et asynchrone avec RabbitMQ, voir les liens ci-dessous, je ne vais pas les répéter ici. Comme c'est la première fois que j'utilise RabbitMQ, j'ai voulu essayer les versions de publication synchrone et asynchrone,
en utilisant les exemples inclus dans Pika, la bibliothèque client RabbitMQ (AMQP 0-9-1) pour Python.

À ma grande surprise, l'exemple asynchrone Pika n'a pas détecté la suppression de la file d'attente, il a continué à envoyer des messages. Que se passe-t-il ici ?

Comment tester

Nous utilisons le RabbitMQ avec la gestion Docker image, l'interface de gestion est à :

http://127.0.0.1:15672

J'ai changé les informations d'identification en 'user'/'password'.

J'ai créé deux scripts Python basés sur les exemples Pika :

  • publication synchrone, en utilisant BlockingConnection
  • publication asynchrone, en utilisant SelectConnection

Les scripts publient des messages toutes les 2 secondes.

Lorsqu'un script est en cours d'exécution (publication de messages dans une file d'attente), je crée les conditions suivantes :

  1. Terminate RabbitMQ
  2. Supprimer l'échange
  3. Supprimer la file d'attente

Terminer RabbitMQ est fait en tuant le conteneur Docker . La suppression de l'échange et de la file d'attente se fait à partir de l'interface de gestion de RabbitMQ . Voyons ce qui se passe.

Publication synchrone en utilisant BlockingConnection

Le code :

# blockingconnection.py
# based on:
# Using Delivery Confirmations with the BlockingConnection
# https://pika.readthedocs.io/en/stable/examples/blocking_delivery_confirmations.html?highlight=blockingconnection
# changes made:
# - username/password = user/password
# - added to exchange_declare: durable=True, auto_delete=False
# - added to queue_declare: durable=True, exclusive=False, auto_delete=False

import pika
from pika.exchange_type import ExchangeType
import time

username = 'user'
password = 'password'
my_exchange = 'my_test_exchange'
my_exchange_type = ExchangeType.direct
my_queue = 'my_test_queue'
my_routing_key = my_queue


# Open a connection to RabbitMQ on localhost using all default parameters
connection = pika.BlockingConnection(
    pika.ConnectionParameters(
        credentials=pika.PlainCredentials(
            username,
            password
        ),
    ),
)

# get channel
channel = connection.channel()

# declare our exchange
channel.exchange_declare(
    exchange=my_exchange,
    exchange_type=my_exchange_type,
    durable=True,
    auto_delete=False,
)

# declare our queue
channel.queue_declare(
    queue=my_queue,
    durable=True,
    exclusive=False,
    auto_delete=False,
)

# bind queue to exchange
channel.queue_bind(
    my_queue,
    my_exchange,
    routing_key=my_routing_key,
)

# Turn on delivery confirmations
channel.confirm_delivery()

# Send messages
for i in range(20):
    try:
        channel.basic_publish(
            exchange=my_exchange,
            routing_key=my_routing_key,
            body='Hello World!',
            properties=pika.BasicProperties(
                content_type='text/plain',
                delivery_mode=2,
            ),
            mandatory=True,
        ),
        print('Message {} publish was confirmed'.format(i))
    except Exception as e:
        e_name = type(e).__name__
        print('Message {} could not be confirmed, exception = {}, e = {}'.format(i, e_name, e))
    time.sleep(2)

1. Synchrone : Terminer RabbitMQ

Lorsque RabbitMQ est terminé, le message ne peut pas être confirmé. Bon.

...
Message 1 publish was confirmed
Message 2 publish was confirmed
Message 3 publish was confirmed
Message 4 could not be confirmed, exception = ConnectionClosedByBroker, e = (320, "CONNECTION_FORCED - broker forced connection closure with reason 'shutdown'")
Message 5 could not be confirmed, exception = ChannelWrongStateError, e = Channel is closed.
Message 6 could not be confirmed, exception = ChannelWrongStateError, e = Channel is closed.
Message 7 could not be confirmed, exception = ChannelWrongStateError, e = Channel is closed.
...

2. Synchrone : Suppression de l'échange

Lorsque l'échange est supprimé, le message ne peut pas être confirmé. Bon.

...
Message 3 publish was confirmed
Message 4 publish was confirmed
Message 5 publish was confirmed
Message 6 could not be confirmed, exception = ChannelClosedByBroker, e = (404, "NOT_FOUND - no exchange 'my_test_exchange' in vhost '/'")
Message 7 could not be confirmed, exception = ChannelWrongStateError, e = Channel is closed.
Message 8 could not be confirmed, exception = ChannelWrongStateError, e = Channel is closed.
Message 9 could not be confirmed, exception = ChannelWrongStateError, e = Channel is closed.
...

3. Synchrone : Supprimer la file d'attente

Lorsque la file d'attente est supprimée, le message ne peut pas être confirmé. Bon.

...
Message 6 publish was confirmed
Message 7 publish was confirmed
Message 8 publish was confirmed
Message 9 could not be confirmed, exception = UnroutableError, e = 1 unroutable message(s) returned
Message 10 could not be confirmed, exception = UnroutableError, e = 1 unroutable message(s) returned
Message 11 could not be confirmed, exception = UnroutableError, e = 1 unroutable message(s) returned
...

Publication asynchrone avec SelectConnection

Le code :

# selectconnection.py
# based on:
# https://github.com/pika/pika/blob/master/examples/asynchronous_publisher_example.py
# changes made:
# - username/password = user/password
# - added username, password to connect string
# - added to exchange_declare: durable=True, auto_delete=False
# - added to queue_declare: durable=True, exclusive=False, auto_delete=False
# - added to basic_publish: mandatory=True
# - added to basic_publish properties: delivery_mode=2

# -*- coding: utf-8 -*-
# pylint: disable=C0111,C0103,R0205

import functools
import logging
import json
import pika
from pika.exchange_type import ExchangeType

LOG_FORMAT = ('%(asctime)s %(levelname) -8s [%(funcName) -35s %(lineno) 5d] %(message)s')
LOGGER = logging.getLogger(__name__)
logging_level = logging.INFO


username = 'user'
password = 'password'
my_exchange = 'my_test_exchange'
my_exchange_type = ExchangeType.direct
my_queue = 'my_test_queue'
my_routing_key = my_queue


class ExamplePublisher(object):
    """This is an example publisher that will handle unexpected interactions
    with RabbitMQ such as channel and connection closures.
    If RabbitMQ closes the connection, it will reopen it. You should
    look at the output, as there are limited reasons why the connection may
    be closed, which usually are tied to permission related issues or
    socket timeouts.
    It uses delivery confirmations and illustrates one way to keep track of
    messages that have been sent and if they've been confirmed by RabbitMQ.
    """
    EXCHANGE = my_exchange
    EXCHANGE_TYPE = my_exchange_type
    QUEUE = my_queue
    ROUTING_KEY = my_routing_key
    PUBLISH_INTERVAL = 2

    def __init__(self, amqp_url):
        """Setup the example publisher object, passing in the URL we will use
        to connect to RabbitMQ.
        :param str amqp_url: The URL for connecting to RabbitMQ
        """
        self._connection = None
        self._channel = None

        self._deliveries = None
        self._acked = None
        self._nacked = None
        self._message_number = None

        self._stopping = False
        self._url = amqp_url

    def connect(self):
        """This method connects to RabbitMQ, returning the connection handle.
        When the connection is established, the on_connection_open method
        will be invoked by pika.
        :rtype: pika.SelectConnection
        """
        LOGGER.info('Connecting to %s', self._url)
        return pika.SelectConnection(
            pika.URLParameters(self._url),
            on_open_callback=self.on_connection_open,
            on_open_error_callback=self.on_connection_open_error,
            on_close_callback=self.on_connection_closed)

    def on_connection_open(self, _unused_connection):
        """This method is called by pika once the connection to RabbitMQ has
        been established. It passes the handle to the connection object in
        case we need it, but in this case, we'll just mark it unused.
        :param pika.SelectConnection _unused_connection: The connection
        """
        LOGGER.info('Connection opened')
        self.open_channel()

    def on_connection_open_error(self, _unused_connection, err):
        """This method is called by pika if the connection to RabbitMQ
        can't be established.
        :param pika.SelectConnection _unused_connection: The connection
        :param Exception err: The error
        """
        LOGGER.error('Connection open failed, reopening in 5 seconds: %s', err)
        self._connection.ioloop.call_later(5, self._connection.ioloop.stop)

    def on_connection_closed(self, _unused_connection, reason):
        """This method is invoked by pika when the connection to RabbitMQ is
        closed unexpectedly. Since it is unexpected, we will reconnect to
        RabbitMQ if it disconnects.
        :param pika.connection.Connection connection: The closed connection obj
        :param Exception reason: exception representing reason for loss of
            connection.
        """
        self._channel = None
        if self._stopping:
            self._connection.ioloop.stop()
        else:
            LOGGER.warning('Connection closed, reopening in 5 seconds: %s',
                           reason)
            self._connection.ioloop.call_later(5, self._connection.ioloop.stop)

    def open_channel(self):
        """This method will open a new channel with RabbitMQ by issuing the
        Channel.Open RPC command. When RabbitMQ confirms the channel is open
        by sending the Channel.OpenOK RPC reply, the on_channel_open method
        will be invoked.
        """
        LOGGER.info('Creating a new channel')
        self._connection.channel(on_open_callback=self.on_channel_open)

    def on_channel_open(self, channel):
        """This method is invoked by pika when the channel has been opened.
        The channel object is passed in so we can make use of it.
        Since the channel is now open, we'll declare the exchange to use.
        :param pika.channel.Channel channel: The channel object
        """
        LOGGER.info('Channel opened')
        self._channel = channel
        self.add_on_channel_close_callback()
        # # (1 of 3) add to detect unroutable messages
        # # self.add_on_channel_return_callback()
        self.setup_exchange(self.EXCHANGE)

    def add_on_channel_close_callback(self):
        """This method tells pika to call the on_channel_closed method if
        RabbitMQ unexpectedly closes the channel.
        """
        LOGGER.info('Adding channel close callback')
        self._channel.add_on_close_callback(self.on_channel_closed)

    def on_channel_closed(self, channel, reason):
        """Invoked by pika when RabbitMQ unexpectedly closes the channel.
        Channels are usually closed if you attempt to do something that
        violates the protocol, such as re-declare an exchange or queue with
        different parameters. In this case, we'll close the connection
        to shutdown the object.
        :param pika.channel.Channel channel: The closed channel
        :param Exception reason: why the channel was closed
        """
        LOGGER.warning('Channel %i was closed: %s', channel, reason)
        self._channel = None
        if not self._stopping:
            self._connection.close()

    # # (2 of 3) add to detect unroutable messages
    def add_on_channel_return_callback(self):
        LOGGER.info('Adding channel return callback')
        self._channel.add_on_return_callback(self.on_channel_returned)

    # # (3 of 3) add to detect unroutable messages
    def on_channel_returned(self, channel, method, properties, body):
        """Invoked by pika when RabbitMQ unexpectedly returns from the channel.

        """
        LOGGER.warning('Channel = {} returned, method = {}, properties = {}, body = {}'.format(channel, method, properties, body))
        self._channel = None
        if not self._stopping:
            self._connection.close()

    def setup_exchange(self, exchange_name):
        """Setup the exchange on RabbitMQ by invoking the Exchange.Declare RPC
        command. When it is complete, the on_exchange_declareok method will
        be invoked by pika.
        :param str|unicode exchange_name: The name of the exchange to declare
        """
        LOGGER.info('Declaring exchange %s', exchange_name)
        # Note: using functools.partial is not required, it is demonstrating
        # how arbitrary data can be passed to the callback when it is called
        cb = functools.partial(
            self.on_exchange_declareok, userdata=exchange_name)
        self._channel.exchange_declare(
            exchange=exchange_name,
            exchange_type=self.EXCHANGE_TYPE,
            durable=True,
            auto_delete=False,
            callback=cb)

    def on_exchange_declareok(self, _unused_frame, userdata):
        """Invoked by pika when RabbitMQ has finished the Exchange.Declare RPC
        command.
        :param pika.Frame.Method unused_frame: Exchange.DeclareOk response frame
        :param str|unicode userdata: Extra user data (exchange name)
        """
        LOGGER.info('Exchange declared: %s', userdata)
        self.setup_queue(self.QUEUE)

    def setup_queue(self, queue_name):
        """Setup the queue on RabbitMQ by invoking the Queue.Declare RPC
        command. When it is complete, the on_queue_declareok method will
        be invoked by pika.
        :param str|unicode queue_name: The name of the queue to declare.
        """
        LOGGER.info('Declaring queue %s', queue_name)
        self._channel.queue_declare(
            queue=queue_name,
            durable=True,
            exclusive=False,
            auto_delete=False,
            callback=self.on_queue_declareok,
        )

    def on_queue_declareok(self, _unused_frame):
        """Method invoked by pika when the Queue.Declare RPC call made in
        setup_queue has completed. In this method we will bind the queue
        and exchange together with the routing key by issuing the Queue.Bind
        RPC command. When this command is complete, the on_bindok method will
        be invoked by pika.
        :param pika.frame.Method method_frame: The Queue.DeclareOk frame
        """
        LOGGER.info('Binding %s to %s with %s', self.EXCHANGE, self.QUEUE,
                    self.ROUTING_KEY)
        self._channel.queue_bind(
            self.QUEUE,
            self.EXCHANGE,
            routing_key=self.ROUTING_KEY,
            callback=self.on_bindok)

    def on_bindok(self, _unused_frame):
        """This method is invoked by pika when it receives the Queue.BindOk
        response from RabbitMQ. Since we know we're now setup and bound, it's
        time to start publishing."""
        LOGGER.info('Queue bound')
        self.start_publishing()

    def start_publishing(self):
        """This method will enable delivery confirmations and schedule the
        first message to be sent to RabbitMQ
        """
        LOGGER.info('Issuing consumer related RPC commands')
        self.enable_delivery_confirmations()
        self.schedule_next_message()

    def enable_delivery_confirmations(self):
        """Send the Confirm.Select RPC method to RabbitMQ to enable delivery
        confirmations on the channel. The only way to turn this off is to close
        the channel and create a new one.
        When the message is confirmed from RabbitMQ, the
        on_delivery_confirmation method will be invoked passing in a Basic.Ack
        or Basic.Nack method from RabbitMQ that will indicate which messages it
        is confirming or rejecting.
        """
        LOGGER.info('Issuing Confirm.Select RPC command')
        self._channel.confirm_delivery(self.on_delivery_confirmation)

    def on_delivery_confirmation(self, method_frame):
        """Invoked by pika when RabbitMQ responds to a Basic.Publish RPC
        command, passing in either a Basic.Ack or Basic.Nack frame with
        the delivery tag of the message that was published. The delivery tag
        is an integer counter indicating the message number that was sent
        on the channel via Basic.Publish. Here we're just doing house keeping
        to keep track of stats and remove message numbers that we expect
        a delivery confirmation of from the list used to keep track of messages
        that are pending confirmation.
        :param pika.frame.Method method_frame: Basic.Ack or Basic.Nack frame
        """
        LOGGER.info('Received method_frame: {}'.format(method_frame))
        confirmation_type = method_frame.method.NAME.split('.')[1].lower()
        LOGGER.info('Received %s for delivery tag: %i', confirmation_type,
                    method_frame.method.delivery_tag)
        if confirmation_type == 'ack':
            self._acked += 1
        elif confirmation_type == 'nack':
            self._nacked += 1
        self._deliveries.remove(method_frame.method.delivery_tag)
        LOGGER.info(
            'Published %i messages, %i have yet to be confirmed, '
            '%i were acked and %i were nacked', self._message_number,
            len(self._deliveries), self._acked, self._nacked)

    def schedule_next_message(self):
        """If we are not closing our connection to RabbitMQ, schedule another
        message to be delivered in PUBLISH_INTERVAL seconds.
        """
        LOGGER.info('Scheduling next message for %0.1f seconds',
                    self.PUBLISH_INTERVAL)
        self._connection.ioloop.call_later(self.PUBLISH_INTERVAL,
                                           self.publish_message)

    def publish_message(self):
        """If the class is not stopping, publish a message to RabbitMQ,
        appending a list of deliveries with the message number that was sent.
        This list will be used to check for delivery confirmations in the
        on_delivery_confirmations method.
        Once the message has been sent, schedule another message to be sent.
        The main reason I put scheduling in was just so you can get a good idea
        of how the process is flowing by slowing down and speeding up the
        delivery intervals by changing the PUBLISH_INTERVAL constant in the
        class.
        """
        if self._channel is None or not self._channel.is_open:
            return

        hdrs = {u'مفتاح': u' قيمة', u'键': u'值', u'キー': u'値'}
        properties = pika.BasicProperties(
            app_id='example-publisher',
            content_type='application/json',
            headers=hdrs,
            delivery_mode=2,
        )

        message = u'مفتاح قيمة 键 值 キー 値'
        self._channel.basic_publish(
            self.EXCHANGE, 
            self.ROUTING_KEY,
            json.dumps(message, ensure_ascii=False),
            properties,
            mandatory=True,
        )

        self._message_number += 1
        self._deliveries.append(self._message_number)
        LOGGER.info('Published message # %i', self._message_number)
        self.schedule_next_message()

    def run(self):
        """Run the example code by connecting and then starting the IOLoop.
        """
        while not self._stopping:
            self._connection = None
            self._deliveries = []
            self._acked = 0
            self._nacked = 0
            self._message_number = 0

            try:
                self._connection = self.connect()
                self._connection.ioloop.start()
            except KeyboardInterrupt:
                self.stop()
                if (self._connection is not None and
                        not self._connection.is_closed):
                    # Finish closing
                    self._connection.ioloop.start()

        LOGGER.info('Stopped')

    def stop(self):
        """Stop the example by closing the channel and connection. We
        set a flag here so that we stop scheduling new messages to be
        published. The IOLoop is started because this method is
        invoked by the Try/Catch below when KeyboardInterrupt is caught.
        Starting the IOLoop again will allow the publisher to cleanly
        disconnect from RabbitMQ.
        """
        LOGGER.info('Stopping')
        self._stopping = True
        self.close_channel()
        self.close_connection()

    def close_channel(self):
        """Invoke this command to close the channel with RabbitMQ by sending
        the Channel.Close RPC command.
        """
        if self._channel is not None:
            LOGGER.info('Closing the channel')
            self._channel.close()

    def close_connection(self):
        """This method closes the connection to RabbitMQ."""
        if self._connection is not None:
            LOGGER.info('Closing connection')
            self._connection.close()

def main():
    #logging.basicConfig(level=logging.DEBUG, format=LOG_FORMAT)
    logging.basicConfig(level=logging_level, format=LOG_FORMAT)

    # Connect to localhost:5672 as guest with the password guest and virtual host "/" (%2F)
    example = ExamplePublisher(
        'amqp://' + username + ':' + password + '@localhost:5672/%2F?connection_attempts=3&heartbeat=3600'
    )
    example.run()

if __name__ == '__main__':
    main()

1. Asynchrone : Terminer rabbitmq

Lorsque cela se produit, le programme se termine (j'ai supprimé les tracebacks). Bon.

...
2022-03-18 13:44:49,693 INFO     [on_delivery_confirmation              253] Published 6 messages, 0 have yet to be confirmed, 6 were acked and 0 were nacked
2022-03-18 13:44:50,680 INFO     [publish_message                       299] Published message # 7
2022-03-18 13:44:50,681 INFO     [schedule_next_message                 262] Scheduling next message for 1.0 seconds
2022-03-18 13:44:50,689 INFO     [on_delivery_confirmation              246] Received ack for delivery tag: 7
2022-03-18 13:44:50,689 INFO     [on_delivery_confirmation              253] Published 7 messages, 0 have yet to be confirmed, 7 were acked and 0 were nacked
2022-03-18 13:44:51,203 INFO     [abort                                 731] Aborting transport connection: state=1; <socket.socket fd=6, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('127.0.0.1', 44170), raddr=('127.0.0.1', 5672)>
2022-03-18 13:44:51,204 INFO     [_initiate_abort                       904] _AsyncTransportBase._initate_abort(): Initiating abrupt asynchronous transport shutdown: state=1; error=None; <socket.socket fd=6, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('127.0.0.1', 44170), raddr=('127.0.0.1', 5672)>
2022-03-18 13:44:51,204 INFO     [_deactivate                           869] Deactivating transport: state=1; <socket.socket fd=6, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('127.0.0.1', 44170), raddr=('127.0.0.1', 5672)>
2022-03-18 13:44:51,204 INFO     [_on_stream_terminated                1996] AMQP stack terminated, failed to connect, or aborted: opened=True, error-arg=None; pending-error=ConnectionClosedByBroker: (320) "CONNECTION_FORCED - broker forced connection closure with reason 'shutdown'"
2022-03-18 13:44:51,204 WARNING  [on_channel_closed                     146] Channel 1 was closed: (320, "CONNECTION_FORCED - broker forced connection closure with reason 'shutdown'")
2022-03-18 13:44:51,204 ERROR    [close                                1282] Illegal close(200, 'Normal shutdown') request on <SelectConnection CLOSED transport=None params=<URLParameters host=localhost port=5672 virtual_host=/ ssl=False>> because it was called while connection state=CLOSED.
2022-03-18 13:44:51,204 ERROR    [process                               235] Calling <bound method ExamplePublisher.on_channel_closed of <__main__.ExamplePublisher object at 0x7efed2c60a90>> for "1:_on_channel_close" failed
2022-03-18 13:44:51,205 INFO     [_close_and_finalize                   882] Closing transport socket and unlinking: state=3; <socket.socket fd=6, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('127.0.0.1', 44170), raddr=('127.0.0.1', 5672)>
2022-03-18 13:44:51,205 ERROR    [log_exception_func_wrap                55] Wrapped func exited with exception. Caller's stack:
...

2. Asynchrone : Supprimer l'échange

Ce que nous voyons ici est que la connexion est fermée et redémarrée après 5 secondes. Bien.

...
2022-03-18 13:43:29,755 INFO     [on_delivery_confirmation              246] Received ack for delivery tag: 31
2022-03-18 13:43:29,755 INFO     [on_delivery_confirmation              253] Published 31 messages, 0 have yet to be confirmed, 31 were acked and 0 were nacked
2022-03-18 13:43:30,753 INFO     [publish_message                       299] Published message # 32
2022-03-18 13:43:30,753 INFO     [schedule_next_message                 262] Scheduling next message for 1.0 seconds
2022-03-18 13:43:30,755 WARNING  [_on_close_from_broker                1070] Received remote Channel.Close (404): "NOT_FOUND - no exchange 'my_test_exchange' in vhost '/'" on <Channel number=1 OPEN conn=<SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7f07a602bf40> params=<URLParameters host=localhost port=5672 virtual_host=/ ssl=False>>>
2022-03-18 13:43:30,755 WARNING  [on_channel_closed                     146] Channel 1 was closed: (404, "NOT_FOUND - no exchange 'my_test_exchange' in vhost '/'")
2022-03-18 13:43:30,755 INFO     [close                                1295] Closing connection (200): 'Normal shutdown'
2022-03-18 13:43:30,755 INFO     [close                                1322] Connection.close is waiting for 1 channels to close: <SelectConnection CLOSING transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7f07a602bf40> params=<URLParameters host=localhost port=5672 virtual_host=/ ssl=False>>
2022-03-18 13:43:30,757 INFO     [abort                                 731] Aborting transport connection: state=1; <socket.socket fd=6, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('127.0.0.1', 44166), raddr=('127.0.0.1', 5672)>
2022-03-18 13:43:30,757 INFO     [_initiate_abort                       904] _AsyncTransportBase._initate_abort(): Initiating abrupt asynchronous transport shutdown: state=1; error=None; <socket.socket fd=6, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('127.0.0.1', 44166), raddr=('127.0.0.1', 5672)>
2022-03-18 13:43:30,757 INFO     [_deactivate                           869] Deactivating transport: state=1; <socket.socket fd=6, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('127.0.0.1', 44166), raddr=('127.0.0.1', 5672)>
2022-03-18 13:43:30,758 INFO     [_on_stream_terminated                1996] AMQP stack terminated, failed to connect, or aborted: opened=True, error-arg=None; pending-error=ConnectionClosedByClient: (200) 'Normal shutdown'
2022-03-18 13:43:30,758 INFO     [_on_stream_terminated                2065] Stack terminated due to ConnectionClosedByClient: (200) 'Normal shutdown'
2022-03-18 13:43:30,758 WARNING  [on_connection_closed                  106] Connection closed, reopening in 5 seconds: (200, 'Normal shutdown')
2022-03-18 13:43:30,758 INFO     [_close_and_finalize                   882] Closing transport socket and unlinking: state=3; <socket.socket fd=6, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('127.0.0.1', 44166), raddr=('127.0.0.1', 5672)>
2022-03-18 13:43:35,763 INFO     [connect                                69] Connecting to amqp://user:password@localhost:5672/%2F?connection_attempts=3&heartbeat=3600
2022-03-18 13:43:35,764 INFO     [start                                 179] Pika version 1.2.0 connecting to ('127.0.0.1', 5672)
2022-03-18 13:43:35,764 INFO     [_on_writable                          345] Socket connected: <socket.socket fd=8, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('127.0.0.1', 44168), raddr=('127.0.0.1', 5672)>
2022-03-18 13:43:35,764 INFO     [_on_transport_establishment_done      428] Streaming transport linked up: (<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7f07a6036f40>, _StreamingProtocolShim: <SelectConnection PROTOCOL transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7f07a6036f40> params=<URLParameters host=localhost port=5672 virtual_host=/ ssl=False>>).
2022-03-18 13:43:35,766 INFO     [on_connection_open                     82] Connection opened
2022-03-18 13:43:35,766 INFO     [open_channel                          116] Creating a new channel
2022-03-18 13:43:35,767 INFO     [_report_completion_and_cleanup        293] AMQPConnector - reporting success: <SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7f07a6036f40> params=<URLParameters host=localhost port=5672 virtual_host=/ ssl=False>>
2022-03-18 13:43:35,767 INFO     [_report_completion_and_cleanup        725] AMQPConnectionWorkflow - reporting success: <SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7f07a6036f40> params=<URLParameters host=localhost port=5672 virtual_host=/ ssl=False>>
2022-03-18 13:43:35,767 INFO     [on_channel_open                       125] Channel opened
2022-03-18 13:43:35,767 INFO     [add_on_channel_close_callback         134] Adding channel close callback
2022-03-18 13:43:35,767 INFO     [setup_exchange                        157] Declaring exchange my_test_exchange
2022-03-18 13:43:35,768 INFO     [on_exchange_declareok                 173] Exchange declared: my_test_exchange
2022-03-18 13:43:35,768 INFO     [setup_queue                           182] Declaring queue my_test_queue
2022-03-18 13:43:35,769 INFO     [on_queue_declareok                    199] Binding my_test_exchange to my_test_queue with my_test_queue
2022-03-18 13:43:35,769 INFO     [on_bindok                             211] Queue bound
2022-03-18 13:43:35,769 INFO     [start_publishing                      218] Issuing consumer related RPC commands
2022-03-18 13:43:35,769 INFO     [enable_delivery_confirmations         231] Issuing Confirm.Select RPC command
2022-03-18 13:43:35,769 INFO     [schedule_next_message                 262] Scheduling next message for 1.0 seconds
2022-03-18 13:43:36,771 INFO     [publish_message                       299] Published message # 1
2022-03-18 13:43:36,771 INFO     [schedule_next_message                 262] Scheduling next message for 1.0 seconds
2022-03-18 13:43:36,785 INFO     [on_delivery_confirmation              246] Received ack for delivery tag: 1
2022-03-18 13:43:36,785 INFO     [on_delivery_confirmation              253] Published 1 messages, 0 have yet to be confirmed, 1 were acked and 0 were nacked
2022-03-18 13:43:37,773 INFO     [publish_message                       299] Published message # 2---
...

3. Asynchrone : Supprimer la file d'attente

Rien ne se passe lorsque nous supprimons la file d'attente. Pas bon ! Pourquoi cela n'est-il pas détecté ?

...
2022-03-18 13:46:42,361 INFO     [publish_message                       299] Published message # 16
2022-03-18 13:46:42,361 INFO     [schedule_next_message                 262] Scheduling next message for 1.0 seconds
2022-03-18 13:46:42,363 INFO     [on_delivery_confirmation              246] Received ack for delivery tag: 16
2022-03-18 13:46:42,363 INFO     [on_delivery_confirmation              253] Published 16 messages, 0 have yet to be confirmed, 16 were acked and 0 were nacked
2022-03-18 13:46:43,363 INFO     [publish_message                       299] Published message # 17
2022-03-18 13:46:43,363 INFO     [schedule_next_message                 262] Scheduling next message for 1.0 seconds
2022-03-18 13:46:43,364 INFO     [on_delivery_confirmation              246] Received ack for delivery tag: 17
2022-03-18 13:46:43,364 INFO     [on_delivery_confirmation              253] Published 17 messages, 0 have yet to be confirmed, 17 were acked and 0 were nacked
2022-03-18 13:46:44,365 INFO     [publish_message                       299] Published message # 18
2022-03-18 13:46:44,366 INFO     [schedule_next_message                 262] Scheduling next message for 1.0 seconds
2022-03-18 13:46:44,367 INFO     [on_delivery_confirmation              246] Received ack for delivery tag: 18
2022-03-18 13:46:44,367 INFO     [on_delivery_confirmation              253] Published 18 messages, 0 have yet to be confirmed, 18 were acked and 0 were nacked
2022-03-18 13:46:45,367 INFO     [publish_message                       299] Published message # 19
2022-03-18 13:46:45,367 INFO     [schedule_next_message                 262] Scheduling next message for 1.0 seconds
2022-03-18 13:46:45,368 INFO     [on_delivery_confirmation              246] Received ack for delivery tag: 19
2022-03-18 13:46:45,368 INFO     [on_delivery_confirmation              253] Published 19 messages, 0 have yet to be confirmed, 19 were acked and 0 were nacked
2022-03-18 13:46:46,368 INFO     [publish_message                       299] Published message # 20
...

Publication asynchrone : basic.ack et basic.nack ne sont pas suffisants

Extrait de la documentation :

Quand les messages publiés seront-ils confirmés par le courtier ?

Pour les messages non acheminables, le courtier émet une confirmation lorsque l'échange vérifie qu'un message ne sera acheminé vers aucune file d'attente (il renvoie une liste vide de files d'attente). Si le message est également publié comme obligatoire, le basic.return est envoyé au client avant le basic.ack. Il en va de même pour les accusés de réception négatifs (basic.nack)".

Cela signifie que nous devons capturer le basic.return dans notre script. J'ai déjà ajouté les lignes au code ci-dessus qui s'occupent de cela. Pour les activer, changez cette ligne :

# # self.add_on_channel_return_callback()

à :

self.add_on_channel_return_callback()

Si vous exécutez à nouveau le script, vous remarquerez qu'après avoir supprimé la file d'attente, le script essaie de redémarrer la connexion. La méthode on_channel_returned(), renvoie le message, avant que basic.ack ne soit reçu.

Comme le basic.return est envoyé au client avant ( !) le basic.ack, le message sera réessayé.

Résumé

Testez toujours toutes les conditions avant d'utiliser le code d'exemple. Je n'ai fait que trois tests mais vous pouvez en faire beaucoup plus, par exemple vérifier si l'échange et la queue sont toujours présents après un redémarrage de RabbitMQ. Et vérifier si les messages sont toujours dans la file d'attente après un redémarrage de RabbitMQ.

En outre, la publication asynchrone, sans perte de données, c'est-à-dire avec confirmation, introduit beaucoup de complexité. Le débit peut augmenter de façon spectaculaire, mais cela dépend aussi beaucoup de l'architecture de votre application. Avez-vous vraiment besoin de la publication asynchrone ?

Liens / crédits

Notify consumer when a queue is deleted on rabbitmq
https://stackoverflow.com/questions/15527226/notify-consumer-when-a-queue-is-deleted-on-rabbitmq

Part 1: RabbitMQ Best Practices
https://www.cloudamqp.com/blog/part1-rabbitmq-best-practice.html

Publishing Throughput - Asynchronous vs Synchronous
https://www.cloudamqp.com/blog/publishing-throughput-asynchronous-vs-synchronous.html

RabbitMQ - AMQP 0-9-1 Complete Reference Guide
https://www.rabbitmq.com/amqp-0-9-1-reference.html

RabbitMQ - Publisher Confirms
https://www.rabbitmq.com/confirms.html

RabbitMQ - Reliability Guide
https://www.rabbitmq.com/reliability.html

Laissez un commentaire

Commentez anonymement ou connectez-vous pour commenter.

Commentaires (1)

Laissez une réponse

Répondez de manière anonyme ou connectez-vous pour répondre.

avatar

Exceptional blog - TYVM. I'm new to the pika library and set up a work queue and multiple producers and consumers. It worked just as expected right until the connection to my RabbitMQ server dropped out. All the consumers threw exceptions, but the producers? They happily posted their messages into the ether, never to be seen again.
Which is how I ended up here, in your comments.
The async test code - the default messages and headers are intriguing