angle-uparrow-clockwisearrow-counterclockwisearrow-down-uparrow-leftatcalendarcard-listchatcheckenvelopefolderhouseinfo-circlepencilpeoplepersonperson-fillperson-plusphoneplusquestion-circlesearchtagtrashx

Sending messages to Slack using chat_postMessage

Writing a wrapper class is not a bad way to learn the ins-and-outs of a Python package.

23 March 2023 Updated 24 March 2023
In
post main image
https://www.pexels.com/nl-nl/@thngocbich/

For a project I was already sending messages by email, but now I also wanted to send messages to Slack. Of course, we are using the Python Slack SDK.

The documentation can be found on the page: Python Slack SDK - Web Client. In this post I create a simple SlackAPI class with its own SlackError exception class.

Create and configure a new Slack App

We will be sending our messages to a Slack Channel. First we must create a Slack App, set permissions (Scopes: 'chat:write'), and add it to our Slack Channel. At the end we have the following information:

  • Slack Bot Token
  • Slack Channel (Id or name)
  • Slack User (optional)

Refer to the Slack documentation on how to do this.

Slack and rate limiting

Slack can be limiting the rate of our messages. If we send too much messages within a short time, it will show the alert message:

Due to a high volume of activity, we are not displaying some messages sent by this application.

In our code, a SlackApiError with error 'ratelimited' is raised. The status_code will be 429 and 'retry_after' gives the seconds when to try again:

    try:
        ...
    except SlackApiError as e:
        error_message=e.response.get('error', None),
        status_code=e.response.status_code,
        retry_after=e.response.headers.get('Retry-After', None),

Fortunately, the 'Python Slack SDK' comes with a RetryHandler that does the retries automatically for us when status_code 429 is returned. By adding this to our code, we do not have to handle these retries ourselves!

Important: By default, the RetryHandler adds (!) one retry. This means that if you initialize the WebClient like this:

    client = WebClient(
        timeout=10.
    )
    client.chat_postMessage(...)

Then the chat_postMessage will return after twice the specified timeout: 20 seconds. You cannot change this by calling the RetryHandler with the parameter:

max_retry_count

At the moment I did not figure out how we can make the timeout for the entire operation.

The SlackError exception class

In our SlackAPI, we distinghuish between:

  • Permanent errors
  • Temporary errors

Permanent errors are for example a bad or invalid Slack Bot Token, Slack Channel. In this case we should not retry sending this message. An example of a temporary error is when the connection to Slack is down. In this case we must retry sending this message after some time. We must also retry sending when timeout occurs. Note that in case of a timeout we may end up sending a message twice. This happens when the server response occurs at the same time of the timeout in the client.

The Slack SDK has it's own error class, SlackApiError. Some errors are:

- invalid_auth        invalid bot_token
- channel_not_found   unknown channel
- no_text             text is missing, e.g. text=None
- invalid_arguments   e.g. channel=None
- ratelimited         when you send too many messages in s short time

If we use the RetryHandler, see above, the ratelimited error does not occur. We can get the error data from the error object like this:

e.response.status_code
e.response.error
e.response.headers

There is another error that can occur here. This is when the URL to the Slack API is not correct (not found). This will raise a HTTP 404 error. The returned error data is different from the 'normal' error data. In this case, the error object returns a dictionary(!):

e.response['status']
e.response['headers']
e.response['body']

The two other, temporary errors, are:

  • connection_error
  • timeout_error

We create a SlackError exception class that returns all the permanent and temporary errors, meaning our application only needs to capture SlackError errors.

Adding our SlackResponse object

We are not really interested in all kinds of data, we just want to know what happened and if we must retry sending. We can return a dictionary, but to make access more clean, we use a namedtuple here:

ResponseObj = collections.namedtuple('SlackAPIResponse', 'has_errors error_message status_code retry_after')

We return the same SlackResponse object for both cases no errors or errors. In case of errors, we get the SlackResponse object from e.args[0].

The code

Here is the code if you want to try this. First set at least the following parameters:

