Sending messages to Slack using chat_postMessage
Writing a wrapper class is not a bad way to learn the ins-and-outs of a Python package.

For a project I was already sending messages by email, but now I also wanted to send messages to Slack. Of course, we are using the Python Slack SDK.
The documentation can be found on the page: Python Slack SDK - Web Client. In this post I create a simple SlackAPI class with its own SlackError exception class.
Create and configure a new Slack App
We will be sending our messages to a Slack Channel. First we must create a Slack App, set permissions (Scopes: 'chat:write'), and add it to our Slack Channel. At the end we have the following information:
- Slack Bot Token
- Slack Channel (Id or name)
- Slack User (optional)
Refer to the Slack documentation on how to do this.
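Once we have these values, we need to get them into our program. A minimal sketch, assuming we keep them in environment variables named after the config keys used later in this post (the function name load_slack_config is made up for this example):

```python
import os


def load_slack_config(environ=None):
    """Collect the Slack settings from environment variables.

    Reading the settings from the environment is an assumption of this
    sketch; the variable names mirror the config keys used in this post.
    """
    environ = os.environ if environ is None else environ
    config = {
        'SLACK_BOT_TOKEN': environ.get('SLACK_BOT_TOKEN', '').strip(),
        'SLACK_CHANNEL': environ.get('SLACK_CHANNEL', '').strip(),
        # the user is optional: an empty value becomes None
        'SLACK_USER': environ.get('SLACK_USER', '').strip() or None,
    }
    # token and channel are required, fail early when one is missing
    missing = [key for key in ('SLACK_BOT_TOKEN', 'SLACK_CHANNEL') if not config[key]]
    if missing:
        raise ValueError('missing Slack settings: ' + ', '.join(missing))
    return config
```

Failing early on a missing token or channel avoids finding out only when the first message is sent.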
Slack and rate limiting
Slack may limit the rate of our messages. If we send too many messages within a short time, it will show the alert message:
Due to a high volume of activity, we are not displaying some messages sent by this application.
In our code, a SlackApiError with error 'ratelimited' is raised. The status_code will be 429 and 'retry_after' gives the number of seconds to wait before trying again:
try:
    ...
except SlackApiError as e:
    error_message = e.response.get('error', None)
    status_code = e.response.status_code
    retry_after = e.response.headers.get('Retry-After', None)
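If we wanted to do the waiting ourselves, the 'Retry-After' value would first have to be turned into a number. A minimal sketch of such a helper; the name retry_delay_seconds and the default of one second are assumptions for this example, not part of the Slack SDK:

```python
def retry_delay_seconds(headers, default=1.0):
    """Return the number of seconds to wait, based on the 'Retry-After' header.

    Falls back to `default` when the header is missing or is not a number.
    The header name is matched case-insensitively to be on the safe side.
    """
    for name, value in (headers or {}).items():
        if name.lower() == 'retry-after':
            try:
                # never return a negative delay
                return max(float(value), 0.0)
            except (TypeError, ValueError):
                break
    return default
```

The result can be passed directly to time.sleep() before the next attempt.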
Fortunately, the 'Python Slack SDK' comes with a RetryHandler that does the retries automatically for us when status_code 429 is returned. By adding this to our code, we do not have to handle these retries ourselves!
Important: By default, the RetryHandler adds (!) one retry. This means that if you initialize the WebClient like this:
client = WebClient(
    timeout=10.
)
client.chat_postMessage(...)
then, when a timeout occurs, chat_postMessage only returns after twice the specified timeout: 20 seconds. Passing the following parameter to the RetryHandler does not change this:
max_retry_count
So far I have not figured out how to set a timeout for the entire operation.
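One generic way to put an upper bound on how long we wait is to run the call in a worker thread and stop waiting after a deadline. This is not a feature of the Slack SDK, just a sketch using the standard library; call_with_deadline and slow_call are hypothetical names, and slow_call only stands in for a slow chat_postMessage call:

```python
import concurrent.futures
import time


def call_with_deadline(func, deadline_seconds, *args, **kwargs):
    """Run func in a worker thread and stop waiting after deadline_seconds.

    The worker thread is not cancelled: if func sends a Slack message,
    the message may still be delivered after we have given up waiting.
    """
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
    future = executor.submit(func, *args, **kwargs)
    try:
        return future.result(timeout=deadline_seconds)
    except concurrent.futures.TimeoutError:
        raise TimeoutError('no result within {} seconds'.format(deadline_seconds))
    finally:
        # do not block on a worker that is still running
        executor.shutdown(wait=False)


def slow_call(seconds):
    """Hypothetical stand-in for a slow client.chat_postMessage() call."""
    time.sleep(seconds)
    return 'done'
```

For example, call_with_deadline(slow_call, 0.05, 0.3) raises TimeoutError after about 0.05 seconds. As with the client timeout, a message can end up being sent twice if we retry after giving up.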
The SlackError exception class
In our SlackAPI, we distinguish between:
- Permanent errors
- Temporary errors
Permanent errors are, for example, a bad or invalid Slack Bot Token or Slack Channel. In this case we should not retry sending the message. An example of a temporary error is when the connection to Slack is down. In this case we must retry sending the message after some time. We must also retry sending when a timeout occurs. Note that in case of a timeout we may end up sending a message twice. This happens when the server response arrives at the same time as the timeout fires in the client.
The Slack SDK has its own error class, SlackApiError. Some errors are:
- invalid_auth invalid bot_token
- channel_not_found unknown channel
- no_text text is missing, e.g. text=None
- invalid_arguments e.g. channel=None
- ratelimited when you send too many messages in a short time
If we use the RetryHandler, see above, the ratelimited error does not occur. We can get the error data from the error object like this:
e.response.status_code
e.response.get('error')
e.response.headers
There is another error that can occur here: when the URL to the Slack API is not correct (not found). This will raise an HTTP 404 error. The returned error data is different from the 'normal' error data. In this case, the error object returns a dictionary(!):
e.response['status']
e.response['headers']
e.response['body']
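The two shapes can be handled in one place. A minimal sketch, mirroring the check used in the final code below; extract_error_data is a made-up name and FakeSlackResponse is only a stand-in for the SDK response object, with just the attributes this post uses:

```python
def extract_error_data(response):
    """Return (status_code, error_message, headers) for both response shapes."""
    if isinstance(response, dict):
        # HTTP-level error such as 404: a plain dictionary
        status_code = response.get('status')
        error_message = 'http_{}'.format(status_code)
        headers = response.get('headers') or {}
    else:
        # 'normal' Slack API error: an object with attributes and get()
        status_code = response.status_code
        error_message = response.get('error')
        headers = response.headers or {}
    return status_code, error_message, headers


class FakeSlackResponse:
    """Hypothetical stand-in for the response object of the Slack SDK."""

    def __init__(self, status_code, data, headers):
        self.status_code = status_code
        self.data = data
        self.headers = headers

    def get(self, key, default=None):
        return self.data.get(key, default)
```

With this, the calling code no longer needs to know which of the two shapes it received.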
The two other errors, both temporary, are:
- connection_error
- timeout_error
We create a SlackError exception class that is raised for all permanent and temporary errors, meaning our application only needs to catch SlackError.
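The distinction between permanent and temporary errors can be captured in a small helper. A sketch, assuming (as in the final code below) that only connection_error and timeout_error are worth a retry; the names TEMPORARY_ERRORS and classify_error are made up for this example:

```python
# errors for which sending the same message again can succeed
TEMPORARY_ERRORS = frozenset({'connection_error', 'timeout_error'})


def classify_error(error_message):
    """Return 'temporary' when a retry makes sense, otherwise 'permanent'."""
    if error_message in TEMPORARY_ERRORS:
        return 'temporary'
    # e.g. invalid_auth, channel_not_found, no_text, invalid_arguments
    return 'permanent'
```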
Adding our SlackResponse object
We are not really interested in all kinds of data; we just want to know what happened and whether we must retry sending. We could return a dictionary, but to make access cleaner, we use a namedtuple here:
ResponseObj = collections.namedtuple('SlackAPIResponse', 'has_errors error_message status_code retry_after')
We return the same SlackResponse object in both cases, with or without errors. In case of errors, we get the SlackResponse object from e.args[0]. The final code below adds one more field, request_msecs, holding the duration of the request.
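To illustrate how the same object comes back in both cases, here is a small self-contained sketch. The send function is a hypothetical stand-in for the real call to Slack, and the use of namedtuple defaults is a choice made for this example:

```python
import collections

# defined once at module level; every field has a default value
SlackAPIResponse = collections.namedtuple(
    'SlackAPIResponse',
    'has_errors error_message status_code retry_after',
    defaults=(False, None, None, None),
)


class SlackError(Exception):
    pass


def send(fail):
    """Hypothetical sender: returns a response, or raises SlackError carrying one."""
    if fail:
        raise SlackError(SlackAPIResponse(has_errors=True, error_message='channel_not_found'))
    return SlackAPIResponse(status_code=200)


def send_and_report(fail):
    """Return the response object, whether or not the send succeeded."""
    try:
        r = send(fail)
    except SlackError as e:
        # the response object is the first argument of the exception
        r = e.args[0]
    return r
```

The caller can then check r.has_errors and r.error_message in one place.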
The code
Here is the code if you want to try this. First set at least the following parameters:
SLACK_BOT_TOKEN
SLACK_CHANNEL
Slack Channel can be the Id or the name of the channel. If you want to observe the rate limiting feature, you can set:
number_of_messages = 100
The code:
import collections
import copy
import datetime
import logging
import os
import sys
import time

from slack_sdk import WebClient
from slack_sdk.errors import SlackApiError
from slack_sdk.http_retry.builtin_handlers import RateLimitErrorRetryHandler


class SlackError(Exception):
    pass


class SlackApi:

    def __init__(
        self,
        config=None,
        logger=None,
        base_url=None,
        slack_bot_token=None,
        timeout=None,
        use_rate_limit_handler=True
    ):
        self.config = config
        self.logger = logger
        self.base_url = base_url
        self.slack_bot_token = slack_bot_token
        self.timeout = timeout or 5.
        self.use_rate_limit_handler = use_rate_limit_handler

        web_client_params = {
            'token': self.slack_bot_token,
            'timeout': self.timeout
        }
        if self.base_url is not None:
            web_client_params['base_url'] = self.base_url
        self.client = WebClient(**web_client_params)

        if use_rate_limit_handler:
            # this handler does retries when HTTP status 429 is returned
            rate_limit_handler = RateLimitErrorRetryHandler(
                max_retry_count=1
            )
            self.client.retry_handlers.append(rate_limit_handler)

    def make_response(self, has_errors=False, error_message=None, status_code=None, retry_after=None):
        # duration of the request in milliseconds, measured from the start of chat_post_message
        request_msecs = int((datetime.datetime.now() - self.ts).total_seconds() * 1000)
        ResponseObj = collections.namedtuple('SlackAPIResponse', 'has_errors error_message status_code retry_after request_msecs')
        return ResponseObj(has_errors, error_message, status_code, retry_after, request_msecs)

    def chat_post_message(self, channel, text=None, blocks=None, user=None):
        self.ts = datetime.datetime.now()

        # change empty string user to None
        if user is not None:
            user = user.strip()
            if user == '':
                user = None

        try:
            response = self.client.chat_postMessage(
                channel=channel,
                text=text,
                blocks=blocks,
                user=user,
            )
            return self.make_response(
                status_code=response.status_code,
            )

        except SlackApiError as e:
            # first check for http errors
            status = e.response.get('status', None)
            self.logger.debug('status = {}'.format(status))
            if status is not None:
                # http error: e.response is a dictionary
                status_code = status
                error_message = 'http_' + str(status)
                headers = e.response['headers']
            else:
                # 'normal' error: status_code is an attribute, not part of the response data
                status_code = e.response.status_code
                error_message = e.response.get('error', None)
                headers = e.response.headers
            raise SlackError(self.make_response(
                has_errors=True,
                status_code=status_code,
                error_message=error_message,
                retry_after=headers.get('Retry-After', None),
            ))

        except Exception as e:
            exception = type(e).__name__
            if exception == 'TimeoutError':
                error_message = 'timeout_error'
            else:
                # all other errors are considered connection errors
                error_message = 'connection_error'
            raise SlackError(self.make_response(
                has_errors=True,
                error_message=error_message,
            ))

# the code below is typically in another file

# Example from:
# Stacking multiple blocks
# https://api.slack.com/messaging/composing/layouts
blocks_example = [
    {
        "type": "header",
        "text": {
            "type": "plain_text",
            "text": "New request"
        }
    },
    {
        "type": "section",
        "fields": [
            {
                "type": "mrkdwn",
                "text": "*Type:*\nPaid Time Off"
            },
            {
                "type": "mrkdwn",
                "text": "*Created by:*\n<example.com|Fred Enriquez>"
            }
        ]
    },
    {
        "type": "section",
        "fields": [
            {
                "type": "mrkdwn",
                "text": "*When:*\nAug 10 - Aug 13"
            }
        ]
    },
    {
        "type": "section",
        "text": {
            "type": "mrkdwn",
            "text": "<https://example.com|View request>"
        }
    }
]

def get_logger(logger_name=None, logger_level=logging.DEBUG):
    logger = logging.getLogger(logger_name)
    logger.setLevel(logging.DEBUG)
    # console
    console_handler = logging.StreamHandler()
    console_logger_format = '%(levelname)-8.8s [%(filename)-15s%(funcName)15s():%(lineno)03s] %(message)s'
    console_handler.setFormatter(logging.Formatter(console_logger_format))
    console_handler.setLevel(logger_level)
    logger.addHandler(console_handler)
    return logger

config = {
    'SLACK_BOT_TOKEN': '',
    'SLACK_CHANNEL': '',
    'SLACK_USER': None
}

def main():
    logger = get_logger(logger_name='slack_api')

    # your bot token and channel
    slack_bot_token = config.get('SLACK_BOT_TOKEN')
    slack_channel = config.get('SLACK_CHANNEL')
    slack_user = config.get('SLACK_USER')

    # create 'messages table'
    number_of_messages = 100
    #number_of_messages = 1
    messages = []
    for i in range(number_of_messages):
        # deepcopy the dictionary here!
        blocks = copy.deepcopy(blocks_example)
        # patch message number
        blocks[0]['text']['text'] = "New request" + ': {:03d}'.format(i)
        messages.append({
            'slack_bot_token': slack_bot_token,
            'channel': slack_channel,
            'user': slack_user,
            'text': 'message {:03d}: this is a test message'.format(i),
            'blocks': blocks,
            # delivery status
            'is_delivery_completed': False,
            'is_delivered': False,
            'delivery_tries': 0,
            'last_delivery_error': None,
        })

    # send the messages
    for i, message in enumerate(messages):
        if message['is_delivered']:
            continue
        logger.debug('sending message {} ...'.format(i))
        slack_api = SlackApi(
            logger=logger,
            slack_bot_token=message.get('slack_bot_token'),
            #timeout=0.1,
        )
        try:
            r = slack_api.chat_post_message(
                channel=message.get('channel'),
                text=message.get('text'),
                blocks=message.get('blocks'),
                user=message.get('user'),
            )
            logger.debug('success sending message[{}]: r = {}'.format(i, r))
            # delivery status
            message['is_delivery_completed'] = True
            message['is_delivered'] = True
            message['delivery_tries'] += 1
        except SlackError as e:
            #logger.exception('error sending message[{}]'.format(i))
            r = e.args[0]
            logger.debug('problem sending message[{}]: r = {}'.format(i, r))
            message['delivery_tries'] += 1
            message['last_delivery_error'] = r.error_message
            if r.error_message in ['timeout_error', 'connection_error']:
                # delivery status, must retry
                message['is_delivery_completed'] = False
                message['is_delivered'] = False
            else:
                # delivery status, done
                message['is_delivery_completed'] = True
                message['is_delivered'] = False


if __name__ == '__main__':
    main()
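The code above marks messages that must be retried, but makes only one pass over them. A possible follow-up is to repeat the pass for messages that are not completed yet. A minimal sketch under assumptions: send is a hypothetical callable that returns None on success or the error name on failure (for example a thin wrapper around chat_post_message), and deliver_with_retries, max_tries and wait_seconds are made-up names:

```python
import time


def deliver_with_retries(messages, send, max_tries=3, wait_seconds=0.0):
    """Make up to max_tries passes over the messages that are not completed."""
    for attempt in range(max_tries):
        pending = [m for m in messages if not m['is_delivery_completed']]
        if not pending:
            break
        if attempt > 0 and wait_seconds:
            # give a temporary problem some time to go away
            time.sleep(wait_seconds)
        for message in pending:
            message['delivery_tries'] += 1
            error_message = send(message)
            if error_message is None:
                message['is_delivery_completed'] = True
                message['is_delivered'] = True
                continue
            message['last_delivery_error'] = error_message
            # only temporary errors are worth another try
            if error_message not in ('timeout_error', 'connection_error'):
                message['is_delivery_completed'] = True
    return messages
```

Messages that still fail with a temporary error after max_tries passes keep is_delivery_completed set to False, so a later run can pick them up again.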
Summary
First we have to figure out how to create a Slack App, give it the proper permissions, and add it to a Slack Channel. To send messages to Slack we use the Slack SDK. The RetryHandler takes care of the rate limiting feature of Slack. It makes our code much more compact. We wrapped the Slack SDK in our custom SlackApi class. We also added our own SlackError class, which covers all (!) errors that can occur, including timeouts and connection errors.
Our application can now create the messages and blocks and send them to Slack using our SlackApi.
The nice thing about writing a wrapper class is that we have to go through all possible parameters and responses. That's one way (and not such a bad one) to learn the ins-and-outs of a Python package.
Links / credits
Python Slack SDK - RetryHandler
https://slack.dev/python-slack-sdk/web/index.html#retryhandler
Python Slack SDK - Web Client
https://slack.dev/python-slack-sdk/web/index.html
Slack - Package slack_sdk
https://slack.dev/python-slack-sdk/api-docs/slack_sdk
Slack - Rate Limits
https://api.slack.com/docs/rate-limits
Slack API - Python Slack SDK
https://slack.dev/python-slack-sdk
Using the Slack Web API
https://api.slack.com/web