SLACK_BOT_TOKEN
SLACK_CHANNEL

Slack Channel can be the Id or the name of the channel. If you want to observe the rate limiting feature, you can set:

number_of_messages = 100

The code:

import collections
import copy
import datetime
import logging
import os
import sys
import time    

from slack_sdk import WebClient
from slack_sdk.errors import SlackApiError
from slack_sdk.http_retry.builtin_handlers import RateLimitErrorRetryHandler

class SlackError(Exception):
    pass
    
class SlackApi:
    def __init__(
        self, 
        config=None,
        logger=None,
        base_url=None,
        slack_bot_token=None,
        timeout=None,
        use_rate_limit_handler=True
    ):
        self.config = config
        self.logger = logger
        self.base_url = base_url
        self.slack_bot_token = slack_bot_token
        self.timeout = timeout or 5.
        self.use_rate_limit_handler = use_rate_limit_handler
        web_client_params = {
            'token': self.slack_bot_token,
            'timeout': self.timeout
        }
        if self.base_url is not None:
            web_client_params['base_url'] = self.base_url
        self.client = WebClient(**web_client_params)
        if use_rate_limit_handler:
            # this handler does retries when HTTP status 429 is returned
            rate_limit_handler = RateLimitErrorRetryHandler(
                max_retry_count=1
            )
            self.client.retry_handlers.append(rate_limit_handler)

    def make_response(self, has_errors=False, error_message=None, status_code=None, retry_after=None):
        request_msecs = int((datetime.datetime.now() - self.ts).total_seconds() * 1000)
        ResponseObj = collections.namedtuple('SlackAPIResponse', 'has_errors error_message status_code retry_after request_msecs')
        return ResponseObj(has_errors, error_message, status_code, retry_after, request_msecs)

    def chat_post_message(self, channel, text=None, blocks=None, user=None):
        self.ts = datetime.datetime.now()
        # change empty string user to None
        if user is not None:
            user = user.strip()
            if user == '':
                user = None
        try:
            response = self.client.chat_postMessage(
                channel=channel,
                text=text,
                blocks=blocks,
                user=user,
            )
            return self.make_response(
                status_code=response.status_code,
            )
        except SlackApiError as e:
            # first check for http errors
            status = e.response.get('status', None)
            self.logger.debug('status = {}'.format(status))
            if status is not None:
                status_code = status
                error_message = 'http_' + str(status)
                headers = e.response['headers']
            else:
                status_code = e.response.get('status_code', None)
                error_message = e.response.get('error', None)
                headers = e.response.headers
            raise SlackError(self.make_response(
                has_errors=True,
                status_code=status_code,
                error_message=error_message,
                retry_after=headers.get('Retry-After', None),
            ))
        except Exception as e:
            exception = type(e).__name__
            if exception == 'TimeoutError':
                error_message='timeout_error'
            else:
                # all other errors are considered connection errors
                error_message='connection_error'
            raise SlackError(self.make_response(
                has_errors=True,
                error_message=error_message,
            ))

# the code below is typically in another file

# Example from:
# Stacking multiple blocks
# https://api.slack.com/messaging/composing/layouts
blocks_example = [
    {
        "type": "header",
        "text": {
            "type": "plain_text",
            "text": "New request"
        }
    },
    {
        "type": "section",
        "fields": [
            {
                "type": "mrkdwn",
                "text": "*Type:*\nPaid Time Off"
            },
            {
                "type": "mrkdwn",
                "text": "*Created by:*\n<example.com|Fred Enriquez>"
            }
        ]
    },
    {
        "type": "section",
        "fields": [
            {
                "type": "mrkdwn",
                "text": "*When:*\nAug 10 - Aug 13"
            }
        ]
    },
    {
        "type": "section",
        "text": {
            "type": "mrkdwn",
            "text": "<https://example.com|View request>"
        }
    }
]


def get_logger(logger_name=None, logger_level=logging.DEBUG):
    logger = logging.getLogger(logger_name)
    logger.setLevel(logging.DEBUG)
    # console
    console_handler = logging.StreamHandler()
    console_logger_format = '%(levelname)-8.8s [%(filename)-15s%(funcName)15s():%(lineno)03s] %(message)s'
    console_handler.setFormatter(logging.Formatter(console_logger_format))
    console_handler.setLevel(logger_level)
    logger.addHandler(console_handler)
    return logger


config = {
    'SLACK_BOT_TOKEN': '',
    'SLACK_CHANNEL': '',
    'SLACK_USER': None
}

def main():
    logger = get_logger(logger_name='slack_api')

    # your bot token and channel
    slack_bot_token = config.get('SLACK_BOT_TOKEN')
    slack_channel = config.get('SLACK_CHANNEL')
    slack_user = config.get('SLACK_USER')

    # create 'messages table'
    number_of_messages = 100
    #number_of_messages = 1
    messages = []
    for i in range(number_of_messages):
        # deepcopy the dictionary here!
        blocks = copy.deepcopy(blocks_example)
        # patch message number
        blocks[0]['text']['text'] = "New request" + ': {:03d}'.format(i)
        messages.append({
            'slack_bot_token': slack_bot_token,
            'channel': slack_channel,
            'user': slack_user,
            'text': 'message {:03d}: this is a test message'.format(i),
            'blocks': blocks,
            # delivery status
            'is_delivery_completed': False,
            'is_delivered': False,
            'delivery_tries': 0,
            'last_delivery_error': None,
        })

    # send the messages
    for i, message in enumerate(messages):
        if message['is_delivered']:
            continue
        logger.debug('sending message {} ...'.format(i))
        slack_api = SlackApi(
            logger=logger,
            slack_bot_token=message.get('slack_bot_token'),
            #timeout=0.1,
        )
        try:
            r = slack_api.chat_post_message(
                channel=message.get('channel'),
                text=message.get('text'),
                blocks=message.get('blocks'),
                user=message.get('user'),
            )
            logger.debug('success sending message[{}]: r = {}'.format(i, r))
            # delivery status
            message['is_delivery_completed'] = True
            message['is_delivered'] = True
            message['delivery_tries'] += 1
        except SlackError as e:
            #logger.exception('error sending message[{}]'.format(i))
            r = e.args[0]
            logger.debug('problem sending message[{}]: r = {}'.format(i, r))
            message['delivery_tries'] += 1
            message['last_delivery_error'] = r.error_message
            if r.error_message in ['timeout_error', 'connection_error']:
                # delivery status, must retry
                message['is_delivery_completed'] = False
                message['is_delivered'] = False
            else:
                # delivery status, done
                message['is_delivery_completed'] = True
                message['is_delivered'] = False


if __name__ == '__main__':
    main()

Summary

First we have to figure our how to create a Slack App, give it the proper permissions, and add it to a Slack Channel. To send messages to Slack we use the Slack SDK.  The RetryHandler takes care of the rate limiting feature of Slack. It makes our code much more compact.  We wrapped the Slack SDK into our custom SlackApi class. We also added our own SlackError class which handles all (!) errors that can occur, including timeouts and connection errors.

Our application now can create the messages and blocks and send them to Slack using our SlackApi.

The nice thing of writing a wrapper class, is that we have go through all possible parameters and responses. That's one way (and not such a bad one) to learn the ins-and-outs of a Python package.

Links / credits

Python Slack SDK - RetryHandler
https://slack.dev/python-slack-sdk/web/index.html#retryhandler

Python Slack SDK - Web Client
https://slack.dev/python-slack-sdk/web/index.html

Slack - Package slack_sdk
https://slack.dev/python-slack-sdk/api-docs/slack_sdk

Slack - Rate Limits
https://api.slack.com/docs/rate-limits

Slack API - Python Slack SDK
https://slack.dev/python-slack-sdk

Using the Slack Web API
https://api.slack.com/web

Read more

Slack

Leave a comment

Comment anonymously or log in to comment.

Comments

Leave a reply

Reply anonymously or log in to reply.